""" 合约 User Data Stream:订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。 - 先 REST 下单并落库 pending,WS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。 - ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。 - listenKey 每 30 分钟 keepalive;收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。 - 单连接有效期不超过 24 小时,届时主动重连(文档建议)。 """ import asyncio import json import logging import time from typing import Dict, List, Optional, Any logger = logging.getLogger(__name__) # 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions _position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...) # 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 _balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc } # 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用) _stream_instance: Optional["UserDataStream"] = None # 是否已用 REST 结果播种过(未播种时业务应走 REST,不返回空缓存) _position_cache_seeded = False _balance_cache_seeded = False def get_position_updates_cache() -> Dict[str, List[Dict]]: """返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。""" return dict(_position_updates_cache) def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]: """返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。""" return dict(_balance_updates_cache) def get_stream_instance() -> Optional["UserDataStream"]: """返回当前运行的 UserDataStream 实例(未启动时为 None)。""" return _stream_instance def seed_position_cache(positions: List[Dict]) -> None: """用 REST 全量持仓结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。""" global _position_updates_cache, _position_cache_seeded _position_updates_cache.clear() _position_cache_seeded = True for pos in positions or []: symbol = (pos.get("symbol") or "").strip() if not symbol: continue amt = float(pos.get("positionAmt") or 0) _position_updates_cache[symbol] = [{ "s": symbol, "pa": amt, "ep": str(pos.get("entryPrice") or "0"), "up": str(pos.get("unRealizedProfit") or "0"), "ps": pos.get("positionSide") or "BOTH", }] logger.debug(f"UserDataStream: 已填充持仓缓存 {len(_position_updates_cache)} 个 symbol") def seed_balance_cache(balance: Dict[str, Any]) -> None: """用 REST 余额结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。""" global _balance_updates_cache, _balance_cache_seeded _balance_cache_seeded = True if balance and isinstance(balance, dict): wb = balance.get("walletBalance") or balance.get("total") or 0 av = balance.get("availableBalance") or balance.get("available") or wb _balance_updates_cache["USDT"] = {"wb": str(wb), "cw": str(av), "bc": "0"} logger.debug("UserDataStream: 已填充余额缓存 (USDT)") def get_positions_from_cache(min_notional: float = 1.0) -> Optional[List[Dict]]: """将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None(业务应走 REST)。""" if not _position_cache_seeded: return None out = [] for symbol, plist in _position_updates_cache.items(): for p in plist if isinstance(plist, list) else []: try: pa = float(p.get("pa") or 0) except (TypeError, ValueError): pa = 0 if pa == 0: continue ep = float(p.get("ep") or 0) if min_notional > 0 and abs(pa) * ep < min_notional: continue out.append({ "symbol": symbol, "positionAmt": pa, "entryPrice": ep, "markPrice": float(p.get("markPrice") or 0), "unRealizedProfit": float(p.get("up") or 0), "leverage": int(p.get("leverage") or 0), }) return out def get_balance_from_cache() -> Optional[Dict[str, Any]]: """从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。""" if not _balance_cache_seeded: return None u = _balance_updates_cache.get("USDT") if not u: return None try: wb = float(u.get("wb") or 0) cw = float(u.get("cw") or wb) except (TypeError, ValueError): return None return {"ok": True, "available": cw, "total": wb, "margin": wb} class UserDataStream: """合约用户数据流:创建/保活 listenKey,连接 WS,处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。""" def __init__(self, client: Any, account_id: int): self.client = client self.account_id = int(account_id) self._listen_key: Optional[str] = None self._ws = None self._task: Optional[asyncio.Task] = None self._keepalive_task: Optional[asyncio.Task] = None self._running = False self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连) def _ws_base_url(self) -> str: if getattr(self.client, "testnet", False): return "wss://stream.binancefuture.com/ws" return "wss://fstream.binance.com/ws" async def start(self) -> bool: """创建 listenKey 并启动 WS 接收循环与 keepalive 任务。""" global _stream_instance if self._running: return True self._listen_key = await self.client.create_futures_listen_key() if not self._listen_key: logger.warning("UserDataStream: 无法创建 listenKey,跳过启动") return False self._running = True _stream_instance = self self._task = asyncio.create_task(self._run_ws()) self._keepalive_task = asyncio.create_task(self._run_keepalive()) logger.info("UserDataStream: 已启动(ORDER_TRADE_UPDATE / ACCOUNT_UPDATE,含持仓与余额推送)") return True async def stop(self): """停止 WS 与 keepalive。""" global _stream_instance self._running = False _stream_instance = None if self._keepalive_task: self._keepalive_task.cancel() try: await self._keepalive_task except asyncio.CancelledError: pass self._keepalive_task = None if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None self._listen_key = None logger.info("UserDataStream: 已停止") async def _run_keepalive(self): """每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。""" while self._running: await asyncio.sleep(30 * 60) if not self._running or not self._listen_key: break ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key) if not ok and code_1125 and self._ws: logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连") try: await self._ws.close() except Exception: pass break if not ok: logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey") async def _run_ws(self): """连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。""" import aiohttp _24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连 while self._running: url = f"{self._ws_base_url()}/{self._listen_key}" try: async with aiohttp.ClientSession() as session: async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws: self._ws = ws self._conn_start_time = time.monotonic() logger.info("UserDataStream: WS 已连接") async for msg in ws: if not self._running: break if msg.type == aiohttp.WSMsgType.TEXT: should_break = await self._handle_message(msg.data) if should_break: break elif msg.type == aiohttp.WSMsgType.ERROR: logger.warning("UserDataStream: WS 错误") break elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): break # 单连接 24h 主动重连(文档:每个链接有效期不超过24小时) if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec: logger.info("UserDataStream: 连接已近 24h,主动重连") break except asyncio.CancelledError: break except Exception as e: logger.warning(f"UserDataStream: WS 异常 {e},60s 后重连") await asyncio.sleep(60) self._ws = None self._conn_start_time = None if not self._running: break # 重连前重新创建 listenKey(旧 key 可能已失效或 listenKeyExpired) self._listen_key = await self.client.create_futures_listen_key() if not self._listen_key: await asyncio.sleep(60) continue async def _handle_message(self, raw: str) -> bool: """处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired)以触发重连。""" try: data = json.loads(raw) except Exception: return False e = data.get("e") if e == "listenKeyExpired": logger.warning("UserDataStream: 收到 listenKeyExpired,将换新 key 重连") return True if e == "ORDER_TRADE_UPDATE": self._on_order_trade_update(data.get("o") or {}) elif e == "ACCOUNT_UPDATE": self._on_account_update(data.get("a") or {}) elif e == "ALGO_UPDATE": self._on_algo_update(data.get("o") or {}) return False def _on_order_trade_update(self, o: Dict): # 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId # ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对 event_type = (o.get("x") or "").strip().upper() status = (o.get("X") or "").strip().upper() if status != "FILLED": return client_order_id = (o.get("c") or "").strip() or None order_id = o.get("i") ap = o.get("ap") z = o.get("z") reduce_only = o.get("R") is True symbol = (o.get("s") or "").strip() rp = o.get("rp") # 实现盈亏 if order_id is None or ap is None or z is None: return try: ap_f = float(ap) z_f = float(z) except (TypeError, ValueError): return # 特殊 clientOrderId:强平/ADL/结算,仅打日志不参与系统 pending 完善 if client_order_id: if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"): logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}") return # 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交) if event_type and event_type != "TRADE": return # 开仓成交:完善 pending 记录 if not reduce_only: if not client_order_id: return try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade updated = Trade.update_pending_to_filled( client_order_id, self.account_id, order_id, ap_f, z_f ) if updated: logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}") except Exception as ex: logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}") else: # 平仓成交:按 symbol 回写 open 记录的 exit_order_id;若有 rp 可记入日志 if rp is not None: logger.debug(f"UserDataStream: 平仓订单 FILLED orderId={order_id} symbol={symbol!r} 实现盈亏 rp={rp}") if symbol: try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id): logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}") except Exception as ex: logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}") def _on_account_update(self, a: Dict): # 文档: a.B = 余额数组,每项 a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 # 文档: a.P = 持仓信息数组,每项 s=symbol, pa=仓位, ep=入仓价, ps=LONG/SHORT/BOTH 等 global _position_updates_cache, _balance_updates_cache B = a.get("B") if isinstance(B, list) and B: for b in B: asset = (b.get("a") or "").strip() if asset: _balance_updates_cache[asset] = { "wb": b.get("wb"), "cw": b.get("cw"), "bc": b.get("bc"), } logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}") P = a.get("P") if isinstance(P, list) and P: for p in P: s = (p.get("s") or "").strip() if s: if s not in _position_updates_cache: _position_updates_cache[s] = [] _position_updates_cache[s] = [p] logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}") def _on_algo_update(self, o: Dict): # 条件单交易更新推送:X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id x = (o.get("X") or "").strip().upper() ai = o.get("ai") symbol = (o.get("s") or "").strip() if x in ("TRIGGERED", "FINISHED") and ai and symbol: try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / "backend" if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai): logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}") except Exception as ex: logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}") logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")