auto_trade_sys/trading_system/user_data_stream.py
薇薇安 0a7bb0de2d feat(user_data_stream, binance_client): 优化 listenKey 管理与缓存机制
在 `user_data_stream.py` 中更新 `start` 方法,优先从缓存获取 listenKey,避免重复创建。增强了错误处理和日志记录,确保在缓存不可用时能够回退到创建新 key 的逻辑。更新 `binance_client.py` 中的 `create_futures_listen_key` 方法,新增重试机制以提高稳定性。此改动提升了 listenKey 管理的灵活性和系统的性能。
2026-02-18 00:11:54 +08:00

496 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
合约 User Data Stream订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。
- 先 REST 下单并落库 pendingWS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。
- ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。
- listenKey 每 30 分钟 keepalive收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。
- 单连接有效期不超过 24 小时,届时主动重连(文档建议)。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
# Position push cache: the P array from ACCOUNT_UPDATE, readable externally to
# reduce the frequency of REST get_open_positions calls.
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
# Balance push cache: the B array from ACCOUNT_UPDATE (a=asset, wb=wallet balance,
# cw=wallet balance excluding isolated margin, bc=balance change).
_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc }
# Currently running UserDataStream instance (so main can stop it, or business code can check availability).
_stream_instance: Optional["UserDataStream"] = None
# Whether the caches have been seeded from REST results (before seeding, business
# code should use REST rather than trusting an empty cache).
_position_cache_seeded = False
_balance_cache_seeded = False
def get_position_updates_cache() -> Dict[str, List[Dict]]:
    """Return a shallow copy of the latest ACCOUNT_UPDATE position data (non-empty only after a push)."""
    return {symbol: plist for symbol, plist in _position_updates_cache.items()}
def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]:
    """Return a shallow copy of the latest ACCOUNT_UPDATE balance data, keyed by asset (e.g. USDT)."""
    return {asset: entry for asset, entry in _balance_updates_cache.items()}
def get_stream_instance() -> Optional["UserDataStream"]:
    """Return the currently running UserDataStream instance (None when not started)."""
    instance = _stream_instance
    return instance
def seed_position_cache(positions: List[Dict]) -> None:
    """Seed the position cache from a full REST position snapshot.

    Called once at startup; afterwards ACCOUNT_UPDATE pushes keep the cache
    current incrementally.

    Fixes:
    - Carry over ``markPrice`` and ``leverage`` from the REST payload:
      ``get_positions_from_cache`` reads those keys, but they were never
      seeded, so cached positions always reported markPrice=0 / leverage=0
      until the first ACCOUNT_UPDATE arrived.
    - Guard the ``positionAmt`` conversion so one malformed REST entry
      cannot abort seeding.

    Args:
        positions: REST position dicts (``symbol``, ``positionAmt``,
            ``entryPrice``, ...). ``None`` is treated as an empty list.
    """
    global _position_updates_cache, _position_cache_seeded
    _position_updates_cache.clear()
    _position_cache_seeded = True
    for pos in positions or []:
        symbol = (pos.get("symbol") or "").strip()
        if not symbol:
            continue
        try:
            amt = float(pos.get("positionAmt") or 0)
        except (TypeError, ValueError):
            amt = 0.0
        _position_updates_cache[symbol] = [{
            "s": symbol,
            "pa": amt,
            "ep": str(pos.get("entryPrice") or "0"),
            "up": str(pos.get("unRealizedProfit") or "0"),
            "ps": pos.get("positionSide") or "BOTH",
            # Carried over so get_positions_from_cache can report them without REST.
            "markPrice": pos.get("markPrice") or 0,
            "leverage": pos.get("leverage") or 0,
        }]
    logger.debug(f"UserDataStream: 已填充持仓缓存 {len(_position_updates_cache)} 个 symbol")
