feat(binance_client, market_scanner): 优化 K线数据获取逻辑与缓存机制

在 `binance_client.py` 中更新 `get_klines` 方法,新增多账号共享 Redis 缓存机制,提升 K线数据获取效率,减少 REST API 调用。优化日志记录,确保清晰反馈缓存来源。更新 `config.py`,引入 `SCAN_PREFER_WEBSOCKET` 配置,优先使用 WebSocket 获取数据。修改 `market_scanner.py`,增强 K线数据获取流程,优先从共享缓存读取,确保数据完整性与实时性。此改动提升了系统的性能与稳定性。
This commit is contained in:
薇薇安 2026-02-17 23:59:31 +08:00
parent a862aec4f5
commit a404f1fdf8
4 changed files with 225 additions and 24 deletions

View File

@@ -772,7 +772,12 @@ class BinanceClient:
async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]:
"""
获取K线数据合约市场
优先级WS 缓存 > Redis 缓存 > REST API
优先级共享 Redis 缓存多账号共用> WS 本地缓存 > REST API
优化多账号共享缓存机制
- Leader 进程通过 WebSocket 实时更新共享缓存market:kline:{symbol}:{interval}
- 所有账号进程优先从共享缓存读取减少 REST API 调用
- REST API 获取的数据也会写入共享缓存供其他账号使用
Args:
symbol: 交易对
@@ -782,18 +787,19 @@
Returns:
K线数据列表
"""
# 0. 多进程共用:从 Redis 读 Leader 写入的 K 线(避免非 Leader 进程无 WS 时直接打 REST
# 0. ⚠️ 优先:多进程/多账号共用 Redis 缓存(最高优先级,减少所有账号的 REST 调用
try:
from .market_ws_leader import use_shared_market_ws
from .market_ws_leader import use_shared_market_ws, KEY_KLINE_PREFIX
from .kline_stream import get_klines_from_redis
if use_shared_market_ws(self.redis_cache):
shared = await get_klines_from_redis(self.redis_cache, symbol, interval, limit)
if shared and len(shared) >= limit:
logger.debug(f"从共用 Redis 获取 {symbol} K线: {interval} x{limit}")
logger.debug(f"✓ 从共享 Redis 缓存获取 {symbol} K线: {interval} x{limit}(多账号共用)")
return shared
except Exception as e:
logger.debug(f"读取共用 K线 Redis 失败: {e}")
# 1. 优先从 WS 缓存读取(实时更新,无 REST 请求)
logger.debug(f"读取共享 K线 Redis 失败: {e}")
# 1. 本地 WS 缓存(当前进程的 WebSocket 数据,实时更新)
try:
from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh
stream = get_kline_stream_instance()
@@ -803,34 +809,49 @@
if is_kline_cache_fresh(symbol, interval, max_age_sec=300.0):
ws_cached = get_klines_from_cache(symbol, interval, limit)
if ws_cached and len(ws_cached) >= limit:
logger.debug(f" WS 缓存获取 {symbol} K线数据: {interval} x{limit}")
logger.debug(f"本地 WS 缓存获取 {symbol} K线数据: {interval} x{limit}")
return ws_cached
except Exception as e:
logger.debug(f"读取 K线 WS 缓存失败: {e}")
# 2. 查 Redis 缓存
# 2. 旧格式 Redis 缓存(兼容性,逐步迁移到共享缓存)
cache_key = f"klines:{symbol}:{interval}:{limit}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从 Redis 缓存获取 {symbol} K线数据: {interval} x{limit}")
return cached
# 3. REST API兜底
# 3. REST API兜底,获取后写入共享缓存供其他账号使用
try:
klines = await self._rate_limited_request(
f'klines_{symbol}_{interval}',
self.client.futures_klines(symbol=symbol, interval=interval, limit=limit)
)
# 写入 Redis 缓存(根据 interval 动态设置 TTL
# ⚠️ 优化写入共享缓存market:kline:{symbol}:{interval}),供所有账号使用
if klines:
ttl_map = {
try:
from .market_ws_leader import KEY_KLINE_PREFIX
shared_key = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}"
# 使用较长的 TTL因为这是共享缓存多个账号都会使用
ttl_map = {
'1m': 60, '3m': 120, '5m': 180, '15m': 300, '30m': 600,
'1h': 900, '2h': 1800, '4h': 3600, '6h': 5400, '8h': 7200, '12h': 10800, '1d': 21600
}
ttl = ttl_map.get(interval, 1800) # 默认 30 分钟
await self.redis_cache.set(shared_key, klines, ttl=ttl)
logger.debug(f"✓ 已写入共享 Redis 缓存 {symbol} K线: {interval} x{limit} (TTL: {ttl}秒,供多账号共用)")
except Exception as e:
logger.debug(f"写入共享缓存失败: {e}")
# 同时写入旧格式缓存(兼容性)
ttl_map_old = {
'1m': 10, '3m': 20, '5m': 30, '15m': 60, '30m': 120,
'1h': 300, '2h': 600, '4h': 900, '6h': 1200, '8h': 1800, '12h': 2400, '1d': 3600
}
ttl = ttl_map.get(interval, 300)
await self.redis_cache.set(cache_key, klines, ttl=ttl)
logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl}秒)")
ttl_old = ttl_map_old.get(interval, 300)
await self.redis_cache.set(cache_key, klines, ttl=ttl_old)
logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl_old}秒)")
return klines
except BinanceAPIException as e:

