feat(trades): 优化订单同步逻辑以补全缺失的平仓和开仓订单
在 `trades.py` 中增强了 `sync_trades_from_binance` 方法,新增对平仓订单和开仓订单的分类处理,确保能够补全缺失的订单号。引入了对已存在订单的跳过逻辑,记录无法匹配的情况,并优化了日志记录以提升可追溯性。此改动提升了交易记录的完整性和系统的稳定性。
This commit is contained in:
parent
55ae7b5b08
commit
01b8a4932f
|
|
@ -492,11 +492,22 @@ async def sync_trades_from_binance(
|
|||
# 同步订单到数据库(仅当前账号)
|
||||
synced_count = 0
|
||||
updated_count = 0
|
||||
entry_order_id_filled = 0 # 补全的开仓订单号数量
|
||||
exit_order_id_filled = 0 # 补全的平仓订单号数量
|
||||
skipped_existing = 0 # 已存在且完整的订单数
|
||||
skipped_no_match = 0 # 无法匹配的记录数
|
||||
|
||||
# 按时间排序,从旧到新
|
||||
all_orders.sort(key=lambda x: x.get('time', 0))
|
||||
|
||||
for order in all_orders:
|
||||
# 先处理平仓订单(reduceOnly),再处理开仓订单
|
||||
close_orders = [o for o in all_orders if o.get('reduceOnly', False)]
|
||||
open_orders = [o for o in all_orders if not o.get('reduceOnly', False)]
|
||||
|
||||
logger.info(f"开始同步:平仓订单 {len(close_orders)} 个,开仓订单 {len(open_orders)} 个")
|
||||
|
||||
# 1. 处理平仓订单
|
||||
for order in close_orders:
|
||||
symbol = order.get('symbol')
|
||||
order_id = order.get('orderId')
|
||||
side = order.get('side')
|
||||
|
|
@ -521,17 +532,42 @@ async def sync_trades_from_binance(
|
|||
# 这是平仓订单
|
||||
# 首先检查是否已经通过订单号同步过(避免重复)
|
||||
existing_trade = Trade.get_by_exit_order_id(order_id)
|
||||
if existing_trade and (existing_trade.get("exit_reason") not in (None, "", "sync")):
|
||||
logger.debug(f"订单 {order_id} 已同步过且 exit_reason={existing_trade.get('exit_reason')},跳过")
|
||||
# 如果已有 exit_order_id 且 exit_reason 不是 sync,说明已完整同步,跳过
|
||||
if existing_trade and existing_trade.get("exit_order_id") and existing_trade.get("exit_reason") not in (None, "", "sync"):
|
||||
skipped_existing += 1
|
||||
logger.debug(f"订单 {order_id} 已完整同步过(exit_reason={existing_trade.get('exit_reason')}),跳过")
|
||||
continue
|
||||
|
||||
# 查找数据库中该交易对的 open 状态记录(仅当前账号)
|
||||
# 查找数据库中该交易对的 open 状态记录(仅当前账号),或已平仓但 exit_order_id 为空的记录
|
||||
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id or DEFAULT_ACCOUNT_ID)
|
||||
if existing_trade or open_trades:
|
||||
# 找到匹配的交易记录(通过symbol匹配,如果有多个则取最近的)
|
||||
trade = existing_trade or open_trades[0] # 取第一个
|
||||
closed_trades_no_exit_id = []
|
||||
if not existing_trade and not open_trades:
|
||||
# 如果没有 open 记录,查找已平仓但 exit_order_id 为空的记录
|
||||
try:
|
||||
closed_trades = Trade.get_by_symbol(symbol, status='closed', account_id=account_id or DEFAULT_ACCOUNT_ID)
|
||||
closed_trades_no_exit_id = [
|
||||
t for t in closed_trades
|
||||
if not t.get("exit_order_id") or str(t.get("exit_order_id")).strip() in ("", "0")
|
||||
]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if existing_trade or open_trades or closed_trades_no_exit_id:
|
||||
# 找到匹配的交易记录(优先用 existing_trade,其次 open_trades,最后 closed_trades_no_exit_id)
|
||||
if existing_trade:
|
||||
trade = existing_trade
|
||||
elif open_trades:
|
||||
trade = open_trades[0] # 取第一个 open 记录
|
||||
else:
|
||||
# 从已平仓但无 exit_order_id 的记录中选择(按 entry_time 最近的一条)
|
||||
closed_trades_no_exit_id.sort(key=lambda x: x.get('entry_time', 0) or 0, reverse=True)
|
||||
trade = closed_trades_no_exit_id[0]
|
||||
trade_id = trade['id']
|
||||
|
||||
# 如果之前没有 exit_order_id,记录为补全
|
||||
if not trade.get("exit_order_id") or str(trade.get("exit_order_id")).strip() in ("", "0"):
|
||||
exit_order_id_filled += 1
|
||||
|
||||
# 计算盈亏
|
||||
entry_price = float(trade['entry_price'])
|
||||
entry_quantity = float(trade['quantity'])
|
||||
|
|
@ -635,32 +671,91 @@ async def sync_trades_from_binance(
|
|||
realized_pnl=sync_realized_pnl,
|
||||
)
|
||||
updated_count += 1
|
||||
logger.debug(
|
||||
logger.info(
|
||||
f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, "
|
||||
f"类型: {otype or '-'}, 原因: {exit_reason}, 成交价: {avg_price:.4f})"
|
||||
)
|
||||
else:
|
||||
# 这是开仓订单,检查数据库中是否已存在(通过订单号)
|
||||
existing_trade = Trade.get_by_entry_order_id(order_id)
|
||||
if not existing_trade:
|
||||
# 如果不存在,可以创建新记录(但需要更多信息,暂时跳过)
|
||||
logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建")
|
||||
else:
|
||||
# 没有找到匹配的记录
|
||||
skipped_no_match += 1
|
||||
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录(无 open 状态且无 exit_order_id 为空的 closed 记录),跳过")
|
||||
else:
|
||||
# 这是开仓订单
|
||||
existing_trade = Trade.get_by_entry_order_id(order_id)
|
||||
if existing_trade:
|
||||
# 如果已存在,跳过(开仓订单信息通常已完整)
|
||||
logger.debug(f"开仓订单 {order_id} 已存在,跳过")
|
||||
else:
|
||||
# 如果不存在,尝试查找没有 entry_order_id 的记录并补全
|
||||
try:
|
||||
# 查找该 symbol 下没有 entry_order_id 的记录(按时间匹配)
|
||||
order_time_ms = order.get('time', 0)
|
||||
order_time_sec = order_time_ms // 1000 if order_time_ms > 0 else 0
|
||||
|
||||
# 查找时间范围内(订单时间前后 1 小时)且没有 entry_order_id 的记录
|
||||
time_window_start = order_time_sec - 3600
|
||||
time_window_end = order_time_sec + 3600
|
||||
|
||||
trades_no_entry_id = Trade.get_all(
|
||||
account_id=account_id or DEFAULT_ACCOUNT_ID,
|
||||
symbol=symbol,
|
||||
start_timestamp=time_window_start,
|
||||
end_timestamp=time_window_end,
|
||||
time_filter="entry", # 使用 entry_time 过滤
|
||||
)
|
||||
|
||||
# 过滤出没有 entry_order_id 的记录
|
||||
trades_no_entry_id = [
|
||||
t for t in trades_no_entry_id
|
||||
if not t.get("entry_order_id") or str(t.get("entry_order_id")).strip() in ("", "0")
|
||||
]
|
||||
|
||||
# 按价格和数量匹配(允许 5% 误差)
|
||||
matched_trade = None
|
||||
order_qty = float(order.get('executedQty', 0))
|
||||
order_price = float(order.get('avgPrice', 0))
|
||||
|
||||
for t in trades_no_entry_id:
|
||||
t_qty = float(t.get('quantity', 0))
|
||||
t_price = float(t.get('entry_price', 0))
|
||||
# 数量匹配(允许 5% 误差)且价格匹配(允许 2% 误差)
|
||||
if (order_qty > 0 and t_qty > 0 and
|
||||
abs(t_qty - order_qty) / max(order_qty, 1e-8) <= 0.05 and
|
||||
order_price > 0 and t_price > 0 and
|
||||
abs(t_price - order_price) / max(order_price, 1e-8) <= 0.02):
|
||||
matched_trade = t
|
||||
break
|
||||
|
||||
if matched_trade:
|
||||
# 补全 entry_order_id
|
||||
from database.models import Trade as TradeModel
|
||||
if TradeModel.update_entry_order_id(matched_trade['id'], order_id):
|
||||
entry_order_id_filled += 1
|
||||
logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})")
|
||||
else:
|
||||
logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})")
|
||||
else:
|
||||
logger.debug(f"发现新的开仓订单 {order_id} ({symbol}, qty={order_qty}, price={order_price:.4f}),但无法匹配到现有记录(时间窗口内无 entry_order_id 为空的记录),跳过创建")
|
||||
except Exception as e:
|
||||
logger.debug(f"补全开仓订单号失败 {order_id}: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
|
||||
continue
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)",
|
||||
"message": f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条平仓记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个",
|
||||
"total_orders": len(all_orders),
|
||||
"updated_trades": updated_count,
|
||||
"close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]),
|
||||
"open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)])
|
||||
"entry_order_id_filled": entry_order_id_filled,
|
||||
"exit_order_id_filled": exit_order_id_filled,
|
||||
"skipped_existing": skipped_existing,
|
||||
"skipped_no_match": skipped_no_match,
|
||||
"close_orders": len(close_orders),
|
||||
"open_orders": len(open_orders)
|
||||
}
|
||||
|
||||
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录")
|
||||
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,补全开仓订单号 {entry_order_id_filled} 个,补全平仓订单号 {exit_order_id_filled} 个,跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个")
|
||||
return result
|
||||
|
||||
finally:
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user