feat(binance_client): 引入WebSocket交易客户端以优化下单逻辑

在 `binance_client.py` 中新增 WebSocket 交易客户端的延迟初始化,优先使用 WebSocket 下单以减少 REST 超时。更新 `futures_create_algo_order` 方法,尝试通过 WebSocket 创建条件单,并在失败时回退到 REST 调用。同时,调整 `ALGO_ORDER_TIMEOUT_SEC` 的默认值为 45秒,以应对高负载情况。增强了异常处理和日志记录,确保系统的稳定性和可追溯性。
This commit is contained in:
薇薇安 2026-02-16 19:19:56 +08:00
parent 857128bca9
commit e5bc2547aa
4 changed files with 378 additions and 49 deletions

View File

@ -0,0 +1,66 @@
# WebSocket 交易接口实现说明
## 概述
已实现通过 **WebSocket API** 创建 Algo 条件单(止损/止盈),优先于 REST减少「请求超时」问题。
---
## 实现内容
### 1. 新增文件
- **`trading_system/ws_trade_client.py`**WebSocket 交易客户端
- 连接到 `wss://ws-fapi.binance.com/ws-fapi/v1`(正式)或 `wss://testnet.binancefuture.com/ws-fapi/v1`(测试)
- 实现 `algoOrder.place` 方法,发送条件单请求
- 处理请求-响应匹配(用 `id` 字段)
- 自动重连、24h 主动重连、ping/pong 保活
### 2. 修改文件
- **`trading_system/binance_client.py`**
- `__init__`:添加 `_ws_trade_client` 属性
- `connect()`:连接成功后启动 WS 交易客户端
- `futures_create_algo_order()`**优先用 WS 发送条件单,失败回退 REST**
- `disconnect()`:停止 WS 交易客户端
---
## 工作流程
1. **启动时**`BinanceClient.connect()` 成功后,若 API 密钥有效,自动启动 `WSTradeClient` 并连接到 `ws-fapi`
2. **下单时**
- 若 WS 已连接 → 用 `algoOrder.place` 发送(带签名),等待响应(超时 20-25 秒)
- WS 失败或未连接 → 回退到 REST `POST /fapi/v1/algoOrder`原有逻辑45 秒超时 + 重试)
3. **响应格式**WS 返回的 `result` 字段(含 `algoId` 等)直接返回,与 REST 格式兼容。
---
## 优势
- **减少超时**WS 长连接,无每次 HTTP 建连开销,响应通常更快
- **自动回退**WS 失败时自动用 REST不影响现有逻辑
- **透明切换**:调用方无需改动,`futures_create_algo_order()` 自动选择最优方式
---
## 配置
- **超时时间**WS 超时 = `ALGO_ORDER_TIMEOUT_SEC - 5`(最小 20 秒REST 超时 = `ALGO_ORDER_TIMEOUT_SEC`(默认 45 秒)
- **连接管理**WS 连接自动重连、24h 主动重连、ping/pong 保活
---
## 日志
- WS 连接成功:`✓ WebSocket 交易客户端已启动(条件单优先走 WS失败回退 REST`
- WS 下单成功:`{symbol} ✓ WS 条件单创建成功: algoId=...`
- WS 失败回退:`{symbol} WS 条件单失败(...),回退到 REST`
---
## 注意事项
- WS 交易接口需要有效的 API 密钥(与 REST 相同)
- 签名计算方式与 REST 一致HMAC-SHA256
- 若 WS 连接失败,不影响 REST 下单功能

View File

@ -214,6 +214,9 @@ class BinanceClient:
# 这里缩短到 10s兼顾及时性与接口频率。 # 这里缩短到 10s兼顾及时性与接口频率。
self._position_mode_ttl: float = 10.0 # 秒,避免频繁调用接口 self._position_mode_ttl: float = 10.0 # 秒,避免频繁调用接口
# WebSocket 交易客户端(延迟初始化,仅在需要时创建)
self._ws_trade_client = None
# 隐藏敏感信息只显示前4位和后4位 # 隐藏敏感信息只显示前4位和后4位
api_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key api_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key
api_secret_display = f"{self.api_secret[:4]}...{self.api_secret[-4:]}" if self.api_secret and len(self.api_secret) > 8 else self.api_secret api_secret_display = f"{self.api_secret[:4]}...{self.api_secret[-4:]}" if self.api_secret and len(self.api_secret) > 8 else self.api_secret
@ -320,6 +323,17 @@ class BinanceClient:
except Exception as e: except Exception as e:
logger.debug(f"读取持仓模式失败(可忽略,后续下单会再尝试): {e}") logger.debug(f"读取持仓模式失败(可忽略,后续下单会再尝试): {e}")
# 启动 WebSocket 交易客户端(优先用 WS 下单,减少 REST 超时)
if self.api_key and self.api_secret:
try:
from .ws_trade_client import WSTradeClient
self._ws_trade_client = WSTradeClient(self.api_key, self.api_secret, self.testnet)
await self._ws_trade_client.start()
logger.info("✓ WebSocket 交易客户端已启动(条件单优先走 WS失败回退 REST")
except Exception as e:
logger.warning(f"启动 WebSocket 交易客户端失败(将仅用 REST: {e}")
self._ws_trade_client = None
return return
except asyncio.TimeoutError as e: except asyncio.TimeoutError as e:
@ -527,6 +541,14 @@ class BinanceClient:
async def disconnect(self): async def disconnect(self):
"""断开连接""" """断开连接"""
# 停止 WebSocket 交易客户端
if self._ws_trade_client:
try:
await self._ws_trade_client.stop()
except Exception as e:
logger.debug(f"停止 WS 交易客户端失败: {e}")
self._ws_trade_client = None
# 关闭 Redis 连接 # 关闭 Redis 连接
await self.redis_cache.close() await self.redis_cache.close()
@ -1999,53 +2021,91 @@ class BinanceClient:
async def futures_create_algo_order(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: async def futures_create_algo_order(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
symbol = params.get('symbol', 'UNKNOWN') symbol = params.get('symbol', 'UNKNOWN')
algo_timeout = getattr(config, 'ALGO_ORDER_TIMEOUT_SEC', 30) # 条件单接口易超时,单独加长 # 优先从 TRADING_CONFIG 读DB 可配),否则用 config 或默认 45
try: algo_timeout = 45
# python-binance 内部会自动补 timestamp / signature用较长超时避免 TimeoutError 空消息 if getattr(config, 'TRADING_CONFIG', None):
res = await asyncio.wait_for( algo_timeout = int(config.TRADING_CONFIG.get('ALGO_ORDER_TIMEOUT_SEC') or algo_timeout)
self.client._request_futures_api("post", "algoOrder", True, data=params), else:
timeout=algo_timeout, algo_timeout = getattr(config, 'ALGO_ORDER_TIMEOUT_SEC', algo_timeout)
) algo_timeout = max(15, min(120, int(algo_timeout)))
return res if isinstance(res, dict) else None
except asyncio.TimeoutError: # 优先尝试 WebSocket 下单(减少 REST 超时)
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: 请求超时({algo_timeout}秒)") if self._ws_trade_client and self._ws_trade_client.is_connected():
logger.error(f" 参数: {params}") try:
return None # 准备 WS 参数(需包含 apiKey, timestamp, signature
except BinanceAPIException as e: ws_params = dict(params)
error_code = e.code if hasattr(e, 'code') else None ws_params["apiKey"] = self.api_key
error_msg = str(e) if "timestamp" not in ws_params:
symbol = params.get('symbol', 'UNKNOWN') ws_params["timestamp"] = int(time.time() * 1000)
trigger_type = params.get('type', 'UNKNOWN') # 计算签名(和 REST 一样的方式)
if "signature" not in ws_params:
# 详细错误日志 import hmac
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}") import hashlib
logger.error(f" 错误代码: {error_code}") from urllib.parse import urlencode
logger.error(f" 参数: {params}") query_string = urlencode(sorted([(k, str(v)) for k, v in ws_params.items() if k != "signature"]))
signature = hmac.new(
# 常见错误码处理 self.api_secret.encode('utf-8'),
if error_code == -4014: query_string.encode('utf-8'),
logger.error(f" 原因: 价格步长错误triggerPrice 需要调整到 tickSize 的倍数") hashlib.sha256
elif error_code == -4164: ).hexdigest()
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT") ws_params["signature"] = signature
elif error_code == -2022: # WS 请求(超时时间略短于 REST因为 WS 通常更快)
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)") ws_timeout = min(algo_timeout - 5, 25) if algo_timeout > 25 else algo_timeout
elif error_code == -4120: result = await self._ws_trade_client.algo_order_place(ws_params, timeout=ws_timeout)
logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)") if result:
elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower(): logger.debug(f"{symbol} ✓ WS 条件单创建成功: algoId={result.get('algoId')}")
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)") # 转换响应格式以兼容 REST 返回格式algoId 等字段)
# 关键修复抛出此异常让上层PositionManager捕获并执行市价平仓 return result
raise e except (ConnectionError, TimeoutError) as e:
elif "position" in error_msg.lower(): logger.debug(f"{symbol} WS 条件单失败({e}),回退到 REST")
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)") except Exception as e:
logger.debug(f"{symbol} WS 条件单异常: {e},回退到 REST")
return None
except Exception as e: # 回退到 REST原有逻辑
err_msg = getattr(e, "message", str(e)) or repr(e) for attempt in range(2): # 超时重试 1 次,避免偶发网络抖动直接失败
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {err_msg}") try:
logger.error(f" 参数: {params}") res = await asyncio.wait_for(
import traceback self.client._request_futures_api("post", "algoOrder", True, data=params),
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}") timeout=algo_timeout,
return None )
return res if isinstance(res, dict) else None
except asyncio.TimeoutError:
if attempt == 0:
logger.warning(f"{symbol} 创建 Algo 条件单超时({algo_timeout}秒)2 秒后重试一次...")
await asyncio.sleep(2)
continue
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: 请求超时({algo_timeout}秒,已重试)")
logger.error(f" 参数: {params}")
return None
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
trigger_type = params.get('type', 'UNKNOWN')
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}")
logger.error(f" 错误代码: {error_code}")
logger.error(f" 参数: {params}")
if error_code == -4014:
logger.error(f" 原因: 价格步长错误triggerPrice 需要调整到 tickSize 的倍数")
elif error_code == -4164:
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT")
elif error_code == -2022:
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)")
elif error_code == -4120:
logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)")
elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower():
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)")
raise e
elif "position" in error_msg.lower():
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)")
return None
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {err_msg}")
logger.error(f" 参数: {params}")
import traceback
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}")
return None
return None
async def futures_get_open_algo_orders(self, symbol: Optional[str] = None, algo_type: str = "CONDITIONAL") -> List[Dict[str, Any]]: async def futures_get_open_algo_orders(self, symbol: Optional[str] = None, algo_type: str = "CONDITIONAL") -> List[Dict[str, Any]]:
try: try:

