diff --git a/trading_system/config.py b/trading_system/config.py index ba8a0e4..405edd5 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -215,7 +215,8 @@ DEFAULT_TRADING_CONFIG = { 'SCAN_INTERVAL': 900, # 扫描间隔15分钟(900秒),快速验证模式:提高扫描频率以增加交易机会 'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线,12秒通常够用;网络慢可调大(18~25) 'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时(秒);超时则返回降级结果(仅涨跌幅/成交量),不拖满整分析超时 - 'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据(最多2秒),而不是立即回退 REST API + 'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据(最多1-1.5秒),而不是立即回退 REST API + 'SCAN_LIMIT_KLINE_SUBSCRIBE': True, # 限制 K 线订阅:只在缓存完全没有数据时才订阅,避免订阅过多导致负载上升 # 多账号/低配服务器(如 2 CPU 4G):降低并发与错峰扫描,避免 CPU 打满 'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对(2 CPU 4G 多账号建议 2,单账号可 3~5) 'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时按 account_id 错峰首次扫描,避免多进程同时扫 diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 7b941dc..f628b49 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -58,6 +58,9 @@ class KlineStream: self._subscribed: Dict[Tuple[str, str], bool] = {} # (symbol, interval) -> 是否已订阅 self._subscription_lock = asyncio.Lock() self._subscribe_times: List[float] = [] # 最近一次窗口内的发送时间,用于限速 + # ⚠️ 订阅统计:用于监控订阅数量,避免订阅过多导致负载上升 + self._subscription_count = 0 # 当前订阅数量 + self._max_subscriptions = 500 # 最大订阅数量(币安限制:单个连接最多 1024 个流,我们设置 500 作为安全阈值) def _ws_base_url(self) -> str: if self.testnet: @@ -94,7 +97,11 @@ class KlineStream: logger.info("KlineStream: 已停止") async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool: - """订阅指定 symbol 和 interval 的K线流(若 WS 未连接则等待连接后订阅)。""" + """ + 订阅指定 symbol 和 interval 的K线流(若 WS 未连接则等待连接后订阅)。 + + ⚠️ 优化:检查订阅数量限制,避免订阅过多导致负载上升。 + """ symbol = symbol.upper() interval = interval.lower() key = (symbol, interval) @@ -103,6 +110,15 @@ class KlineStream: return True if not self._running: return False + + # ⚠️ 优化:检查订阅数量限制,避免订阅过多导致负载上升 + current_count = len(self._subscribed) + if current_count >= self._max_subscriptions: + logger.warning( + f"KlineStream: 订阅数量已达上限 ({current_count}/{self._max_subscriptions})," + f"跳过订阅 {symbol} {interval}(建议优先使用共享缓存或 REST API)" + ) + return False # 等待 WS 连接(最多等待 5 秒) for _ in range(50): if self._ws: @@ -128,8 +144,9 @@ class KlineStream: }) self._subscribe_times.append(time.monotonic()) self._subscribed[key] = True + self._subscription_count = len(self._subscribed) _kline_cache_limit[key] = limit - logger.debug(f"KlineStream: 已订阅 {symbol} {interval}") + logger.debug(f"KlineStream: 已订阅 {symbol} {interval}(当前订阅数: {self._subscription_count}/{self._max_subscriptions})") return True except (ConnectionResetError, OSError) as e: msg = str(e).lower() @@ -185,6 +202,7 @@ class KlineStream: # 重连时清空订阅状态,需要重新订阅 async with self._subscription_lock: self._subscribed.clear() + self._subscription_count = 0 if not self._running: break diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 422bb0e..b8dc1ce 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -402,51 +402,68 @@ class MarketScanner: logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}") # 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存 + # ⚠️ 优化:减少不必要的订阅,优先使用已有缓存,避免订阅过多导致负载上升 if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2: try: from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh stream = get_kline_stream_instance() if stream: - # 确保订阅该流(首次请求时自动订阅) - subscribe_tasks = [ - stream.subscribe(symbol, primary_interval, limit=50), - stream.subscribe(symbol, confirm_interval, limit=50), - ] - await asyncio.gather(*subscribe_tasks, return_exceptions=True) + # ⚠️ 优化:先检查缓存是否已有数据,避免不必要的订阅 + # 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力 + cache_has_data = False + if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + if cached_klines and len(cached_klines) >= 2: + klines = cached_klines + cache_has_data = True + if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + if cached_klines_4h and len(cached_klines_4h) >= 2: + klines_4h = cached_klines_4h + cache_has_data = True - # 如果优先使用 WebSocket,等待推送数据(最多等待 2 秒) - if use_ws_prefer: - max_wait_sec = 2.0 # 最多等待 2 秒让 WebSocket 推送数据 - wait_start = time.time() - check_interval = 0.2 # 每 200ms 检查一次 + # ⚠️ 优化:只有在缓存完全没有数据时才订阅,避免订阅过多导致负载上升 + # 如果缓存已有部分数据,优先使用部分数据,减少订阅需求 + need_subscribe_primary = (not klines or len(klines) < 2) + need_subscribe_confirm = (not klines_4h or len(klines_4h) < 2) + + # 只在真正需要时才订阅(避免扫描时订阅过多流) + if need_subscribe_primary or need_subscribe_confirm: + subscribe_tasks = [] + if need_subscribe_primary: + subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=50)) + if need_subscribe_confirm: + subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=50)) - while (time.time() - wait_start) < max_wait_sec: - # 检查缓存是否可用 - if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): - cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) - if cached_klines and len(cached_klines) >= 2: - klines = cached_klines - if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): - cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) - if cached_klines_4h and len(cached_klines_4h) >= 2: - klines_4h = cached_klines_4h + if subscribe_tasks: + await asyncio.gather(*subscribe_tasks, return_exceptions=True) + + # 如果优先使用 WebSocket,等待推送数据(但缩短等待时间,避免阻塞) + if use_ws_prefer: + # ⚠️ 优化:缩短等待时间(1秒),如果缓存已有部分数据,直接使用,不等待 + max_wait_sec = 1.0 if cache_has_data else 1.5 # 如果缓存有数据,只等1秒;否则等1.5秒 + wait_start = time.time() + check_interval = 0.2 # 每 200ms 检查一次 - # 如果两个周期都有足够数据,提前退出 - if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2: - break - - await asyncio.sleep(check_interval) + while (time.time() - wait_start) < max_wait_sec: + # 检查缓存是否可用 + if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + if cached_klines and len(cached_klines) >= 2: + klines = cached_klines + if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + if cached_klines_4h and len(cached_klines_4h) >= 2: + klines_4h = cached_klines_4h + + # 如果两个周期都有足够数据,提前退出 + if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2: + break + + await asyncio.sleep(check_interval) else: - # 不优先 WebSocket,短暂等待后立即检查 - await asyncio.sleep(0.3) - if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): - cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) - if cached_klines and len(cached_klines) >= 2: - klines = cached_klines - if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): - cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) - if cached_klines_4h and len(cached_klines_4h) >= 2: - klines_4h = cached_klines_4h + # 缓存已有完整数据,无需订阅 + logger.debug(f"{symbol} 缓存已有完整 K 线数据,跳过订阅") except Exception as e: logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}")