"""
K-line WebSocket stream: subscribes to ``<symbol>@kline_<interval>`` streams and keeps a K-line cache.

get_klines can use this cache in preference to REST polling, which reduces timeouts and keeps
technical indicators updated in real time.

Multi-process sharing: the leader writes Redis (market:kline:{symbol}:{interval}); non-leader
processes can read it through get_klines_from_redis.

Stream behaviour (per the exchange docs): updates are pushed every 250 ms and only for the most
recent kline; x=false means the kline is still open, x=true means it has closed.
"""
import asyncio
import json
import logging
import time
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


def _task_done_callback(task: asyncio.Task) -> None:
    """Consume the outcome of a fire-and-forget task.

    Retrieving the result prevents asyncio's "Task exception was never retrieved" warning;
    cancellations are ignored and other failures are logged at DEBUG level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.debug("KlineStream background task error: %s", e)


try:
    from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
    KEY_KLINE_PREFIX = "market:kline:"

try:
    from .redis_ttl import TTL_KLINE_STREAM
except ImportError:
    TTL_KLINE_STREAM = 600

# In-process kline cache: {(symbol, interval): [row, row, ...]} with rows in REST format.
# NOTE(review): keys are normalized with interval.lower(), which folds Binance's monthly
# interval "1M" onto the 1-minute "1m" key -- confirm monthly klines are never requested.
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# time.monotonic() of the last in-process write for each key.
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
# Number of rows requested by subscribers for each (symbol, interval). This is subscription
# configuration, not cached data, so cache cleanup deliberately leaves it alone.
_kline_cache_limit: Dict[Tuple[str, str], int] = {}

# Memory bounds (target host: 2 CPU / 4 GB). Redis holds the primary copy; the in-process
# cache is only a small temporary / fallback store.
_MAX_CACHE_ENTRIES = 50  # keep at most 50 (symbol, interval) entries in process memory
_CACHE_CLEANUP_INTERVAL_SEC = 300  # periodic cleanup every 5 minutes
_CACHE_MAX_AGE_SEC = 300  # entries not updated for 5 minutes are dropped (prefer Redis)
_DEFAULT_KLINE_LIMIT = 50  # rows retained when no subscriber recorded a limit


def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the last ``limit`` cached klines (row format compatible with REST get_klines).

    Reads only the in-process cache, which is size-limited; Redis holds the primary copy.

    Returns:
        A new list holding the last ``limit`` rows, or None when the pair is not cached, holds
        fewer than ``limit`` rows, or ``limit <= 0``.
    """
    if limit <= 0:
        # Without this guard cached[-0:] would return the whole cache.
        return None
    cached = _kline_cache.get((symbol.upper(), interval.lower()))
    if not cached or len(cached) < limit:
        return None
    return cached[-limit:]


def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """Return True if the in-process cache for the pair was written within ``max_age_sec`` seconds."""
    updated = _kline_cache_updated_at.get((symbol.upper(), interval.lower()), 0)
    if updated == 0:
        return False
    return (time.monotonic() - updated) <= max_age_sec


def _cleanup_stale_kline_cache(max_age_sec: Optional[float] = None) -> None:
    """Evict stale in-process cache entries so memory cannot grow without bound.

    First removes every entry not updated within ``max_age_sec`` (default _CACHE_MAX_AGE_SEC).
    If more than _MAX_CACHE_ENTRIES remain, also removes the least recently updated ones.
    Subscription limits in _kline_cache_limit are kept.
    """
    if max_age_sec is None:
        max_age_sec = _CACHE_MAX_AGE_SEC
    now = time.monotonic()

    to_remove = {
        key for key, updated_at in _kline_cache_updated_at.items() if (now - updated_at) > max_age_sec
    }
    # Only enforce the size cap on what survives the age check, so fresh entries are not evicted
    # when dropping stale ones already brings the cache under the cap.
    survivors = [key for key in _kline_cache if key not in to_remove]
    if len(survivors) > _MAX_CACHE_ENTRIES:
        survivors.sort(key=lambda k: _kline_cache_updated_at.get(k, 0.0), reverse=True)
        to_remove.update(survivors[_MAX_CACHE_ENTRIES:])

    for key in to_remove:
        _kline_cache.pop(key, None)
        _kline_cache_updated_at.pop(key, None)
    if to_remove:
        logger.debug(f"KlineStream: 已清理 {len(to_remove)} 个过期缓存条目(当前缓存数: {len(_kline_cache)})")


