feat(binance_client, listen_key_cache, user_data_stream): 增强 listenKey 创建逻辑与重试机制

在 `binance_client.py` 中将 `create_futures_listen_key` 方法的最大重试次数从 2 增加到 3,并调整了超时设置以提高稳定性。更新了 `listen_key_cache.py` 和 `user_data_stream.py` 中对该方法的调用,确保在创建新的 listenKey 时使用新的重试逻辑。这些改进提升了系统在高并发情况下的可靠性与响应能力。
This commit is contained in:
薇薇安 2026-02-21 11:12:21 +08:00
parent e4e6e64608
commit 83a09f24f8
3 changed files with 21 additions and 21 deletions

View File

@ -397,37 +397,37 @@ class BinanceClient:
"""合约 REST 根地址(用于 listenKey 等)"""
return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com"
async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 2) -> Optional[str]:
async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 3) -> Optional[str]:
"""
创建 U 本位合约 User Data Stream listenKey用于 WS 订阅订单/持仓推送60 分钟无 keepalive 会失效
创建 U 本位合约 User Data Stream listenKey用于 WS 订阅订单/持仓推送
文档每个账号 listenKey 独立60 分钟无 keepalive 会失效
优先使用 WebSocket API如果 WSTradeClient 已连接否则使用 REST API
根据币安文档如果该帐户具有有效的listenKey则将返回该listenKey并将其有效期延长60分钟
文档如果该帐户具有有效的 listenKey则返回现有 key 并将其有效期延长 60 分钟
Args:
prefer_ws: 是否优先使用 WebSocket API默认 True如果 WebSocket 不可用自动回退到 REST API
max_retries: REST API 失败时的最大重试次数默认 2
prefer_ws: 是否优先使用 WebSocket API默认 True
max_retries: REST API 失败时的最大重试次数默认 3 4 次尝试
"""
if not self.api_key:
return None
# 方法1: WebSocket API优先如果 WSTradeClient 已连接)
# 方法1: WebSocket API优先 WSTradeClient 已连接)
if prefer_ws:
ws_result = await self._create_listen_key_via_ws()
if ws_result:
return ws_result
# WebSocket 不可用,回退到 REST API
logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...")
# 方法2: REST API备选方案,带重试机制
# 方法2: REST API文档POST /fapi/v1/listenKey带重试
last_error = None
for attempt in range(max_retries + 1):
try:
import aiohttp
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
headers = {"X-MBX-APIKEY": self.api_key}
# 超时:首次 25s后续 35s币安有时较慢避免 20s 超时)
timeout_sec = 25 if attempt == 0 else 35
# 超时:按文档无明确限制,启动时可能并发多连接,适当放宽
timeout_sec = 50 if attempt == 0 else 60
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout_sec)) as resp:
text = await resp.text()
@ -437,7 +437,7 @@ class BinanceClient:
resp.status, (text[:500] if text else ""), attempt + 1, max_retries + 1,
)
if attempt < max_retries:
await asyncio.sleep(4) # 等待 4 秒后重试,给币安/网络恢复时间
await asyncio.sleep(8)
continue
return None
try:
@ -455,7 +455,7 @@ class BinanceClient:
last_error, attempt + 1, max_retries + 1,
)
if attempt < max_retries:
await asyncio.sleep(5) # 等待 5 秒后重试
await asyncio.sleep(8)
continue
except Exception as e:
last_error = getattr(e, "message", str(e)) or repr(e)
@ -464,7 +464,7 @@ class BinanceClient:
type(e).__name__, last_error, attempt + 1, max_retries + 1,
)
if attempt < max_retries:
await asyncio.sleep(5) # 等待 5 秒后重试
await asyncio.sleep(8)
continue
logger.error(f"create_futures_listen_key (REST) 重试 {max_retries + 1} 次后仍失败: {last_error}")
@ -529,13 +529,13 @@ class BinanceClient:
return False, True
logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...")
# 方法2: REST API备选方案
# 方法2: REST API文档PUT /fapi/v1/listenKey?listenKey=xxx延长 60 分钟
try:
import aiohttp
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
headers = {"X-MBX-APIKEY": self.api_key}
async with aiohttp.ClientSession() as session:
async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp:
text = await resp.text()
ok = resp.status == 200
code_1125 = False
@ -548,7 +548,7 @@ class BinanceClient:
logger.debug(f"keepalive_futures_listen_key (REST) 失败 status={resp.status} body={text}")
return ok, code_1125
except asyncio.TimeoutError:
logger.warning("keepalive_futures_listen_key (REST) 失败: 请求超时(30秒)")
logger.warning("keepalive_futures_listen_key (REST) 失败: 请求超时(45秒)")
return False, False
except Exception as e:
logger.debug(f"keepalive_futures_listen_key (REST) 失败: {e}")

View File

@ -131,9 +131,9 @@ class ListenKeyCache:
if cached and time.time() < cached.get('expires_at', 0):
return cached.get('listen_key')
# 创建新的 listenKey
# 创建新的 listenKey每个账号独立client 的 API Key 必须与 account_id 匹配)
logger.info(f"ListenKeyCache: 为账号 {account_id} 创建新的 listenKey...")
listen_key = await client.create_futures_listen_key(prefer_ws=True, max_retries=2)
listen_key = await client.create_futures_listen_key(prefer_ws=True, max_retries=3)
if not listen_key:
logger.warning(f"ListenKeyCache: 账号 {account_id} 创建 listenKey 失败")
return None

View File

@ -245,11 +245,11 @@ class UserDataStream:
except Exception as e:
logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}")
# 如果缓存未命中,直接创建
# 如果缓存未命中,直接创建(每个账号独立,使用对应 client 的 API Key
if not self._listen_key:
self._listen_key = await self.client.create_futures_listen_key()
self._listen_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=3)
if not self._listen_key:
logger.warning("UserDataStream: 无法创建 listenKey跳过启动")
logger.warning(f"UserDataStream(account_id={self.account_id}): 无法创建 listenKey跳过启动")
return False
self._running = True