From 32c50466f337da29d487a0d3fc1c370c587a1740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Sat, 21 Feb 2026 22:41:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(binance=5Fclient,=20position=5Fmanager,=20?= =?UTF-8?q?user=5Fdata=5Fstream):=20=E5=A2=9E=E5=BC=BA=E7=AE=97=E6=B3=95?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=A4=84=E7=90=86=E4=B8=8E=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `binance_client.py` 中新增 `client_algo_id` 参数以支持 ALGO_UPDATE 的精确匹配。更新 `position_manager.py` 以生成并缓存 SL_/TP_ 的 `client_algo_id`,确保在止损和止盈时能够正确关联到相应的订单。`user_data_stream.py` 中优化了 ALGO_UPDATE 处理逻辑,优先使用 `clientAlgoId` 进行订单匹配,并在 Redis 中缓存相关信息。这些改进提升了系统在处理算法订单时的准确性与效率。 --- trading_system/binance_client.py | 4 ++ trading_system/binance_order_event_logger.py | 5 +- trading_system/position_manager.py | 42 +++++++++++++ trading_system/user_data_stream.py | 62 +++++++++++++++++--- 4 files changed, 103 insertions(+), 10 deletions(-) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 6c3f837..71de13b 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -2420,6 +2420,7 @@ class BinanceClient: stop_price: float, current_price: Optional[float] = None, working_type: str = "MARK_PRICE", + client_algo_id: Optional[str] = None, ) -> Optional[Dict]: """ 在币安侧挂“保护单”,用于止损/止盈: @@ -2556,6 +2557,7 @@ class BinanceClient: # Algo 条件单接口使用 triggerPrice(不是 stopPrice) # 显式传 timeInForce=GTC,避免交易所对 closePosition 单默认用 GTE 导致 "GTE can only be used with open positions" + # clientAlgoId:用于 ALGO_UPDATE 时按 entry_order_id 精确匹配 DB 平仓记录 params: Dict[str, Any] = { "algoType": "CONDITIONAL", "symbol": symbol, @@ -2566,6 +2568,8 @@ class BinanceClient: "closePosition": True, "timeInForce": "GTC", } + if client_algo_id and len(str(client_algo_id)) <= 36: + params["clientAlgoId"] = str(client_algo_id) # 单向持仓模式(ONE_WAY_POSITION_ONLY):不传 positionSide;否则按检测结果处理 if not one_way_only and dual is True: diff --git a/trading_system/binance_order_event_logger.py b/trading_system/binance_order_event_logger.py index 56bef3c..3193a35 100644 --- a/trading_system/binance_order_event_logger.py +++ b/trading_system/binance_order_event_logger.py @@ -72,9 +72,10 @@ def log_order_event(account_id: int, event_type: str, event_time_ms: Optional[in elif event_type == "ALGO_UPDATE" and data: o = data row["symbol"] = (o.get("s") or "").strip() - row["algoId"] = o.get("i") + row["algoId"] = o.get("aid") # 币安 ALGO_UPDATE 用 aid + row["clientAlgoId"] = (o.get("caid") or "").strip() or None # 币安用 caid,SL_/TP_+entry_order_id row["algoStatus"] = (o.get("X") or "").strip() # TRIGGERED/FINISHED - row["triggeredOrderId"] = o.get("ai") # 触发后的普通订单 id + row["triggeredOrderId"] = o.get("ai") or None # 触发后的普通订单 id else: row["raw"] = data line = json.dumps(row, ensure_ascii=False, default=_default_serializer) + "\n" diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 378caf4..225916c 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -1384,6 +1384,16 @@ class PositionManager: if not position_info or not isinstance(position_info, dict): return + # 用于 ALGO_UPDATE 时按 entry_order_id 精确匹配 DB 平仓记录 + entry_order_id = position_info.get("orderId") or position_info.get("entry_order_id") + client_algo_id_sl = None + client_algo_id_tp = None + if entry_order_id: + eid = str(entry_order_id).strip() + if eid and len(eid) <= 30: # 留足 SL_/TP_ 前缀 + client_algo_id_sl = f"SL_{eid}"[:36] + client_algo_id_tp = f"TP_{eid}"[:36] + side = (position_info.get("side") or "").upper() if side not in {"BUY", "SELL"}: return @@ -1565,6 +1575,7 @@ class PositionManager: stop_price=stop_loss, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_sl, ) elif side == "SELL": # 做空:当前价 >= 止损价,说明已触发止损 @@ -1585,6 +1596,7 @@ class PositionManager: stop_price=stop_loss, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_sl, ) else: sl_order = await self.client.place_trigger_close_position_order( @@ -1594,6 +1606,7 @@ class PositionManager: stop_price=stop_loss, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_sl, ) except AlgoOrderPositionUnavailableError: sl_failed_due_to_gte = True @@ -1615,6 +1628,7 @@ class PositionManager: stop_price=stop_loss, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_sl, ) except AlgoOrderPositionUnavailableError: sl_failed_due_to_gte = True @@ -1636,6 +1650,7 @@ class PositionManager: stop_price=stop_loss, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_sl, ) except AlgoOrderPositionUnavailableError: sl_failed_due_to_gte = True @@ -1649,6 +1664,19 @@ class PositionManager: logger.error(f"{symbol} 挂止损单失败: {e}") sl_order = None + if sl_order and entry_order_id: + algo_id = sl_order.get("algoId") + if algo_id is not None: + try: + rc = getattr(self.client, "redis_cache", None) + if rc: + await rc.set( + f"ats:algo2entry:{self.account_id}:{algo_id}", + str(entry_order_id), + ttl=7 * 86400, + ) + except Exception: + pass if sl_order: logger.info(f"[账号{self.account_id}] {symbol} ✓ 止损单已成功挂到交易所: {sl_order.get('algoId', 'N/A')}") else: @@ -1739,6 +1767,7 @@ class PositionManager: stop_price=take_profit, current_price=current_price, working_type="MARK_PRICE", + client_algo_id=client_algo_id_tp, ) except Exception as e: # 处理 -2021: Order would immediately trigger @@ -1752,6 +1781,19 @@ class PositionManager: else: logger.warning(f"{symbol} 挂止盈单失败: {e}") tp_order = None + if tp_order and entry_order_id: + algo_id = tp_order.get("algoId") + if algo_id is not None: + try: + rc = getattr(self.client, "redis_cache", None) + if rc: + await rc.set( + f"ats:algo2entry:{self.account_id}:{algo_id}", + str(entry_order_id), + ttl=7 * 86400, + ) + except Exception: + pass if tp_order: logger.info(f"[账号{self.account_id}] {symbol} ✓ 止盈单已成功挂到交易所: {tp_order.get('algoId', 'N/A')}") else: diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index a5fd4c3..e53d8e3 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -468,7 +468,7 @@ class UserDataStream: log_order_event(self.account_id, "ORDER_TRADE_UPDATE", data.get("E"), data.get("o") or {}) except Exception: pass - self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E")) + await self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E")) elif e == "ACCOUNT_UPDATE": logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送") self._on_account_update(data.get("a") or {}) @@ -479,12 +479,12 @@ class UserDataStream: log_order_event(self.account_id, "ALGO_UPDATE", data.get("E"), data.get("o") or {}) except Exception: pass - self._on_algo_update(data.get("o") or {}) + await self._on_algo_update(data.get("o") or {}) else: logger.debug(f"UserDataStream: 收到未知事件类型: {e}") return False - def _on_order_trade_update(self, o: Dict, event_time_ms=None): + async def _on_order_trade_update(self, o: Dict, event_time_ms=None): # 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId # ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对, n=手续费数量, N=手续费资产 event_type = (o.get("x") or "").strip().upper() @@ -545,7 +545,16 @@ class UserDataStream: except Exception as ex: logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}") else: - # 平仓成交(支付式闭环):回写 exit_order_id 并用推送数据更新 exit_price/pnl/commission,仅 WS 驱动 DB + # 平仓成交(reduceOnly):优先从 Redis 取 entry_order_id(ALGO_UPDATE 可能先到并已缓存) + exit_entry_order_id = None + try: + rc = getattr(self.client, "redis_cache", None) + if rc: + raw = await rc.get(f"ats:exit_order2entry:{self.account_id}:{order_id}") + if raw: + exit_entry_order_id = int(raw) if str(raw).strip().isdigit() else None + except Exception: + pass if symbol: try: import sys @@ -556,7 +565,8 @@ class UserDataStream: sys.path.insert(0, str(backend_path)) from database.models import Trade ok, trade_id = Trade.set_exit_order_id_for_open_trade( - symbol, self.account_id, order_id, entry_order_id=None + symbol, self.account_id, order_id, + entry_order_id=exit_entry_order_id ) if ok: logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}") @@ -674,12 +684,45 @@ class UserDataStream: except Exception as e: logger.debug(f"写入持仓缓存到 Redis 失败: {e}") - def _on_algo_update(self, o: Dict): + async def _on_algo_update(self, o: Dict): # 条件单交易更新推送:X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id + # 优先按 clientAlgoId(SL_/TP_+entry_order_id) 或 Redis 映射精确匹配,避免同 symbol 多笔持仓时匹配错误 x = (o.get("X") or "").strip().upper() ai = o.get("ai") symbol = (o.get("s") or "").strip() + algo_id = o.get("aid") # 币安 ALGO_UPDATE 用 aid(Algo Id) + client_algo_id = (o.get("caid") or "").strip() or None # 币安用 caid(Client Algo Id) + entry_order_id = None + if client_algo_id and (client_algo_id.startswith("SL_") or client_algo_id.startswith("TP_")): + try: + entry_order_id = client_algo_id.split("_", 1)[1].strip() + if not entry_order_id: + entry_order_id = None + except Exception: + pass + if entry_order_id is None and algo_id is not None: + try: + redis_cache = getattr(self.client, "redis_cache", None) + if redis_cache: + raw = await redis_cache.get(f"ats:algo2entry:{self.account_id}:{algo_id}") + if raw: + entry_order_id = str(raw).strip() if raw else None + except Exception: + pass if x in ("TRIGGERED", "FINISHED") and ai and symbol: + entry_for_db = int(entry_order_id) if entry_order_id and str(entry_order_id).isdigit() else None + # 缓存 orderId -> entry_order_id,供可能先到的 ORDER_TRADE_UPDATE 使用 + if entry_for_db is not None: + try: + rc = getattr(self.client, "redis_cache", None) + if rc: + await rc.set( + f"ats:exit_order2entry:{self.account_id}:{ai}", + str(entry_for_db), + ttl=120, + ) + except Exception: + pass try: import sys from pathlib import Path @@ -688,9 +731,12 @@ class UserDataStream: if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade - ok, _ = Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai) + ok, _ = Trade.set_exit_order_id_for_open_trade( + symbol, self.account_id, ai, + entry_order_id=entry_for_db + ) if ok: - logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}") + logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}" + (f" entry_order_id={entry_order_id}" if entry_order_id else "")) except Exception as ex: logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}") logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")