auto_trade_sys/trading_system/ticker_24h_stream.py
薇薇安 43e993034f feat(redis_integration): 支持多进程共用市场数据流
在 `binance_client`、`kline_stream`、`book_ticker_stream` 和 `ticker_24h_stream` 中引入 Redis 缓存支持,允许 Leader 进程写入数据,其他进程从 Redis 读取,提升数据获取效率。更新了相关逻辑以确保在多进程环境下的稳定性和一致性,同时增强了异常处理和日志记录,确保系统的可追溯性。
2026-02-16 17:44:10 +08:00

185 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
24 小时行情 WebSocket 流:订阅 !ticker@arr维护全市场 ticker 缓存。
供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h减少请求与超时。
支持多进程共用Leader 写 Redis所有进程可通过 refresh_ticker_24h_from_redis_loop 从 Redis 更新本地缓存。
文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
try:
from .market_ws_leader import KEY_TICKER_24H
except ImportError:
KEY_TICKER_24H = "market:ticker_24h"
# 全市场 24h ticker 缓存symbol -> { symbol, price, volume, changePercent, ts }
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
"""返回当前 24h ticker 缓存(与 get_all_tickers_24h 结构兼容)。"""
return dict(_ticker_24h_cache)
def get_tickers_24h_cache_updated_at() -> float:
"""返回缓存最后更新时间monotonic。未更新过为 0。"""
return _ticker_24h_updated_at
def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
"""缓存是否在 max_age_sec 秒内更新过且非空。"""
if not _ticker_24h_cache:
return False
return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec
class Ticker24hStream:
"""订阅合约 !ticker@arr持续更新 _ticker_24h_cache。Leader 时可选写 Redis 供多进程读。"""
def __init__(self, testnet: bool = False, redis_cache: Any = None):
self.testnet = testnet
self._redis_cache = redis_cache
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
def _ws_url(self) -> str:
if self.testnet:
return "wss://stream.binancefuture.com/ws/!ticker@arr"
return "wss://fstream.binance.com/ws/!ticker@arr"
async def start(self) -> bool:
if self._running:
return True
self._running = True
self._task = asyncio.create_task(self._run_ws())
logger.info("Ticker24hStream: 已启动(!ticker@arr扫描将优先使用 WS 缓存")
return True
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("Ticker24hStream: 已停止")
async def _run_ws(self):
import aiohttp
while self._running:
url = self._ws_url()
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
) as ws:
self._ws = ws
logger.info("Ticker24hStream: WS 已连接")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
except asyncio.CancelledError:
break
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
err_type = type(e).__name__
logger.warning(
"Ticker24hStream: WS 异常 %s: %s10s 后重连",
err_type, err_msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
await asyncio.sleep(10)
self._ws = None
if not self._running:
break
def _handle_message(self, raw: str):
global _ticker_24h_cache, _ticker_24h_updated_at
try:
data = json.loads(raw)
except Exception:
return
# 可能是单条对象stream 名)或数组;文档说是数组
if isinstance(data, list):
arr = data
elif isinstance(data, dict):
# 组合流格式 { "stream": "!ticker@arr", "data": [ ... ] }
arr = data.get("data") if isinstance(data.get("data"), list) else [data]
else:
return
now_ms = int(time.time() * 1000)
for t in arr:
if not isinstance(t, dict):
continue
s = (t.get("s") or t.get("symbol") or "").strip()
if not s or not s.endswith("USDT"):
continue
try:
price = float(t.get("c") or t.get("lastPrice") or 0)
change_pct = float(t.get("P") or t.get("priceChangePercent") or 0)
# 成交量:优先 quoteVolumeUSDT文档可能为 q 或 quoteVolume
vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0)
except (TypeError, ValueError):
continue
_ticker_24h_cache[s] = {
"symbol": s,
"price": price,
"volume": vol,
"changePercent": change_pct,
"ts": now_ms,
}
_ticker_24h_updated_at = time.monotonic()
logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对")
if self._redis_cache:
try:
loop = asyncio.get_event_loop()
copy = dict(_ticker_24h_cache)
loop.create_task(self._write_ticker_24h_to_redis(copy))
except Exception as e:
logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e)
async def _write_ticker_24h_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None:
try:
if self._redis_cache:
await self._redis_cache.set(KEY_TICKER_24H, data, ttl=120)
except Exception as e:
logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e)
async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
"""非 Leader 或共用模式:定期从 Redis 拉取 24h ticker 到本地缓存。所有进程可调用。"""
global _ticker_24h_cache, _ticker_24h_updated_at
if redis_cache is None:
return
while True:
try:
await asyncio.sleep(interval_sec)
data = await redis_cache.get(KEY_TICKER_24H)
if data and isinstance(data, dict):
_ticker_24h_cache.update(data)
_ticker_24h_updated_at = time.monotonic()
logger.debug("Ticker24h: 从 Redis 刷新 %s 个交易对", len(data))
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Ticker24h: 从 Redis 刷新失败 %s", e)