auto_trade_sys/trading_system/ws_trade_client.py
薇薇安 e5bc2547aa feat(binance_client): 引入WebSocket交易客户端以优化下单逻辑
在 `binance_client.py` 中新增 WebSocket 交易客户端的延迟初始化,优先使用 WebSocket 下单以减少 REST 超时。更新 `futures_create_algo_order` 方法,尝试通过 WebSocket 创建条件单,并在失败时回退到 REST 调用。同时,调整 `ALGO_ORDER_TIMEOUT_SEC` 的默认值为 45秒,以应对高负载情况。增强了异常处理和日志记录,确保系统的稳定性和可追溯性。
2026-02-16 19:19:56 +08:00

204 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebSocket trading client: places orders through the WS API (order.place / algoOrder.place) in preference to REST, to reduce timeouts.
Endpoints: wss://ws-fapi.binance.com/ws-fapi/v1 (production), wss://testnet.binancefuture.com/ws-fapi/v1 (testnet).
Docs: ws交易接口.txt
"""
import asyncio
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class WSTradeAPIException(Exception):
    """Error response (status other than 200) returned over the WS trading API.

    Carries ``message`` and ``code`` attributes, in the style of a
    BinanceAPIException, so callers can inspect the failure. Defined once at
    module level so that callers can catch it by type.
    """

    def __init__(self, message, code):
        super().__init__(message)
        self.message = message
        self.code = code


class WSTradeClient:
    """WebSocket trading client for the ws-fapi endpoint.

    A background task keeps one connection open (reconnecting with
    exponential backoff), requests are sent as JSON objects carrying a unique
    ``id``, and each incoming response is matched to the waiting request
    through that ``id``.
    """

    # Connection age (seconds) after which the socket is recycled on purpose.
    # NOTE(review): chosen to stay ahead of a 24h limit; presumably the server
    # drops connections at 24h - confirm against the API documentation.
    _MAX_CONN_AGE_SEC = 23 * 3600
    # Bounds (seconds) for the exponential reconnect backoff.
    _RECONNECT_DELAY_MIN_SEC = 2
    _RECONNECT_DELAY_MAX_SEC = 60

    def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
        """Store credentials and initialise connection state; no I/O happens here.

        Args:
            api_key: API key injected into outgoing order parameters.
            api_secret: API secret (stored only; signing is done by the caller).
            testnet: Use the testnet endpoint instead of production.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.testnet = testnet
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # request_id -> Future resolved with the matching response
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self._conn_start_time: Optional[float] = None
        self._last_pong_time: Optional[float] = None
        self._lock = asyncio.Lock()

    def _ws_base_url(self) -> str:
        """Return the WS endpoint URL for the configured environment."""
        if self.testnet:
            return "wss://testnet.binancefuture.com/ws-fapi/v1"
        return "wss://ws-fapi.binance.com/ws-fapi/v1"

    async def start(self) -> bool:
        """Start the background connection task (no-op if already running).

        Returns:
            Always True. The connection itself is established asynchronously,
            so use ``is_connected()`` to check whether the socket is usable.
        """
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("WSTradeClient: 已启动WS 交易接口)")
        return True

    async def stop(self):
        """Stop the background task, close the socket and fail pending requests."""
        self._running = False
        task, self._task = self._task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        ws, self._ws = self._ws, None
        if ws:
            try:
                await ws.close()
            except Exception as e:
                # Best effort: a failed close must not prevent shutdown.
                logger.debug(f"WSTradeClient: 关闭 WS 失败: {e}")
        # Fail (rather than cancel) whatever is still waiting: a cancelled
        # future surfaces as CancelledError in the awaiting caller, which
        # bypasses ``except Exception`` fallbacks.
        async with self._lock:
            self._fail_pending(ConnectionError("WSTradeClient 已停止"))
        logger.info("WSTradeClient: 已停止")

    def is_connected(self) -> bool:
        """Return True when a socket exists and is not closed."""
        return self._ws is not None and not self._ws.closed

    def _fail_pending(self, exc: Exception) -> None:
        """Resolve every still-pending request future with ``exc`` and forget them."""
        for fut in self._pending_requests.values():
            if not fut.done():
                fut.set_exception(exc)
        self._pending_requests.clear()

    async def _run_ws(self):
        """Connect and consume messages; reconnect with backoff until stopped."""
        import aiohttp

        reconnect_delay = self._RECONNECT_DELAY_MIN_SEC
        while self._running:
            url = self._ws_base_url()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(url, heartbeat=60) as ws:
                        self._ws = ws
                        self._conn_start_time = time.time()
                        self._last_pong_time = time.time()
                        # A successful connect resets the backoff.
                        reconnect_delay = self._RECONNECT_DELAY_MIN_SEC
                        logger.info(f"WSTradeClient: 已连接到 {url}")
                        await self._consume(ws)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning(
                    f"WSTradeClient: 连接异常: {e}, {reconnect_delay} 秒后重连..."
                )
            finally:
                self._ws = None
                self._conn_start_time = None
                # Responses cannot arrive on a dead socket: fail waiters now
                # instead of letting each one run into its full timeout.
                self._fail_pending(ConnectionError("WS 连接已断开"))
            if not self._running:
                break
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, self._RECONNECT_DELAY_MAX_SEC)

    async def _consume(self, ws) -> None:
        """Read from ``ws`` until it closes, errors, or reaches the maximum age.

        Uses ``receive(timeout=...)`` instead of ``async for`` so the age
        limit is enforced even when no message arrives; with aiohttp's
        default auto-ping, keep-alive frames are not delivered to the reader,
        so an idle connection would otherwise never be checked.
        """
        import aiohttp

        closed_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
        )
        deadline = time.time() + self._MAX_CONN_AGE_SEC
        while True:
            remaining = deadline - time.time()
            if remaining <= 0:
                logger.info("WSTradeClient: 连接已接近 24h主动重连")
                return
            try:
                msg = await ws.receive(timeout=remaining)
            except asyncio.TimeoutError:
                # Nothing arrived before the age limit: recycle the socket.
                logger.info("WSTradeClient: 连接已接近 24h主动重连")
                return
            if msg.type == aiohttp.WSMsgType.TEXT:
                await self._handle_message(msg.data)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}")
                return
            elif msg.type in closed_types:
                logger.info("WSTradeClient: WS 连接关闭")
                return
            elif msg.type == aiohttp.WSMsgType.PING:
                # Only reached if auto-ping is disabled on the socket.
                await ws.pong()
                self._last_pong_time = time.time()
            elif msg.type == aiohttp.WSMsgType.PONG:
                self._last_pong_time = time.time()

    async def _handle_message(self, data: str):
        """Parse one text frame and resolve the request future it answers.

        A ``status`` of 200 resolves the future with the ``result`` field;
        any other status fails it with ``WSTradeAPIException``. Frames whose
        ``id`` matches no pending request are ignored. Parsing problems are
        logged and never raised, so one bad frame cannot end the read loop.
        """
        try:
            msg = json.loads(data)
            req_id = str(msg.get("id", ""))
            fut = self._pending_requests.pop(req_id, None) if req_id else None
            if fut is None:
                logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}")
                return
            if fut.done():
                return
            if msg.get("status") == 200:
                fut.set_result(msg.get("result"))
                return
            # Tolerate a missing, null or non-object "error" field so the
            # waiter is always resolved instead of hanging until its timeout.
            error = msg.get("error")
            if not isinstance(error, dict):
                error = {}
            fut.set_exception(
                WSTradeAPIException(
                    error.get("msg", "Unknown error"),
                    error.get("code", -1),
                )
            )
        except Exception as e:
            logger.error(f"WSTradeClient: 处理消息失败: {e}")

    async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Send one request and wait for its response.

        Args:
            method: WS API method name, e.g. ``"algoOrder.place"``.
            params: Request parameters, sent as-is.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the matching response.

        Raises:
            ConnectionError: Not connected, or the connection dropped while waiting.
            TimeoutError: No response arrived within ``timeout`` seconds.
            WSTradeAPIException: The server answered with a non-200 status.
        """
        if not self.is_connected():
            raise ConnectionError("WS 未连接")
        # Keep a local reference: self._ws may be reset by the connection
        # task while this coroutine is suspended.
        ws = self._ws
        req_id = str(uuid.uuid4())
        payload = json.dumps({
            "id": req_id,
            "method": method,
            "params": params,
        })
        fut = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._pending_requests[req_id] = fut
        try:
            await ws.send_str(payload)
            return await asyncio.wait_for(fut, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}")
        finally:
            # Single cleanup point for every exit path; no await here, so it
            # cannot be interrupted by cancellation.
            self._pending_requests.pop(req_id, None)

    async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Place a conditional (algo) order through ``algoOrder.place``.

        Args:
            params: Order parameters, same as for REST. ``apiKey`` and
                ``timestamp`` are filled in when missing; ``signature`` must
                be computed by the caller with the same signing logic as
                REST. The passed dict is not modified.
            timeout: Seconds to wait for the response.

        Returns:
            The order result (including ``algoId`` and related fields).

        Raises:
            ConnectionError, TimeoutError: Transport-level failures.
            Exception: Any other failure, re-raised after logging.
        """
        ws_params = dict(params)
        ws_params.setdefault("apiKey", self.api_key)
        ws_params.setdefault("timestamp", int(time.time() * 1000))
        if "signature" not in ws_params:
            logger.warning("WSTradeClient: params 缺少 signatureWS 请求可能失败")
        try:
            return await self._send_request("algoOrder.place", ws_params, timeout=timeout)
        except (ConnectionError, TimeoutError) as e:
            # Expected transport failures; the caller decides how to recover.
            logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}")
            raise
        except Exception as e:
            logger.error(f"WSTradeClient: algoOrder.place 异常: {e}")
            raise