在 `binance_client.py` 中优化了错误处理,新增对特定错误信息的警告记录,确保在条件单被拒时能够清晰提示。同时,在 `position_manager.py` 中引入了保本止损逻辑,确保在盈利达到一定比例时自动将止损移至含手续费的保本价,提升了风险控制能力。此外,更新了 `config.py` 中的相关配置项,以支持移动止损与保本功能的灵活性。
294 lines
12 KiB
Python
294 lines
12 KiB
Python
"""
|
||
WebSocket 交易客户端:通过 WS API 下单(order.place / algoOrder.place),优先于 REST,减少超时。
|
||
端点:wss://ws-fapi.binance.com/ws-fapi/v1(正式),wss://testnet.binancefuture.com/ws-fapi/v1(测试)
|
||
文档:ws交易接口.txt
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import time
|
||
import uuid
|
||
from typing import Dict, Optional, Any
|
||
from datetime import datetime, timezone
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class WSTradeClient:
|
||
"""WebSocket 交易客户端:维护到 ws-fapi 的连接,发送 order.place / algoOrder.place 请求。"""
|
||
|
||
def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
|
||
self.api_key = api_key
|
||
self.api_secret = api_secret
|
||
self.testnet = testnet
|
||
self._ws = None
|
||
self._task: Optional[asyncio.Task] = None
|
||
self._running = False
|
||
self._pending_requests: Dict[str, asyncio.Future] = {} # request_id -> Future[response]
|
||
self._conn_start_time: Optional[float] = None
|
||
self._last_pong_time: Optional[float] = None
|
||
self._lock = asyncio.Lock()
|
||
|
||
def _ws_base_url(self) -> str:
|
||
if self.testnet:
|
||
return "wss://testnet.binancefuture.com/ws-fapi/v1"
|
||
return "wss://ws-fapi.binance.com/ws-fapi/v1"
|
||
|
||
async def start(self) -> bool:
|
||
"""启动 WS 连接与消息处理循环。"""
|
||
if self._running:
|
||
return True
|
||
self._running = True
|
||
self._task = asyncio.create_task(self._run_ws())
|
||
logger.info("WSTradeClient: 已启动(WS 交易接口)")
|
||
return True
|
||
|
||
async def stop(self):
|
||
"""停止 WS 连接。"""
|
||
self._running = False
|
||
if self._task:
|
||
self._task.cancel()
|
||
try:
|
||
await self._task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
self._task = None
|
||
if self._ws:
|
||
try:
|
||
await self._ws.close()
|
||
except Exception:
|
||
pass
|
||
self._ws = None
|
||
# 取消所有 pending 请求
|
||
async with self._lock:
|
||
for fut in self._pending_requests.values():
|
||
if not fut.done():
|
||
fut.cancel()
|
||
self._pending_requests.clear()
|
||
logger.info("WSTradeClient: 已停止")
|
||
|
||
def is_connected(self) -> bool:
|
||
"""检查 WS 是否已连接。"""
|
||
return self._ws is not None and not self._ws.closed
|
||
|
||
async def _run_ws(self):
|
||
"""连接 WS 并处理消息;断线后自动重连。"""
|
||
import aiohttp
|
||
_24h_sec = 23 * 3600 # 24 小时主动重连
|
||
reconnect_delay = 2
|
||
while self._running:
|
||
url = self._ws_base_url()
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.ws_connect(url, heartbeat=60) as ws:
|
||
self._ws = ws
|
||
self._conn_start_time = time.time()
|
||
self._last_pong_time = time.time()
|
||
reconnect_delay = 2
|
||
logger.info(f"WSTradeClient: 已连接到 {url}")
|
||
async for msg in ws:
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
await self._handle_message(msg.data)
|
||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||
logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||
logger.info("WSTradeClient: WS 连接关闭")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.PING:
|
||
await ws.pong()
|
||
self._last_pong_time = time.time()
|
||
elif msg.type == aiohttp.WSMsgType.PONG:
|
||
self._last_pong_time = time.time()
|
||
# 检查 24h 重连
|
||
if self._conn_start_time and (time.time() - self._conn_start_time) > _24h_sec:
|
||
logger.info("WSTradeClient: 连接已接近 24h,主动重连")
|
||
break
|
||
except asyncio.CancelledError:
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"WSTradeClient: 连接异常: {e},{reconnect_delay} 秒后重连...")
|
||
finally:
|
||
self._ws = None
|
||
self._conn_start_time = None
|
||
if not self._running:
|
||
break
|
||
await asyncio.sleep(reconnect_delay)
|
||
reconnect_delay = min(reconnect_delay * 2, 60)
|
||
|
||
async def _handle_message(self, data: str):
|
||
"""处理 WS 消息:匹配请求-响应。"""
|
||
try:
|
||
msg = json.loads(data)
|
||
req_id = str(msg.get("id", ""))
|
||
if not req_id or req_id not in self._pending_requests:
|
||
logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}")
|
||
return
|
||
fut = self._pending_requests.pop(req_id, None)
|
||
if fut and not fut.done():
|
||
if msg.get("status") == 200:
|
||
fut.set_result(msg.get("result"))
|
||
else:
|
||
error_msg = msg.get("error", {}).get("msg", "Unknown error")
|
||
error_code = msg.get("error", {}).get("code", -1)
|
||
# 创建类似 BinanceAPIException 的异常
|
||
class WSTradeAPIException(Exception):
|
||
def __init__(self, message, code):
|
||
super().__init__(message)
|
||
self.message = message
|
||
self.code = code
|
||
exc = WSTradeAPIException(error_msg, error_code)
|
||
fut.set_exception(exc)
|
||
except Exception as e:
|
||
logger.error(f"WSTradeClient: 处理消息失败: {e}")
|
||
|
||
async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
|
||
"""发送 WS 请求并等待响应(带超时)。"""
|
||
if not self.is_connected():
|
||
raise ConnectionError("WS 未连接")
|
||
req_id = str(uuid.uuid4())
|
||
req = {
|
||
"id": req_id,
|
||
"method": method,
|
||
"params": params,
|
||
}
|
||
fut = asyncio.Future()
|
||
async with self._lock:
|
||
self._pending_requests[req_id] = fut
|
||
try:
|
||
# ⚠️ 优化:检查连接状态,避免在连接关闭时发送数据
|
||
if not self.is_connected():
|
||
async with self._lock:
|
||
self._pending_requests.pop(req_id, None)
|
||
raise ConnectionError("WS 连接已关闭")
|
||
await self._ws.send_str(json.dumps(req))
|
||
result = await asyncio.wait_for(fut, timeout=timeout)
|
||
return result
|
||
except (ConnectionResetError, OSError) as e:
|
||
async with self._lock:
|
||
self._pending_requests.pop(req_id, None)
|
||
err_msg = str(e).lower()
|
||
if "closing transport" in err_msg or "cannot write" in err_msg:
|
||
raise ConnectionError("WS 连接正在关闭,无法发送请求")
|
||
raise ConnectionError(f"WS 连接错误: {e}")
|
||
except asyncio.TimeoutError:
|
||
async with self._lock:
|
||
self._pending_requests.pop(req_id, None)
|
||
raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}")
|
||
except Exception as e:
|
||
async with self._lock:
|
||
self._pending_requests.pop(req_id, None)
|
||
raise
|
||
finally:
|
||
if req_id in self._pending_requests:
|
||
async with self._lock:
|
||
self._pending_requests.pop(req_id, None)
|
||
|
||
async def ticker_price(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
通过 WebSocket API 获取最新价格(ticker.price)。
|
||
|
||
Args:
|
||
symbol: 交易对(可选,不传则返回所有交易对)
|
||
timeout: 等待响应超时(秒)
|
||
|
||
Returns:
|
||
价格信息,失败返回 None 或抛异常
|
||
"""
|
||
params = {}
|
||
if symbol:
|
||
params["symbol"] = symbol.upper()
|
||
try:
|
||
result = await self._send_request("ticker.price", params, timeout=timeout)
|
||
return result
|
||
except (ConnectionError, TimeoutError) as e:
|
||
logger.debug(f"WSTradeClient: ticker.price 失败: {e}")
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"WSTradeClient: ticker.price 异常: {e}")
|
||
raise
|
||
|
||
async def ticker_book(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
通过 WebSocket API 获取当前最优挂单(ticker.book)。
|
||
|
||
Args:
|
||
symbol: 交易对(可选,不传则返回所有交易对)
|
||
timeout: 等待响应超时(秒)
|
||
|
||
Returns:
|
||
最优挂单信息,失败返回 None 或抛异常
|
||
"""
|
||
params = {}
|
||
if symbol:
|
||
params["symbol"] = symbol.upper()
|
||
try:
|
||
result = await self._send_request("ticker.book", params, timeout=timeout)
|
||
return result
|
||
except (ConnectionError, TimeoutError) as e:
|
||
logger.debug(f"WSTradeClient: ticker.book 失败: {e}")
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"WSTradeClient: ticker.book 异常: {e}")
|
||
raise
|
||
|
||
async def depth(self, symbol: str, limit: int = 20, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
通过 WebSocket API 获取深度信息(depth)。
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
limit: 深度档位(可选值: 5, 10, 20, 50, 100, 500, 1000)
|
||
timeout: 等待响应超时(秒)
|
||
|
||
Returns:
|
||
深度信息,失败返回 None 或抛异常
|
||
"""
|
||
params = {
|
||
"symbol": symbol.upper(),
|
||
"limit": limit
|
||
}
|
||
try:
|
||
result = await self._send_request("depth", params, timeout=timeout)
|
||
return result
|
||
except (ConnectionError, TimeoutError) as e:
|
||
logger.debug(f"WSTradeClient: depth 失败: {e}")
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"WSTradeClient: depth 异常: {e}")
|
||
raise
|
||
|
||
async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
通过 WS 发送 algoOrder.place 请求(条件单)。
|
||
|
||
Args:
|
||
params: 订单参数(需包含 apiKey, timestamp, signature;其他同 REST)
|
||
timeout: 等待响应超时(秒)
|
||
|
||
Returns:
|
||
订单结果(含 algoId 等),失败返回 None 或抛异常
|
||
"""
|
||
# 确保 params 包含必要字段
|
||
ws_params = dict(params)
|
||
if "apiKey" not in ws_params:
|
||
ws_params["apiKey"] = self.api_key
|
||
if "timestamp" not in ws_params:
|
||
ws_params["timestamp"] = int(time.time() * 1000)
|
||
# signature 应由调用方计算(使用与 REST 相同的签名逻辑)
|
||
if "signature" not in ws_params:
|
||
logger.warning("WSTradeClient: params 缺少 signature,WS 请求可能失败")
|
||
try:
|
||
result = await self._send_request("algoOrder.place", ws_params, timeout=timeout)
|
||
return result
|
||
except (ConnectionError, TimeoutError) as e:
|
||
logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}")
|
||
raise
|
||
except Exception as e:
|
||
err = str(e).strip()
|
||
if "GTE" in err and "open positions" in err:
|
||
logger.warning(f"WSTradeClient: algoOrder.place 被拒(持仓未就绪或已平): {err[:80]}…")
|
||
else:
|
||
logger.error(f"WSTradeClient: algoOrder.place 异常: {e}")
|
||
raise
|