auto_trade_sys/trading_system/ticker_24h_stream.py
薇薇安 f4feea6b87 feat(ticker_stream, book_ticker_stream): optimize memory management and Redis write logic
Introduces a new memory-management mechanism in `ticker_24h_stream.py` and `book_ticker_stream.py` that caps the in-process cache at 500 entries to prevent unbounded memory growth. Updates the Redis write logic so that, when Redis is available, data is written to Redis first rather than kept resident in process memory. Periodically pulling data from Redis and merging it improves memory efficiency and stability, and logging is throttled to reduce output under high load. These changes further strengthen system performance and resource management.
2026-02-19 00:34:35 +08:00


"""
24 小时行情 WebSocket 流:订阅 !ticker@arr维护全市场 ticker 缓存。
供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h减少请求与超时。
支持多进程共用Leader 写 Redis所有进程可通过 refresh_ticker_24h_from_redis_loop 从 Redis 更新本地缓存。
文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。
"""
import asyncio
import json
import logging
import time
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)
try:
    from .market_ws_leader import KEY_TICKER_24H
except ImportError:
    KEY_TICKER_24H = "market:ticker_24h"
# Market-wide 24h ticker cache: symbol -> { symbol, price, volume, changePercent, ts }
# When Redis is available, the refresh loop backfills this cache from Redis, so it does not
# accumulate here without bound.
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
_TICKER_24H_CACHE_MAX_KEYS = 500  # Keep at most 500 entries in process memory to avoid unbounded growth


def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
    """Return the current 24h ticker cache (structure-compatible with get_all_tickers_24h)."""
    return dict(_ticker_24h_cache)


def get_tickers_24h_cache_updated_at() -> float:
    """Return the time of the last cache update (monotonic clock); 0 if never updated."""
    return _ticker_24h_updated_at


def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
    """Whether the cache is non-empty and was updated within the last max_age_sec seconds."""
    if not _ticker_24h_cache:
        return False
    return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec
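

# Illustrative consumer sketch (comment only, not executed): per the module docstring, market_scanner
# is expected to prefer this WS cache and fall back to the bulk REST call only when the cache is stale.
# `rest_client.get_all_tickers_24h()` is a hypothetical name for that REST fallback.
#
#     async def load_tickers_24h(rest_client):
#         if is_ticker_24h_cache_fresh(max_age_sec=120.0):
#             return get_tickers_24h_cache()
#         return await rest_client.get_all_tickers_24h()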


class Ticker24hStream:
    """Subscribe to the futures !ticker@arr stream and keep _ticker_24h_cache updated.

    When this process is the leader, it can optionally write the data to Redis so other processes can read it.
    """

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False

    def _ws_url(self) -> str:
        if self.testnet:
            return "wss://stream.binancefuture.com/ws/!ticker@arr"
        return "wss://fstream.binance.com/ws/!ticker@arr"

    async def start(self) -> bool:
        if self._running:
            return True
        self._running = True
        self._task = asyncio.create_task(self._run_ws())
        logger.info("Ticker24hStream: started (!ticker@arr); the scanner will prefer the WS cache")
        return True

    async def stop(self):
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
        logger.info("Ticker24hStream: stopped")

    async def _run_ws(self):
        import aiohttp

        while self._running:
            url = self._ws_url()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("Ticker24hStream: WS connected")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self._handle_message(msg.data)
                            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "Ticker24hStream: WS error %s: %s, reconnecting in 10s",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            if not self._running:
                break

    def _handle_message(self, raw: str):
        global _ticker_24h_cache, _ticker_24h_updated_at
        try:
            data = json.loads(raw)
        except Exception:
            return
        if isinstance(data, list):
            arr = data
        elif isinstance(data, dict):
            arr = data.get("data") if isinstance(data.get("data"), list) else [data]
        else:
            return
        now_ms = int(time.time() * 1000)
        new_items = {}
        for t in arr:
            if not isinstance(t, dict):
                continue
            s = (t.get("s") or t.get("symbol") or "").strip()
            if not s or not s.endswith("USDT"):
                continue
            try:
                # Raw stream fields: c = last price, P = price change percent, q = quote volume, v = base volume
                price = float(t.get("c") or t.get("lastPrice") or 0)
                change_pct = float(t.get("P") or t.get("priceChangePercent") or 0)
                vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0)
            except (TypeError, ValueError):
                continue
            new_items[s] = {
                "symbol": s,
                "price": price,
                "volume": vol,
                "changePercent": change_pct,
                "ts": now_ms,
            }
        if not new_items:
            return
        # When Redis is available, write only to Redis; the refresh loop backfills process memory
        # from Redis, so the data set is not held twice.
        if self._redis_cache:
            try:
                asyncio.get_event_loop().create_task(
                    self._merge_and_write_ticker_24h_to_redis(new_items)
                )
            except Exception as e:
                logger.debug("Ticker24hStream: failed to schedule Redis write %s", e)
            return
        # Only without Redis do we write into process memory
        for s, v in new_items.items():
            _ticker_24h_cache[s] = v
        _ticker_24h_updated_at = time.monotonic()
        if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS:
            keys = list(_ticker_24h_cache.keys())
            for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]:
                del _ticker_24h_cache[k]
        logger.debug(f"Ticker24hStream: updated {len(new_items)} entries, cache holds {len(_ticker_24h_cache)} symbols")
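
    # For reference, one element of an !ticker@arr push looks roughly like the trimmed, made-up
    # sample below; only the keys the parser above reads are shown, the real event carries more.
    #     {"e": "24hrTicker", "E": 1700000000000, "s": "BTCUSDT",
    #      "c": "43250.10", "P": "2.35", "q": "1234567890.12", "v": "28500.5"}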

    async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None:
        """Read the existing map from Redis, merge in the new items, and write it back,
        so nothing stays resident in process memory."""
        try:
            if not self._redis_cache:
                return
            existing = await self._redis_cache.get(KEY_TICKER_24H)
            merged = dict(existing) if isinstance(existing, dict) else {}
            merged.update(new_items)
            await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120)
        except Exception as e:
            logger.debug("Ticker24hStream: Redis write failed %s", e)


async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
    """Periodically pull the 24h tickers from Redis into the local cache.

    Both the leader and non-leader processes run this loop, so no process keeps the full
    market snapshot resident in memory.
    """
    global _ticker_24h_cache, _ticker_24h_updated_at
    if redis_cache is None:
        return
    while True:
        try:
            await asyncio.sleep(interval_sec)
            data = await redis_cache.get(KEY_TICKER_24H)
            if data and isinstance(data, dict):
                _ticker_24h_cache.update(data)
                if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS:
                    keys = list(_ticker_24h_cache.keys())
                    for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]:
                        del _ticker_24h_cache[k]
                _ticker_24h_updated_at = time.monotonic()
                logger.debug("Ticker24h: refreshed %s symbols from Redis", len(_ticker_24h_cache))
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.debug("Ticker24h: refresh from Redis failed %s", e)
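

# Minimal smoke-test sketch (an addition for illustration, not part of the original module): it assumes
# aiohttp is installed and outbound access to Binance is available. Without a redis_cache the stream
# fills the in-process cache, which is inspected after a short wait.
if __name__ == "__main__":
    async def _demo() -> None:
        stream = Ticker24hStream(testnet=False, redis_cache=None)
        await stream.start()
        await asyncio.sleep(15)  # let a few !ticker@arr pushes arrive (roughly one per second)
        print(f"cached symbols: {len(get_tickers_24h_cache())}, fresh: {is_ticker_24h_cache_fresh()}")
        await stream.stop()

    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())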