feat(trades, database): 增强订单同步与记录完善逻辑
在 `trades.py` 中更新 `sync_trades_from_binance` 方法,确保使用当前账号的 API 密钥进行订单同步,并优化了日志记录以反映同步状态。新增自动全量同步逻辑,处理无记录情况下的补全需求。更新 `database/models.py` 中的 `update_pending_by_entry_order_id` 方法,提供兜底机制以完善 pending 记录,确保在缺失 clientOrderId 时仍能更新交易状态。此改动提升了交易记录的完整性与系统的稳定性。
This commit is contained in:
parent
44458dca90
commit
7139b5de76
|
|
@ -421,25 +421,33 @@ async def sync_trades_from_binance(
|
|||
sys.path.insert(0, str(project_root))
|
||||
|
||||
from binance_client import BinanceClient
|
||||
import config
|
||||
from database.models import DEFAULT_ACCOUNT_ID
|
||||
|
||||
aid = account_id or DEFAULT_ACCOUNT_ID
|
||||
api_key, api_secret, use_testnet, status = Account.get_credentials(aid)
|
||||
if not api_key or not api_secret:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"账号 {aid} 未配置 API 密钥,无法从币安同步订单。请在配置中设置该账号的 BINANCE_API_KEY 和 BINANCE_API_SECRET。",
|
||||
)
|
||||
|
||||
# 计算时间范围(秒,供 DB 查询)
|
||||
end_time_ms = int(time.time() * 1000)
|
||||
start_time_ms = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
|
||||
start_ts_sec = start_time_ms // 1000
|
||||
end_ts_sec = end_time_ms // 1000
|
||||
|
||||
# 初始化客户端(提前初始化,用于获取所有交易对)
|
||||
# 使用当前账号的 API 初始化客户端(保证同步的是当前账号的订单)
|
||||
client = BinanceClient(
|
||||
api_key=config.BINANCE_API_KEY,
|
||||
api_secret=config.BINANCE_API_SECRET,
|
||||
testnet=config.USE_TESTNET
|
||||
api_key=api_key,
|
||||
api_secret=api_secret,
|
||||
testnet=use_testnet,
|
||||
)
|
||||
await client.connect()
|
||||
|
||||
|
||||
# 获取需要同步的 symbol 列表
|
||||
symbol_list = []
|
||||
auto_full_sync = False # 是否因 DB 无记录而自动全量
|
||||
try:
|
||||
if sync_all_symbols:
|
||||
# 如果用户选择同步所有交易对,从币安获取所有 USDT 永续合约
|
||||
|
|
@ -449,24 +457,28 @@ async def sync_trades_from_binance(
|
|||
logger.info(f"从币安获取到 {len(symbol_list)} 个 USDT 永续合约交易对")
|
||||
else:
|
||||
# 默认策略:仅对 DB 中有记录的 symbol 拉取订单
|
||||
# 先尝试用 "both" 过滤,确保能找到所有相关记录(包括3天前开仓但最近平仓的)
|
||||
trades_in_range = Trade.get_all(
|
||||
start_timestamp=start_ts_sec,
|
||||
end_timestamp=end_ts_sec,
|
||||
account_id=account_id or DEFAULT_ACCOUNT_ID,
|
||||
time_filter="both", # 使用 both 确保能找到所有相关记录
|
||||
account_id=aid,
|
||||
time_filter="both",
|
||||
)
|
||||
symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")})
|
||||
logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对")
|
||||
|
||||
# 如果时间范围内没有记录,尝试获取所有有记录的 symbol(用于补全历史订单号)
|
||||
|
||||
# 如果时间范围内没有记录,再尝试「所有有记录的 symbol」
|
||||
if not symbol_list:
|
||||
logger.info(f"时间范围内({days}天)无记录,尝试获取所有有记录的 symbol 用于补全订单号")
|
||||
all_trades = Trade.get_all(
|
||||
account_id=account_id or DEFAULT_ACCOUNT_ID,
|
||||
)
|
||||
all_trades = Trade.get_all(account_id=aid)
|
||||
symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")})
|
||||
logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)} 个")
|
||||
|
||||
# 若仍然无 symbol(DB 完全无记录),自动按全量同步,否则同步结果为 0 条
|
||||
if not symbol_list:
|
||||
logger.info("DB 中该账号无任何交易记录,自动按全量交易对从币安拉取订单以补全记录")
|
||||
all_symbols = await client.get_all_usdt_pairs()
|
||||
symbol_list = list(all_symbols) if all_symbols else []
|
||||
auto_full_sync = True
|
||||
logger.info(f"自动全量:从币安获取到 {len(symbol_list)} 个 USDT 永续合约交易对")
|
||||
except Exception as e:
|
||||
logger.warning(f"获取 symbol 列表失败: {e}", exc_info=True)
|
||||
await client.disconnect()
|
||||
|
|
@ -557,11 +569,14 @@ async def sync_trades_from_binance(
|
|||
|
||||
# 按时间排序,从旧到新
|
||||
all_orders.sort(key=lambda x: x.get('time', 0))
|
||||
|
||||
|
||||
# 全量或自动全量时,对无法匹配的开仓订单也创建新记录
|
||||
effective_sync_all = sync_all_symbols or auto_full_sync
|
||||
|
||||
# 先处理平仓订单(reduceOnly),再处理开仓订单
|
||||
close_orders = [o for o in all_orders if o.get('reduceOnly', False)]
|
||||
open_orders = [o for o in all_orders if not o.get('reduceOnly', False)]
|
||||
|
||||
|
||||
logger.info(f"开始同步:平仓订单 {len(close_orders)} 个,开仓订单 {len(open_orders)} 个")
|
||||
|
||||
# 1. 处理平仓订单
|
||||
|
|
@ -597,12 +612,12 @@ async def sync_trades_from_binance(
|
|||
continue
|
||||
|
||||
# 查找数据库中该交易对的 open 状态记录(仅当前账号),或已平仓但 exit_order_id 为空的记录
|
||||
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id or DEFAULT_ACCOUNT_ID)
|
||||
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=aid)
|
||||
closed_trades_no_exit_id = []
|
||||
if not existing_trade and not open_trades:
|
||||
# 如果没有 open 记录,查找已平仓但 exit_order_id 为空的记录
|
||||
try:
|
||||
closed_trades = Trade.get_by_symbol(symbol, status='closed', account_id=account_id or DEFAULT_ACCOUNT_ID)
|
||||
closed_trades = Trade.get_by_symbol(symbol, status='closed', account_id=aid)
|
||||
closed_trades_no_exit_id = [
|
||||
t for t in closed_trades
|
||||
if not t.get("exit_order_id") or str(t.get("exit_order_id")).strip() in ("", "0")
|
||||
|
|
@ -735,7 +750,7 @@ async def sync_trades_from_binance(
|
|||
)
|
||||
else:
|
||||
# 没有找到匹配的记录
|
||||
if sync_all_symbols:
|
||||
if effective_sync_all:
|
||||
# 如果启用了同步所有交易对,尝试创建完整的交易记录(开仓+平仓)
|
||||
try:
|
||||
# 查找是否有对应的开仓订单(通过时间窗口和价格匹配)
|
||||
|
|
@ -784,7 +799,7 @@ async def sync_trades_from_binance(
|
|||
time_window_end = order_time_sec + 3600
|
||||
|
||||
trades_no_entry_id = Trade.get_all(
|
||||
account_id=account_id or DEFAULT_ACCOUNT_ID,
|
||||
account_id=aid,
|
||||
symbol=symbol,
|
||||
start_timestamp=time_window_start,
|
||||
end_timestamp=time_window_end,
|
||||
|
|
@ -822,19 +837,10 @@ async def sync_trades_from_binance(
|
|||
logger.info(f"✓ 补全开仓订单号: {symbol} (ID: {matched_trade['id']}, orderId: {order_id}, qty={order_qty}, price={order_price:.4f})")
|
||||
else:
|
||||
logger.debug(f"补全开仓订单号失败(可能已有订单号): {symbol} (ID: {matched_trade['id']}, orderId: {order_id})")
|
||||
elif sync_all_symbols:
|
||||
# 如果启用了同步所有交易对,且无法匹配到现有记录,创建新记录
|
||||
elif effective_sync_all:
|
||||
# 全量/自动全量:无法匹配到现有记录时创建新记录
|
||||
try:
|
||||
# 从订单信息中提取杠杆(如果有)
|
||||
leverage = 10 # 默认杠杆
|
||||
try:
|
||||
# 尝试从订单的 positionSide 或其他字段获取杠杆信息
|
||||
# 如果没有,使用默认值
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
# 创建新的交易记录
|
||||
leverage = 10
|
||||
trade_id = Trade.create(
|
||||
symbol=symbol,
|
||||
side=order_side,
|
||||
|
|
@ -844,7 +850,7 @@ async def sync_trades_from_binance(
|
|||
entry_reason='sync_from_binance',
|
||||
entry_order_id=order_id,
|
||||
client_order_id=order.get('clientOrderId'),
|
||||
account_id=account_id or DEFAULT_ACCOUNT_ID,
|
||||
account_id=aid,
|
||||
status='open', # 先标记为 open,如果后续有平仓订单会更新
|
||||
)
|
||||
created_count += 1
|
||||
|
|
@ -859,9 +865,12 @@ async def sync_trades_from_binance(
|
|||
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
|
||||
continue
|
||||
|
||||
msg = f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,创建了 {created_count} 条新记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个"
|
||||
if auto_full_sync:
|
||||
msg += "(因 DB 无记录已自动按全量交易对拉取)"
|
||||
result = {
|
||||
"success": True,
|
||||
"message": f"同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录,创建了 {created_count} 条新记录,补全了 {entry_order_id_filled} 个开仓订单号,{exit_order_id_filled} 个平仓订单号。跳过已完整同步 {skipped_existing} 个,无法匹配 {skipped_no_match} 个",
|
||||
"message": msg,
|
||||
"total_orders": len(all_orders),
|
||||
"updated_trades": updated_count,
|
||||
"created_trades": created_count,
|
||||
|
|
|
|||
|
|
@ -935,6 +935,43 @@ class Trade:
|
|||
logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def update_pending_by_entry_order_id(symbol: str, account_id: int, entry_order_id, entry_price: float, quantity: float):
|
||||
"""
|
||||
UDS 开仓 FILLED 但无 clientOrderId 时的兜底:用 orderId 完善一条 pending 记录。
|
||||
若 DB 中已有该 entry_order_id 则跳过;否则在该 symbol+account 下找「status 为 pending 且 entry_order_id 为空」的记录,
|
||||
若恰好 1 条则用 orderId/价格/数量更新(避免误匹配多笔 pending 时只更新一条)。
|
||||
"""
|
||||
if not symbol or account_id is None or entry_order_id is None:
|
||||
return False
|
||||
try:
|
||||
existing = Trade.get_by_entry_order_id(entry_order_id)
|
||||
if existing:
|
||||
return True # 已存在该订单号,无需兜底
|
||||
if not _table_has_column("trades", "account_id"):
|
||||
return False
|
||||
# 查该 symbol+account 下 pending 且无 entry_order_id 的记录
|
||||
rows = db.execute_query(
|
||||
"""SELECT id FROM trades
|
||||
WHERE account_id = %s AND symbol = %s AND status = 'pending'
|
||||
AND (entry_order_id IS NULL OR entry_order_id = 0)
|
||||
ORDER BY id DESC""",
|
||||
(int(account_id), symbol.strip())
|
||||
)
|
||||
if not rows or len(rows) != 1:
|
||||
return False
|
||||
tid = rows[0]["id"]
|
||||
db.execute_update(
|
||||
"""UPDATE trades SET entry_order_id = %s, entry_price = %s, quantity = %s, status = 'open'
|
||||
WHERE id = %s""",
|
||||
(str(entry_order_id), float(entry_price), float(quantity), int(tid))
|
||||
)
|
||||
logger.info(f"Trade.update_pending_by_entry_order_id: 已用 orderId={entry_order_id} 兜底完善 pending 记录 id={tid} symbol={symbol!r}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"update_pending_by_entry_order_id 失败 symbol={symbol!r} orderId={entry_order_id}: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def update_entry_order_id(trade_id: int, entry_order_id):
|
||||
"""补全或更新开仓订单号(用于 REST 兜底补全)"""
|
||||
|
|
|
|||
71
docs/订单入库演变说明.md
Normal file
71
docs/订单入库演变说明.md
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
# 订单记录入库方式演变说明(为何 12 号下午之前能对上、后来对不上)
|
||||
|
||||
## 时间线(按 Git 提交)
|
||||
|
||||
- **2月12日及之前**:订单能对上一些。
|
||||
- **2月12日下午**:提交 `0df841c` 只改了止盈比例和配置/前端,**未改入库逻辑**。
|
||||
- **2月14日**:`3d9f58f`(使用自定义订单号)、`11cd55f`(client_order_id 支持)—— 下单带 `newClientOrderId`,补建逻辑收紧。
|
||||
- **2月16日**:`5154b49` 引入 **pending 预落库** + UDS 按 `client_order_id` 完善。
|
||||
- **2月17日**:`415589e` 等增强 UDS/同步;UDS 开仓 FILLED 时**无 clientOrderId 直接 return**,不完善。
|
||||
|
||||
## 12 号下午之前的入库方式(能对上的原因)
|
||||
|
||||
1. **没有 pending 预落库**
|
||||
开仓流程是:REST 下单 → 轮询等待成交 → 成交后**直接** `Trade.create(entry_order_id=order.get("orderId"), client_order_id=order.get("clientOrderId"), ...)`。
|
||||
|
||||
2. **完全依赖 REST 返回**
|
||||
- `entry_order_id` 来自 REST 的 `orderId`,一定能拿到。
|
||||
- `client_order_id` 来自 REST 的 `clientOrderId`,当时未传 `newClientOrderId`,可能是币安自动生成或空。
|
||||
|
||||
3. **User Data Stream 不参与开仓落库**
|
||||
当时 UDS 主要做平仓回写;开仓是否落库只取决于 **position_manager 里 REST 成交后那一次 `Trade.create`**,链路简单,所以容易和币安对上。
|
||||
|
||||
## 后来的改动(导致对不上的可能原因)
|
||||
|
||||
1. **2月14日 3d9f58f**
|
||||
- 下单时带 `newClientOrderId`(`SYSTEM_ORDER_ID_PREFIX` + 时间戳)。
|
||||
- 补建缺失持仓逻辑收紧:只补建「clientOrderId 带系统前缀」或「有止损/止盈单」的仓位;手动单、无前缀单不再自动建记录。
|
||||
|
||||
2. **2月16日 5154b49**
|
||||
- **先落 pending**:开仓前 `Trade.create(status="pending", client_order_id=..., entry_order_id=None)`。
|
||||
- 下单时把 `client_order_id` 传给币安(`new_client_order_id`)。
|
||||
- 成交后:优先用 **UDS 推送的 clientOrderId** 找到 pending,再 `update_pending_to_filled` 填 `entry_order_id`/价格/数量;若 UDS 未收到或推送里**没有 clientOrderId**,则跳过完善,pending 会一直缺 `entry_order_id`。
|
||||
- REST 端成交后也有「有 pending 则完善、否则新建」的兜底,但若 REST 超时/异常,就只剩「未完善的 pending」或没记录。
|
||||
|
||||
3. **2月17日 415589e**
|
||||
- UDS 开仓 FILLED:**若没有 clientOrderId,直接 return**,不尝试用 orderId 做任何回填。
|
||||
|
||||
综合效果:
|
||||
- 一旦 **UDS 断连、丢包或推送里缺少 clientOrderId**,pending 无法被 UDS 完善。
|
||||
- 若 **REST 端也失败**(超时、异常、进程重启等),就会出现「币安有成交、系统只有 pending 或无记录」的情况,对不上。
|
||||
|
||||
## 已做的改进(本次)
|
||||
|
||||
1. **同步接口按当前账号 API 拉取**
|
||||
使用 `Account.get_credentials(account_id)`,不再用全局 config,多账号/多配置时订单能对到正确账号。
|
||||
|
||||
2. **DB 无记录时自动全量**
|
||||
若该账号在 DB 中没有任何交易记录,同步时会自动按「全量交易对」从币安拉订单并创建/补全记录。
|
||||
|
||||
3. **UDS 无 clientOrderId 时用 orderId 兜底**
|
||||
- 新增 `Trade.update_pending_by_entry_order_id(symbol, account_id, order_id, entry_price, quantity)`:
|
||||
- 若 DB 中已有该 `entry_order_id` 则跳过;
|
||||
- 否则在该 symbol+account 下找「status=pending 且 entry_order_id 为空」的记录,**若恰好 1 条**则用本订单号/价格/数量更新。
|
||||
- 在 `user_data_stream` 中:开仓 FILLED 时若**没有 client_order_id**,不再直接 return,而是先尝试上述兜底;只有兜底也匹配不到唯一 pending 时才仅打 debug 日志。
|
||||
|
||||
这样即使 UDS 未带 clientOrderId(或 listenKey 断连导致漏推),只要该 symbol 下只有一条待完善的 pending,仍能用 orderId 补上,减少「币安有、系统没有」的情况。
|
||||
|
||||
## 建议自检
|
||||
|
||||
1. **配置**
|
||||
- 当前账号是否已配置 BINANCE_API_KEY / BINANCE_API_SECRET(同步用的是当前选中账号的 API)。
|
||||
- 若使用「系统单」前缀:`SYSTEM_ORDER_ID_PREFIX` 是否与下单时一致(影响补建和区分系统单)。
|
||||
|
||||
2. **对账**
|
||||
- 在前端选好账号后,用「同步订单」勾选「全量同步」、选 7 天或 30 天,先补全历史。
|
||||
- 再用「校验与币安一致性」接口查看是否还有缺失/不一致。
|
||||
|
||||
3. **日志**
|
||||
- 若仍有漏记,可查:
|
||||
- UDS:开仓 FILLED 是否带 `c`(clientOrderId)、是否有「已用 orderId 兜底完善」。
|
||||
- 同步/补建:是否有「创建新交易记录」或「补全 entry_order_id」的日志。
|
||||
|
|
@ -372,6 +372,9 @@ const TradeList = () => {
|
|||
border: '1px solid #ddd'
|
||||
}}>
|
||||
<div style={{ fontSize: '12px', fontWeight: 'bold', color: '#666', marginBottom: '4px' }}>订单同步</div>
|
||||
<p style={{ fontSize: '11px', color: '#888', margin: '0 0 6px 0' }}>
|
||||
同步的是当前选中账号的币安订单。若系统里几乎没有记录,请勾选「全量同步」并选 7 天或 30 天。
|
||||
</p>
|
||||
<div style={{ display: 'flex', alignItems: 'center', gap: '8px', flexWrap: 'wrap' }}>
|
||||
<select
|
||||
value={syncDays}
|
||||
|
|
|
|||
|
|
@ -404,11 +404,8 @@ class UserDataStream:
|
|||
# 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交)
|
||||
if event_type and event_type != "TRADE":
|
||||
return
|
||||
# 开仓成交:完善 pending 记录
|
||||
# 开仓成交:完善 pending 记录(优先按 client_order_id,无 c 时用 orderId 兜底)
|
||||
if not reduce_only:
|
||||
if not client_order_id:
|
||||
logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,跳过完善 orderId={order_id} symbol={symbol!r}")
|
||||
return
|
||||
try:
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
|
@ -417,15 +414,23 @@ class UserDataStream:
|
|||
if backend_path.exists():
|
||||
sys.path.insert(0, str(backend_path))
|
||||
from database.models import Trade
|
||||
updated = Trade.update_pending_to_filled(
|
||||
client_order_id, self.account_id, order_id, ap_f, z_f
|
||||
)
|
||||
if updated:
|
||||
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
|
||||
if client_order_id:
|
||||
updated = Trade.update_pending_to_filled(
|
||||
client_order_id, self.account_id, order_id, ap_f, z_f
|
||||
)
|
||||
if updated:
|
||||
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
|
||||
else:
|
||||
logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}")
|
||||
else:
|
||||
logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善)client_order_id={client_order_id!r} orderId={order_id}")
|
||||
# 无 clientOrderId 时用 orderId 兜底:若该 symbol+account 下仅有一条 pending 且无 entry_order_id,则用本订单号完善
|
||||
updated = Trade.update_pending_by_entry_order_id(symbol, self.account_id, order_id, ap_f, z_f)
|
||||
if updated:
|
||||
logger.info(f"UserDataStream: 开仓成交已用 orderId 兜底完善 orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
|
||||
else:
|
||||
logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId,且兜底未匹配到唯一 pending orderId={order_id} symbol={symbol!r}")
|
||||
except Exception as ex:
|
||||
logger.warning(f"UserDataStream: update_pending_to_filled 失败 client_order_id={client_order_id!r}: {ex}")
|
||||
logger.warning(f"UserDataStream: 开仓成交完善失败 orderId={order_id}: {ex}")
|
||||
else:
|
||||
# 平仓成交:按 symbol 回写 open 记录的 exit_order_id;若有 rp 可记入日志
|
||||
if rp is not None:
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user