feat(trades, trade_list, api): 增强历史订单同步功能以支持所有交易对的补全

在 `trades.py` 中更新 `sync_trades_from_binance` 方法,新增 `sync_all_symbols` 参数,允许用户选择同步所有交易对的历史订单并创建缺失的交易记录。更新前端组件 `TradeList.jsx` 以支持该功能,添加用户确认提示和状态显示,提升用户体验和数据完整性。同时,调整 API 接口以处理新的参数,确保与后端交互的准确性。此改动增强了交易记录的完整性和系统的灵活性。
This commit is contained in:
薇薇安 2026-02-17 22:51:31 +08:00
parent ac1336dab8
commit 1430ddc532
3 changed files with 126 additions and 46 deletions

View File

@ -395,11 +395,15 @@ async def get_trade_stats(
async def sync_trades_from_binance( async def sync_trades_from_binance(
account_id: int = Depends(get_account_id), account_id: int = Depends(get_account_id),
days: int = Query(7, ge=1, le=30, description="同步最近N天的订单"), days: int = Query(7, ge=1, le=30, description="同步最近N天的订单"),
sync_all_symbols: bool = Query(False, description="是否同步所有交易对的订单(不限于数据库中的),用于补全缺失记录"),
): ):
""" """
从币安同步历史订单补全 DB 中缺失的 exit_order_id / 平仓信息 从币安同步历史订单补全 DB 中缺失的 exit_order_id / 平仓信息并可创建缺失的交易记录
**WS 已接入后**开仓/平仓订单号主要由 User Data Stream 回写此接口仅作**冷启动或补漏**建议降低调用频率 **WS 已接入后**开仓/平仓订单号主要由 User Data Stream 回写此接口仅作**冷启动或补漏**建议降低调用频率
仅对DB 中近期有记录的 symbol拉取订单避免全市场逐 symbol 请求
参数
- sync_all_symbols=True: 同步所有交易对的订单用于数据库记录缺失时补全
- sync_all_symbols=False: 仅对DB 中近期有记录的 symbol拉取订单默认避免全市场请求
""" """
try: try:
logger.info(f"开始从币安同步历史订单(最近{days}天)...") logger.info(f"开始从币安同步历史订单(最近{days}天)...")
@ -426,35 +430,52 @@ async def sync_trades_from_binance(
start_ts_sec = start_time_ms // 1000 start_ts_sec = start_time_ms // 1000
end_ts_sec = end_time_ms // 1000 end_ts_sec = end_time_ms // 1000
# 初始化客户端(提前初始化,用于获取所有交易对)
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
# 获取需要同步的 symbol 列表 # 获取需要同步的 symbol 列表
# 策略:先查时间范围内的记录,如果没有,则查所有有记录的 symbol用于补全历史订单号
symbol_list = [] symbol_list = []
try: try:
# 先尝试用 "both" 过滤确保能找到所有相关记录包括3天前开仓但最近平仓的 if sync_all_symbols:
trades_in_range = Trade.get_all( # 如果用户选择同步所有交易对,从币安获取所有 USDT 永续合约
start_timestamp=start_ts_sec, logger.info("用户选择同步所有交易对的订单,从币安获取所有 USDT 永续合约列表...")
end_timestamp=end_ts_sec, all_symbols = await client.get_all_usdt_pairs()
account_id=account_id or DEFAULT_ACCOUNT_ID, symbol_list = list(all_symbols) if all_symbols else []
time_filter="both", # 使用 both 确保能找到所有相关记录 logger.info(f"从币安获取到 {len(symbol_list)} 个 USDT 永续合约交易对")
) else:
symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")}) # 默认策略:仅对 DB 中有记录的 symbol 拉取订单
logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对") # 先尝试用 "both" 过滤确保能找到所有相关记录包括3天前开仓但最近平仓的
trades_in_range = Trade.get_all(
# 如果时间范围内没有记录,尝试获取所有有记录的 symbol用于补全历史订单号 start_timestamp=start_ts_sec,
if not symbol_list: end_timestamp=end_ts_sec,
logger.info(f"时间范围内({days}天)无记录,尝试获取所有有记录的 symbol 用于补全订单号")
all_trades = Trade.get_all(
account_id=account_id or DEFAULT_ACCOUNT_ID, account_id=account_id or DEFAULT_ACCOUNT_ID,
time_filter="both", # 使用 both 确保能找到所有相关记录
) )
symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")}) symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")})
logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)}") logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对")
# 如果时间范围内没有记录,尝试获取所有有记录的 symbol用于补全历史订单号
if not symbol_list:
logger.info(f"时间范围内({days}天)无记录,尝试获取所有有记录的 symbol 用于补全订单号")
all_trades = Trade.get_all(
account_id=account_id or DEFAULT_ACCOUNT_ID,
)
symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")})
logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)}")
except Exception as e: except Exception as e:
logger.warning(f"从 DB 获取 symbol 列表失败,将跳过同步: {e}", exc_info=True) logger.warning(f"获取 symbol 列表失败: {e}", exc_info=True)
await client.disconnect()
return { return {
"success": False, "success": False,
"message": f"从数据库获取交易对列表失败: {str(e)}", "message": f"获取交易对列表失败: {str(e)}",
"total_orders": 0, "total_orders": 0,
"updated_trades": 0, "updated_trades": 0,
"created_trades": 0,
"entry_order_id_filled": 0, "entry_order_id_filled": 0,
"exit_order_id_filled": 0, "exit_order_id_filled": 0,
"close_orders": 0, "close_orders": 0,
@ -462,27 +483,20 @@ async def sync_trades_from_binance(
} }
if not symbol_list: if not symbol_list:
logger.info(f"DB 中无任何交易记录,跳过同步") logger.info(f"没有需要同步的交易对,跳过同步")
await client.disconnect()
return { return {
"success": True, "success": True,
"message": f"数据库中没有交易记录,无法同步订单", "message": f"没有需要同步的交易对sync_all_symbols={sync_all_symbols}",
"total_orders": 0, "total_orders": 0,
"updated_trades": 0, "updated_trades": 0,
"created_trades": 0,
"entry_order_id_filled": 0, "entry_order_id_filled": 0,
"exit_order_id_filled": 0, "exit_order_id_filled": 0,
"close_orders": 0, "close_orders": 0,
"open_orders": 0, "open_orders": 0,
} }
# 初始化客户端
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try: try:
# 仅对上述 symbol 拉取订单 # 仅对上述 symbol 拉取订单
all_orders = [] all_orders = []
@ -511,6 +525,7 @@ async def sync_trades_from_binance(
# 同步订单到数据库(仅当前账号) # 同步订单到数据库(仅当前账号)
synced_count = 0 synced_count = 0
updated_count = 0 updated_count = 0
created_count = 0 # 新创建的交易记录数
entry_order_id_filled = 0 # 补全的开仓订单号数量 entry_order_id_filled = 0 # 补全的开仓订单号数量
exit_order_id_filled = 0 # 补全的平仓订单号数量 exit_order_id_filled = 0 # 补全的平仓订单号数量
skipped_existing = 0 # 已存在且完整的订单数 skipped_existing = 0 # 已存在且完整的订单数
@ -696,8 +711,20 @@ async def sync_trades_from_binance(
) )
else: else:
# 没有找到匹配的记录 # 没有找到匹配的记录
skipped_no_match += 1 if sync_all_symbols:
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录(无 open 状态且无 exit_order_id 为空的 closed 记录),跳过") # 如果启用了同步所有交易对,尝试创建完整的交易记录(开仓+平仓)
try:
# 查找是否有对应的开仓订单(通过时间窗口和价格匹配)
# 注意:平仓订单通常有 reduceOnly=True我们需要找到对应的开仓订单
# 由于币安订单历史可能不完整,这里先跳过创建,只记录日志
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录,且 sync_all_symbols=True但创建完整交易记录需要开仓订单信息暂时跳过")
skipped_no_match += 1
except Exception as e:
logger.debug(f"处理平仓订单失败 {order_id}: {e}")
skipped_no_match += 1
else:
skipped_no_match += 1
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录(无 open 状态且无 exit_order_id 为空的 closed 记录),跳过")
else: else:
# 这是开仓订单 # 这是开仓订单
existing_trade = Trade.get_by_entry_order_id(order_id) existing_trade = Trade.get_by_entry_order_id(order_id)
@ -705,7 +732,7 @@ async def sync_trades_from_binance(
# 如果已存在,跳过(开仓订单信息通常已完整) # 如果已存在,跳过(开仓订单信息通常已完整)
logger.debug(f"开仓订单 {order_id} 已存在,跳过") logger.debug(f"开仓订单 {order_id} 已存在,跳过")
else: else:
# 如果不存在,尝试查找没有 entry_order_id 的记录并补全 # 如果不存在,尝试查找没有 entry_order_id 的记录并补全,或创建新记录
try: try:
# 查找该 symbol 下没有 entry_order_id 的记录(按时间匹配) # 查找该 symbol 下没有 entry_order_id 的记录(按时间匹配)
order_time_ms = order.get('time', 0) order_time_ms = order.get('time', 0)
@ -733,6 +760,7 @@ async def sync_trades_from_binance(
matched_trade = None matched_trade = None
order_qty = float(order.get('executedQty', 0)) order_qty = float(order.get('executedQty', 0))
order_price = float(order.get('avgPrice', 0)) order_price = float(order.get('avgPrice', 0))
order_side = order.get('side', '').upper()
for t in trades_no_entry_id: for t in trades_no_entry_id:
t_qty = float(t.get('quantity', 0)) t_qty = float(t.get('quantity', 0))
@ -753,19 +781,49 @@ async def sync_trades_from_binance(
logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})") logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})")
else: else:
logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})") logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})")
elif sync_all_symbols:
# 如果启用了同步所有交易对,且无法匹配到现有记录,创建新记录
try:
# 从订单信息中提取杠杆(如果有)
leverage = 10 # 默认杠杆
try:
# 尝试从订单的 positionSide 或其他字段获取杠杆信息
# 如果没有,使用默认值
pass
except:
pass
# 创建新的交易记录
trade_id = Trade.create(
symbol=symbol,
side=order_side,
quantity=order_qty,
entry_price=order_price,
leverage=leverage,
entry_reason='sync_from_binance',
entry_order_id=order_id,
client_order_id=order.get('clientOrderId'),
account_id=account_id or DEFAULT_ACCOUNT_ID,
status='open', # 先标记为 open如果后续有平仓订单会更新
)
created_count += 1
logger.info(f"✓ 创建新交易记录: {symbol} (ID: {trade_id}, orderId: {order_id}, side={order_side}, qty={order_qty}, price={order_price:.4f})")
except Exception as create_err:
logger.warning(f"创建交易记录失败 {symbol} (orderId: {order_id}): {create_err}")
else: else:
logger.debug(f"发现新的开仓订单 {order_id} ({symbol}, qty={order_qty}, price={order_price:.4f}),但无法匹配到现有记录(时间窗口内无 entry_order_id 为空的记录),跳过创建") logger.debug(f"发现新的开仓订单 {order_id} ({symbol}, qty={order_qty}, price={order_price:.4f}),但无法匹配到现有记录(sync_all_symbols=False跳过创建")
except Exception as e: except Exception as e:
logger.debug(f"补全开仓订单号失败 {order_id}: {e}") logger.debug(f"处理开仓订单失败 {order_id}: {e}")
except Exception as e: except Exception as e:
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}") logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
continue continue
result = { result = {
"success": True, "success": True,
"message": f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条平仓记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}", "message": f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count}记录,创建了 {created_count} 条新记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}",
"total_orders": len(all_orders), "total_orders": len(all_orders),
"updated_trades": updated_count, "updated_trades": updated_count,
"created_trades": created_count,
"entry_order_id_filled": entry_order_id_filled, "entry_order_id_filled": entry_order_id_filled,
"exit_order_id_filled": exit_order_id_filled, "exit_order_id_filled": exit_order_id_filled,
"skipped_existing": skipped_existing, "skipped_existing": skipped_existing,
@ -774,7 +832,7 @@ async def sync_trades_from_binance(
"open_orders": len(open_orders) "open_orders": len(open_orders)
} }
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,补全开仓订单号 {entry_order_id_filled} 个,补全平仓订单号 {exit_order_id_filled} 个,跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}") logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,创建了 {created_count} 条新记录,补全开仓订单号 {entry_order_id_filled} 个,补全平仓订单号 {exit_order_id_filled} 个,跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}")
return result return result
finally: finally:

