From a404f1fdf87001706a9946fc9e53df21340c9b34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Tue, 17 Feb 2026 23:59:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(binance=5Fclient,=20market=5Fscanner):=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=20K=E7=BA=BF=E6=95=B0=E6=8D=AE=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E9=80=BB=E8=BE=91=E4=B8=8E=E7=BC=93=E5=AD=98=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `binance_client.py` 中更新 `get_klines` 方法,新增多账号共享 Redis 缓存机制,提升 K线数据获取效率,减少 REST API 调用。优化日志记录,确保清晰反馈缓存来源。更新 `config.py`,引入 `SCAN_PREFER_WEBSOCKET` 配置,优先使用 WebSocket 获取数据。修改 `market_scanner.py`,增强 K线数据获取流程,优先从共享缓存读取,确保数据完整性与实时性。此改动提升了系统的性能与稳定性。 --- trading_system/binance_client.py | 49 ++++++++---- trading_system/config.py | 1 + trading_system/market_scanner.py | 125 +++++++++++++++++++++++++++--- trading_system/ws_trade_client.py | 74 ++++++++++++++++++ 4 files changed, 225 insertions(+), 24 deletions(-) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index e1cf289..45eb9d8 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -772,7 +772,12 @@ class BinanceClient: async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]: """ 获取K线数据(合约市场) - 优先级:WS 缓存 > Redis 缓存 > REST API + 优先级:共享 Redis 缓存(多账号共用)> WS 本地缓存 > REST API + + ⚠️ 优化:多账号共享缓存机制 + - Leader 进程通过 WebSocket 实时更新共享缓存(market:kline:{symbol}:{interval}) + - 所有账号进程优先从共享缓存读取,减少 REST API 调用 + - REST API 获取的数据也会写入共享缓存,供其他账号使用 Args: symbol: 交易对 @@ -782,18 +787,19 @@ class BinanceClient: Returns: K线数据列表 """ - # 0. 多进程共用:从 Redis 读 Leader 写入的 K 线(避免非 Leader 进程无 WS 时直接打 REST) + # 0. ⚠️ 优先:多进程/多账号共用 Redis 缓存(最高优先级,减少所有账号的 REST 调用) try: - from .market_ws_leader import use_shared_market_ws + from .market_ws_leader import use_shared_market_ws, KEY_KLINE_PREFIX from .kline_stream import get_klines_from_redis if use_shared_market_ws(self.redis_cache): shared = await get_klines_from_redis(self.redis_cache, symbol, interval, limit) if shared and len(shared) >= limit: - logger.debug(f"从共用 Redis 获取 {symbol} K线: {interval} x{limit}") + logger.debug(f"✓ 从共享 Redis 缓存获取 {symbol} K线: {interval} x{limit}(多账号共用)") return shared except Exception as e: - logger.debug(f"读取共用 K线 Redis 失败: {e}") - # 1. 优先从 WS 缓存读取(实时更新,无 REST 请求) + logger.debug(f"读取共享 K线 Redis 失败: {e}") + + # 1. 本地 WS 缓存(当前进程的 WebSocket 数据,实时更新) try: from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh stream = get_kline_stream_instance() @@ -803,34 +809,49 @@ class BinanceClient: if is_kline_cache_fresh(symbol, interval, max_age_sec=300.0): ws_cached = get_klines_from_cache(symbol, interval, limit) if ws_cached and len(ws_cached) >= limit: - logger.debug(f"从 WS 缓存获取 {symbol} K线数据: {interval} x{limit}") + logger.debug(f"从本地 WS 缓存获取 {symbol} K线数据: {interval} x{limit}") return ws_cached except Exception as e: logger.debug(f"读取 K线 WS 缓存失败: {e}") - # 2. 查 Redis 缓存 + # 2. 旧格式 Redis 缓存(兼容性,逐步迁移到共享缓存) cache_key = f"klines:{symbol}:{interval}:{limit}" cached = await self.redis_cache.get(cache_key) if cached: logger.debug(f"从 Redis 缓存获取 {symbol} K线数据: {interval} x{limit}") return cached - # 3. REST API(兜底) + # 3. REST API(兜底,获取后写入共享缓存供其他账号使用) try: klines = await self._rate_limited_request( f'klines_{symbol}_{interval}', self.client.futures_klines(symbol=symbol, interval=interval, limit=limit) ) - # 写入 Redis 缓存(根据 interval 动态设置 TTL) + # ⚠️ 优化:写入共享缓存(market:kline:{symbol}:{interval}),供所有账号使用 if klines: - ttl_map = { + try: + from .market_ws_leader import KEY_KLINE_PREFIX + shared_key = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}" + # 使用较长的 TTL,因为这是共享缓存,多个账号都会使用 + ttl_map = { + '1m': 60, '3m': 120, '5m': 180, '15m': 300, '30m': 600, + '1h': 900, '2h': 1800, '4h': 3600, '6h': 5400, '8h': 7200, '12h': 10800, '1d': 21600 + } + ttl = ttl_map.get(interval, 1800) # 默认 30 分钟 + await self.redis_cache.set(shared_key, klines, ttl=ttl) + logger.debug(f"✓ 已写入共享 Redis 缓存 {symbol} K线: {interval} x{limit} (TTL: {ttl}秒,供多账号共用)") + except Exception as e: + logger.debug(f"写入共享缓存失败: {e}") + + # 同时写入旧格式缓存(兼容性) + ttl_map_old = { '1m': 10, '3m': 20, '5m': 30, '15m': 60, '30m': 120, '1h': 300, '2h': 600, '4h': 900, '6h': 1200, '8h': 1800, '12h': 2400, '1d': 3600 } - ttl = ttl_map.get(interval, 300) - await self.redis_cache.set(cache_key, klines, ttl=ttl) - logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl}秒)") + ttl_old = ttl_map_old.get(interval, 300) + await self.redis_cache.set(cache_key, klines, ttl=ttl_old) + logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl_old}秒)") return klines except BinanceAPIException as e: diff --git a/trading_system/config.py b/trading_system/config.py index 8e051fb..ba8a0e4 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -215,6 +215,7 @@ DEFAULT_TRADING_CONFIG = { 'SCAN_INTERVAL': 900, # 扫描间隔15分钟(900秒),快速验证模式:提高扫描频率以增加交易机会 'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线,12秒通常够用;网络慢可调大(18~25) 'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时(秒);超时则返回降级结果(仅涨跌幅/成交量),不拖满整分析超时 + 'SCAN_PREFER_WEBSOCKET': True, # 优先使用 WebSocket 方式:如果缓存不完整,等待 WebSocket 推送数据(最多2秒),而不是立即回退 REST API # 多账号/低配服务器(如 2 CPU 4G):降低并发与错峰扫描,避免 CPU 打满 'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对(2 CPU 4G 多账号建议 2,单账号可 3~5) 'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时按 account_id 错峰首次扫描,避免多进程同时扫 diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 6eb5367..3253b63 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -347,6 +347,7 @@ class MarketScanner: ticker_ts = None # 获取更多K线数据用于技术指标计算(主周期 + 确认周期并行请求;单独超时避免整分析拖到 18s) + # ⚠️ 优化:优先使用 WebSocket K 线缓存,避免 REST API 超时 primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h') cfg = dict(config.TRADING_CONFIG or {}) @@ -355,18 +356,122 @@ class MarketScanner: klines_timeout = 3.0 elif klines_timeout > 20: klines_timeout = 20.0 + + # ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存 + # 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用 klines, klines_4h = [], [] + use_ws_prefer = cfg.get('SCAN_PREFER_WEBSOCKET', True) # 默认优先 WebSocket + + # 0. 优先从共享 Redis 缓存读取(多账号共用,最高优先级) try: - klines, klines_4h = await asyncio.wait_for( - asyncio.gather( - self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50), - self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50), - ), - timeout=klines_timeout, - ) - except asyncio.TimeoutError: - logger.debug(f"{symbol} K线拉取超时({klines_timeout:.0f}秒),使用降级结果(仅涨跌幅/成交量)") - klines, klines_4h = [], [] + from .market_ws_leader import use_shared_market_ws + from .kline_stream import get_klines_from_redis + if use_shared_market_ws(self.client.redis_cache): + # 尝试从共享缓存获取 + shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50) + shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50) + if shared_klines and len(shared_klines) >= 2: + klines = shared_klines + logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}") + if shared_klines_4h and len(shared_klines_4h) >= 2: + klines_4h = shared_klines_4h + logger.debug(f"{symbol} 从共享缓存获取确认周期 K 线: {confirm_interval}") + except Exception as e: + logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}") + + # 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存 + if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2: + try: + from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh + stream = get_kline_stream_instance() + if stream: + # 确保订阅该流(首次请求时自动订阅) + subscribe_tasks = [ + stream.subscribe(symbol, primary_interval, limit=50), + stream.subscribe(symbol, confirm_interval, limit=50), + ] + await asyncio.gather(*subscribe_tasks, return_exceptions=True) + + # 如果优先使用 WebSocket,等待推送数据(最多等待 2 秒) + if use_ws_prefer: + max_wait_sec = 2.0 # 最多等待 2 秒让 WebSocket 推送数据 + wait_start = time.time() + check_interval = 0.2 # 每 200ms 检查一次 + + while (time.time() - wait_start) < max_wait_sec: + # 检查缓存是否可用 + if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + if cached_klines and len(cached_klines) >= 2: + klines = cached_klines + if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + if cached_klines_4h and len(cached_klines_4h) >= 2: + klines_4h = cached_klines_4h + + # 如果两个周期都有足够数据,提前退出 + if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2: + break + + await asyncio.sleep(check_interval) + else: + # 不优先 WebSocket,短暂等待后立即检查 + await asyncio.sleep(0.3) + if (not klines or len(klines) < 2) and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): + cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) + if cached_klines and len(cached_klines) >= 2: + klines = cached_klines + if (not klines_4h or len(klines_4h) < 2) and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): + cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) + if cached_klines_4h and len(cached_klines_4h) >= 2: + klines_4h = cached_klines_4h + except Exception as e: + logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}") + + # 如果 WebSocket 缓存仍不完整,使用 REST API(带超时保护) + # 注意:如果 WebSocket 缓存有部分数据,优先使用部分数据,REST 只补充缺失的 + if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2: + # 只请求缺失的数据,减少等待时间 + rest_tasks = [] + if not klines or len(klines) < 2: + rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50))) + else: + rest_tasks.append(('primary', None)) + if not klines_4h or len(klines_4h) < 2: + rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50))) + else: + rest_tasks.append(('confirm', None)) + + try: + # 只等待实际需要的数据 + results = await asyncio.wait_for( + asyncio.gather( + *[task[1] if task[1] else asyncio.sleep(0) for task in rest_tasks], + return_exceptions=True + ), + timeout=klines_timeout, + ) + # 合并结果:优先使用 WebSocket 缓存,REST 作为补充 + for i, (task_type, _) in enumerate(rest_tasks): + if task_type == 'primary' and (not klines or len(klines) < 2): + if isinstance(results[i], list) and len(results[i]) >= 2: + klines = results[i] + elif task_type == 'confirm' and (not klines_4h or len(klines_4h) < 2): + if isinstance(results[i], list) and len(results[i]) >= 2: + klines_4h = results[i] + except asyncio.TimeoutError: + logger.debug(f"{symbol} K线拉取超时({klines_timeout:.0f}秒),使用已有缓存或降级结果") + # 如果 WebSocket 缓存有部分数据,继续使用;否则使用降级结果 + if not klines or len(klines) < 2: + klines = [] + if not klines_4h or len(klines_4h) < 2: + klines_4h = [] + except Exception as e: + logger.debug(f"{symbol} K线拉取异常: {e}") + if not klines or len(klines) < 2: + klines = [] + if not klines_4h or len(klines_4h) < 2: + klines_4h = [] if not klines or len(klines) < 2: # 降级:仍有 ticker 时返回仅涨跌幅/成交量的结果,不直接跳过,避免整轮扫描被拖慢 diff --git a/trading_system/ws_trade_client.py b/trading_system/ws_trade_client.py index fa145d0..9fee2d8 100644 --- a/trading_system/ws_trade_client.py +++ b/trading_system/ws_trade_client.py @@ -172,6 +172,80 @@ class WSTradeClient: async with self._lock: self._pending_requests.pop(req_id, None) + async def ticker_price(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]: + """ + 通过 WebSocket API 获取最新价格(ticker.price)。 + + Args: + symbol: 交易对(可选,不传则返回所有交易对) + timeout: 等待响应超时(秒) + + Returns: + 价格信息,失败返回 None 或抛异常 + """ + params = {} + if symbol: + params["symbol"] = symbol.upper() + try: + result = await self._send_request("ticker.price", params, timeout=timeout) + return result + except (ConnectionError, TimeoutError) as e: + logger.debug(f"WSTradeClient: ticker.price 失败: {e}") + raise + except Exception as e: + logger.error(f"WSTradeClient: ticker.price 异常: {e}") + raise + + async def ticker_book(self, symbol: Optional[str] = None, timeout: float = 10.0) -> Optional[Dict[str, Any]]: + """ + 通过 WebSocket API 获取当前最优挂单(ticker.book)。 + + Args: + symbol: 交易对(可选,不传则返回所有交易对) + timeout: 等待响应超时(秒) + + Returns: + 最优挂单信息,失败返回 None 或抛异常 + """ + params = {} + if symbol: + params["symbol"] = symbol.upper() + try: + result = await self._send_request("ticker.book", params, timeout=timeout) + return result + except (ConnectionError, TimeoutError) as e: + logger.debug(f"WSTradeClient: ticker.book 失败: {e}") + raise + except Exception as e: + logger.error(f"WSTradeClient: ticker.book 异常: {e}") + raise + + async def depth(self, symbol: str, limit: int = 20, timeout: float = 10.0) -> Optional[Dict[str, Any]]: + """ + 通过 WebSocket API 获取深度信息(depth)。 + + Args: + symbol: 交易对 + limit: 深度档位(可选值: 5, 10, 20, 50, 100, 500, 1000) + timeout: 等待响应超时(秒) + + Returns: + 深度信息,失败返回 None 或抛异常 + """ + params = { + "symbol": symbol.upper(), + "limit": limit + } + try: + result = await self._send_request("depth", params, timeout=timeout) + return result + except (ConnectionError, TimeoutError) as e: + logger.debug(f"WSTradeClient: depth 失败: {e}") + raise + except Exception as e: + logger.error(f"WSTradeClient: depth 异常: {e}") + raise + async def algo_order_place(self, params: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]: """ 通过 WS 发送 algoOrder.place 请求(条件单)。