"""WebSocket trading client.

Sends trading requests (order.place / algoOrder.place) over the WS API in
preference to REST, to reduce timeouts.

Endpoints:
    wss://ws-fapi.binance.com/ws-fapi/v1         (production)
    wss://testnet.binancefuture.com/ws-fapi/v1   (testnet)

Docs: ws交易接口.txt
"""
import asyncio
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class WSTradeAPIException(Exception):
    """Error response returned by the WS API (any reply whose status != 200).

    Shaped like a BinanceAPIException: exposes ``message`` and ``code``.
    Defined at module level so callers can catch it by type.
    """

    def __init__(self, message, code):
        super().__init__(message)
        self.message = message
        self.code = code


class WSTradeClient:
    """WebSocket trading client.

    Maintains a connection to the ws-fapi endpoint in a background task,
    sends JSON requests tagged with a unique id and resolves each caller's
    future when the response carrying the same id arrives.
    """

    # Proactively reconnect once a connection is this old (23h), i.e. shortly
    # before it reaches 24h.
    _MAX_CONN_AGE_SEC = 23 * 3600

    def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
        self.api_key = api_key
        self.api_secret = api_secret
        self.testnet = testnet
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # request_id -> Future resolved with the response's "result"
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self._conn_start_time: Optional[float] = None
        self._last_pong_time: Optional[float] = None
        self._lock = asyncio.Lock()

    def _ws_base_url(self) -> str:
        """Return the WS endpoint URL for the configured environment."""
        if self.testnet:
            return "wss://testnet.binancefuture.com/ws-fapi/v1"
        return "wss://ws-fapi.binance.com/ws-fapi/v1"

    async def start(self) -> bool:
        """Start the background connect/receive loop.

        Idempotent: returns True immediately when already running. Must be
        called from within a running event loop. Returns True in all cases;
        it does not wait for the connection to be established.
        """
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("WSTradeClient: 已启动(WS 交易接口)")
        return True

    async def stop(self):
        """Stop the background loop, close the socket and cancel pending requests."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception as e:
                # Best-effort close: the socket is being discarded anyway.
                logger.debug(f"WSTradeClient: close failed during stop: {e}")
            self._ws = None
        # Cancel every request still waiting for a response.
        async with self._lock:
            for fut in self._pending_requests.values():
                if not fut.done():
                    fut.cancel()
            self._pending_requests.clear()
        logger.info("WSTradeClient: 已停止")

    def is_connected(self) -> bool:
        """Return True when a socket exists and is not closed."""
        return self._ws is not None and not self._ws.closed

    async def _run_ws(self):
        """Connect, dispatch incoming messages, and reconnect after a drop.

        Runs until ``self._running`` is cleared or the task is cancelled.
        The delay before reconnecting starts at 2 seconds, doubles after each
        attempt (capped at 60) and is reset to 2 once a connection succeeds.
        """
        # Imported lazily so the module can be imported without aiohttp.
        import aiohttp

        reconnect_delay = 2
        while self._running:
            url = self._ws_base_url()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(url, heartbeat=60) as ws:
                        self._ws = ws
                        self._conn_start_time = time.time()
                        self._last_pong_time = time.time()
                        reconnect_delay = 2
                        logger.info(f"WSTradeClient: 已连接到 {url}")
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                await self._handle_message(msg.data)
                            elif msg.type == aiohttp.WSMsgType.ERROR:
                                logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}")
                                break
                            elif msg.type == aiohttp.WSMsgType.CLOSE:
                                logger.info("WSTradeClient: WS 连接关闭")
                                break
                            elif msg.type == aiohttp.WSMsgType.PING:
                                await ws.pong()
                                self._last_pong_time = time.time()
                            elif msg.type == aiohttp.WSMsgType.PONG:
                                self._last_pong_time = time.time()
                            # Connection-age check. NOTE(review): this only
                            # runs after a message is delivered to this loop,
                            # so an idle connection is not rotated here.
                            if (
                                self._conn_start_time
                                and (time.time() - self._conn_start_time) > self._MAX_CONN_AGE_SEC
                            ):
                                logger.info("WSTradeClient: 连接已接近 24h,主动重连")
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning(f"WSTradeClient: 连接异常: {e},{reconnect_delay} 秒后重连...")
            finally:
                self._ws = None
                self._conn_start_time = None
            if not self._running:
                break
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 60)

    async def _handle_message(self, data: str):
        """Match an incoming JSON message to its pending request.

        A response with ``status == 200`` resolves the future with the
        message's ``result``; any other status resolves it with a
        ``WSTradeAPIException`` built from ``error.msg`` / ``error.code``.
        Messages with no matching pending id are logged at debug level and
        dropped. This method never raises.
        """
        try:
            msg = json.loads(data)
            req_id = str(msg.get("id", ""))
        except Exception as e:
            logger.error(f"WSTradeClient: 处理消息失败: {e}")
            return

        fut = self._pending_requests.pop(req_id, None) if req_id else None
        if fut is None:
            logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}")
            return
        if fut.done():
            # The waiter already timed out or was cancelled.
            return

        try:
            if msg.get("status") == 200:
                fut.set_result(msg.get("result"))
            else:
                # "error" may be absent, null or not an object; fall back to
                # the defaults instead of failing on .get().
                error = msg.get("error")
                if not isinstance(error, dict):
                    error = {}
                fut.set_exception(
                    WSTradeAPIException(
                        error.get("msg", "Unknown error"),
                        error.get("code", -1),
                    )
                )
        except Exception as e:
            logger.error(f"WSTradeClient: 处理消息失败: {e}")
            # The future has already been removed from the pending map, so
            # resolve it here rather than leaving the caller to wait for its
            # timeout.
            if not fut.done():
                fut.set_exception(e)

    async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Send one WS request and wait for the matching response.

        Args:
            method: WS API method name, e.g. "algoOrder.place".
            params: Request parameters, sent as the "params" field.
            timeout: Seconds to wait for the response.

        Returns:
            The "result" field of the response.

        Raises:
            ConnectionError: The socket is not connected, or sending failed.
            TimeoutError: No response arrived within ``timeout`` seconds.
            WSTradeAPIException: The server replied with a non-200 status.
        """
        if not self.is_connected():
            raise ConnectionError("WS 未连接")
        req_id = str(uuid.uuid4())
        req = {
            "id": req_id,
            "method": method,
            "params": params,
        }
        fut = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._pending_requests[req_id] = fut
        try:
            # Re-check right before writing. Raised outside the inner try so
            # it is not re-wrapped by the OSError handler below.
            if not self.is_connected():
                raise ConnectionError("WS 连接已关闭")
            try:
                await self._ws.send_str(json.dumps(req))
                return await asyncio.wait_for(fut, timeout=timeout)
            except asyncio.TimeoutError as e:
                # Must precede the OSError handler: since Python 3.11
                # asyncio.TimeoutError is the builtin TimeoutError, which is
                # an OSError subclass.
                raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}") from e
            except OSError as e:  # includes ConnectionResetError
                err_msg = str(e).lower()
                if "closing transport" in err_msg or "cannot write" in err_msg:
                    raise ConnectionError("WS 连接正在关闭,无法发送请求") from e
                raise ConnectionError(f"WS 连接错误: {e}") from e
        finally:
            # Single cleanup point for every exit path (a no-op when
            # _handle_message already removed the entry).
            self._pending_requests.pop(req_id, None)

    async def _request_logged(self, method: str, params: Dict[str, Any], timeout: float) -> Optional[Dict[str, Any]]:
        """Call ``_send_request``, log any failure, and re-raise it unchanged.

        Connection and timeout failures are logged at debug level; every
        other exception is logged at error level.
        """
        try:
            return await self._send_request(method, params, timeout=timeout)
        except (ConnectionError, TimeoutError) as e:
            logger.debug(f"WSTradeClient: {method} 失败: {e}")
            raise
        except Exception as e:
            logger.error(f"WSTradeClient: {method} 异常: {e}")
            raise

    async def ticker_price(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch the latest price via the WS API (ticker.price).

        Args:
            symbol: Trading pair, upper-cased before sending. When omitted,
                no symbol is sent.
            timeout: Seconds to wait for the response.

        Returns:
            The "result" field of the response.

        Raises:
            ConnectionError, TimeoutError, WSTradeAPIException: Propagated
                from ``_send_request`` after being logged.
        """
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()
        return await self._request_logged("ticker.price", params, timeout)

    async def ticker_book(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch the current best bid/ask via the WS API (ticker.book).

        Args:
            symbol: Trading pair, upper-cased before sending. When omitted,
                no symbol is sent.
            timeout: Seconds to wait for the response.

        Returns:
            The "result" field of the response.

        Raises:
            ConnectionError, TimeoutError, WSTradeAPIException: Propagated
                from ``_send_request`` after being logged.
        """
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()
        return await self._request_logged("ticker.book", params, timeout)

    async def depth(self, symbol: str, limit: int = 20, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch order-book depth via the WS API (depth).

        Args:
            symbol: Trading pair, upper-cased before sending.
            limit: Number of depth levels (allowed values: 5, 10, 20, 50,
                100, 500, 1000). Not validated here.
            timeout: Seconds to wait for the response.

        Returns:
            The "result" field of the response.

        Raises:
            ConnectionError, TimeoutError, WSTradeAPIException: Propagated
                from ``_send_request`` after being logged.
        """
        params = {
            "symbol": symbol.upper(),
            "limit": limit,
        }
        return await self._request_logged("depth", params, timeout)

    async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Place a conditional order via the WS API (algoOrder.place).

        Args:
            params: Order parameters. Expected to contain apiKey, timestamp
                and signature; the rest matches the REST call. ``apiKey`` and
                ``timestamp`` are filled in when absent. The caller's dict is
                not mutated.
            timeout: Seconds to wait for the response.

        Returns:
            The "result" field of the response (the order result, including
            algoId per the original documentation).

        Raises:
            ConnectionError, TimeoutError, WSTradeAPIException: Propagated
                from ``_send_request`` after being logged.
        """
        ws_params = dict(params)
        ws_params.setdefault("apiKey", self.api_key)
        if "timestamp" not in ws_params:
            ws_params["timestamp"] = int(time.time() * 1000)
        # The signature must be computed by the caller (same signing logic as
        # REST); this method only warns when it is missing.
        # NOTE(review): a timestamp filled in above would not be covered by a
        # signature the caller computed beforehand - confirm callers always
        # pass their own timestamp.
        if "signature" not in ws_params:
            logger.warning("WSTradeClient: params 缺少 signature,WS 请求可能失败")
        try:
            return await self._send_request("algoOrder.place", ws_params, timeout=timeout)
        except (ConnectionError, TimeoutError) as e:
            logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}")
            raise
        except Exception as e:
            err = str(e).strip()
            # This rejection (position not ready yet, or already closed) is
            # logged as a warning rather than an error.
            if "GTE" in err and "open positions" in err:
                logger.warning(f"WSTradeClient: algoOrder.place 被拒(持仓未就绪或已平): {err[:80]}…")
            else:
                logger.error(f"WSTradeClient: algoOrder.place 异常: {e}")
            raise