View File

@ -25,6 +25,7 @@ const TradeList = () => {
const [syncing, setSyncing] = useState(false) // const [syncing, setSyncing] = useState(false) //
const [syncResult, setSyncResult] = useState(null) // const [syncResult, setSyncResult] = useState(null) //
const [syncDays, setSyncDays] = useState(7) // const [syncDays, setSyncDays] = useState(7) //
const [syncAllSymbols, setSyncAllSymbols] = useState(false) //
useEffect(() => { useEffect(() => {
loadData() loadData()
@ -96,7 +97,11 @@ const TradeList = () => {
return // return //
} }
if (!window.confirm(`确定要同步最近 ${syncDays} 天的订单吗?\n这将从币安拉取历史订单并补全缺失的订单号。`)) { const confirmMsg = syncAllSymbols
? `确定要同步最近 ${syncDays} 天的所有交易对订单吗?\n这将从币安拉取所有交易对的历史订单并创建缺失的交易记录。\n⚠ 注意:这会请求大量数据,可能需要较长时间。`
: `确定要同步最近 ${syncDays} 天的订单吗?\n这将从币安拉取历史订单并补全缺失的订单号仅限数据库中已有的交易对`
if (!window.confirm(confirmMsg)) {
return return
} }
@ -104,10 +109,10 @@ const TradeList = () => {
setSyncResult(null) setSyncResult(null)
try { try {
const result = await api.syncTradesFromBinance(syncDays) const result = await api.syncTradesFromBinance(syncDays, syncAllSymbols)
setSyncResult({ setSyncResult({
success: true, success: true,
message: `同步完成:共处理 ${result.total_orders || 0} 个订单,更新 ${result.updated_trades || 0} 条记录`, message: `同步完成:共处理 ${result.total_orders || 0} 个订单,更新 ${result.updated_trades || 0} 条记录${result.created_trades ? `,创建 ${result.created_trades} 条新记录` : ''}`,
details: result details: result
}) })
// //
@ -522,7 +527,7 @@ const TradeList = () => {
<button className="btn-secondary" onClick={handleReset}> <button className="btn-secondary" onClick={handleReset}>
重置 重置
</button> </button>
<div style={{ display: 'flex', alignItems: 'center', gap: '8px', marginLeft: '10px' }}> <div style={{ display: 'flex', alignItems: 'center', gap: '8px', marginLeft: '10px', flexWrap: 'wrap' }}>
<label style={{ fontSize: '12px', color: '#666' }}>同步天数</label> <label style={{ fontSize: '12px', color: '#666' }}>同步天数</label>
<select <select
value={syncDays} value={syncDays}
@ -535,6 +540,16 @@ const TradeList = () => {
<option value={14}>14</option> <option value={14}>14</option>
<option value={30}>30</option> <option value={30}>30</option>
</select> </select>
<label style={{ display: 'flex', alignItems: 'center', gap: '4px', fontSize: '12px', cursor: 'pointer', marginLeft: '8px' }}>
<input
type="checkbox"
checked={syncAllSymbols}
onChange={(e) => setSyncAllSymbols(e.target.checked)}
disabled={syncing}
style={{ cursor: 'pointer' }}
/>
<span title="勾选后将同步所有交易对的订单(不限于数据库中的),用于补全缺失的交易记录">同步所有交易对</span>
</label>
<button <button
className="btn-secondary" className="btn-secondary"
onClick={handleSyncOrders} onClick={handleSyncOrders}
@ -546,7 +561,7 @@ const TradeList = () => {
padding: '6px 12px', padding: '6px 12px',
fontSize: '12px' fontSize: '12px'
}} }}
title="从币安同步历史订单,补全缺失的订单号(不会重复同步)" title={syncAllSymbols ? "从币安同步所有交易对的历史订单,并创建缺失的交易记录(会请求大量数据)" : "从币安同步历史订单,补全缺失的订单号(仅限数据库中已有的交易对,不会重复同步)"}
> >
{syncing ? '同步中...' : '同步订单'} {syncing ? '同步中...' : '同步订单'}
</button> </button>
@ -578,6 +593,9 @@ const TradeList = () => {
开仓订单{syncResult.details.open_orders || 0} | 开仓订单{syncResult.details.open_orders || 0} |
平仓订单{syncResult.details.close_orders || 0} | 平仓订单{syncResult.details.close_orders || 0} |
更新记录{syncResult.details.updated_trades || 0} 更新记录{syncResult.details.updated_trades || 0}
{syncResult.details.created_trades ? ` | 新建记录:${syncResult.details.created_trades}` : ''}
{syncResult.details.entry_order_id_filled ? ` | 补全开仓订单号:${syncResult.details.entry_order_id_filled}` : ''}
{syncResult.details.exit_order_id_filled ? ` | 补全平仓订单号:${syncResult.details.exit_order_id_filled}` : ''}
</div> </div>
)} )}
<button <button

View File

@ -406,8 +406,12 @@ export const api = {
}, },
// 同步订单:从币安同步历史订单,补全缺失的订单号 // 同步订单:从币安同步历史订单,补全缺失的订单号
syncTradesFromBinance: async (days = 7) => { syncTradesFromBinance: async (days = 7, syncAllSymbols = false) => {
const response = await fetch(buildUrl(`/api/trades/sync-binance?days=${days}`), { const params = new URLSearchParams({ days: String(days) });
if (syncAllSymbols) {
params.set('sync_all_symbols', 'true');
}
const response = await fetch(buildUrl(`/api/trades/sync-binance?${params.toString()}`), {
method: 'POST', method: 'POST',
headers: { headers: {
...withAccountHeaders({ 'Content-Type': 'application/json' }), ...withAccountHeaders({ 'Content-Type': 'application/json' }),