feat(market_scanner, config): 增强K线扫描逻辑与预热机制

在 `config.py` 中新增 `SCAN_PREWARM_KLINE_ENABLED` 和 `SCAN_PREWARM_CONCURRENT` 配置,支持在扫描前预热K线数据以提高缓存命中率。更新了 `market_scanner.py` 中的扫描逻辑,添加 `_prewarm_klines_for_scan` 方法,批量预订阅WebSocket和REST预取K线,优化了数据获取效率和分析超时处理。这些改进提升了系统在高并发情况下的响应能力与稳定性。
This commit is contained in:
薇薇安 2026-02-21 12:38:05 +08:00
parent 83a09f24f8
commit e1759a7f4c
2 changed files with 85 additions and 11 deletions

View File

@ -216,8 +216,11 @@ DEFAULT_TRADING_CONFIG = {
'SCAN_INTERVAL': 900, # 扫描间隔15分钟900秒快速验证模式提高扫描频率以增加交易机会 'SCAN_INTERVAL': 900, # 扫描间隔15分钟900秒快速验证模式提高扫描频率以增加交易机会
'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线12秒通常够用网络慢可调大18~25 'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线12秒通常够用网络慢可调大18~25
'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时超时则返回降级结果仅涨跌幅/成交量),不拖满整分析超时 'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时超时则返回降级结果仅涨跌幅/成交量),不拖满整分析超时
'SCAN_KLINE_LIMIT': 30, # 扫描用 K 线根数30 根够 RSI/ATR 计算,减少 REST 数据量加快响应)
'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据最多1-1.5秒),而不是立即回退 REST API 'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据最多1-1.5秒),而不是立即回退 REST API
'SCAN_LIMIT_KLINE_SUBSCRIBE': True, # 限制 K 线订阅:只在缓存完全没有数据时才订阅,避免订阅过多导致负载上升 'SCAN_LIMIT_KLINE_SUBSCRIBE': True, # 限制 K 线订阅:只在缓存完全没有数据时才订阅,避免订阅过多导致负载上升
'SCAN_PREWARM_KLINE_ENABLED': True, # 扫描前预热 K 线:批量预订阅 WS + REST 预取,提高缓存命中率、减少分析超时
'SCAN_PREWARM_CONCURRENT': 5, # 预热阶段同时预取多少个交易对的 K 线(避免并发过高)
# 多账号/低配服务器(如 2 CPU 4G降低并发与错峰扫描避免 CPU 打满 # 多账号/低配服务器(如 2 CPU 4G降低并发与错峰扫描避免 CPU 打满
'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对2 CPU 4G 多账号建议 2单账号可 35 'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对2 CPU 4G 多账号建议 2单账号可 35
'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时错峰首次扫描,避免多进程同时扫 'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时错峰首次扫描,避免多进程同时扫

View File

