From 01b8a4932fdc4b3692e8c526c14fc834d2ac0db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Tue, 17 Feb 2026 22:41:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(trades):=20=E4=BC=98=E5=8C=96=E8=AE=A2?= =?UTF-8?q?=E5=8D=95=E5=90=8C=E6=AD=A5=E9=80=BB=E8=BE=91=E4=BB=A5=E8=A1=A5?= =?UTF-8?q?=E5=85=A8=E7=BC=BA=E5=A4=B1=E7=9A=84=E5=B9=B3=E4=BB=93=E5=92=8C?= =?UTF-8?q?=E5=BC=80=E4=BB=93=E8=AE=A2=E5=8D=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `trades.py` 中增强了 `sync_trades_from_binance` 方法,新增对平仓订单和开仓订单的分类处理,确保能够补全缺失的订单号。引入了对已存在订单的跳过逻辑,记录无法匹配的情况,并优化了日志记录以提升可追溯性。此改动提升了交易记录的完整性和系统的稳定性。 --- backend/api/routes/trades.py | 131 ++++++++++++++++++++++++++++++----- 1 file changed, 113 insertions(+), 18 deletions(-) diff --git a/backend/api/routes/trades.py b/backend/api/routes/trades.py index 05ab56c..9b4e6a5 100644 --- a/backend/api/routes/trades.py +++ b/backend/api/routes/trades.py @@ -492,11 +492,22 @@ async def sync_trades_from_binance( # 同步订单到数据库(仅当前账号) synced_count = 0 updated_count = 0 + entry_order_id_filled = 0 # 补全的开仓订单号数量 + exit_order_id_filled = 0 # 补全的平仓订单号数量 + skipped_existing = 0 # 已存在且完整的订单数 + skipped_no_match = 0 # 无法匹配的记录数 # 按时间排序,从旧到新 all_orders.sort(key=lambda x: x.get('time', 0)) - for order in all_orders: + # 先处理平仓订单(reduceOnly),再处理开仓订单 + close_orders = [o for o in all_orders if o.get('reduceOnly', False)] + open_orders = [o for o in all_orders if not o.get('reduceOnly', False)] + + logger.info(f"开始同步:平仓订单 {len(close_orders)} 个,开仓订单 {len(open_orders)} 个") + + # 1. 处理平仓订单 + for order in close_orders: symbol = order.get('symbol') order_id = order.get('orderId') side = order.get('side') @@ -521,17 +532,42 @@ async def sync_trades_from_binance( # 这是平仓订单 # 首先检查是否已经通过订单号同步过(避免重复) existing_trade = Trade.get_by_exit_order_id(order_id) - if existing_trade and (existing_trade.get("exit_reason") not in (None, "", "sync")): - logger.debug(f"订单 {order_id} 已同步过且 exit_reason={existing_trade.get('exit_reason')},跳过") + # 如果已有 exit_order_id 且 exit_reason 不是 sync,说明已完整同步,跳过 + if existing_trade and existing_trade.get("exit_order_id") and existing_trade.get("exit_reason") not in (None, "", "sync"): + skipped_existing += 1 + logger.debug(f"订单 {order_id} 已完整同步过(exit_reason={existing_trade.get('exit_reason')}),跳过") continue - # 查找数据库中该交易对的 open 状态记录(仅当前账号) + # 查找数据库中该交易对的 open 状态记录(仅当前账号),或已平仓但 exit_order_id 为空的记录 open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id or DEFAULT_ACCOUNT_ID) - if existing_trade or open_trades: - # 找到匹配的交易记录(通过symbol匹配,如果有多个则取最近的) - trade = existing_trade or open_trades[0] # 取第一个 + closed_trades_no_exit_id = [] + if not existing_trade and not open_trades: + # 如果没有 open 记录,查找已平仓但 exit_order_id 为空的记录 + try: + closed_trades = Trade.get_by_symbol(symbol, status='closed', account_id=account_id or DEFAULT_ACCOUNT_ID) + closed_trades_no_exit_id = [ + t for t in closed_trades + if not t.get("exit_order_id") or str(t.get("exit_order_id")).strip() in ("", "0") + ] + except Exception: + pass + + if existing_trade or open_trades or closed_trades_no_exit_id: + # 找到匹配的交易记录(优先用 existing_trade,其次 open_trades,最后 closed_trades_no_exit_id) + if existing_trade: + trade = existing_trade + elif open_trades: + trade = open_trades[0] # 取第一个 open 记录 + else: + # 从已平仓但无 exit_order_id 的记录中选择(按 entry_time 最近的一条) + closed_trades_no_exit_id.sort(key=lambda x: x.get('entry_time', 0) or 0, reverse=True) + trade = closed_trades_no_exit_id[0] trade_id = trade['id'] + # 如果之前没有 exit_order_id,记录为补全 + if not trade.get("exit_order_id") or str(trade.get("exit_order_id")).strip() in ("", "0"): + exit_order_id_filled += 1 + # 计算盈亏 entry_price = float(trade['entry_price']) entry_quantity = float(trade['quantity']) @@ -635,32 +671,91 @@ async def sync_trades_from_binance( realized_pnl=sync_realized_pnl, ) updated_count += 1 - logger.debug( + logger.info( f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, " f"类型: {otype or '-'}, 原因: {exit_reason}, 成交价: {avg_price:.4f})" ) - else: - # 这是开仓订单,检查数据库中是否已存在(通过订单号) - existing_trade = Trade.get_by_entry_order_id(order_id) - if not existing_trade: - # 如果不存在,可以创建新记录(但需要更多信息,暂时跳过) - logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建") else: + # 没有找到匹配的记录 + skipped_no_match += 1 + logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录(无 open 状态且无 exit_order_id 为空的 closed 记录),跳过") + else: + # 这是开仓订单 + existing_trade = Trade.get_by_entry_order_id(order_id) + if existing_trade: + # 如果已存在,跳过(开仓订单信息通常已完整) logger.debug(f"开仓订单 {order_id} 已存在,跳过") + else: + # 如果不存在,尝试查找没有 entry_order_id 的记录并补全 + try: + # 查找该 symbol 下没有 entry_order_id 的记录(按时间匹配) + order_time_ms = order.get('time', 0) + order_time_sec = order_time_ms // 1000 if order_time_ms > 0 else 0 + + # 查找时间范围内(订单时间前后 1 小时)且没有 entry_order_id 的记录 + time_window_start = order_time_sec - 3600 + time_window_end = order_time_sec + 3600 + + trades_no_entry_id = Trade.get_all( + account_id=account_id or DEFAULT_ACCOUNT_ID, + symbol=symbol, + start_timestamp=time_window_start, + end_timestamp=time_window_end, + time_filter="entry", # 使用 entry_time 过滤 + ) + + # 过滤出没有 entry_order_id 的记录 + trades_no_entry_id = [ + t for t in trades_no_entry_id + if not t.get("entry_order_id") or str(t.get("entry_order_id")).strip() in ("", "0") + ] + + # 按价格和数量匹配(允许 5% 误差) + matched_trade = None + order_qty = float(order.get('executedQty', 0)) + order_price = float(order.get('avgPrice', 0)) + + for t in trades_no_entry_id: + t_qty = float(t.get('quantity', 0)) + t_price = float(t.get('entry_price', 0)) + # 数量匹配(允许 5% 误差)且价格匹配(允许 2% 误差) + if (order_qty > 0 and t_qty > 0 and + abs(t_qty - order_qty) / max(order_qty, 1e-8) <= 0.05 and + order_price > 0 and t_price > 0 and + abs(t_price - order_price) / max(order_price, 1e-8) <= 0.02): + matched_trade = t + break + + if matched_trade: + # 补全 entry_order_id + from database.models import Trade as TradeModel + if TradeModel.update_entry_order_id(matched_trade['id'], order_id): + entry_order_id_filled += 1 + logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})") + else: + logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})") + else: + logger.debug(f"发现新的开仓订单 {order_id} ({symbol}, qty={order_qty}, price={order_price:.4f}),但无法匹配到现有记录(时间窗口内无 entry_order_id 为空的记录),跳过创建") + except Exception as e: + logger.debug(f"补全开仓订单号失败 {order_id}: {e}") except Exception as e: logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}") continue result = { "success": True, - "message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)", + "message": f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条平仓记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个", "total_orders": len(all_orders), "updated_trades": updated_count, - "close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]), - "open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)]) + "entry_order_id_filled": entry_order_id_filled, + "exit_order_id_filled": exit_order_id_filled, + "skipped_existing": skipped_existing, + "skipped_no_match": skipped_no_match, + "close_orders": len(close_orders), + "open_orders": len(open_orders) } - logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录") + logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,补全开仓订单号 {entry_order_id_filled} 个,补全平仓订单号 {exit_order_id_filled} 个,跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个") return result finally: