auto_trade_sys/backend/api/routes/trades.py
薇薇安 ac022bd62a 1
2026-02-25 23:07:14 +08:00

1018 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易记录API
"""
from fastapi import APIRouter, Query, HTTPException, Header, Depends
from typing import Optional
from datetime import datetime, timedelta
from collections import Counter
import json
import sys
from pathlib import Path
import logging
import asyncio
import time
from datetime import timezone, timedelta
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
from database.models import Trade, Account
from api.auth_deps import get_account_id
router = APIRouter()
# 在模块级别创建logger与其他路由文件保持一致
logger = logging.getLogger(__name__)
def get_timestamp_range(period: Optional[str] = None):
"""
根据时间段参数返回开始和结束时间戳Unix时间戳秒数
Args:
period: 时间段 ('1d', '7d', '30d', 'today', 'week', 'month', 'custom')
Returns:
(start_timestamp, end_timestamp) 元组Unix时间戳秒数
"""
# 使用当前时间作为结束时间Unix时间戳秒数
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
end_timestamp = int(now.timestamp())
if period == '1d':
# 最近1天当前时间减去24小时
start_timestamp = end_timestamp - 24 * 3600
elif period == '7d':
# 最近7天当前时间减去7*24小时
start_timestamp = end_timestamp - 7 * 24 * 3600
elif period == '30d':
# 最近30天当前时间减去30*24小时
start_timestamp = end_timestamp - 30 * 24 * 3600
elif period == 'today':
# 今天从今天00:00:00到现在
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(today_start.timestamp())
elif period == 'week':
# 本周从本周一00:00:00到现在
days_since_monday = now.weekday() # 0=Monday, 6=Sunday
week_start = (now - timedelta(days=days_since_monday)).replace(hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(week_start.timestamp())
elif period == 'month':
# 本月从本月1日00:00:00到现在
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(month_start.timestamp())
else:
return None, None
return start_timestamp, end_timestamp
@router.get("")
@router.get("/")
async def get_trades(
account_id: int = Depends(get_account_id),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天), 'today'(今天), 'week'(本周), 'month'(本月)"),
symbol: Optional[str] = Query(None, description="交易对筛选"),
trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"),
exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"),
status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"),
time_filter: str = Query("exit", description="时间范围按哪种时间筛选: 'exit'(按平仓时间), 'entry'(按开仓时间,适合策略分析), 'created'(按创建时间), 'both'"),
include_sync: bool = Query(True, description="是否包含 entry_reason 为 sync_recovered 的补建/同步单(默认包含,便于订单记录与币安一致)"),
reconciled_only: bool = Query(True, description="仅返回可对账记录(有 entry_order_id已平仓的还有 exit_order_id与币安一致统计真实"),
limit: int = Query(100, ge=1, le=1000, description="返回记录数限制"),
):
"""
获取交易记录
支持两种筛选方式:
1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d', 'today', 'week', 'month')
2. 自定义时间段筛选:使用 start_date 和 end_date 参数会转换为Unix时间戳
默认按平仓时间(time_filter=exit):选「今天」= 今天平掉的单 + 今天开的未平仓,更符合直觉。
"""
try:
logger.debug(f"获取交易记录: period={period}, symbol={symbol}, status={status}, limit={limit}, reconciled_only={reconciled_only}")
start_timestamp = None
end_timestamp = None
if period:
period_start, period_end = get_timestamp_range(period)
if period_start is not None and period_end is not None:
start_timestamp = period_start
end_timestamp = period_end
elif start_date or end_date:
# 自定义时间段将日期字符串转换为Unix时间戳
beijing_tz = timezone(timedelta(hours=8))
if start_date:
if len(start_date) == 10: # YYYY-MM-DD
start_date = f"{start_date} 00:00:00"
try:
dt = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
start_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的开始日期格式: {start_date}")
if end_date:
if len(end_date) == 10: # YYYY-MM-DD
end_date = f"{end_date} 23:59:59"
try:
dt = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
end_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的结束日期格式: {end_date}")
trades = Trade.get_all(
start_timestamp, end_timestamp, symbol, status, trade_type, exit_reason,
account_id=account_id, time_filter=time_filter or "exit",
limit=limit, reconciled_only=reconciled_only, include_sync=include_sync,
)
formatted_trades = []
for trade in trades:
formatted_trade = dict(trade)
# 将 exit_reason 转换为中文显示
exit_reason = trade.get('exit_reason', '')
if exit_reason:
exit_reason_map = {
'manual': '手动平仓',
'stop_loss': '自动平仓(止损)',
'take_profit': '自动平仓(止盈)',
'take_profit_partial_then_take_profit': '自动平仓(分步止盈后止盈)',
'take_profit_partial_then_stop': '自动平仓(分步止盈后止损)',
'take_profit_partial_then_trailing_stop': '自动平仓(分步止盈后移动止损)',
'early_take_profit': '自动平仓(早止盈)',
'trailing_stop': '自动平仓(移动止损)',
'sync': '同步平仓'
}
formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason)
else:
formatted_trade['exit_reason_display'] = ''
# 入场思路 entry_context 可能从 DB 以 JSON 字符串返回,解析为对象便于前端/分析使用
if formatted_trade.get('entry_context') is not None and isinstance(formatted_trade['entry_context'], str):
try:
formatted_trade['entry_context'] = json.loads(formatted_trade['entry_context'])
except Exception:
pass
formatted_trades.append(formatted_trade)
result = {
"total": len(formatted_trades),
"trades": formatted_trades,
"filters": {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp,
"start_date": datetime.fromtimestamp(start_timestamp).strftime('%Y-%m-%d %H:%M:%S') if start_timestamp else None,
"end_date": datetime.fromtimestamp(end_timestamp).strftime('%Y-%m-%d %H:%M:%S') if end_timestamp else None,
"period": period,
"symbol": symbol,
"status": status,
"reconciled_only": reconciled_only,
}
}
# 仅可对账且无记录时提示:可先同步币安订单补全订单号,或取消「仅可对账」查看全部
if reconciled_only and len(formatted_trades) == 0:
result["hint"] = "当前没有可对账记录。可尝试1) 调用「同步币安订单」接口补全开仓/平仓订单号2) 使用 reconciled_only=false 查看全部记录(含无订单号的补建单)。"
logger.debug(f"返回交易记录: {len(result['trades'])}")
return result
except Exception as e:
logger.error(f"获取交易记录失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/stats")
async def get_trade_stats(
account_id: int = Depends(get_account_id),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d', 'today', 'week', 'month'"),
symbol: Optional[str] = Query(None, description="交易对筛选"),
time_filter: str = Query("exit", description="时间范围按哪种时间: 'exit'(按平仓时间), 'entry'(按开仓时间), 'created'(按创建时间), 'both'"),
include_sync: bool = Query(True, description="是否包含 entry_reason 为 sync_recovered 的补建/同步单(默认与订单记录一致)"),
reconciled_only: bool = Query(True, description="仅统计可对账记录,与币安一致,避免系统盈利/币安亏损偏差"),
):
"""获取交易统计(默认按平仓时间统计:今日=今日平仓的盈亏,与订单记录筛选一致)"""
try:
logger.debug(f"获取交易统计: period={period}, symbol={symbol}, reconciled_only={reconciled_only}")
start_timestamp = None
end_timestamp = None
if period:
period_start, period_end = get_timestamp_range(period)
if period_start is not None and period_end is not None:
start_timestamp = period_start
end_timestamp = period_end
elif start_date or end_date:
# 自定义时间段将日期字符串转换为Unix时间戳
beijing_tz = timezone(timedelta(hours=8))
if start_date:
if len(start_date) == 10: # YYYY-MM-DD
start_date = f"{start_date} 00:00:00"
try:
dt = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
start_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的开始日期格式: {start_date}")
if end_date:
if len(end_date) == 10: # YYYY-MM-DD
end_date = f"{end_date} 23:59:59"
try:
dt = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
end_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的结束日期格式: {end_date}")
trades = Trade.get_all(
start_timestamp, end_timestamp, symbol, None,
account_id=account_id, time_filter=time_filter or "exit",
limit=None, reconciled_only=reconciled_only, include_sync=include_sync,
)
closed_trades = [t for t in trades if t.get("status") == "closed"]
# 辅助函数:计算净盈亏(优先使用 realized_pnl - commission
def get_net_pnl(t):
pnl = float(t.get('pnl') or 0)
realized = t.get('realized_pnl')
if realized is not None:
pnl = float(realized)
commission = float(t.get('commission') or 0)
commission_asset = t.get('commission_asset')
# 如果手续费是 USDT则扣除
if commission_asset == 'USDT':
pnl -= commission
return pnl
# 排除0盈亏的订单abs(pnl) < 0.01 USDT视为0盈亏这些订单不应该影响胜率统计
ZERO_PNL_THRESHOLD = 0.01 # 0.01 USDT的阈值小于此值视为0盈亏
meaningful_trades = [t for t in closed_trades if abs(get_net_pnl(t)) >= ZERO_PNL_THRESHOLD]
zero_pnl_trades = [t for t in closed_trades if abs(get_net_pnl(t)) < ZERO_PNL_THRESHOLD]
# 只统计有意义的交易排除0盈亏的胜率
win_trades = [t for t in meaningful_trades if get_net_pnl(t) > 0]
loss_trades = [t for t in meaningful_trades if get_net_pnl(t) < 0]
# 盈利/亏损均值(用于观察是否接近 3:1
avg_win_pnl = sum(get_net_pnl(t) for t in win_trades) / len(win_trades) if win_trades else 0.0
avg_loss_pnl_abs = (
sum(abs(get_net_pnl(t)) for t in loss_trades) / len(loss_trades) if loss_trades else 0.0
)
win_loss_ratio = (avg_win_pnl / avg_loss_pnl_abs) if avg_loss_pnl_abs > 0 else None
# 实际盈亏比(所有盈利单的总盈利 / 所有亏损单的总亏损,必须 > 1.5,目标 2.5-3.0
total_win_pnl = sum(get_net_pnl(t) for t in win_trades) if win_trades else 0.0
total_loss_pnl_abs = sum(abs(get_net_pnl(t)) for t in loss_trades) if loss_trades else 0.0
actual_profit_loss_ratio = (total_win_pnl / total_loss_pnl_abs) if total_loss_pnl_abs > 0 else None
# 盈利因子(总盈利金额 / 总亏损金额,必须 > 1.1,目标 1.5+
profit_factor = (total_win_pnl / total_loss_pnl_abs) if total_loss_pnl_abs > 0 else None
# 平仓原因分布(用来快速定位胜率低的主要来源:止损/止盈/同步等)
exit_reason_counts = Counter((t.get("exit_reason") or "unknown") for t in meaningful_trades)
# 平均持仓时长(分钟):
# - 优先使用 duration_minutes若历史没写入则用 exit_time-entry_time 实时计算)
durations = []
for t in meaningful_trades:
dm = t.get("duration_minutes")
if dm is not None:
try:
dm_f = float(dm)
if dm_f >= 0:
durations.append(dm_f)
continue
except Exception:
pass
et = t.get("entry_time")
xt = t.get("exit_time")
try:
if et is None or xt is None:
continue
et_i = int(et)
xt_i = int(xt)
if xt_i >= et_i:
durations.append((xt_i - et_i) / 60.0)
except Exception:
continue
avg_duration_minutes = (sum(durations) / len(durations)) if durations else None
stats = {
"total_trades": len(trades),
"closed_trades": len(closed_trades),
"open_trades": len(trades) - len(closed_trades),
"meaningful_trades": len(meaningful_trades), # 有意义的交易数排除0盈亏
"zero_pnl_trades": len(zero_pnl_trades), # 0盈亏交易数
"win_trades": len(win_trades),
"loss_trades": len(loss_trades),
"win_rate": len(win_trades) / len(meaningful_trades) * 100 if meaningful_trades else 0, # 基于有意义的交易计算胜率
"total_pnl": sum(get_net_pnl(t) for t in closed_trades),
"avg_pnl": sum(get_net_pnl(t) for t in closed_trades) / len(closed_trades) if closed_trades else 0,
# 额外统计:盈利单均值 vs 亏损单均值(绝对值)以及比值(目标 3:1
"avg_win_pnl": avg_win_pnl,
"avg_loss_pnl_abs": avg_loss_pnl_abs,
"avg_win_loss_ratio": win_loss_ratio,
"avg_win_loss_ratio_target": 3.0,
# 实际盈亏比(所有盈利单总盈利 / 所有亏损单总亏损,目标 > 2.0
"actual_profit_loss_ratio": actual_profit_loss_ratio,
"actual_profit_loss_ratio_target": 2.0,
"total_win_pnl": total_win_pnl,
"total_loss_pnl_abs": total_loss_pnl_abs,
# 盈利因子(总盈利 / 总亏损,目标 > 1.2
"profit_factor": profit_factor,
"profit_factor_target": 1.2,
"exit_reason_counts": dict(exit_reason_counts),
"avg_duration_minutes": avg_duration_minutes,
# 总交易量(名义下单量口径):优先使用 notional_usdt新字段否则回退 entry_price * quantity
"total_notional_usdt": sum(
float(t.get('notional_usdt') or (float(t.get('entry_price', 0)) * float(t.get('quantity', 0))))
for t in trades
),
"filters": {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp,
"start_date": datetime.fromtimestamp(start_timestamp).strftime('%Y-%m-%d %H:%M:%S') if start_timestamp else None,
"end_date": datetime.fromtimestamp(end_timestamp).strftime('%Y-%m-%d %H:%M:%S') if end_timestamp else None,
"period": period,
"symbol": symbol,
"reconciled_only": reconciled_only,
}
}
logger.debug(
f"交易统计: total={stats['total_trades']}, closed={stats['closed_trades']}, "
f"win_rate={stats['win_rate']:.2f}%, total_pnl={stats['total_pnl']:.2f}"
)
return stats
except Exception as e:
logger.error(f"获取交易统计失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/sync-binance")
async def sync_trades_from_binance(
account_id: int = Depends(get_account_id),
days: int = Query(7, ge=1, le=30, description="同步最近N天的订单"),
sync_all_symbols: bool = Query(False, description="是否同步所有交易对的订单(不限于数据库中的),用于补全缺失记录"),
):
"""
从币安同步历史订单,补全 DB 中缺失的 exit_order_id / 平仓信息,并可创建缺失的交易记录。
**WS 已接入后**:开仓/平仓订单号主要由 User Data Stream 回写,此接口仅作**冷启动或补漏**,建议降低调用频率。
参数:
- sync_all_symbols=True: 同步所有交易对的订单(用于数据库记录缺失时补全)
- sync_all_symbols=False: 仅对「DB 中近期有记录的 symbol」拉取订单默认避免全市场请求
"""
try:
logger.info(f"开始从币安同步历史订单(最近{days}sync_all_symbols={sync_all_symbols}account_id={account_id or DEFAULT_ACCOUNT_ID}...")
# 导入必要的模块
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
alternative_path = project_root / 'backend' / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
else:
raise HTTPException(status_code=500, detail="交易系统模块不存在")
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
from database.models import DEFAULT_ACCOUNT_ID
aid = account_id or DEFAULT_ACCOUNT_ID
api_key, api_secret, use_testnet, status = Account.get_credentials(aid)
if not api_key or not api_secret:
raise HTTPException(
status_code=400,
detail=f"账号 {aid} 未配置 API 密钥,无法从币安同步订单。请在配置中设置该账号的 BINANCE_API_KEY 和 BINANCE_API_SECRET。",
)
# 计算时间范围(秒,供 DB 查询)
end_time_ms = int(time.time() * 1000)
start_time_ms = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
start_ts_sec = start_time_ms // 1000
end_ts_sec = end_time_ms // 1000
# 使用当前账号的 API 初始化客户端(保证同步的是当前账号的订单)
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet,
)
await client.connect()
# 获取需要同步的 symbol 列表
symbol_list = []
auto_full_sync = False # 是否因 DB 无记录而自动全量
try:
if sync_all_symbols:
# 如果用户选择同步所有交易对,从币安获取所有 USDT 永续合约
logger.info("用户选择同步所有交易对的订单,从币安获取所有 USDT 永续合约列表...")
all_symbols = await client.get_all_usdt_pairs()
symbol_list = list(all_symbols) if all_symbols else []
logger.info(f"从币安获取到 {len(symbol_list)} 个 USDT 永续合约交易对")
else:
# 默认策略:仅对 DB 中有记录的 symbol 拉取订单
trades_in_range = Trade.get_all(
start_timestamp=start_ts_sec,
end_timestamp=end_ts_sec,
account_id=aid,
time_filter="both",
)
symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")})
logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对")
# 如果时间范围内没有记录,再尝试「所有有记录的 symbol」限制条数避免全表加载
if not symbol_list:
all_trades = Trade.get_all(account_id=aid, limit=5000)
symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")})
logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)}")
# 若仍然无 symbolDB 完全无记录),自动按全量同步,否则同步结果为 0 条
if not symbol_list:
logger.info("DB 中该账号无任何交易记录,自动按全量交易对从币安拉取订单以补全记录")
all_symbols = await client.get_all_usdt_pairs()
symbol_list = list(all_symbols) if all_symbols else []
auto_full_sync = True
logger.info(f"自动全量:从币安获取到 {len(symbol_list)} 个 USDT 永续合约交易对")
except Exception as e:
logger.warning(f"获取 symbol 列表失败: {e}", exc_info=True)
await client.disconnect()
return {
"success": False,
"message": f"获取交易对列表失败: {str(e)}",
"total_orders": 0,
"updated_trades": 0,
"created_trades": 0,
"entry_order_id_filled": 0,
"exit_order_id_filled": 0,
"close_orders": 0,
"open_orders": 0,
}
if not symbol_list:
logger.info(f"没有需要同步的交易对,跳过同步")
await client.disconnect()
return {
"success": True,
"message": f"没有需要同步的交易对sync_all_symbols={sync_all_symbols}",
"total_orders": 0,
"updated_trades": 0,
"created_trades": 0,
"entry_order_id_filled": 0,
"exit_order_id_filled": 0,
"close_orders": 0,
"open_orders": 0,
}
try:
# 仅对上述 symbol 拉取订单
all_orders = []
try:
if sync_all_symbols:
logger.info(f"全量同步模式:对 {len(symbol_list)} 个交易对拉取订单(时间范围: {days}天,{datetime.fromtimestamp(start_ts_sec)}{datetime.fromtimestamp(end_ts_sec)}")
else:
logger.info(f"增量同步模式:对 {len(symbol_list)} 个有 DB 记录的 symbol 拉取订单(时间范围: {days}天,{datetime.fromtimestamp(start_ts_sec)}{datetime.fromtimestamp(end_ts_sec)}")
for idx, symbol in enumerate(symbol_list, 1):
try:
orders = await client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time_ms,
endTime=end_time_ms
)
filled_orders = [o for o in orders if o.get('status') == 'FILLED']
if filled_orders:
logger.info(f" [{idx}/{len(symbol_list)}] {symbol}: 获取到 {len(filled_orders)} 个已成交订单")
elif orders:
logger.debug(f" [{idx}/{len(symbol_list)}] {symbol}: 获取到 {len(orders)} 个订单,但无已成交订单")
all_orders.extend(filled_orders)
await asyncio.sleep(0.1)
except Exception as e:
logger.warning(f" [{idx}/{len(symbol_list)}] 获取 {symbol} 订单失败: {e}")
continue
logger.info(f"✓ 从币安获取到 {len(all_orders)} 个已成交订单(涉及 {len(symbol_list)} 个交易对)")
if len(all_orders) == 0:
logger.warning(f"⚠️ 从币安获取到的订单数为 0可能原因1) 时间范围内确实没有订单 2) 交易对列表为空 3) API 调用失败。时间范围: {datetime.fromtimestamp(start_ts_sec)}{datetime.fromtimestamp(end_ts_sec)}")
except Exception as e:
logger.error(f"获取币安订单失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}")
if len(all_orders) == 0:
logger.warning(f"⚠️ 从币安获取到的订单数为 0可能原因1) 时间范围内确实没有订单 2) 交易对列表为空 3) API 调用失败")
return {
"success": True,
"message": f"从币安获取到 0 个订单(时间范围: {days}天,交易对数量: {len(symbol_list)})。可能原因:时间范围内确实没有订单,或请检查币安 API 连接。",
"total_orders": 0,
"updated_trades": 0,
"created_trades": 0,
"entry_order_id_filled": 0,
"exit_order_id_filled": 0,
"skipped_existing": 0,
"skipped_no_match": 0,
"close_orders": 0,
"open_orders": 0,
}
# 同步订单到数据库(仅当前账号)
synced_count = 0
updated_count = 0
created_count = 0 # 新创建的交易记录数
entry_order_id_filled = 0 # 补全的开仓订单号数量
exit_order_id_filled = 0 # 补全的平仓订单号数量
skipped_existing = 0 # 已存在且完整的订单数
skipped_no_match = 0 # 无法匹配的记录数
# 按时间排序,从旧到新
all_orders.sort(key=lambda x: x.get('time', 0))
# 全量或自动全量时,对无法匹配的开仓订单也创建新记录
effective_sync_all = sync_all_symbols or auto_full_sync
# 先处理平仓订单reduceOnly再处理开仓订单
close_orders = [o for o in all_orders if o.get('reduceOnly', False)]
open_orders = [o for o in all_orders if not o.get('reduceOnly', False)]
logger.info(f"开始同步:平仓订单 {len(close_orders)} 个,开仓订单 {len(open_orders)}")
# 1. 处理平仓订单
logger.info(f"开始处理 {len(close_orders)} 个平仓订单...")
for order in close_orders:
symbol = order.get('symbol')
order_id = order.get('orderId')
side = order.get('side')
quantity = float(order.get('executedQty', 0))
avg_price = float(order.get('avgPrice', 0))
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
reduce_only = order.get('reduceOnly', False)
otype = str(order.get('type') or order.get('origType') or '').upper()
exit_time_ts = None
try:
ms = order.get('updateTime') or order.get('time')
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
if quantity <= 0 or avg_price <= 0:
continue
try:
# 这是平仓订单close_orders 已经筛选出 reduceOnly=True 的订单)
# 首先检查是否已经通过订单号同步过(避免重复)
existing_trade = Trade.get_by_exit_order_id(order_id)
# 如果已有 exit_order_id 且 exit_reason 不是 sync说明已完整同步跳过
if existing_trade and existing_trade.get("exit_order_id") and existing_trade.get("exit_reason") not in (None, "", "sync"):
skipped_existing += 1
logger.debug(f"订单 {order_id} 已完整同步过exit_reason={existing_trade.get('exit_reason')}),跳过")
continue
# 查找数据库中该交易对的 open 状态记录(仅当前账号),或已平仓但 exit_order_id 为空的记录
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=aid)
closed_trades_no_exit_id = []
if not existing_trade and not open_trades:
# 如果没有 open 记录,查找已平仓但 exit_order_id 为空的记录
try:
closed_trades = Trade.get_by_symbol(symbol, status='closed', account_id=aid)
closed_trades_no_exit_id = [
t for t in closed_trades
if not t.get("exit_order_id") or str(t.get("exit_order_id")).strip() in ("", "0")
]
except Exception:
pass
if existing_trade or open_trades or closed_trades_no_exit_id:
# 找到匹配的交易记录(优先用 existing_trade其次 open_trades最后 closed_trades_no_exit_id
if existing_trade:
trade = existing_trade
elif open_trades:
trade = open_trades[0] # 取第一个 open 记录
else:
# 从已平仓但无 exit_order_id 的记录中选择(按 entry_time 最近的一条)
closed_trades_no_exit_id.sort(key=lambda x: x.get('entry_time', 0) or 0, reverse=True)
trade = closed_trades_no_exit_id[0]
trade_id = trade['id']
# 如果之前没有 exit_order_id记录为补全
if not trade.get("exit_order_id") or str(trade.get("exit_order_id")).strip() in ("", "0"):
exit_order_id_filled += 1
# 计算盈亏
entry_price = float(trade['entry_price'])
entry_quantity = float(trade['quantity'])
# 使用实际成交数量(可能部分平仓)
actual_quantity = min(quantity, entry_quantity)
if trade['side'] == 'BUY':
pnl = (avg_price - entry_price) * actual_quantity
pnl_percent = ((avg_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - avg_price) * actual_quantity
pnl_percent = ((entry_price - avg_price) / entry_price) * 100
# 细分 exit_reason优先使用币安订单类型其次用价格接近止损/止盈做兜底
exit_reason = "sync"
# 检查订单的 reduceOnly 字段:如果是 true说明是自动平仓不应该标记为 manual
is_reduce_only = order.get("reduceOnly", False) if isinstance(order, dict) else False
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
# 如果是 reduceOnly 订单,说明是自动平仓(可能是保护单触发的),先标记为 sync后续用价格判断
if is_reduce_only:
exit_reason = "sync" # 临时标记,后续用价格判断
else:
exit_reason = "manual" # 非 reduceOnly 的 MARKET/LIMIT 订单才是真正的手动平仓
try:
def _close_to(a: float, b: float, max_pct: float = 0.02) -> bool: # 放宽到2%,因为滑点可能导致价格不完全一致
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(avg_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
# 优先检查止损
if sl is not None and _close_to(ep, float(sl), max_pct=0.02):
exit_reason = "stop_loss"
# 然后检查止盈
elif tp is not None and _close_to(ep, float(tp), max_pct=0.02):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.02):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.02):
exit_reason = "take_profit"
# 如果价格接近入场价,可能是移动止损触发的
elif is_reduce_only and exit_reason == "sync":
entry_price_val = float(trade.get("entry_price", 0) or 0)
if entry_price_val > 0 and _close_to(ep, entry_price_val, max_pct=0.01):
exit_reason = "trailing_stop"
except Exception:
pass
# 从币安成交获取手续费与实际盈亏,保证统计与币安一致
sync_commission = None
sync_commission_asset = None
sync_realized_pnl = None
try:
recent_trades = await client.get_recent_trades(symbol, limit=30)
related = [t for t in recent_trades if str(t.get('orderId')) == str(order_id)]
if related:
sync_commission = sum(float(t.get('commission', 0)) for t in related)
assets = {t.get('commissionAsset') for t in related if t.get('commissionAsset')}
sync_commission_asset = "/".join(assets) if assets else None
sync_realized_pnl = sum(float(t.get('realizedPnl', 0)) for t in related)
except Exception as fee_err:
logger.debug(f"同步订单 {order_id} 手续费失败: {fee_err}")
# 持仓持续时间(分钟)
duration_minutes = None
try:
et = trade.get("entry_time")
if et is not None and exit_time_ts is not None:
et_i = int(et)
if exit_time_ts >= et_i:
duration_minutes = int((exit_time_ts - et_i) / 60)
except Exception:
duration_minutes = None
# 更新数据库(包含订单号、手续费与实际盈亏)
Trade.update_exit(
trade_id=trade_id,
exit_price=avg_price,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=order_id, # 保存订单号,确保唯一性
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
commission=sync_commission,
commission_asset=sync_commission_asset,
realized_pnl=sync_realized_pnl,
)
updated_count += 1
logger.info(
f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, "
f"类型: {otype or '-'}, 原因: {exit_reason}, 成交价: {avg_price:.4f})"
)
else:
# 没有找到匹配的记录
if effective_sync_all:
# 如果启用了同步所有交易对,尝试创建完整的交易记录(开仓+平仓)
try:
# 查找是否有对应的开仓订单(通过时间窗口和价格匹配)
# 注意:平仓订单通常有 reduceOnly=True我们需要找到对应的开仓订单
# 由于币安订单历史可能不完整,这里先跳过创建,只记录日志
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录,且 sync_all_symbols=True但创建完整交易记录需要开仓订单信息暂时跳过")
skipped_no_match += 1
except Exception as e:
logger.debug(f"处理平仓订单失败 {order_id}: {e}")
skipped_no_match += 1
else:
skipped_no_match += 1
logger.debug(f"平仓订单 {order_id} ({symbol}) 无法匹配到现有记录(无 open 状态且无 exit_order_id 为空的 closed 记录),跳过")
except Exception as e:
logger.warning(f"同步平仓订单失败 {symbol} (订单ID: {order_id}): {e}")
continue
# 2. 处理开仓订单
logger.info(f"开始处理 {len(open_orders)} 个开仓订单...")
for order in open_orders:
symbol = order.get('symbol')
order_id = order.get('orderId')
side = order.get('side')
quantity = float(order.get('executedQty', 0))
avg_price = float(order.get('avgPrice', 0))
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
otype = str(order.get('type') or order.get('origType') or '').upper()
if quantity <= 0 or avg_price <= 0:
continue
try:
# 这是开仓订单
existing_trade = Trade.get_by_entry_order_id(order_id)
if existing_trade:
# 如果已存在,跳过(开仓订单信息通常已完整)
logger.debug(f"开仓订单 {order_id} 已存在,跳过")
else:
# 如果不存在,尝试查找没有 entry_order_id 的记录并补全,或创建新记录
try:
# 查找该 symbol 下没有 entry_order_id 的记录(按时间匹配)
order_time_ms = order.get('time', 0)
order_time_sec = order_time_ms // 1000 if order_time_ms > 0 else 0
# 查找时间范围内(订单时间前后 1 小时)且没有 entry_order_id 的记录
time_window_start = order_time_sec - 3600
time_window_end = order_time_sec + 3600
trades_in_window = Trade.get_all(
account_id=aid,
symbol=symbol,
start_timestamp=time_window_start,
end_timestamp=time_window_end,
time_filter="entry",
reconciled_only=False,
limit=500,
)
trades_no_entry_id = [
t for t in trades_in_window
if not t.get("entry_order_id") or str(t.get("entry_order_id")).strip() in ("", "0")
]
# 并入缺少 entry_order_id 的其它记录(含 entry_time 为 NULL 的旧记录),便于补全后「仅可对账」能显示
extra = Trade.get_trades_missing_entry_order_id(symbol, aid, limit=50)
seen_ids = {t["id"] for t in trades_no_entry_id}
for t in extra:
if t.get("id") not in seen_ids:
trades_no_entry_id.append(t)
seen_ids.add(t["id"])
# 按价格和数量匹配(允许 5% 误差)
matched_trade = None
order_qty = float(order.get('executedQty', 0))
order_price = float(order.get('avgPrice', 0))
order_side = order.get('side', '').upper()
for t in trades_no_entry_id:
t_qty = float(t.get('quantity', 0))
t_price = float(t.get('entry_price', 0))
# 数量匹配(允许 5% 误差)且价格匹配(允许 2% 误差)
if (order_qty > 0 and t_qty > 0 and
abs(t_qty - order_qty) / max(order_qty, 1e-8) <= 0.05 and
order_price > 0 and t_price > 0 and
abs(t_price - order_price) / max(order_price, 1e-8) <= 0.02):
matched_trade = t
break
if matched_trade:
# 补全 entry_order_id
from database.models import Trade as TradeModel
if TradeModel.update_entry_order_id(matched_trade['id'], order_id):
entry_order_id_filled += 1
logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})")
else:
logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})")
else:
# 订单统一由自动下单入 DB同步仅补全已有记录的订单号不创建新记录
logger.debug(f"发现开仓订单 {order_id} ({symbol}) 无法匹配到现有记录已跳过仅自动下单入DB")
except Exception as e:
logger.debug(f"处理开仓订单失败 {order_id}: {e}")
except Exception as e:
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
continue
msg = f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,创建了 {created_count} 条新记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}"
if auto_full_sync:
msg += "(因 DB 无记录已自动按全量交易对拉取)"
result = {
"success": True,
"message": msg,
"total_orders": len(all_orders),
"updated_trades": updated_count,
"created_trades": created_count,
"entry_order_id_filled": entry_order_id_filled,
"exit_order_id_filled": exit_order_id_filled,
"skipped_existing": skipped_existing,
"skipped_no_match": skipped_no_match,
"close_orders": len(close_orders),
"open_orders": len(open_orders)
}
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,创建了 {created_count} 条新记录,补全开仓订单号 {entry_order_id_filled} 个,补全平仓订单号 {exit_order_id_filled} 个,跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match}")
return result
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"同步币安订单失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")
@router.get("/verify-binance")
async def verify_trades_against_binance(
account_id: int = Depends(get_account_id),
days: int = Query(30, ge=1, le=90, description="校验最近 N 天的记录,默认 30"),
limit: int = Query(100, ge=1, le=500, description="最多校验条数,默认 100"),
):
"""
用币安订单接口逐条校验本账号「可对账」交易记录的准确性,便于把握策略执行分析所依赖的订单数据是否与交易所一致。
- 只校验有 entry_order_id 的记录(已平仓的还会校验 exit_order_id
- 每条记录会请求币安 futures_get_order 核对订单是否存在、symbol/side/数量是否一致。
- 返回汇总(一致/缺失/不一致数量)与明细,便于排查对不上的记录。
"""
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
end_ts = int(now.timestamp())
start_ts = end_ts - days * 24 * 3600
trades = Trade.get_all(
account_id=account_id,
start_timestamp=start_ts,
end_timestamp=end_ts,
limit=10000,
)
# 只校验「可对账」记录:有 entry_order_id若已平仓则还须有 exit_order_id
def _has_entry(eid):
return eid is not None and str(eid).strip() not in ("", "0")
def _has_exit(xid):
return xid is not None and str(xid).strip() not in ("", "0")
to_verify = [
t for t in trades
if _has_entry(t.get("entry_order_id"))
and (t.get("status") != "closed" or _has_exit(t.get("exit_order_id")))
][:limit]
if not to_verify:
return {
"success": True,
"account_id": account_id,
"summary": {"total_verified": 0, "entry_ok": 0, "entry_missing": 0, "entry_mismatch": 0, "exit_ok": 0, "exit_missing": 0, "exit_mismatch": 0},
"details": [],
"message": "该时间范围内没有可对账记录(需有 entry_order_id已平仓需有 exit_order_id",
}
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if not api_key or not api_secret:
raise HTTPException(status_code=400, detail=f"账号 {account_id} 未配置 API 密钥,无法请求币安")
trading_system_path = project_root / "trading_system"
if not trading_system_path.exists():
trading_system_path = project_root / "backend" / "trading_system"
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(trading_system_path))
try:
from binance_client import BinanceClient
except ImportError:
raise HTTPException(status_code=500, detail="无法导入 BinanceClient")
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
await client.connect()
try:
summary = {"total_verified": 0, "entry_ok": 0, "entry_missing": 0, "entry_mismatch": 0, "exit_ok": 0, "exit_missing": 0, "exit_mismatch": 0}
details = []
for t in to_verify:
tid = t.get("id")
symbol = t.get("symbol") or ""
eid = t.get("entry_order_id")
xid = t.get("exit_order_id")
status_t = t.get("status") or "open"
side_t = t.get("side") or ""
qty_t = float(t.get("quantity") or 0)
entry_price_t = float(t.get("entry_price") or 0)
row = {
"trade_id": tid,
"symbol": symbol,
"side": side_t,
"status": status_t,
"entry_order_id": eid,
"exit_order_id": xid,
"entry_verified": None,
"entry_message": None,
"exit_verified": None,
"exit_message": None,
}
# 校验开仓订单
if _has_entry(eid):
summary["total_verified"] += 1
try:
order = await client.client.futures_get_order(symbol=symbol, orderId=int(eid))
if not order:
summary["entry_missing"] += 1
row["entry_verified"] = False
row["entry_message"] = "币安未返回订单"
else:
ob_side = (order.get("side") or "").upper()
ob_qty = float(order.get("origQty") or order.get("executedQty") or 0)
ob_price = float(order.get("avgPrice") or 0)
if ob_side != side_t or abs(ob_qty - qty_t) > 1e-8:
summary["entry_mismatch"] += 1
row["entry_verified"] = False
row["entry_message"] = f"币安 side={ob_side} qty={ob_qty}DB side={side_t} qty={qty_t}"
else:
summary["entry_ok"] += 1
row["entry_verified"] = True
row["entry_message"] = "一致"
except Exception as ex:
err = str(ex)
if "Unknown order" in err or "-2011" in err or "404" in err.lower():
summary["entry_missing"] += 1
row["entry_verified"] = False
row["entry_message"] = "币安无此订单"
else:
summary["entry_mismatch"] += 1
row["entry_verified"] = False
row["entry_message"] = err[:200]
# 校验平仓订单(仅已平仓且存在 exit_order_id
if status_t == "closed" and _has_exit(xid):
try:
order = await client.client.futures_get_order(symbol=symbol, orderId=int(xid))
if not order:
summary["exit_missing"] += 1
row["exit_verified"] = False
row["exit_message"] = "币安未返回订单"
else:
ob_side = (order.get("side") or "").upper()
ob_qty = float(order.get("executedQty") or order.get("origQty") or 0)
if ob_side != side_t or abs(ob_qty - qty_t) > 1e-8:
summary["exit_mismatch"] += 1
row["exit_verified"] = False
row["exit_message"] = f"币安 side={ob_side} qty={ob_qty}DB side={side_t} qty={qty_t}"
elif not order.get("reduceOnly"):
summary["exit_mismatch"] += 1
row["exit_verified"] = False
row["exit_message"] = "币安订单非 reduceOnly非平仓单"
else:
summary["exit_ok"] += 1
row["exit_verified"] = True
row["exit_message"] = "一致"
except Exception as ex:
err = str(ex)
if "Unknown order" in err or "-2011" in err or "404" in err.lower():
summary["exit_missing"] += 1
row["exit_verified"] = False
row["exit_message"] = "币安无此订单"
else:
summary["exit_mismatch"] += 1
row["exit_verified"] = False
row["exit_message"] = err[:200]
details.append(row)
await asyncio.sleep(0.05)
return {
"success": True,
"account_id": account_id,
"summary": summary,
"details": details,
"message": f"已校验 {summary['total_verified']} 条开仓订单,开仓一致 {summary['entry_ok']}、缺失 {summary['entry_missing']}、不一致 {summary['entry_mismatch']};平仓一致 {summary['exit_ok']}、缺失 {summary['exit_missing']}、不一致 {summary['exit_mismatch']}",
}
finally:
await client.disconnect()