feat(binance_client, position_manager, user_data_stream): 增强算法更新处理与日志记录

在 `binance_client.py` 中新增 `client_algo_id` 参数以支持 ALGO_UPDATE 的精确匹配。更新 `position_manager.py` 以生成并缓存 SL_/TP_ 的 `client_algo_id`,确保在止损和止盈时能够正确关联到相应的订单。`user_data_stream.py` 中优化了 ALGO_UPDATE 处理逻辑,优先使用 `clientAlgoId` 进行订单匹配,并在 Redis 中缓存相关信息。这些改进提升了系统在处理算法订单时的准确性与效率。
This commit is contained in:
薇薇安 2026-02-21 22:41:15 +08:00
parent e40f5c797f
commit 32c50466f3
4 changed files with 103 additions and 10 deletions

View File

@ -2420,6 +2420,7 @@ class BinanceClient:
stop_price: float,
current_price: Optional[float] = None,
working_type: str = "MARK_PRICE",
client_algo_id: Optional[str] = None,
) -> Optional[Dict]:
"""
在币安侧挂保护单用于止损/止盈
@ -2556,6 +2557,7 @@ class BinanceClient:
# Algo 条件单接口使用 triggerPrice不是 stopPrice
# 显式传 timeInForce=GTC避免交易所对 closePosition 单默认用 GTE 导致 "GTE can only be used with open positions"
# clientAlgoId用于 ALGO_UPDATE 时按 entry_order_id 精确匹配 DB 平仓记录
params: Dict[str, Any] = {
"algoType": "CONDITIONAL",
"symbol": symbol,
@ -2566,6 +2568,8 @@ class BinanceClient:
"closePosition": True,
"timeInForce": "GTC",
}
if client_algo_id and len(str(client_algo_id)) <= 36:
params["clientAlgoId"] = str(client_algo_id)
# 单向持仓模式ONE_WAY_POSITION_ONLY不传 positionSide否则按检测结果处理
if not one_way_only and dual is True:

View File

@ -72,9 +72,10 @@ def log_order_event(account_id: int, event_type: str, event_time_ms: Optional[in
elif event_type == "ALGO_UPDATE" and data:
o = data
row["symbol"] = (o.get("s") or "").strip()
row["algoId"] = o.get("i")
row["algoId"] = o.get("aid") # 币安 ALGO_UPDATE 用 aid
row["clientAlgoId"] = (o.get("caid") or "").strip() or None # 币安用 caidSL_/TP_+entry_order_id
row["algoStatus"] = (o.get("X") or "").strip() # TRIGGERED/FINISHED
row["triggeredOrderId"] = o.get("ai") # 触发后的普通订单 id
row["triggeredOrderId"] = o.get("ai") or None # 触发后的普通订单 id
else:
row["raw"] = data
line = json.dumps(row, ensure_ascii=False, default=_default_serializer) + "\n"

View File

@ -1384,6 +1384,16 @@ class PositionManager:
if not position_info or not isinstance(position_info, dict):
return
# 用于 ALGO_UPDATE 时按 entry_order_id 精确匹配 DB 平仓记录
entry_order_id = position_info.get("orderId") or position_info.get("entry_order_id")
client_algo_id_sl = None
client_algo_id_tp = None
if entry_order_id:
eid = str(entry_order_id).strip()
if eid and len(eid) <= 30: # 留足 SL_/TP_ 前缀
client_algo_id_sl = f"SL_{eid}"[:36]
client_algo_id_tp = f"TP_{eid}"[:36]
side = (position_info.get("side") or "").upper()
if side not in {"BUY", "SELL"}:
return
@ -1565,6 +1575,7 @@ class PositionManager:
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_sl,
)
elif side == "SELL":
# 做空:当前价 >= 止损价,说明已触发止损
@ -1585,6 +1596,7 @@ class PositionManager:
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_sl,
)
else:
sl_order = await self.client.place_trigger_close_position_order(
@ -1594,6 +1606,7 @@ class PositionManager:
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_sl,
)
except AlgoOrderPositionUnavailableError:
sl_failed_due_to_gte = True
@ -1615,6 +1628,7 @@ class PositionManager:
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_sl,
)
except AlgoOrderPositionUnavailableError:
sl_failed_due_to_gte = True
@ -1636,6 +1650,7 @@ class PositionManager:
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_sl,
)
except AlgoOrderPositionUnavailableError:
sl_failed_due_to_gte = True
@ -1649,6 +1664,19 @@ class PositionManager:
logger.error(f"{symbol} 挂止损单失败: {e}")
sl_order = None
if sl_order and entry_order_id:
algo_id = sl_order.get("algoId")
if algo_id is not None:
try:
rc = getattr(self.client, "redis_cache", None)
if rc:
await rc.set(
f"ats:algo2entry:{self.account_id}:{algo_id}",
str(entry_order_id),
ttl=7 * 86400,
)
except Exception:
pass
if sl_order:
logger.info(f"[账号{self.account_id}] {symbol} ✓ 止损单已成功挂到交易所: {sl_order.get('algoId', 'N/A')}")
else:
@ -1739,6 +1767,7 @@ class PositionManager:
stop_price=take_profit,
current_price=current_price,
working_type="MARK_PRICE",
client_algo_id=client_algo_id_tp,
)
except Exception as e:
# 处理 -2021: Order would immediately trigger
@ -1752,6 +1781,19 @@ class PositionManager:
else:
logger.warning(f"{symbol} 挂止盈单失败: {e}")
tp_order = None
if tp_order and entry_order_id:
algo_id = tp_order.get("algoId")
if algo_id is not None:
try:
rc = getattr(self.client, "redis_cache", None)
if rc:
await rc.set(
f"ats:algo2entry:{self.account_id}:{algo_id}",
str(entry_order_id),
ttl=7 * 86400,
)
except Exception:
pass
if tp_order:
logger.info(f"[账号{self.account_id}] {symbol} ✓ 止盈单已成功挂到交易所: {tp_order.get('algoId', 'N/A')}")
else:

View File

@ -468,7 +468,7 @@ class UserDataStream:
log_order_event(self.account_id, "ORDER_TRADE_UPDATE", data.get("E"), data.get("o") or {})
except Exception:
pass
self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E"))
await self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E"))
elif e == "ACCOUNT_UPDATE":
logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送")
self._on_account_update(data.get("a") or {})
@ -479,12 +479,12 @@ class UserDataStream:
log_order_event(self.account_id, "ALGO_UPDATE", data.get("E"), data.get("o") or {})
except Exception:
pass
self._on_algo_update(data.get("o") or {})
await self._on_algo_update(data.get("o") or {})
else:
logger.debug(f"UserDataStream: 收到未知事件类型: {e}")
return False
def _on_order_trade_update(self, o: Dict, event_time_ms=None):
async def _on_order_trade_update(self, o: Dict, event_time_ms=None):
# 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId
# ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对, n=手续费数量, N=手续费资产
event_type = (o.get("x") or "").strip().upper()
@ -545,7 +545,16 @@ class UserDataStream:
except Exception as ex:
logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}")
else:
# 平仓成交(支付式闭环):回写 exit_order_id 并用推送数据更新 exit_price/pnl/commission仅 WS 驱动 DB
# 平仓成交reduceOnly优先从 Redis 取 entry_order_idALGO_UPDATE 可能先到并已缓存)
exit_entry_order_id = None
try:
rc = getattr(self.client, "redis_cache", None)
if rc:
raw = await rc.get(f"ats:exit_order2entry:{self.account_id}:{order_id}")
if raw:
exit_entry_order_id = int(raw) if str(raw).strip().isdigit() else None
except Exception:
pass
if symbol:
try:
import sys
@ -556,7 +565,8 @@ class UserDataStream:
sys.path.insert(0, str(backend_path))
from database.models import Trade
ok, trade_id = Trade.set_exit_order_id_for_open_trade(
symbol, self.account_id, order_id, entry_order_id=None
symbol, self.account_id, order_id,
entry_order_id=exit_entry_order_id
)
if ok:
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
@ -674,12 +684,45 @@ class UserDataStream:
except Exception as e:
logger.debug(f"写入持仓缓存到 Redis 失败: {e}")
def _on_algo_update(self, o: Dict):
async def _on_algo_update(self, o: Dict):
# 条件单交易更新推送X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id
# 优先按 clientAlgoId(SL_/TP_+entry_order_id) 或 Redis 映射精确匹配,避免同 symbol 多笔持仓时匹配错误
x = (o.get("X") or "").strip().upper()
ai = o.get("ai")
symbol = (o.get("s") or "").strip()
algo_id = o.get("aid") # 币安 ALGO_UPDATE 用 aidAlgo Id
client_algo_id = (o.get("caid") or "").strip() or None # 币安用 caidClient Algo Id
entry_order_id = None
if client_algo_id and (client_algo_id.startswith("SL_") or client_algo_id.startswith("TP_")):
try:
entry_order_id = client_algo_id.split("_", 1)[1].strip()
if not entry_order_id:
entry_order_id = None
except Exception:
pass
if entry_order_id is None and algo_id is not None:
try:
redis_cache = getattr(self.client, "redis_cache", None)
if redis_cache:
raw = await redis_cache.get(f"ats:algo2entry:{self.account_id}:{algo_id}")
if raw:
entry_order_id = str(raw).strip() if raw else None
except Exception:
pass
if x in ("TRIGGERED", "FINISHED") and ai and symbol:
entry_for_db = int(entry_order_id) if entry_order_id and str(entry_order_id).isdigit() else None
# 缓存 orderId -> entry_order_id供可能先到的 ORDER_TRADE_UPDATE 使用
if entry_for_db is not None:
try:
rc = getattr(self.client, "redis_cache", None)
if rc:
await rc.set(
f"ats:exit_order2entry:{self.account_id}:{ai}",
str(entry_for_db),
ttl=120,
)
except Exception:
pass
try:
import sys
from pathlib import Path
@ -688,9 +731,12 @@ class UserDataStream:
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
ok, _ = Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai)
ok, _ = Trade.set_exit_order_id_for_open_trade(
symbol, self.account_id, ai,
entry_order_id=entry_for_db
)
if ok:
logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}" + (f" entry_order_id={entry_order_id}" if entry_order_id else ""))
except Exception as ex:
logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")