""" 合约 User Data Stream:订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。 - 先 REST 下单并落库 pending,WS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。 - ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。 - listenKey 每 30 分钟 keepalive;收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。 - 单连接有效期不超过 24 小时,届时主动重连(文档建议)。 """ import asyncio import json import logging import time from typing import Dict, List, Optional, Any logger = logging.getLogger(__name__) # 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions _position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...) # 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 _balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc } # 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用) _stream_instance: Optional["UserDataStream"] = None # 是否已用 REST 结果播种过(未播种时业务应走 REST,不返回空缓存) _position_cache_seeded = False _balance_cache_seeded = False def get_position_updates_cache() -> Dict[str, List[Dict]]: """返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。""" return dict(_position_updates_cache) def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]: """返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。""" return dict(_balance_updates_cache) def get_stream_instance() -> Optional["UserDataStream"]: """返回当前运行的 UserDataStream 实例(未启动时为 None)。""" return _stream_instance try: from .redis_ttl import TTL_POSITIONS except ImportError: TTL_POSITIONS = 300 try: from .redis_ttl import TTL_BALANCE except ImportError: TTL_BALANCE = 300 def _cache_account_id() -> int: """当前进程的 account_id(多账号隔离时 Redis 缓存键必须按账号区分)""" import os try: return int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1) except Exception: return 1 async def seed_position_cache(positions: List[Dict], redis_cache: Any = None, account_id: int = None) -> None: """用 REST 全量持仓结果填充缓存。有 Redis 时只写 Redis、不占进程内存;无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。""" global _position_updates_cache, _position_cache_seeded _position_cache_seeded = True positions_list = [] for pos in positions or []: symbol = (pos.get("symbol") or "").strip() if not symbol: continue amt = float(pos.get("positionAmt") or 0) if not redis_cache: if symbol not in _position_updates_cache: _position_updates_cache[symbol] = [] _position_updates_cache[symbol] = [{ "s": symbol, "pa": amt, "ep": str(pos.get("entryPrice") or "0"), "up": str(pos.get("unRealizedProfit") or "0"), "ps": pos.get("positionSide") or "BOTH", }] if amt != 0: positions_list.append({ "symbol": symbol, "positionAmt": amt, "entryPrice": float(pos.get("entryPrice") or 0), "markPrice": float(pos.get("markPrice") or 0), "unRealizedProfit": float(pos.get("unRealizedProfit") or 0), "leverage": int(pos.get("leverage") or 0), }) if redis_cache: _position_updates_cache.clear() if positions_list: try: aid = int(account_id) if account_id is not None else _cache_account_id() await redis_cache.set(f"ats:positions:cache:{aid}", positions_list, ttl=TTL_POSITIONS) except Exception as e: logger.debug(f"写入持仓缓存到 Redis 失败: {e}") logger.debug(f"UserDataStream: 已填充持仓缓存(Redis=%s)", bool(redis_cache)) async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None, account_id: int = None) -> None: """用 REST 余额结果填充缓存。有 Redis 时只写 Redis;无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。""" global _balance_updates_cache, _balance_cache_seeded _balance_cache_seeded = True if balance and isinstance(balance, dict): wb = balance.get("walletBalance") or balance.get("total") or 0 av = balance.get("availableBalance") or balance.get("available") or wb balance_data = {"wb": str(wb), "cw": str(av), "bc": "0"} if redis_cache: try: aid = int(account_id) if account_id is not None else _cache_account_id() await redis_cache.set(f"ats:balance:cache:USDT:{aid}", balance_data, ttl=TTL_BALANCE) except Exception as e: logger.debug(f"写入余额缓存到 Redis 失败: {e}") else: _balance_updates_cache["USDT"] = balance_data logger.debug("UserDataStream: 已填充余额缓存 (USDT, Redis=%s)", bool(redis_cache)) async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = None, account_id: int = None) -> Optional[List[Dict]]: """ 将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None(业务应走 REST)。 ⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。 """ aid = int(account_id) if account_id is not None else _cache_account_id() if redis_cache: try: redis_key = f"ats:positions:cache:{aid}" cached = await redis_cache.get(redis_key) if cached and isinstance(cached, list): # 过滤最小名义价值 filtered = [] for pos in cached: try: pa = float(pos.get("positionAmt") or 0) ep = float(pos.get("entryPrice") or 0) if pa == 0: continue if min_notional > 0 and abs(pa) * ep < min_notional: continue filtered.append(pos) except Exception: continue if filtered: return filtered except Exception as e: logger.debug(f"从 Redis 读取持仓缓存失败: {e}") # 降级到进程内存缓存 if not _position_cache_seeded: return None out = [] for symbol, plist in _position_updates_cache.items(): for p in plist if isinstance(plist, list) else []: try: pa = float(p.get("pa") or 0) except (TypeError, ValueError): pa = 0 if pa == 0: continue ep = float(p.get("ep") or 0) if min_notional > 0 and abs(pa) * ep < min_notional: continue out.append({ "symbol": symbol, "positionAmt": pa, "entryPrice": ep, "markPrice": float(p.get("markPrice") or 0), "unRealizedProfit": float(p.get("up") or 0), "leverage": int(p.get("leverage") or 0), }) return out async def get_balance_from_cache(redis_cache: Any = None, account_id: int = None) -> Optional[Dict[str, Any]]: """ 从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。 ⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。 """ aid = int(account_id) if account_id is not None else _cache_account_id() if redis_cache: try: redis_key = f"ats:balance:cache:USDT:{aid}" cached = await redis_cache.get(redis_key) if cached and isinstance(cached, dict): try: wb = float(cached.get("wb") or cached.get("total") or 0) cw = float(cached.get("cw") or cached.get("available") or wb) return {"ok": True, "available": cw, "total": wb, "margin": wb} except (TypeError, ValueError): pass except Exception as e: logger.debug(f"从 Redis 读取余额缓存失败: {e}") # 降级到进程内存缓存 if not _balance_cache_seeded: return None u = _balance_updates_cache.get("USDT") if not u: return None try: wb = float(u.get("wb") or 0) cw = float(u.get("cw") or wb) except (TypeError, ValueError): return None return {"ok": True, "available": cw, "total": wb, "margin": wb} class UserDataStream: """合约用户数据流:创建/保活 listenKey,连接 WS,处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。""" def __init__(self, client: Any, account_id: int): self.client = client self.account_id = int(account_id) self._listen_key: Optional[str] = None self._ws = None self._task: Optional[asyncio.Task] = None self._keepalive_task: Optional[asyncio.Task] = None self._running = False self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连) def _ws_base_url(self) -> str: if getattr(self.client, "testnet", False): return "wss://stream.binancefuture.com/ws" return "wss://fstream.binance.com/ws" async def start(self) -> bool: """ 创建 listenKey 并启动 WS 接收循环与 keepalive 任务。 ⚠️ 优化:优先从缓存获取 listenKey,避免重复创建。 """ global _stream_instance if self._running: return True # ⚠️ 优化:优先从缓存获取 listenKey(多进程/多实例共享) try: from .listen_key_cache import get_listen_key_cache cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) if cache: self._listen_key = await cache.get_listen_key(self.account_id, self.client) if self._listen_key: logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取 listenKey") except Exception as e: logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}") # 如果缓存未命中,直接创建(每个账号独立,使用对应 client 的 API Key) if not self._listen_key: self._listen_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=3) if not self._listen_key: logger.warning(f"UserDataStream(account_id={self.account_id}): 无法创建 listenKey,跳过启动") return False self._running = True _stream_instance = self self._task = asyncio.create_task(self._run_ws()) self._keepalive_task = asyncio.create_task(self._run_keepalive()) logger.info("UserDataStream: 已启动(ORDER_TRADE_UPDATE / ACCOUNT_UPDATE,含持仓与余额推送)") return True async def stop(self): """停止 WS 与 keepalive。""" global _stream_instance self._running = False _stream_instance = None if self._keepalive_task: self._keepalive_task.cancel() try: await self._keepalive_task except asyncio.CancelledError: pass self._keepalive_task = None if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None self._listen_key = None logger.info("UserDataStream: 已停止") async def _run_keepalive(self): """ 每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。 ⚠️ 优化: 1. 优先使用 WebSocket API keepalive,减少 REST 调用 2. 使用缓存管理器更新 listenKey,支持多进程共享 """ while self._running: await asyncio.sleep(30 * 60) if not self._running or not self._listen_key: break # ⚠️ 优化:使用缓存管理器更新 listenKey try: from .listen_key_cache import get_listen_key_cache cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) if cache: # 使用缓存管理器更新(会自动 keepalive 或创建新的) new_key = await cache.renew_listen_key(self.account_id, self.client, self._listen_key) if new_key: if new_key != self._listen_key: logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已更新(keepalive 失败,创建了新 key)") self._listen_key = new_key # 如果 key 变了,需要重新连接 if self._ws: try: await self._ws.close() except Exception: pass break continue except Exception as e: logger.debug(f"UserDataStream: 使用缓存管理器更新 listenKey 失败: {e}") # 回退到直接 keepalive(如果缓存管理器不可用) ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True) if not ok and code_1125 and self._ws: logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连") try: await self._ws.close() except Exception: pass break if not ok: logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey") async def _run_ws(self): """连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。""" import aiohttp _24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连 while self._running: # 正向流程加固:每次重连前 pending 对账(补齐断线期间漏掉的 ORDER_TRADE_UPDATE) if self._listen_key: try: from .pending_reconcile import reconcile_pending_with_binance n = await reconcile_pending_with_binance(self.client, self.account_id, limit=50, max_age_sec=86400) if n > 0: logger.info(f"UserDataStream(account_id={self.account_id}): 重连前 pending 对账完成,{n} 条已更新为 open") except Exception as e: logger.debug(f"UserDataStream(account_id={self.account_id}): pending 对账失败: {e}") url = f"{self._ws_base_url()}/{self._listen_key}" try: async with aiohttp.ClientSession() as session: async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws: self._ws = ws self._conn_start_time = time.monotonic() logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送") async for msg in ws: if not self._running: break if msg.type == aiohttp.WSMsgType.TEXT: should_break = await self._handle_message(msg.data) if should_break: break elif msg.type == aiohttp.WSMsgType.ERROR: logger.warning("UserDataStream: WS 错误") break elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): break # 单连接 24h 主动重连(文档:每个链接有效期不超过24小时) if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec: logger.info("UserDataStream: 连接已近 24h,主动重连") break except asyncio.CancelledError: break except Exception as e: err_msg = getattr(e, "message", str(e)) or repr(e) err_type = type(e).__name__ logger.warning( "UserDataStream(account_id=%s): WS 异常 %s: %s,60s 后重连", self.account_id, err_type, err_msg, exc_info=logger.isEnabledFor(logging.DEBUG), ) await asyncio.sleep(60) self._ws = None self._conn_start_time = None if not self._running: break # ⚠️ 优化:优先从缓存获取 listenKey(多进程共享,避免重复创建) try: from .listen_key_cache import get_listen_key_cache cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) if cache: # 从缓存获取 listenKey(如果缓存中有有效的 key,会直接返回;否则会创建新的) cached_key = await cache.get_listen_key(self.account_id, self.client) if cached_key: if cached_key == self._listen_key: logger.debug(f"UserDataStream(account_id={self.account_id}): 从缓存获取到相同的 listenKey,复用") else: logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取到新的 listenKey(可能其他进程创建的)") self._listen_key = cached_key # 继续使用现有的或缓存中的 listenKey continue except Exception as e: logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}") # 如果缓存不可用,回退到原有逻辑 # ⚠️ 优化:重连前先尝试 keepalive 现有 listenKey,避免重复创建 need_new_key = True if self._listen_key: logger.debug(f"UserDataStream(account_id={self.account_id}): 重连前尝试 keepalive 现有 listenKey...") ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True) if ok: logger.info(f"UserDataStream(account_id={self.account_id}): 现有 listenKey keepalive 成功,复用现有 key") need_new_key = False elif code_1125: logger.debug(f"UserDataStream(account_id={self.account_id}): 现有 listenKey 已失效(-1125),需要创建新 key") else: logger.debug(f"UserDataStream(account_id={self.account_id}): keepalive 失败,尝试创建新 key") # 只有在需要新 key 时才创建(keepalive 失败或没有现有 key) if need_new_key: # ⚠️ 优化:增加重试机制,避免网络波动导致失败 listen_key_retries = 3 listen_key_created = False for retry in range(listen_key_retries): # 注意:根据币安文档,如果账户已有有效的 listenKey,创建接口会返回现有 key 并延长有效期 # 所以这里即使"创建"也可能返回现有的 key,这是正常的 new_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=1) if new_key: # 如果返回的 key 与现有 key 相同,说明是复用现有 key if new_key == self._listen_key: logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已复用(币安返回现有 key)") else: logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已创建(重试 {retry + 1}/{listen_key_retries})") self._listen_key = new_key listen_key_created = True break if retry < listen_key_retries - 1: wait_sec = (retry + 1) * 10 # 10秒、20秒、30秒 logger.debug(f"UserDataStream(account_id={self.account_id}): listenKey 创建失败,{wait_sec}秒后重试...") await asyncio.sleep(wait_sec) if not listen_key_created: logger.warning( "UserDataStream(account_id=%s): 重新创建 listenKey 失败(已重试 %d 次),60s 后重试(请检查该账号 API 权限/网络/IP 白名单)", self.account_id, listen_key_retries, ) await asyncio.sleep(60) continue async def _handle_message(self, raw: str) -> bool: """处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired)以触发重连。""" try: data = json.loads(raw) except Exception: logger.debug(f"UserDataStream: 无法解析推送消息: {raw[:200]}") return False e = data.get("e") if e == "listenKeyExpired": logger.warning("UserDataStream: 收到 listenKeyExpired,将换新 key 重连") return True if e == "ORDER_TRADE_UPDATE": logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送") try: from .binance_order_event_logger import log_order_event log_order_event(self.account_id, "ORDER_TRADE_UPDATE", data.get("E"), data.get("o") or {}) except Exception: pass await self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E")) elif e == "ACCOUNT_UPDATE": logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送") self._on_account_update(data.get("a") or {}) elif e == "ALGO_UPDATE": logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送") try: from .binance_order_event_logger import log_order_event log_order_event(self.account_id, "ALGO_UPDATE", data.get("E"), data.get("o") or {}) except Exception: pass await self._on_algo_update(data.get("o") or {}) else: logger.debug(f"UserDataStream: 收到未知事件类型: {e}") return False async def _on_order_trade_update(self, o: Dict, event_time_ms=None): # 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId # ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对, n=手续费数量, N=手续费资产 event_type = (o.get("x") or "").strip().upper() status = (o.get("X") or "").strip().upper() symbol = (o.get("s") or "").strip() order_id = o.get("i") logger.debug(f"UserDataStream: ORDER_TRADE_UPDATE symbol={symbol!r} orderId={order_id} event={event_type} status={status}") if status != "FILLED": logger.debug(f"UserDataStream: 订单状态非 FILLED,跳过 symbol={symbol!r} orderId={order_id} status={status}") return client_order_id = (o.get("c") or "").strip() or None order_id = o.get("i") ap = o.get("ap") z = o.get("z") reduce_only = o.get("R") is True symbol = (o.get("s") or "").strip() rp = o.get("rp") # 实现盈亏 if order_id is None or ap is None or z is None: return try: ap_f = float(ap) z_f = float(z) except (TypeError, ValueError): return # 特殊 clientOrderId:强平/ADL/结算,仅打日志不参与系统 pending 完善 if client_order_id: if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"): logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}") return # 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交) if event_type and event_type != "TRADE": return # 开仓成交:完善 pending 记录(优先按 client_order_id,无 c 时用 orderId 兜底) if not reduce_only: try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade if client_order_id: updated = Trade.update_pending_to_filled( client_order_id, self.account_id, order_id, ap_f, z_f ) if updated: logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") else: logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}") else: # 无 clientOrderId 时用 orderId 兜底:若该 symbol+account 下仅有一条 pending 且无 entry_order_id,则用本订单号完善 updated = Trade.update_pending_by_entry_order_id(symbol, self.account_id, order_id, ap_f, z_f) if updated: logger.info(f"UserDataStream: 开仓成交已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") else: logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,且兜底未匹配到唯一 pending orderId={order_id} symbol={symbol!r}") except Exception as ex: logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}") else: # 平仓成交(reduceOnly):优先从 Redis 取 entry_order_id(ALGO_UPDATE 可能先到并已缓存) exit_entry_order_id = None try: rc = getattr(self.client, "redis_cache", None) if rc: raw = await rc.get(f"ats:exit_order2entry:{self.account_id}:{order_id}") if raw: exit_entry_order_id = int(raw) if str(raw).strip().isdigit() else None except Exception: pass if symbol: try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade ok, trade_id = Trade.set_exit_order_id_for_open_trade( symbol, self.account_id, order_id, entry_order_id=exit_entry_order_id ) if ok: logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}") # 可能已由 ALGO_UPDATE 写过 exit_order_id,直接按 exit_order_id 取记录 tid = trade_id if tid is None: row = Trade.get_by_exit_order_id(order_id) tid = row.get("id") if row else None if tid is not None: try: rp_f = float(rp) if rp is not None else None commission_f = None try: n_val = o.get("n") if n_val is not None: commission_f = float(n_val) except (TypeError, ValueError): pass exit_time_ts = None if event_time_ms is not None: try: exit_time_ts = int(int(event_time_ms) / 1000) except (TypeError, ValueError): pass # 用推送的 ap/rp 更新平仓信息,与币安一致 Trade.update_exit( trade_id=tid, exit_price=ap_f, exit_reason="sync", pnl=rp_f if rp_f is not None else 0.0, pnl_percent=0.0, exit_order_id=order_id, exit_time_ts=exit_time_ts, realized_pnl=rp_f, commission=commission_f, commission_asset=o.get("N"), ) logger.info( f"UserDataStream: 平仓记录已更新 symbol={symbol!r} orderId={order_id} " f"exit_price={ap_f} realized_pnl={rp_f}" ) except Exception as ex2: logger.warning(f"UserDataStream: update_exit 失败 orderId={order_id}: {ex2}") elif not ok: logger.debug(f"UserDataStream: 平仓订单未匹配到 open 记录 symbol={symbol!r} orderId={order_id}") except Exception as ex: logger.warning(f"UserDataStream: 平仓回写/更新失败 orderId={order_id}: {ex}") def _on_account_update(self, a: Dict): # 文档: a.B = 余额数组,a.P = 持仓信息数组。有 Redis 时只写 Redis、不写进程内存。 global _position_updates_cache, _balance_updates_cache redis_cache = getattr(self.client, "redis_cache", None) B = a.get("B") if isinstance(B, list) and B: for b in B: asset = (b.get("a") or "").strip() if asset: balance_data = { "wb": b.get("wb"), "cw": b.get("cw"), "bc": b.get("bc"), } if redis_cache: if asset == "USDT": asyncio.create_task(self._write_balance_to_redis(asset, balance_data)) else: _balance_updates_cache[asset] = balance_data logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}") P = a.get("P") if isinstance(P, list) and P: positions_list = [] for p in P: s = (p.get("s") or "").strip() if s: if not redis_cache: if s not in _position_updates_cache: _position_updates_cache[s] = [] _position_updates_cache[s] = [p] try: pa = float(p.get("pa") or 0) ep = float(p.get("ep") or 0) if pa != 0: positions_list.append({ "symbol": s, "positionAmt": pa, "entryPrice": ep, "markPrice": float(p.get("markPrice") or 0), "unRealizedProfit": float(p.get("up") or 0), "leverage": int(p.get("leverage") or 0), }) except Exception: pass if redis_cache and positions_list: asyncio.create_task(self._write_positions_to_redis(positions_list)) logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}") async def _write_balance_to_redis(self, asset: str, balance_data: Dict): """写入余额缓存到 Redis(带 TTL,按 account_id 隔离)""" try: redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: redis_key = f"ats:balance:cache:{asset}:{self.account_id}" await redis_cache.set(redis_key, balance_data, ttl=TTL_BALANCE) except Exception as e: logger.debug(f"写入余额缓存到 Redis 失败: {e}") async def _write_positions_to_redis(self, positions_list: List[Dict]): """写入持仓缓存到 Redis(带 TTL,按 account_id 隔离)""" try: redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: redis_key = f"ats:positions:cache:{self.account_id}" await redis_cache.set(redis_key, positions_list, ttl=TTL_POSITIONS) except Exception as e: logger.debug(f"写入持仓缓存到 Redis 失败: {e}") async def _on_algo_update(self, o: Dict): # 条件单交易更新推送:X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id # 优先按 clientAlgoId(SL_/TP_+entry_order_id) 或 Redis 映射精确匹配,避免同 symbol 多笔持仓时匹配错误 x = (o.get("X") or "").strip().upper() ai = o.get("ai") symbol = (o.get("s") or "").strip() # 币安文档:aid/caid;兼容 i/c、algoId/clientAlgoId algo_id = o.get("aid") or o.get("i") or o.get("algoId") client_algo_id = (o.get("caid") or o.get("c") or o.get("clientAlgoId") or "").strip() or None entry_order_id = None if client_algo_id and (client_algo_id.startswith("SL_") or client_algo_id.startswith("TP_")): try: entry_order_id = client_algo_id.split("_", 1)[1].strip() if not entry_order_id: entry_order_id = None except Exception: pass if entry_order_id is None and algo_id is not None: try: redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: raw = await redis_cache.get(f"ats:algo2entry:{self.account_id}:{algo_id}") if raw: entry_order_id = str(raw).strip() if raw else None except Exception: pass if x in ("TRIGGERED", "FINISHED") and ai and symbol: entry_for_db = int(entry_order_id) if entry_order_id and str(entry_order_id).isdigit() else None # 缓存 orderId -> entry_order_id,供可能先到的 ORDER_TRADE_UPDATE 使用 if entry_for_db is not None: try: rc = getattr(self.client, "redis_cache", None) if rc: await rc.set( f"ats:exit_order2entry:{self.account_id}:{ai}", str(entry_for_db), ttl=120, ) except Exception: pass try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade ok, _ = Trade.set_exit_order_id_for_open_trade( symbol, self.account_id, ai, entry_order_id=entry_for_db ) if ok: logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}" + (f" entry_order_id={entry_order_id}" if entry_order_id else "")) except Exception as ex: logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}") logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")