def seed_balance_cache(balance: Dict[str, Any]) -> None:
    """Seed the balance cache from a REST balance result.

    Called once at startup; afterwards ACCOUNT_UPDATE pushes keep it current.
    The seeded flag is set even for an empty/invalid payload, so readers know
    seeding was attempted.
    """
    global _balance_updates_cache, _balance_cache_seeded
    _balance_cache_seeded = True
    if not balance or not isinstance(balance, dict):
        return
    wallet = balance.get("walletBalance") or balance.get("total") or 0
    avail = balance.get("availableBalance") or balance.get("available") or wallet
    _balance_updates_cache["USDT"] = {"wb": str(wallet), "cw": str(avail), "bc": "0"}
    logger.debug("UserDataStream: 已填充余额缓存 (USDT)")
def get_positions_from_cache(min_notional: float = 1.0) -> Optional[List[Dict]]:
    """Convert the position cache into the same list format as REST get_open_positions.

    Returns None when the cache has not been seeded yet (callers should fall
    back to REST in that case).

    Args:
        min_notional: Positions whose ``|amount| * entryPrice`` falls below this
            threshold are filtered out as dust. Pass 0 to disable the filter.

    Fix: the original guarded only the ``pa`` conversion; the ``ep`` /
    ``markPrice`` / ``up`` / ``leverage`` conversions could raise ValueError on
    a malformed push entry and abort the whole listing. Malformed entries are
    now skipped instead.
    """
    if not _position_cache_seeded:
        return None
    out = []
    for symbol, plist in _position_updates_cache.items():
        for p in plist if isinstance(plist, list) else []:
            try:
                pa = float(p.get("pa") or 0)
            except (TypeError, ValueError):
                pa = 0
            if pa == 0:
                continue
            try:
                ep = float(p.get("ep") or 0)
                mark = float(p.get("markPrice") or 0)
                upnl = float(p.get("up") or 0)
                lev = int(float(p.get("leverage") or 0))
            except (TypeError, ValueError):
                # Malformed push entry: skip it rather than failing the whole call.
                continue
            if min_notional > 0 and abs(pa) * ep < min_notional:
                continue
            out.append({
                "symbol": symbol,
                "positionAmt": pa,
                "entryPrice": ep,
                "markPrice": mark,
                "unRealizedProfit": upnl,
                "leverage": lev,
            })
    return out
def get_balance_from_cache() -> Optional[Dict[str, Any]]:
    """Return the USDT balance from cache (compatible with REST get_account_balance).

    Returns None when the cache is unseeded, holds no USDT entry, or the cached
    numbers cannot be parsed.
    """
    if not _balance_cache_seeded:
        return None
    entry = _balance_updates_cache.get("USDT")
    if not entry:
        return None
    try:
        wallet = float(entry.get("wb") or 0)
        cross = float(entry.get("cw") or wallet)
    except (TypeError, ValueError):
        return None
    return {"ok": True, "available": cross, "total": wallet, "margin": wallet}
