From 5154b4933ec6fce95b4f924c7022a18022bf5bc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Mon, 16 Feb 2026 15:16:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(trading=5Fsystem):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BA=A4=E6=98=93=E8=AE=B0=E5=BD=95=E7=AE=A1=E7=90=86=E4=B8=8E?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E6=95=B0=E6=8D=AE=E6=B5=81=E9=9B=86=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `position_manager` 和 `risk_manager` 中引入用户数据流缓存,优先使用 WebSocket 更新持仓和余额信息,减少对 REST API 的依赖。同时,增强了交易记录的创建和更新逻辑,支持在订单成交后完善记录,确保与币安数据一致性。新增 `update_open_fields` 和 `update_pending_to_filled` 方法,提升了交易记录的管理能力。 --- backend/api/routes/account.py | 100 +++++-- backend/database/models.py | 109 +++++++- docs/订单与统计一致性说明.md | 2 +- listenkey过期推送.txt | 18 ++ trading_system/binance_client.py | 72 ++++- trading_system/main.py | 32 ++- trading_system/position_manager.py | 280 ++++++++++++++----- trading_system/risk_manager.py | 52 +++- trading_system/user_data_stream.py | 364 +++++++++++++++++++++++++ 条件订单交易更新推送.txt | 48 ++++ 用户余额.txt | 51 ++++ 订单交易更新推送.txt | 110 ++++++++ 账户信息流连接.txt | 118 ++++++++ 13 files changed, 1239 insertions(+), 117 deletions(-) create mode 100644 listenkey过期推送.txt create mode 100644 trading_system/user_data_stream.py create mode 100644 条件订单交易更新推送.txt create mode 100644 用户余额.txt create mode 100644 订单交易更新推送.txt create mode 100644 账户信息流连接.txt diff --git a/backend/api/routes/account.py b/backend/api/routes/account.py index 9f4bef0..2c210b5 100644 --- a/backend/api/routes/account.py +++ b/backend/api/routes/account.py @@ -1811,31 +1811,87 @@ async def sync_positions( notional = quantity * entry_price if notional < 1.0: continue + # 补建时尽量拿到 entry_order_id:优先 get_all_orders+clientOrderId 前缀,兜底 get_recent_trades(100)+重试 entry_order_id = None - try: - trades = await client.get_recent_trades(symbol, limit=30) - if trades: - same_side = [t for t in trades if str(t.get('side', '')).upper() == side] - if same_side: - same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True) - entry_order_id = same_side[0].get('orderId') - except Exception as e: - logger.debug(f"获取 {symbol} 成交记录失败: {e}") client_order_id = None - is_clearly_manual = False if system_order_prefix: - if entry_order_id: - try: - order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) - cid = (order_info or {}).get("clientOrderId") or "" - client_order_id = cid or None - if cid and not cid.startswith(system_order_prefix): - is_clearly_manual = True - logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,视为手动单,跳过补建") - except Exception as e: - logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建") - if is_clearly_manual: - continue + try: + end_ms = int(time.time() * 1000) + start_ms = end_ms - (24 * 3600 * 1000) + orders = await client.client.futures_get_all_orders( + symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000 + ) + if isinstance(orders, list): + open_orders = [ + o for o in orders + if isinstance(o, dict) and o.get("reduceOnly") is False + and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED" + ] + our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)] + if our_orders: + our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True) + best = None + for o in our_orders: + ap = float(o.get("avgPrice") or 0) + eq = float(o.get("executedQty") or o.get("origQty") or 0) + if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6: + best = o + break + if best is None: + best = our_orders[0] + entry_order_id = best.get("orderId") + client_order_id = (best.get("clientOrderId") or "").strip() or None + except Exception as e: + logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}") + if entry_order_id is None: + try: + trades = await client.get_recent_trades(symbol, limit=100) + if not trades: + await asyncio.sleep(2) + trades = await client.get_recent_trades(symbol, limit=100) + if trades: + same_side = [t for t in trades if str(t.get('side', '')).upper() == side] + same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True) + if system_order_prefix and same_side: + for t in same_side[:5]: + oid = t.get("orderId") + if not oid: + continue + try: + info = await client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000) + cid = (info or {}).get("clientOrderId") or "" + if cid.startswith(system_order_prefix): + entry_order_id = oid + client_order_id = cid.strip() or None + break + except Exception: + continue + if entry_order_id is None and same_side: + entry_order_id = same_side[0].get("orderId") + except Exception as e: + logger.debug(f"获取 {symbol} 成交记录失败: {e}") + if entry_order_id and client_order_id is None: + try: + order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) + client_order_id = (order_info or {}).get("clientOrderId") or None + if client_order_id: + client_order_id = client_order_id.strip() or None + except Exception: + pass + is_clearly_manual = False + if system_order_prefix and entry_order_id and client_order_id and not client_order_id.startswith(system_order_prefix): + is_clearly_manual = True + logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id!r} 非系统前缀,视为手动单,跳过补建") + elif system_order_prefix and entry_order_id and not client_order_id: + try: + order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) + cid = (order_info or {}).get("clientOrderId") or "" + if cid and not cid.startswith(system_order_prefix): + is_clearly_manual = True + except Exception: + pass + if is_clearly_manual: + continue elif only_recover_when_has_sltp: has_sltp = False try: diff --git a/backend/database/models.py b/backend/database/models.py index c54a982..370a623 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -431,14 +431,6 @@ class GlobalStrategyConfig: class Trade: """交易记录模型""" - @staticmethod - def get_by_client_order_id(client_order_id): - """根据自定义订单号(clientOrderId)获取交易记录""" - return db.execute_one( - "SELECT * FROM trades WHERE client_order_id = %s", - (client_order_id,) - ) - @staticmethod def create( symbol, @@ -458,11 +450,13 @@ class Trade: margin_usdt=None, account_id: int = None, entry_context=None, + status: str = "open", ): """创建交易记录(使用北京时间) Args: symbol: 交易对 + status: 状态,默认 "open";先落库等 WS 成交时可传 "pending" side: 方向 quantity: 数量 entry_price: 入场价 @@ -503,7 +497,7 @@ class Trade: # 动态构建 INSERT(兼容不同schema) columns = ["symbol", "side", "quantity", "entry_price", "leverage", "entry_reason", "status", "entry_time"] - values = [symbol, side, quantity, entry_price, leverage, entry_reason, "open", entry_time] + values = [symbol, side, quantity, entry_price, leverage, entry_reason, (status or "open"), entry_time] # 在真正执行 INSERT 之前,利用 entry_order_id / client_order_id 做一次幂等去重 try: @@ -730,7 +724,39 @@ class Trade: else: # 其他错误,重新抛出 raise - + + @staticmethod + def update_open_fields(trade_id: int, **kwargs): + """开仓后完善字段:止损/止盈/名义/保证金/entry_context 等(用于先 pending 落库、成交后补全)。""" + if not kwargs: + return + allowed = { + "stop_loss_price", "take_profit_price", "take_profit_1", "take_profit_2", + "notional_usdt", "margin_usdt", "entry_context", "atr" + } + updates = [] + values = [] + for k, v in kwargs.items(): + if k not in allowed or v is None: + continue + if k == "entry_context": + try: + v = json.dumps(v, ensure_ascii=False) if isinstance(v, dict) else str(v) + except Exception: + continue + updates.append(f"{k} = %s") + values.append(v) + if not updates: + return + values.append(trade_id) + try: + db.execute_update( + f"UPDATE trades SET {', '.join(updates)} WHERE id = %s", + tuple(values) + ) + except Exception as e: + logger.warning(f"update_open_fields trade_id={trade_id} 失败: {e}") + @staticmethod def get_by_entry_order_id(entry_order_id): """根据开仓订单号获取交易记录""" @@ -746,7 +772,47 @@ class Trade: "SELECT * FROM trades WHERE exit_order_id = %s", (exit_order_id,) ) - + + @staticmethod + def get_by_client_order_id(client_order_id, account_id: int = None): + """根据 clientOrderId 获取交易记录(可选按 account_id 隔离)""" + if not client_order_id: + return None + try: + if account_id is not None and _table_has_column("trades", "account_id"): + return db.execute_one( + "SELECT * FROM trades WHERE client_order_id = %s AND account_id = %s", + (client_order_id, int(account_id)) + ) + return db.execute_one( + "SELECT * FROM trades WHERE client_order_id = %s", + (client_order_id,) + ) + except Exception: + return None + + @staticmethod + def update_pending_to_filled(client_order_id, account_id: int, entry_order_id, entry_price: float, quantity: float): + """WS 或 REST 收到成交后:按 client_order_id 将 pending 记录完善为已成交(幂等)""" + if not client_order_id or account_id is None: + return False + try: + row = Trade.get_by_client_order_id(client_order_id, account_id) + if not row: + return False + # 仅当仍为 pending 或 entry_order_id 为空时更新,避免覆盖已完善数据 + if row.get("status") not in ("pending", "open") and row.get("entry_order_id"): + return True # 已完善,视为成功 + db.execute_update( + """UPDATE trades SET entry_order_id = %s, entry_price = %s, quantity = %s, status = 'open' + WHERE client_order_id = %s AND account_id = %s""", + (entry_order_id, float(entry_price), float(quantity), client_order_id, int(account_id)) + ) + return True + except Exception as e: + logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}") + return False + @staticmethod def get_all(start_timestamp=None, end_timestamp=None, symbol=None, status=None, trade_type=None, exit_reason=None, account_id: int = None): """获取交易记录(仅包含本系统开仓的记录已通过清理脚本维护,不再在查询里筛选)""" @@ -801,6 +867,27 @@ class Trade: (symbol, status), ) + @staticmethod + def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id) -> bool: + """ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号(仅更新一条)。""" + if not symbol or account_id is None or exit_order_id is None: + return False + try: + if not _table_has_column("trades", "account_id"): + return False + # 更新一条:open 且 (exit_order_id 为空) + n = db.execute_update( + """UPDATE trades SET exit_order_id = %s + WHERE account_id = %s AND symbol = %s AND status = 'open' + AND (exit_order_id IS NULL OR exit_order_id = '') + LIMIT 1""", + (str(exit_order_id), int(account_id), symbol.strip()) + ) + return n is not None and n > 0 + except Exception as e: + logger.warning(f"set_exit_order_id_for_open_trade 失败 symbol={symbol!r}: {e}") + return False + class AccountSnapshot: """账户快照模型""" diff --git a/docs/订单与统计一致性说明.md b/docs/订单与统计一致性说明.md index b1e1383..d5442af 100644 --- a/docs/订单与统计一致性说明.md +++ b/docs/订单与统计一致性说明.md @@ -165,7 +165,7 @@ Authorization: Bearer | **数据库不可用** | `position_manager` 启动时尝试导入 `backend.database.models`,若 `backend` 目录不存在、或导入失败(缺依赖、Python 路径错误、DB 连不上等),会设 `DB_AVAILABLE = False`,成交后直接跳过保存并打日志「数据库不可用,跳过保存」。 | | **Trade.create 抛异常** | 落库时发生异常(如连接超时、唯一约束冲突、磁盘满、字段错误等),代码会打错并 `return None`,不会重试,币安已有仓但 DB 无记录。 | | **进程在落库前退出** | 订单已在币安成交,但在执行到 `Trade.create` 之前进程被 kill、崩溃或重启,也会出现币安有仓、DB 无记录。 | -| **补建时拿不到订单号** | 同步补建时用 `get_recent_trades(symbol, limit=30)` 取最近同向成交并取第一条的 `orderId` 作为 `entry_order_id`。若接口失败、返回空、或该笔开仓已不在最近 30 条成交里,则 `entry_order_id` 为空,补建记录就会「没记订单号」。 | +| **补建时拿不到订单号** | 已优化:配置了 **SYSTEM_ORDER_ID_PREFIX** 时,补建**优先**用 `futures_get_all_orders(symbol, 最近24h)` 按开仓订单(reduceOnly=false、FILLED、同向)且 **clientOrderId 以系统前缀开头** 取本系统单,再按成交价/数量匹配取 `orderId` 与 `clientOrderId`,不依赖「最近 30 条成交」。若无前缀或该接口无结果,再用 `get_recent_trades(symbol, limit=100)` 兜底,空时重试一次;若配置了前缀,会从同向成交里逐条查订单的 clientOrderId,只采用前缀匹配的订单号,避免把手动单绑到补建记录。 | **建议**:保证交易进程与 backend 同机或能访问同一 DB、且 `backend` 路径正确,避免 `DB_AVAILABLE = False`;若曾出现「开了不落库」,可查交易进程日志中的「保存交易记录到数据库失败」或「数据库不可用」;补建后仍无订单号的记录可用「仅可对账」过滤,不影响基于可对账记录的统计。 diff --git a/listenkey过期推送.txt b/listenkey过期推送.txt new file mode 100644 index 0000000..7c4a784 --- /dev/null +++ b/listenkey过期推送.txt @@ -0,0 +1,18 @@ +listenKey过期推送 +事件描述 +当前连接使用的有效listenKey过期时,user data stream 将会推送此事件。 + +注意: + +此事件与 websocket 连接中断没有必然联系 +只有正在连接中的有效listenKey过期时才会收到此消息 +收到此消息后 user data stream 将不再更新,直到用户使用新的有效的listenKey +事件类型 +listenKeyExpired + +响应示例 +{ + "e": "listenKeyExpired", // 事件类型 + "E": "1736996475556", // 事件时间 + "listenKey":"WsCMN0a4KHUPTQuX6IUnqEZfB1inxmv1qR4kbf1LuEjur5VdbzqvyxqG9TSjVVxv" +} \ No newline at end of file diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index c2fea4b..b83eb4a 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -2,6 +2,7 @@ 币安客户端封装 - 提供异步交易接口 """ import asyncio +import json import logging import random import time @@ -279,6 +280,58 @@ class BinanceClient: else: logger.warning(f"⚠ API密钥验证时出现错误: {e}") + def _futures_base_url(self) -> str: + """合约 REST 根地址(用于 listenKey 等)""" + return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com" + + async def create_futures_listen_key(self) -> Optional[str]: + """创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。60 分钟无 keepalive 会失效。""" + if not self.api_key: + return None + try: + import aiohttp + url = f"{self._futures_base_url()}/fapi/v1/listenKey" + headers = {"X-MBX-APIKEY": self.api_key} + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: + if resp.status != 200: + text = await resp.text() + logger.warning(f"create_futures_listen_key 失败 status={resp.status} body={text}") + return None + data = await resp.json() + key = data.get("listenKey") if isinstance(data, dict) else None + if key: + logger.info("✓ 合约 User Data Stream listenKey 已创建") + return key + except Exception as e: + logger.warning(f"create_futures_listen_key 失败: {e}") + return None + + async def keepalive_futures_listen_key(self, listen_key: str): + """延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。返回 (ok: bool, code_1125: bool),-1125 表示 key 不存在需换新。""" + if not self.api_key or not listen_key: + return False, False + try: + import aiohttp + url = f"{self._futures_base_url()}/fapi/v1/listenKey" + headers = {"X-MBX-APIKEY": self.api_key} + async with aiohttp.ClientSession() as session: + async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: + text = await resp.text() + ok = resp.status == 200 + code_1125 = False + if not ok and text: + try: + data = json.loads(text) if text.strip().startswith("{") else {} + code_1125 = int(data.get("code", 0)) == -1125 + except Exception: + pass + logger.debug(f"keepalive_futures_listen_key 失败 status={resp.status} body={text}") + return ok, code_1125 + except Exception as e: + logger.debug(f"keepalive_futures_listen_key 失败: {e}") + return False, False + @staticmethod def _to_bool(value: Any) -> Optional[bool]: if value is None: @@ -1258,6 +1311,7 @@ class BinanceClient: price: Optional[float] = None, reduce_only: bool = False, position_side: Optional[str] = None, + new_client_order_id: Optional[str] = None, ) -> Optional[Dict]: """ 下单 @@ -1454,12 +1508,14 @@ class BinanceClient: if position_side: logger.info(f"{symbol} 单向模式下忽略 positionSide={position_side}(避免 -4061)") - # 开仓单写入自定义订单号前缀,便于同步时根据 clientOrderId 区分系统单(仅本系统开的仓才补建记录) + # 开仓单写入自定义订单号:优先使用调用方传入的(便于先落库再下单、WS 按 c 匹配) if not reduce_only: - prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip() - if prefix: - # 币安 newClientOrderId 最长 36 字符,格式: PREFIX_timestamp_hex - order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36] + if new_client_order_id: + order_params['newClientOrderId'] = str(new_client_order_id)[:36] + else: + prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip() + if prefix: + order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36] # 如果是平仓订单,添加 reduceOnly 参数 # 根据币安API文档,reduceOnly 应该是字符串 "true" 或 "false" @@ -1502,8 +1558,10 @@ class BinanceClient: # 让 python-binance 重新生成,否则会报 -1022 Signature invalid retry_params.pop('timestamp', None) retry_params.pop('signature', None) - # 重试时生成新的 newClientOrderId,避免与首次提交冲突 - if 'newClientOrderId' in retry_params: + # 重试时保留调用方传入的 newClientOrderId,否则才重新生成(便于 WS 按 c 匹配) + if new_client_order_id and 'newClientOrderId' in retry_params: + retry_params['newClientOrderId'] = str(new_client_order_id)[:36] + elif 'newClientOrderId' in retry_params and not new_client_order_id: prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip() if prefix: retry_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36] diff --git a/trading_system/main.py b/trading_system/main.py index dceadb7..727df7e 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -16,6 +16,7 @@ try: from .risk_manager import RiskManager # type: ignore from .position_manager import PositionManager # type: ignore from .strategy import TradingStrategy # type: ignore + from .user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore from . import config # type: ignore except Exception: _here = Path(__file__).resolve().parent @@ -30,6 +31,7 @@ except Exception: from risk_manager import RiskManager # type: ignore from position_manager import PositionManager # type: ignore from strategy import TradingStrategy # type: ignore + from user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore import config # type: ignore # 配置日志(支持相对路径) @@ -233,6 +235,7 @@ async def main(): # 初始化组件 client = None + user_data_stream = None try: # 1. 初始化币安客户端 logger.info("初始化币安客户端...") @@ -302,8 +305,25 @@ async def main(): except Exception as e: logger.error(f"余额重试异常: {e}", exc_info=True) continue - - + + # 3. 启动 User Data Stream(订单/持仓/余额推送,listenKey 保活,减少 REST 请求) + import os + account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") + user_data_stream = UserDataStream(client, account_id) + if await user_data_stream.start(): + logger.info("✓ User Data Stream 已启动(订单/持仓/余额 WS 推送,30 分钟 keepalive)") + # 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存 + try: + seed_balance_cache(balance) + positions_seed = await client.get_open_positions() + seed_position_cache(positions_seed) + logger.info(f"✓ 已播种持仓/余额缓存(持仓 {len(positions_seed)} 个)") + except Exception as e: + logger.warning(f"播种 WS 缓存失败(将仅用 REST): {e}") + else: + logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓") + user_data_stream = None + # 4. 初始化各个模块 logger.info("初始化交易模块...") scanner = MarketScanner(client) @@ -384,7 +404,13 @@ async def main(): logger.error(f"程序运行出错: {e}", exc_info=True) raise finally: - # 清理资源 + # 清理资源(先停 User Data Stream,再断 client) + try: + if user_data_stream is not None: + await user_data_stream.stop() + logger.info("User Data Stream 已停止") + except Exception as e: + logger.debug(f"停止 User Data Stream 时异常: {e}") if client: await client.disconnect() logger.info("程序已退出") diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 44dae9e..461a13e 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -2,10 +2,11 @@ 仓位管理模块 - 管理持仓和订单 """ import asyncio -import logging import json -import aiohttp +import logging +import random import time +import aiohttp from pathlib import Path from typing import Dict, List, Optional from datetime import datetime @@ -96,6 +97,18 @@ def _log_trade_db_failure(symbol, entry_order_id, side, quantity, entry_price, a logger.warning(f"写入落库失败日志失败: {e}") +# User Data Stream 缓存(持仓/余额优先读 WS,减少 REST) +try: + from .user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache +except Exception: + try: + from user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache + except Exception: + get_stream_instance = lambda: None + get_positions_from_cache = lambda min_notional=1.0: [] + get_balance_from_cache = lambda: None + + class PositionManager: """仓位管理类""" @@ -131,6 +144,23 @@ class PositionManager: self._last_auto_close_attempt_ms: Dict[str, int] = {} self._last_auto_close_fail_log_ms: Dict[str, int] = {} + async def _get_open_positions(self) -> List[Dict]: + """优先使用 User Data Stream 持仓缓存,无缓存或未启动时走 REST。""" + if get_stream_instance() is not None: + min_notional = float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0) + cached = get_positions_from_cache(min_notional) + if cached is not None: + return cached + return await self.client.get_open_positions() + + async def _get_account_balance(self) -> Dict: + """优先使用 User Data Stream 余额缓存,无缓存时走 REST。""" + if get_stream_instance() is not None: + bal = get_balance_from_cache() + if bal is not None: + return bal + return await self.client.get_account_balance() + @staticmethod def _pct_like_to_ratio(v: float) -> float: """ @@ -458,6 +488,39 @@ class PositionManager: filled_quantity = 0.0 entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase") + # 生成 client_order_id:先落库 pending,WS 按 o.c 匹配完善,减少对 REST 同步依赖 + prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip() + client_order_id = None + if prefix: + client_order_id = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36] + pending_trade_id = None + if DB_AVAILABLE and Trade and client_order_id: + try: + pending_trade_id = Trade.create( + symbol=symbol, + side=side, + quantity=quantity, + entry_price=entry_price, + leverage=leverage, + entry_reason=entry_reason, + entry_order_id=None, + client_order_id=client_order_id, + stop_loss_price=None, + take_profit_price=None, + take_profit_1=None, + take_profit_2=None, + atr=atr, + notional_usdt=None, + margin_usdt=None, + account_id=self.account_id, + entry_context=entry_context, + status="pending", + ) + logger.debug(f"{symbol} 已落库 pending 记录 (client_order_id={client_order_id!r}, id={pending_trade_id})") + except Exception as e: + logger.warning(f"{symbol} 创建 pending 记录失败: {e}") + pending_trade_id = None + if not smart_entry_enabled: # 根治方案:关闭智能入场后,回归“纯限价单模式” # - 不追价 @@ -469,7 +532,8 @@ class PositionManager: f"确认超时={confirm_timeout}s(未成交将撤单跳过)" ) order = await self.client.place_order( - symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit + symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit, + new_client_order_id=client_order_id ) if not order: return None @@ -485,7 +549,10 @@ class PositionManager: f"追价上限={max_drift_ratio*100:.2f}%" ) - order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit) + order = await self.client.place_order( + symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit, + new_client_order_id=client_order_id + ) if not order: return None entry_order_id = order.get("orderId") @@ -522,7 +589,10 @@ class PositionManager: pass self._pending_entry_orders.pop(symbol, None) logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底") - order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET") + order = await self.client.place_order( + symbol=symbol, side=side, quantity=quantity, order_type="MARKET", + new_client_order_id=client_order_id + ) # 关键:转市价后必须更新 entry_order_id,否则后续会继续查询“已取消的旧限价单”,导致误判 CANCELED try: entry_order_id = order.get("orderId") if isinstance(order, dict) else None @@ -563,7 +633,10 @@ class PositionManager: f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | " f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%" ) - order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired) + order = await self.client.place_order( + symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired, + new_client_order_id=client_order_id + ) if not order: self._pending_entry_orders.pop(symbol, None) return None @@ -728,32 +801,57 @@ class PositionManager: take_profit_1 = closer take_profit_2 = further - # 记录到数据库(只有在订单真正成交后才保存) + # 记录到数据库:有 pending 则完善(WS 也会按 client_order_id 完善),否则新建 trade_id = None if DB_AVAILABLE and Trade: try: - logger.info(f"正在保存 {symbol} 交易记录到数据库...") - client_order_id = (order.get("clientOrderId") if order else None) or None - trade_id = Trade.create( - symbol=symbol, - side=side, - quantity=quantity, # 使用实际成交数量 - entry_price=entry_price, # 使用实际成交价格 - leverage=leverage, - entry_reason=entry_reason, - entry_order_id=entry_order_id, # 保存币安订单号 - client_order_id=client_order_id, # 自定义订单号,便于在订单记录中核对系统单 - stop_loss_price=stop_loss_price, - take_profit_price=take_profit_price, - take_profit_1=take_profit_1, - take_profit_2=take_profit_2, - atr=atr, - notional_usdt=notional_usdt, - margin_usdt=margin_usdt, - entry_context=entry_context, # 入场思路与过程(便于事后分析策略执行效果) - account_id=self.account_id - ) - logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") + if client_order_id: + row = Trade.get_by_client_order_id(client_order_id, self.account_id) + if row and str(row.get("status")) == "pending": + ok = Trade.update_pending_to_filled( + client_order_id, self.account_id, + entry_order_id, entry_price, quantity + ) + if ok: + row = Trade.get_by_client_order_id(client_order_id, self.account_id) + trade_id = row.get("id") if row else None + if trade_id: + Trade.update_open_fields( + trade_id, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + take_profit_1=take_profit_1, + take_profit_2=take_profit_2, + notional_usdt=notional_usdt, + margin_usdt=margin_usdt, + entry_context=entry_context, + atr=atr, + ) + logger.info(f"✓ {symbol} 已完善 pending 记录 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") + if trade_id is None: + # 无 pending 或未匹配到:走新建(兜底) + logger.info(f"正在保存 {symbol} 交易记录到数据库...") + fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id + trade_id = Trade.create( + symbol=symbol, + side=side, + quantity=quantity, + entry_price=entry_price, + leverage=leverage, + entry_reason=entry_reason, + entry_order_id=entry_order_id, + client_order_id=fallback_client_order_id, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + take_profit_1=take_profit_1, + take_profit_2=take_profit_2, + atr=atr, + notional_usdt=notional_usdt, + margin_usdt=margin_usdt, + entry_context=entry_context, + account_id=self.account_id, + ) + logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})") except Exception as e: logger.error(f"❌ 保存交易记录到数据库失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") @@ -817,7 +915,7 @@ class PositionManager: # 验证持仓是否真的在币安存在 try: await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓 - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() binance_position = next( (p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0), None @@ -910,7 +1008,7 @@ class PositionManager: pass # 获取当前持仓 - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() position = next( (p for p in positions if p['symbol'] == symbol), None @@ -1474,7 +1572,7 @@ class PositionManager: if current_price is None: try: # 优先获取标记价格(MARK_PRICE),因为止损单使用 MARK_PRICE 作为触发基准 - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() position = next((p for p in positions if p['symbol'] == symbol), None) if position: mark_price = position.get('markPrice') @@ -1735,7 +1833,7 @@ class PositionManager: try: # 获取当前持仓 - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() position_dict = {p['symbol']: p for p in positions} for symbol, position_info in list(self.active_positions.items()): @@ -2260,8 +2358,8 @@ class PositionManager: 持仓摘要信息 """ try: - positions = await self.client.get_open_positions() - balance = await self.client.get_account_balance() + positions = await self._get_open_positions() + balance = await self._get_account_balance() total_pnl = sum(p['unRealizedProfit'] for p in positions) @@ -2329,7 +2427,7 @@ class PositionManager: logger.info("开始同步币安持仓状态与数据库...") # 1. 获取币安实际持仓 - binance_positions = await self.client.get_open_positions() + binance_positions = await self._get_open_positions() binance_symbols = {p['symbol'] for p in binance_positions} logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})") @@ -3045,34 +3143,88 @@ class PositionManager: notional = quantity * entry_price if notional < 1.0: continue + # 补建时尽量拿到 entry_order_id:优先按时间范围查开仓订单并用 clientOrderId 前缀锁定本系统单,避免拿错/拿不到 entry_order_id = None - try: - recent = await self.client.get_recent_trades(symbol, limit=30) - if recent: - same_side = [t for t in recent if str(t.get("side", "")).upper() == side] - if same_side: - same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True) - entry_order_id = same_side[0].get("orderId") - except Exception: - pass client_order_id_sync = None + if system_order_prefix: + try: + end_ms = int(time.time() * 1000) + start_ms = end_ms - (24 * 3600 * 1000) + orders = await self.client.client.futures_get_all_orders( + symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000 + ) + if isinstance(orders, list): + open_orders = [ + o for o in orders + if isinstance(o, dict) and o.get("reduceOnly") is False + and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED" + ] + our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)] + if our_orders: + our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True) + best = None + for o in our_orders: + ap = float(o.get("avgPrice") or 0) + eq = float(o.get("executedQty") or o.get("origQty") or 0) + if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6: + best = o + break + if best is None: + best = our_orders[0] + entry_order_id = best.get("orderId") + client_order_id_sync = (best.get("clientOrderId") or "").strip() or None + logger.debug(f" {symbol} 补建从 get_all_orders 取得 orderId={entry_order_id}, clientOrderId={client_order_id_sync!r}") + except Exception as e: + logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}") + if entry_order_id is None: + try: + recent = await self.client.get_recent_trades(symbol, limit=100) + if not recent: + await asyncio.sleep(2) + recent = await self.client.get_recent_trades(symbol, limit=100) + if recent: + same_side = [t for t in recent if str(t.get("side", "")).upper() == side] + same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True) + if system_order_prefix and same_side: + for t in same_side[:5]: + oid = t.get("orderId") + if not oid: + continue + try: + info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000) + cid = (info or {}).get("clientOrderId") or "" + if cid.startswith(system_order_prefix): + entry_order_id = oid + client_order_id_sync = cid.strip() or None + break + except Exception: + continue + if entry_order_id is None and same_side: + entry_order_id = same_side[0].get("orderId") + except Exception: + pass + if entry_order_id and client_order_id_sync is None: + try: + order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) + cid = (order_info or {}).get("clientOrderId") or "" + client_order_id_sync = cid.strip() or None + except Exception: + pass # 标记是否「来历不明」:用于 DB entry_reason 与后续统计分析(sync_recovered_unknown_origin) sync_unknown_origin = False - # 仅当「明确查到开仓订单且 clientOrderId 非空且不以系统前缀开头」时标记为来历不明(仍会补建+挂 SL/TP+监控) is_clearly_manual = False + if system_order_prefix and entry_order_id and client_order_id_sync and not client_order_id_sync.startswith(system_order_prefix): + is_clearly_manual = True + logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id_sync!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP") + elif system_order_prefix and entry_order_id and not client_order_id_sync: + try: + order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) + cid = (order_info or {}).get("clientOrderId") or "" + if cid and not cid.startswith(system_order_prefix): + is_clearly_manual = True + except Exception: + pass if system_order_prefix: - if entry_order_id: - try: - order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000) - cid = (order_info or {}).get("clientOrderId") or "" - client_order_id_sync = cid or None - if cid and not cid.startswith(system_order_prefix): - is_clearly_manual = True - logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP") - except Exception as e: - logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建") - # 无法获取订单或 cid 为空(历史单/未带前缀)时不视为手动单,继续补建 - # 例外:若开启「仅币安有仓且存在 SL/TP 也监控」,且该 symbol 有止损/止盈单,可视为系统单(仅影响日志) if is_clearly_manual and config.TRADING_CONFIG.get("SYNC_MONITOR_BINANCE_POSITIONS_WITH_SLTP", True): if await self._symbol_has_sltp_orders(symbol): is_clearly_manual = False @@ -3081,7 +3233,6 @@ class PositionManager: sync_unknown_origin = True logger.info(f" → {symbol} 来历不明(开仓订单非系统前缀),将补建记录、自动挂止损止盈并纳入监控") else: - # 未配置前缀时,不再因「无止损/止盈单」跳过:一律补建并自动挂 SL/TP、纳入监控 if sync_recover_only_has_sltp and not (await self._symbol_has_sltp_orders(symbol)): sync_unknown_origin = True logger.info(f" → {symbol} 无止损/止盈单,将补建记录、自动挂止损止盈并纳入监控") @@ -3169,10 +3320,13 @@ class PositionManager: f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... " f"({side} {quantity:.4f} @ {entry_price:.4f})" ) - # 尽量从币安成交取 entry_order_id,便于订单记录/统计对账 + # 尽量从币安成交取 entry_order_id(limit=100、空时重试一次),便于订单记录/统计对账 entry_order_id = None try: - recent = await self.client.get_recent_trades(symbol, limit=30) + recent = await self.client.get_recent_trades(symbol, limit=100) + if not recent: + await asyncio.sleep(2) + recent = await self.client.get_recent_trades(symbol, limit=100) if recent: same_side = [t for t in recent if str(t.get('side', '')).upper() == side] if same_side: @@ -3315,7 +3469,7 @@ class PositionManager: return # 获取当前所有持仓(与 sync 一致:仅本系统关心的持仓会进 active_positions) - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() binance_symbols = {p['symbol'] for p in positions} active_symbols = set(self.active_positions.keys()) sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False) @@ -4133,7 +4287,7 @@ class PositionManager: logger.warning(f" 可能原因: 监控未启动或已停止") # 3. 获取币安实际持仓 - positions = await self.client.get_open_positions() + positions = await self._get_open_positions() binance_position = next((p for p in positions if p['symbol'] == symbol), None) if not binance_position: diff --git a/trading_system/risk_manager.py b/trading_system/risk_manager.py index 0bd6423..661acc0 100644 --- a/trading_system/risk_manager.py +++ b/trading_system/risk_manager.py @@ -17,6 +17,28 @@ except ImportError: logger = logging.getLogger(__name__) +# 可选:优先使用 User Data Stream 缓存(与 position_manager 一致) +def _get_stream_instance(): + try: + from .user_data_stream import get_stream_instance + return get_stream_instance() + except Exception: + return None + +def _get_balance_from_cache(): + try: + from .user_data_stream import get_balance_from_cache + return get_balance_from_cache() + except Exception: + return None + +def _get_positions_from_cache(): + try: + from .user_data_stream import get_positions_from_cache + return get_positions_from_cache(float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0)) + except Exception: + return None + class RiskManager: """风险管理类""" @@ -49,8 +71,10 @@ class RiskManager: try: logger.info(f"检查 {symbol} 单笔仓位大小...") - # 获取账户余额 - balance = await self.client.get_account_balance() + # 获取账户余额(优先 WS 缓存) + balance = _get_balance_from_cache() if _get_stream_instance() else None + if balance is None: + balance = await self.client.get_account_balance() available_balance = balance.get('available', 0) if available_balance <= 0: @@ -137,8 +161,10 @@ class RiskManager: 是否通过检查 """ try: - # 获取当前持仓 - positions = await self.client.get_open_positions() + # 获取当前持仓(优先 WS 缓存) + positions = _get_positions_from_cache() if _get_stream_instance() else None + if positions is None: + positions = await self.client.get_open_positions() # 计算当前总保证金占用 current_position_values = [] @@ -167,8 +193,10 @@ class RiskManager: # 加上新仓位 total_with_new = total_margin_value + new_position_margin - # 获取账户余额 - balance = await self.client.get_account_balance() + # 获取账户余额(优先 WS 缓存) + balance = _get_balance_from_cache() if _get_stream_instance() else None + if balance is None: + balance = await self.client.get_account_balance() total_balance = balance.get('total', 0) available_balance = balance.get('available', 0) @@ -401,8 +429,10 @@ class RiskManager: try: logger.info(f"开始计算 {symbol} 的仓位大小...") - # 获取账户余额 - balance = await self.client.get_account_balance() + # 获取账户余额(优先 WS 缓存) + balance = _get_balance_from_cache() if _get_stream_instance() else None + if balance is None: + balance = await self.client.get_account_balance() available_balance = balance.get('available', 0) total_balance = balance.get('total', 0) @@ -785,8 +815,10 @@ class RiskManager: logger.debug(f"{symbol} 涨跌幅 {change_percent:.2f}% 小于阈值") return False - # 检查是否已有持仓 / 总持仓数量限制 - positions = await self.client.get_open_positions() + # 检查是否已有持仓 / 总持仓数量限制(优先 WS 缓存) + positions = _get_positions_from_cache() if _get_stream_instance() else None + if positions is None: + positions = await self.client.get_open_positions() try: max_open = int(config.TRADING_CONFIG.get("MAX_OPEN_POSITIONS", 0) or 0) except Exception: diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py new file mode 100644 index 0000000..9e33a84 --- /dev/null +++ b/trading_system/user_data_stream.py @@ -0,0 +1,364 @@ +""" +合约 User Data Stream:订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。 +- 先 REST 下单并落库 pending,WS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。 +- ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。 +- listenKey 每 30 分钟 keepalive;收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。 +- 单连接有效期不超过 24 小时,届时主动重连(文档建议)。 +""" +import asyncio +import json +import logging +import time +from typing import Dict, List, Optional, Any + +logger = logging.getLogger(__name__) + +# 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions +_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...) +# 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 +_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc } +# 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用) +_stream_instance: Optional["UserDataStream"] = None +# 是否已用 REST 结果播种过(未播种时业务应走 REST,不返回空缓存) +_position_cache_seeded = False +_balance_cache_seeded = False + + +def get_position_updates_cache() -> Dict[str, List[Dict]]: + """返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。""" + return dict(_position_updates_cache) + + +def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]: + """返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。""" + return dict(_balance_updates_cache) + + +def get_stream_instance() -> Optional["UserDataStream"]: + """返回当前运行的 UserDataStream 实例(未启动时为 None)。""" + return _stream_instance + + +def seed_position_cache(positions: List[Dict]) -> None: + """用 REST 全量持仓结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。""" + global _position_updates_cache, _position_cache_seeded + _position_updates_cache.clear() + _position_cache_seeded = True + for pos in positions or []: + symbol = (pos.get("symbol") or "").strip() + if not symbol: + continue + amt = float(pos.get("positionAmt") or 0) + _position_updates_cache[symbol] = [{ + "s": symbol, + "pa": amt, + "ep": str(pos.get("entryPrice") or "0"), + "up": str(pos.get("unRealizedProfit") or "0"), + "ps": pos.get("positionSide") or "BOTH", + }] + logger.debug(f"UserDataStream: 已填充持仓缓存 {len(_position_updates_cache)} 个 symbol") + + +def seed_balance_cache(balance: Dict[str, Any]) -> None: + """用 REST 余额结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。""" + global _balance_updates_cache, _balance_cache_seeded + _balance_cache_seeded = True + if balance and isinstance(balance, dict): + wb = balance.get("walletBalance") or balance.get("total") or 0 + av = balance.get("availableBalance") or balance.get("available") or wb + _balance_updates_cache["USDT"] = {"wb": str(wb), "cw": str(av), "bc": "0"} + logger.debug("UserDataStream: 已填充余额缓存 (USDT)") + + +def get_positions_from_cache(min_notional: float = 1.0) -> Optional[List[Dict]]: + """将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None(业务应走 REST)。""" + if not _position_cache_seeded: + return None + out = [] + for symbol, plist in _position_updates_cache.items(): + for p in plist if isinstance(plist, list) else []: + try: + pa = float(p.get("pa") or 0) + except (TypeError, ValueError): + pa = 0 + if pa == 0: + continue + ep = float(p.get("ep") or 0) + if min_notional > 0 and abs(pa) * ep < min_notional: + continue + out.append({ + "symbol": symbol, + "positionAmt": pa, + "entryPrice": ep, + "markPrice": float(p.get("markPrice") or 0), + "unRealizedProfit": float(p.get("up") or 0), + "leverage": int(p.get("leverage") or 0), + }) + return out + + +def get_balance_from_cache() -> Optional[Dict[str, Any]]: + """从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。""" + if not _balance_cache_seeded: + return None + u = _balance_updates_cache.get("USDT") + if not u: + return None + try: + wb = float(u.get("wb") or 0) + cw = float(u.get("cw") or wb) + except (TypeError, ValueError): + return None + return {"ok": True, "available": cw, "total": wb, "margin": wb} + + +class UserDataStream: + """合约用户数据流:创建/保活 listenKey,连接 WS,处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。""" + + def __init__(self, client: Any, account_id: int): + self.client = client + self.account_id = int(account_id) + self._listen_key: Optional[str] = None + self._ws = None + self._task: Optional[asyncio.Task] = None + self._keepalive_task: Optional[asyncio.Task] = None + self._running = False + self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连) + + def _ws_base_url(self) -> str: + if getattr(self.client, "testnet", False): + return "wss://stream.binancefuture.com/ws" + return "wss://fstream.binance.com/ws" + + async def start(self) -> bool: + """创建 listenKey 并启动 WS 接收循环与 keepalive 任务。""" + global _stream_instance + if self._running: + return True + self._listen_key = await self.client.create_futures_listen_key() + if not self._listen_key: + logger.warning("UserDataStream: 无法创建 listenKey,跳过启动") + return False + self._running = True + _stream_instance = self + self._task = asyncio.create_task(self._run_ws()) + self._keepalive_task = asyncio.create_task(self._run_keepalive()) + logger.info("UserDataStream: 已启动(ORDER_TRADE_UPDATE / ACCOUNT_UPDATE,含持仓与余额推送)") + return True + + async def stop(self): + """停止 WS 与 keepalive。""" + global _stream_instance + self._running = False + _stream_instance = None + if self._keepalive_task: + self._keepalive_task.cancel() + try: + await self._keepalive_task + except asyncio.CancelledError: + pass + self._keepalive_task = None + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + if self._ws: + try: + await self._ws.close() + except Exception: + pass + self._ws = None + self._listen_key = None + logger.info("UserDataStream: 已停止") + + async def _run_keepalive(self): + """每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。""" + while self._running: + await asyncio.sleep(30 * 60) + if not self._running or not self._listen_key: + break + ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key) + if not ok and code_1125 and self._ws: + logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连") + try: + await self._ws.close() + except Exception: + pass + break + if not ok: + logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey") + + async def _run_ws(self): + """连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。""" + import aiohttp + _24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连 + while self._running: + url = f"{self._ws_base_url()}/{self._listen_key}" + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws: + self._ws = ws + self._conn_start_time = time.monotonic() + logger.info("UserDataStream: WS 已连接") + async for msg in ws: + if not self._running: + break + if msg.type == aiohttp.WSMsgType.TEXT: + should_break = await self._handle_message(msg.data) + if should_break: + break + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.warning("UserDataStream: WS 错误") + break + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): + break + # 单连接 24h 主动重连(文档:每个链接有效期不超过24小时) + if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec: + logger.info("UserDataStream: 连接已近 24h,主动重连") + break + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"UserDataStream: WS 异常 {e},60s 后重连") + await asyncio.sleep(60) + self._ws = None + self._conn_start_time = None + if not self._running: + break + # 重连前重新创建 listenKey(旧 key 可能已失效或 listenKeyExpired) + self._listen_key = await self.client.create_futures_listen_key() + if not self._listen_key: + await asyncio.sleep(60) + continue + + async def _handle_message(self, raw: str) -> bool: + """处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired)以触发重连。""" + try: + data = json.loads(raw) + except Exception: + return False + e = data.get("e") + if e == "listenKeyExpired": + logger.warning("UserDataStream: 收到 listenKeyExpired,将换新 key 重连") + return True + if e == "ORDER_TRADE_UPDATE": + self._on_order_trade_update(data.get("o") or {}) + elif e == "ACCOUNT_UPDATE": + self._on_account_update(data.get("a") or {}) + elif e == "ALGO_UPDATE": + self._on_algo_update(data.get("o") or {}) + return False + + def _on_order_trade_update(self, o: Dict): + # 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId + # ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对 + event_type = (o.get("x") or "").strip().upper() + status = (o.get("X") or "").strip().upper() + if status != "FILLED": + return + client_order_id = (o.get("c") or "").strip() or None + order_id = o.get("i") + ap = o.get("ap") + z = o.get("z") + reduce_only = o.get("R") is True + symbol = (o.get("s") or "").strip() + rp = o.get("rp") # 实现盈亏 + if order_id is None or ap is None or z is None: + return + try: + ap_f = float(ap) + z_f = float(z) + except (TypeError, ValueError): + return + # 特殊 clientOrderId:强平/ADL/结算,仅打日志不参与系统 pending 完善 + if client_order_id: + if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"): + logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}") + return + # 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交) + if event_type and event_type != "TRADE": + return + # 开仓成交:完善 pending 记录 + if not reduce_only: + if not client_order_id: + return + try: + import sys + from pathlib import Path + project_root = Path(__file__).parent.parent + backend_path = project_root / "backend" + if backend_path.exists(): + sys.path.insert(0, str(backend_path)) + from database.models import Trade + updated = Trade.update_pending_to_filled( + client_order_id, self.account_id, order_id, ap_f, z_f + ) + if updated: + logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}") + except Exception as ex: + logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}") + else: + # 平仓成交:按 symbol 回写 open 记录的 exit_order_id;若有 rp 可记入日志 + if rp is not None: + logger.debug(f"UserDataStream: 平仓订单 FILLED orderId={order_id} symbol={symbol!r} 实现盈亏 rp={rp}") + if symbol: + try: + import sys + from pathlib import Path + project_root = Path(__file__).parent.parent + backend_path = project_root / "backend" + if backend_path.exists(): + sys.path.insert(0, str(backend_path)) + from database.models import Trade + if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id): + logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}") + except Exception as ex: + logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}") + + def _on_account_update(self, a: Dict): + # 文档: a.B = 余额数组,每项 a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 + # 文档: a.P = 持仓信息数组,每项 s=symbol, pa=仓位, ep=入仓价, ps=LONG/SHORT/BOTH 等 + global _position_updates_cache, _balance_updates_cache + B = a.get("B") + if isinstance(B, list) and B: + for b in B: + asset = (b.get("a") or "").strip() + if asset: + _balance_updates_cache[asset] = { + "wb": b.get("wb"), + "cw": b.get("cw"), + "bc": b.get("bc"), + } + logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}") + P = a.get("P") + if isinstance(P, list) and P: + for p in P: + s = (p.get("s") or "").strip() + if s: + if s not in _position_updates_cache: + _position_updates_cache[s] = [] + _position_updates_cache[s] = [p] + logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}") + + def _on_algo_update(self, o: Dict): + # 条件单交易更新推送:X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id + x = (o.get("X") or "").strip().upper() + ai = o.get("ai") + symbol = (o.get("s") or "").strip() + if x in ("TRIGGERED", "FINISHED") and ai and symbol: + try: + import sys + from pathlib import Path + project_root = Path(__file__).parent.parent + backend_path = project_root / "backend" + if backend_path.exists(): + sys.path.insert(0, str(backend_path)) + from database.models import Trade + if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai): + logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}") + except Exception as ex: + logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}") + logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}") diff --git a/条件订单交易更新推送.txt b/条件订单交易更新推送.txt new file mode 100644 index 0000000..8af866d --- /dev/null +++ b/条件订单交易更新推送.txt @@ -0,0 +1,48 @@ +条件订单交易更新推送 +事件描述 +当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ALGO_UPDATE + +本次事件的具体执行类型 + +NEW:该状态表示条件订单已提交,但尚未触发。 +CANCELED:该状态表示条件订单已被取消。 +TRIGGERING:该状态表示条件订单已满足触发条件,且已被转发至撮合引擎。 +TRIGGERED:该状态表示条件订单已成功触发并成功进入撮合引擎。 +FINISHED:该状态表示触发的条件订单已在撮合引擎中被成交或取消。 +REJECTED:该状态表示条件订单被撮合引擎拒绝,例如保证金检查失败等情况。 +EXPIRED:该状态表示条件订单被系统取消。例如,用户下了一个GTE_GTC时效条件订单,但随后关闭了该标的的所有持仓,系统因此取消了该条件订单。 +事件类型 +ALGO_UPDATE + +响应示例 +{ + "e":"ALGO_UPDATE", // 事件类型 + "T":1750515742297, // 撮合时间 + "E":1750515742303, // 事件时间 + "o":{ + "caid":"Q5xaq5EGKgXXa0fD7fs0Ip", // 客户端自定条件订单ID + "aid":2148719, // 条件单 Id + "at":"CONDITIONAL", // 条件单类型 + "o":"TAKE_PROFIT", //订单类型 + "s":"BNBUSDT", //交易对 + "S":"SELL", //订单方向 + "ps":"BOTH", //持仓方向 + "f":"GTC", //有效方式 + "q":"0.01", //订单数量 + "X":"CANCELED", //条件单状态 + "ai":"", // 触发后普通订单 id + "ap": "0.00000", // 触发后在撮合引擎中实际订单的平均成交价格,仅在订单被触发并进入撮合引擎时显示 + "aq": "0.00000", // 触发后在撮合引擎中实际订单已成交数量,仅当订单被触发并进入撮合引擎时显示 + "act": "0", // 触发后在撮合引擎中实际的订单类型,仅当订单被触发并进入撮合引擎时显示 + "tp":"750", //条件单触发价格 + "p":"750", //订单价格 + "V":"EXPIRE_MAKER", //自成交防止模式 + "wt":"CONTRACT_PRICE", //触发价类型 + "pm":"NONE", // 价格匹配模式 + "cp":false, //是否为触发平仓单; 仅在条件订单情况下会推送此字段 + "pP":false, //是否开启条件单触发保护 + "R":false, // 是否是只减仓单 + "tt":0, //触发时间 + "gtd":0, // TIF为GTD的订单自动取消时间 + "rm": "Reduce Only reject" // 条件单失败原因 +} \ No newline at end of file diff --git a/用户余额.txt b/用户余额.txt new file mode 100644 index 0000000..4da0aaa --- /dev/null +++ b/用户余额.txt @@ -0,0 +1,51 @@ +账户余额V2 (USER_DATA) +查询账户余额 + +方式 +v2/account.balance + +请求 +{ + "id": "605a6d20-6588-4cb9-afa0-b0ab087507ba", + "method": "v2/account.balance", + "params": { + "apiKey": "xTaDyrmvA9XT2oBHHjy39zyPzKCvMdtH3b9q4xadkAg2dNSJXQGCxzui26L823W2", + "timestamp": 1702561978458, + "signature": "208bb94a26f99aa122b1319490ca9cb2798fccc81d9b6449521a26268d53217a" + } +} + +请求权重 +5 + +请求参数 +名称 类型 是否必需 描述 +recvWindow LONG NO +timestamp LONG YES +响应示例 +{ + "id": "605a6d20-6588-4cb9-afa0-b0ab087507ba", + "status": 200, + "result": [ + { + "accountAlias": "SgsR", // 账户唯一识别码 + "asset": "USDT", // 资产 + "balance": "122607.35137903", // 总余额 + "crossWalletBalance": "23.72469206", // 全仓余额 + "crossUnPnl": "0.00000000" // 全仓持仓未实现盈亏 + "availableBalance": "23.72469206", // 下单可用余额 + "maxWithdrawAmount": "23.72469206", // 最大可转出余额 + "marginAvailable": true, // 是否可用作联合保证金 + "updateTime": 1617939110373 + } + ], + "rateLimits": [ + { + "rateLimitType": "REQUEST_WEIGHT", + "interval": "MINUTE", + "intervalNum": 1, + "limit": 2400, + "count": 20 + } + ] +} \ No newline at end of file diff --git a/订单交易更新推送.txt b/订单交易更新推送.txt new file mode 100644 index 0000000..8264c58 --- /dev/null +++ b/订单交易更新推送.txt @@ -0,0 +1,110 @@ +订单交易更新推送 +事件描述 +当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ORDER_TRADE_UPDATE + +订单方向 + +BUY 买入 +SELL 卖出 +订单类型 + +LIMIT 限价单 +MARKET 市价单 +STOP 止损限价单 +STOP_MARKET 止损市价单 +TAKE_PROFIT 止盈限价单 +TAKE_PROFIT_MARKET 止盈市价单 +TRAILING_STOP_MARKET 跟踪止损单 +LIQUIDATION 爆仓 +本次事件的具体执行类型 + +NEW +CANCELED 已撤 +CALCULATED 订单 ADL 或爆仓 +EXPIRED 订单失效 +TRADE 交易 +AMENDMENT 订单修改 +订单状态 + +NEW +PARTIALLY_FILLED +FILLED +CANCELED +EXPIRED +EXPIRED_IN_MATCH +有效方式: + +GTC +IOC +FOK +GTX +强平和ADL: + +若用户因保证金不足发生强平: +c为"autoclose-XXX",X为"NEW" +若用户保证金充足但被 ADL: +c为“adl_autoclose”,X为“NEW” +过期原因 + +0: 无,默认值 +1: 自成交保护,订单被取消 +2: IOC订单无法完全成交,订单被取消 +3: IOC订单因自成交保护无法完全成交,订单被取消 +4: 只减仓竞争过程中,低优先级的只减仓订单被取消 +5: 账户强平过程中,订单被取消 +6: 不满足GTE条件,订单被取消 +7: Symbol下架,订单被取消 +8: 止盈止损单触发后,初始订单被取消 +9: 市价订单无法完全成交,订单被取消 +事件类型 +ORDER_TRADE_UPDATE + +响应示例 +{ + "e":"ORDER_TRADE_UPDATE", // 事件类型 + "E":1568879465651, // 事件时间 + "T":1568879465650, // 撮合时间 + "o":{ + "s":"BTCUSDT", // 交易对 + "c":"TEST", // 客户端自定订单ID + // 特殊的自定义订单ID: + // "autoclose-"开头的字符串: 系统强平订单 + // "adl_autoclose": ADL自动减仓订单 + // "settlement_autoclose-": 下架或交割的结算订单 + "S":"SELL", // 订单方向 + "o":"TRAILING_STOP_MARKET", // 订单类型 + "f":"GTC", // 有效方式 + "q":"0.001", // 订单原始数量 + "p":"0", // 订单原始价格 + "ap":"0", // 订单平均价格 + "sp":"7103.04", // 条件订单触发价格,对追踪止损单无效 + "x":"NEW", // 本次事件的具体执行类型 + "X":"NEW", // 订单的当前状态 + "i":8886774, // 订单ID + "l":"0", // 订单末次成交量 + "z":"0", // 订单累计已成交量 + "L":"0", // 订单末次成交价格 + "N": "USDT", // 手续费资产类型 + "n": "0", // 手续费数量 + "T":1568879465650, // 成交时间 + "t":0, // 成交ID + "b":"0", // 买单净值 + "a":"9.91", // 卖单净值 + "m": false, // 该成交是作为挂单成交吗? + "R":false , // 是否是只减仓单 + "wt": "CONTRACT_PRICE", // 触发价类型 + "ot": "TRAILING_STOP_MARKET", // 原始订单类型 + "ps":"LONG" // 持仓方向 + "cp":false, // 是否为触发平仓单; 仅在条件订单情况下会推送此字段 + "AP":"7476.89", // 追踪止损激活价格, 仅在追踪止损单时会推送此字段 + "cr":"5.0", // 追踪止损回调比例, 仅在追踪止损单时会推送此字段 + "pP": false, // 是否开启条件单触发保护 + "si": 0, // 忽略 + "ss": 0, // 忽略 + "rp":"0", // 该交易实现盈亏 + "V":"EXPIRE_TAKER", // 自成交防止模式 + "pm":"OPPONENT", // 价格匹配模式 + "gtd":0, // TIF为GTD的订单自动取消时间 + "er":"0" // 过期原因 + } +} \ No newline at end of file diff --git a/账户信息流连接.txt b/账户信息流连接.txt new file mode 100644 index 0000000..bf68cf9 --- /dev/null +++ b/账户信息流连接.txt @@ -0,0 +1,118 @@ +账户信息流连接 +本篇所列出REST接口的baseurl https://fapi.binance.com + +用于订阅账户数据的 listenKey 从创建时刻起有效期为60分钟 + +可以通过PUT一个listenKey延长60分钟有效期,如收到-1125报错提示此listenKey不存在,建议重新使用POST /fapi/v1/listenKey生成listenKey + +可以通过DELETE一个 listenKey 立即关闭当前数据流,并使该listenKey 无效 + +在具有有效listenKey的帐户上执行POST将返回当前有效的listenKey并将其有效期延长60分钟 + +本篇所列出的websocket接口,连接方式如下: + +Base Url: wss://fstream.binance.com +订阅账户数据流的stream名称为 /ws/`` +连接样例: +wss://fstream.binance.com/ws/XaEAKTsQSRLZAGH9tuIu37plSRsdjmlAVBoNYPUITlTAko1WI22PgmBMpI1rS8Yh +每个链接有效期不超过24小时,请妥善处理断线重连。 + +单一账户,单一连接的推送数据流消息可以保证时间序; 强烈建议您使用 E 字段进行排序 + +考虑到剧烈行情下, RESTful接口可能存在查询延迟,我们强烈建议您优先从Websocket user data stream推送的消息来获取订单,仓位等信息。 + + + +生成listenKey (USER_STREAM) +接口描述 +创建一个新的user data stream,返回值为一个listenKey,即websocket订阅的stream名称。如果该帐户具有有效的listenKey,则将返回该listenKey并将其有效期延长60分钟。 + +HTTP请求 +POST /fapi/v1/listenKey + +请求权重 +1 + +请求参数 +None + +响应示例 +{ + "listenKey": "pqia91ma19a5s61cv6a81va65sdf19v8a65a1a5s61cv6a81va65sdf19v8a65a1" +} + + +延长listenKey有效期(USER_STREAM) +接口描述 +有效期延长至本次调用后60分钟 + +HTTP请求 +PUT /fapi/v1/listenKey + +请求权重 +1 + +请求参数 +None + +{ + "listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg" // 被延长的listenkey +} + + +Websocket API生成listenKey (USER_STREAM) +接口描述 +创建一个新的user data stream,返回值为一个listenKey,即websocket订阅的stream名称。如果该帐户具有有效的listenKey,则将返回该listenKey并将其有效期延长60分钟。 + +方式 +userDataStream.start + +请求 +{ + "id": "d3df8a61-98ea-4fe0-8f4e-0fcea5d418b0", + "method": "userDataStream.start", + "params": { + "apiKey": "vmPUZE6mv9SD5VNHk4HlWFsOr6aKE2zvsw0MuIgwCIPy6utIco14y7Ju91duEh8A" + } +} + + +Websocket API延长listenKey有效期(USER_STREAM) +接口描述 +有效期延长至本次调用后60分钟 + +方式 +userDataStream.ping + +请求 +{ + "id": "815d5fce-0880-4287-a567-80badf004c74", + "method": "userDataStream.ping", + "params": { + "apiKey": "vmPUZE6mv9SD5VNHk9HlWFsOr9aLE2zvsw0MuIgwCIPy8atIco14y7Ju91duEh8A" + } +} + +请求权重 +1 + +请求参数 +None + +响应示例 +{ + "id": "815d5fce-0880-4287-a567-80badf004c74", + "status": 200, + "result": { + "listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg" + }, + "rateLimits": [ + { + "rateLimitType": "REQUEST_WEIGHT", + "interval": "MINUTE", + "intervalNum": 1, + "limit": 2400, + "count": 2 + } + ] +} \ No newline at end of file