diff --git a/trading_system/config.py b/trading_system/config.py index 3d910bc..b094f01 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -216,8 +216,11 @@ DEFAULT_TRADING_CONFIG = { 'SCAN_INTERVAL': 900, # 扫描间隔15分钟(900秒),快速验证模式:提高扫描频率以增加交易机会 'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线,12秒通常够用;网络慢可调大(18~25) 'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时(秒);超时则返回降级结果(仅涨跌幅/成交量),不拖满整分析超时 + 'SCAN_KLINE_LIMIT': 30, # 扫描用 K 线根数(30 根够 RSI/ATR 计算,减少 REST 数据量加快响应) 'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据(最多1-1.5秒),而不是立即回退 REST API 'SCAN_LIMIT_KLINE_SUBSCRIBE': True, # 限制 K 线订阅:只在缓存完全没有数据时才订阅,避免订阅过多导致负载上升 + 'SCAN_PREWARM_KLINE_ENABLED': True, # 扫描前预热 K 线:批量预订阅 WS + REST 预取,提高缓存命中率、减少分析超时 + 'SCAN_PREWARM_CONCURRENT': 5, # 预热阶段同时预取多少个交易对的 K 线(避免并发过高) # 多账号/低配服务器(如 2 CPU 4G):降低并发与错峰扫描,避免 CPU 打满 'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对(2 CPU 4G 多账号建议 2,单账号可 3~5) 'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时错峰首次扫描,避免多进程同时扫 diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index bbd81ab..776f5ed 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -160,6 +160,11 @@ class MarketScanner: except Exception as e: logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e) + # 方案 A:扫描前预热 K 线(批量预订阅 WS + REST 预取),提高缓存命中率、减少分析超时 + prewarm_enabled = cfg.get('SCAN_PREWARM_KLINE_ENABLED', True) + if prewarm_enabled and pre_filtered_symbols: + await self._prewarm_klines_for_scan(pre_filtered_symbols, cfg) + # 只对符合条件的交易对进行详细分析(获取K线和技术指标) # 并发数由 SCAN_CONCURRENT_SYMBOLS 控制:2 CPU 4G 多账号建议 2,单账号可 3~5 concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2) @@ -345,7 +350,71 @@ class MarketScanner: log_parts.append(f"价格: {symbol_info['price']:.4f}") logger.info(" | ".join(log_parts)) - + + async def _prewarm_klines_for_scan(self, symbols: List[str], cfg: Optional[Dict] = None) -> None: + """ + 方案 A:扫描前预热 K 线,批量预订阅 WebSocket + REST 预取,提高缓存命中率、减少分析超时。 + 文档:组合流可订阅多个 symbol@kline_interval,币安限制每秒 10 条订阅。 + """ + if not symbols: + return + cfg = cfg or dict(config.TRADING_CONFIG or {}) + primary_interval = cfg.get('PRIMARY_INTERVAL', config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')) + confirm_interval = cfg.get('CONFIRM_INTERVAL', config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')) + kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30) + kline_limit = max(20, min(50, kline_limit)) + prewarm_concurrent = int(cfg.get('SCAN_PREWARM_CONCURRENT', 5) or 5) + klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8) + + import time + start_ts = time.time() + + # 1. 批量预订阅 WebSocket K 线流(币安限制 10 条/秒) + stream = None + try: + from .kline_stream import get_kline_stream_instance + stream = get_kline_stream_instance() + except ImportError: + pass + if stream: + subs_done = 0 + for sym in symbols: + try: + await stream.subscribe(sym, primary_interval, limit=kline_limit) + await stream.subscribe(sym, confirm_interval, limit=kline_limit) + subs_done += 2 + if subs_done >= 20: # 每 20 次订阅略等待,避免超限 + await asyncio.sleep(2.1) + subs_done = 0 + except Exception as e: + logger.debug(f"预订阅 {sym} K 线失败: {e}") + logger.debug(f"K 线预订阅完成,{len(symbols)} 个交易对 x2 周期") + + # 2. 批量 REST 预取 K 线(写入 Redis,供后续分析直接命中) + async def fetch_one(symbol: str) -> None: + try: + tasks = [ + asyncio.wait_for( + self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit), + timeout=klines_timeout, + ), + asyncio.wait_for( + self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit), + timeout=klines_timeout, + ), + ] + await asyncio.gather(*tasks, return_exceptions=True) + except Exception as e: + logger.debug(f"预热 {symbol} K 线失败: {e}") + + sem = asyncio.Semaphore(prewarm_concurrent) + async def fetch_with_limit(s): + async with sem: + await fetch_one(s) + await asyncio.gather(*[fetch_with_limit(s) for s in symbols], return_exceptions=True) + elapsed = time.time() - start_ts + logger.info(f"K 线预热完成,{len(symbols)} 个交易对,耗时 {elapsed:.1f}s(WS 预订阅 + REST 预取)") + async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: """ 获取单个交易对的涨跌幅和技术指标 @@ -390,6 +459,8 @@ class MarketScanner: klines_timeout = 3.0 elif klines_timeout > 20: klines_timeout = 20.0 + kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30) + kline_limit = max(20, min(50, kline_limit)) # ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存 # 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用 @@ -402,8 +473,8 @@ class MarketScanner: from .kline_stream import get_klines_from_redis if use_shared_market_ws(self.client.redis_cache): # 尝试从共享缓存获取 - shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50) - shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50) + shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=kline_limit) + shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=kline_limit) if shared_klines and len(shared_klines) >= 2: klines = shared_klines logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}") @@ -424,12 +495,12 @@ class MarketScanner: # 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力 cache_has_data = False if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): - cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit) if cached_klines and len(cached_klines) >= 2: klines = cached_klines cache_has_data = True if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): - cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit) if cached_klines_4h and len(cached_klines_4h) >= 2: klines_4h = cached_klines_4h cache_has_data = True @@ -443,9 +514,9 @@ class MarketScanner: if need_subscribe_primary or need_subscribe_confirm: subscribe_tasks = [] if need_subscribe_primary: - subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=50)) + subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=kline_limit)) if need_subscribe_confirm: - subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=50)) + subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=kline_limit)) if subscribe_tasks: await asyncio.gather(*subscribe_tasks, return_exceptions=True) @@ -460,11 +531,11 @@ class MarketScanner: while (time.time() - wait_start) < max_wait_sec: # 检查缓存是否可用 if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): - cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit) if cached_klines and len(cached_klines) >= 2: klines = cached_klines if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): - cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit) if cached_klines_4h and len(cached_klines_4h) >= 2: klines_4h = cached_klines_4h @@ -485,11 +556,11 @@ class MarketScanner: # 只请求缺失的数据,减少等待时间 rest_tasks = [] if not klines or len(klines) < 2: - rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50))) + rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit))) else: rest_tasks.append(('primary', None)) if not klines_4h or len(klines_4h) < 2: - rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50))) + rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit))) else: rest_tasks.append(('confirm', None))