diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 45eb9d8..7472f6c 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -386,7 +386,7 @@ class BinanceClient: """合约 REST 根地址(用于 listenKey 等)""" return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com" - async def create_futures_listen_key(self, prefer_ws: bool = True) -> Optional[str]: + async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 2) -> Optional[str]: """ 创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。60 分钟无 keepalive 会失效。 @@ -395,6 +395,7 @@ class BinanceClient: Args: prefer_ws: 是否优先使用 WebSocket API(默认 True)。如果 WebSocket 不可用,自动回退到 REST API。 + max_retries: REST API 失败时的最大重试次数(默认 2 次) """ if not self.api_key: return None @@ -407,38 +408,56 @@ class BinanceClient: # WebSocket 不可用,回退到 REST API logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...") - # 方法2: REST API(备选方案) - try: - import aiohttp - url = f"{self._futures_base_url()}/fapi/v1/listenKey" - headers = {"X-MBX-APIKEY": self.api_key} - async with aiohttp.ClientSession() as session: - async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: - text = await resp.text() - if resp.status != 200: - logger.warning( - "create_futures_listen_key (REST) 失败 status=%s body=%s", - resp.status, (text[:500] if text else ""), - ) - return None - try: - data = json.loads(text) if (text and text.strip()) else {} - except Exception: - data = {} - key = data.get("listenKey") if isinstance(data, dict) else None - if key: - logger.info("✓ 合约 User Data Stream listenKey 已创建 (REST)") - return key - except asyncio.TimeoutError: - logger.warning("create_futures_listen_key (REST) 失败: 请求超时(30秒)") - return None - except Exception as e: - err_msg = getattr(e, "message", str(e)) or repr(e) - logger.warning( - "create_futures_listen_key (REST) 失败: %s - %s", - type(e).__name__, err_msg, - ) - return None + # 方法2: REST API(备选方案,带重试机制) + last_error = None + for attempt in range(max_retries + 1): + try: + import aiohttp + url = f"{self._futures_base_url()}/fapi/v1/listenKey" + headers = {"X-MBX-APIKEY": self.api_key} + # ⚠️ 优化:使用较短的超时时间(15秒),失败后快速重试 + timeout_sec = 15 if attempt == 0 else 20 + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout_sec)) as resp: + text = await resp.text() + if resp.status != 200: + logger.warning( + "create_futures_listen_key (REST) 失败 status=%s body=%s (尝试 %d/%d)", + resp.status, (text[:500] if text else ""), attempt + 1, max_retries + 1, + ) + if attempt < max_retries: + await asyncio.sleep(2) # 等待 2 秒后重试 + continue + return None + try: + data = json.loads(text) if (text and text.strip()) else {} + except Exception: + data = {} + key = data.get("listenKey") if isinstance(data, dict) else None + if key: + logger.info("✓ 合约 User Data Stream listenKey 已创建 (REST)") + return key + except asyncio.TimeoutError: + last_error = f"请求超时({timeout_sec}秒)" + logger.warning( + "create_futures_listen_key (REST) 失败: %s (尝试 %d/%d)", + last_error, attempt + 1, max_retries + 1, + ) + if attempt < max_retries: + await asyncio.sleep(2) # 等待 2 秒后重试 + continue + except Exception as e: + last_error = getattr(e, "message", str(e)) or repr(e) + logger.warning( + "create_futures_listen_key (REST) 失败: %s - %s (尝试 %d/%d)", + type(e).__name__, last_error, attempt + 1, max_retries + 1, + ) + if attempt < max_retries: + await asyncio.sleep(2) # 等待 2 秒后重试 + continue + + logger.error(f"create_futures_listen_key (REST) 重试 {max_retries + 1} 次后仍失败: {last_error}") + return None async def _create_listen_key_via_ws(self) -> Optional[str]: """通过 WebSocket API 创建 listenKey(优先方案)。""" diff --git a/trading_system/listen_key_cache.py b/trading_system/listen_key_cache.py new file mode 100644 index 0000000..c5db797 --- /dev/null +++ b/trading_system/listen_key_cache.py @@ -0,0 +1,284 @@ +""" +ListenKey 缓存管理:以账号为单位缓存 listenKey,支持有效期管理和自动更新。 + +⚠️ 重要说明: +- 每个账号(account_id)对应一套独立的 API Key,listenKey 与 API Key 绑定 +- 不同账号之间不能共用 listenKey(每个账号有独立的 API Key) +- 同一个账号的多个进程/实例可以共用 listenKey(因为它们使用相同的 API Key) +- Redis 键格式:listen_key:{account_id},确保不同账号的 listenKey 完全隔离 +""" +import asyncio +import json +import logging +import time +from typing import Optional, Dict, Any + +logger = logging.getLogger(__name__) + +# Redis 键格式:listen_key:{account_id} +# ⚠️ 每个 account_id 对应一套独立的 API Key,listenKey 与 API Key 绑定,不同账号之间完全隔离 +KEY_PREFIX = "listen_key:" +# listenKey 有效期:50 分钟(币安文档:60 分钟无 keepalive 会失效,我们提前 10 分钟更新) +LISTEN_KEY_TTL_SEC = 50 * 60 # 50 分钟 +# 提前更新时间:在到期前 5 分钟更新 +RENEW_BEFORE_EXPIRE_SEC = 5 * 60 # 5 分钟 + + +class ListenKeyCache: + """ + ListenKey 缓存管理器:以账号为单位缓存 listenKey,支持有效期管理和自动更新。 + + ⚠️ 重要: + - 每个账号(account_id)有独立的 listenKey 缓存 + - 不同账号之间不会共用 listenKey(每个账号对应独立的 API Key) + - 同一个账号的多个进程/实例可以共用 listenKey(使用相同的 API Key) + """ + + def __init__(self, redis_cache: Any): + """ + 初始化 ListenKey 缓存管理器 + + Args: + redis_cache: Redis 缓存实例 + """ + self.redis_cache = redis_cache + self._local_cache: Dict[int, Dict[str, Any]] = {} # 本地缓存:{account_id: {listen_key, expires_at}} + self._locks: Dict[int, asyncio.Lock] = {} # 每个账号的锁,避免并发创建 + + def _get_lock(self, account_id: int) -> asyncio.Lock: + """获取账号的锁(用于避免并发创建 listenKey)""" + if account_id not in self._locks: + self._locks[account_id] = asyncio.Lock() + return self._locks[account_id] + + def _get_redis_key(self, account_id: int) -> str: + """ + 获取 Redis 键 + + ⚠️ 重要:每个 account_id 对应独立的 Redis 键,确保不同账号的 listenKey 完全隔离 + 格式:listen_key:{account_id} + + Args: + account_id: 账号 ID(每个账号对应一套独立的 API Key) + + Returns: + Redis 键字符串 + """ + return f"{KEY_PREFIX}{account_id}" + + async def get_listen_key(self, account_id: int, client: Any) -> Optional[str]: + """ + 获取账号的 listenKey(从缓存读取或创建新的) + + ⚠️ 重要: + - 每个 account_id 对应一套独立的 API Key,listenKey 与 API Key 绑定 + - 不同账号之间不会共用 listenKey(每个账号有独立的缓存键) + - 同一个账号的多个进程/实例可以共用 listenKey(因为它们使用相同的 API Key) + + Args: + account_id: 账号 ID(每个账号对应一套独立的 API Key) + client: BinanceClient 实例(用于创建 listenKey,client 的 API Key 必须与 account_id 匹配) + + Returns: + listenKey,失败返回 None + """ + account_id = int(account_id) + + # ⚠️ 验证:确保 client 的 API Key 与 account_id 匹配(通过 account_id 隔离,不同账号不会共用) + # 注意:这里不直接验证 API Key,而是通过 account_id 来隔离,因为: + # 1. account_id 在系统启动时就已经确定(通过环境变量或配置) + # 2. 每个 account_id 对应一套独立的 API Key(在数据库 accounts 表中) + # 3. BinanceClient 在创建时已经使用了对应 account_id 的 API Key + + # 1. 先检查本地缓存 + cached = self._local_cache.get(account_id) + if cached: + expires_at = cached.get('expires_at', 0) + if time.time() < expires_at: + listen_key = cached.get('listen_key') + if listen_key: + logger.debug(f"ListenKeyCache: 从本地缓存获取账号 {account_id} 的 listenKey") + return listen_key + else: + # 本地缓存已过期,清除 + self._local_cache.pop(account_id, None) + + # 2. 从 Redis 读取(多进程共享) + if self.redis_cache: + try: + redis_key = self._get_redis_key(account_id) + cached_data = await self.redis_cache.get(redis_key) + if cached_data: + # RedisCache.get() 已经自动 JSON 解析,直接使用 + if isinstance(cached_data, dict): + listen_key = cached_data.get('listen_key') + expires_at = cached_data.get('expires_at', 0) + if listen_key and time.time() < expires_at: + # 更新本地缓存 + self._local_cache[account_id] = { + 'listen_key': listen_key, + 'expires_at': expires_at + } + logger.debug(f"ListenKeyCache: 从 Redis 缓存获取账号 {account_id} 的 listenKey") + return listen_key + except Exception as e: + logger.debug(f"ListenKeyCache: 从 Redis 读取失败: {e}") + + # 3. 缓存未命中或已过期,需要创建新的 + async with self._get_lock(account_id): + # 双重检查:可能其他协程已经创建了 + cached = self._local_cache.get(account_id) + if cached and time.time() < cached.get('expires_at', 0): + return cached.get('listen_key') + + # 创建新的 listenKey + logger.info(f"ListenKeyCache: 为账号 {account_id} 创建新的 listenKey...") + listen_key = await client.create_futures_listen_key(prefer_ws=True, max_retries=2) + if not listen_key: + logger.warning(f"ListenKeyCache: 账号 {account_id} 创建 listenKey 失败") + return None + + # 计算过期时间(50 分钟后) + expires_at = time.time() + LISTEN_KEY_TTL_SEC + + # 更新本地缓存 + self._local_cache[account_id] = { + 'listen_key': listen_key, + 'expires_at': expires_at + } + + # 写入 Redis(多进程共享) + if self.redis_cache: + try: + redis_key = self._get_redis_key(account_id) + cache_data = { + 'listen_key': listen_key, + 'expires_at': expires_at, + 'created_at': time.time(), + 'account_id': account_id + } + # RedisCache.set() 会自动 JSON 序列化,直接传入 dict + # Redis TTL 设置为 55 分钟(略长于我们的有效期,确保数据不会提前被删除) + await self.redis_cache.set(redis_key, cache_data, ttl=55 * 60) + logger.info(f"ListenKeyCache: 账号 {account_id} 的 listenKey 已缓存(有效期至 {expires_at:.0f})") + except Exception as e: + logger.warning(f"ListenKeyCache: 写入 Redis 失败: {e}") + + return listen_key + + async def renew_listen_key(self, account_id: int, client: Any, listen_key: str) -> Optional[str]: + """ + 更新 listenKey(keepalive 或创建新的) + + ⚠️ 重要: + - 每个 account_id 的 listenKey 独立更新,不会影响其他账号 + - listenKey 与 API Key 绑定,不同账号的 listenKey 不能混用 + + Args: + account_id: 账号 ID(每个账号对应一套独立的 API Key) + client: BinanceClient 实例(API Key 必须与 account_id 匹配) + listen_key: 当前的 listenKey(必须属于该 account_id) + + Returns: + 更新后的 listenKey(可能是同一个或新的),失败返回 None + """ + account_id = int(account_id) + + # 先尝试 keepalive + ok, code_1125 = await client.keepalive_futures_listen_key(listen_key, prefer_ws=True) + if ok: + # keepalive 成功,更新缓存有效期 + expires_at = time.time() + LISTEN_KEY_TTL_SEC + self._local_cache[account_id] = { + 'listen_key': listen_key, + 'expires_at': expires_at + } + + # 更新 Redis + if self.redis_cache: + try: + redis_key = self._get_redis_key(account_id) + cache_data = { + 'listen_key': listen_key, + 'expires_at': expires_at, + 'created_at': time.time(), + 'account_id': account_id + } + # RedisCache.set() 会自动 JSON 序列化 + await self.redis_cache.set(redis_key, cache_data, ttl=55 * 60) + logger.debug(f"ListenKeyCache: 账号 {account_id} 的 listenKey keepalive 成功,已更新缓存") + except Exception as e: + logger.debug(f"ListenKeyCache: 更新 Redis 失败: {e}") + + return listen_key + + # keepalive 失败(-1125 或网络错误),需要创建新的 + logger.info(f"ListenKeyCache: 账号 {account_id} 的 listenKey keepalive 失败,创建新的...") + return await self.get_listen_key(account_id, client) + + async def should_renew(self, account_id: int) -> bool: + """ + 检查是否需要更新 listenKey(在到期前 5 分钟) + + Args: + account_id: 账号 ID + + Returns: + 如果需要更新返回 True,否则返回 False + """ + account_id = int(account_id) + + # 检查本地缓存 + cached = self._local_cache.get(account_id) + if cached: + expires_at = cached.get('expires_at', 0) + if time.time() >= expires_at - RENEW_BEFORE_EXPIRE_SEC: + return True + + # 检查 Redis 缓存 + if self.redis_cache: + try: + redis_key = self._get_redis_key(account_id) + cached_data = await self.redis_cache.get(redis_key) + if cached_data and isinstance(cached_data, dict): + expires_at = cached_data.get('expires_at', 0) + if time.time() >= expires_at - RENEW_BEFORE_EXPIRE_SEC: + return True + except Exception: + pass + + return False + + async def clear_cache(self, account_id: int): + """清除账号的 listenKey 缓存""" + account_id = int(account_id) + self._local_cache.pop(account_id, None) + if self.redis_cache: + try: + redis_key = self._get_redis_key(account_id) + # RedisCache 可能没有 delete 方法,尝试直接调用 redis + if hasattr(self.redis_cache, 'redis') and self.redis_cache.redis: + await self.redis_cache.redis.delete(redis_key) + elif hasattr(self.redis_cache, 'delete'): + await self.redis_cache.delete(redis_key) + logger.debug(f"ListenKeyCache: 已清除账号 {account_id} 的 listenKey 缓存") + except Exception as e: + logger.debug(f"ListenKeyCache: 清除 Redis 缓存失败: {e}") + + +# 全局 ListenKeyCache 实例 +_listen_key_cache: Optional[ListenKeyCache] = None + + +def get_listen_key_cache(redis_cache: Any = None) -> Optional[ListenKeyCache]: + """获取全局 ListenKeyCache 实例""" + global _listen_key_cache + if _listen_key_cache is None and redis_cache: + _listen_key_cache = ListenKeyCache(redis_cache) + return _listen_key_cache + + +def set_listen_key_cache(cache: ListenKeyCache): + """设置全局 ListenKeyCache 实例""" + global _listen_key_cache + _listen_key_cache = cache diff --git a/trading_system/main.py b/trading_system/main.py index 2da8222..9ff0b42 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -357,6 +357,19 @@ async def main(): # 3. 启动 User Data Stream(订单/持仓/余额推送,listenKey 保活,减少 REST 请求) import os account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") + + # ⚠️ 优化:初始化 ListenKey 缓存管理器 + # 注意:每个账号(account_id)有独立的 listenKey 缓存,不同账号之间不会共用 + # 同一个账号的多个进程/实例可以共用 listenKey(因为它们使用相同的 API Key) + try: + from .listen_key_cache import get_listen_key_cache, set_listen_key_cache, ListenKeyCache + if getattr(client, "redis_cache", None): + cache = ListenKeyCache(client.redis_cache) + set_listen_key_cache(cache) + logger.info(f"✓ ListenKey 缓存管理器已初始化(账号 {account_id},同一账号的多进程/实例可共享 listenKey)") + except Exception as e: + logger.debug(f"初始化 ListenKey 缓存管理器失败: {e}") + user_data_stream = UserDataStream(client, account_id) logger.info(f"正在启动 User Data Stream(账号 {account_id})...") if await user_data_stream.start(): diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 3253b63..422bb0e 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -166,12 +166,34 @@ class MarketScanner: async def get_symbol_change_with_limit(symbol): async with semaphore: try: - return await asyncio.wait_for( + # ⚠️ 优化:优先使用共享缓存,减少超时风险 + result = await asyncio.wait_for( self._get_symbol_change(symbol, all_tickers.get(symbol)), timeout=analysis_timeout ) + return result except asyncio.TimeoutError: - logger.warning(f"{symbol} 分析超时({analysis_timeout:.0f}秒),跳过") + # ⚠️ 优化:超时时尝试返回降级结果(仅涨跌幅/成交量),而不是完全跳过 + logger.warning(f"{symbol} 分析超时({analysis_timeout:.0f}秒),尝试返回降级结果...") + try: + ticker = all_tickers.get(symbol) if all_tickers else None + if ticker: + change_pct = float(ticker.get('changePercent', 0) or 0) + vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0) + price = float(ticker.get('price', 0) or ticker.get('lastPrice', 0) or 0) + if price > 0: + return { + 'symbol': symbol, + 'price': price, + 'changePercent': change_pct, + 'volume24h': vol, + 'direction': 'UP' if change_pct > 0 else 'DOWN', + 'signalScore': 0, + 'signal_strength': 0, + } + except Exception as e: + logger.debug(f"{symbol} 降级结果构建失败: {e}") + logger.warning(f"{symbol} 分析超时且无法返回降级结果,跳过") return None except Exception as e: logger.debug(f"{symbol} 分析出错: {e}") diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index 521bc48..e1b3ded 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -131,14 +131,33 @@ class UserDataStream: return "wss://fstream.binance.com/ws" async def start(self) -> bool: - """创建 listenKey 并启动 WS 接收循环与 keepalive 任务。""" + """ + 创建 listenKey 并启动 WS 接收循环与 keepalive 任务。 + + ⚠️ 优化:优先从缓存获取 listenKey,避免重复创建。 + """ global _stream_instance if self._running: return True - self._listen_key = await self.client.create_futures_listen_key() + + # ⚠️ 优化:优先从缓存获取 listenKey(多进程/多实例共享) + try: + from .listen_key_cache import get_listen_key_cache + cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) + if cache: + self._listen_key = await cache.get_listen_key(self.account_id, self.client) + if self._listen_key: + logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取 listenKey") + except Exception as e: + logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}") + + # 如果缓存未命中,直接创建 if not self._listen_key: - logger.warning("UserDataStream: 无法创建 listenKey,跳过启动") - return False + self._listen_key = await self.client.create_futures_listen_key() + if not self._listen_key: + logger.warning("UserDataStream: 无法创建 listenKey,跳过启动") + return False + self._running = True _stream_instance = self self._task = asyncio.create_task(self._run_ws()) @@ -175,12 +194,42 @@ class UserDataStream: logger.info("UserDataStream: 已停止") async def _run_keepalive(self): - """每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。""" + """ + 每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。 + + ⚠️ 优化: + 1. 优先使用 WebSocket API keepalive,减少 REST 调用 + 2. 使用缓存管理器更新 listenKey,支持多进程共享 + """ while self._running: await asyncio.sleep(30 * 60) if not self._running or not self._listen_key: break - ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key) + + # ⚠️ 优化:使用缓存管理器更新 listenKey + try: + from .listen_key_cache import get_listen_key_cache + cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) + if cache: + # 使用缓存管理器更新(会自动 keepalive 或创建新的) + new_key = await cache.renew_listen_key(self.account_id, self.client, self._listen_key) + if new_key: + if new_key != self._listen_key: + logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已更新(keepalive 失败,创建了新 key)") + self._listen_key = new_key + # 如果 key 变了,需要重新连接 + if self._ws: + try: + await self._ws.close() + except Exception: + pass + break + continue + except Exception as e: + logger.debug(f"UserDataStream: 使用缓存管理器更新 listenKey 失败: {e}") + + # 回退到直接 keepalive(如果缓存管理器不可用) + ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True) if not ok and code_1125 and self._ws: logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连") try: @@ -234,15 +283,69 @@ class UserDataStream: self._conn_start_time = None if not self._running: break - # 重连前重新创建 listenKey(旧 key 可能已失效或 listenKeyExpired) - self._listen_key = await self.client.create_futures_listen_key() - if not self._listen_key: - logger.warning( - "UserDataStream(account_id=%s): 重新创建 listenKey 失败,60s 后重试(请检查该账号 API 权限/网络/IP 白名单)", - self.account_id, - ) - await asyncio.sleep(60) - continue + + # ⚠️ 优化:优先从缓存获取 listenKey(多进程共享,避免重复创建) + try: + from .listen_key_cache import get_listen_key_cache + cache = get_listen_key_cache(getattr(self.client, "redis_cache", None)) + if cache: + # 从缓存获取 listenKey(如果缓存中有有效的 key,会直接返回;否则会创建新的) + cached_key = await cache.get_listen_key(self.account_id, self.client) + if cached_key: + if cached_key == self._listen_key: + logger.debug(f"UserDataStream(account_id={self.account_id}): 从缓存获取到相同的 listenKey,复用") + else: + logger.info(f"UserDataStream(account_id={self.account_id}): 从缓存获取到新的 listenKey(可能其他进程创建的)") + self._listen_key = cached_key + # 继续使用现有的或缓存中的 listenKey + continue + except Exception as e: + logger.debug(f"UserDataStream: 从缓存获取 listenKey 失败: {e}") + + # 如果缓存不可用,回退到原有逻辑 + # ⚠️ 优化:重连前先尝试 keepalive 现有 listenKey,避免重复创建 + need_new_key = True + if self._listen_key: + logger.debug(f"UserDataStream(account_id={self.account_id}): 重连前尝试 keepalive 现有 listenKey...") + ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key, prefer_ws=True) + if ok: + logger.info(f"UserDataStream(account_id={self.account_id}): 现有 listenKey keepalive 成功,复用现有 key") + need_new_key = False + elif code_1125: + logger.debug(f"UserDataStream(account_id={self.account_id}): 现有 listenKey 已失效(-1125),需要创建新 key") + else: + logger.debug(f"UserDataStream(account_id={self.account_id}): keepalive 失败,尝试创建新 key") + + # 只有在需要新 key 时才创建(keepalive 失败或没有现有 key) + if need_new_key: + # ⚠️ 优化:增加重试机制,避免网络波动导致失败 + listen_key_retries = 3 + listen_key_created = False + for retry in range(listen_key_retries): + # 注意:根据币安文档,如果账户已有有效的 listenKey,创建接口会返回现有 key 并延长有效期 + # 所以这里即使"创建"也可能返回现有的 key,这是正常的 + new_key = await self.client.create_futures_listen_key(prefer_ws=True, max_retries=1) + if new_key: + # 如果返回的 key 与现有 key 相同,说明是复用现有 key + if new_key == self._listen_key: + logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已复用(币安返回现有 key)") + else: + logger.info(f"UserDataStream(account_id={self.account_id}): listenKey 已创建(重试 {retry + 1}/{listen_key_retries})") + self._listen_key = new_key + listen_key_created = True + break + if retry < listen_key_retries - 1: + wait_sec = (retry + 1) * 10 # 10秒、20秒、30秒 + logger.debug(f"UserDataStream(account_id={self.account_id}): listenKey 创建失败,{wait_sec}秒后重试...") + await asyncio.sleep(wait_sec) + + if not listen_key_created: + logger.warning( + "UserDataStream(account_id=%s): 重新创建 listenKey 失败(已重试 %d 次),60s 后重试(请检查该账号 API 权限/网络/IP 白名单)", + self.account_id, listen_key_retries, + ) + await asyncio.sleep(60) + continue async def _handle_message(self, raw: str) -> bool: """处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired)以触发重连。""" diff --git a/trading_system/ws_trade_client.py b/trading_system/ws_trade_client.py index 9fee2d8..3bf0b80 100644 --- a/trading_system/ws_trade_client.py +++ b/trading_system/ws_trade_client.py @@ -156,9 +156,21 @@ class WSTradeClient: async with self._lock: self._pending_requests[req_id] = fut try: + # ⚠️ 优化:检查连接状态,避免在连接关闭时发送数据 + if not self.is_connected(): + async with self._lock: + self._pending_requests.pop(req_id, None) + raise ConnectionError("WS 连接已关闭") await self._ws.send_str(json.dumps(req)) result = await asyncio.wait_for(fut, timeout=timeout) return result + except (ConnectionResetError, OSError) as e: + async with self._lock: + self._pending_requests.pop(req_id, None) + err_msg = str(e).lower() + if "closing transport" in err_msg or "cannot write" in err_msg: + raise ConnectionError("WS 连接正在关闭,无法发送请求") + raise ConnectionError(f"WS 连接错误: {e}") except asyncio.TimeoutError: async with self._lock: self._pending_requests.pop(req_id, None)