feat(trade, binance_client, position_manager, user_data_stream): 增强待处理记录对账逻辑

在 `models.py` 中新增 `get_pending_recent` 方法,用于获取最近的待处理交易记录。`binance_client.py` 中添加 `get_order_by_client_order_id` 方法,以支持按 `client_order_id` 查询订单。`position_manager.py` 中实现 `_reconcile_pending_with_binance` 方法,增强对待处理记录的对账能力。`user_data_stream.py` 中在重连前执行待处理记录对账,确保系统在断线期间的交易状态得到及时更新。这些改进提升了系统的稳定性与交易记录的准确性。
This commit is contained in:
薇薇安 2026-02-21 11:09:01 +08:00
parent a371e50a3e
commit e4e6e64608
6 changed files with 343 additions and 8 deletions

View File

@ -950,6 +950,37 @@ class Trade:
logger.debug(f"get_trades_missing_entry_order_id 失败: {e}")
return []
@staticmethod
def get_pending_recent(account_id: int, limit: int = 50, max_age_sec: int = 86400):
"""
获取最近时间范围内 status=pending 的记录用于正向流程 pending 对账
返回有 client_order_id entry_order_id 的记录 id 降序
"""
if account_id is None:
return []
try:
if not _table_has_column("trades", "account_id"):
return []
if not _table_has_column("trades", "client_order_id") and not _table_has_column("trades", "entry_order_id"):
return []
import time
cutoff_ts = int(time.time()) - max(1, int(max_age_sec))
# 用 created_at 或 entry_time 作为时间过滤created_at 更准确表示插入时间)
time_col = "created_at" if _table_has_column("trades", "created_at") else "entry_time"
query = f"""
SELECT * FROM trades
WHERE account_id = %s AND status = 'pending'
AND {time_col} >= %s
AND (client_order_id IS NOT NULL AND client_order_id != ''
OR entry_order_id IS NOT NULL AND entry_order_id != 0 AND entry_order_id != '')
ORDER BY id DESC
LIMIT %s
"""
return db.execute_query(query, (int(account_id), cutoff_ts, int(limit)))
except Exception as e:
logger.debug(f"get_pending_recent 失败: {e}")
return []
@staticmethod
def get_by_client_order_id(client_order_id, account_id: int = None):
"""根据 clientOrderId 获取交易记录(可选按 account_id 隔离)"""

View File