View File

@ -421,8 +421,8 @@ CONNECTION_RETRIES = int(os.getenv('CONNECTION_RETRIES', '3')) # 连接重试
# 仅用于 get_open_positions / get_recent_trades 等只读接口的单次等待时间,不影响下单/止损止盈的快速失败 # 仅用于 get_open_positions / get_recent_trades 等只读接口的单次等待时间,不影响下单/止损止盈的快速失败
# 调大此值会延长单次请求最大等待时间,在同步/查询持仓时可能阻塞事件循环,影响实时性;保持 60 秒,通过增加重试+退避应对偶发超时 # 调大此值会延长单次请求最大等待时间,在同步/查询持仓时可能阻塞事件循环,影响实时性;保持 60 秒,通过增加重试+退避应对偶发超时
READ_ONLY_REQUEST_TIMEOUT = int(os.getenv('READ_ONLY_REQUEST_TIMEOUT', '60')) READ_ONLY_REQUEST_TIMEOUT = int(os.getenv('READ_ONLY_REQUEST_TIMEOUT', '60'))
# 创建 Algo 条件单(止损/止盈)单次请求超时(秒),网络不稳适当调大 # 创建 Algo 条件单(止损/止盈)单次请求超时(秒),币安 Algo 接口在高负载时易超时,默认 45 秒;网络不稳可调大至 60
ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '30')) ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '45'))
# 获取持仓时过滤掉名义价值低于此值的仓位USDT与币安仪表板不一致时可调低或设为 0 # 获取持仓时过滤掉名义价值低于此值的仓位USDT与币安仪表板不一致时可调低或设为 0
POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0')) POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0'))

