feat(trade, position_manager, user_data_stream): 增强交易记录管理与用户数据流处理

在 `models.py` 中新增 `update_entry_order_id` 方法,用于补全或更新开仓订单号,提升交易记录的完整性。更新 `set_exit_order_id_for_open_trade` 方法以支持按 `entry_order_id` 精确匹配,优化平仓订单的回写逻辑。在 `position_manager.py` 中添加对 `entry_order_id` 的处理,确保在保存交易记录时能够及时补全。更新 `user_data_stream.py` 中的日志记录,提供更详细的状态信息,增强系统的可追溯性与调试能力。
This commit is contained in:
薇薇安 2026-02-17 22:11:36 +08:00
parent 48c3f946cc
commit 415589e625
5 changed files with 174 additions and 11 deletions

View File

@ -935,6 +935,21 @@ class Trade:
logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}")
return False
@staticmethod
def update_entry_order_id(trade_id: int, entry_order_id):
"""补全或更新开仓订单号(用于 REST 兜底补全)"""
if not trade_id or not entry_order_id:
return False
try:
db.execute_update(
"""UPDATE trades SET entry_order_id = %s WHERE id = %s AND (entry_order_id IS NULL OR entry_order_id = '')""",
(str(entry_order_id), int(trade_id))
)
return True
except Exception as e:
logger.warning(f"update_entry_order_id 失败 trade_id={trade_id}: {e}")
return False
@staticmethod
def get_all(start_timestamp=None, end_timestamp=None, symbol=None, status=None, trade_type=None, exit_reason=None, account_id: int = None, time_filter: str = "exit"):
"""
@ -1021,18 +1036,35 @@ class Trade:
)
@staticmethod
def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id) -> bool:
"""ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号(仅更新一条)。"""
def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id, entry_order_id: int = None) -> bool:
"""
ALGO_UPDATE/条件单触发后为指定 symbol 下未填 exit_order_id open 记录补全平仓订单号
优先按 entry_order_id 精确匹配若无则按 symbol 匹配最早的一条 open 记录
"""
if not symbol or account_id is None or exit_order_id is None:
return False
try:
if not _table_has_column("trades", "account_id"):
return False
# 更新一条open 且 (exit_order_id 为空)
# 优先按 entry_order_id 精确匹配(如果提供了 entry_order_id
if entry_order_id:
n = db.execute_update(
"""UPDATE trades SET exit_order_id = %s
WHERE account_id = %s AND symbol = %s AND status = 'open'
AND entry_order_id = %s
AND (exit_order_id IS NULL OR exit_order_id = '')
LIMIT 1""",
(str(exit_order_id), int(account_id), symbol.strip(), str(entry_order_id))
)
if n and n > 0:
logger.debug(f"set_exit_order_id_for_open_trade: 按 entry_order_id={entry_order_id} 精确匹配成功")
return True
# 否则按 symbol 匹配最早的一条 open 记录(按 entry_time 排序)
n = db.execute_update(
"""UPDATE trades SET exit_order_id = %s
WHERE account_id = %s AND symbol = %s AND status = 'open'
AND (exit_order_id IS NULL OR exit_order_id = '')
ORDER BY entry_time ASC
LIMIT 1""",
(str(exit_order_id), int(account_id), symbol.strip())
)

View File

@ -0,0 +1,96 @@
"""
User Data Stream 诊断工具检查 listenKey 创建和 WS 连接状态
"""
import asyncio
import sys
from pathlib import Path
# 添加路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "trading_system"))
sys.path.insert(0, str(project_root / "backend"))
async def check_user_data_stream():
"""检查 User Data Stream 状态"""
import os
from binance_client import BinanceClient
import config
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
print(f"检查账号 {account_id} 的 User Data Stream 状态...")
# 初始化客户端
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
print("\n1. 检查 listenKey 创建...")
listen_key = await client.create_futures_listen_key()
if listen_key:
print(f" ✓ listenKey 创建成功: {listen_key[:20]}...")
else:
print(" ❌ listenKey 创建失败")
print(" 可能原因:")
print(" - API Key 权限不足(需要启用 'Enable Reading''Enable Futures'")
print(" - 网络连接问题")
print(" - IP 白名单限制")
return
print("\n2. 检查 WebSocket 连接...")
try:
import aiohttp
ws_url = f"wss://fstream.binance.com/ws/{listen_key}"
print(f" 连接地址: {ws_url[:50]}...")
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
ws_url,
heartbeat=50,
timeout=aiohttp.ClientTimeout(total=10)
) as ws:
print(" ✓ WebSocket 连接成功")
print(" 等待接收推送消息10秒...")
received_messages = []
try:
async with asyncio.timeout(10):
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
received_messages.append(msg.data)
print(f" ✓ 收到消息: {msg.data[:100]}...")
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f" ❌ WebSocket 错误: {msg}")
break
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
print(" ⚠ WebSocket 已关闭")
break
except asyncio.TimeoutError:
pass
if received_messages:
print(f"\n ✓ 成功收到 {len(received_messages)} 条推送消息")
else:
print("\n ⚠ 10秒内未收到任何推送消息可能当前无订单/持仓变化)")
print(" 这是正常的,如果有订单成交或持仓变化,会收到推送")
except Exception as e:
print(f" ❌ WebSocket 连接失败: {e}")
print(f" 错误类型: {type(e).__name__}")
print("\n3. 检查 keepalive...")
ok, code_1125 = await client.keepalive_futures_listen_key(listen_key)
if ok:
print(" ✓ keepalive 成功")
else:
if code_1125:
print(" ❌ keepalive 返回 -1125listenKey 不存在或已过期)")
else:
print(" ❌ keepalive 失败")
await client.disconnect()
print("\n检查完成")
if __name__ == "__main__":
asyncio.run(check_user_data_stream())

View File

@ -358,8 +358,9 @@ async def main():
import os
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
user_data_stream = UserDataStream(client, account_id)
logger.info(f"正在启动 User Data Stream账号 {account_id}...")
if await user_data_stream.start():
logger.info("✓ User Data Stream 已启动(订单/持仓/余额 WS 推送30 分钟 keepalive")
logger.info(f"✓ User Data Stream 已启动(账号 {account_id}订单/持仓/余额 WS 推送30 分钟 keepalive")
# 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存
try:
seed_balance_cache(balance)
@ -369,7 +370,12 @@ async def main():
except Exception as e:
logger.warning(f"播种 WS 缓存失败(将仅用 REST: {e}")
else:
logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓")
logger.warning(f"⚠ User Data Stream 未启动(账号 {account_id}),将仅依赖 REST 同步订单与持仓")
logger.warning(" 可能原因:")
logger.warning(" 1. listenKey 创建失败(检查 API Key 权限:需要 'Enable Reading''Enable Futures'")
logger.warning(" 2. 网络连接问题")
logger.warning(" 3. IP 白名单限制")
logger.warning(" 提示:可运行 python -m trading_system.check_user_data_stream 进行诊断")
user_data_stream = None
# 3.0 市场 WS 多进程共用:选主 + 仅 Leader 建连接,非 Leader 从 Redis 读

View File

@ -832,6 +832,7 @@ class PositionManager:
# 无 pending 或未匹配到:走新建(兜底)
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id
# 如果 REST 已获取到 entry_order_id直接写入否则留空等待 WS 推送或后续同步补全
trade_id = Trade.create(
symbol=symbol,
side=side,
@ -839,7 +840,7 @@ class PositionManager:
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id,
entry_order_id=entry_order_id, # REST 已获取则直接写入
client_order_id=fallback_client_order_id,
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
@ -851,7 +852,27 @@ class PositionManager:
entry_context=entry_context,
account_id=self.account_id,
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
if entry_order_id:
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
else:
logger.warning(f"{symbol} 交易记录已保存但 entry_order_id 为空 (ID: {trade_id}),等待 WS 推送或后续同步补全")
# 如果有 client_order_id尝试通过 REST 查询订单号补全
if fallback_client_order_id:
try:
# 延迟查询,确保订单已入库
await asyncio.sleep(1)
orders = await self.client.client.futures_get_all_orders(
symbol=symbol, limit=10, recvWindow=10000
)
for o in orders or []:
if str(o.get("clientOrderId", "")).strip() == fallback_client_order_id:
found_order_id = o.get("orderId")
if found_order_id:
Trade.update_entry_order_id(trade_id, found_order_id)
logger.info(f"{symbol} 已通过 REST 补全 entry_order_id: {found_order_id}")
break
except Exception as e:
logger.debug(f"{symbol} REST 补全 entry_order_id 失败: {e}")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")

View File

@ -202,7 +202,7 @@ class UserDataStream:
async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
self._ws = ws
self._conn_start_time = time.monotonic()
logger.info("UserDataStream: WS 已连接")
logger.info(f"UserDataStream(account_id={self.account_id}): WS 已连接,开始接收订单/持仓推送")
async for msg in ws:
if not self._running:
break
@ -294,6 +294,7 @@ class UserDataStream:
# 开仓成交:完善 pending 记录
if not reduce_only:
if not client_order_id:
logger.debug(f"UserDataStream: 开仓订单 FILLED 但无 clientOrderId跳过完善 orderId={order_id} symbol={symbol!r}")
return
try:
import sys
@ -307,9 +308,11 @@ class UserDataStream:
client_order_id, self.account_id, order_id, ap_f, z_f
)
if updated:
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}")
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id} symbol={symbol!r} 成交价={ap_f} 数量={z_f}")
else:
logger.debug(f"UserDataStream: 开仓成交完善失败(可能无 pending 记录或已完善client_order_id={client_order_id!r} orderId={order_id}")
except Exception as ex:
logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}")
logger.warning(f"UserDataStream: update_pending_to_filled 失败 client_order_id={client_order_id!r}: {ex}")
else:
# 平仓成交:按 symbol 回写 open 记录的 exit_order_id若有 rp 可记入日志
if rp is not None:
@ -323,8 +326,13 @@ class UserDataStream:
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id):
# 尝试从订单信息中获取关联的开仓订单号(如果有)
# 注意:币安平仓订单推送中可能不包含开仓订单号,这里先按 symbol 匹配
entry_order_id_hint = None # 未来可从订单关联信息中提取
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id, entry_order_id_hint):
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
else:
logger.debug(f"UserDataStream: 平仓订单回写失败可能已存在或无可匹配记录symbol={symbol!r} orderId={order_id}")
except Exception as ex:
logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}")