auto_trade_sys/trading_system/kline_stream.py
薇薇安 80872231a5 feat(kline_stream, diagnostics): 增强 K线缓存管理与系统负载诊断功能
在 `kline_stream.py` 中新增缓存清理机制,限制缓存总大小并定期清理过期条目,防止内存无限增长。更新 `backend/诊断负载.sh` 脚本,优化系统负载检查逻辑,提供更详细的进程与日志信息,提升用户对交易服务状态的监控能力。此改动增强了系统的稳定性与性能。
2026-02-19 00:06:23 +08:00

459 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
K线 WebSocket 流:订阅 <symbol>@kline_<interval>维护K线缓存。
供 get_klines 优先使用,替代 REST 拉取,减少超时、实时更新技术指标。
支持多进程共用Leader 写 Redismarket:kline:{symbol}:{interval}),非 Leader 可通过 get_klines_from_redis 读。
文档:推送间隔 250ms仅推送最新一根K线的更新x=false 表示K线未完结x=true 表示已完结。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
try:
from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
KEY_KLINE_PREFIX = "market:kline:"
# Kline cache: { (symbol, interval): [kline1, kline2, ...] }, keeping at most `limit` bars per key.
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# Last-update timestamps (time.monotonic()) per (symbol, interval) key.
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
_kline_cache_limit: Dict[Tuple[str, str], int] = {}  # per-(symbol, interval) bar limit
# Memory safety: bound the total cache size so memory cannot grow without limit
# (target deployment is a small 2-CPU / 4 GB server).
_MAX_CACHE_ENTRIES = 200  # keep at most 200 (symbol, interval) cache entries
_CACHE_CLEANUP_INTERVAL_SEC = 300  # run the stale-entry sweep every 5 minutes
_CACHE_MAX_AGE_SEC = 600  # evict entries not updated for 10 minutes
def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the most recent *limit* cached klines for (symbol, interval).

    The result is compatible with the REST ``get_klines`` row format.
    Returns None when the pair was never subscribed or fewer than *limit*
    bars have accumulated in the cache.
    """
    entry = _kline_cache.get((symbol.upper(), interval.lower()))
    if not entry or len(entry) < limit:
        return None
    # Hand back only the trailing window the caller asked for.
    return entry[-limit:]
def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """True if the (symbol, interval) cache entry was refreshed within *max_age_sec* seconds."""
    last_update = _kline_cache_updated_at.get((symbol.upper(), interval.lower()), 0)
    if not last_update:
        # Never updated (or missing) -> not fresh.
        return False
    return (time.monotonic() - last_update) <= max_age_sec
def _cleanup_stale_kline_cache(max_age_sec: Optional[float] = None) -> None:
    """Evict stale kline cache entries so memory stays bounded.

    Removes entries not updated for more than *max_age_sec* seconds
    (defaults to _CACHE_MAX_AGE_SEC). If the cache still holds more than
    _MAX_CACHE_ENTRIES keys, the least recently updated entries are evicted
    as well, even when not yet expired.

    Args:
        max_age_sec: override for the expiry age in seconds; None uses
            the module default _CACHE_MAX_AGE_SEC.
    """
    if max_age_sec is None:
        max_age_sec = _CACHE_MAX_AGE_SEC
    now = time.monotonic()
    # Use a set: a key can be both expired AND over the entry limit, and a
    # list would collect it twice (the original implementation did).
    to_remove = {
        key for key, updated_at in _kline_cache_updated_at.items()
        if (now - updated_at) > max_age_sec
    }
    if len(_kline_cache) > _MAX_CACHE_ENTRIES:
        # Too many entries: keep only the _MAX_CACHE_ENTRIES most recently
        # updated keys and queue everything else for removal.
        by_recency = sorted(_kline_cache_updated_at.items(), key=lambda kv: kv[1], reverse=True)
        keep_keys = {k for k, _ in by_recency[:_MAX_CACHE_ENTRIES]}
        to_remove.update(k for k in list(_kline_cache.keys()) if k not in keep_keys)
    for key in to_remove:
        _kline_cache.pop(key, None)
        _kline_cache_updated_at.pop(key, None)
        _kline_cache_limit.pop(key, None)
    if to_remove:
        # Fix: the original log message was missing its closing parenthesis.
        logger.debug(f"KlineStream: 已清理 {len(to_remove)} 个过期缓存条目(当前缓存数: {len(_kline_cache)})")
class KlineStream:
    """Subscribes to futures kline streams and keeps ``_kline_cache`` updated.

    Uses a single Binance combined-stream WebSocket connection with dynamic
    SUBSCRIBE messages. When a Redis cache object is supplied, recent klines
    are mirrored to Redis in batches (every ~2 s) so other processes can read
    them via ``get_klines_from_redis``.
    """

    # Binance limit: at most 10 subscribe messages per second; exceeding it
    # gets the connection dropped.
    _SUBSCRIBE_RATE_LIMIT = 10  # messages per second
    _SUBSCRIBE_WINDOW_SEC = 1.0

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        """
        Args:
            testnet: use the Binance futures testnet endpoint when True.
            redis_cache: optional async cache exposing ``set(key, value, ttl=...)``
                and ``get(key)``; when provided, klines are batch-written to Redis.
        """
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        # Fix: initialize the cleanup task handle here so stop() does not
        # need a hasattr() guard.
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False
        self._subscribed: Dict[Tuple[str, str], bool] = {}  # (symbol, interval) -> subscribed?
        self._subscription_lock = asyncio.Lock()
        self._subscribe_times: List[float] = []  # send timestamps within the rate-limit window
        # Subscription accounting: guards against subscribing to so many
        # streams that CPU load climbs.
        self._subscription_count = 0  # current number of subscriptions
        self._max_subscriptions = 500  # Binance allows 1024 streams/connection; 500 is a safety margin
        # Message handling: cap concurrent handler tasks so they cannot pile
        # up under a message burst.
        # (Fix: the next three attributes were assigned twice in the original.)
        self._message_semaphore = asyncio.Semaphore(50)  # at most 50 messages processed concurrently
        self._redis_write_pending: Dict[str, tuple] = {}  # pending Redis writes: {rkey: (data, timestamp)}
        self._last_redis_batch_write = 0.0  # monotonic time of the last batched Redis write
        self._batch_write_lock = asyncio.Lock()  # serializes batched Redis writes

    def _ws_base_url(self) -> str:
        """Return the futures WebSocket base URL for the configured network."""
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Start the WS reader and the periodic cache-cleanup task (idempotent)."""
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        # Memory safety: periodically evict stale cache entries.
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        logger.info("KlineStream: 已启动(支持动态订阅)")
        return True

    async def stop(self):
        """Cancel background tasks, close the WebSocket and clear the singleton."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            self._cleanup_task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
        logger.info("KlineStream: 已停止")

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream for (symbol, interval).

        Waits up to 5 s for the WS connection, honors Binance's 10 msg/s
        subscribe rate limit, and refuses new subscriptions once
        ``_max_subscriptions`` is reached.

        Returns:
            True if already subscribed or the SUBSCRIBE was sent; False otherwise.
        """
        symbol = symbol.upper()
        interval = interval.lower()
        key = (symbol, interval)
        async with self._subscription_lock:
            if self._subscribed.get(key):
                return True
            if not self._running:
                return False
            # Cap the subscription count to keep load manageable.
            current_count = len(self._subscribed)
            if current_count >= self._max_subscriptions:
                # Fix: the original message was missing a separator and its
                # closing parenthesis.
                logger.warning(
                    f"KlineStream: 订阅数量已达上限 ({current_count}/{self._max_subscriptions}),"
                    f"跳过订阅 {symbol} {interval}(建议优先使用共享缓存或 REST API)"
                )
                return False
            # Wait for the WS connection (at most 5 seconds).
            for _ in range(50):
                if self._ws:
                    break
                await asyncio.sleep(0.1)
            if not self._ws:
                return False
            # Binance limit: at most 10 subscribe messages per second.
            now = time.monotonic()
            self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
            while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
                wait_until = self._subscribe_times[0] + self._SUBSCRIBE_WINDOW_SEC
                wait_sec = max(0.01, wait_until - time.monotonic())
                await asyncio.sleep(wait_sec)
                now = time.monotonic()
                self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await self._ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
                self._subscribe_times.append(time.monotonic())
                self._subscribed[key] = True
                self._subscription_count = len(self._subscribed)
                _kline_cache_limit[key] = limit
                # Fix: closing parenthesis restored in the log message.
                logger.debug(f"KlineStream: 已订阅 {symbol} {interval}(当前订阅数: {self._subscription_count}/{self._max_subscriptions})")
                return True
            except (ConnectionResetError, OSError) as e:
                msg = str(e).lower()
                if "closing transport" in msg or "cannot write" in msg:
                    # Connection is closing; the subscription will be retried
                    # automatically after reconnect.
                    logger.debug("KlineStream: 连接关闭中,订阅 %s %s 跳过(重连后将自动重试)", symbol, interval)
                else:
                    logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False
            except Exception as e:
                logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False

    async def _run_ws(self):
        """Connection loop: connect, read messages, reconnect on failure."""
        import aiohttp
        # Combined stream endpoint: supports dynamic SUBSCRIBE/UNSUBSCRIBE.
        url = f"{self._ws_base_url()}/stream"
        while self._running:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                raw = msg.data
                                # Fast-path filter: drop SUBSCRIBE acks early.
                                try:
                                    data = json.loads(raw)
                                    if isinstance(data, dict) and "result" in data:
                                        # Subscription response -> ignore.
                                        continue
                                except Exception:
                                    pass
                                # Handle the message off the read loop; the
                                # semaphore bounds concurrent handler tasks.
                                asyncio.create_task(self._handle_message_with_limit(raw))
                            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            # After a disconnect the server forgets our streams: clear the
            # local subscription state so callers re-subscribe.
            async with self._subscription_lock:
                self._subscribed.clear()
                self._subscription_count = 0
            if not self._running:
                break

    async def _cleanup_loop(self):
        """Periodically evict stale cache entries to bound memory usage."""
        while self._running:
            try:
                await asyncio.sleep(_CACHE_CLEANUP_INTERVAL_SEC)
                if self._running:
                    _cleanup_stale_kline_cache()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.debug(f"KlineStream: 清理缓存时出错: {e}")

    async def _handle_message_with_limit(self, raw: str):
        """Wrapper that bounds the number of concurrently running handlers."""
        async with self._message_semaphore:
            await self._handle_message_async(raw)

    async def _handle_message_async(self, raw: str):
        """Parse one kline WS message and update the in-process cache.

        Converts the payload to the REST kline row format, appends (closed
        bar, x=true) or replaces the last bar (open bar, x=false), and
        schedules a batched Redis write when a Redis cache is configured.
        """
        global _kline_cache, _kline_cache_updated_at
        try:
            data = json.loads(raw)
        except Exception:
            return
        # Combined-stream envelope: { "stream": "btcusdt@kline_1h", "data": {...} }
        if isinstance(data, dict) and "stream" in data:
            stream = data.get("stream", "")
            kline_data = data.get("data", {})
        else:
            kline_data = data
            stream = ""
        if not isinstance(kline_data, dict) or kline_data.get("e") != "kline":
            return
        k = kline_data.get("k")
        if not isinstance(k, dict):
            return
        s = (k.get("s") or "").strip().upper()
        i = (k.get("i") or "").strip().lower()
        if not s or not i:
            return
        key = (s, i)
        if key not in self._subscribed:
            return
        # Convert to the REST row shape:
        # [open_time, open, high, low, close, volume, close_time, quote_volume, trades, ...]
        try:
            t = int(k.get("t", 0))
            o = float(k.get("o", 0))
            h = float(k.get("h", 0))
            l = float(k.get("l", 0))
            c = float(k.get("c", 0))
            v = float(k.get("v", 0))
            T = int(k.get("T", 0))
            q = float(k.get("q", 0))
            n = int(k.get("n", 0))
            x = k.get("x", False)  # True when the bar has closed
        except (TypeError, ValueError):
            return
        kline_rest_format = [
            t,        # open_time
            str(o),   # open
            str(h),   # high
            str(l),   # low
            str(c),   # close
            str(v),   # volume
            T,        # close_time
            str(q),   # quote_volume
            n,        # trades
            "0",      # taker_buy_base_volume (not provided by this stream)
            "0",      # taker_buy_quote_volume
            "0",      # ignore
        ]
        # Cache update: x=true -> append a new closed bar; x=false -> replace
        # the still-open last bar (or seed an empty cache with it).
        if key not in _kline_cache:
            _kline_cache[key] = []
        cache_list = _kline_cache[key]
        if x:
            cache_list.append(kline_rest_format)
            limit = _kline_cache_limit.get(key, 50)
            # Trim lazily: only once we exceed twice the limit.
            if len(cache_list) > limit * 2:
                cache_list[:] = cache_list[-limit:]
        else:
            if cache_list:
                cache_list[-1] = kline_rest_format
            else:
                cache_list.append(kline_rest_format)
        _kline_cache_updated_at[key] = time.monotonic()
        # Memory safety: sweep stale entries once 80% of the cap is reached.
        if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8:
            _cleanup_stale_kline_cache()
        # Keep logging cheap on the hot path: only emit at DEBUG level.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)}")
        # Batched Redis mirroring: mark the key dirty, then flush at most
        # once every 2 seconds.
        if self._redis_cache:
            try:
                copy = list(cache_list)
                rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
                # Record the write; it will be flushed by the next batch.
                self._redis_write_pending[rkey] = (copy, time.monotonic())
                now = time.monotonic()
                if now - self._last_redis_batch_write >= 2.0:
                    async with self._batch_write_lock:
                        # Fix: re-sample the clock for the double-check; the
                        # original reused the pre-lock `now`, so a flush by a
                        # concurrent task while we awaited the lock was only
                        # detected via the updated timestamp.
                        if time.monotonic() - self._last_redis_batch_write >= 2.0:
                            await self._batch_write_redis()
                            self._last_redis_batch_write = time.monotonic()
            except Exception:
                # Best-effort mirroring: stay silent to avoid log spam.
                pass

    async def _batch_write_redis(self):
        """Flush all pending kline snapshots to Redis in one pass."""
        if not self._redis_write_pending:
            return
        try:
            # Snapshot and clear so concurrent handlers can keep queueing.
            pending = dict(self._redis_write_pending)
            self._redis_write_pending.clear()
            for rkey, (data, _) in pending.items():
                try:
                    await self._redis_cache.set(rkey, data, ttl=600)
                except Exception:
                    # Per-key failures are tolerated (best effort).
                    pass
        except Exception:
            pass

    def _handle_message(self, raw: str):
        """Synchronous entry point kept for compatibility; delegates to the async handler."""
        asyncio.create_task(self._handle_message_async(raw))

    async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
        """Write one kline snapshot to Redis (best effort, 10-minute TTL)."""
        try:
            if self._redis_cache:
                await self._redis_cache.set(rkey, data, ttl=600)
        except Exception as e:
            logger.debug("KlineStream: 写入 Redis 失败 %s", e)
async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
    """Shared mode: read the kline cache from Redis; returns None on a miss.

    Returns the last *limit* rows when the stored value is a list holding at
    least *limit* entries; otherwise (missing key, bad type, short data, any
    Redis error, or a non-positive limit) returns None.
    """
    if redis_cache is None or limit <= 0:
        return None
    try:
        rkey = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}"
        rows = await redis_cache.get(rkey)
        if isinstance(rows, list) and rows and len(rows) >= limit:
            return rows[-limit:]
    except Exception as e:
        logger.debug("get_klines_from_redis: %s", e)
    return None
# Module-level singleton holding the running KlineStream, if any.
_kline_stream_instance: Optional["KlineStream"] = None


def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the currently running KlineStream instance (None when not started)."""
    return _kline_stream_instance


def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Install the global KlineStream instance (invoked by main)."""
    global _kline_stream_instance
    _kline_stream_instance = instance