"""Kline (candlestick) WebSocket stream: subscribes to ``<symbol>@kline_<interval>`` and maintains a kline cache.

The cache is meant to be consulted by ``get_klines`` before falling back to a
REST request, which reduces timeouts and keeps technical indicators updated in
real time.

Per the exchange documentation, updates are pushed every 250 ms and only carry
the most recent kline; ``x=false`` means the kline is still open and
``x=true`` means it has closed.
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple

logger = logging.getLogger(__name__)

# Kline cache: {(symbol, interval): [kline, kline, ...]}, oldest first, at most
# one row per open time. A list may grow to 2 * limit rows before it is trimmed
# back to the newest `limit` rows.
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# time.monotonic() of the last accepted update for each key.
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
# Retention limit requested for each (symbol, interval).
_kline_cache_limit: Dict[Tuple[str, str], int] = {}


def _normalize_interval(interval: str) -> str:
    """Return the canonical spelling of *interval* used in cache keys and stream names.

    Intervals are lower-cased so callers may pass e.g. ``"1H"``. The monthly
    interval ``"1M"`` is the exception: lower-casing it would silently turn it
    into the one-minute interval ``"1m"``, so it is kept as is.
    """
    cleaned = interval.strip()
    return cleaned if cleaned == "1M" else cleaned.lower()


def _cache_key(symbol: str, interval: str) -> Tuple[str, str]:
    """Build the ``(SYMBOL, interval)`` key shared by all module-level caches."""
    return (symbol.strip().upper(), _normalize_interval(interval))


def _kline_to_rest_row(k: dict) -> Optional[List]:
    """Convert the ``k`` object of a kline event into a REST-style kline row.

    Row layout: ``[open_time, open, high, low, close, volume, close_time,
    quote_volume, trades, taker_buy_base_volume, taker_buy_quote_volume,
    ignore]``. Prices and volumes are strings, times and trade count are ints.

    Returns None when a field cannot be parsed as a number.
    """
    try:
        return [
            int(k.get("t", 0)),           # open_time
            str(float(k.get("o", 0))),    # open
            str(float(k.get("h", 0))),    # high
            str(float(k.get("l", 0))),    # low
            str(float(k.get("c", 0))),    # close
            str(float(k.get("v", 0))),    # volume
            int(k.get("T", 0)),           # close_time
            str(float(k.get("q", 0))),    # quote_volume
            int(k.get("n", 0)),           # trades
            # Taker-buy volumes are taken from the event when it carries them;
            # "0" is kept as the placeholder when the fields are absent.
            str(float(k["V"])) if "V" in k else "0",  # taker_buy_base_volume
            str(float(k["Q"])) if "Q" in k else "0",  # taker_buy_quote_volume
            "0",                          # ignore
        ]
    except (TypeError, ValueError):
        return None


def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the newest *limit* cached klines in REST ``get_klines`` row format.

    Returns None when nothing is cached for the pair, when fewer than *limit*
    rows are cached, or when *limit* is not positive, so the caller can fall
    back to REST.
    """
    if limit <= 0:
        # cached[-0:] would return the whole list, not "the last 0 rows".
        return None
    cached = _kline_cache.get(_cache_key(symbol, interval))
    if not cached or len(cached) < limit:
        return None
    return cached[-limit:]


def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """Return True if the cache for the pair was updated within *max_age_sec* seconds."""
    updated = _kline_cache_updated_at.get(_cache_key(symbol, interval))
    if not updated:
        return False
    return (time.monotonic() - updated) <= max_age_sec


class KlineStream:
    """Subscribe to futures kline streams and keep ``_kline_cache`` up to date.

    Streams are subscribed dynamically over a single combined-stream
    connection. Subscriptions are dropped whenever the connection is lost or
    the stream is stopped; callers must call :meth:`subscribe` again.
    """

    def __init__(self, testnet: bool = False):
        self.testnet = testnet
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # (symbol, interval) -> True for streams subscribed on the current connection.
        self._subscribed: Dict[Tuple[str, str], bool] = {}
        self._subscription_lock = asyncio.Lock()

    def _ws_base_url(self) -> str:
        """Return the WebSocket base URL for testnet or production."""
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Start the background connection task and register this instance globally.

        Returns True, including when the stream is already running.
        """
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        logger.info("KlineStream: 已启动(支持动态订阅)")
        return True

    async def stop(self):
        """Stop the background task, close the socket and forget all subscriptions."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                # Best effort: the socket may already be closed or broken.
                pass
            self._ws = None
        # A cancelled _run_ws() skips its own cleanup. Without this, subscribe()
        # after a restart would report success without sending SUBSCRIBE on the
        # new connection.
        self._subscribed.clear()
        logger.info("KlineStream: 已停止")

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream for *symbol* / *interval*.

        Waits up to 5 seconds for the WebSocket to connect. *limit* is the
        number of rows the cache should retain for this pair; it is only ever
        widened, so a later call with a larger *limit* can still be served.

        Returns True when the pair is (or already was) subscribed, False when
        the stream is not running, the socket did not connect in time, or
        sending the request failed.
        """
        key = _cache_key(symbol, interval)
        symbol, interval = key
        async with self._subscription_lock:
            if self._subscribed.get(key):
                if limit > _kline_cache_limit.get(key, 0):
                    _kline_cache_limit[key] = limit
                return True
            if not self._running:
                return False
            # Wait for the WS connection (at most 5 seconds), giving up early
            # if the stream is stopped in the meantime.
            for _ in range(50):
                if self._ws or not self._running:
                    break
                await asyncio.sleep(0.1)
            ws = self._ws
            if not ws or not self._running:
                return False
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
            except Exception as e:
                logger.warning("KlineStream: 订阅 %s %s 失败: %s", symbol, interval, e)
                return False
            self._subscribed[key] = True
            _kline_cache_limit[key] = max(limit, _kline_cache_limit.get(key, 0))
            logger.debug("KlineStream: 已订阅 %s %s", symbol, interval)
            return True

    async def _run_ws(self):
        """Connection loop: connect, dispatch text frames, reconnect on loss.

        After every lost connection the socket reference and subscription
        state are reset; after an error the loop waits 10 seconds before
        reconnecting.
        """
        import aiohttp

        # Combined-stream endpoint, which allows dynamic SUBSCRIBE requests.
        url = f"{self._ws_base_url()}/stream"
        closing_types = (
            aiohttp.WSMsgType.ERROR,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.CLOSE,
        )
        while self._running:
            retry_delay = 0
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): ws_connect's `timeout` appears to expect a
                    # float / ClientWSTimeout rather than a ClientTimeout —
                    # confirm against the installed aiohttp version.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                # Subscription acks ({"result": null, "id": ...})
                                # are not kline events, so _handle_message
                                # ignores them; no separate parse is needed.
                                self._handle_message(msg.data)
                            elif msg.type in closing_types:
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s,10s 后重连",
                    err_type,
                    err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                retry_delay = 10
            # Reset state before backing off, so subscribe() does not treat the
            # dead connection as live during the wait. Subscriptions do not
            # survive a reconnect; callers must subscribe again.
            self._ws = None
            async with self._subscription_lock:
                self._subscribed.clear()
            if not self._running:
                break
            if retry_delay:
                await asyncio.sleep(retry_delay)

    def _handle_message(self, raw: str):
        """Parse one text frame and merge its kline into the cache.

        Frames that are not valid JSON, are not kline events, belong to a pair
        that is not subscribed, or contain unparsable numbers are ignored.

        Rows are merged by open time: an update for the newest cached bar
        replaces it (open or closed), a newer bar is appended, and an older
        bar is ignored. If the new bar does not start right after the previous
        bar's close time, the history for the pair is dropped so that
        ``get_klines_from_cache`` returns None until enough contiguous bars
        have accumulated.
        """
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            return
        # Combined-stream format: {"stream": "btcusdt@kline_1h", "data": {...}}
        if isinstance(data, dict) and "stream" in data:
            kline_data = data.get("data", {})
        else:
            kline_data = data
        if not isinstance(kline_data, dict) or kline_data.get("e") != "kline":
            return
        k = kline_data.get("k")
        if not isinstance(k, dict):
            return
        s = str(k.get("s") or "").strip().upper()
        i = _normalize_interval(str(k.get("i") or ""))
        if not s or not i:
            return
        key = (s, i)
        if key not in self._subscribed:
            return

        row = _kline_to_rest_row(k)
        if row is None:
            return
        x = k.get("x", False)  # whether the kline has closed
        open_time = row[0]

        rows = _kline_cache.setdefault(key, [])
        last = rows[-1] if rows else None
        if last is not None and open_time < last[0]:
            # Late update for a bar older than the newest cached one.
            return
        if last is not None and open_time == last[0]:
            # Same bar: the latest push (open or final) supersedes the old row.
            rows[-1] = row
        else:
            last_close = last[6] if last is not None else 0
            if last_close and open_time > last_close + 1:
                # Bars were missed (e.g. during a reconnect); a history with a
                # hole would corrupt indicators, so start over.
                logger.info("KlineStream: %s %s K线不连续,已重置缓存", s, i)
                rows.clear()
            rows.append(row)
            keep = max(_kline_cache_limit.get(key, 50), 1)
            if len(rows) > keep * 2:
                del rows[:-keep]

        _kline_cache_updated_at[key] = time.monotonic()
        logger.debug("KlineStream: 已更新 %s %s (完结=%s),缓存 %d 根", s, i, x, len(rows))


# Global KlineStream instance
_kline_stream_instance: Optional["KlineStream"] = None


def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the currently running KlineStream instance (None if not started)."""
    return _kline_stream_instance


def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Set the global KlineStream instance (called by main)."""
    global _kline_stream_instance
    _kline_stream_instance = instance