在 `book_ticker_stream.py` 和 `ticker_24h_stream.py` 中新增了串行化锁,确保在写入 Redis 时避免并发合并导致内存膨胀。更新了合并逻辑,限制 Redis 中 USDT 交易对的数量,防止键值无限增长。此改进提升了内存管理与系统稳定性。
273 lines
11 KiB
Python
273 lines
11 KiB
Python
"""
|
||
最优挂单 WebSocket 流:订阅 !bookTicker,维护全市场最优买/卖价缓存。
|
||
用于滑点估算、入场价格优化,提升交易执行效果。
|
||
支持多进程共用:Leader 写 Redis,所有进程通过 refresh_book_ticker_from_redis_loop 从 Redis 更新本地缓存。
|
||
文档:更新速度 5s,推送所有交易对的最优挂单(最高买单、最低卖单)。
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import time
|
||
from typing import Dict, Optional, Any
|
||
|
||
logger = logging.getLogger(__name__)

# Reuse the Redis key exported by the sibling market_ws_leader module; fall
# back to the literal key when that module cannot be imported.
try:
    from .market_ws_leader import KEY_BOOK_TICKER
except ImportError:
    KEY_BOOK_TICKER = "market:book_ticker"

# Redis key holding the time of the last book-ticker write.
KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at"

# Leader-side Redis writes are serialized so that concurrent merges do not
# pile up several full snapshots in memory. The lock is created lazily by
# _get_book_ticker_merge_lock().
_redis_book_ticker_merge_lock: Optional[asyncio.Lock] = None
# Upper bound on the number of symbols kept in the Redis snapshot.
_REDIS_BOOK_TICKER_MAX_KEYS = 600
|
||
|
||
|
||
def _get_book_ticker_merge_lock() -> asyncio.Lock:
    """Return the module-wide merge lock, creating it on the first call."""
    global _redis_book_ticker_merge_lock
    lock = _redis_book_ticker_merge_lock
    if lock is None:
        # First use: build the lock and remember it for later callers.
        lock = asyncio.Lock()
        _redis_book_ticker_merge_lock = lock
    return lock
|
||
|
||
|
||
# With Redis available the process no longer keeps the full snapshot: only the
# last-update time is maintained and quotes are read from Redis on demand.
# In-process fallback cache: upper-cased symbol -> quote dict.
_book_ticker_cache: Dict[str, Dict[str, Any]] = {}
# Time of the last update; 0.0 means "never updated".
_book_ticker_updated_at: float = 0.0
# Upper bound on the number of symbols kept in the in-process cache.
_BOOK_TICKER_CACHE_MAX_KEYS = 500
|
||
|
||
|
||
def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]:
    """Return a shallow copy of the in-process cache (the no-Redis fallback).

    The returned mapping is a new dict, but the per-symbol quote dicts are
    shared with the cache.
    """
    snapshot: Dict[str, Dict[str, Any]] = {**_book_ticker_cache}
    return snapshot
|
||
|
||
|
||
def get_book_ticker(symbol: str) -> Optional[Dict[str, Any]]:
    """Synchronously look up one symbol in the in-process cache.

    Intended for the no-Redis mode; with Redis, use
    get_book_ticker_from_redis instead.

    Args:
        symbol: Trading pair; matched case-insensitively (upper-cased here).

    Returns:
        The cached quote dict, or None when the symbol is not cached.
    """
    cache_key = symbol.upper()
    return _book_ticker_cache.get(cache_key)
|
||
|
||
|
||
def is_book_ticker_cache_fresh(max_age_sec: float = 10.0) -> bool:
    """Tell whether book-ticker data was updated recently enough to be used.

    Args:
        max_age_sec: Maximum accepted age of the last update, in seconds.

    Returns:
        True when an update has been recorded and it is at most
        ``max_age_sec`` seconds old according to ``time.time()``.
    """
    last_update = _book_ticker_updated_at
    if not last_update > 0:
        # Nothing has been recorded yet.
        return False
    age = time.time() - last_update
    return age <= max_age_sec
|
||
|
||
|
||
async def get_book_ticker_from_redis(redis_cache: Any, symbol: str) -> Optional[Dict[str, Any]]:
    """Read one symbol's best bid/ask from Redis on demand.

    Args:
        redis_cache: Cache client exposing an awaitable ``get(key)``; a falsy
            value selects the in-process cache instead.
        symbol: Trading pair; matched case-insensitively.

    Returns:
        The quote dict for the symbol, or None when it is unknown. When the
        Redis read raises or does not yield a dict, the in-process cache is
        consulted as a fallback.
    """
    if redis_cache:
        try:
            snapshot = await redis_cache.get(KEY_BOOK_TICKER)
            if isinstance(snapshot, dict):
                return snapshot.get(symbol.upper())
        except Exception as e:
            # Best effort: a failed Redis read falls through to the local cache.
            logger.debug("get_book_ticker_from_redis: %s", e)
    return get_book_ticker(symbol)
|
||
|
||
|
||
def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]:
    """Estimate slippage (in quote currency) from the cached best bid/ask.

    The estimate is the distance between the touched side of the book and the
    mid price, multiplied by the order quantity. When the quantity exceeds the
    size resting at the best level, the figure is scaled by 1.2 as a rough
    allowance for walking deeper into the book.

    Args:
        symbol: Trading pair, looked up in the in-process cache.
        side: "BUY" uses the best ask; any other value is treated as SELL and
            uses the best bid.
        quantity: Order quantity.

    Returns:
        The estimated slippage, or None when no quote is cached, a price is
        not positive, or the quote cannot be parsed.
    """
    quote = get_book_ticker(symbol)
    if not quote:
        return None
    try:
        best_bid = float(quote.get("bidPrice", 0))
        best_ask = float(quote.get("askPrice", 0))
        bid_size = float(quote.get("bidQty", 0))
        ask_size = float(quote.get("askQty", 0))
        if best_bid <= 0 or best_ask <= 0:
            return None
        mid = (best_bid + best_ask) / 2
        if side == "BUY":
            # Buy orders lift the best ask.
            distance = best_ask - mid
            top_size = ask_size
        else:
            # Sell orders hit the best bid.
            distance = mid - best_bid
            top_size = bid_size
        if top_size >= quantity:
            return distance * quantity
        # Larger than the top level: several levels get consumed, rough estimate.
        return distance * quantity * 1.2
    except Exception:
        return None
|
||
|
||
|
||
class BookTickerStream:
    """Subscribe to the futures ``!bookTicker`` stream and publish best bid/ask quotes.

    Without a Redis cache, quotes for USDT symbols are stored in the
    module-level ``_book_ticker_cache``. With a Redis cache (leader mode),
    every quote is merged into the shared Redis snapshot instead and the
    in-process cache is left untouched.
    """

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        """Store configuration; no connection is opened until ``start()``.

        Args:
            testnet: Use the testnet WebSocket endpoint when True.
            redis_cache: Optional cache client exposing awaitable
                ``get(key)`` and ``set(key, value, ttl=...)``.
        """
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False

    def _ws_url(self) -> str:
        """Return the WebSocket URL for the configured environment."""
        if self.testnet:
            return "wss://stream.binancefuture.com/ws/!bookTicker"
        return "wss://fstream.binance.com/ws/!bookTicker"

    async def start(self) -> bool:
        """Start the background reader task; a second call is a no-op.

        Returns:
            Always True.
        """
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("BookTickerStream: 已启动(!bookTicker),用于滑点估算")
        return True

    async def stop(self):
        """Cancel the reader task and close the WebSocket, if any."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                # Best effort: the socket may already be closed.
                pass
            self._ws = None
        logger.info("BookTickerStream: 已停止")

    async def _run_ws(self):
        """Connect, dispatch text frames to ``_handle_message``, reconnect on failure."""
        # Imported lazily so the module can be loaded without aiohttp installed.
        import aiohttp

        while self._running:
            url = self._ws_url()
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): ws_connect is given a ClientTimeout as
                    # `timeout`; confirm the installed aiohttp accepts that.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("BookTickerStream: WS 已连接")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self._handle_message(msg.data)
                            elif msg.type in (
                                aiohttp.WSMsgType.ERROR,
                                aiohttp.WSMsgType.CLOSED,
                                aiohttp.WSMsgType.CLOSE,
                            ):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "BookTickerStream: WS 异常 %s: %s,10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            if not self._running:
                break

    def _handle_message(self, raw: str):
        """Parse one raw frame and publish the quote it carries.

        Frames that are not valid JSON, are not ``bookTicker`` events, are not
        for a symbol ending in ``USDT``, or carry non-numeric fields are
        ignored. Both the bare payload and the ``{"stream", "data"}`` wrapper
        are accepted.

        Args:
            raw: Text payload of a WebSocket message.
        """
        global _book_ticker_cache, _book_ticker_updated_at
        try:
            data = json.loads(raw)
        except Exception:
            return
        if isinstance(data, dict) and "stream" in data:
            ticker_data = data.get("data", {})
        else:
            ticker_data = data

        if not isinstance(ticker_data, dict) or ticker_data.get("e") != "bookTicker":
            return

        s = (ticker_data.get("s") or "").strip().upper()
        if not s or not s.endswith("USDT"):
            return

        try:
            item = {
                "symbol": s,
                "bidPrice": float(ticker_data.get("b", 0)),
                "bidQty": float(ticker_data.get("B", 0)),
                "askPrice": float(ticker_data.get("a", 0)),
                "askQty": float(ticker_data.get("A", 0)),
                "time": int(ticker_data.get("T", 0)),
            }
        except (TypeError, ValueError):
            return

        # With Redis, write only to Redis. The merge is serialized so that
        # several tasks do not each hold a full snapshot at the same time.
        if self._redis_cache:
            try:
                # get_running_loop() raises before the coroutine object is
                # created when no loop is running, so nothing is left
                # un-awaited on that path.
                asyncio.get_running_loop().create_task(
                    self._merge_and_write_book_ticker_to_redis_serialized(s, item)
                )
            except Exception as e:
                logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e)
            return

        _book_ticker_cache[s] = item
        # Wall-clock time: is_book_ticker_cache_fresh() compares this value
        # against time.time(), so a monotonic reading would never look fresh.
        _book_ticker_updated_at = time.time()
        if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS:
            # NOTE(review): dicts keep insertion order, so this drops the most
            # recently inserted symbols once the cap is reached; confirm that
            # is the intended eviction policy.
            keys = list(_book_ticker_cache.keys())
            for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]:
                del _book_ticker_cache[k]
        # Lazy %-formatting: this runs per message, skip the work unless DEBUG is on.
        logger.debug(
            "BookTickerStream: 已更新 %s bid=%.4f ask=%.4f",
            s, item["bidPrice"], item["askPrice"],
        )

    async def _merge_and_write_book_ticker_to_redis_serialized(self, symbol: str, item: Dict[str, Any]) -> None:
        """Run the Redis merge under the module-wide lock, one merge at a time.

        Args:
            symbol: Upper-cased trading pair.
            item: Quote dict to store for the symbol.
        """
        lock = _get_book_ticker_merge_lock()
        async with lock:
            await self._merge_and_write_book_ticker_to_redis(symbol, item)

    async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None:
        """Read the Redis snapshot, merge one quote, write it back with its timestamp.

        Only string keys ending in ``USDT`` are kept, and at most
        ``_REDIS_BOOK_TICKER_MAX_KEYS`` of them (the last ones in dict order).
        Both keys are written with a 30 second TTL. Failures are logged at
        debug level and otherwise ignored.

        Args:
            symbol: Upper-cased trading pair.
            item: Quote dict to store for the symbol.
        """
        try:
            if not self._redis_cache:
                return
            existing = await self._redis_cache.get(KEY_BOOK_TICKER)
            merged = dict(existing) if isinstance(existing, dict) else {}
            merged[symbol] = item
            usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")}
            if len(usdt_only) > _REDIS_BOOK_TICKER_MAX_KEYS:
                keys = list(usdt_only.keys())[-_REDIS_BOOK_TICKER_MAX_KEYS:]
                usdt_only = {k: usdt_only[k] for k in keys}
            await self._redis_cache.set(KEY_BOOK_TICKER, usdt_only, ttl=30)
            await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30)
        except Exception as e:
            logger.debug("BookTickerStream: 写入 Redis 失败 %s", e)
|
||
|
||
|
||
async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
    """Periodically copy the Redis "updated at" time into this process.

    Only the timestamp key is read, never the full snapshot, so the process
    does not hold the quotes in memory. Returns immediately when
    ``redis_cache`` is None; otherwise loops until cancelled.

    Args:
        redis_cache: Cache client exposing an awaitable ``get(key)``.
        interval_sec: Pause between two polls, in seconds.
    """
    global _book_ticker_updated_at
    if redis_cache is None:
        return
    while True:
        try:
            await asyncio.sleep(interval_sec)
            raw = await redis_cache.get(KEY_BOOK_TICKER_UPDATED_AT)
            if raw is None:
                continue
            try:
                stamp = float(raw)
            except (TypeError, ValueError):
                # Unparseable value: treat the key's presence as "just updated".
                _book_ticker_updated_at = time.time()
            else:
                _book_ticker_updated_at = stamp if stamp > 0 else time.time()
            logger.debug("BookTicker: 已同步 Redis 更新时间(进程内不缓存全量)")
        except asyncio.CancelledError:
            break
        except Exception as e:
            # Best effort: keep polling after a failed read.
            logger.debug("BookTicker: 从 Redis 刷新更新时间失败 %s", e)
|