# NOTE: Removed several unused documentation and code files (trade-update push,
# conditional-order push, REST API docs, WebSocket API docs, and related
# strategy-analysis docs) to keep the codebase clean and maintainable.
"""
|
||
合约 User Data Stream:订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。
|
||
- 先 REST 下单并落库 pending,WS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。
|
||
- ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。
|
||
- listenKey 每 30 分钟 keepalive;收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。
|
||
- 单连接有效期不超过 24 小时,届时主动重连(文档建议)。
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import time
|
||
from typing import Dict, List, Optional, Any
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
|
||
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
|
||
# 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
|
||
_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc }
|
||
# 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用)
|
||
_stream_instance: Optional["UserDataStream"] = None
|
||
# 是否已用 REST 结果播种过(未播种时业务应走 REST,不返回空缓存)
|
||
_position_cache_seeded = False
|
||
_balance_cache_seeded = False
|
||
|
||
|
||
def get_position_updates_cache() -> Dict[str, List[Dict]]:
    """Return a shallow-copy snapshot of the latest ACCOUNT_UPDATE positions.

    The mapping is empty until a push (or REST seed) has populated the cache.
    """
    snapshot = dict(_position_updates_cache)
    return snapshot
def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]:
    """Return a shallow-copy snapshot of the latest ACCOUNT_UPDATE balances.

    Keys are asset names such as "USDT"; empty until a push has arrived.
    """
    snapshot = dict(_balance_updates_cache)
    return snapshot
def get_stream_instance() -> Optional["UserDataStream"]:
    """Return the currently running UserDataStream, or None if not started."""
    return _stream_instance
# TTL constants come from the shared redis_ttl module when available; fall back
# to 5 minutes (300 s) when running outside the package.
try:
    from .redis_ttl import TTL_POSITIONS
except ImportError:
    TTL_POSITIONS = 300

try:
    from .redis_ttl import TTL_BALANCE
except ImportError:
    TTL_BALANCE = 300
async def seed_position_cache(positions: List[Dict], redis_cache: Any = None) -> None:
    """Seed the position cache from a full REST position snapshot.

    With Redis available the data is written only to Redis (shared between
    processes, no per-process memory); otherwise it is kept in process memory.

    Args:
        positions: REST-style position dicts (symbol/positionAmt/entryPrice/...).
        redis_cache: optional async cache exposing ``set(key, value, ttl=...)``.
    """
    global _position_updates_cache, _position_cache_seeded
    _position_cache_seeded = True
    positions_list = []
    for pos in positions or []:
        symbol = (pos.get("symbol") or "").strip()
        if not symbol:
            continue
        amt = float(pos.get("positionAmt") or 0)
        if not redis_cache:
            # In-memory layout mirrors the WS ACCOUNT_UPDATE "P" entries.
            # (The previous membership check before this assignment was dead
            # code: the entry is overwritten unconditionally.)
            _position_updates_cache[symbol] = [{
                "s": symbol,
                "pa": amt,
                "ep": str(pos.get("entryPrice") or "0"),
                "up": str(pos.get("unRealizedProfit") or "0"),
                "ps": pos.get("positionSide") or "BOTH",
            }]
        if amt != 0:
            positions_list.append({
                "symbol": symbol,
                "positionAmt": amt,
                "entryPrice": float(pos.get("entryPrice") or 0),
                "markPrice": float(pos.get("markPrice") or 0),
                "unRealizedProfit": float(pos.get("unRealizedProfit") or 0),
                "leverage": int(pos.get("leverage") or 0),
            })
    if redis_cache:
        # Redis is authoritative: drop any process-local copy.
        _position_updates_cache.clear()
        if positions_list:
            try:
                await redis_cache.set("ats:positions:cache", positions_list, ttl=TTL_POSITIONS)
            except Exception as e:
                logger.debug(f"写入持仓缓存到 Redis 失败: {e}")
    # Lazy %-style logging args; the former f-prefix on this literal was a no-op.
    logger.debug("UserDataStream: 已填充持仓缓存(Redis=%s)", bool(redis_cache))
async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None) -> None:
    """Seed the USDT balance cache from a REST balance result.

    Writes to Redis when available (shared, no process memory); otherwise to
    the in-process cache.
    """
    global _balance_updates_cache, _balance_cache_seeded
    _balance_cache_seeded = True
    if balance and isinstance(balance, dict):
        wallet = balance.get("walletBalance") or balance.get("total") or 0
        avail = balance.get("availableBalance") or balance.get("available") or wallet
        entry = {"wb": str(wallet), "cw": str(avail), "bc": "0"}
        if redis_cache:
            try:
                await redis_cache.set("ats:balance:cache:USDT", entry, ttl=TTL_BALANCE)
            except Exception as e:
                logger.debug(f"写入余额缓存到 Redis 失败: {e}")
        else:
            _balance_updates_cache["USDT"] = entry
    logger.debug("UserDataStream: 已填充余额缓存 (USDT, Redis=%s)", bool(redis_cache))
async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = None) -> Optional[List[Dict]]:
    """Convert cached positions into the REST get_open_positions list format.

    Returns None when the cache was never seeded (callers should use REST).
    Reads Redis first (shared across processes) and falls back to the
    in-process cache.
    """
    # Redis first (multi-process shared).
    if redis_cache:
        try:
            cached = await redis_cache.get("ats:positions:cache")
            if cached and isinstance(cached, list):
                # Drop flat positions and those below the notional threshold.
                survivors = []
                for entry in cached:
                    try:
                        amt = float(entry.get("positionAmt") or 0)
                        price = float(entry.get("entryPrice") or 0)
                        if amt == 0:
                            continue
                        if min_notional > 0 and abs(amt) * price < min_notional:
                            continue
                        survivors.append(entry)
                    except Exception:
                        continue
                if survivors:
                    return survivors
        except Exception as e:
            logger.debug(f"从 Redis 读取持仓缓存失败: {e}")

    # Fall back to the in-process cache.
    if not _position_cache_seeded:
        return None
    result = []
    for sym, entries in _position_updates_cache.items():
        if not isinstance(entries, list):
            continue
        for entry in entries:
            try:
                amt = float(entry.get("pa") or 0)
            except (TypeError, ValueError):
                amt = 0
            if amt == 0:
                continue
            price = float(entry.get("ep") or 0)
            if min_notional > 0 and abs(amt) * price < min_notional:
                continue
            result.append({
                "symbol": sym,
                "positionAmt": amt,
                "entryPrice": price,
                "markPrice": float(entry.get("markPrice") or 0),
                "unRealizedProfit": float(entry.get("up") or 0),
                "leverage": int(entry.get("leverage") or 0),
            })
    return result
async def get_balance_from_cache(redis_cache: Any = None) -> Optional[Dict[str, Any]]:
    """Return the cached USDT balance, shaped like REST get_account_balance.

    Returns None when nothing was seeded/cached.  Reads Redis first (shared
    across processes), then the in-process cache.
    """
    # Redis first (multi-process shared).
    if redis_cache:
        try:
            cached = await redis_cache.get("ats:balance:cache:USDT")
            if cached and isinstance(cached, dict):
                try:
                    total = float(cached.get("wb") or cached.get("total") or 0)
                    avail = float(cached.get("cw") or cached.get("available") or total)
                except (TypeError, ValueError):
                    pass
                else:
                    return {"ok": True, "available": avail, "total": total, "margin": total}
        except Exception as e:
            logger.debug(f"从 Redis 读取余额缓存失败: {e}")

    # Fall back to the in-process cache.
    if not _balance_cache_seeded:
        return None
    entry = _balance_updates_cache.get("USDT")
    if not entry:
        return None
    try:
        total = float(entry.get("wb") or 0)
        avail = float(entry.get("cw") or total)
    except (TypeError, ValueError):
        return None
    return {"ok": True, "available": avail, "total": total, "margin": total}
class UserDataStream:
    """Futures user data stream.

    Creates and keeps alive a listenKey, connects the websocket, and handles
    ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE events.
    """

    def __init__(self, client: Any, account_id: int):
        self.client = client
        self.account_id = int(account_id)
        self._listen_key: Optional[str] = None
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._keepalive_task: Optional[asyncio.Task] = None
        self._running = False
        # When the current connection was established (for the 24h reconnect).
        self._conn_start_time: Optional[float] = None
def _ws_base_url(self) -> str:
|
||
if getattr(self.client, "testnet", False):
|
||
return "wss://stream.binancefuture.com/ws"
|
||
return "wss://fstream.binance.com/ws"
|
||
|
||
    async def start(self) -> bool:
        """Create a listenKey and launch the WS receive loop plus keepalive task.

        Returns True when the stream is (or already was) running, False when no
        listenKey could be obtained.  Tries the shared listenKey cache first to
        avoid creating duplicate keys across processes.
        """
        global _stream_instance
        if self._running:
            return True

        # Prefer a listenKey from the shared cache (multi-process/multi-instance).
        try:
            from .listen_key_cache import get_listen_key_cache
            cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
            if cache:
                self._listen_key = await cache.get_listen_key(self.account_id, self.client)
                if self._listen_key:
                    logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取 listenKey")
        except Exception as e:
            logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")

        # Cache miss: create a key directly through the client.
        if not self._listen_key:
            self._listen_key = await self.client.create_futures_listen_key()
        if not self._listen_key:
            logger.warning("UserDataStream: 无法创建 listenKey,跳过启动")
            return False

        self._running = True
        _stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        self._keepalive_task = asyncio.create_task(self._run_keepalive())
        logger.info("UserDataStream: 已启动(ORDER_TRADE_UPDATE / ACCOUNT_UPDATE,含持仓与余额推送)")
        return True
async def stop(self):
|
||
"""停止 WS 与 keepalive。"""
|
||
global _stream_instance
|
||
self._running = False
|
||
_stream_instance = None
|
||
if self._keepalive_task:
|
||
self._keepalive_task.cancel()
|
||
try:
|
||
await self._keepalive_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
self._keepalive_task = None
|
||
if self._task:
|
||
self._task.cancel()
|
||
try:
|
||
await self._task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
self._task = None
|
||
if self._ws:
|
||
try:
|
||
await self._ws.close()
|
||
except Exception:
|
||
pass
|
||
self._ws = None
|
||
self._listen_key = None
|
||
logger.info("UserDataStream: 已停止")
|
||
|
||
    async def _run_keepalive(self):
        """Extend the listenKey every 30 minutes.

        Per the docs, a keepalive extends validity to 60 minutes from the
        call.  On -1125 ("listenKey does not exist") the socket is closed on
        purpose so the receive loop reconnects with a fresh key.

        Prefers the shared listenKey cache manager (multi-process safe) and the
        WebSocket-API keepalive to reduce REST usage.
        """
        while self._running:
            await asyncio.sleep(30 * 60)
            if not self._running or not self._listen_key:
                break

            # Preferred path: renew through the shared cache manager.
            try:
                from .listen_key_cache import get_listen_key_cache
                cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
                if cache:
                    # The manager keepalives the key, or creates a new one when needed.
                    new_key = await cache.renew_listen_key(self.account_id, self.client, self._listen_key)
                    if new_key:
                        if new_key != self._listen_key:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已更新(keepalive 失败,创建了新 key)")
                            self._listen_key = new_key
                            # Key changed: force a reconnect so the WS uses it.
                            if self._ws:
                                try:
                                    await self._ws.close()
                                except Exception:
                                    pass
                            break
                        # Same key renewed successfully; skip the fallback.
                        continue
            except Exception as e:
                logger.debug(f"UserDataStream: 使用缓存管理器更新 listenKey 失败: {e}")

            # Fallback: keepalive directly through the client (WS API preferred).
            ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
            if not ok and code_1125 and self._ws:
                logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连")
                try:
                    await self._ws.close()
                except Exception:
                    pass
                break
            if not ok:
                logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey")
    async def _run_ws(self):
        """Connect the WS and process messages.

        Reconnects after disconnects, listenKeyExpired, or ~24h of connection
        age, re-creating the listenKey when necessary.
        """
        import aiohttp
        # Docs: a single connection is valid for at most 24h; reconnect 1h early.
        _24h_sec = 23 * 3600
        while self._running:
            url = f"{self._ws_base_url()}/{self._listen_key}"
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
                        self._ws = ws
                        self._conn_start_time = time.monotonic()
                        logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                should_break = await self._handle_message(msg.data)
                                if should_break:
                                    break
                            elif msg.type == aiohttp.WSMsgType.ERROR:
                                logger.warning("UserDataStream: WS 错误")
                                break
                            elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
                            # Proactive reconnect near the 24h connection limit
                            # (docs: each connection lives at most 24 hours).
                            if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec:
                                logger.info("UserDataStream: 连接已近 24h,主动重连")
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "UserDataStream(account_id=%s): WS 异常 %s: %s,60s 后重连",
                    self.account_id, err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(60)
            self._ws = None
            self._conn_start_time = None
            if not self._running:
                break

            # Prefer a listenKey from the shared cache (multi-process shared,
            # avoids creating duplicate keys).
            try:
                from .listen_key_cache import get_listen_key_cache
                cache = get_listen_key_cache(getattr(self.client, "redis_cache", None))
                if cache:
                    # Returns a still-valid cached key, or creates a new one.
                    cached_key = await cache.get_listen_key(self.account_id, self.client)
                    if cached_key:
                        if cached_key == self._listen_key:
                            logger.debug(f"UserDataStream(account_id={self.account_id}): 从缓存获取到相同的 listenKey,复用")
                        else:
                            logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取到新的 listenKey(可能其他进程创建的)")
                            self._listen_key = cached_key
                        # Reconnect with the existing/cached listenKey.
                        continue
            except Exception as e:
                logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")

            # Cache unavailable: try to keepalive the existing key before
            # creating a new one.
            need_new_key = True
            if self._listen_key:
                logger.debug(f"UserDataStream(account_id={self.account_id}): 重连前尝试 keepalive 现有 listenKey...")
                ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True)
                if ok:
                    logger.info(f"UserDataStream(account_id={self.account_id}): 现有 listenKey keepalive 成功,复用现有 key")
                    need_new_key = False
                elif code_1125:
                    logger.debug(f"UserDataStream(account_id={self.account_id}): 现有 listenKey 已失效(-1125),需要创建新 key")
                else:
                    logger.debug(f"UserDataStream(account_id={self.account_id}): keepalive 失败,尝试创建新 key")

            # Create a key only when required (keepalive failed or no key),
            # with retries against transient network errors.
            if need_new_key:
                listen_key_retries = 3
                listen_key_created = False
                for retry in range(listen_key_retries):
                    # Per Binance docs, creating a listenKey while a valid one
                    # exists returns the existing key and extends its validity,
                    # so getting back the old key here is normal.
                    new_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=1)
                    if new_key:
                        if new_key == self._listen_key:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已复用(币安返回现有 key)")
                        else:
                            logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已创建(重试 {retry + 1}/{listen_key_retries})")
                            self._listen_key = new_key
                        listen_key_created = True
                        break
                    if retry < listen_key_retries - 1:
                        wait_sec = (retry + 1) * 10  # backoff: 10s then 20s
                        logger.debug(f"UserDataStream(account_id={self.account_id}): listenKey 创建失败,{wait_sec}秒后重试...")
                        await asyncio.sleep(wait_sec)

                if not listen_key_created:
                    logger.warning(
                        "UserDataStream(account_id=%s): 重新创建 listenKey 失败(已重试 %d 次),60s 后重试(请检查该账号 API 权限/网络/IP 白名单)",
                        self.account_id, listen_key_retries,
                    )
                    await asyncio.sleep(60)
                    continue
    async def _handle_message(self, raw: str) -> bool:
        """Handle one pushed event.

        Returns True when the current connection should be dropped (e.g. on
        listenKeyExpired) so the receive loop reconnects with a new key.
        """
        try:
            data = json.loads(raw)
        except Exception:
            logger.debug(f"UserDataStream: 无法解析推送消息: {raw[:200]}")
            return False
        e = data.get("e")  # event type
        if e == "listenKeyExpired":
            logger.warning("UserDataStream: 收到 listenKeyExpired,将换新 key 重连")
            return True
        if e == "ORDER_TRADE_UPDATE":
            logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送")
            self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E"))
        elif e == "ACCOUNT_UPDATE":
            logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送")
            self._on_account_update(data.get("a") or {})
        elif e == "ALGO_UPDATE":
            logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送")
            self._on_algo_update(data.get("o") or {})
        else:
            logger.debug(f"UserDataStream: 收到未知事件类型: {e}")
        return False
def _on_order_trade_update(self, o: Dict, event_time_ms=None):
|
||
# 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId
|
||
# ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对, n=手续费数量, N=手续费资产
|
||
event_type = (o.get("x") or "").strip().upper()
|
||
status = (o.get("X") or "").strip().upper()
|
||
symbol = (o.get("s") or "").strip()
|
||
order_id = o.get("i")
|
||
logger.debug(f"UserDataStream: ORDER_TRADE_UPDATE symbol={symbol!r} orderId={order_id} event={event_type} status={status}")
|
||
if status != "FILLED":
|
||
logger.debug(f"UserDataStream: 订单状态非 FILLED,跳过 symbol={symbol!r} orderId={order_id} status={status}")
|
||
return
|
||
client_order_id = (o.get("c") or "").strip() or None
|
||
order_id = o.get("i")
|
||
ap = o.get("ap")
|
||
z = o.get("z")
|
||
reduce_only = o.get("R") is True
|
||
symbol = (o.get("s") or "").strip()
|
||
rp = o.get("rp") # 实现盈亏
|
||
if order_id is None or ap is None or z is None:
|
||
return
|
||
try:
|
||
ap_f = float(ap)
|
||
z_f = float(z)
|
||
except (TypeError, ValueError):
|
||
return
|
||
# 特殊 clientOrderId:强平/ADL/结算,仅打日志不参与系统 pending 完善
|
||
if client_order_id:
|
||
if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"):
|
||
logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}")
|
||
return
|
||
# 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交)
|
||
if event_type and event_type != "TRADE":
|
||
return
|
||
# 开仓成交:完善 pending 记录(优先按 client_order_id,无 c 时用 orderId 兜底)
|
||
if not reduce_only:
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / "backend"
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import Trade
|
||
if client_order_id:
|
||
updated = Trade.update_pending_to_filled(
|
||
client_order_id, self.account_id, order_id, ap_f, z_f
|
||
)
|
||
if updated:
|
||
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
|
||
else:
|
||
logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}")
|
||
else:
|
||
# 无 clientOrderId 时用 orderId 兜底:若该 symbol+account 下仅有一条 pending 且无 entry_order_id,则用本订单号完善
|
||
updated = Trade.update_pending_by_entry_order_id(symbol, self.account_id, order_id, ap_f, z_f)
|
||
if updated:
|
||
logger.info(f"UserDataStream: 开仓成交已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
|
||
else:
|
||
logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,且兜底未匹配到唯一 pending orderId={order_id} symbol={symbol!r}")
|
||
except Exception as ex:
|
||
logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}")
|
||
else:
|
||
# 平仓成交(支付式闭环):回写 exit_order_id 并用推送数据更新 exit_price/pnl/commission,仅 WS 驱动 DB
|
||
if symbol:
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / "backend"
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import Trade
|
||
ok, trade_id = Trade.set_exit_order_id_for_open_trade(
|
||
symbol, self.account_id, order_id, entry_order_id_hint=None
|
||
)
|
||
if ok:
|
||
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
|
||
# 可能已由 ALGO_UPDATE 写过 exit_order_id,直接按 exit_order_id 取记录
|
||
tid = trade_id
|
||
if tid is None:
|
||
row = Trade.get_by_exit_order_id(order_id)
|
||
tid = row.get("id") if row else None
|
||
if tid is not None:
|
||
try:
|
||
rp_f = float(rp) if rp is not None else None
|
||
commission_f = None
|
||
try:
|
||
n_val = o.get("n")
|
||
if n_val is not None:
|
||
commission_f = float(n_val)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
exit_time_ts = None
|
||
if event_time_ms is not None:
|
||
try:
|
||
exit_time_ts = int(int(event_time_ms) / 1000)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
# 用推送的 ap/rp 更新平仓信息,与币安一致
|
||
Trade.update_exit(
|
||
trade_id=tid,
|
||
exit_price=ap_f,
|
||
exit_reason="sync",
|
||
pnl=rp_f if rp_f is not None else 0.0,
|
||
pnl_percent=0.0,
|
||
exit_order_id=order_id,
|
||
exit_time_ts=exit_time_ts,
|
||
realized_pnl=rp_f,
|
||
commission=commission_f,
|
||
commission_asset=o.get("N"),
|
||
)
|
||
logger.info(
|
||
f"UserDataStream: 平仓记录已更新 symbol={symbol!r} orderId={order_id} "
|
||
f"exit_price={ap_f} realized_pnl={rp_f}"
|
||
)
|
||
except Exception as ex2:
|
||
logger.warning(f"UserDataStream: update_exit 失败 orderId={order_id}: {ex2}")
|
||
elif not ok:
|
||
logger.debug(f"UserDataStream: 平仓订单未匹配到 open 记录 symbol={symbol!r} orderId={order_id}")
|
||
except Exception as ex:
|
||
logger.warning(f"UserDataStream: 平仓回写/更新失败 orderId={order_id}: {ex}")
|
||
|
||
def _on_account_update(self, a: Dict):
|
||
# 文档: a.B = 余额数组,a.P = 持仓信息数组。有 Redis 时只写 Redis、不写进程内存。
|
||
global _position_updates_cache, _balance_updates_cache
|
||
redis_cache = getattr(self.client, "redis_cache", None)
|
||
|
||
B = a.get("B")
|
||
if isinstance(B, list) and B:
|
||
for b in B:
|
||
asset = (b.get("a") or "").strip()
|
||
if asset:
|
||
balance_data = {
|
||
"wb": b.get("wb"),
|
||
"cw": b.get("cw"),
|
||
"bc": b.get("bc"),
|
||
}
|
||
if redis_cache:
|
||
if asset == "USDT":
|
||
asyncio.create_task(self._write_balance_to_redis(asset, balance_data))
|
||
else:
|
||
_balance_updates_cache[asset] = balance_data
|
||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
|
||
P = a.get("P")
|
||
if isinstance(P, list) and P:
|
||
positions_list = []
|
||
for p in P:
|
||
s = (p.get("s") or "").strip()
|
||
if s:
|
||
if not redis_cache:
|
||
if s not in _position_updates_cache:
|
||
_position_updates_cache[s] = []
|
||
_position_updates_cache[s] = [p]
|
||
try:
|
||
pa = float(p.get("pa") or 0)
|
||
ep = float(p.get("ep") or 0)
|
||
if pa != 0:
|
||
positions_list.append({
|
||
"symbol": s,
|
||
"positionAmt": pa,
|
||
"entryPrice": ep,
|
||
"markPrice": float(p.get("markPrice") or 0),
|
||
"unRealizedProfit": float(p.get("up") or 0),
|
||
"leverage": int(p.get("leverage") or 0),
|
||
})
|
||
except Exception:
|
||
pass
|
||
if redis_cache and positions_list:
|
||
asyncio.create_task(self._write_positions_to_redis(positions_list))
|
||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
|
||
|
||
async def _write_balance_to_redis(self, asset: str, balance_data: Dict):
|
||
"""写入余额缓存到 Redis(带 TTL,避免无限增长)"""
|
||
try:
|
||
redis_cache = getattr(self.client, "redis_cache", None)
|
||
if redis_cache:
|
||
redis_key = f"ats:balance:cache:{asset}"
|
||
await redis_cache.set(redis_key, balance_data, ttl=TTL_BALANCE)
|
||
except Exception as e:
|
||
logger.debug(f"写入余额缓存到 Redis 失败: {e}")
|
||
|
||
async def _write_positions_to_redis(self, positions_list: List[Dict]):
|
||
"""写入持仓缓存到 Redis(带 TTL,避免无限增长)"""
|
||
try:
|
||
redis_cache = getattr(self.client, "redis_cache", None)
|
||
if redis_cache:
|
||
redis_key = "ats:positions:cache"
|
||
await redis_cache.set(redis_key, positions_list, ttl=TTL_POSITIONS)
|
||
except Exception as e:
|
||
logger.debug(f"写入持仓缓存到 Redis 失败: {e}")
|
||
|
||
    def _on_algo_update(self, o: Dict):
        """Handle an ALGO_UPDATE (conditional-order) payload.

        When X=TRIGGERED/FINISHED and "ai" carries the id of the regular order
        spawned by the trigger, write it back as exit_order_id on the matching
        open trade record.
        """
        x = (o.get("X") or "").strip().upper()
        ai = o.get("ai")  # id of the regular order created by the trigger
        symbol = (o.get("s") or "").strip()
        if x in ("TRIGGERED", "FINISHED") and ai and symbol:
            try:
                import sys
                from pathlib import Path
                project_root = Path(__file__).parent.parent
                backend_path = project_root / "backend"
                if backend_path.exists():
                    sys.path.insert(0, str(backend_path))
                from database.models import Trade
                ok, _ = Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai)
                if ok:
                    logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
            except Exception as ex:
                logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
        # Logged for every ALGO_UPDATE, not only the handled states.
        logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")