def _remember_kline_limit(key: Tuple[str, str], limit: int) -> None:
    """Record the row count requested for ``key``, keeping the largest value requested so far."""
    if limit > _kline_cache_limit.get(key, 0):
        _kline_cache_limit[key] = limit


def _merge_kline(cache_list: List[List], row: List, limit: int) -> None:
    """Merge one REST-format kline row into ``cache_list`` in place, keyed on open time (row[0]).

    * Same open time as the last row: replace it. This covers in-progress updates and the
      final x=true push, so a closed kline is never duplicated or overwritten.
    * Later open time (or empty list / unreadable last row): append. Once the list exceeds
      ``2 * limit`` rows, trim it to the last ``limit`` rows.
    * Earlier open time: ignore it as a late update for an older kline.
    """
    last_open: Optional[int] = None
    if cache_list:
        try:
            last_open = int(cache_list[-1][0])
        except (TypeError, ValueError, IndexError, KeyError):
            last_open = None
    open_time = row[0]
    if last_open is not None:
        if open_time == last_open:
            cache_list[-1] = row
            return
        if open_time < last_open:
            return
    cache_list.append(row)
    limit = max(1, limit)
    if len(cache_list) > limit * 2:
        cache_list[:] = cache_list[-limit:]


def _store_kline_in_memory(key: Tuple[str, str], row: List) -> None:
    """Merge ``row`` into the in-process cache for ``key`` and record the update time."""
    cache_list = _kline_cache.setdefault(key, [])
    _merge_kline(cache_list, row, _kline_cache_limit.get(key, _DEFAULT_KLINE_LIMIT))
    _kline_cache_updated_at[key] = time.monotonic()


def _parse_kline_message(raw: str) -> Optional[Tuple[Tuple[str, str], List[Any], Any]]:
    """Parse a kline event into ``((SYMBOL, interval), rest_row, x_flag)``.

    Accepts both the combined-stream envelope ``{"stream": ..., "data": {...}}`` and a bare
    event. Returns None for anything that is not a well-formed kline event.
    """
    try:
        data = json.loads(raw)
    except Exception:
        return None
    if isinstance(data, dict) and "stream" in data:
        payload = data.get("data", {})
    else:
        payload = data
    if not isinstance(payload, dict) or payload.get("e") != "kline":
        return None
    k = payload.get("k")
    if not isinstance(k, dict):
        return None
    s = (k.get("s") or "").strip().upper()
    i = (k.get("i") or "").strip().lower()
    if not s or not i:
        return None
    # REST row layout: [open_time, open, high, low, close, volume, close_time, quote_volume,
    # trades, taker_buy_base_volume, taker_buy_quote_volume, ignore]
    try:
        row = [
            int(k.get("t", 0)),
            str(float(k.get("o", 0))),
            str(float(k.get("h", 0))),
            str(float(k.get("l", 0))),
            str(float(k.get("c", 0))),
            str(float(k.get("v", 0))),
            int(k.get("T", 0)),
            str(float(k.get("q", 0))),
            int(k.get("n", 0)),
            "0",
            "0",
            "0",
        ]
        closed = k.get("x", False)
    except (TypeError, ValueError):
        return None
    return (s, i), row, closed


