auto_trade_sys/trading_system/ticker_24h_stream.py
薇薇安 dfbdfee596 fix(binance_client, ticker_stream, user_data_stream): 增强异常处理和日志记录
在 `binance_client`、`ticker_24h_stream` 和 `user_data_stream` 中优化了异常处理逻辑,确保在发生错误时记录详细的错误类型和信息。更新了日志格式,以便于后续排查和监控。同时,增加了对请求超时的处理,提升了系统的稳定性和可追溯性。
2026-02-16 16:51:22 +08:00

145 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
24 小时行情 WebSocket 流:订阅 !ticker@arr维护全市场 ticker 缓存。
供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h减少请求与超时。
文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
# 全市场 24h ticker 缓存symbol -> { symbol, price, volume, changePercent, ts }
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
"""返回当前 24h ticker 缓存(与 get_all_tickers_24h 结构兼容)。"""
return dict(_ticker_24h_cache)
def get_tickers_24h_cache_updated_at() -> float:
"""返回缓存最后更新时间monotonic。未更新过为 0。"""
return _ticker_24h_updated_at
def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
"""缓存是否在 max_age_sec 秒内更新过且非空。"""
if not _ticker_24h_cache:
return False
return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec
class Ticker24hStream:
"""订阅合约 !ticker@arr持续更新 _ticker_24h_cache。无需 listenKey公开行情。"""
def __init__(self, testnet: bool = False):
self.testnet = testnet
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
def _ws_url(self) -> str:
if self.testnet:
return "wss://stream.binancefuture.com/ws/!ticker@arr"
return "wss://fstream.binance.com/ws/!ticker@arr"
async def start(self) -> bool:
if self._running:
return True
self._running = True
self._task = asyncio.create_task(self._run_ws())
logger.info("Ticker24hStream: 已启动(!ticker@arr扫描将优先使用 WS 缓存")
return True
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("Ticker24hStream: 已停止")
async def _run_ws(self):
import aiohttp
while self._running:
url = self._ws_url()
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
) as ws:
self._ws = ws
logger.info("Ticker24hStream: WS 已连接")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
except asyncio.CancelledError:
break
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
err_type = type(e).__name__
logger.warning(
"Ticker24hStream: WS 异常 %s: %s10s 后重连",
err_type, err_msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
await asyncio.sleep(10)
self._ws = None
if not self._running:
break
def _handle_message(self, raw: str):
global _ticker_24h_cache, _ticker_24h_updated_at
try:
data = json.loads(raw)
except Exception:
return
# 可能是单条对象stream 名)或数组;文档说是数组
if isinstance(data, list):
arr = data
elif isinstance(data, dict):
# 组合流格式 { "stream": "!ticker@arr", "data": [ ... ] }
arr = data.get("data") if isinstance(data.get("data"), list) else [data]
else:
return
now_ms = int(time.time() * 1000)
for t in arr:
if not isinstance(t, dict):
continue
s = (t.get("s") or t.get("symbol") or "").strip()
if not s or not s.endswith("USDT"):
continue
try:
price = float(t.get("c") or t.get("lastPrice") or 0)
change_pct = float(t.get("P") or t.get("priceChangePercent") or 0)
# 成交量:优先 quoteVolumeUSDT文档可能为 q 或 quoteVolume
vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0)
except (TypeError, ValueError):
continue
_ticker_24h_cache[s] = {
"symbol": s,
"price": price,
"volume": vol,
"changePercent": change_pct,
"ts": now_ms,
}
_ticker_24h_updated_at = time.monotonic()
logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对")