@ -0,0 +1,172 @@
# 正向流程漏洞分析与加固方案
> 目标弄清楚正常流程为什么会出现「币安有持仓、DB 无 open 记录」,并完善正向流程,使正常流程掌控全局。
## 一、正常流程梳理
### 1.1 开仓链路(简化)
```
strategy.open_position()
→ 创建 pending 记录 (DB, client_order_id)
→ place_order 下单币安
→ _wait_for_order_filled() REST 轮询
→ 若 FILLEDupdate_pending_to_filled()update_open_fields()
→ 若超时/CANCELED撤单返回 None
```
### 1.2 两条更新路径
| 路径 | 触发点 | 说明 |
|------|--------|------|
| **REST 路径** | `open_position``_wait_for_order_filled` 返回 ok | 进程存活、未超时 |
| **WS 路径** | `UserDataStream._on_order_trade_update` 收到 ORDER_TRADE_UPDATE | 需 listenKey 连接正常 |
正常情况下REST 和 WS 都会尝试更新;`update_pending_to_filled` 幂等,重复更新无害。
---
## 二、遗漏场景分析
### 2.1 进程崩溃 / 重启(最主要)
**场景**
- 创建 pending下单币安进入 `_wait_for_order_filled` 轮询
- 进程在轮询过程中崩溃OOM、kill、异常退出
- 订单在币安成交
- 重启后:
- REST 路径不会再执行(`open_position` 已结束)
- WS 不会重放历史推送,`ORDER_TRADE_UPDATE` 已丢失
**结果**pending 长期残留币安有持仓DB 无 open。
### 2.2 WS 断线期间成交
**场景**
- listenKey 失效 / 网络抖动WS 断开
- 在断开期间订单成交,未收到 ORDER_TRADE_UPDATE
- REST 路径:若 `open_position` 仍在运行,轮询会拿到 FILLED可以更新若已超时返回则不会更新
**结论**只有在「WS 断线 + REST 已超时返回」时,才会出现漏记。`open_position` 中限价单超时后会撤单,一般不留下持仓;但若存在「撤单与成交」的竞态,仍可能漏记。
### 2.3 listenKey 失效
- 60 分钟无 keepalive 会失效
- 文档建议 24 小时主动重连
- 失效期间新连接不会重放历史事件
效果同 2.2。
### 2.4 重连空窗期
- 断线 → 60s 后重连 → 新 listenKey
- 空窗期内的成交事件永久丢失
- 若该期间 REST 也未完成轮询(例如进程崩溃),则必然漏记
### 2.5 update_pending_to_filled 异常
- 若 `Trade.update_pending_to_filled` 抛异常,`_on_order_trade_update` 会 catch 并打日志pending 保持
- REST 路径若在调用前崩溃,则完全依赖 WSWS 路径若异常,则完全依赖 REST
- 任一路径失败且另一路径也失败,则漏记
### 2.6 竞态:撤单 vs 成交
- 限价单超时,调用 `cancel_order`
- 若撤单请求发出时订单刚好成交,撤单可能失败
- 当前逻辑:超时则 `return None`pending 保留,不会执行 `update_pending_to_filled`
- **结果**pending 残留 + 币安已成交,属于漏记
---
## 三、当前 sync_positions_with_binance 的覆盖范围
| 情况 | 是否处理 |
|------|----------|
| DB open、币安无 | ✅ 更新 DB 为 closed |
| DB open、币安有 | ✅ 加载到 active_positions |
| 币安有、DB 无 open | ❌ 不处理(依赖 SYNC_RECOVER_MISSING_POSITIONS |
| DB pending、币安订单已 FILLED | ❌ 不处理 |
当前同步逻辑不包含「pending 对账」:不会主动查币安订单状态,也不会把已成交的 pending 转为 open。
---
## 四、正向流程加固方案
### 4.1 思路
不依赖补建SYNC_RECOVER_MISSING_POSITIONS在正向流程中补齐「pending 对账」能力,使:
- 有 pending 且有 client_order_id / entry_order_id 时,主动查币安订单状态
- 若已 FILLED则执行 `update_pending_to_filled`,将 pending 转为 open
### 4.2 加固点 1WS 重连后 pending 对账(推荐)
**位置**`user_data_stream.py``_run_ws` 重连成功后
**逻辑**
- 重连成功后,查询当前账号下 status=pending 且有 client_order_id 的记录(可限制如 24h 内)
- 对每条记录调用币安 REST`futures_get_order(symbol, orderId)` 或按 client_order_id 查
- 若 status=FILLED调用 `Trade.update_pending_to_filled`
**意义**:补齐 WS 断线期间丢失的 ORDER_TRADE_UPDATE。
### 4.3 加固点 2sync_positions_with_binance 中增加 pending 对账(推荐)
**位置**`position_manager.sync_positions_with_binance`
**逻辑**
- 在现有「DB open vs 币安持仓」同步之外,增加:
- 查询 `Trade.get_pending_recent(account_id, limit=50, max_age_sec=86400)`(需在 models 中新增)
- 对每条 pending若存在 client_order_id 或 entry_order_id查币安订单
- 若 FILLED`update_pending_to_filled``update_pending_by_entry_order_id`
**意义**周期性兜底覆盖进程重启、WS 漏推等场景。
### 4.4 加固点 3撤单后校验是否已成交可选
**位置**`position_manager.open_position`,在 `cancel_order` 之后
**逻辑**
- 撤单后(或撤单异常时),再查一次订单状态
- 若 status=FILLED则按 REST 路径正常执行 `update_pending_to_filled`,避免竞态漏记
**意义**:消除「撤单与成交」竞态导致的漏记。
### 4.5 加固点 4update_pending_to_filled robustness
- 保持幂等(当前已满足)
- 异常时记录清晰日志,便于排查
- 可选:对瞬时 DB 异常做有限次重试
---
## 五、实现优先级建议
| 优先级 | 加固点 | 影响 | 复杂度 |
|--------|--------|------|--------|
| P0 | sync_positions_with_binance 中 pending 对账 | 覆盖进程重启、WS 漏推等主要漏记 | 中 |
| P0 | WS 重连后 pending 对账 | 覆盖断线期间的漏推 | 中 |
| P1 | 撤单后校验是否已成交 | 消除竞态漏记 | 低 |
| P2 | update_pending_to_filled 重试与日志 | 提升可靠性 | 低 |
---
## 六、需要的 DB / API 支持
1. **Trade 模型**
- `get_pending_recent(account_id, limit, max_age_sec)`:返回指定时间范围内的 pending 记录,用于对账
2. **币安 API**
- 按 `clientOrderId` 查询:`futures_get_all_orders(symbol, limit)` 过滤,或使用支持 `origClientOrderId` 的接口(如有)
- 按 `orderId` 查询:`futures_get_order(symbol, orderId)`(已有)
3. **多账号**:以上逻辑需按 `account_id` 隔离,保证对账时使用对应账号的 client / API。
---
## 七、小结
- 主要漏记来自:**进程崩溃 + 订单已成交**,以及 **WS 断线期间的成交**
- 正向流程目前缺少「pending 对账」:不会主动用币安订单状态修正 pending。
- 加固方向:在 **WS 重连后****sync_positions_with_binance** 中加入 pending 对账,使正常流程在运行中即可发现并修正漏记,而不依赖单独的补建逻辑。

View File

@ -2329,6 +2329,24 @@ class BinanceClient:
logger.debug(f"取消 Algo 条件单失败 algoId={algo_id}: {e}")
return False
async def get_order_by_client_order_id(self, symbol: str, client_order_id: str) -> Optional[Dict]:
"""
origClientOrderId 查询订单用于 pending 对账
返回订单信息或 None不存在/异常
"""
if not symbol or not (client_order_id or "").strip():
return None
try:
info = await self.client.futures_get_order(
symbol=symbol,
origClientOrderId=(client_order_id or "").strip(),
recvWindow=20000,
)
return info if isinstance(info, dict) else None
except Exception as e:
logger.debug(f"{symbol} 按 origClientOrderId 查询失败: {e}")
return None
async def get_open_orders(self, symbol: str) -> List[Dict]:
"""
获取某交易对的未成交委托用于防止重复挂保护单

View File

@ -0,0 +1,85 @@
"""
正向流程加固pending 对账模块
sync_positions_with_binance UserDataStream 重连后调用补齐因 WS 断线/进程重启漏掉的 pendingopen
"""
import logging
logger = logging.getLogger(__name__)
DB_AVAILABLE = False
Trade = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
DB_AVAILABLE = True
except Exception:
pass
async def reconcile_pending_with_binance(client, account_id: int, limit: int = 50, max_age_sec: int = 86400) -> int:
"""
status=pending 记录查币安订单若已 FILLED 则更新为 open
返回成功对账数量
"""
if not DB_AVAILABLE or not Trade:
return 0
try:
pending_list = Trade.get_pending_recent(account_id, limit=limit, max_age_sec=max_age_sec)
if not pending_list:
return 0
reconciled = 0
for row in pending_list:
try:
symbol = (row.get("symbol") or "").strip()
client_order_id = (row.get("client_order_id") or "").strip() or None
entry_order_id = row.get("entry_order_id")
if not symbol:
continue
order_info = None
if client_order_id and hasattr(client, "get_order_by_client_order_id"):
order_info = await client.get_order_by_client_order_id(symbol, client_order_id)
if not order_info and entry_order_id:
try:
oid = int(entry_order_id)
order_info = await client.client.futures_get_order(
symbol=symbol, orderId=oid, recvWindow=20000
)
except Exception:
pass
if not order_info or str(order_info.get("status")).upper() != "FILLED":
continue
ap = order_info.get("avgPrice") or order_info.get("price") or 0
z = order_info.get("executedQty") or 0
try:
ap_f = float(ap)
z_f = float(z)
except (TypeError, ValueError):
continue
if ap_f <= 0 or z_f <= 0:
continue
order_id = order_info.get("orderId")
if client_order_id:
ok = Trade.update_pending_to_filled(
client_order_id, account_id, order_id, ap_f, z_f
)
else:
ok = Trade.update_pending_by_entry_order_id(
symbol, account_id, order_id, ap_f, z_f
)
if ok:
reconciled += 1
logger.info(
f"[账号{account_id}] pending 对账: {symbol} client_order_id={client_order_id!r} "
f"已完善为 open (orderId={order_id} 成交价={ap_f:.4f} 数量={z_f:.4f})"
)
except Exception as e:
logger.debug(f"[账号{account_id}] pending 对账单条失败: {e}")
return reconciled
except Exception as e:
logger.debug(f"[账号{account_id}] pending 对账失败: {e}")
return 0

View File

@ -672,15 +672,21 @@ class PositionManager:
actual_entry_price = float(res.get("avg_price") or 0)
filled_quantity = float(res.get("executed_qty") or 0)
else:
# 未成交NEW/超时/CANCELED 等)属于“策略未触发入场”或“挂单没成交”
# 这不应当当作系统错误同时需要撤单best-effort避免留下悬挂委托造成后续混乱。
logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},跳过本次开仓并撤销挂单")
# 未成交NEW/超时/CANCELED 等):撤单。撤单后校验是否已成交(竞态:撤单瞬间订单可能已成交)
logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},撤销挂单")
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
# 正向流程加固:撤单后校验,若已成交则继续流程
recheck = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=5, poll_sec=0.5)
if recheck.get("ok"):
actual_entry_price = float(recheck.get("avg_price") or 0)
filled_quantity = float(recheck.get("executed_qty") or 0)
logger.info(f"{symbol} [开仓] 撤单后发现已成交,继续完善记录 (成交价={actual_entry_price:.4f} 数量={filled_quantity:.4f})")
else:
self._pending_entry_orders.pop(symbol, None)
return None
if not actual_entry_price or actual_entry_price <= 0:
logger.error(f"{symbol} [开仓] ❌ 无法获取实际成交价格,不保存到数据库")
@ -2395,6 +2401,14 @@ class PositionManager:
logger.error(f"获取持仓摘要失败: {e}")
return {}
async def _reconcile_pending_with_binance(self) -> int:
"""正向流程加固:对 status=pending 记录查币安订单,若已 FILLED 则更新为 open。"""
try:
from .pending_reconcile import reconcile_pending_with_binance
return await reconcile_pending_with_binance(self.client, self.account_id)
except ImportError:
return 0
async def sync_positions_with_binance(self):
"""
同步币安实际持仓状态与数据库状态
@ -2407,6 +2421,11 @@ class PositionManager:
try:
logger.info("开始同步币安持仓状态与数据库...")
# 0. 正向流程加固pending 对账(补齐因 WS 断线/进程重启漏掉的 pending→open
n = await self._reconcile_pending_with_binance()
if n > 0:
logger.info(f"[账号{self.account_id}] pending 对账完成,{n} 条已更新为 open")
# 1. 获取币安实际持仓
binance_positions = await self._get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions}
@ -3099,7 +3118,7 @@ class PositionManager:
missing_in_db = binance_symbols - db_open_symbols
if missing_in_db:
logger.info(
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有 open 记录: "
f"[账号{self.account_id}] 发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有 open 记录: "
f"{', '.join(missing_in_db)}"
)
sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False)
@ -3110,7 +3129,7 @@ class PositionManager:
sync_recover = False
sync_create_manual = False
if missing_in_db:
logger.debug(f" ONLY_AUTO_TRADE_CREATES_RECORDS=True跳过补建/手动开仓创建 ({len(missing_in_db)} 个仅币安持仓)")
logger.debug(f"[账号{self.account_id}] ONLY_AUTO_TRADE_CREATES_RECORDS=True跳过补建/手动开仓创建 ({len(missing_in_db)} 个仅币安持仓)")
if sync_recover:
system_order_prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip()
@ -3301,7 +3320,8 @@ class PositionManager:
logger.warning(f"{symbol} [状态同步] 补建失败: {e}")
elif not sync_create_manual:
logger.info(
" → 已跳过自动创建交易记录SYNC_CREATE_MANUAL_ENTRY_RECORD=False, SYNC_RECOVER_MISSING_POSITIONS 未开启)。"
f"[账号{self.account_id}] → 已跳过自动创建交易记录SYNC_CREATE_MANUAL_ENTRY_RECORD=False, "
"SYNC_RECOVER_MISSING_POSITIONS 未开启)。"
" 若确认为本策略开仓可开启 SYNC_RECOVER_MISSING_POSITIONS=True仅补建有止损止盈单的"
)
elif sync_create_manual:

View File

@ -339,6 +339,15 @@ class UserDataStream:
import aiohttp
_24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连
while self._running:
# 正向流程加固:每次重连前 pending 对账(补齐断线期间漏掉的 ORDER_TRADE_UPDATE
if self._listen_key:
try:
from .pending_reconcile import reconcile_pending_with_binance
n = await reconcile_pending_with_binance(self.client, self.account_id, limit=50, max_age_sec=86400)
if n > 0:
logger.info(f"UserDataStream(account_id={self.account_id}): 重连前 pending 对账完成,{n} 条已更新为 open")
except Exception as e:
logger.debug(f"UserDataStream(account_id={self.account_id}): pending 对账失败: {e}")
url = f"{self._ws_base_url()}/{self._listen_key}"
try:
async with aiohttp.ClientSession() as session: