auto_trade_sys/trading_system/user_data_stream.py
薇薇安 418eff6fb7 feat(risk_manager, user_data_stream): 增强多账号支持与缓存逻辑
在 `risk_manager.py` 中新增可用保证金检查,确保在保证金不足时拒绝开仓请求,提升风险控制能力。在 `user_data_stream.py` 中更新缓存填充逻辑,支持多账号隔离,确保 Redis 缓存键按账号区分,避免数据混淆。此更新优化了系统的稳定性与风险管理。
2026-02-21 10:09:59 +08:00

678 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
合约 User Data Stream订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。
- 先 REST 下单并落库 pendingWS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。
- ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。
- listenKey 每 30 分钟 keepalive收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。
- 单连接有效期不超过 24 小时,届时主动重连(文档建议)。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
# 持仓推送缓存ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
# 余额推送缓存ACCOUNT_UPDATE 的 B 数组a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc }
# 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用)
_stream_instance: Optional["UserDataStream"] = None
# 是否已用 REST 结果播种过(未播种时业务应走 REST不返回空缓存
_position_cache_seeded = False
_balance_cache_seeded = False
def get_position_updates_cache() -> Dict[str, List[Dict]]:
"""返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。"""
return dict(_position_updates_cache)
def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]:
"""返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。"""
return dict(_balance_updates_cache)
def get_stream_instance() -> Optional["UserDataStream"]:
"""返回当前运行的 UserDataStream 实例(未启动时为 None"""
return _stream_instance
try:
from .redis_ttl import TTL_POSITIONS
except ImportError:
TTL_POSITIONS = 300
try:
from .redis_ttl import TTL_BALANCE
except ImportError:
TTL_BALANCE = 300
def _cache_account_id() -> int:
"""当前进程的 account_id多账号隔离时 Redis 缓存键必须按账号区分)"""
import os
try:
return int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1)
except Exception:
return 1
async def seed_position_cache(positions: List[Dict], redis_cache: Any = None, account_id: int = None) -> None:
"""用 REST 全量持仓结果填充缓存。有 Redis 时只写 Redis、不占进程内存无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。"""
global _position_updates_cache, _position_cache_seeded
_position_cache_seeded = True
positions_list = []
for pos in positions or []:
symbol = (pos.get("symbol") or "").strip()
if not symbol:
continue
amt = float(pos.get("positionAmt") or 0)
if not redis_cache:
if symbol not in _position_updates_cache:
_position_updates_cache[symbol] = []
_position_updates_cache[symbol] = [{
"s": symbol,
"pa": amt,
"ep": str(pos.get("entryPrice") or "0"),
"up": str(pos.get("unRealizedProfit") or "0"),
"ps": pos.get("positionSide") or "BOTH",
}]
if amt != 0:
positions_list.append({
"symbol": symbol,
"positionAmt": amt,
"entryPrice": float(pos.get("entryPrice") or 0),
"markPrice": float(pos.get("markPrice") or 0),
"unRealizedProfit": float(pos.get("unRealizedProfit") or 0),
"leverage": int(pos.get("leverage") or 0),
})
if redis_cache:
_position_updates_cache.clear()
if positions_list:
try:
aid = int(account_id) if account_id is not None else _cache_account_id()
await redis_cache.set(f"ats:positions:cache:{aid}", positions_list, ttl=TTL_POSITIONS)
except Exception as e:
logger.debug(f"写入持仓缓存到 Redis 失败: {e}")
logger.debug(f"UserDataStream: 已填充持仓缓存Redis=%s", bool(redis_cache))
async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None, account_id: int = None) -> None:
"""用 REST 余额结果填充缓存。有 Redis 时只写 Redis无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。"""
global _balance_updates_cache, _balance_cache_seeded
_balance_cache_seeded = True
if balance and isinstance(balance, dict):
wb = balance.get("walletBalance") or balance.get("total") or 0
av = balance.get("availableBalance") or balance.get("available") or wb
balance_data = {"wb": str(wb), "cw": str(av), "bc": "0"}
if redis_cache:
try:
aid = int(account_id) if account_id is not None else _cache_account_id()
await redis_cache.set(f"ats:balance:cache:USDT:{aid}", balance_data, ttl=TTL_BALANCE)
except Exception as e:
logger.debug(f"写入余额缓存到 Redis 失败: {e}")
else:
_balance_updates_cache["USDT"] = balance_data
logger.debug("UserDataStream: 已填充余额缓存 (USDT, Redis=%s)", bool(redis_cache))
async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = None, account_id: int = None) -> Optional[List[Dict]]:
"""
将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None业务应走 REST
⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。
"""
aid = int(account_id) if account_id is not None else _cache_account_id()
if redis_cache:
try:
redis_key = f"ats:positions:cache:{aid}"
cached = await redis_cache.get(redis_key)
if cached and isinstance(cached, list):
# 过滤最小名义价值
filtered = []
for pos in cached:
try:
pa = float(pos.get("positionAmt") or 0)
ep = float(pos.get("entryPrice") or 0)
if pa == 0:
continue
if min_notional > 0 and abs(pa) * ep < min_notional:
continue
filtered.append(pos)
except Exception:
continue
if filtered:
return filtered
except Exception as e:
logger.debug(f"从 Redis 读取持仓缓存失败: {e}")
# 降级到进程内存缓存
if not _position_cache_seeded:
return None
out = []
for symbol, plist in _position_updates_cache.items():
for p in plist if isinstance(plist, list) else []:
try:
pa = float(p.get("pa") or 0)
except (TypeError, ValueError):
pa = 0
if pa == 0:
continue
ep = float(p.get("ep") or 0)
if min_notional > 0 and abs(pa) * ep < min_notional:
continue
out.append({
"symbol": symbol,
"positionAmt": pa,
"entryPrice": ep,
"markPrice": float(p.get("markPrice") or 0),
"unRealizedProfit": float(p.get("up") or 0),
"leverage": int(p.get("leverage") or 0),
})
return out
async def get_balance_from_cache(redis_cache: Any = None, account_id: int = None) -> Optional[Dict[str, Any]]:
"""
从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。
⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。
"""
aid = int(account_id) if account_id is not None else _cache_account_id()
if redis_cache:
try:
redis_key = f"ats:balance:cache:USDT:{aid}"
cached = await redis_cache.get(redis_key)
if cached and isinstance(cached, dict):
try:
wb = float(cached.get("wb") or cached.get("total") or 0)
cw = float(cached.get("cw") or cached.get("available") or wb)
return {"ok": True, "available": cw, "total": wb, "margin": wb}
except (TypeError, ValueError):
pass
except Exception as e:
logger.debug(f"从 Redis 读取余额缓存失败: {e}")
# 降级到进程内存缓存
if not _balance_cache_seeded:
return None
u = _balance_updates_cache.get("USDT")
if not u:
return None
try:
wb = float(u.get("wb") or 0)
cw = float(u.get("cw") or wb)
except (TypeError, ValueError):
return None
return {"ok": True, "available": cw, "total": wb, "margin": wb}
class UserDataStream:
"""合约用户数据流:创建/保活 listenKey连接 WS处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。"""
def __init__(self, client: Any, account_id: int):
self.client = client
self.account_id = int(account_id)
self._listen_key: Optional[str] = None
self._ws = None
self._task: Optional[asyncio.Task] = None
self._keepalive_task: Optional[asyncio.Task] = None
self._running = False
self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连)
def _ws_base_url(self) -> str:
if getattr(self.client, "testnet", False):
return "wss://stream.binancefuture.com/ws"
return "wss://fstream.binance.com/ws"
async def start(self) -> bool:
"""
创建 listenKey 并启动 WS 接收循环与 keepalive 任务。
⚠️ 优化:优先从缓存获取 listenKey避免重复创建。
"""
global _stream_instance
if self._running:
return True
# ⚠️ 优化:优先从缓存获取 listenKey多进程/多实例共享)
try:
from .listen_key_cache import get_listen_key_cache
cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
if cache:
self._listen_key = await cache.get_listen_key(self.account_id, self.client)
if self._listen_key:
logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取 listenKey")
except Exception as e:
logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")
# 如果缓存未命中,直接创建
if not self._listen_key:
self._listen_key = await self.client.create_futures_listen_key()
if not self._listen_key:
logger.warning("UserDataStream: 无法创建 listenKey跳过启动")
return False
self._running = True
_stream_instance = self
self._task = asyncio.create_task(self._run_ws())
self._keepalive_task = asyncio.create_task(self._run_keepalive())
logger.info("UserDataStream: 已启动ORDER_TRADE_UPDATE / ACCOUNT_UPDATE含持仓与余额推送")
return True
async def stop(self):
"""停止 WS 与 keepalive。"""
global _stream_instance
self._running = False
_stream_instance = None
if self._keepalive_task:
self._keepalive_task.cancel()
try:
await self._keepalive_task
except asyncio.CancelledError:
pass
self._keepalive_task = None
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
self._listen_key = None
logger.info("UserDataStream: 已停止")
async def _run_keepalive(self):
"""
每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。
⚠️ 优化:
1. 优先使用 WebSocket API keepalive减少 REST 调用
2. 使用缓存管理器更新 listenKey支持多进程共享
"""
while self._running:
await asyncio.sleep(30 * 60)
if not self._running or not self._listen_key:
break
# ⚠️ 优化:使用缓存管理器更新 listenKey
try:
from .listen_key_cache import get_listen_key_cache
cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
if cache:
# 使用缓存管理器更新(会自动 keepalive 或创建新的)
new_key = await cache.renew_listen_key(self.account_id, self.client, self._listen_key)
if new_key:
if new_key != self._listen_key:
logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已更新keepalive 失败,创建了新 key")
self._listen_key = new_key
# 如果 key 变了,需要重新连接
if self._ws:
try:
await self._ws.close()
except Exception:
pass
break
continue
except Exception as e:
logger.debug(f"UserDataStream: 使用缓存管理器更新 listenKey 失败: {e}")
# 回退到直接 keepalive如果缓存管理器不可用
ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
if not ok and code_1125 and self._ws:
logger.warning("UserDataStream: keepalive 返回 -1125listenKey 不存在),主动断线以换新 key 重连")
try:
await self._ws.close()
except Exception:
pass
break
if not ok:
logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey")
async def _run_ws(self):
"""连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。"""
import aiohttp
_24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连
while self._running:
url = f"{self._ws_base_url()}/{self._listen_key}"
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
self._ws = ws
self._conn_start_time = time.monotonic()
logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
should_break = await self._handle_message(msg.data)
if should_break:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning("UserDataStream: WS 错误")
break
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
# 单连接 24h 主动重连文档每个链接有效期不超过24小时
if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec:
logger.info("UserDataStream: 连接已近 24h主动重连")
break
except asyncio.CancelledError:
break
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
err_type = type(e).__name__
logger.warning(
"UserDataStream(account_id=%s): WS 异常 %s: %s60s 后重连",
self.account_id, err_type, err_msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
await asyncio.sleep(60)
self._ws = None
self._conn_start_time = None
if not self._running:
break
# ⚠️ 优化:优先从缓存获取 listenKey多进程共享避免重复创建
try:
from .listen_key_cache import get_listen_key_cache
cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
if cache:
# 从缓存获取 listenKey如果缓存中有有效的 key会直接返回否则会创建新的
cached_key = await cache.get_listen_key(self.account_id, self.client)
if cached_key:
if cached_key == self._listen_key:
logger.debug(f"UserDataStream(account_id={self.account_id}): 从缓存获取到相同的 listenKey复用")
else:
logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取到新的 listenKey可能其他进程创建的")
self._listen_key = cached_key
# 继续使用现有的或缓存中的 listenKey
continue
except Exception as e:
logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")
# 如果缓存不可用,回退到原有逻辑
# ⚠️ 优化:重连前先尝试 keepalive 现有 listenKey避免重复创建
need_new_key = True
if self._listen_key:
logger.debug(f"UserDataStream(account_id={self.account_id}): 重连前尝试 keepalive 现有 listenKey...")
ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
if ok:
logger.info(f"UserDataStream(account_id={self.account_id}): 现有 listenKey keepalive 成功,复用现有 key")
need_new_key = False
elif code_1125:
logger.debug(f"UserDataStream(account_id={self.account_id}): 现有 listenKey 已失效(-1125需要创建新 key")
else:
logger.debug(f"UserDataStream(account_id={self.account_id}): keepalive 失败,尝试创建新 key")
# 只有在需要新 key 时才创建keepalive 失败或没有现有 key
if need_new_key:
# ⚠️ 优化:增加重试机制,避免网络波动导致失败
listen_key_retries = 3
listen_key_created = False
for retry in range(listen_key_retries):
# 注意:根据币安文档,如果账户已有有效的 listenKey创建接口会返回现有 key 并延长有效期
# 所以这里即使"创建"也可能返回现有的 key这是正常的
new_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=1)
if new_key:
# 如果返回的 key 与现有 key 相同,说明是复用现有 key
if new_key == self._listen_key:
logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已复用(币安返回现有 key")
else:
logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已创建(重试 {retry + 1}/{listen_key_retries}")
self._listen_key = new_key
listen_key_created = True
break
if retry < listen_key_retries - 1:
wait_sec = (retry + 1) * 10 # 10秒、20秒、30秒
logger.debug(f"UserDataStream(account_id={self.account_id}): listenKey 创建失败,{wait_sec}秒后重试...")
await asyncio.sleep(wait_sec)
if not listen_key_created:
logger.warning(
"UserDataStream(account_id=%s): 重新创建 listenKey 失败(已重试 %d60s 后重试(请检查该账号 API 权限/网络/IP 白名单)",
self.account_id, listen_key_retries,
)
await asyncio.sleep(60)
continue
async def _handle_message(self, raw: str) -> bool:
"""处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired以触发重连。"""
try:
data = json.loads(raw)
except Exception:
logger.debug(f"UserDataStream: 无法解析推送消息: {raw[:200]}")
return False
e = data.get("e")
if e == "listenKeyExpired":
logger.warning("UserDataStream: 收到 listenKeyExpired将换新 key 重连")
return True
if e == "ORDER_TRADE_UPDATE":
logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送")
self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E"))
elif e == "ACCOUNT_UPDATE":
logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送")
self._on_account_update(data.get("a") or {})
elif e == "ALGO_UPDATE":
logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送")
self._on_algo_update(data.get("o") or {})
else:
logger.debug(f"UserDataStream: 收到未知事件类型: {e}")
return False
def _on_order_trade_update(self, o: Dict, event_time_ms=None):
# 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId
# ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对, n=手续费数量, N=手续费资产
event_type = (o.get("x") or "").strip().upper()
status = (o.get("X") or "").strip().upper()
symbol = (o.get("s") or "").strip()
order_id = o.get("i")
logger.debug(f"UserDataStream: ORDER_TRADE_UPDATE symbol={symbol!r} orderId={order_id} event={event_type} status={status}")
if status != "FILLED":
logger.debug(f"UserDataStream: 订单状态非 FILLED跳过 symbol={symbol!r} orderId={order_id} status={status}")
return
client_order_id = (o.get("c") or "").strip() or None
order_id = o.get("i")
ap = o.get("ap")
z = o.get("z")
reduce_only = o.get("R") is True
symbol = (o.get("s") or "").strip()
rp = o.get("rp") # 实现盈亏
if order_id is None or ap is None or z is None:
return
try:
ap_f = float(ap)
z_f = float(z)
except (TypeError, ValueError):
return
# 特殊 clientOrderId强平/ADL/结算,仅打日志不参与系统 pending 完善
if client_order_id:
if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"):
logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}")
return
# 仅当事件类型为 TRADE 且状态 FILLED 时视为成交文档x=TRADE 表示有成交)
if event_type and event_type != "TRADE":
return
# 开仓成交:完善 pending 记录(优先按 client_order_id无 c 时用 orderId 兜底)
if not reduce_only:
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
if client_order_id:
updated = Trade.update_pending_to_filled(
client_order_id, self.account_id, order_id, ap_f, z_f
)
if updated:
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
else:
logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善client_order_id={client_order_id!r} orderId={order_id}")
else:
# 无 clientOrderId 时用 orderId 兜底:若该 symbol+account 下仅有一条 pending 且无 entry_order_id则用本订单号完善
updated = Trade.update_pending_by_entry_order_id(symbol, self.account_id, order_id, ap_f, z_f)
if updated:
logger.info(f"UserDataStream: 开仓成交已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
else:
logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId且兜底未匹配到唯一 pending orderId={order_id} symbol={symbol!r}")
except Exception as ex:
logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}")
else:
# 平仓成交(支付式闭环):回写 exit_order_id 并用推送数据更新 exit_price/pnl/commission仅 WS 驱动 DB
if symbol:
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
ok, trade_id = Trade.set_exit_order_id_for_open_trade(
symbol, self.account_id, order_id, entry_order_id_hint=None
)
if ok:
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
# 可能已由 ALGO_UPDATE 写过 exit_order_id直接按 exit_order_id 取记录
tid = trade_id
if tid is None:
row = Trade.get_by_exit_order_id(order_id)
tid = row.get("id") if row else None
if tid is not None:
try:
rp_f = float(rp) if rp is not None else None
commission_f = None
try:
n_val = o.get("n")
if n_val is not None:
commission_f = float(n_val)
except (TypeError, ValueError):
pass
exit_time_ts = None
if event_time_ms is not None:
try:
exit_time_ts = int(int(event_time_ms) / 1000)
except (TypeError, ValueError):
pass
# 用推送的 ap/rp 更新平仓信息,与币安一致
Trade.update_exit(
trade_id=tid,
exit_price=ap_f,
exit_reason="sync",
pnl=rp_f if rp_f is not None else 0.0,
pnl_percent=0.0,
exit_order_id=order_id,
exit_time_ts=exit_time_ts,
realized_pnl=rp_f,
commission=commission_f,
commission_asset=o.get("N"),
)
logger.info(
f"UserDataStream: 平仓记录已更新 symbol={symbol!r} orderId={order_id} "
f"exit_price={ap_f} realized_pnl={rp_f}"
)
except Exception as ex2:
logger.warning(f"UserDataStream: update_exit 失败 orderId={order_id}: {ex2}")
elif not ok:
logger.debug(f"UserDataStream: 平仓订单未匹配到 open 记录 symbol={symbol!r} orderId={order_id}")
except Exception as ex:
logger.warning(f"UserDataStream: 平仓回写/更新失败 orderId={order_id}: {ex}")
def _on_account_update(self, a: Dict):
# 文档: a.B = 余额数组a.P = 持仓信息数组。有 Redis 时只写 Redis、不写进程内存。
global _position_updates_cache, _balance_updates_cache
redis_cache = getattr(self.client, "redis_cache", None)
B = a.get("B")
if isinstance(B, list) and B:
for b in B:
asset = (b.get("a") or "").strip()
if asset:
balance_data = {
"wb": b.get("wb"),
"cw": b.get("cw"),
"bc": b.get("bc"),
}
if redis_cache:
if asset == "USDT":
asyncio.create_task(self._write_balance_to_redis(asset, balance_data))
else:
_balance_updates_cache[asset] = balance_data
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
P = a.get("P")
if isinstance(P, list) and P:
positions_list = []
for p in P:
s = (p.get("s") or "").strip()
if s:
if not redis_cache:
if s not in _position_updates_cache:
_position_updates_cache[s] = []
_position_updates_cache[s] = [p]
try:
pa = float(p.get("pa") or 0)
ep = float(p.get("ep") or 0)
if pa != 0:
positions_list.append({
"symbol": s,
"positionAmt": pa,
"entryPrice": ep,
"markPrice": float(p.get("markPrice") or 0),
"unRealizedProfit": float(p.get("up") or 0),
"leverage": int(p.get("leverage") or 0),
})
except Exception:
pass
if redis_cache and positions_list:
asyncio.create_task(self._write_positions_to_redis(positions_list))
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
async def _write_balance_to_redis(self, asset: str, balance_data: Dict):
"""写入余额缓存到 Redis带 TTL按 account_id 隔离)"""
try:
redis_cache = getattr(self.client, "redis_cache", None)
if redis_cache:
redis_key = f"ats:balance:cache:{asset}:{self.account_id}"
await redis_cache.set(redis_key, balance_data, ttl=TTL_BALANCE)
except Exception as e:
logger.debug(f"写入余额缓存到 Redis 失败: {e}")
async def _write_positions_to_redis(self, positions_list: List[Dict]):
"""写入持仓缓存到 Redis带 TTL按 account_id 隔离)"""
try:
redis_cache = getattr(self.client, "redis_cache", None)
if redis_cache:
redis_key = f"ats:positions:cache:{self.account_id}"
await redis_cache.set(redis_key, positions_list, ttl=TTL_POSITIONS)
except Exception as e:
logger.debug(f"写入持仓缓存到 Redis 失败: {e}")
def _on_algo_update(self, o: Dict):
# 条件单交易更新推送X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id
x = (o.get("X") or "").strip().upper()
ai = o.get("ai")
symbol = (o.get("s") or "").strip()
if x in ("TRIGGERED", "FINISHED") and ai and symbol:
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
ok, _ = Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai)
if ok:
logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
except Exception as ex:
logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")