auto_trade_sys/trading_system/ticker_24h_stream.py
薇薇安 c6126a42c9 feat(ticker_stream): 引入24小时行情WebSocket流以优化数据获取
在交易系统中新增24小时行情WebSocket流的支持,优先从缓存中读取行情数据,减少对REST API的依赖。更新市场扫描器以使用WebSocket缓存,确保在缓存过期时回退到REST请求。同时,添加了相应的异常处理逻辑以增强系统的稳定性。
2026-02-16 15:22:51 +08:00

139 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
24 小时行情 WebSocket 流:订阅 !ticker@arr维护全市场 ticker 缓存。
供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h减少请求与超时。
文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
# 全市场 24h ticker 缓存symbol -> { symbol, price, volume, changePercent, ts }
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
"""返回当前 24h ticker 缓存(与 get_all_tickers_24h 结构兼容)。"""
return dict(_ticker_24h_cache)
def get_tickers_24h_cache_updated_at() -> float:
"""返回缓存最后更新时间monotonic。未更新过为 0。"""
return _ticker_24h_updated_at
def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
"""缓存是否在 max_age_sec 秒内更新过且非空。"""
if not _ticker_24h_cache:
return False
return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec
class Ticker24hStream:
"""订阅合约 !ticker@arr持续更新 _ticker_24h_cache。无需 listenKey公开行情。"""
def __init__(self, testnet: bool = False):
self.testnet = testnet
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
def _ws_url(self) -> str:
if self.testnet:
return "wss://stream.binancefuture.com/ws/!ticker@arr"
return "wss://fstream.binance.com/ws/!ticker@arr"
async def start(self) -> bool:
if self._running:
return True
self._running = True
self._task = asyncio.create_task(self._run_ws())
logger.info("Ticker24hStream: 已启动(!ticker@arr扫描将优先使用 WS 缓存")
return True
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("Ticker24hStream: 已停止")
async def _run_ws(self):
import aiohttp
while self._running:
url = self._ws_url()
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
) as ws:
self._ws = ws
logger.info("Ticker24hStream: WS 已连接")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Ticker24hStream: WS 异常 {e}10s 后重连")
await asyncio.sleep(10)
self._ws = None
if not self._running:
break
def _handle_message(self, raw: str):
global _ticker_24h_cache, _ticker_24h_updated_at
try:
data = json.loads(raw)
except Exception:
return
# 可能是单条对象stream 名)或数组;文档说是数组
if isinstance(data, list):
arr = data
elif isinstance(data, dict):
# 组合流格式 { "stream": "!ticker@arr", "data": [ ... ] }
arr = data.get("data") if isinstance(data.get("data"), list) else [data]
else:
return
now_ms = int(time.time() * 1000)
for t in arr:
if not isinstance(t, dict):
continue
s = (t.get("s") or t.get("symbol") or "").strip()
if not s or not s.endswith("USDT"):
continue
try:
price = float(t.get("c") or t.get("lastPrice") or 0)
change_pct = float(t.get("P") or t.get("priceChangePercent") or 0)
# 成交量:优先 quoteVolumeUSDT文档可能为 q 或 quoteVolume
vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0)
except (TypeError, ValueError):
continue
_ticker_24h_cache[s] = {
"symbol": s,
"price": price,
"volume": vol,
"changePercent": change_pct,
"ts": now_ms,
}
_ticker_24h_updated_at = time.monotonic()
logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对")