""" 最优挂单 WebSocket 流:订阅 !bookTicker,维护全市场最优买/卖价缓存。 用于滑点估算、入场价格优化,提升交易执行效果。 支持多进程共用:Leader 写 Redis,所有进程通过 refresh_book_ticker_from_redis_loop 从 Redis 更新本地缓存。 文档:更新速度 5s,推送所有交易对的最优挂单(最高买单、最低卖单)。 """ import asyncio import json import logging import time from typing import Dict, Optional, Any logger = logging.getLogger(__name__) def _task_done_callback(task: asyncio.Task) -> None: """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" try: task.result() except asyncio.CancelledError: pass except Exception as e: logger.debug("BookTickerStream background task error: %s", e) try: from .market_ws_leader import KEY_BOOK_TICKER except ImportError: KEY_BOOK_TICKER = "market:book_ticker" KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at" # Leader 写 Redis 时串行化,避免并发 merge 导致多份全量缓存在内存堆积 _redis_book_ticker_merge_lock: Optional[asyncio.Lock] = None _REDIS_BOOK_TICKER_MAX_KEYS = 600 def _get_book_ticker_merge_lock() -> asyncio.Lock: global _redis_book_ticker_merge_lock if _redis_book_ticker_merge_lock is None: _redis_book_ticker_merge_lock = asyncio.Lock() return _redis_book_ticker_merge_lock # 进程内不再保留全量:有 Redis 时只维护「最后更新时间」,数据从 Redis 按需读 _book_ticker_cache: Dict[str, Dict[str, Any]] = {} _book_ticker_updated_at: float = 0.0 _BOOK_TICKER_CACHE_MAX_KEYS = 500 def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]: """无 Redis 降级时返回进程内缓存。""" return dict(_book_ticker_cache) def get_book_ticker(symbol: str) -> Optional[Dict[str, Any]]: """同步读:无 Redis 时用进程内缓存;有 Redis 时应使用 get_book_ticker_from_redis。""" return _book_ticker_cache.get(symbol.upper()) def is_book_ticker_cache_fresh(max_age_sec: float = 10.0) -> bool: """是否有可用数据(有 Redis 时由 refresh 更新 _book_ticker_updated_at)。""" return _book_ticker_updated_at > 0 and (time.time() - _book_ticker_updated_at) <= max_age_sec async def get_book_ticker_from_redis(redis_cache: Any, symbol: str) -> Optional[Dict[str, Any]]: """从 Redis 按需读取单 symbol 最优挂单,进程内不保留全量。""" if not redis_cache: return get_book_ticker(symbol) try: data = await redis_cache.get(KEY_BOOK_TICKER) if isinstance(data, dict): return data.get(symbol.upper()) except Exception as e: logger.debug("get_book_ticker_from_redis: %s", e) return get_book_ticker(symbol) def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]: """ 估算滑点(USDT):基于最优挂单估算执行价格与标记价格的偏差。 Args: symbol: 交易对 side: BUY/SELL quantity: 下单数量 Returns: 估算滑点(USDT),None 表示无法估算 """ ticker = get_book_ticker(symbol) if not ticker: return None try: bid_price = float(ticker.get("bidPrice", 0)) ask_price = float(ticker.get("askPrice", 0)) bid_qty = float(ticker.get("bidQty", 0)) ask_qty = float(ticker.get("askQty", 0)) if bid_price <= 0 or ask_price <= 0: return None mid_price = (bid_price + ask_price) / 2 if side == "BUY": # 买单:用 askPrice(卖一)估算,滑点 = (askPrice - midPrice) * quantity if ask_qty >= quantity: slippage = (ask_price - mid_price) * quantity else: # 数量超过卖一,需要吃多档,粗略估算 slippage = (ask_price - mid_price) * quantity * 1.2 else: # SELL # 卖单:用 bidPrice(买一)估算,滑点 = (midPrice - bidPrice) * quantity if bid_qty >= quantity: slippage = (mid_price - bid_price) * quantity else: slippage = (mid_price - bid_price) * quantity * 1.2 return slippage except Exception: return None class BookTickerStream: """订阅合约 !bookTicker,持续更新 _book_ticker_cache。Leader 时可选写 Redis 供多进程读。""" def __init__(self, testnet: bool = False, redis_cache: Any = None): self.testnet = testnet self._redis_cache = redis_cache self._ws = None self._task: Optional[asyncio.Task] = None self._running = False def _ws_url(self) -> str: if self.testnet: return "wss://stream.binancefuture.com/ws/!bookTicker" return "wss://fstream.binance.com/ws/!bookTicker" async def start(self) -> bool: if self._running: return True self._running = True self._task = asyncio.create_task(self._run_ws()) logger.info("BookTickerStream: 已启动(!bookTicker),用于滑点估算") return True async def stop(self): self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None logger.info("BookTickerStream: 已停止") async def _run_ws(self): import aiohttp while self._running: url = self._ws_url() try: async with aiohttp.ClientSession() as session: async with session.ws_connect( url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15) ) as ws: self._ws = ws logger.info("BookTickerStream: WS 已连接") async for msg in ws: if not self._running: break if msg.type == aiohttp.WSMsgType.TEXT: self._handle_message(msg.data) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): break except asyncio.CancelledError: break except Exception as e: err_msg = getattr(e, "message", str(e)) or repr(e) err_type = type(e).__name__ logger.warning( "BookTickerStream: WS 异常 %s: %s,10s 后重连", err_type, err_msg, exc_info=logger.isEnabledFor(logging.DEBUG), ) await asyncio.sleep(10) self._ws = None if not self._running: break def _handle_message(self, raw: str): global _book_ticker_cache, _book_ticker_updated_at try: data = json.loads(raw) except Exception: return if isinstance(data, dict) and "stream" in data: ticker_data = data.get("data", {}) else: ticker_data = data if not isinstance(ticker_data, dict) or ticker_data.get("e") != "bookTicker": return s = (ticker_data.get("s") or "").strip().upper() if not s or not s.endswith("USDT"): return try: item = { "symbol": s, "bidPrice": float(ticker_data.get("b", 0)), "bidQty": float(ticker_data.get("B", 0)), "askPrice": float(ticker_data.get("a", 0)), "askQty": float(ticker_data.get("A", 0)), "time": int(ticker_data.get("T", 0)), } except (TypeError, ValueError): return # 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增 if self._redis_cache: try: t = asyncio.create_task( self._merge_and_write_book_ticker_to_redis_serialized(s, item) ) t.add_done_callback(_task_done_callback) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e) return _book_ticker_cache[s] = item _book_ticker_updated_at = time.monotonic() if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS: keys = list(_book_ticker_cache.keys()) for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]: del _book_ticker_cache[k] logger.debug(f"BookTickerStream: 已更新 {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}") async def _merge_and_write_book_ticker_to_redis_serialized(self, symbol: str, item: Dict[str, Any]) -> None: """串行化:同一时刻仅一个 merge 执行,避免多份全量缓存在内存堆积。""" lock = _get_book_ticker_merge_lock() async with lock: await self._merge_and_write_book_ticker_to_redis(symbol, item) async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None: """从 Redis 读出、合并单条、写回,并写更新时间。限制 key 数量防 Redis 膨胀。""" try: if not self._redis_cache: return existing = await self._redis_cache.get(KEY_BOOK_TICKER) merged = dict(existing) if isinstance(existing, dict) else {} merged[symbol] = item usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")} if len(usdt_only) > _REDIS_BOOK_TICKER_MAX_KEYS: keys = list(usdt_only.keys())[-_REDIS_BOOK_TICKER_MAX_KEYS:] usdt_only = {k: usdt_only[k] for k in keys} await self._redis_cache.set(KEY_BOOK_TICKER, usdt_only, ttl=30) await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 失败 %s", e) async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: """只从 Redis 拉取「更新时间」,不拉全量,进程内不保留 500 条以省内存。""" global _book_ticker_updated_at if redis_cache is None: return while True: try: await asyncio.sleep(interval_sec) raw = await redis_cache.get(KEY_BOOK_TICKER_UPDATED_AT) if raw is not None: try: t = float(raw) _book_ticker_updated_at = t if t > 0 else time.time() except (TypeError, ValueError): _book_ticker_updated_at = time.time() logger.debug("BookTicker: 已同步 Redis 更新时间(进程内不缓存全量)") except asyncio.CancelledError: break except Exception as e: logger.debug("BookTicker: 从 Redis 刷新更新时间失败 %s", e)