class KlineStream:
    """Subscribe to futures kline streams and keep the kline cache updated.

    One combined-stream connection supports dynamic subscription. When ``redis_cache`` is set,
    updates are buffered per Redis key and flushed at most every few seconds. Without Redis, or
    when a Redis operation fails, updates go to the in-process cache.
    """

    # Binance limit: at most 10 subscribe messages per second, otherwise the connection is dropped.
    _SUBSCRIBE_RATE_LIMIT = 10  # messages per window
    _SUBSCRIBE_WINDOW_SEC = 1.0
    _REDIS_FLUSH_INTERVAL_SEC = 2.0  # minimum spacing between batched Redis flushes

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False
        self._subscribed: Dict[Tuple[str, str], bool] = {}  # (symbol, interval) -> subscribed?
        self._subscription_lock = asyncio.Lock()
        self._subscribe_times: List[float] = []  # send times inside the current rate-limit window
        # Subscription accounting, used to cap load.
        self._subscription_count = 0
        # Binance allows up to 1024 streams per connection; 500 is a safety threshold.
        self._max_subscriptions = 500
        # Cap concurrently running message handlers so tasks do not pile up.
        self._message_semaphore = asyncio.Semaphore(50)
        # Batched Redis writes: {rkey: (rows, monotonic timestamp)}, bounded in size.
        self._redis_write_pending: Dict[str, tuple] = {}
        self._redis_write_pending_max_size = 100  # oldest entry is dropped when full
        self._last_redis_batch_write = 0.0
        self._batch_write_lock = asyncio.Lock()  # serializes batch flushes

    def _ws_base_url(self) -> str:
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Start the WebSocket loop and the periodic cache cleanup; idempotent."""
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        # Periodic cleanup keeps the in-process cache bounded.
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        logger.info("KlineStream: 已启动(支持动态订阅)")
        return True

    @staticmethod
    async def _cancel_and_wait(task: Optional[asyncio.Task]) -> None:
        """Cancel ``task`` (if any) and wait for it to finish."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def stop(self):
        """Stop background tasks, flush buffered Redis writes and close the WebSocket."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        await self._cancel_and_wait(self._task)
        self._task = None
        await self._cancel_and_wait(getattr(self, "_cleanup_task", None))
        self._cleanup_task = None
        # Persist klines still waiting in the batch buffer (up to one flush interval of updates).
        if self._redis_cache and self._redis_write_pending:
            async with self._batch_write_lock:
                await self._batch_write_redis()
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
        logger.info("KlineStream: 已停止")

    async def _wait_for_subscribe_slot(self) -> None:
        """Sleep until one more subscribe message fits within the per-second rate limit."""
        window = self._SUBSCRIBE_WINDOW_SEC
        now = time.monotonic()
        self._subscribe_times = [t for t in self._subscribe_times if now - t < window]
        while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
            await asyncio.sleep(max(0.01, self._subscribe_times[0] + window - time.monotonic()))
            now = time.monotonic()
            self._subscribe_times = [t for t in self._subscribe_times if now - t < window]

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream for ``symbol``/``interval``.

        Waits up to 5 s for the WebSocket to connect. Refuses new subscriptions once
        ``_max_subscriptions`` is reached. ``limit`` is the number of rows to retain; repeated
        calls keep the largest limit requested.

        Returns:
            True if subscribed (now or previously), False otherwise.
        """
        symbol = symbol.upper()
        interval = interval.lower()
        key = (symbol, interval)
        async with self._subscription_lock:
            if self._subscribed.get(key):
                _remember_kline_limit(key, limit)
                return True
            if not self._running:
                return False
            current_count = len(self._subscribed)
            if current_count >= self._max_subscriptions:
                logger.warning(
                    f"KlineStream: 订阅数量已达上限 ({current_count}/{self._max_subscriptions}),"
                    f"跳过订阅 {symbol} {interval}(建议优先使用共享缓存或 REST API)"
                )
                return False
            # Wait for the WS connection (at most 5 seconds).
            for _ in range(50):
                if self._ws:
                    break
                await asyncio.sleep(0.1)
            if not self._ws:
                return False
            await self._wait_for_subscribe_slot()
            # Re-read after the rate-limit wait: the connection may have been replaced or dropped.
            ws = self._ws
            if not ws:
                return False
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
                self._subscribe_times.append(time.monotonic())
                self._subscribed[key] = True
                self._subscription_count = len(self._subscribed)
                _remember_kline_limit(key, limit)
                logger.debug(f"KlineStream: 已订阅 {symbol} {interval}(当前订阅数: {self._subscription_count}/{self._max_subscriptions})")
                return True
            except (ConnectionResetError, OSError) as e:
                msg = str(e).lower()
                if "closing transport" in msg or "cannot write" in msg:
                    logger.debug("KlineStream: 连接关闭中,订阅 %s %s 跳过(重连后将自动重试)", symbol, interval)
                else:
                    logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False
            except Exception as e:
                logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False

    async def _run_ws(self):
        """Keep a combined-stream connection open, dispatching messages and reconnecting.

        After an error the loop waits 10 s before reconnecting. Connection state and
        subscriptions are reset as soon as a connection ends, so callers re-subscribe on the
        next connection.
        """
        import aiohttp

        # Combined-stream endpoint, which supports dynamic SUBSCRIBE.
        url = f"{self._ws_base_url()}/stream"
        while self._running:
            reconnect_delay = 0.0
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                raw = msg.data
                                # Subscription acknowledgements carry "result"; skip them early.
                                try:
                                    data = json.loads(raw)
                                    if isinstance(data, dict) and "result" in data:
                                        continue
                                except Exception:
                                    pass
                                # Handle asynchronously; the semaphore bounds concurrent handlers.
                                t = asyncio.create_task(self._handle_message_with_limit(raw))
                                t.add_done_callback(_task_done_callback)
                            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s,10s 后重连",
                    err_type,
                    err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                reconnect_delay = 10.0
            # The connection is gone: drop it and its subscriptions before any backoff, so
            # subscribe() does not report success against a dead socket meanwhile.
            self._ws = None
            async with self._subscription_lock:
                self._subscribed.clear()
                self._subscription_count = 0
            if not self._running:
                break
            if reconnect_delay:
                await asyncio.sleep(reconnect_delay)

    async def _cleanup_loop(self):
        """Periodically evict stale in-process cache entries while the stream is running."""
        while self._running:
            try:
                await asyncio.sleep(_CACHE_CLEANUP_INTERVAL_SEC)
                if self._running:
                    _cleanup_stale_kline_cache()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.debug(f"KlineStream: 清理缓存时出错: {e}")

    async def _handle_message_with_limit(self, raw: str):
        """Run _handle_message_async under the message-concurrency semaphore."""
        async with self._message_semaphore:
            await self._handle_message_async(raw)

    async def _handle_message_async(self, raw: str):
        """Parse one WebSocket message and store the kline for a subscribed pair.

        With Redis configured, the kline goes into the batched Redis buffer. If that fails, or
        no Redis is configured, it goes to the in-process cache.
        """
        parsed = _parse_kline_message(raw)
        if parsed is None:
            return
        key, row, closed = parsed
        if key not in self._subscribed:
            return

        if self._redis_cache:
            try:
                await self._store_kline_in_redis(key, row)
            except Exception as e:
                logger.debug(f"KlineStream: Redis 处理失败,降级到进程内存: {e}")
                # Fall back to process memory when Redis fails.
                _store_kline_in_memory(key, row)
        else:
            # No Redis: process memory only.
            _store_kline_in_memory(key, row)
        if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8:
            _cleanup_stale_kline_cache()

        if logger.isEnabledFor(logging.DEBUG):
            s, i = key
            logger.debug(f"KlineStream: 已更新 {s} {i} (完结={closed}, Redis={bool(self._redis_cache)})")

    async def _store_kline_in_redis(self, key: Tuple[str, str], row: List) -> None:
        """Merge ``row`` into the pending Redis buffer for ``key`` and flush when due.

        The buffered list is reused across messages. Redis is read only when the key is not
        already buffered, which avoids a GET plus full copy for every message.
        """
        s, i = key
        rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
        entry = self._redis_write_pending.get(rkey)
        if entry is None:
            existing = await self._redis_cache.get(rkey)
            # Another message for this key may have filled the buffer while we awaited the GET;
            # reuse that list so its update is not lost.
            entry = self._redis_write_pending.get(rkey)
        if entry is not None:
            cache_list = entry[0]
        else:
            cache_list = list(existing) if existing and isinstance(existing, list) else []
            if len(self._redis_write_pending) >= self._redis_write_pending_max_size:
                oldest_rkey = min(
                    self._redis_write_pending, key=lambda rk: self._redis_write_pending[rk][1]
                )
                del self._redis_write_pending[oldest_rkey]

        _merge_kline(cache_list, row, _kline_cache_limit.get(key, _DEFAULT_KLINE_LIMIT))
        # A fresh tuple on every update lets _batch_write_redis detect writes made during a flush.
        self._redis_write_pending[rkey] = (cache_list, time.monotonic())

        now = time.monotonic()
        if now - self._last_redis_batch_write >= self._REDIS_FLUSH_INTERVAL_SEC:
            async with self._batch_write_lock:
                # Re-check under the lock: another handler may have just flushed.
                if now - self._last_redis_batch_write >= self._REDIS_FLUSH_INTERVAL_SEC:
                    await self._batch_write_redis()
                    self._last_redis_batch_write = time.monotonic()

    async def _batch_write_redis(self):
        """Write every buffered kline list to Redis.

        An entry leaves the buffer only if it was written successfully and was not updated
        while the write was in flight. Updated entries stay buffered for the next flush, and
        failed writes stay buffered for retry. After a successful write, the matching
        in-process fallback cache entry is dropped so Redis stays the single copy.
        """
        if not self._redis_write_pending:
            return
        for rkey, entry in list(self._redis_write_pending.items()):
            try:
                await self._redis_cache.set(rkey, entry[0], ttl=TTL_KLINE_STREAM)
            except Exception as e:
                logger.debug("KlineStream: 批量写入 Redis 失败 %s: %s", rkey, e)
                continue
            if self._redis_write_pending.get(rkey) is entry:
                del self._redis_write_pending[rkey]
            if rkey.startswith(KEY_KLINE_PREFIX):
                suffix = rkey[len(KEY_KLINE_PREFIX):]
                if ":" in suffix:
                    s, i = suffix.split(":", 1)
                    memory_key = (s.upper(), i.lower())
                    _kline_cache.pop(memory_key, None)
                    _kline_cache_updated_at.pop(memory_key, None)

    def _handle_message(self, raw: str):
        """Synchronous entry point kept for compatibility; schedules _handle_message_async."""
        t = asyncio.create_task(self._handle_message_async(raw))
        t.add_done_callback(_task_done_callback)

    async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
        """Write ``data`` to Redis under ``rkey`` with the stream TTL; failures are logged at DEBUG."""
        try:
            if self._redis_cache:
                await self._redis_cache.set(rkey, data, ttl=TTL_KLINE_STREAM)
        except Exception as e:
            logger.debug("KlineStream: 写入 Redis 失败 %s", e)


async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
    """Shared mode: return the last ``limit`` klines stored in Redis, or None on a miss or error."""
    if redis_cache is None or limit <= 0:
        return None
    try:
        s, i = symbol.upper(), interval.lower()
        rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
        data = await redis_cache.get(rkey)
        if data and isinstance(data, list) and len(data) >= limit:
            return data[-limit:]
        return None
    except Exception as e:
        logger.debug("get_klines_from_redis: %s", e)
        return None


# Global KlineStream instance.
_kline_stream_instance: Optional["KlineStream"] = None


def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the running KlineStream instance (None when not started)."""
    return _kline_stream_instance


def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Set the global KlineStream instance (called from main)."""
    global _kline_stream_instance
    _kline_stream_instance = instance