View File

@ -0,0 +1,203 @@
"""
WebSocket 交易客户端通过 WS API 下单order.place / algoOrder.place优先于 REST减少超时
端点wss://ws-fapi.binance.com/ws-fapi/v1正式wss://testnet.binancefuture.com/ws-fapi/v1测试
文档ws交易接口.txt
"""
import asyncio
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class WSTradeClient:
"""WebSocket 交易客户端:维护到 ws-fapi 的连接,发送 order.place / algoOrder.place 请求。"""
def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
self.api_key = api_key
self.api_secret = api_secret
self.testnet = testnet
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
self._pending_requests: Dict[str, asyncio.Future] = {} # request_id -> Future[response]
self._conn_start_time: Optional[float] = None
self._last_pong_time: Optional[float] = None
self._lock = asyncio.Lock()
def _ws_base_url(self) -> str:
if self.testnet:
return "wss://testnet.binancefuture.com/ws-fapi/v1"
return "wss://ws-fapi.binance.com/ws-fapi/v1"
async def start(self) -> bool:
"""启动 WS 连接与消息处理循环。"""
if self._running:
return True
self._running = True
self._task = asyncio.create_task(self._run_ws())
logger.info("WSTradeClient: 已启动WS 交易接口)")
return True
async def stop(self):
"""停止 WS 连接。"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
# 取消所有 pending 请求
async with self._lock:
for fut in self._pending_requests.values():
if not fut.done():
fut.cancel()
self._pending_requests.clear()
logger.info("WSTradeClient: 已停止")
def is_connected(self) -> bool:
"""检查 WS 是否已连接。"""
return self._ws is not None and not self._ws.closed
async def _run_ws(self):
"""连接 WS 并处理消息;断线后自动重连。"""
import aiohttp
_24h_sec = 23 * 3600 # 24 小时主动重连
reconnect_delay = 2
while self._running:
url = self._ws_base_url()
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url, heartbeat=60) as ws:
self._ws = ws
self._conn_start_time = time.time()
self._last_pong_time = time.time()
reconnect_delay = 2
logger.info(f"WSTradeClient: 已连接到 {url}")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_message(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info("WSTradeClient: WS 连接关闭")
break
elif msg.type == aiohttp.WSMsgType.PING:
await ws.pong()
self._last_pong_time = time.time()
elif msg.type == aiohttp.WSMsgType.PONG:
self._last_pong_time = time.time()
# 检查 24h 重连
if self._conn_start_time and (time.time() - self._conn_start_time) > _24h_sec:
logger.info("WSTradeClient: 连接已接近 24h主动重连")
break
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"WSTradeClient: 连接异常: {e}{reconnect_delay} 秒后重连...")
finally:
self._ws = None
self._conn_start_time = None
if not self._running:
break
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 60)
async def _handle_message(self, data: str):
"""处理 WS 消息:匹配请求-响应。"""
try:
msg = json.loads(data)
req_id = str(msg.get("id", ""))
if not req_id or req_id not in self._pending_requests:
logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}")
return
fut = self._pending_requests.pop(req_id, None)
if fut and not fut.done():
if msg.get("status") == 200:
fut.set_result(msg.get("result"))
else:
error_msg = msg.get("error", {}).get("msg", "Unknown error")
error_code = msg.get("error", {}).get("code", -1)
# 创建类似 BinanceAPIException 的异常
class WSTradeAPIException(Exception):
def __init__(self, message, code):
super().__init__(message)
self.message = message
self.code = code
exc = WSTradeAPIException(error_msg, error_code)
fut.set_exception(exc)
except Exception as e:
logger.error(f"WSTradeClient: 处理消息失败: {e}")
async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
"""发送 WS 请求并等待响应(带超时)。"""
if not self.is_connected():
raise ConnectionError("WS 未连接")
req_id = str(uuid.uuid4())
req = {
"id": req_id,
"method": method,
"params": params,
}
fut = asyncio.Future()
async with self._lock:
self._pending_requests[req_id] = fut
try:
await self._ws.send_str(json.dumps(req))
result = await asyncio.wait_for(fut, timeout=timeout)
return result
except asyncio.TimeoutError:
async with self._lock:
self._pending_requests.pop(req_id, None)
raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}")
except Exception as e:
async with self._lock:
self._pending_requests.pop(req_id, None)
raise
finally:
if req_id in self._pending_requests:
async with self._lock:
self._pending_requests.pop(req_id, None)
async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
"""
通过 WS 发送 algoOrder.place 请求条件单
Args:
params: 订单参数需包含 apiKey, timestamp, signature其他同 REST
timeout: 等待响应超时
Returns:
订单结果 algoId 失败返回 None 或抛异常
"""
# 确保 params 包含必要字段
ws_params = dict(params)
if "apiKey" not in ws_params:
ws_params["apiKey"] = self.api_key
if "timestamp" not in ws_params:
ws_params["timestamp"] = int(time.time() * 1000)
# signature 应由调用方计算(使用与 REST 相同的签名逻辑)
if "signature" not in ws_params:
logger.warning("WSTradeClient: params 缺少 signatureWS 请求可能失败")
try:
result = await self._send_request("algoOrder.place", ws_params, timeout=timeout)
return result
except (ConnectionError, TimeoutError) as e:
logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}")
raise
except Exception as e:
logger.error(f"WSTradeClient: algoOrder.place 异常: {e}")
raise