diff --git a/backend/api/routes/config.py b/backend/api/routes/config.py index 8589159..b26c819 100644 --- a/backend/api/routes/config.py +++ b/backend/api/routes/config.py @@ -491,11 +491,17 @@ async def get_global_configs( "category": "strategy", "description": "是否启用移动止损(默认关闭,让利润奔跑)", }, + "ONLY_AUTO_TRADE_CREATES_RECORDS": { + "value": True, + "type": "boolean", + "category": "position", + "description": "为 True 时仅自动开仓写入 DB,不补建「仅币安有仓」;改为 False 并保持 SYNC_RECOVER_MISSING_POSITIONS=True 可使币安持仓与 DB 一致(补建缺失记录并挂 SL/TP)", + }, "SYNC_RECOVER_MISSING_POSITIONS": { "value": True, "type": "boolean", "category": "position", - "description": "同步时补建「币安有仓、DB 无记录」的交易记录(便于订单记录与统计)", + "description": "同步时补建「币安有仓、DB 无记录」的交易记录(须 ONLY_AUTO_TRADE_CREATES_RECORDS=False 才生效)", }, "SYNC_RECOVER_ONLY_WHEN_HAS_SLTP": { "value": True, diff --git a/backend/config_manager.py b/backend/config_manager.py index a766869..f81228e 100644 --- a/backend/config_manager.py +++ b/backend/config_manager.py @@ -827,6 +827,7 @@ class ConfigManager: 'MAX_DAILY_ENTRIES': eff_get('MAX_DAILY_ENTRIES', max_daily_default), # 同步/系统单标识(全局配置,账号可覆盖) + 'ONLY_AUTO_TRADE_CREATES_RECORDS': eff_get('ONLY_AUTO_TRADE_CREATES_RECORDS', True), # True=不补建「仅币安有仓」;False 时配合 SYNC_RECOVER 可补建 'SYNC_RECOVER_MISSING_POSITIONS': eff_get('SYNC_RECOVER_MISSING_POSITIONS', True), 'SYNC_RECOVER_ONLY_WHEN_HAS_SLTP': eff_get('SYNC_RECOVER_ONLY_WHEN_HAS_SLTP', True), 'SYSTEM_ORDER_ID_PREFIX': eff_get('SYSTEM_ORDER_ID_PREFIX', 'SYS') or '', diff --git a/backend/database/models.py b/backend/database/models.py index c8bbb61..7e9b02c 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -888,7 +888,7 @@ class Trade: tuple(values) ) except Exception as e: - logger.warning(f"update_open_fields trade_id={trade_id} 失败: {e}") + logger.error(f"[DB] update_open_fields trade_id={trade_id} 失败 (SL/TP 等未写入): {e}") @staticmethod def update_status(trade_id, status: str) -> bool: @@ -899,7 +899,7 @@ class Trade: db.execute_update("UPDATE trades SET status = %s WHERE id = %s", (str(status).strip(), int(trade_id))) return True except Exception as e: - logger.warning(f"update_status trade_id={trade_id} 失败: {e}") + logger.error(f"[DB] update_status trade_id={trade_id} status={status} 失败: {e}") return False @staticmethod @@ -1018,7 +1018,7 @@ class Trade: ) return True except Exception as e: - logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}") + logger.error(f"[DB] update_pending_to_filled 失败 client_order_id={client_order_id!r} account_id={account_id}: {e}") return False @staticmethod diff --git a/docs/币安持仓与DB同步说明.md b/docs/币安持仓与DB同步说明.md new file mode 100644 index 0000000..9fad637 --- /dev/null +++ b/docs/币安持仓与DB同步说明.md @@ -0,0 +1,60 @@ +# 币安持仓与 DB 同步说明 + +当出现「币安有仓、数据库没有 open 记录」时,仪表盘/持仓列表会显示这些仓位没有 `id`、`stop_loss_price`、`take_profit_price`,且点「补挂止盈止损」依赖接口侧用配置比例计算(已支持无 DB 补挂)。若希望**在 trading_system 里为这些仓位补建 DB 记录**,并自动挂止损止盈、纳入监控,需正确配置以下项。 + +## 为何会“跳过补建” + +逻辑顺序: + +1. **ONLY_AUTO_TRADE_CREATES_RECORDS**(默认 `True`) + 为 True 时表示「只允许自动开仓写入 DB」,会**强制关闭**补建与手动开仓建记录,即: + - `SYNC_RECOVER_MISSING_POSITIONS` 被置为 False + - `SYNC_CREATE_MANUAL_ENTRY_RECORD` 被置为 False + 因此日志会看到“已跳过自动创建交易记录”。 + +2. **SYNC_RECOVER_MISSING_POSITIONS**(默认 `True`) + 为 True 时,在**未**被上一条关闭的前提下,会对「币安有仓、DB 无 open」的 symbol 做**补建**:写 DB、挂 SL/TP、纳入监控。 + +## 让币安持仓与 DB 一致(推荐配置) + +目标:让那 15 个(或更多)「仅币安有仓」的持仓在下次同步时自动写入 DB、挂止损止盈并进入监控。 + +1. **ONLY_AUTO_TRADE_CREATES_RECORDS = False** + 允许同步/补建时创建 DB 记录(必改,否则补建永远关闭)。 + +2. **SYNC_RECOVER_MISSING_POSITIONS = True** + 开启「补建缺失持仓」(通常已是 True,只要没被上一条覆盖即可)。 + +3. 修改方式: + - **推荐**:在**后台配置/仪表盘**的「持仓」类配置中,将 `ONLY_AUTO_TRADE_CREATES_RECORDS` 设为 `false`(保存后会写入 Redis,trading_system 下次拉取配置即生效)。 + - 或直接在 Redis / 数据库配置表中把 `ONLY_AUTO_TRADE_CREATES_RECORDS` 设为 false。 + - 若策略从本地 `config.py` 读配置,则在该处将 `ONLY_AUTO_TRADE_CREATES_RECORDS` 设为 False。 + +4. **重启 trading_system** 或等待下一次持仓同步(例如定时 sync 或启动时 sync),即可对当前「币安有、DB 无」的持仓执行补建。 + +补建时会: + +- 为每个 symbol 创建一条 open 交易记录(`entry_reason` 为 `sync_recovered` 或 `sync_recovered_unknown_origin`); +- 先读交易所已有止损/止盈(若有且已达保本则沿用),否则用配置的 `STOP_LOSS_PERCENT` / `TAKE_PROFIT_PERCENT` 计算并挂单; +- 将仓位加入内存监控(保本/移动止损等)。 + +## 可选配置 + +- **SYNC_RECOVER_ONLY_WHEN_HAS_SLTP**(默认 True) + 未配置 `SYSTEM_ORDER_ID_PREFIX` 时:仅对「已有止损/止盈单」的持仓补建。若希望**无 SL/TP 的仓位也补建**,可设为 False。 + +- **SYSTEM_ORDER_ID_PREFIX** + 若配置了系统单前缀(如 `SYS`),补建会尝试用开仓订单的 `clientOrderId` 识别是否为本系统单;无论是否带前缀,当前逻辑都会**一律补建并挂 SL/TP**(不再因“来历不明”或“无 SL/TP”跳过)。 + +- **SYNC_CREATE_MANUAL_ENTRY_RECORD**(默认 False) + 仅当 `sync_recover` 为 False 时生效:为 True 时会对「仅币安有仓」走另一条「手动开仓」建记录分支。通常只要把 `ONLY_AUTO_TRADE_CREATES_RECORDS=False` 且 `SYNC_RECOVER_MISSING_POSITIONS=True` 即可,不必单独开这个。 + +## 日志说明 + +- 若看到: + `因 ONLY_AUTO_TRADE_CREATES_RECORDS=True 已关闭补建,共 N 个仅币安持仓未写入 DB。` + 表示当前配置不允许补建,按上文把 `ONLY_AUTO_TRADE_CREATES_RECORDS` 设为 False 并保证 `SYNC_RECOVER_MISSING_POSITIONS=True` 即可。 + +- 若看到: + `→ 补建缺失持仓:一律写入 DB、自动挂止损止盈并纳入监控` + 表示补建已开启,下次同步会对缺失的 symbol 执行补建。 diff --git a/trading_system/pending_reconcile.py b/trading_system/pending_reconcile.py index 23280a9..344f684 100644 --- a/trading_system/pending_reconcile.py +++ b/trading_system/pending_reconcile.py @@ -74,12 +74,20 @@ async def reconcile_pending_with_binance(client, account_id: int, limit: int = 5 if ok: reconciled += 1 logger.info( - f"[账号{account_id}] pending 对账: {symbol} client_order_id={client_order_id!r} " + f"[DB] [账号{account_id}] pending 对账: {symbol} client_order_id={client_order_id!r} " f"已完善为 open (orderId={order_id} 成交价={ap_f:.4f} 数量={z_f:.4f})" ) + else: + logger.warning( + f"[DB] [账号{account_id}] pending 对账完善失败 symbol={symbol} client_order_id={client_order_id!r} orderId={order_id} " + f"(可能无对应 pending 或已非 pending 状态)" + ) except Exception as e: - logger.debug(f"[账号{account_id}] pending 对账单条失败: {e}") + logger.error( + f"[DB] [账号{account_id}] pending 对账单条异常 symbol={symbol} client_order_id={client_order_id!r} " + f"error_type={type(e).__name__} error={e}" + ) return reconciled except Exception as e: - logger.debug(f"[账号{account_id}] pending 对账失败: {e}") + logger.error(f"[DB] [账号{account_id}] pending 对账异常 error_type={type(e).__name__} error={e}") return 0 diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 135b604..3c3a302 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -497,6 +497,7 @@ class PositionManager: pending_trade_id = None if DB_AVAILABLE and Trade: try: + logger.info(f"[DB] {symbol} 写入 pending 记录 client_order_id={client_order_id!r} (保证与下单一致性)") pending_trade_id = Trade.create( symbol=symbol, side=side, @@ -517,9 +518,17 @@ class PositionManager: entry_context=entry_context, status="pending", ) - logger.debug(f"{symbol} 已落库 pending 记录 (client_order_id={client_order_id!r}, id={pending_trade_id})") + logger.info(f"[DB] {symbol} pending 已落库 id={pending_trade_id} client_order_id={client_order_id!r}") except Exception as e: - logger.warning(f"{symbol} 创建 pending 记录失败: {e}") + logger.error( + f"[DB] {symbol} 创建 pending 记录失败,将导致后续无法按 client_order_id 完善,易产生补单: " + f"client_order_id={client_order_id!r} error_type={type(e).__name__} error={e}" + ) + _log_trade_db_failure( + symbol=symbol, entry_order_id=None, side=side, quantity=quantity, entry_price=entry_price, + account_id=self.account_id, reason="pending_create_failed", + error_type=type(e).__name__, error_message=str(e) + ) pending_trade_id = None if not smart_entry_enabled: @@ -538,7 +547,12 @@ class PositionManager: ) if not order: if pending_trade_id and DB_AVAILABLE and Trade: - Trade.update_status(pending_trade_id, "cancelled") + try: + ok = Trade.update_status(pending_trade_id, "cancelled") + if not ok: + logger.error(f"[DB] {symbol} 下单失败后更新 pending 为 cancelled 失败 trade_id={pending_trade_id}") + except Exception as ex: + logger.error(f"[DB] {symbol} 下单失败后更新 pending 为 cancelled 异常 trade_id={pending_trade_id} error={ex}") return None entry_order_id = order.get("orderId") if entry_order_id: @@ -558,7 +572,12 @@ class PositionManager: ) if not order: if pending_trade_id and DB_AVAILABLE and Trade: - Trade.update_status(pending_trade_id, "cancelled") + try: + ok = Trade.update_status(pending_trade_id, "cancelled") + if not ok: + logger.error(f"[DB] {symbol} 智能入场首单失败后更新 pending 为 cancelled 失败 trade_id={pending_trade_id}") + except Exception as ex: + logger.error(f"[DB] {symbol} 智能入场首单失败后更新 pending 为 cancelled 异常 trade_id={pending_trade_id} error={ex}") return None entry_order_id = order.get("orderId") if entry_order_id: @@ -644,7 +663,12 @@ class PositionManager: ) if not order: if pending_trade_id and DB_AVAILABLE and Trade: - Trade.update_status(pending_trade_id, "cancelled") + try: + ok = Trade.update_status(pending_trade_id, "cancelled") + if not ok: + logger.error(f"[DB] {symbol} 追价下单失败后更新 pending 为 cancelled 失败 trade_id={pending_trade_id}") + except Exception as ex: + logger.error(f"[DB] {symbol} 追价下单失败后更新 pending 为 cancelled 异常 trade_id={pending_trade_id} error={ex}") self._pending_entry_orders.pop(symbol, None) return None entry_order_id = order.get("orderId") @@ -837,22 +861,32 @@ class PositionManager: row = Trade.get_by_client_order_id(client_order_id, self.account_id) trade_id = row.get("id") if row else None if trade_id: - Trade.update_open_fields( - trade_id, - stop_loss_price=stop_loss_price, - take_profit_price=take_profit_price, - take_profit_1=take_profit_1, - take_profit_2=take_profit_2, - notional_usdt=notional_usdt, - margin_usdt=margin_usdt, - entry_context=entry_context, - atr=atr, - ) - logger.info(f"✓ {symbol} 已完善 pending 记录 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") + try: + Trade.update_open_fields( + trade_id, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + take_profit_1=take_profit_1, + take_profit_2=take_profit_2, + notional_usdt=notional_usdt, + margin_usdt=margin_usdt, + entry_context=entry_context, + atr=atr, + ) + logger.info(f"[DB] {symbol} 已完善 pending→open (ID: {trade_id}, orderId: {entry_order_id}, 成交价: {entry_price:.4f}, 数量: {quantity:.4f})") + except Exception as of_ex: + logger.error(f"[DB] {symbol} 完善 open 字段失败 trade_id={trade_id} (SL/TP/notional 等未写入): {of_ex}") + else: + logger.error( + f"[DB] {symbol} 完善 pending→open 失败,将走新建兜底或依赖补单: " + f"client_order_id={client_order_id!r} entry_order_id={entry_order_id}" + ) + elif row and str(row.get("status")) != "pending": + logger.debug(f"[DB] {symbol} 已有非 pending 记录 status={row.get('status')},将新建或跳过") if trade_id is None: # 无 pending 或未匹配到:走新建(兜底) - logger.info(f"正在保存 {symbol} 交易记录到数据库...") fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id + logger.info(f"[DB] {symbol} 无 pending 记录,新建 open 记录 client_order_id={fallback_client_order_id!r} entry_order_id={entry_order_id}") # 如果 REST 已获取到 entry_order_id,直接写入;否则留空,等待 WS 推送或后续同步补全 trade_id = Trade.create( symbol=symbol, @@ -893,7 +927,7 @@ class PositionManager: logger.info(f"✓ {symbol} 已通过 REST 补全 entry_order_id: {found_order_id}") break except Exception as e: - logger.debug(f"{symbol} REST 补全 entry_order_id 失败: {e}") + logger.warning(f"[DB] {symbol} REST 补全 entry_order_id 失败 trade_id={trade_id}: {e}") except Exception as e: logger.error(f"❌ 保存交易记录到数据库失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") @@ -906,14 +940,14 @@ class PositionManager: ) return None elif not DB_AVAILABLE: - logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录") + logger.error(f"[DB] 数据库不可用,无法保存成交记录 symbol={symbol} entry_order_id={entry_order_id} client_order_id={client_order_id!r},将依赖补单") _log_trade_db_failure( symbol=symbol, entry_order_id=entry_order_id, side=side, quantity=quantity, entry_price=entry_price, account_id=self.account_id, reason="DB_AVAILABLE=False" ) elif not Trade: - logger.warning(f"Trade模型未导入,无法保存 {symbol} 交易记录") + logger.error(f"[DB] Trade 模型未导入,无法保存成交记录 symbol={symbol} entry_order_id={entry_order_id},将依赖补单") _log_trade_db_failure( symbol=symbol, entry_order_id=entry_order_id, side=side, quantity=quantity, entry_price=entry_price, account_id=self.account_id, @@ -3236,11 +3270,16 @@ class PositionManager: sync_recover = config.TRADING_CONFIG.get("SYNC_RECOVER_MISSING_POSITIONS", True) sync_recover_only_has_sltp = config.TRADING_CONFIG.get("SYNC_RECOVER_ONLY_WHEN_HAS_SLTP", True) # 订单统一由自动下单入 DB:同步/持仓 sync 不创建新记录,仅 WS 与自动开仓写库 - if config.TRADING_CONFIG.get("ONLY_AUTO_TRADE_CREATES_RECORDS", True): + only_auto_creates = config.TRADING_CONFIG.get("ONLY_AUTO_TRADE_CREATES_RECORDS", True) + if only_auto_creates: sync_recover = False sync_create_manual = False if missing_in_db: - logger.debug(f"[账号{self.account_id}] ONLY_AUTO_TRADE_CREATES_RECORDS=True,跳过补建/手动开仓创建 ({len(missing_in_db)} 个仅币安持仓)") + logger.info( + f"[账号{self.account_id}] 因 ONLY_AUTO_TRADE_CREATES_RECORDS=True 已关闭补建," + f"共 {len(missing_in_db)} 个仅币安持仓未写入 DB。" + " 若要让币安与 DB 一致,请设 ONLY_AUTO_TRADE_CREATES_RECORDS=False 且 SYNC_RECOVER_MISSING_POSITIONS=True,然后重启或等待下次同步。" + ) if sync_recover: system_order_prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip() @@ -3262,10 +3301,28 @@ class PositionManager: notional = quantity * entry_price if notional < 1.0: continue - # 补建时尽量拿到 entry_order_id:优先按时间范围查开仓订单并用 clientOrderId 前缀锁定本系统单,避免拿错/拿不到 + # 补建时尽量拿到 entry_order_id:优先用 WS 缓存的 FILLED 消息(基于消息补建),否则按时间范围查开仓订单 entry_order_id = None client_order_id_sync = None - if system_order_prefix: + try: + rc = getattr(self.client, "redis_cache", None) + if rc: + cache_key = f"ats:filled_open_orders:{self.account_id}" + cached = await rc.hget(cache_key, symbol) + if cached and isinstance(cached, dict): + cs = (cached.get("side") or "").strip().upper() + ap = float(cached.get("avgPrice") or 0) + eq = float(cached.get("executedQty") or 0) + if (cs == side and ap > 0 + and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 + and abs(eq - quantity) < 1e-6): + entry_order_id = cached.get("orderId") + client_order_id_sync = (cached.get("clientOrderId") or "").strip() or None + if entry_order_id: + logger.debug(f" {symbol} 补建从 WS 缓存 FILLED 取得 orderId={entry_order_id}, clientOrderId={client_order_id_sync!r}") + except Exception as e: + logger.debug(f" {symbol} 补建读 WS 缓存失败: {e}") + if system_order_prefix and entry_order_id is None: try: end_ms = int(time.time() * 1000) start_ms = end_ms - (24 * 3600 * 1000) @@ -3384,21 +3441,34 @@ class PositionManager: entry_time_ts = int(same_side[0].get("time", 0)) // 1000 except Exception: pass - trade_id = Trade.create( - symbol=symbol, - side=side, - quantity=quantity, - entry_price=entry_price, - leverage=binance_position.get("leverage", 10), - entry_reason=entry_reason_sync, - entry_order_id=entry_order_id, - client_order_id=client_order_id_sync, - notional_usdt=notional, - margin_usdt=(notional / float(binance_position.get("leverage", 10) or 10)) if float(binance_position.get("leverage", 10) or 0) > 0 else None, - account_id=self.account_id, - entry_time=entry_time_ts, - ) - logger.info(f" ✓ {symbol} [状态同步] 已补建交易记录 (ID: {trade_id}, orderId: {entry_order_id or '-'}, entry_reason={entry_reason_sync})") + try: + trade_id = Trade.create( + symbol=symbol, + side=side, + quantity=quantity, + entry_price=entry_price, + leverage=binance_position.get("leverage", 10), + entry_reason=entry_reason_sync, + entry_order_id=entry_order_id, + client_order_id=client_order_id_sync, + notional_usdt=notional, + margin_usdt=(notional / float(binance_position.get("leverage", 10) or 10)) if float(binance_position.get("leverage", 10) or 0) > 0 else None, + account_id=self.account_id, + entry_time=entry_time_ts, + ) + logger.info(f" ✓ [DB] {symbol} [状态同步] 补建记录已落库 ID={trade_id} orderId={entry_order_id or '-'} entry_reason={entry_reason_sync}") + except Exception as create_ex: + logger.error( + f" [DB] {symbol} [状态同步] 补建写入 DB 失败,该持仓将无法纳入监控: " + f"orderId={entry_order_id} error_type={type(create_ex).__name__} error={create_ex}" + ) + _log_trade_db_failure( + symbol=symbol, entry_order_id=entry_order_id, side=side, quantity=quantity, + entry_price=entry_price, account_id=self.account_id, + reason="sync_recover_create_failed", + error_type=type(create_ex).__name__, error_message=str(create_ex) + ) + raise ticker = await self.client.get_ticker_24h(symbol) current_price = ticker["price"] if ticker else entry_price lev = float(binance_position.get("leverage", 10)) @@ -3455,9 +3525,8 @@ class PositionManager: logger.warning(f" ✗ {symbol} [状态同步] 补建失败: {e}") elif not sync_create_manual: logger.info( - f"[账号{self.account_id}] → 已跳过自动创建交易记录(SYNC_CREATE_MANUAL_ENTRY_RECORD=False, " - "SYNC_RECOVER_MISSING_POSITIONS 未开启)。" - " 若确认为本策略开仓可开启 SYNC_RECOVER_MISSING_POSITIONS=True(仅补建有止损止盈单的)。" + f"[账号{self.account_id}] → 已跳过自动创建交易记录(SYNC_RECOVER_MISSING_POSITIONS 未开启或 ONLY_AUTO_TRADE_CREATES_RECORDS=True)。" + " 若要让币安持仓与 DB 一致:设 ONLY_AUTO_TRADE_CREATES_RECORDS=False、SYNC_RECOVER_MISSING_POSITIONS=True,重启或等待下次同步。" ) elif sync_create_manual: # 为手动开仓的持仓创建数据库记录并启动监控(仅当显式开启且未走上面的「补建系统单」时) @@ -3510,21 +3579,33 @@ class PositionManager: except Exception: pass # 创建数据库记录(显式传入 account_id、真实开仓时间) - trade_id = Trade.create( - symbol=symbol, - side=side, - quantity=quantity, - entry_price=entry_price, - leverage=binance_position.get('leverage', 10), - entry_reason='manual_entry', # 标记为手动开仓 - entry_order_id=entry_order_id, - notional_usdt=notional, - margin_usdt=(notional / float(binance_position.get('leverage', 10) or 10)) if float(binance_position.get('leverage', 10) or 0) > 0 else None, - account_id=self.account_id, - entry_time=entry_time_ts, - ) - - logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})") + try: + trade_id = Trade.create( + symbol=symbol, + side=side, + quantity=quantity, + entry_price=entry_price, + leverage=binance_position.get('leverage', 10), + entry_reason='manual_entry', # 标记为手动开仓 + entry_order_id=entry_order_id, + notional_usdt=notional, + margin_usdt=(notional / float(binance_position.get('leverage', 10) or 10)) if float(binance_position.get('leverage', 10) or 0) > 0 else None, + account_id=self.account_id, + entry_time=entry_time_ts, + ) + logger.info(f"[DB] {symbol} [状态同步] 手动开仓记录已落库 ID={trade_id} orderId={entry_order_id or '-'}") + except Exception as create_ex: + logger.error( + f"[DB] {symbol} [状态同步] 手动开仓写入 DB 失败: " + f"orderId={entry_order_id} error_type={type(create_ex).__name__} error={create_ex}" + ) + _log_trade_db_failure( + symbol=symbol, entry_order_id=entry_order_id, side=side, quantity=quantity, + entry_price=entry_price, account_id=self.account_id, + reason="manual_entry_create_failed", + error_type=type(create_ex).__name__, error_message=str(create_ex) + ) + continue # 创建本地持仓记录(用于监控) ticker = await self.client.get_ticker_24h(symbol) diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index d462a10..91f208a 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -543,18 +543,44 @@ class UserDataStream: client_order_id, self.account_id, order_id, ap_f, z_f ) if updated: - logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") + logger.info(f"[DB] UserDataStream: 开仓 FILLED 已完善 pending→open client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") else: - logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}") + logger.warning( + f"[DB] UserDataStream: 开仓 FILLED 完善失败(无 pending 或已完善,易导致补单)" + f" client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r}" + ) else: # 无 clientOrderId 时用 orderId 兜底:若该 symbol+account 下仅有一条 pending 且无 entry_order_id,则用本订单号完善 updated = Trade.update_pending_by_entry_order_id(symbol, self.account_id, order_id, ap_f, z_f) if updated: - logger.info(f"UserDataStream: 开仓成交已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") + logger.info(f"[DB] UserDataStream: 开仓 FILLED 已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}") else: - logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,且兜底未匹配到唯一 pending orderId={order_id} symbol={symbol!r}") + logger.warning( + f"[DB] UserDataStream: 开仓 FILLED 无 clientOrderId 且兜底未匹配到 pending orderId={order_id} symbol={symbol!r},将依赖补单" + ) except Exception as ex: - logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}") + logger.error( + f"[DB] UserDataStream: 开仓 FILLED 完善 DB 异常 symbol={symbol!r} orderId={order_id} client_order_id={client_order_id!r} " + f"error_type={type(ex).__name__} error={ex}" + ) + # 开仓 FILLED 写入缓存,供补建时优先基于消息匹配(避免仅依赖 REST 历史) + try: + rc = getattr(self.client, "redis_cache", None) + if rc and symbol: + side = (o.get("S") or "").strip().upper() + cache_key = f"ats:filled_open_orders:{self.account_id}" + payload = { + "orderId": order_id, + "clientOrderId": client_order_id, + "side": side, + "avgPrice": ap_f, + "executedQty": z_f, + "time_ms": event_time_ms or int(time.time() * 1000), + } + await rc.hset(cache_key, symbol, payload, ttl=86400 * 2) + logger.debug(f"UserDataStream: 开仓 FILLED 已写入缓存 symbol={symbol!r} orderId={order_id} 供补建使用") + except Exception as ex: + logger.debug(f"UserDataStream: 开仓 FILLED 写缓存失败 orderId={order_id}: {ex}") else: # 平仓成交(reduceOnly):优先从 Redis 取 entry_order_id(ALGO_UPDATE 可能先到并已缓存) exit_entry_order_id = None