auto_trade_sys/trading_system/ws_trade_client.py
薇薇安 7569c88a67 fix(binance_client, position_manager, config): 增强止损与盈利保护逻辑
在 `binance_client.py` 中优化了错误处理,新增对特定错误信息的警告记录,确保在条件单被拒时能够清晰提示。同时,在 `position_manager.py` 中引入了保本止损逻辑,确保在盈利达到一定比例时自动将止损移至含手续费的保本价,提升了风险控制能力。此外,更新了 `config.py` 中的相关配置项,以支持移动止损与保本功能的灵活性。
2026-02-20 23:38:14 +08:00

294 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
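WebSocket trading client: places orders through the WS API (order.place / algoOrder.place) in preference to REST, to reduce timeouts.
Endpoints: wss://ws-fapi.binance.com/ws-fapi/v1 (production), wss://testnet.binancefuture.com/ws-fapi/v1 (testnet).
Reference document: ws交易接口.txt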
"""
import asyncio
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any
from datetime import datetime, timezone
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class WSTradeAPIException(Exception):
    """Error response from the WS API (a response whose ``status`` is not 200).

    Defined once at module level so callers can catch it by type.

    Attributes:
        message: The ``error.msg`` text of the response.
        code: The ``error.code`` value of the response (-1 when absent).
    """

    def __init__(self, message, code):
        super().__init__(message)
        self.message = message
        self.code = code


class WSTradeClient:
    """WebSocket trading client.

    Keeps one connection to the ws-fapi endpoint open in a background task and
    sends JSON requests over it, matching each response to its request through
    the ``id`` field.
    """

    def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
        """Store credentials and initialise connection state (no I/O is done).

        Args:
            api_key: API key; injected into ``algoOrder.place`` params when missing.
            api_secret: API secret; stored only, this class does not sign requests.
            testnet: When true, the testnet endpoint is used.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.testnet = testnet
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # request_id -> Future resolved with the response's "result"
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self._conn_start_time: Optional[float] = None
        self._last_pong_time: Optional[float] = None
        self._lock = asyncio.Lock()

    def _ws_base_url(self) -> str:
        """Return the WS endpoint URL for the configured environment."""
        if self.testnet:
            return "wss://testnet.binancefuture.com/ws-fapi/v1"
        return "wss://ws-fapi.binance.com/ws-fapi/v1"

    async def start(self) -> bool:
        """Start the background connection task.

        Returns:
            Always True; calling it while already running is a no-op.
        """
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("WSTradeClient: 已启动WS 交易接口)")
        return True

    async def stop(self):
        """Stop the background task, close the socket and cancel pending requests."""
        # Cleared first so the connection loop does not try to reconnect.
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception as e:
                # Best effort: the socket is being discarded anyway.
                logger.debug(f"WSTradeClient: 关闭 WS 失败: {e}")
            self._ws = None
        # Cancel every request still waiting for a response.
        async with self._lock:
            for fut in self._pending_requests.values():
                if not fut.done():
                    fut.cancel()
            self._pending_requests.clear()
        logger.info("WSTradeClient: 已停止")

    def is_connected(self) -> bool:
        """Return True when a socket exists and is not closed."""
        ws = self._ws
        return ws is not None and not ws.closed

    def _fail_pending(self, reason: str) -> None:
        """Fail every in-flight request with ``ConnectionError(reason)``."""
        for fut in list(self._pending_requests.values()):
            if not fut.done():
                # One exception object per future so tracebacks are not shared.
                fut.set_exception(ConnectionError(reason))
        self._pending_requests.clear()

    async def _run_ws(self):
        """Connect, dispatch incoming messages, and reconnect after a drop.

        Runs until ``self._running`` is cleared or the task is cancelled. The
        retry delay starts at 2 seconds, doubles after each attempt up to 60
        seconds, and is reset to 2 seconds on every successful connection.
        """
        import aiohttp

        # Recycle the connection after 23 hours.
        max_conn_age_sec = 23 * 3600
        reconnect_delay = 2
        while self._running:
            url = self._ws_base_url()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(url, heartbeat=60) as ws:
                        self._ws = ws
                        self._conn_start_time = time.time()
                        self._last_pong_time = time.time()
                        reconnect_delay = 2
                        logger.info(f"WSTradeClient: 已连接到 {url}")
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                await self._handle_message(msg.data)
                            elif msg.type == aiohttp.WSMsgType.ERROR:
                                logger.warning(f"WSTradeClient: WS 错误: {ws.exception()}")
                                break
                            elif msg.type == aiohttp.WSMsgType.CLOSE:
                                logger.info("WSTradeClient: WS 连接关闭")
                                break
                            elif msg.type == aiohttp.WSMsgType.PING:
                                await ws.pong()
                                self._last_pong_time = time.time()
                            elif msg.type == aiohttp.WSMsgType.PONG:
                                self._last_pong_time = time.time()
                            # NOTE(review): this age check only runs when a frame is
                            # delivered to this loop; an idle connection is presumably
                            # recycled only once the server closes it - confirm.
                            if (
                                self._conn_start_time
                                and (time.time() - self._conn_start_time) > max_conn_age_sec
                            ):
                                logger.info("WSTradeClient: 连接已接近 24h主动重连")
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning(f"WSTradeClient: 连接异常: {e}, {reconnect_delay} 秒后重连...")
            finally:
                self._ws = None
                self._conn_start_time = None
                if self._running:
                    # Responses to in-flight requests cannot arrive on a new
                    # connection, so fail them now instead of letting each caller
                    # wait for its timeout. stop() handles its own cleanup.
                    self._fail_pending("WS 连接已断开")
            if not self._running:
                break
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 60)

    async def _handle_message(self, data: str):
        """Resolve the pending request that matches an incoming text frame.

        A response with ``status == 200`` resolves the future with its
        ``result``; any other status fails it with ``WSTradeAPIException``.
        Frames with an unknown ``id`` are logged at debug level and ignored.
        Parsing or dispatch errors are logged and never propagated.
        """
        try:
            msg = json.loads(data)
            req_id = str(msg.get("id", ""))
            fut = self._pending_requests.pop(req_id, None) if req_id else None
            if fut is None:
                logger.debug(f"WSTradeClient: 收到未匹配响应 id={req_id}")
                return
            if fut.done():
                # The waiter already timed out or was cancelled.
                return
            if msg.get("status") == 200:
                fut.set_result(msg.get("result"))
                return
            # Normalise the error payload so a null or non-dict "error" still
            # fails the future instead of leaving the caller waiting.
            error = msg.get("error") or {}
            if not isinstance(error, dict):
                error = {"msg": str(error)}
            fut.set_exception(
                WSTradeAPIException(
                    error.get("msg", "Unknown error"),
                    error.get("code", -1),
                )
            )
        except Exception as e:
            logger.error(f"WSTradeClient: 处理消息失败: {e}")

    async def _send_request(self, method: str, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Send one request and wait for the matching response.

        Args:
            method: WS API method name, e.g. ``"algoOrder.place"``.
            params: Request parameters, sent as given.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the response.

        Raises:
            ConnectionError: The socket is not connected, or sending failed.
            TimeoutError: No response arrived within ``timeout`` seconds.
            WSTradeAPIException: The response status was not 200.
        """
        if not self.is_connected():
            raise ConnectionError("WS 未连接")
        req_id = str(uuid.uuid4())
        req = {
            "id": req_id,
            "method": method,
            "params": params,
        }
        fut = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._pending_requests[req_id] = fut
        try:
            # Acquiring the lock may yield, so re-check the socket before writing.
            ws = self._ws
            if ws is None or ws.closed:
                raise ConnectionError("WS 连接已关闭")
            await ws.send_str(json.dumps(req))
            return await asyncio.wait_for(fut, timeout=timeout)
        except asyncio.TimeoutError as e:
            # Must come before the OSError clause: since Python 3.11
            # asyncio.TimeoutError is the builtin TimeoutError, an OSError
            # subclass, and would otherwise be reported as a connection error.
            raise TimeoutError(f"WS 请求超时 ({timeout}秒): {method}") from e
        except OSError as e:
            err_msg = str(e).lower()
            if "closing transport" in err_msg or "cannot write" in err_msg:
                raise ConnectionError("WS 连接正在关闭,无法发送请求") from e
            raise ConnectionError(f"WS 连接错误: {e}") from e
        finally:
            # Single cleanup point; it does not await, so it also runs to
            # completion when the calling task is cancelled.
            self._pending_requests.pop(req_id, None)

    async def _market_request(self, method: str, params: Dict[str, Any], timeout: float) -> Optional[Dict[str, Any]]:
        """Send a market-data request, logging any failure before re-raising it."""
        try:
            return await self._send_request(method, params, timeout=timeout)
        except (ConnectionError, TimeoutError) as e:
            logger.debug(f"WSTradeClient: {method} 失败: {e}")
            raise
        except Exception as e:
            logger.error(f"WSTradeClient: {method} 异常: {e}")
            raise

    async def ticker_price(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch the latest price through the ``ticker.price`` method.

        Args:
            symbol: Trading pair, upper-cased before sending; omitted from the
                request when empty.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the response. Failures are logged and re-raised.
        """
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()
        return await self._market_request("ticker.price", params, timeout)

    async def ticker_book(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch the best bid/ask through the ``ticker.book`` method.

        Args:
            symbol: Trading pair, upper-cased before sending; omitted from the
                request when empty.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the response. Failures are logged and re-raised.
        """
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()
        return await self._market_request("ticker.book", params, timeout)

    async def depth(self, symbol: str, limit: int = 20, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
        """Fetch order-book depth through the ``depth`` method.

        Args:
            symbol: Trading pair, upper-cased before sending.
            limit: Number of depth levels (the original notes list 5, 10, 20,
                50, 100, 500, 1000 as accepted values); sent as given.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the response. Failures are logged and re-raised.
        """
        params = {
            "symbol": symbol.upper(),
            "limit": limit,
        }
        return await self._market_request("depth", params, timeout)

    async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Place a conditional order through the ``algoOrder.place`` method.

        Works on a copy of ``params``. ``apiKey`` and ``timestamp`` are filled
        in when missing. ``signature`` must be supplied by the caller; a
        warning is logged when it is absent.

        Args:
            params: Order parameters.
            timeout: Seconds to wait for the response.

        Returns:
            The ``result`` field of the response. Failures are logged and re-raised.
        """
        ws_params = dict(params)
        if "apiKey" not in ws_params:
            ws_params["apiKey"] = self.api_key
        if "timestamp" not in ws_params:
            ws_params["timestamp"] = int(time.time() * 1000)
        # The signature is not computed here; the caller is expected to sign.
        if "signature" not in ws_params:
            logger.warning("WSTradeClient: params 缺少 signatureWS 请求可能失败")
        try:
            return await self._send_request("algoOrder.place", ws_params, timeout=timeout)
        except (ConnectionError, TimeoutError) as e:
            logger.debug(f"WSTradeClient: algoOrder.place 失败: {e}")
            raise
        except Exception as e:
            err = str(e).strip()
            # This rejection is logged as a warning rather than an error.
            if "GTE" in err and "open positions" in err:
                logger.warning(f"WSTradeClient: algoOrder.place 被拒(持仓未就绪或已平): {err[:80]}")
            else:
                logger.error(f"WSTradeClient: algoOrder.place 异常: {e}")
            raise