auto_trade_sys/trading_system/ticker_24h_stream.py
薇薇安 d42cee2f1a feat(async_handling): 添加任务完成回调以处理异步任务异常
在多个流处理模块中引入 `_task_done_callback` 函数,确保在异步任务完成后能够捕获并记录异常,避免未处理的任务异常导致的潜在问题。此改动提升了系统的稳定性和错误处理能力,确保在执行异步操作时能够更好地管理任务状态。
2026-02-23 15:43:13 +08:00

251 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
24 小时行情 WebSocket 流:订阅 !ticker@arr维护全市场 ticker 缓存。
供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h减少请求与超时。
支持多进程共用Leader 写 Redis所有进程可通过 refresh_ticker_24h_from_redis_loop 从 Redis 更新本地缓存。
文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
def _task_done_callback(task: asyncio.Task) -> None:
"""消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'"""
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug("Ticker24hStream background task error: %s", e)
try:
from .market_ws_leader import KEY_TICKER_24H
except ImportError:
KEY_TICKER_24H = "market:ticker_24h"
KEY_TICKER_24H_UPDATED_AT = "market:ticker_24h:updated_at"
# 进程内不再保留全量:有 Redis 时只维护“最后更新时间”,数据全部从 Redis 按需读
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
_TICKER_24H_CACHE_MAX_KEYS = 500 # 仅无 Redis 时使用
def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
"""无 Redis 降级时返回进程内缓存;有 Redis 时应使用 get_tickers_24h_from_redis。"""
return dict(_ticker_24h_cache)
def get_tickers_24h_cache_updated_at() -> float:
"""返回缓存最后更新时间(由 refresh 从 Redis 回写 updated_at"""
return _ticker_24h_updated_at
def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
"""是否有可用数据(有 Redis 时由 refresh 更新 _ticker_24h_updated_at 为 Redis 的 time.time"""
return _ticker_24h_updated_at > 0 and (time.time() - _ticker_24h_updated_at) <= max_age_sec
async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, Any]]:
"""从 Redis 按需读取全量 24h ticker进程内不保留减轻内存。"""
if not redis_cache:
return get_tickers_24h_cache()
try:
data = await redis_cache.get(KEY_TICKER_24H)
return dict(data) if isinstance(data, dict) else {}
except Exception as e:
logger.debug("get_tickers_24h_from_redis: %s", e)
return get_tickers_24h_cache()
# Leader 写 Redis 时串行化,避免并发 merge 导致多份全量缓存在内存堆积(单进程 800MB+
_REDIS_MERGE_LOCK: Optional[asyncio.Lock] = None
_REDIS_TICKER_24H_MAX_KEYS = 600 # Redis 内最多保留 USDT 数量,防止 key 无限增长
def _get_ticker_24h_merge_lock() -> asyncio.Lock:
global _REDIS_MERGE_LOCK
if _REDIS_MERGE_LOCK is None:
_REDIS_MERGE_LOCK = asyncio.Lock()
return _REDIS_MERGE_LOCK
class Ticker24hStream:
"""订阅合约 !ticker@arr持续更新 _ticker_24h_cache。Leader 时可选写 Redis 供多进程读。"""
def __init__(self, testnet: bool = False, redis_cache: Any = None):
self.testnet = testnet
self._redis_cache = redis_cache
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
def _ws_url(self) -> str:
if self.testnet:
return "wss://stream.binancefuture.com/ws/!ticker@arr"
return "wss://fstream.binance.com/ws/!ticker@arr"
async def start(self) -> bool:
if self._running:
return True
self._running = True
self._task = asyncio.create_task(self._run_ws())
logger.info("Ticker24hStream: 已启动(!ticker@arr扫描将优先使用 WS 缓存")
return True
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("Ticker24hStream: 已停止")
async def _run_ws(self):
import aiohttp
while self._running:
url = self._ws_url()
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
) as ws:
self._ws = ws
logger.info("Ticker24hStream: WS 已连接")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
except asyncio.CancelledError:
break
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
err_type = type(e).__name__
logger.warning(
"Ticker24hStream: WS 异常 %s: %s10s 后重连",
err_type, err_msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
await asyncio.sleep(10)
self._ws = None
if not self._running:
break
def _handle_message(self, raw: str):
global _ticker_24h_cache, _ticker_24h_updated_at
try:
data = json.loads(raw)
except Exception:
return
if isinstance(data, list):
arr = data
elif isinstance(data, dict):
arr = data.get("data") if isinstance(data.get("data"), list) else [data]
else:
return
now_ms = int(time.time() * 1000)
new_items = {}
for t in arr:
if not isinstance(t, dict):
continue
s = (t.get("s") or t.get("symbol") or "").strip()
if not s or not s.endswith("USDT"):
continue
try:
price = float(t.get("c") or t.get("lastPrice") or 0)
change_pct = float(t.get("P") or t.get("priceChangePercent") or 0)
vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0)
except (TypeError, ValueError):
continue
new_items[s] = {
"symbol": s,
"price": price,
"volume": vol,
"changePercent": change_pct,
"ts": now_ms,
}
if not new_items:
return
# 有 Redis 时只写 Redis串行化合并避免多任务同时拉全量导致 Leader 进程内存暴增
if self._redis_cache:
try:
t = asyncio.create_task(
self._merge_and_write_ticker_24h_to_redis_serialized(new_items)
)
t.add_done_callback(_task_done_callback)
except Exception as e:
logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e)
return
# 无 Redis 时才写进程内存
for s, v in new_items.items():
_ticker_24h_cache[s] = v
_ticker_24h_updated_at = time.monotonic()
if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS:
keys = list(_ticker_24h_cache.keys())
for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]:
del _ticker_24h_cache[k]
logger.debug(f"Ticker24hStream: 已更新 {len(new_items)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对")
async def _merge_and_write_ticker_24h_to_redis_serialized(self, new_items: Dict[str, Dict[str, Any]]) -> None:
"""串行化:同一时刻仅一个 merge 执行,避免多份全量缓存在内存堆积。"""
lock = _get_ticker_24h_merge_lock()
async with lock:
await self._merge_and_write_ticker_24h_to_redis(new_items)
async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None:
"""从 Redis 读出、合并新数据、写回,并写更新时间供 refresh 只拉时间不拉全量。限制 key 数量防 Redis 膨胀。"""
try:
if not self._redis_cache:
return
existing = await self._redis_cache.get(KEY_TICKER_24H)
merged = dict(existing) if isinstance(existing, dict) else {}
merged.update(new_items)
# 只保留 USDT 且限制数量,防止 key 无限增长
usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")}
if len(usdt_only) > _REDIS_TICKER_24H_MAX_KEYS:
keys = list(usdt_only.keys())[-_REDIS_TICKER_24H_MAX_KEYS:]
usdt_only = {k: usdt_only[k] for k in keys}
await self._redis_cache.set(KEY_TICKER_24H, usdt_only, ttl=120)
await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120)
except Exception as e:
logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e)
async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
"""只从 Redis 拉取「更新时间」,不拉全量数据,进程内不保留 500 条以省内存。"""
global _ticker_24h_updated_at
if redis_cache is None:
return
while True:
try:
await asyncio.sleep(interval_sec)
raw = await redis_cache.get(KEY_TICKER_24H_UPDATED_AT)
if raw is not None:
try:
t = float(raw)
_ticker_24h_updated_at = t if t > 0 else time.monotonic()
except (TypeError, ValueError):
_ticker_24h_updated_at = time.monotonic()
logger.debug("Ticker24h: 已同步 Redis 更新时间(进程内不缓存全量)")
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Ticker24h: 从 Redis 刷新更新时间失败 %s", e)