diff --git a/backend/database/models.py b/backend/database/models.py index 7e5f512..dce48dc 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -950,6 +950,37 @@ class Trade: logger.debug(f"get_trades_missing_entry_order_id 失败: {e}") return [] + @staticmethod + def get_pending_recent(account_id: int, limit: int = 50, max_age_sec: int = 86400): + """ + 获取最近时间范围内 status=pending 的记录,用于正向流程 pending 对账。 + 返回有 client_order_id 或 entry_order_id 的记录,按 id 降序。 + """ + if account_id is None: + return [] + try: + if not _table_has_column("trades", "account_id"): + return [] + if not _table_has_column("trades", "client_order_id") and not _table_has_column("trades", "entry_order_id"): + return [] + import time + cutoff_ts = int(time.time()) - max(1, int(max_age_sec)) + # 用 created_at 或 entry_time 作为时间过滤(created_at 更准确表示插入时间) + time_col = "created_at" if _table_has_column("trades", "created_at") else "entry_time" + query = f""" + SELECT * FROM trades + WHERE account_id = %s AND status = 'pending' + AND {time_col} >= %s + AND (client_order_id IS NOT NULL AND client_order_id != '' + OR entry_order_id IS NOT NULL AND entry_order_id != 0 AND entry_order_id != '') + ORDER BY id DESC + LIMIT %s + """ + return db.execute_query(query, (int(account_id), cutoff_ts, int(limit))) + except Exception as e: + logger.debug(f"get_pending_recent 失败: {e}") + return [] + @staticmethod def get_by_client_order_id(client_order_id, account_id: int = None): """根据 clientOrderId 获取交易记录(可选按 account_id 隔离)""" diff --git a/docs/正向流程漏洞分析与加固方案.md b/docs/正向流程漏洞分析与加固方案.md new file mode 100644 index 0000000..c157887 --- /dev/null +++ b/docs/正向流程漏洞分析与加固方案.md @@ -0,0 +1,172 @@ +# 正向流程漏洞分析与加固方案 + +> 目标:弄清楚正常流程为什么会出现「币安有持仓、DB 无 open 记录」,并完善正向流程,使正常流程掌控全局。 + +## 一、正常流程梳理 + +### 1.1 开仓链路(简化) + +``` +strategy.open_position() + → 创建 pending 记录 (DB, client_order_id) + → place_order 下单币安 + → _wait_for_order_filled() REST 轮询 + → 若 FILLED:update_pending_to_filled(),update_open_fields() + → 若超时/CANCELED:撤单,返回 None +``` + +### 1.2 两条更新路径 + +| 路径 | 触发点 | 说明 | +|------|--------|------| +| **REST 路径** | `open_position` 中 `_wait_for_order_filled` 返回 ok | 进程存活、未超时 | +| **WS 路径** | `UserDataStream._on_order_trade_update` 收到 ORDER_TRADE_UPDATE | 需 listenKey 连接正常 | + +正常情况下,REST 和 WS 都会尝试更新;`update_pending_to_filled` 幂等,重复更新无害。 + +--- + +## 二、遗漏场景分析 + +### 2.1 进程崩溃 / 重启(最主要) + +**场景**: +- 创建 pending,下单币安,进入 `_wait_for_order_filled` 轮询 +- 进程在轮询过程中崩溃(OOM、kill、异常退出) +- 订单在币安成交 +- 重启后: + - REST 路径不会再执行(`open_position` 已结束) + - WS 不会重放历史推送,`ORDER_TRADE_UPDATE` 已丢失 + +**结果**:pending 长期残留,币安有持仓,DB 无 open。 + +### 2.2 WS 断线期间成交 + +**场景**: +- listenKey 失效 / 网络抖动,WS 断开 +- 在断开期间订单成交,未收到 ORDER_TRADE_UPDATE +- REST 路径:若 `open_position` 仍在运行,轮询会拿到 FILLED,可以更新;若已超时返回,则不会更新 + +**结论**:只有在「WS 断线 + REST 已超时返回」时,才会出现漏记。`open_position` 中限价单超时后会撤单,一般不留下持仓;但若存在「撤单与成交」的竞态,仍可能漏记。 + +### 2.3 listenKey 失效 + +- 60 分钟无 keepalive 会失效 +- 文档建议 24 小时主动重连 +- 失效期间新连接不会重放历史事件 + +效果同 2.2。 + +### 2.4 重连空窗期 + +- 断线 → 60s 后重连 → 新 listenKey +- 空窗期内的成交事件永久丢失 +- 若该期间 REST 也未完成轮询(例如进程崩溃),则必然漏记 + +### 2.5 update_pending_to_filled 异常 + +- 若 `Trade.update_pending_to_filled` 抛异常,`_on_order_trade_update` 会 catch 并打日志,pending 保持 +- REST 路径若在调用前崩溃,则完全依赖 WS;WS 路径若异常,则完全依赖 REST +- 任一路径失败且另一路径也失败,则漏记 + +### 2.6 竞态:撤单 vs 成交 + +- 限价单超时,调用 `cancel_order` +- 若撤单请求发出时订单刚好成交,撤单可能失败 +- 当前逻辑:超时则 `return None`,pending 保留,不会执行 `update_pending_to_filled` +- **结果**:pending 残留 + 币安已成交,属于漏记 + +--- + +## 三、当前 sync_positions_with_binance 的覆盖范围 + +| 情况 | 是否处理 | +|------|----------| +| DB open、币安无 | ✅ 更新 DB 为 closed | +| DB open、币安有 | ✅ 加载到 active_positions | +| 币安有、DB 无 open | ❌ 不处理(依赖 SYNC_RECOVER_MISSING_POSITIONS) | +| DB pending、币安订单已 FILLED | ❌ 不处理 | + +当前同步逻辑不包含「pending 对账」:不会主动查币安订单状态,也不会把已成交的 pending 转为 open。 + +--- + +## 四、正向流程加固方案 + +### 4.1 思路 + +不依赖补建(SYNC_RECOVER_MISSING_POSITIONS),在正向流程中补齐「pending 对账」能力,使: + +- 有 pending 且有 client_order_id / entry_order_id 时,主动查币安订单状态 +- 若已 FILLED,则执行 `update_pending_to_filled`,将 pending 转为 open + +### 4.2 加固点 1:WS 重连后 pending 对账(推荐) + +**位置**:`user_data_stream.py`,`_run_ws` 重连成功后 + +**逻辑**: +- 重连成功后,查询当前账号下 status=pending 且有 client_order_id 的记录(可限制如 24h 内) +- 对每条记录调用币安 REST:`futures_get_order(symbol, orderId)` 或按 client_order_id 查 +- 若 status=FILLED,调用 `Trade.update_pending_to_filled` + +**意义**:补齐 WS 断线期间丢失的 ORDER_TRADE_UPDATE。 + +### 4.3 加固点 2:sync_positions_with_binance 中增加 pending 对账(推荐) + +**位置**:`position_manager.sync_positions_with_binance` + +**逻辑**: +- 在现有「DB open vs 币安持仓」同步之外,增加: + - 查询 `Trade.get_pending_recent(account_id, limit=50, max_age_sec=86400)`(需在 models 中新增) + - 对每条 pending,若存在 client_order_id 或 entry_order_id,查币安订单 + - 若 FILLED,则 `update_pending_to_filled` 或 `update_pending_by_entry_order_id` + +**意义**:周期性兜底,覆盖进程重启、WS 漏推等场景。 + +### 4.4 加固点 3:撤单后校验是否已成交(可选) + +**位置**:`position_manager.open_position`,在 `cancel_order` 之后 + +**逻辑**: +- 撤单后(或撤单异常时),再查一次订单状态 +- 若 status=FILLED,则按 REST 路径正常执行 `update_pending_to_filled`,避免竞态漏记 + +**意义**:消除「撤单与成交」竞态导致的漏记。 + +### 4.5 加固点 4:update_pending_to_filled robustness + +- 保持幂等(当前已满足) +- 异常时记录清晰日志,便于排查 +- 可选:对瞬时 DB 异常做有限次重试 + +--- + +## 五、实现优先级建议 + +| 优先级 | 加固点 | 影响 | 复杂度 | +|--------|--------|------|--------| +| P0 | sync_positions_with_binance 中 pending 对账 | 覆盖进程重启、WS 漏推等主要漏记 | 中 | +| P0 | WS 重连后 pending 对账 | 覆盖断线期间的漏推 | 中 | +| P1 | 撤单后校验是否已成交 | 消除竞态漏记 | 低 | +| P2 | update_pending_to_filled 重试与日志 | 提升可靠性 | 低 | + +--- + +## 六、需要的 DB / API 支持 + +1. **Trade 模型**: + - `get_pending_recent(account_id, limit, max_age_sec)`:返回指定时间范围内的 pending 记录,用于对账 + +2. **币安 API**: + - 按 `clientOrderId` 查询:`futures_get_all_orders(symbol, limit)` 过滤,或使用支持 `origClientOrderId` 的接口(如有) + - 按 `orderId` 查询:`futures_get_order(symbol, orderId)`(已有) + +3. **多账号**:以上逻辑需按 `account_id` 隔离,保证对账时使用对应账号的 client / API。 + +--- + +## 七、小结 + +- 主要漏记来自:**进程崩溃 + 订单已成交**,以及 **WS 断线期间的成交**。 +- 正向流程目前缺少「pending 对账」:不会主动用币安订单状态修正 pending。 +- 加固方向:在 **WS 重连后** 和 **sync_positions_with_binance** 中加入 pending 对账,使正常流程在运行中即可发现并修正漏记,而不依赖单独的补建逻辑。 diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 907d9f3..61e6b25 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -2329,6 +2329,24 @@ class BinanceClient: logger.debug(f"取消 Algo 条件单失败 algoId={algo_id}: {e}") return False + async def get_order_by_client_order_id(self, symbol: str, client_order_id: str) -> Optional[Dict]: + """ + 按 origClientOrderId 查询订单(用于 pending 对账)。 + 返回订单信息或 None(不存在/异常)。 + """ + if not symbol or not (client_order_id or "").strip(): + return None + try: + info = await self.client.futures_get_order( + symbol=symbol, + origClientOrderId=(client_order_id or "").strip(), + recvWindow=20000, + ) + return info if isinstance(info, dict) else None + except Exception as e: + logger.debug(f"{symbol} 按 origClientOrderId 查询失败: {e}") + return None + async def get_open_orders(self, symbol: str) -> List[Dict]: """ 获取某交易对的未成交委托(用于防止重复挂保护单)。 diff --git a/trading_system/pending_reconcile.py b/trading_system/pending_reconcile.py new file mode 100644 index 0000000..23280a9 --- /dev/null +++ b/trading_system/pending_reconcile.py @@ -0,0 +1,85 @@ +""" +正向流程加固:pending 对账模块 +供 sync_positions_with_binance 和 UserDataStream 重连后调用,补齐因 WS 断线/进程重启漏掉的 pending→open。 +""" +import logging + +logger = logging.getLogger(__name__) + +DB_AVAILABLE = False +Trade = None +try: + import sys + from pathlib import Path + project_root = Path(__file__).parent.parent + backend_path = project_root / "backend" + if backend_path.exists(): + sys.path.insert(0, str(backend_path)) + from database.models import Trade + DB_AVAILABLE = True +except Exception: + pass + + +async def reconcile_pending_with_binance(client, account_id: int, limit: int = 50, max_age_sec: int = 86400) -> int: + """ + 对 status=pending 记录查币安订单,若已 FILLED 则更新为 open。 + 返回成功对账数量。 + """ + if not DB_AVAILABLE or not Trade: + return 0 + try: + pending_list = Trade.get_pending_recent(account_id, limit=limit, max_age_sec=max_age_sec) + if not pending_list: + return 0 + reconciled = 0 + for row in pending_list: + try: + symbol = (row.get("symbol") or "").strip() + client_order_id = (row.get("client_order_id") or "").strip() or None + entry_order_id = row.get("entry_order_id") + if not symbol: + continue + order_info = None + if client_order_id and hasattr(client, "get_order_by_client_order_id"): + order_info = await client.get_order_by_client_order_id(symbol, client_order_id) + if not order_info and entry_order_id: + try: + oid = int(entry_order_id) + order_info = await client.client.futures_get_order( + symbol=symbol, orderId=oid, recvWindow=20000 + ) + except Exception: + pass + if not order_info or str(order_info.get("status")).upper() != "FILLED": + continue + ap = order_info.get("avgPrice") or order_info.get("price") or 0 + z = order_info.get("executedQty") or 0 + try: + ap_f = float(ap) + z_f = float(z) + except (TypeError, ValueError): + continue + if ap_f <= 0 or z_f <= 0: + continue + order_id = order_info.get("orderId") + if client_order_id: + ok = Trade.update_pending_to_filled( + client_order_id, account_id, order_id, ap_f, z_f + ) + else: + ok = Trade.update_pending_by_entry_order_id( + symbol, account_id, order_id, ap_f, z_f + ) + if ok: + reconciled += 1 + logger.info( + f"[账号{account_id}] pending 对账: {symbol} client_order_id={client_order_id!r} " + f"已完善为 open (orderId={order_id} 成交价={ap_f:.4f} 数量={z_f:.4f})" + ) + except Exception as e: + logger.debug(f"[账号{account_id}] pending 对账单条失败: {e}") + return reconciled + except Exception as e: + logger.debug(f"[账号{account_id}] pending 对账失败: {e}") + return 0 diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 9e38854..378caf4 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -672,15 +672,21 @@ class PositionManager: actual_entry_price = float(res.get("avg_price") or 0) filled_quantity = float(res.get("executed_qty") or 0) else: - # 未成交(NEW/超时/CANCELED 等)属于“策略未触发入场”或“挂单没成交” - # 这不应当当作系统错误;同时需要撤单(best-effort),避免留下悬挂委托造成后续混乱。 - logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},跳过本次开仓并撤销挂单") + # 未成交(NEW/超时/CANCELED 等):撤单。撤单后校验是否已成交(竞态:撤单瞬间订单可能已成交) + logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},撤销挂单") try: await self.client.cancel_order(symbol, int(entry_order_id)) except Exception: pass - self._pending_entry_orders.pop(symbol, None) - return None + # 正向流程加固:撤单后校验,若已成交则继续流程 + recheck = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=5, poll_sec=0.5) + if recheck.get("ok"): + actual_entry_price = float(recheck.get("avg_price") or 0) + filled_quantity = float(recheck.get("executed_qty") or 0) + logger.info(f"{symbol} [开仓] 撤单后发现已成交,继续完善记录 (成交价={actual_entry_price:.4f} 数量={filled_quantity:.4f})") + else: + self._pending_entry_orders.pop(symbol, None) + return None if not actual_entry_price or actual_entry_price <= 0: logger.error(f"{symbol} [开仓] ❌ 无法获取实际成交价格,不保存到数据库") @@ -2395,6 +2401,14 @@ class PositionManager: logger.error(f"获取持仓摘要失败: {e}") return {} + async def _reconcile_pending_with_binance(self) -> int: + """正向流程加固:对 status=pending 记录查币安订单,若已 FILLED 则更新为 open。""" + try: + from .pending_reconcile import reconcile_pending_with_binance + return await reconcile_pending_with_binance(self.client, self.account_id) + except ImportError: + return 0 + async def sync_positions_with_binance(self): """ 同步币安实际持仓状态与数据库状态 @@ -2407,6 +2421,11 @@ class PositionManager: try: logger.info("开始同步币安持仓状态与数据库...") + # 0. 正向流程加固:pending 对账(补齐因 WS 断线/进程重启漏掉的 pending→open) + n = await self._reconcile_pending_with_binance() + if n > 0: + logger.info(f"[账号{self.account_id}] pending 对账完成,{n} 条已更新为 open") + # 1. 获取币安实际持仓 binance_positions = await self._get_open_positions() binance_symbols = {p['symbol'] for p in binance_positions} @@ -3099,7 +3118,7 @@ class PositionManager: missing_in_db = binance_symbols - db_open_symbols if missing_in_db: logger.info( - f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有 open 记录: " + f"[账号{self.account_id}] 发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有 open 记录: " f"{', '.join(missing_in_db)}" ) sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False) @@ -3110,7 +3129,7 @@ class PositionManager: sync_recover = False sync_create_manual = False if missing_in_db: - logger.debug(f" ONLY_AUTO_TRADE_CREATES_RECORDS=True,跳过补建/手动开仓创建 ({len(missing_in_db)} 个仅币安持仓)") + logger.debug(f"[账号{self.account_id}] ONLY_AUTO_TRADE_CREATES_RECORDS=True,跳过补建/手动开仓创建 ({len(missing_in_db)} 个仅币安持仓)") if sync_recover: system_order_prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip() @@ -3301,7 +3320,8 @@ class PositionManager: logger.warning(f" ✗ {symbol} [状态同步] 补建失败: {e}") elif not sync_create_manual: logger.info( - " → 已跳过自动创建交易记录(SYNC_CREATE_MANUAL_ENTRY_RECORD=False, SYNC_RECOVER_MISSING_POSITIONS 未开启)。" + f"[账号{self.account_id}] → 已跳过自动创建交易记录(SYNC_CREATE_MANUAL_ENTRY_RECORD=False, " + "SYNC_RECOVER_MISSING_POSITIONS 未开启)。" " 若确认为本策略开仓可开启 SYNC_RECOVER_MISSING_POSITIONS=True(仅补建有止损止盈单的)。" ) elif sync_create_manual: diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index a38e657..8053bca 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -339,6 +339,15 @@ class UserDataStream: import aiohttp _24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连 while self._running: + # 正向流程加固:每次重连前 pending 对账(补齐断线期间漏掉的 ORDER_TRADE_UPDATE) + if self._listen_key: + try: + from .pending_reconcile import reconcile_pending_with_binance + n = await reconcile_pending_with_binance(self.client, self.account_id, limit=50, max_age_sec=86400) + if n > 0: + logger.info(f"UserDataStream(account_id={self.account_id}): 重连前 pending 对账完成,{n} 条已更新为 open") + except Exception as e: + logger.debug(f"UserDataStream(account_id={self.account_id}): pending 对账失败: {e}") url = f"{self._ws_base_url()}/{self._listen_key}" try: async with aiohttp.ClientSession() as session: