""" 24 小时行情 WebSocket 流:订阅 !ticker@arr,维护全市场 ticker 缓存。 供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h,减少请求与超时。 支持多进程共用:Leader 写 Redis,所有进程可通过 refresh_ticker_24h_from_redis_loop 从 Redis 更新本地缓存。 文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。 """ import asyncio import json import logging import time from typing import Dict, Optional, Any logger = logging.getLogger(__name__) try: from .market_ws_leader import KEY_TICKER_24H except ImportError: KEY_TICKER_24H = "market:ticker_24h" KEY_TICKER_24H_UPDATED_AT = "market:ticker_24h:updated_at" # 进程内不再保留全量:有 Redis 时只维护“最后更新时间”,数据全部从 Redis 按需读 _ticker_24h_cache: Dict[str, Dict[str, Any]] = {} _ticker_24h_updated_at: float = 0.0 _TICKER_24H_CACHE_MAX_KEYS = 500 # 仅无 Redis 时使用 def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]: """无 Redis 降级时返回进程内缓存;有 Redis 时应使用 get_tickers_24h_from_redis。""" return dict(_ticker_24h_cache) def get_tickers_24h_cache_updated_at() -> float: """返回缓存最后更新时间(由 refresh 从 Redis 回写 updated_at)。""" return _ticker_24h_updated_at def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool: """是否有可用数据(有 Redis 时由 refresh 更新 _ticker_24h_updated_at 为 Redis 的 time.time)。""" return _ticker_24h_updated_at > 0 and (time.time() - _ticker_24h_updated_at) <= max_age_sec async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, Any]]: """从 Redis 按需读取全量 24h ticker,进程内不保留,减轻内存。""" if not redis_cache: return get_tickers_24h_cache() try: data = await redis_cache.get(KEY_TICKER_24H) return dict(data) if isinstance(data, dict) else {} except Exception as e: logger.debug("get_tickers_24h_from_redis: %s", e) return get_tickers_24h_cache() class Ticker24hStream: """订阅合约 !ticker@arr,持续更新 _ticker_24h_cache。Leader 时可选写 Redis 供多进程读。""" def __init__(self, testnet: bool = False, redis_cache: Any = None): self.testnet = testnet self._redis_cache = redis_cache self._ws = None self._task: Optional[asyncio.Task] = None self._running = False def _ws_url(self) -> str: if self.testnet: return "wss://stream.binancefuture.com/ws/!ticker@arr" return "wss://fstream.binance.com/ws/!ticker@arr" async def start(self) -> bool: if self._running: return True self._running = True self._task = asyncio.create_task(self._run_ws()) logger.info("Ticker24hStream: 已启动(!ticker@arr),扫描将优先使用 WS 缓存") return True async def stop(self): self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None logger.info("Ticker24hStream: 已停止") async def _run_ws(self): import aiohttp while self._running: url = self._ws_url() try: async with aiohttp.ClientSession() as session: async with session.ws_connect( url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15) ) as ws: self._ws = ws logger.info("Ticker24hStream: WS 已连接") async for msg in ws: if not self._running: break if msg.type == aiohttp.WSMsgType.TEXT: self._handle_message(msg.data) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): break except asyncio.CancelledError: break except Exception as e: err_msg = getattr(e, "message", str(e)) or repr(e) err_type = type(e).__name__ logger.warning( "Ticker24hStream: WS 异常 %s: %s,10s 后重连", err_type, err_msg, exc_info=logger.isEnabledFor(logging.DEBUG), ) await asyncio.sleep(10) self._ws = None if not self._running: break def _handle_message(self, raw: str): global _ticker_24h_cache, _ticker_24h_updated_at try: data = json.loads(raw) except Exception: return if isinstance(data, list): arr = data elif isinstance(data, dict): arr = data.get("data") if isinstance(data.get("data"), list) else [data] else: return now_ms = int(time.time() * 1000) new_items = {} for t in arr: if not isinstance(t, dict): continue s = (t.get("s") or t.get("symbol") or "").strip() if not s or not s.endswith("USDT"): continue try: price = float(t.get("c") or t.get("lastPrice") or 0) change_pct = float(t.get("P") or t.get("priceChangePercent") or 0) vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0) except (TypeError, ValueError): continue new_items[s] = { "symbol": s, "price": price, "volume": vol, "changePercent": change_pct, "ts": now_ms, } if not new_items: return # 有 Redis 时只写 Redis,不写进程内存(由 refresh 循环从 Redis 回填,避免双重占用) if self._redis_cache: try: asyncio.get_event_loop().create_task( self._merge_and_write_ticker_24h_to_redis(new_items) ) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e) return # 无 Redis 时才写进程内存 for s, v in new_items.items(): _ticker_24h_cache[s] = v _ticker_24h_updated_at = time.monotonic() if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS: keys = list(_ticker_24h_cache.keys()) for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]: del _ticker_24h_cache[k] logger.debug(f"Ticker24hStream: 已更新 {len(new_items)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None: """从 Redis 读出、合并新数据、写回,并写更新时间供 refresh 只拉时间不拉全量""" try: if not self._redis_cache: return existing = await self._redis_cache.get(KEY_TICKER_24H) merged = dict(existing) if isinstance(existing, dict) else {} merged.update(new_items) await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120) await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e) async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: """只从 Redis 拉取「更新时间」,不拉全量数据,进程内不保留 500 条以省内存。""" global _ticker_24h_updated_at if redis_cache is None: return while True: try: await asyncio.sleep(interval_sec) raw = await redis_cache.get(KEY_TICKER_24H_UPDATED_AT) if raw is not None: try: t = float(raw) _ticker_24h_updated_at = t if t > 0 else time.monotonic() except (TypeError, ValueError): _ticker_24h_updated_at = time.monotonic() logger.debug("Ticker24h: 已同步 Redis 更新时间(进程内不缓存全量)") except asyncio.CancelledError: break except Exception as e: logger.debug("Ticker24h: 从 Redis 刷新更新时间失败 %s", e)