From 415589e625eab94f72bb3689474ab81ec6d9c62e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Tue, 17 Feb 2026 22:11:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(trade,=20position=5Fmanager,=20user=5Fdata?= =?UTF-8?q?=5Fstream):=20=E5=A2=9E=E5=BC=BA=E4=BA=A4=E6=98=93=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E7=AE=A1=E7=90=86=E4=B8=8E=E7=94=A8=E6=88=B7=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=B5=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `models.py` 中新增 `update_entry_order_id` 方法,用于补全或更新开仓订单号,提升交易记录的完整性。更新 `set_exit_order_id_for_open_trade` 方法以支持按 `entry_order_id` 精确匹配,优化平仓订单的回写逻辑。在 `position_manager.py` 中添加对 `entry_order_id` 的处理,确保在保存交易记录时能够及时补全。更新 `user_data_stream.py` 中的日志记录,提供更详细的状态信息,增强系统的可追溯性与调试能力。 --- backend/database/models.py | 38 +++++++++- trading_system/check_user_data_stream.py | 96 ++++++++++++++++++++++++ trading_system/main.py | 10 ++- trading_system/position_manager.py | 25 +++++- trading_system/user_data_stream.py | 16 +++- 5 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 trading_system/check_user_data_stream.py diff --git a/backend/database/models.py b/backend/database/models.py index dac70a0..aa78fe0 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -935,6 +935,21 @@ class Trade: logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}") return False + @staticmethod + def update_entry_order_id(trade_id: int, entry_order_id): + """补全或更新开仓订单号(用于 REST 兜底补全)""" + if not trade_id or not entry_order_id: + return False + try: + db.execute_update( + """UPDATE trades SET entry_order_id = %s WHERE id = %s AND (entry_order_id IS NULL OR entry_order_id = '')""", + (str(entry_order_id), int(trade_id)) + ) + return True + except Exception as e: + logger.warning(f"update_entry_order_id 失败 trade_id={trade_id}: {e}") + return False + @staticmethod def get_all(start_timestamp=None, end_timestamp=None, symbol=None, status=None, trade_type=None, exit_reason=None, account_id: int = None, time_filter: str = "exit"): """ @@ -1021,18 +1036,35 @@ class Trade: ) @staticmethod - def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id) -> bool: - """ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号(仅更新一条)。""" + def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id, entry_order_id: int = None) -> bool: + """ + ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号。 + 优先按 entry_order_id 精确匹配,若无则按 symbol 匹配最早的一条 open 记录。 + """ if not symbol or account_id is None or exit_order_id is None: return False try: if not _table_has_column("trades", "account_id"): return False - # 更新一条:open 且 (exit_order_id 为空) + # 优先按 entry_order_id 精确匹配(如果提供了 entry_order_id) + if entry_order_id: + n = db.execute_update( + """UPDATE trades SET exit_order_id = %s + WHERE account_id = %s AND symbol = %s AND status = 'open' + AND entry_order_id = %s + AND (exit_order_id IS NULL OR exit_order_id = '') + LIMIT 1""", + (str(exit_order_id), int(account_id), symbol.strip(), str(entry_order_id)) + ) + if n and n > 0: + logger.debug(f"set_exit_order_id_for_open_trade: 按 entry_order_id={entry_order_id} 精确匹配成功") + return True + # 否则按 symbol 匹配最早的一条 open 记录(按 entry_time 排序) n = db.execute_update( """UPDATE trades SET exit_order_id = %s WHERE account_id = %s AND symbol = %s AND status = 'open' AND (exit_order_id IS NULL OR exit_order_id = '') + ORDER BY entry_time ASC LIMIT 1""", (str(exit_order_id), int(account_id), symbol.strip()) ) diff --git a/trading_system/check_user_data_stream.py b/trading_system/check_user_data_stream.py new file mode 100644 index 0000000..54d29ec --- /dev/null +++ b/trading_system/check_user_data_stream.py @@ -0,0 +1,96 @@ +""" +User Data Stream 诊断工具:检查 listenKey 创建和 WS 连接状态 +""" +import asyncio +import sys +from pathlib import Path + +# 添加路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root / "trading_system")) +sys.path.insert(0, str(project_root / "backend")) + +async def check_user_data_stream(): + """检查 User Data Stream 状态""" + import os + from binance_client import BinanceClient + import config + + account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") + print(f"检查账号 {account_id} 的 User Data Stream 状态...") + + # 初始化客户端 + client = BinanceClient( + api_key=config.BINANCE_API_KEY, + api_secret=config.BINANCE_API_SECRET, + testnet=config.USE_TESTNET + ) + await client.connect() + + print("\n1. 检查 listenKey 创建...") + listen_key = await client.create_futures_listen_key() + if listen_key: + print(f" ✓ listenKey 创建成功: {listen_key[:20]}...") + else: + print(" ❌ listenKey 创建失败") + print(" 可能原因:") + print(" - API Key 权限不足(需要启用 'Enable Reading' 和 'Enable Futures')") + print(" - 网络连接问题") + print(" - IP 白名单限制") + return + + print("\n2. 检查 WebSocket 连接...") + try: + import aiohttp + ws_url = f"wss://fstream.binance.com/ws/{listen_key}" + print(f" 连接地址: {ws_url[:50]}...") + + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + ws_url, + heartbeat=50, + timeout=aiohttp.ClientTimeout(total=10) + ) as ws: + print(" ✓ WebSocket 连接成功") + print(" 等待接收推送消息(10秒)...") + + received_messages = [] + try: + async with asyncio.timeout(10): + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + received_messages.append(msg.data) + print(f" ✓ 收到消息: {msg.data[:100]}...") + elif msg.type == aiohttp.WSMsgType.ERROR: + print(f" ❌ WebSocket 错误: {msg}") + break + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): + print(" ⚠ WebSocket 已关闭") + break + except asyncio.TimeoutError: + pass + + if received_messages: + print(f"\n ✓ 成功收到 {len(received_messages)} 条推送消息") + else: + print("\n ⚠ 10秒内未收到任何推送消息(可能当前无订单/持仓变化)") + print(" 这是正常的,如果有订单成交或持仓变化,会收到推送") + except Exception as e: + print(f" ❌ WebSocket 连接失败: {e}") + print(f" 错误类型: {type(e).__name__}") + + print("\n3. 检查 keepalive...") + ok, code_1125 = await client.keepalive_futures_listen_key(listen_key) + if ok: + print(" ✓ keepalive 成功") + else: + if code_1125: + print(" ❌ keepalive 返回 -1125(listenKey 不存在或已过期)") + else: + print(" ❌ keepalive 失败") + + await client.disconnect() + print("\n检查完成") + +if __name__ == "__main__": + asyncio.run(check_user_data_stream()) diff --git a/trading_system/main.py b/trading_system/main.py index 6b04914..2da8222 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -358,8 +358,9 @@ async def main(): import os account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") user_data_stream = UserDataStream(client, account_id) + logger.info(f"正在启动 User Data Stream(账号 {account_id})...") if await user_data_stream.start(): - logger.info("✓ User Data Stream 已启动(订单/持仓/余额 WS 推送,30 分钟 keepalive)") + logger.info(f"✓ User Data Stream 已启动(账号 {account_id},订单/持仓/余额 WS 推送,30 分钟 keepalive)") # 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存 try: seed_balance_cache(balance) @@ -369,7 +370,12 @@ async def main(): except Exception as e: logger.warning(f"播种 WS 缓存失败(将仅用 REST): {e}") else: - logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓") + logger.warning(f"⚠ User Data Stream 未启动(账号 {account_id}),将仅依赖 REST 同步订单与持仓") + logger.warning(" 可能原因:") + logger.warning(" 1. listenKey 创建失败(检查 API Key 权限:需要 'Enable Reading' 和 'Enable Futures')") + logger.warning(" 2. 网络连接问题") + logger.warning(" 3. IP 白名单限制") + logger.warning(" 提示:可运行 python -m trading_system.check_user_data_stream 进行诊断") user_data_stream = None # 3.0 市场 WS 多进程共用:选主 + 仅 Leader 建连接,非 Leader 从 Redis 读 diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index b3efe58..492bc35 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -832,6 +832,7 @@ class PositionManager: # 无 pending 或未匹配到:走新建(兜底) logger.info(f"正在保存 {symbol} 交易记录到数据库...") fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id + # 如果 REST 已获取到 entry_order_id,直接写入;否则留空,等待 WS 推送或后续同步补全 trade_id = Trade.create( symbol=symbol, side=side, @@ -839,7 +840,7 @@ class PositionManager: entry_price=entry_price, leverage=leverage, entry_reason=entry_reason, - entry_order_id=entry_order_id, + entry_order_id=entry_order_id, # REST 已获取则直接写入 client_order_id=fallback_client_order_id, stop_loss_price=stop_loss_price, take_profit_price=take_profit_price, @@ -851,7 +852,27 @@ class PositionManager: entry_context=entry_context, account_id=self.account_id, ) - logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") + if entry_order_id: + logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") + else: + logger.warning(f"⚠ {symbol} 交易记录已保存但 entry_order_id 为空 (ID: {trade_id}),等待 WS 推送或后续同步补全") + # 如果有 client_order_id,尝试通过 REST 查询订单号补全 + if fallback_client_order_id: + try: + # 延迟查询,确保订单已入库 + await asyncio.sleep(1) + orders = await self.client.client.futures_get_all_orders( + symbol=symbol, limit=10, recvWindow=10000 + ) + for o in orders or []: + if str(o.get("clientOrderId", "")).strip() == fallback_client_order_id: + found_order_id = o.get("orderId") + if found_order_id: + Trade.update_entry_order_id(trade_id, found_order_id) + logger.info(f"✓ {symbol} 已通过 REST 补全 entry_order_id: {found_order_id}") + break + except Exception as e: + logger.debug(f"{symbol} REST 补全 entry_order_id 失败: {e}") except Exception as e: logger.error(f"❌ 保存交易记录到数据库失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index 482e219..1745d3b 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -202,7 +202,7 @@ class UserDataStream: async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws: self._ws = ws self._conn_start_time = time.monotonic() - logger.info("UserDataStream: WS 已连接") + logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送") async for msg in ws: if not self._running: break @@ -294,6 +294,7 @@ class UserDataStream: # 开仓成交:完善 pending 记录 if not reduce_only: if not client_order_id: + logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,跳过完善 orderId={order_id} symbol={symbol!r}") return try: import sys @@ -307,9 +308,11 @@ class UserDataStream: client_order_id, self.account_id, order_id, ap_f, z_f ) if updated: - logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}") + logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") + else: + logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}") except Exception as ex: - logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}") + logger.warning(f"UserDataStream: update_pending_to_filled 失败 client_order_id={client_order_id!r}: {ex}") else: # 平仓成交:按 symbol 回写 open 记录的 exit_order_id;若有 rp 可记入日志 if rp is not None: @@ -323,8 +326,13 @@ class UserDataStream: if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade - if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id): + # 尝试从订单信息中获取关联的开仓订单号(如果有) + # 注意:币安平仓订单推送中可能不包含开仓订单号,这里先按 symbol 匹配 + entry_order_id_hint = None # 未来可从订单关联信息中提取 + if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id, entry_order_id_hint): logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}") + else: + logger.debug(f"UserDataStream: 平仓订单回写失败(可能已存在或无可匹配记录)symbol={symbol!r} orderId={order_id}") except Exception as ex: logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}")