From 83a09f24f83853b9c46062ad918446c28d7db9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Sat, 21 Feb 2026 11:12:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(binance=5Fclient,=20listen=5Fkey=5Fcache,?= =?UTF-8?q?=20user=5Fdata=5Fstream):=20=E5=A2=9E=E5=BC=BA=20listenKey=20?= =?UTF-8?q?=E5=88=9B=E5=BB=BA=E9=80=BB=E8=BE=91=E4=B8=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `binance_client.py` 中将 `create_futures_listen_key` 方法的最大重试次数从 2 增加到 3,并调整了超时设置以提高稳定性。更新了 `listen_key_cache.py` 和 `user_data_stream.py` 中对该方法的调用,确保在创建新的 listenKey 时使用新的重试逻辑。这些改进提升了系统在高并发情况下的可靠性与响应能力。 --- trading_system/binance_client.py | 32 +++++++++++++++--------------- trading_system/listen_key_cache.py | 4 ++-- trading_system/user_data_stream.py | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 61e6b25..6c3f837 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -397,37 +397,37 @@ class BinanceClient: """合约 REST 根地址(用于 listenKey 等)""" return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com" - async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 2) -> Optional[str]: + async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 3) -> Optional[str]: """ - 创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。60 分钟无 keepalive 会失效。 + 创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。 + 文档:每个账号 listenKey 独立,60 分钟无 keepalive 会失效。 优先使用 WebSocket API(如果 WSTradeClient 已连接),否则使用 REST API。 - 根据币安文档:如果该帐户具有有效的listenKey,则将返回该listenKey并将其有效期延长60分钟。 + 文档:如果该帐户具有有效的 listenKey,则返回现有 key 并将其有效期延长 60 分钟。 Args: - prefer_ws: 是否优先使用 WebSocket API(默认 True)。如果 WebSocket 不可用,自动回退到 REST API。 - max_retries: REST API 失败时的最大重试次数(默认 2 次) + prefer_ws: 是否优先使用 WebSocket API(默认 True) + max_retries: REST API 失败时的最大重试次数(默认 3,共 4 次尝试) """ if not self.api_key: return None - # 方法1: WebSocket API(优先,如果 WSTradeClient 已连接) + # 方法1: WebSocket API(优先,需 WSTradeClient 已连接) if prefer_ws: ws_result = await self._create_listen_key_via_ws() if ws_result: return ws_result - # WebSocket 不可用,回退到 REST API logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...") - # 方法2: REST API(备选方案,带重试机制) + # 方法2: REST API(文档:POST /fapi/v1/listenKey,带重试) last_error = None for attempt in range(max_retries + 1): try: import aiohttp url = f"{self._futures_base_url()}/fapi/v1/listenKey" headers = {"X-MBX-APIKEY": self.api_key} - # 超时:首次 25s,后续 35s(币安有时较慢,避免 20s 超时) - timeout_sec = 25 if attempt == 0 else 35 + # 超时:按文档无明确限制,启动时可能并发多连接,适当放宽 + timeout_sec = 50 if attempt == 0 else 60 async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout_sec)) as resp: text = await resp.text() @@ -437,7 +437,7 @@ class BinanceClient: resp.status, (text[:500] if text else ""), attempt + 1, max_retries + 1, ) if attempt < max_retries: - await asyncio.sleep(4) # 等待 4 秒后重试,给币安/网络恢复时间 + await asyncio.sleep(8) continue return None try: @@ -455,7 +455,7 @@ class BinanceClient: last_error, attempt + 1, max_retries + 1, ) if attempt < max_retries: - await asyncio.sleep(5) # 等待 5 秒后重试 + await asyncio.sleep(8) continue except Exception as e: last_error = getattr(e, "message", str(e)) or repr(e) @@ -464,7 +464,7 @@ class BinanceClient: type(e).__name__, last_error, attempt + 1, max_retries + 1, ) if attempt < max_retries: - await asyncio.sleep(5) # 等待 5 秒后重试 + await asyncio.sleep(8) continue logger.error(f"create_futures_listen_key (REST) 重试 {max_retries + 1} 次后仍失败: {last_error}") @@ -529,13 +529,13 @@ class BinanceClient: return False, True logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...") - # 方法2: REST API(备选方案) + # 方法2: REST API(文档:PUT /fapi/v1/listenKey?listenKey=xxx,延长 60 分钟) try: import aiohttp url = f"{self._futures_base_url()}/fapi/v1/listenKey" headers = {"X-MBX-APIKEY": self.api_key} async with aiohttp.ClientSession() as session: - async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: + async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp: text = await resp.text() ok = resp.status == 200 code_1125 = False @@ -548,7 +548,7 @@ class BinanceClient: logger.debug(f"keepalive_futures_listen_key (REST) 失败 status={resp.status} body={text}") return ok, code_1125 except asyncio.TimeoutError: - logger.warning("keepalive_futures_listen_key (REST) 失败: 请求超时(30秒)") + logger.warning("keepalive_futures_listen_key (REST) 失败: 请求超时(45秒)") return False, False except Exception as e: logger.debug(f"keepalive_futures_listen_key (REST) 失败: {e}") diff --git a/trading_system/listen_key_cache.py b/trading_system/listen_key_cache.py index c5db797..4d2365a 100644 --- a/trading_system/listen_key_cache.py +++ b/trading_system/listen_key_cache.py @@ -131,9 +131,9 @@ class ListenKeyCache: if cached and time.time() < cached.get('expires_at', 0): return cached.get('listen_key') - # 创建新的 listenKey + # 创建新的 listenKey(每个账号独立,client 的 API Key 必须与 account_id 匹配) logger.info(f"ListenKeyCache: 为账号 {account_id} 创建新的 listenKey...") - listen_key = await client.create_futures_listen_key(prefer_ws=True, max_retries=2) + listen_key = await client.create_futures_listen_key(prefer_ws=True, max_retries=3) if not listen_key: logger.warning(f"ListenKeyCache: 账号 {account_id} 创建 listenKey 失败") return None diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index 8053bca..c593933 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -245,11 +245,11 @@ class UserDataStream: except Exception as e: logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}") - # 如果缓存未命中,直接创建 + # 如果缓存未命中,直接创建(每个账号独立,使用对应 client 的 API Key) if not self._listen_key: - self._listen_key = await self.client.create_futures_listen_key() + self._listen_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=3) if not self._listen_key: - logger.warning("UserDataStream: 无法创建 listenKey,跳过启动") + logger.warning(f"UserDataStream(account_id={self.account_id}): 无法创建 listenKey,跳过启动") return False self._running = True