auto_trade_sys/trading_system/book_ticker_stream.py
薇薇安 22901abe39 feat(book_ticker_stream, ticker_24h_stream): 引入串行化锁以优化 Redis 写入逻辑
在 `book_ticker_stream.py` 和 `ticker_24h_stream.py` 中新增了串行化锁,确保在写入 Redis 时避免并发合并导致内存膨胀。更新了合并逻辑,限制 Redis 中 USDT 交易对的数量,防止键值无限增长。此改进提升了内存管理与系统稳定性。
2026-02-21 01:09:27 +08:00

273 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
最优挂单 WebSocket 流:订阅 !bookTicker维护全市场最优买/卖价缓存。
用于滑点估算、入场价格优化,提升交易执行效果。
支持多进程共用Leader 写 Redis所有进程通过 refresh_book_ticker_from_redis_loop 从 Redis 更新本地缓存。
文档:更新速度 5s推送所有交易对的最优挂单最高买单、最低卖单
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
# KEY_BOOK_TICKER is shared with the leader module; fall back to the literal
# Redis key when the relative import is unavailable (standalone use).
try:
    from .market_ws_leader import KEY_BOOK_TICKER
except ImportError:
    KEY_BOOK_TICKER = "market:book_ticker"
# Redis key storing the unix timestamp of the last successful Redis write.
KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at"
# Serializes leader writes to Redis so concurrent read-merge-write cycles do
# not hold several full snapshots in memory at once. Created lazily because
# an asyncio.Lock should be built once an event loop context exists.
_redis_book_ticker_merge_lock: Optional[asyncio.Lock] = None
# Hard cap on the number of USDT symbols kept in the Redis value, to prevent
# unbounded key growth.
_REDIS_BOOK_TICKER_MAX_KEYS = 600
def _get_book_ticker_merge_lock() -> asyncio.Lock:
    """Return the process-wide Redis merge lock, creating it on first use."""
    global _redis_book_ticker_merge_lock
    lock = _redis_book_ticker_merge_lock
    if lock is None:
        lock = asyncio.Lock()
        _redis_book_ticker_merge_lock = lock
    return lock
# In-process fallback cache: only populated when no Redis is configured.
_book_ticker_cache: Dict[str, Dict[str, Any]] = {}
# Unix timestamp of the most recent update (local write or Redis refresh).
_book_ticker_updated_at: float = 0.0
# Cap on locally cached symbols to bound process memory.
_BOOK_TICKER_CACHE_MAX_KEYS = 500


def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]:
    """Return a shallow copy of the in-process cache (no-Redis fallback)."""
    return {**_book_ticker_cache}


def get_book_ticker(symbol: str) -> Optional[Dict[str, Any]]:
    """Synchronous lookup in the local cache.

    With Redis configured, prefer get_book_ticker_from_redis instead.
    """
    key = symbol.upper()
    return _book_ticker_cache.get(key)


def is_book_ticker_cache_fresh(max_age_sec: float = 10.0) -> bool:
    """True when an update was observed within ``max_age_sec`` seconds."""
    if _book_ticker_updated_at <= 0:
        return False
    return (time.time() - _book_ticker_updated_at) <= max_age_sec
async def get_book_ticker_from_redis(redis_cache: Any, symbol: str) -> Optional[Dict[str, Any]]:
    """Fetch one symbol's best bid/ask from Redis without a full local copy.

    Falls back to the in-process cache when Redis is absent, returns a
    non-dict payload, or raises.
    """
    if not redis_cache:
        return get_book_ticker(symbol)
    try:
        snapshot = await redis_cache.get(KEY_BOOK_TICKER)
    except Exception as e:
        logger.debug("get_book_ticker_from_redis: %s", e)
    else:
        if isinstance(snapshot, dict):
            return snapshot.get(symbol.upper())
    return get_book_ticker(symbol)
def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]:
    """Estimate execution slippage in USDT from the best bid/ask quote.

    Args:
        symbol: trading pair, e.g. BTCUSDT.
        side: BUY or SELL (any other value is treated like SELL).
        quantity: order size in base units.

    Returns:
        Estimated slippage in USDT, or None when no usable quote exists.
    """
    quote = get_book_ticker(symbol)
    if not quote:
        return None
    try:
        bid = float(quote.get("bidPrice", 0))
        ask = float(quote.get("askPrice", 0))
        bid_size = float(quote.get("bidQty", 0))
        ask_size = float(quote.get("askQty", 0))
        if bid <= 0 or ask <= 0:
            return None
        mid = (bid + ask) / 2
        if side == "BUY":
            # Buys cross at the ask: per-unit cost is ask minus mid.
            per_unit = ask - mid
            top_level_covers = ask_size >= quantity
        else:
            # Sells hit the bid: per-unit cost is mid minus bid.
            per_unit = mid - bid
            top_level_covers = bid_size >= quantity
        cost = per_unit * quantity
        # When the top level cannot absorb the full size, the fill walks
        # deeper levels; pad the estimate by 20% as a rough heuristic.
        return cost if top_level_covers else cost * 1.2
    except Exception:
        return None