View File

@@ -215,6 +215,7 @@ DEFAULT_TRADING_CONFIG = {
'SCAN_INTERVAL': 900, # 扫描间隔15分钟900秒快速验证模式提高扫描频率以增加交易机会
'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线12秒通常够用网络慢可调大18~25
'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时超时则返回降级结果仅涨跌幅/成交量),不拖满整分析超时
'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据最多2秒而不是立即回退 REST API
# 多账号/低配服务器(如 2 CPU 4G降低并发与错峰扫描避免 CPU 打满
'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对2 CPU 4G 多账号建议 2单账号可 35
'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时按 account_id 错峰首次扫描,避免多进程同时扫

View File

@@ -347,6 +347,7 @@ class MarketScanner:
ticker_ts = None
# 获取更多K线数据用于技术指标计算主周期 + 确认周期并行请求;单独超时避免整分析拖到 18s
# ⚠️ 优化:优先使用 WebSocket K 线缓存,避免 REST API 超时
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
cfg = dict(config.TRADING_CONFIG or {})
@@ -355,18 +356,122 @@
klines_timeout = 3.0
elif klines_timeout > 20:
klines_timeout = 20.0
# ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存
# 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用
klines, klines_4h = [], []
use_ws_prefer = cfg.get('SCAN_PREFER_WEBSOCKET', True) # 默认优先 WebSocket
# 0. 优先从共享 Redis 缓存读取(多账号共用,最高优先级)
try:
klines, klines_4h = await asyncio.wait_for(
asyncio.gather(
self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50),
self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50),
),
timeout=klines_timeout,
)
except asyncio.TimeoutError:
logger.debug(f"{symbol} K线拉取超时{klines_timeout:.0f}秒),使用降级结果(仅涨跌幅/成交量)")
klines, klines_4h = [], []
from .market_ws_leader import use_shared_market_ws
from .kline_stream import get_klines_from_redis
if use_shared_market_ws(self.client.redis_cache):
# 尝试从共享缓存获取
shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50)
shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50)
if shared_klines and len(shared_klines) >= 2:
klines = shared_klines
logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}")
if shared_klines_4h and len(shared_klines_4h) >= 2:
klines_4h = shared_klines_4h
logger.debug(f"{symbol} 从共享缓存获取确认周期 K 线: {confirm_interval}")
except Exception as e:
logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}")
# 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
try:
from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh
stream = get_kline_stream_instance()
if stream:
# 确保订阅该流(首次请求时自动订阅)
subscribe_tasks = [
stream.subscribe(symbol, primary_interval, limit=50),
stream.subscribe(symbol, confirm_interval, limit=50),
]
await asyncio.gather(*subscribe_tasks, return_exceptions=True)
# 如果优先使用 WebSocket等待推送数据最多等待 2 秒)
if use_ws_prefer:
max_wait_sec = 2.0 # 最多等待 2 秒让 WebSocket 推送数据
wait_start = time.time()
check_interval = 0.2 # 每 200ms 检查一次
while (time.time() - wait_start) < max_wait_sec:
# 检查缓存是否可用
if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50)
if cached_klines and len(cached_klines) >= 2:
klines = cached_klines
if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50)
if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h
# 如果两个周期都有足够数据,提前退出
if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2:
break
await asyncio.sleep(check_interval)
else:
# 不优先 WebSocket短暂等待后立即检查
await asyncio.sleep(0.3)
if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50)
if cached_klines and len(cached_klines) >= 2:
klines = cached_klines
if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50)
if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h
except Exception as e:
logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}")
# 如果 WebSocket 缓存仍不完整,使用 REST API带超时保护
# 注意:如果 WebSocket 缓存有部分数据优先使用部分数据REST 只补充缺失的
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
# 只请求缺失的数据,减少等待时间
rest_tasks = []
if not klines or len(klines) < 2:
rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50)))
else:
rest_tasks.append(('primary', None))
if not klines_4h or len(klines_4h) < 2:
rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50)))
else:
rest_tasks.append(('confirm', None))
try:
# 只等待实际需要的数据
results = await asyncio.wait_for(
asyncio.gather(
*[task[1] if task[1] else asyncio.sleep(0) for task in rest_tasks],
return_exceptions=True
),
timeout=klines_timeout,
)
# 合并结果:优先使用 WebSocket 缓存REST 作为补充
for i, (task_type, _) in enumerate(rest_tasks):
if task_type == 'primary' and (not klines or len(klines) < 2):
if isinstance(results[i], list) and len(results[i]) >= 2:
klines = results[i]
elif task_type == 'confirm' and (not klines_4h or len(klines_4h) < 2):
if isinstance(results[i], list) and len(results[i]) >= 2:
klines_4h = results[i]
except asyncio.TimeoutError:
logger.debug(f"{symbol} K线拉取超时{klines_timeout:.0f}秒),使用已有缓存或降级结果")
# 如果 WebSocket 缓存有部分数据,继续使用;否则使用降级结果
if not klines or len(klines) < 2:
klines = []
if not klines_4h or len(klines_4h) < 2:
klines_4h = []
except Exception as e:
logger.debug(f"{symbol} K线拉取异常: {e}")
if not klines or len(klines) < 2:
klines = []
if not klines_4h or len(klines_4h) < 2:
klines_4h = []
if not klines or len(klines) < 2:
# 降级:仍有 ticker 时返回仅涨跌幅/成交量的结果,不直接跳过,避免整轮扫描被拖慢