class UserDataStream:
    """Futures user-data stream: create/keep-alive the listenKey, connect the WS,
    and handle ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE events."""

    def __init__(self, client: Any, account_id: int):
        # `client` is expected to expose the Binance helpers used below
        # (create_futures_listen_key, keepalive_futures_listen_key) and the
        # optional `testnet` / `redis_cache` attributes.
        self.client = client
        self.account_id = int(account_id)
        self._listen_key: Optional[str] = None
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._keepalive_task: Optional[asyncio.Task] = None
        self._running = False
        self._conn_start_time: Optional[float] = None  # when the current connection was established (for the 24h proactive reconnect)

    def _ws_base_url(self) -> str:
        # Testnet and production use different futures stream hosts.
        if getattr(self.client, "testnet", False):
            return "wss://stream.binancefuture.com/ws"
        return "wss://fstream.binance.com/ws"

    async def start(self) -> bool:
        """
        Create a listenKey and launch the WS receive loop plus the keepalive task.
        Optimization: try to obtain the listenKey from the shared cache first to
        avoid creating a duplicate one.
        Returns True on success, False when no listenKey could be obtained.
        """
        global _stream_instance
        if self._running:
            return True
        # Prefer the cached listenKey (shared across processes/instances).
        try:
            from .listen_key_cache import get_listen_key_cache
            cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
            if cache:
                self._listen_key = await cache.get_listen_key(self.account_id, self.client)
                if self._listen_key:
                    logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取 listenKey")
        except Exception as e:
            logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")
        # Cache miss: create a fresh listenKey directly.
        if not self._listen_key:
            self._listen_key = await self.client.create_futures_listen_key()
        if not self._listen_key:
            logger.warning("UserDataStream: 无法创建 listenKey跳过启动")
            return False
        self._running = True
        _stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        self._keepalive_task = asyncio.create_task(self._run_keepalive())
        logger.info("UserDataStream: 已启动ORDER_TRADE_UPDATE / ACCOUNT_UPDATE含持仓与余额推送")
        return True

    async def stop(self):
        """Stop the WS receive loop and the keepalive task, closing any open connection."""
        global _stream_instance
        self._running = False
        _stream_instance = None
        if self._keepalive_task:
            self._keepalive_task.cancel()
            try:
                await self._keepalive_task
            except asyncio.CancelledError:
                pass
        self._keepalive_task = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
        self._ws = None
        self._listen_key = None
        logger.info("UserDataStream: 已停止")

    async def _run_keepalive(self):
        """
        Extend the listenKey every 30 minutes (per docs: validity is extended to
        60 minutes after each call). On -1125, close the connection on purpose so
        the receive loop reconnects with a fresh key.
        Optimizations:
        1. Prefer the WebSocket API keepalive to reduce REST calls.
        2. Use the cache manager to renew the listenKey (shared across processes).
        """
        while self._running:
            await asyncio.sleep(30 * 60)
            if not self._running or not self._listen_key:
                break
            # Renew via the cache manager first (keeps other processes in sync).
            try:
                from .listen_key_cache import get_listen_key_cache
                cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
                if cache:
                    # The cache manager either keepalives the existing key or creates a new one.
                    new_key = await cache.renew_listen_key(self.account_id, self.client, self._listen_key)
                    if new_key:
                        if new_key != self._listen_key:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已更新keepalive 失败,创建了新 key")
                            self._listen_key = new_key
                            # Key changed: close the WS so the receive loop reconnects with it.
                            if self._ws:
                                try:
                                    await self._ws.close()
                                except Exception:
                                    pass
                            break
                        continue
            except Exception as e:
                logger.debug(f"UserDataStream: 使用缓存管理器更新 listenKey 失败: {e}")
            # Fallback: direct keepalive when the cache manager is unavailable.
            ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
            if not ok and code_1125 and self._ws:
                logger.warning("UserDataStream: keepalive 返回 -1125listenKey 不存在),主动断线以换新 key 重连")
                try:
                    await self._ws.close()
                except Exception:
                    pass
                break
            if not ok:
                logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey")

    async def _run_ws(self):
        """Connect the WS and process messages; on disconnect / listenKeyExpired /
        24h limit, reconnect and recreate the listenKey when necessary."""
        import aiohttp
        _24h_sec = 23 * 3600  # per docs a single connection lives at most 24h; reconnect 1h early
        while self._running:
            url = f"{self._ws_base_url()}/{self._listen_key}"
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
                        self._ws = ws
                        self._conn_start_time = time.monotonic()
                        logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                should_break = await self._handle_message(msg.data)
                                if should_break:
                                    break
                            elif msg.type == aiohttp.WSMsgType.ERROR:
                                logger.warning("UserDataStream: WS 错误")
                                break
                            elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
                            # Proactive reconnect near 24h (docs: each connection is valid for at most 24h).
                            if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec:
                                logger.info("UserDataStream: 连接已近 24h主动重连")
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "UserDataStream(account_id=%s): WS 异常 %s: %s60s 后重连",
                    self.account_id, err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(60)
            self._ws = None
            self._conn_start_time = None
            if not self._running:
                break
            # Prefer a cached listenKey (multi-process shared) to avoid creating duplicates.
            try:
                from .listen_key_cache import get_listen_key_cache
                cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
                if cache:
                    # Returns a valid cached key if one exists, otherwise creates a new one.
                    cached_key = await cache.get_listen_key(self.account_id, self.client)
                    if cached_key:
                        if cached_key == self._listen_key:
                            logger.debug(f"UserDataStream(account_id={self.account_id}): 从缓存获取到相同的 listenKey复用")
                        else:
                            logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取到新的 listenKey可能其他进程创建的")
                            self._listen_key = cached_key
                        # Keep using the existing or cached listenKey and reconnect.
                        continue
            except Exception as e:
                logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")
            # Cache unavailable: fall back to the original logic.
            # Try a keepalive on the existing listenKey before reconnecting, to avoid recreating it.
            need_new_key = True
            if self._listen_key:
                logger.debug(f"UserDataStream(account_id={self.account_id}): 重连前尝试 keepalive 现有 listenKey...")
                ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
                if ok:
                    logger.info(f"UserDataStream(account_id={self.account_id}): 现有 listenKey keepalive 成功,复用现有 key")
                    need_new_key = False
                elif code_1125:
                    logger.debug(f"UserDataStream(account_id={self.account_id}): 现有 listenKey 已失效(-1125需要创建新 key")
                else:
                    logger.debug(f"UserDataStream(account_id={self.account_id}): keepalive 失败,尝试创建新 key")
            # Only create a new key when needed (keepalive failed or there is no existing key).
            if need_new_key:
                # Retry a few times so a transient network blip does not abort the stream.
                listen_key_retries = 3
                listen_key_created = False
                for retry in range(listen_key_retries):
                    # Per Binance docs, if the account already has a valid listenKey the
                    # create endpoint returns the existing key and extends its validity,
                    # so "creating" may legitimately return the current key.
                    new_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=1)
                    if new_key:
                        # Same key back means Binance reused the existing one.
                        if new_key == self._listen_key:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已复用(币安返回现有 key")
                        else:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已创建(重试 {retry + 1}/{listen_key_retries}")
                        self._listen_key = new_key
                        listen_key_created = True
                        break
                    if retry < listen_key_retries - 1:
                        wait_sec = (retry + 1) * 10  # back off 10s, 20s, 30s
                        logger.debug(f"UserDataStream(account_id={self.account_id}): listenKey 创建失败,{wait_sec}秒后重试...")
                        await asyncio.sleep(wait_sec)
                if not listen_key_created:
                    logger.warning(
                        "UserDataStream(account_id=%s): 重新创建 listenKey 失败(已重试 %d60s 后重试(请检查该账号 API 权限/网络/IP 白名单)",
                        self.account_id, listen_key_retries,
                    )
                    await asyncio.sleep(60)
                    continue

    async def _handle_message(self, raw: str) -> bool:
        """Handle one pushed message. Returns True when the current connection
        should be dropped (e.g. listenKeyExpired) to trigger a reconnect."""
        try:
            data = json.loads(raw)
        except Exception:
            logger.debug(f"UserDataStream: 无法解析推送消息: {raw[:200]}")
            return False
        e = data.get("e")
        if e == "listenKeyExpired":
            logger.warning("UserDataStream: 收到 listenKeyExpired将换新 key 重连")
            return True
        if e == "ORDER_TRADE_UPDATE":
            logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送")
            self._on_order_trade_update(data.get("o") or {})
        elif e == "ACCOUNT_UPDATE":
            logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送")
            self._on_account_update(data.get("a") or {})
        elif e == "ALGO_UPDATE":
            logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送")
            self._on_algo_update(data.get("o") or {})
        else:
            logger.debug(f"UserDataStream: 收到未知事件类型: {e}")
        return False

    def _on_order_trade_update(self, o: Dict):
        # Docs: x = execution type of this event (NEW/TRADE/CANCELED...), X = current
        # order status, c = clientOrderId, i = orderId, ap = average fill price,
        # z = cumulative filled qty, R = reduce-only, rp = realized PnL of the trade, s = symbol.
        event_type = (o.get("x") or "").strip().upper()
        status = (o.get("X") or "").strip().upper()
        symbol = (o.get("s") or "").strip()
        order_id = o.get("i")
        logger.debug(f"UserDataStream: ORDER_TRADE_UPDATE symbol={symbol!r} orderId={order_id} event={event_type} status={status}")
        if status != "FILLED":
            logger.debug(f"UserDataStream: 订单状态非 FILLED跳过 symbol={symbol!r} orderId={order_id} status={status}")
            return
        client_order_id = (o.get("c") or "").strip() or None
        order_id = o.get("i")
        ap = o.get("ap")
        z = o.get("z")
        reduce_only = o.get("R") is True
        symbol = (o.get("s") or "").strip()
        rp = o.get("rp")  # realized PnL
        if order_id is None or ap is None or z is None:
            return
        try:
            ap_f = float(ap)
            z_f = float(z)
        except (TypeError, ValueError):
            return
        # Special clientOrderIds (liquidation/ADL/settlement): log only, never touch pending records.
        if client_order_id:
            if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"):
                logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}")
                return
        # Only treat x=TRADE with status FILLED as a fill (docs: x=TRADE means a trade happened).
        if event_type and event_type != "TRADE":
            return
        # Entry fill: complete the pending record.
        if not reduce_only:
            if not client_order_id:
                logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId跳过完善 orderId={order_id} symbol={symbol!r}")
                return
            try:
                import sys
                from pathlib import Path
                project_root = Path(__file__).parent.parent
                backend_path = project_root / "backend"
                if backend_path.exists():
                    sys.path.insert(0, str(backend_path))
                from database.models import Trade
                updated = Trade.update_pending_to_filled(
                    client_order_id, self.account_id, order_id, ap_f, z_f
                )
                if updated:
                    logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
                else:
                    logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善client_order_id={client_order_id!r} orderId={order_id}")
            except Exception as ex:
                logger.warning(f"UserDataStream: update_pending_to_filled 失败 client_order_id={client_order_id!r}: {ex}")
        else:
            # Exit fill: write exit_order_id back onto the open record by symbol; log rp when present.
            if rp is not None:
                logger.debug(f"UserDataStream: 平仓订单 FILLED orderId={order_id} symbol={symbol!r} 实现盈亏 rp={rp}")
            if symbol:
                try:
                    import sys
                    from pathlib import Path
                    project_root = Path(__file__).parent.parent
                    backend_path = project_root / "backend"
                    if backend_path.exists():
                        sys.path.insert(0, str(backend_path))
                    from database.models import Trade
                    # Try to obtain the linked entry order id from the order info (if any).
                    # NOTE: Binance exit-order pushes may not carry the entry order id; match by symbol for now.
                    entry_order_id_hint = None  # may be extracted from linked order info in the future
                    if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id, entry_order_id_hint):
                        logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
                    else:
                        logger.debug(f"UserDataStream: 平仓订单回写失败可能已存在或无可匹配记录symbol={symbol!r} orderId={order_id}")
                except Exception as ex:
                    logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}")

    def _on_account_update(self, a: Dict):
        # Docs: a.B = balances array, each item a=asset, wb=wallet balance,
        #       cw=wallet balance excluding isolated margin, bc=balance change.
        # Docs: a.P = positions array, each item s=symbol, pa=position amount,
        #       ep=entry price, ps=LONG/SHORT/BOTH, etc.
        global _position_updates_cache, _balance_updates_cache
        B = a.get("B")
        if isinstance(B, list) and B:
            for b in B:
                asset = (b.get("a") or "").strip()
                if asset:
                    _balance_updates_cache[asset] = {
                        "wb": b.get("wb"),
                        "cw": b.get("cw"),
                        "bc": b.get("bc"),
                    }
            logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
        P = a.get("P")
        if isinstance(P, list) and P:
            for p in P:
                s = (p.get("s") or "").strip()
                if s:
                    if s not in _position_updates_cache:
                        _position_updates_cache[s] = []
                    # Replace the cached entry wholesale with the latest push for this symbol.
                    _position_updates_cache[s] = [p]
            logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")

    def _on_algo_update(self, o: Dict):
        # Conditional-order update push: when X=TRIGGERED/FINISHED and ai (the id of the
        # regular order created on trigger) is present, write exit_order_id back onto the open record.
        x = (o.get("X") or "").strip().upper()
        ai = o.get("ai")
        symbol = (o.get("s") or "").strip()
        if x in ("TRIGGERED", "FINISHED") and ai and symbol:
            try:
                import sys
                from pathlib import Path
                project_root = Path(__file__).parent.parent
                backend_path = project_root / "backend"
                if backend_path.exists():
                    sys.path.insert(0, str(backend_path))
                from database.models import Trade
                if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai):
                    logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
            except Exception as ex:
                logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
        logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")