class BookTickerStream:
    """Subscribe to the futures ``!bookTicker`` stream and keep best bid/ask.

    Without Redis, updates land in the in-process ``_book_ticker_cache``.
    When a ``redis_cache`` is supplied (leader mode), every update is merged
    into Redis instead so sibling processes can read it via
    ``get_book_ticker_from_redis``.
    """

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        # testnet: use the futures testnet websocket endpoint instead of prod.
        # redis_cache: optional async cache; when set, updates go to Redis
        # only (leader mode) and the local cache path is skipped.
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False

    def _ws_url(self) -> str:
        """Return the websocket endpoint for the ``!bookTicker`` stream."""
        if self.testnet:
            return "wss://stream.binancefuture.com/ws/!bookTicker"
        return "wss://fstream.binance.com/ws/!bookTicker"

    async def start(self) -> bool:
        """Start the background websocket task (idempotent). Returns True."""
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("BookTickerStream: 已启动(!bookTicker用于滑点估算")
        return True

    async def stop(self):
        """Cancel the background task and close the websocket, if any."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
        logger.info("BookTickerStream: 已停止")

    async def _run_ws(self):
        """Reconnect loop: connect, consume messages, retry 10s after errors."""
        import aiohttp
        while self._running:
            url = self._ws_url()
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): newer aiohttp expects ClientWSTimeout for
                    # ws_connect's timeout — confirm against the pinned version.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("BookTickerStream: WS 已连接")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self._handle_message(msg.data)
                            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "BookTickerStream: WS 异常 %s: %s10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            if not self._running:
                break

    def _handle_message(self, raw: str):
        """Parse one payload and store the best bid/ask for a USDT pair.

        With Redis configured, the update is scheduled for a serialized
        merge into Redis; otherwise it goes to the bounded local cache.
        """
        global _book_ticker_cache, _book_ticker_updated_at
        try:
            data = json.loads(raw)
        except Exception:
            return
        # Combined-stream frames wrap the event as {"stream": ..., "data": ...}.
        if isinstance(data, dict) and "stream" in data:
            ticker_data = data.get("data", {})
        else:
            ticker_data = data
        if not isinstance(ticker_data, dict) or ticker_data.get("e") != "bookTicker":
            return
        s = (ticker_data.get("s") or "").strip().upper()
        if not s or not s.endswith("USDT"):
            return
        try:
            item = {
                "symbol": s,
                "bidPrice": float(ticker_data.get("b", 0)),
                "bidQty": float(ticker_data.get("B", 0)),
                "askPrice": float(ticker_data.get("a", 0)),
                "askQty": float(ticker_data.get("A", 0)),
                "time": int(ticker_data.get("T", 0)),
            }
        except (TypeError, ValueError):
            return
        if self._redis_cache:
            try:
                # BUGFIX: use get_running_loop(); get_event_loop() is
                # deprecated for this use and can misbehave outside the
                # running loop's thread.
                asyncio.get_running_loop().create_task(
                    self._merge_and_write_book_ticker_to_redis_serialized(s, item)
                )
            except Exception as e:
                logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e)
            return
        _book_ticker_cache[s] = item
        # BUGFIX: is_book_ticker_cache_fresh() compares this value against
        # time.time(); the previous time.monotonic() stamp uses a different
        # clock epoch, so the local cache was never reported fresh.
        _book_ticker_updated_at = time.time()
        if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS:
            # BUGFIX: evict the OLDEST entries (dicts preserve insertion
            # order). The previous slice dropped the newest keys, so once
            # the cap was hit a brand-new symbol was deleted immediately
            # after insertion. This now matches the Redis-side keep-last-N
            # policy.
            keys = list(_book_ticker_cache.keys())
            for k in keys[:-_BOOK_TICKER_CACHE_MAX_KEYS]:
                del _book_ticker_cache[k]
        logger.debug(f"BookTickerStream: 已更新 {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}")

    async def _merge_and_write_book_ticker_to_redis_serialized(self, symbol: str, item: Dict[str, Any]) -> None:
        """Serialize merges: only one read-merge-write cycle runs at a time,
        so multiple full snapshots never pile up in memory concurrently."""
        lock = _get_book_ticker_merge_lock()
        async with lock:
            await self._merge_and_write_book_ticker_to_redis(symbol, item)

    async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None:
        """Read the full map from Redis, merge one entry, cap size, write back.

        Best-effort: all Redis errors are logged at debug level and swallowed.
        """
        try:
            if not self._redis_cache:
                return
            existing = await self._redis_cache.get(KEY_BOOK_TICKER)
            merged = dict(existing) if isinstance(existing, dict) else {}
            merged[symbol] = item
            # Keep only USDT pairs and cap the key count so the Redis value
            # cannot grow without bound.
            usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")}
            if len(usdt_only) > _REDIS_BOOK_TICKER_MAX_KEYS:
                keys = list(usdt_only.keys())[-_REDIS_BOOK_TICKER_MAX_KEYS:]
                usdt_only = {k: usdt_only[k] for k in keys}
            await self._redis_cache.set(KEY_BOOK_TICKER, usdt_only, ttl=30)
            await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30)
        except Exception as e:
            logger.debug("BookTickerStream: 写入 Redis 失败 %s", e)
async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
    """Background loop: mirror only the Redis "last updated" timestamp locally.

    The full book-ticker map is deliberately NOT pulled into this process;
    non-leader processes read individual symbols from Redis on demand.
    Returns immediately when no Redis cache is supplied.
    """
    global _book_ticker_updated_at
    if redis_cache is None:
        return
    while True:
        try:
            await asyncio.sleep(interval_sec)
            raw = await redis_cache.get(KEY_BOOK_TICKER_UPDATED_AT)
            if raw is None:
                continue
            try:
                stamp = float(raw)
            except (TypeError, ValueError):
                stamp = 0.0
            # A non-positive or unparseable stamp still counts as "seen now".
            _book_ticker_updated_at = stamp if stamp > 0 else time.time()
            logger.debug("BookTicker: 已同步 Redis 更新时间(进程内不缓存全量)")
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.debug("BookTicker: 从 Redis 刷新更新时间失败 %s", e)