View File

@@ -172,6 +172,80 @@ class WSTradeClient:
async with self._lock:
self._pending_requests.pop(req_id, None)
async def ticker_price(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """Fetch the latest price(s) via the WebSocket API ``ticker.price`` request.

    Args:
        symbol: Trading pair (optional); when omitted, the server returns
            prices for all trading pairs.
        timeout: Seconds to wait for the WS response.

    Returns:
        The price payload returned by ``_send_request``.

    Raises:
        ConnectionError, TimeoutError: transport-level failures; logged at
            debug level and re-raised to the caller.
        Exception: any other failure; logged at error level and re-raised.
    """
    # Only include "symbol" when the caller supplied one; the WS API treats
    # an absent symbol as "all pairs".
    request_params: Dict[str, Any] = {"symbol": symbol.upper()} if symbol else {}
    try:
        return await self._send_request("ticker.price", request_params, timeout=timeout)
    except (ConnectionError, TimeoutError) as e:
        logger.debug(f"WSTradeClient: ticker.price 失败: {e}")
        raise
    except Exception as e:
        logger.error(f"WSTradeClient: ticker.price 异常: {e}")
        raise
async def ticker_book(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """Fetch the current best bid/ask via the WebSocket API ``ticker.book`` request.

    Args:
        symbol: Trading pair (optional); when omitted, the server returns
            the best book ticker for all trading pairs.
        timeout: Seconds to wait for the WS response.

    Returns:
        The best-order payload returned by ``_send_request``.

    Raises:
        ConnectionError, TimeoutError: transport-level failures; logged at
            debug level and re-raised to the caller.
        Exception: any other failure; logged at error level and re-raised.
    """
    # Mirror ticker_price: "symbol" is only sent when explicitly provided.
    request_params: Dict[str, Any] = {"symbol": symbol.upper()} if symbol else {}
    try:
        return await self._send_request("ticker.book", request_params, timeout=timeout)
    except (ConnectionError, TimeoutError) as e:
        logger.debug(f"WSTradeClient: ticker.book 失败: {e}")
        raise
    except Exception as e:
        logger.error(f"WSTradeClient: ticker.book 异常: {e}")
        raise
async def depth(self, symbol: str, limit: int = 20, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """Fetch the order-book depth via the WebSocket API ``depth`` request.

    Args:
        symbol: Trading pair (required).
        limit: Depth levels to return; valid values per the API:
            5, 10, 20, 50, 100, 500, 1000.
        timeout: Seconds to wait for the WS response.

    Returns:
        The depth payload returned by ``_send_request``.

    Raises:
        ConnectionError, TimeoutError: transport-level failures; logged at
            debug level and re-raised to the caller.
        Exception: any other failure; logged at error level and re-raised.
    """
    request_params: Dict[str, Any] = {
        "symbol": symbol.upper(),
        "limit": limit,
    }
    try:
        return await self._send_request("depth", request_params, timeout=timeout)
    except (ConnectionError, TimeoutError) as e:
        logger.debug(f"WSTradeClient: depth 失败: {e}")
        raise
    except Exception as e:
        logger.error(f"WSTradeClient: depth 异常: {e}")
        raise
async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
"""
通过 WS 发送 algoOrder.place 请求条件单