From e5bc2547aa4c3f73c3eb5686854f9746935ed652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Mon, 16 Feb 2026 19:19:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(binance=5Fclient):=20=E5=BC=95=E5=85=A5Web?= =?UTF-8?q?Socket=E4=BA=A4=E6=98=93=E5=AE=A2=E6=88=B7=E7=AB=AF=E4=BB=A5?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=B8=8B=E5=8D=95=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `binance_client.py` 中新增 WebSocket 交易客户端的延迟初始化,优先使用 WebSocket 下单以减少 REST 超时。更新 `futures_create_algo_order` 方法,尝试通过 WebSocket 创建条件单,并在失败时回退到 REST 调用。同时,调整 `ALGO_ORDER_TIMEOUT_SEC` 的默认值为 45秒,以应对高负载情况。增强了异常处理和日志记录,确保系统的稳定性和可追溯性。 --- docs/WS交易接口实现说明.md | 66 ++++++++++ trading_system/binance_client.py | 154 +++++++++++++++------- trading_system/config.py | 4 +- trading_system/ws_trade_client.py | 203 +++++++++++++++++++++++++++++ 4 files changed, 378 insertions(+), 49 deletions(-) create mode 100644 docs/WS交易接口实现说明.md create mode 100644 trading_system/ws_trade_client.py diff --git a/docs/WS交易接口实现说明.md b/docs/WS交易接口实现说明.md new file mode 100644 index 0000000..2cb38cf --- /dev/null +++ b/docs/WS交易接口实现说明.md @@ -0,0 +1,66 @@ +# WebSocket 交易接口实现说明 + +## 概述 + +已实现通过 **WebSocket API** 创建 Algo 条件单(止损/止盈),优先于 REST,减少「请求超时」问题。 + +--- + +## 实现内容 + +### 1. 新增文件 + +- **`trading_system/ws_trade_client.py`**:WebSocket 交易客户端 + - 连接到 `wss://ws-fapi.binance.com/ws-fapi/v1`(正式)或 `wss://testnet.binancefuture.com/ws-fapi/v1`(测试) + - 实现 `algoOrder.place` 方法,发送条件单请求 + - 处理请求-响应匹配(用 `id` 字段) + - 自动重连、24h 主动重连、ping/pong 保活 + +### 2. 修改文件 + +- **`trading_system/binance_client.py`**: + - `__init__`:添加 `_ws_trade_client` 属性 + - `connect()`:连接成功后启动 WS 交易客户端 + - `futures_create_algo_order()`:**优先用 WS 发送条件单,失败回退 REST** + - `disconnect()`:停止 WS 交易客户端 + +--- + +## 工作流程 + +1. **启动时**:`BinanceClient.connect()` 成功后,若 API 密钥有效,自动启动 `WSTradeClient` 并连接到 `ws-fapi`。 +2. **下单时**: + - 若 WS 已连接 → 用 `algoOrder.place` 发送(带签名),等待响应(超时 20-25 秒) + - WS 失败或未连接 → 回退到 REST `POST /fapi/v1/algoOrder`(原有逻辑,45 秒超时 + 重试) +3. **响应格式**:WS 返回的 `result` 字段(含 `algoId` 等)直接返回,与 REST 格式兼容。 + +--- + +## 优势 + +- **减少超时**:WS 长连接,无每次 HTTP 建连开销,响应通常更快 +- **自动回退**:WS 失败时自动用 REST,不影响现有逻辑 +- **透明切换**:调用方无需改动,`futures_create_algo_order()` 自动选择最优方式 + +--- + +## 配置 + +- **超时时间**:WS 超时 = `ALGO_ORDER_TIMEOUT_SEC - 5`(最小 20 秒),REST 超时 = `ALGO_ORDER_TIMEOUT_SEC`(默认 45 秒) +- **连接管理**:WS 连接自动重连、24h 主动重连、ping/pong 保活 + +--- + +## 日志 + +- WS 连接成功:`✓ WebSocket 交易客户端已启动(条件单优先走 WS,失败回退 REST)` +- WS 下单成功:`{symbol} ✓ WS 条件单创建成功: algoId=...` +- WS 失败回退:`{symbol} WS 条件单失败(...),回退到 REST` + +--- + +## 注意事项 + +- WS 交易接口需要有效的 API 密钥(与 REST 相同) +- 签名计算方式与 REST 一致(HMAC-SHA256) +- 若 WS 连接失败,不影响 REST 下单功能 diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 8657958..b806581 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -214,6 +214,9 @@ class BinanceClient: # 这里缩短到 10s,兼顾及时性与接口频率。 self._position_mode_ttl: float = 10.0 # 秒,避免频繁调用接口 + # WebSocket 交易客户端(延迟初始化,仅在需要时创建) + self._ws_trade_client = None + # 隐藏敏感信息,只显示前4位和后4位 api_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key api_secret_display = f"{self.api_secret[:4]}...{self.api_secret[-4:]}" if self.api_secret and len(self.api_secret) > 8 else self.api_secret @@ -320,6 +323,17 @@ class BinanceClient: except Exception as e: logger.debug(f"读取持仓模式失败(可忽略,后续下单会再尝试): {e}") + # 启动 WebSocket 交易客户端(优先用 WS 下单,减少 REST 超时) + if self.api_key and self.api_secret: + try: + from .ws_trade_client import WSTradeClient + self._ws_trade_client = WSTradeClient(self.api_key, self.api_secret, self.testnet) + await self._ws_trade_client.start() + logger.info("✓ WebSocket 交易客户端已启动(条件单优先走 WS,失败回退 REST)") + except Exception as e: + logger.warning(f"启动 WebSocket 交易客户端失败(将仅用 REST): {e}") + self._ws_trade_client = None + return except asyncio.TimeoutError as e: @@ -527,6 +541,14 @@ class BinanceClient: async def disconnect(self): """断开连接""" + # 停止 WebSocket 交易客户端 + if self._ws_trade_client: + try: + await self._ws_trade_client.stop() + except Exception as e: + logger.debug(f"停止 WS 交易客户端失败: {e}") + self._ws_trade_client = None + # 关闭 Redis 连接 await self.redis_cache.close() @@ -1999,53 +2021,91 @@ class BinanceClient: async def futures_create_algo_order(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: symbol = params.get('symbol', 'UNKNOWN') - algo_timeout = getattr(config, 'ALGO_ORDER_TIMEOUT_SEC', 30) # 条件单接口易超时,单独加长 - try: - # python-binance 内部会自动补 timestamp / signature;用较长超时避免 TimeoutError 空消息 - res = await asyncio.wait_for( - self.client._request_futures_api("post", "algoOrder", True, data=params), - timeout=algo_timeout, - ) - return res if isinstance(res, dict) else None - except asyncio.TimeoutError: - logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: 请求超时({algo_timeout}秒)") - logger.error(f" 参数: {params}") - return None - except BinanceAPIException as e: - error_code = e.code if hasattr(e, 'code') else None - error_msg = str(e) - symbol = params.get('symbol', 'UNKNOWN') - trigger_type = params.get('type', 'UNKNOWN') - - # 详细错误日志 - logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}") - logger.error(f" 错误代码: {error_code}") - logger.error(f" 参数: {params}") - - # 常见错误码处理 - if error_code == -4014: - logger.error(f" 原因: 价格步长错误,triggerPrice 需要调整到 tickSize 的倍数") - elif error_code == -4164: - logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT)") - elif error_code == -2022: - logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)") - elif error_code == -4120: - logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)") - elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower(): - logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)") - # 关键修复:抛出此异常,让上层(PositionManager)捕获并执行市价平仓 - raise e - elif "position" in error_msg.lower(): - logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)") - - return None - except Exception as e: - err_msg = getattr(e, "message", str(e)) or repr(e) - logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {err_msg}") - logger.error(f" 参数: {params}") - import traceback - logger.debug(f" 堆栈跟踪: {traceback.format_exc()}") - return None + # 优先从 TRADING_CONFIG 读(DB 可配),否则用 config 或默认 45 + algo_timeout = 45 + if getattr(config, 'TRADING_CONFIG', None): + algo_timeout = int(config.TRADING_CONFIG.get('ALGO_ORDER_TIMEOUT_SEC') or algo_timeout) + else: + algo_timeout = getattr(config, 'ALGO_ORDER_TIMEOUT_SEC', algo_timeout) + algo_timeout = max(15, min(120, int(algo_timeout))) + + # 优先尝试 WebSocket 下单(减少 REST 超时) + if self._ws_trade_client and self._ws_trade_client.is_connected(): + try: + # 准备 WS 参数(需包含 apiKey, timestamp, signature) + ws_params = dict(params) + ws_params["apiKey"] = self.api_key + if "timestamp" not in ws_params: + ws_params["timestamp"] = int(time.time() * 1000) + # 计算签名(和 REST 一样的方式) + if "signature" not in ws_params: + import hmac + import hashlib + from urllib.parse import urlencode + query_string = urlencode(sorted([(k, str(v)) for k, v in ws_params.items() if k != "signature"])) + signature = hmac.new( + self.api_secret.encode('utf-8'), + query_string.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + ws_params["signature"] = signature + # WS 请求(超时时间略短于 REST,因为 WS 通常更快) + ws_timeout = min(algo_timeout - 5, 25) if algo_timeout > 25 else algo_timeout + result = await self._ws_trade_client.algo_order_place(ws_params, timeout=ws_timeout) + if result: + logger.debug(f"{symbol} ✓ WS 条件单创建成功: algoId={result.get('algoId')}") + # 转换响应格式以兼容 REST 返回格式(algoId 等字段) + return result + except (ConnectionError, TimeoutError) as e: + logger.debug(f"{symbol} WS 条件单失败({e}),回退到 REST") + except Exception as e: + logger.debug(f"{symbol} WS 条件单异常: {e},回退到 REST") + + # 回退到 REST(原有逻辑) + for attempt in range(2): # 超时重试 1 次,避免偶发网络抖动直接失败 + try: + res = await asyncio.wait_for( + self.client._request_futures_api("post", "algoOrder", True, data=params), + timeout=algo_timeout, + ) + return res if isinstance(res, dict) else None + except asyncio.TimeoutError: + if attempt == 0: + logger.warning(f"{symbol} 创建 Algo 条件单超时({algo_timeout}秒),2 秒后重试一次...") + await asyncio.sleep(2) + continue + logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: 请求超时({algo_timeout}秒,已重试)") + logger.error(f" 参数: {params}") + return None + except BinanceAPIException as e: + error_code = e.code if hasattr(e, 'code') else None + error_msg = str(e) + trigger_type = params.get('type', 'UNKNOWN') + logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}") + logger.error(f" 错误代码: {error_code}") + logger.error(f" 参数: {params}") + if error_code == -4014: + logger.error(f" 原因: 价格步长错误,triggerPrice 需要调整到 tickSize 的倍数") + elif error_code == -4164: + logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT)") + elif error_code == -2022: + logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)") + elif error_code == -4120: + logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)") + elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower(): + logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)") + raise e + elif "position" in error_msg.lower(): + logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)") + return None + except Exception as e: + err_msg = getattr(e, "message", str(e)) or repr(e) + logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {err_msg}") + logger.error(f" 参数: {params}") + import traceback + logger.debug(f" 堆栈跟踪: {traceback.format_exc()}") + return None + return None async def futures_get_open_algo_orders(self, symbol: Optional[str] = None, algo_type: str = "CONDITIONAL") -> List[Dict[str, Any]]: try: diff --git a/trading_system/config.py b/trading_system/config.py index ee53e93..1ec7208 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -421,8 +421,8 @@ CONNECTION_RETRIES = int(os.getenv('CONNECTION_RETRIES', '3')) # 连接重试 # 仅用于 get_open_positions / get_recent_trades 等只读接口的单次等待时间,不影响下单/止损止盈的快速失败 # 调大此值会延长单次请求最大等待时间,在同步/查询持仓时可能阻塞事件循环,影响实时性;保持 60 秒,通过增加重试+退避应对偶发超时 READ_ONLY_REQUEST_TIMEOUT = int(os.getenv('READ_ONLY_REQUEST_TIMEOUT', '60')) -# 创建 Algo 条件单(止损/止盈)单次请求超时(秒),网络不稳时可适当调大 -ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '30')) +# 创建 Algo 条件单(止损/止盈)单次请求超时(秒),币安 Algo 接口在高负载时易超时,默认 45 秒;网络不稳可调大至 60 +ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '45')) # 获取持仓时过滤掉名义价值低于此值的仓位(USDT),与币安仪表板不一致时可调低或设为 0 POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0')) diff --git a/trading_system/ws_trade_client.py b/trading_system/ws_trade_client.py new file mode 100644 index 0000000..fa145d0 --- /dev/null +++ b/trading_system/ws_trade_client.py @@ -0,0 +1,203 @@ +""" +WebSocket 交易客户端:通过 WS API 下单(order.place / algoOrder.place),优先于 REST,减少超时。 +端点:wss://ws-fapi.binance.com/ws-fapi/v1(正式),wss://testnet.binancefuture.com/ws-fapi/v1(测试) +文档:ws交易接口.txt +""" +import asyncio +import json +import logging +import time +import uuid +from typing import Dict, Optional, Any +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + + +class WSTradeClient: + """WebSocket 交易客户端:维护到 ws-fapi 的连接,发送 order.place / algoOrder.place 请求。""" + + def __init__(self, api_key: str, api_secret: str, testnet: bool = False): + self.api_key = api_key + self.api_secret = api_secret + self.testnet = testnet + self._ws = None + self._task: Optional[asyncio.Task] = None + self._running = False + self._pending_requests: Dict[str, asyncio.Future] = {} # request_id -> Future[response] + self._conn_start_time: Optional[float] = None + self._last_pong_time: Optional[float] = None + self._lock = asyncio.Lock() + + def _ws_base_url(self) -> str: + if self.testnet: + return "wss://testnet.binancefuture.com/ws-fapi/v1" + return "wss://ws-fapi.binance.com/ws-fapi/v1" + + async def start(self) -> bool: + """启动 WS 连接与消息处理循环。""" + if self._running: + return True + self._running = True + self._task = asyncio.create_task(self._run_ws()) + logger.info("WSTradeClient: 已启动(WS 交易接口)") + return True + + async def stop(self): + """停止 WS 连接。""" + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + if self._ws: + try: + await self._ws.close() + except Exception: + pass + self._ws = None + # 取消所有 pending 请求 + async with self._lock: + for fut in self._pending_requests.values(): + if not fut.done(): + fut.cancel() + self._pending_requests.clear() + logger.info("WSTradeClient: 已停止") + + def is_connected(self) -> bool: + """检查 WS 是否已连接。""" + return self._ws is not None and not self._ws.closed + + async def _run_ws(self): + """连接 WS 并处理消息;断线后自动重连。""" + import aiohttp + _24h_sec = 23 * 3600 # 24 小时主动重连 + reconnect_delay = 2 + while self._running: + url = self._ws_base_url() + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(url, heartbeat=60) as ws: + self._ws = ws + self._conn_start_time = time.time() + self._last_pong_time = time.time() + reconnect_delay = 2 + logger.info(f"WSTradeClient: 已连接到 {url}") + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self._handle_message(msg.data) + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}") + break + elif msg.type == aiohttp.WSMsgType.CLOSE: + logger.info("WSTradeClient: WS 连接关闭") + break + elif msg.type == aiohttp.WSMsgType.PING: + await ws.pong() + self._last_pong_time = time.time() + elif msg.type == aiohttp.WSMsgType.PONG: + self._last_pong_time = time.time() + # 检查 24h 重连 + if self._conn_start_time and (time.time() - self._conn_start_time) > _24h_sec: + logger.info("WSTradeClient: 连接已接近 24h,主动重连") + break + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"WSTradeClient: 连接异常: {e},{reconnect_delay} 秒后重连...") + finally: + self._ws = None + self._conn_start_time = None + if not self._running: + break + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 60) + + async def _handle_message(self, data: str): + """处理 WS 消息:匹配请求-响应。""" + try: + msg = json.loads(data) + req_id = str(msg.get("id", "")) + if not req_id or req_id not in self._pending_requests: + logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}") + return + fut = self._pending_requests.pop(req_id, None) + if fut and not fut.done(): + if msg.get("status") == 200: + fut.set_result(msg.get("result")) + else: + error_msg = msg.get("error", {}).get("msg", "Unknown error") + error_code = msg.get("error", {}).get("code", -1) + # 创建类似 BinanceAPIException 的异常 + class WSTradeAPIException(Exception): + def __init__(self, message, code): + super().__init__(message) + self.message = message + self.code = code + exc = WSTradeAPIException(error_msg, error_code) + fut.set_exception(exc) + except Exception as e: + logger.error(f"WSTradeClient: 处理消息失败: {e}") + + async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]: + """发送 WS 请求并等待响应(带超时)。""" + if not self.is_connected(): + raise ConnectionError("WS 未连接") + req_id = str(uuid.uuid4()) + req = { + "id": req_id, + "method": method, + "params": params, + } + fut = asyncio.Future() + async with self._lock: + self._pending_requests[req_id] = fut + try: + await self._ws.send_str(json.dumps(req)) + result = await asyncio.wait_for(fut, timeout=timeout) + return result + except asyncio.TimeoutError: + async with self._lock: + self._pending_requests.pop(req_id, None) + raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}") + except Exception as e: + async with self._lock: + self._pending_requests.pop(req_id, None) + raise + finally: + if req_id in self._pending_requests: + async with self._lock: + self._pending_requests.pop(req_id, None) + + async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]: + """ + 通过 WS 发送 algoOrder.place 请求(条件单)。 + + Args: + params: 订单参数(需包含 apiKey, timestamp, signature;其他同 REST) + timeout: 等待响应超时(秒) + + Returns: + 订单结果(含 algoId 等),失败返回 None 或抛异常 + """ + # 确保 params 包含必要字段 + ws_params = dict(params) + if "apiKey" not in ws_params: + ws_params["apiKey"] = self.api_key + if "timestamp" not in ws_params: + ws_params["timestamp"] = int(time.time() * 1000) + # signature 应由调用方计算(使用与 REST 相同的签名逻辑) + if "signature" not in ws_params: + logger.warning("WSTradeClient: params 缺少 signature,WS 请求可能失败") + try: + result = await self._send_request("algoOrder.place", ws_params, timeout=timeout) + return result + except (ConnectionError, TimeoutError) as e: + logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}") + raise + except Exception as e: + logger.error(f"WSTradeClient: algoOrder.place 异常: {e}") + raise