@ -160,6 +160,11 @@ class MarketScanner:
except Exception as e: except Exception as e:
logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e) logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e)
# 方案 A扫描前预热 K 线(批量预订阅 WS + REST 预取),提高缓存命中率、减少分析超时
prewarm_enabled = cfg.get('SCAN_PREWARM_KLINE_ENABLED', True)
if prewarm_enabled and pre_filtered_symbols:
await self._prewarm_klines_for_scan(pre_filtered_symbols, cfg)
# 只对符合条件的交易对进行详细分析获取K线和技术指标 # 只对符合条件的交易对进行详细分析获取K线和技术指标
# 并发数由 SCAN_CONCURRENT_SYMBOLS 控制2 CPU 4G 多账号建议 2单账号可 35 # 并发数由 SCAN_CONCURRENT_SYMBOLS 控制2 CPU 4G 多账号建议 2单账号可 35
concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2) concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2)
@ -345,7 +350,71 @@ class MarketScanner:
log_parts.append(f"价格: {symbol_info['price']:.4f}") log_parts.append(f"价格: {symbol_info['price']:.4f}")
logger.info(" | ".join(log_parts)) logger.info(" | ".join(log_parts))
async def _prewarm_klines_for_scan(self, symbols: List[str], cfg: Optional[Dict] = None) -> None:
    """
    Pre-warm klines before a scan (plan A): batch pre-subscribe WebSocket
    kline streams and pre-fetch the same klines over REST, so the later
    per-symbol analysis hits the cache instead of timing out.

    Args:
        symbols: trading pairs to pre-warm; no-op when empty.
        cfg: optional config dict override; falls back to config.TRADING_CONFIG.

    Returns:
        None. Failures are best-effort: individual subscribe/fetch errors
        are logged at debug level and never abort the pre-warm pass.

    Note: a combined stream may carry many symbol@kline_interval topics,
    but Binance limits subscription messages to 10 per second.
    """
    if not symbols:
        return
    cfg = cfg or dict(config.TRADING_CONFIG or {})
    primary_interval = cfg.get('PRIMARY_INTERVAL', config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h'))
    confirm_interval = cfg.get('CONFIRM_INTERVAL', config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h'))
    # Clamp to 20..50: enough bars for RSI/ATR, small enough to keep REST fast.
    kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30)
    kline_limit = max(20, min(50, kline_limit))
    prewarm_concurrent = int(cfg.get('SCAN_PREWARM_CONCURRENT', 5) or 5)
    klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8)
    import time
    start_ts = time.time()

    # 1. Batch pre-subscribe WebSocket kline streams.
    stream = None
    try:
        from .kline_stream import get_kline_stream_instance
        stream = get_kline_stream_instance()
    except ImportError:
        # Kline-stream module unavailable in this deployment; skip WS step.
        pass
    if stream:
        # BUGFIX: Binance allows at most 10 subscription messages per second,
        # as this method's contract notes. The previous pacing only paused
        # after 20 subscriptions, so a burst of 20 could exceed the limit.
        # Throttle after every 10 subscriptions instead.
        subs_done = 0
        for sym in symbols:
            try:
                await stream.subscribe(sym, primary_interval, limit=kline_limit)
                await stream.subscribe(sym, confirm_interval, limit=kline_limit)
                subs_done += 2
                if subs_done >= 10:  # stay under 10 subscribe msgs/sec
                    await asyncio.sleep(1.1)
                    subs_done = 0
            except Exception as e:
                logger.debug(f"预订阅 {sym} K 线失败: {e}")
        logger.debug(f"K 线预订阅完成,{len(symbols)} 个交易对 x2 周期")

    # 2. Batch REST pre-fetch (results land in the shared Redis cache so
    # the subsequent analysis phase gets cache hits).
    async def fetch_one(symbol: str) -> None:
        # Fetch both intervals concurrently; each leg has its own timeout so
        # one slow request cannot consume the whole analysis budget.
        try:
            tasks = [
                asyncio.wait_for(
                    self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit),
                    timeout=klines_timeout,
                ),
                asyncio.wait_for(
                    self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit),
                    timeout=klines_timeout,
                ),
            ]
            # return_exceptions=True: a timeout on one leg must not cancel the other.
            await asyncio.gather(*tasks, return_exceptions=True)
        except Exception as e:
            logger.debug(f"预热 {symbol} K 线失败: {e}")

    # Bound REST concurrency so pre-warm cannot saturate a small host.
    sem = asyncio.Semaphore(prewarm_concurrent)

    async def fetch_with_limit(s: str) -> None:
        async with sem:
            await fetch_one(s)

    await asyncio.gather(*[fetch_with_limit(s) for s in symbols], return_exceptions=True)
    elapsed = time.time() - start_ts
    logger.info(f"K 线预热完成,{len(symbols)} 个交易对,耗时 {elapsed:.1f}s(WS 预订阅 + REST 预取)")
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
""" """
获取单个交易对的涨跌幅和技术指标 获取单个交易对的涨跌幅和技术指标
@ -390,6 +459,8 @@ class MarketScanner:
klines_timeout = 3.0 klines_timeout = 3.0
elif klines_timeout > 20: elif klines_timeout > 20:
klines_timeout = 20.0 klines_timeout = 20.0
kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30)
kline_limit = max(20, min(50, kline_limit))
# ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存 # ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存
# 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用 # 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用
@ -402,8 +473,8 @@ class MarketScanner:
from .kline_stream import get_klines_from_redis from .kline_stream import get_klines_from_redis
if use_shared_market_ws(self.client.redis_cache): if use_shared_market_ws(self.client.redis_cache):
# 尝试从共享缓存获取 # 尝试从共享缓存获取
shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50) shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=kline_limit)
shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50) shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=kline_limit)
if shared_klines and len(shared_klines) >= 2: if shared_klines and len(shared_klines) >= 2:
klines = shared_klines klines = shared_klines
logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}") logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}")
@ -424,12 +495,12 @@ class MarketScanner:
# 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力 # 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力
cache_has_data = False cache_has_data = False
if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit)
if cached_klines and len(cached_klines) >= 2: if cached_klines and len(cached_klines) >= 2:
klines = cached_klines klines = cached_klines
cache_has_data = True cache_has_data = True
if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit)
if cached_klines_4h and len(cached_klines_4h) >= 2: if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h klines_4h = cached_klines_4h
cache_has_data = True cache_has_data = True
@ -443,9 +514,9 @@ class MarketScanner:
if need_subscribe_primary or need_subscribe_confirm: if need_subscribe_primary or need_subscribe_confirm:
subscribe_tasks = [] subscribe_tasks = []
if need_subscribe_primary: if need_subscribe_primary:
subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=50)) subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=kline_limit))
if need_subscribe_confirm: if need_subscribe_confirm:
subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=50)) subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=kline_limit))
if subscribe_tasks: if subscribe_tasks:
await asyncio.gather(*subscribe_tasks, return_exceptions=True) await asyncio.gather(*subscribe_tasks, return_exceptions=True)
@ -460,11 +531,11 @@ class MarketScanner:
while (time.time() - wait_start) < max_wait_sec: while (time.time() - wait_start) < max_wait_sec:
# 检查缓存是否可用 # 检查缓存是否可用
if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit)
if cached_klines and len(cached_klines) >= 2: if cached_klines and len(cached_klines) >= 2:
klines = cached_klines klines = cached_klines
if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit)
if cached_klines_4h and len(cached_klines_4h) >= 2: if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h klines_4h = cached_klines_4h
@ -485,11 +556,11 @@ class MarketScanner:
# 只请求缺失的数据,减少等待时间 # 只请求缺失的数据,减少等待时间
rest_tasks = [] rest_tasks = []
if not klines or len(klines) < 2: if not klines or len(klines) < 2:
rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50))) rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit)))
else: else:
rest_tasks.append(('primary', None)) rest_tasks.append(('primary', None))
if not klines_4h or len(klines_4h) < 2: if not klines_4h or len(klines_4h) < 2:
rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50))) rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit)))
else: else:
rest_tasks.append(('confirm', None)) rest_tasks.append(('confirm', None))