auto_trade_sys/trading_system/kline_stream.py
薇薇安 c9d9836df5 feat(kline_stream, market_scanner, config): 优化 K线订阅逻辑与缓存机制
在 `config.py` 中新增 `SCAN_LIMIT_KLINE_SUBSCRIBE` 配置,限制 K线订阅数量以降低负载。更新 `kline_stream.py`,引入订阅统计与数量限制,避免过多订阅导致性能问题。修改 `market_scanner.py`,优化 K线数据获取流程,优先使用已有缓存,减少不必要的订阅。此改动提升了系统的稳定性与性能。
2026-02-18 00:36:44 +08:00

335 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
K线 WebSocket 流:订阅 <symbol>@kline_<interval>维护K线缓存。
供 get_klines 优先使用,替代 REST 拉取,减少超时、实时更新技术指标。
支持多进程共用Leader 写 Redismarket:kline:{symbol}:{interval}),非 Leader 可通过 get_klines_from_redis 读。
文档:推送间隔 250ms仅推送最新一根K线的更新x=false 表示K线未完结x=true 表示已完结。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
try:
from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
KEY_KLINE_PREFIX = "market:kline:"
# Kline cache: {(symbol, interval): [kline1, kline2, ...]}; at most `limit` rows are retained per key
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# time.monotonic() timestamp of the last update per (symbol, interval) key
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
_kline_cache_limit: Dict[Tuple[str, str], int] = {}  # retention limit per (symbol, interval)
def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the last ``limit`` cached klines (REST ``get_klines``-compatible format).

    Returns None when the (symbol, interval) pair is not cached or holds
    fewer than ``limit`` rows.
    """
    rows = _kline_cache.get((symbol.upper(), interval.lower()))
    if rows and len(rows) >= limit:
        return rows[-limit:]
    return None
def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """Return True if the cache entry was updated within the last ``max_age_sec`` seconds."""
    last = _kline_cache_updated_at.get((symbol.upper(), interval.lower()), 0)
    # A zero timestamp means "never updated" -> stale.
    return bool(last) and (time.monotonic() - last) <= max_age_sec
class KlineStream:
    """Subscribes to futures kline WebSocket streams and keeps ``_kline_cache`` updated.

    Uses a single combined-stream connection and supports dynamic
    subscribe/unsubscribe. On reconnect, subscription state is cleared and
    callers must re-subscribe.
    """

    # Binance limit: at most 10 subscription messages per second; exceeding
    # this gets the connection dropped.
    _SUBSCRIBE_RATE_LIMIT = 10  # messages per second
    _SUBSCRIBE_WINDOW_SEC = 1.0

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        """:param testnet: use the Binance futures testnet endpoint.
        :param redis_cache: optional async cache with ``set(key, value, ttl=...)``;
            when present, kline updates are mirrored to Redis (best effort).
        """
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        self._subscribed: Dict[Tuple[str, str], bool] = {}  # (symbol, interval) -> subscribed?
        self._subscription_lock = asyncio.Lock()
        self._subscribe_times: List[float] = []  # send timestamps within the rate-limit window
        # Subscription statistics: used to monitor the number of streams and
        # avoid excessive load from over-subscribing.
        self._subscription_count = 0  # current number of subscriptions
        self._max_subscriptions = 500  # safety cap (Binance allows 1024 streams per connection)
        # Hold references to fire-and-forget Redis write tasks so asyncio
        # does not garbage-collect them before they finish.
        self._bg_tasks: set = set()

    def _ws_base_url(self) -> str:
        """Return the futures WS base URL for mainnet or testnet."""
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Start the background WS task (idempotent). Always returns True."""
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        logger.info("KlineStream: 已启动(支持动态订阅)")
        return True

    async def stop(self):
        """Cancel the WS task, close the connection, and clear the global instance."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
        logger.info("KlineStream: 已停止")

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream for (symbol, interval).

        Waits up to 5 seconds for the WS connection, enforces the
        subscription cap and the 10 msg/s send rate limit.

        :param limit: retention limit recorded in ``_kline_cache_limit``.
        :return: True on success / already subscribed, False otherwise.
        """
        symbol = symbol.upper()
        interval = interval.lower()
        key = (symbol, interval)
        async with self._subscription_lock:
            if self._subscribed.get(key):
                return True
            if not self._running:
                return False
            # Cap the number of subscriptions to keep connection load bounded.
            current_count = len(self._subscribed)
            if current_count >= self._max_subscriptions:
                logger.warning(
                    f"KlineStream: 订阅数量已达上限 ({current_count}/{self._max_subscriptions})"
                    f"跳过订阅 {symbol} {interval}(建议优先使用共享缓存或 REST API"
                )
                return False
            # Wait for the WS connection (at most 5 seconds).
            for _ in range(50):
                if self._ws:
                    break
                await asyncio.sleep(0.1)
            if not self._ws:
                return False
            # Binance limit: at most 10 subscribe messages per second;
            # sleep until a slot in the sliding window frees up.
            now = time.monotonic()
            self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
            while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
                wait_until = self._subscribe_times[0] + self._SUBSCRIBE_WINDOW_SEC
                wait_sec = max(0.01, wait_until - time.monotonic())
                await asyncio.sleep(wait_sec)
                now = time.monotonic()
                self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await self._ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
                self._subscribe_times.append(time.monotonic())
                self._subscribed[key] = True
                self._subscription_count = len(self._subscribed)
                _kline_cache_limit[key] = limit
                logger.debug(f"KlineStream: 已订阅 {symbol} {interval}(当前订阅数: {self._subscription_count}/{self._max_subscriptions}")
                return True
            except (ConnectionResetError, OSError) as e:
                msg = str(e).lower()
                # Connection is being torn down; the reconnect loop will clear
                # state and the caller can retry later.
                if "closing transport" in msg or "cannot write" in msg:
                    logger.debug("KlineStream: 连接关闭中,订阅 %s %s 跳过(重连后将自动重试)", symbol, interval)
                else:
                    logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False
            except Exception as e:
                logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False

    async def _run_ws(self):
        """Connection loop: connect, consume frames, reconnect after 10s on failure."""
        import aiohttp
        # Combined-stream endpoint: supports dynamic SUBSCRIBE/UNSUBSCRIBE.
        url = f"{self._ws_base_url()}/stream"
        while self._running:
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): newer aiohttp expects `timeout` here to be a
                    # float or ClientWSTimeout, not ClientTimeout — confirm
                    # against the pinned aiohttp version before changing.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                raw = msg.data
                                # Frame is either a subscribe ack
                                # ({"result": null, "id": ...}) or kline data.
                                try:
                                    data = json.loads(raw)
                                    if isinstance(data, dict) and "result" in data:
                                        # Subscribe ack — nothing to do.
                                        continue
                                except Exception:
                                    pass
                                self._handle_message(raw)
                            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            # Clear subscription state on reconnect; streams must be re-subscribed.
            async with self._subscription_lock:
                self._subscribed.clear()
                self._subscription_count = 0
            if not self._running:
                break

    def _handle_message(self, raw: str):
        """Parse one WS text frame and update the in-memory kline cache.

        Per Binance docs, x=false means the candle is still forming (replace
        the last cached row) and x=true means it closed (append a new row).
        """
        global _kline_cache, _kline_cache_updated_at
        try:
            data = json.loads(raw)
        except Exception:
            return
        # Combined-stream format: {"stream": "btcusdt@kline_1h", "data": {...}}
        if isinstance(data, dict) and "stream" in data:
            kline_data = data.get("data", {})
        else:
            kline_data = data
        if not isinstance(kline_data, dict) or kline_data.get("e") != "kline":
            return
        k = kline_data.get("k")
        if not isinstance(k, dict):
            return
        s = (k.get("s") or "").strip().upper()
        i = (k.get("i") or "").strip().lower()
        if not s or not i:
            return
        key = (s, i)
        if key not in self._subscribed:
            return
        # Convert to REST format:
        # [open_time, open, high, low, close, volume, close_time, quote_volume, trades, ...]
        try:
            t = int(k.get("t", 0))
            o = float(k.get("o", 0))
            h = float(k.get("h", 0))
            l = float(k.get("l", 0))
            c = float(k.get("c", 0))
            v = float(k.get("v", 0))
            T = int(k.get("T", 0))
            q = float(k.get("q", 0))
            n = int(k.get("n", 0))
            # Taker buy volumes are provided by the WS payload as V/Q;
            # previously these were hard-coded to "0".
            vb = float(k.get("V", 0))
            qb = float(k.get("Q", 0))
            x = k.get("x", False)  # True when the candle is closed
        except (TypeError, ValueError):
            return
        kline_rest_format = [
            t,        # open_time
            str(o),   # open
            str(h),   # high
            str(l),   # low
            str(c),   # close
            str(v),   # volume
            T,        # close_time
            str(q),   # quote_volume
            n,        # trades
            str(vb),  # taker_buy_base_volume
            str(qb),  # taker_buy_quote_volume
            "0",      # ignore
        ]
        # x=true (closed): append a new row; x=false (forming): replace the last row.
        if key not in _kline_cache:
            _kline_cache[key] = []
        cache_list = _kline_cache[key]
        if x:
            cache_list.append(kline_rest_format)
            limit = _kline_cache_limit.get(key, 50)
            # Trim lazily: only when the list grows past 2x the limit.
            if len(cache_list) > limit * 2:
                cache_list[:] = cache_list[-limit:]
        else:
            if cache_list:
                cache_list[-1] = kline_rest_format
            else:
                cache_list.append(kline_rest_format)
        _kline_cache_updated_at[key] = time.monotonic()
        logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)}")
        if self._redis_cache:
            try:
                # _handle_message runs inside the event loop, so
                # get_running_loop() is the correct (non-deprecated) call.
                loop = asyncio.get_running_loop()
                copy = list(cache_list)
                rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
                task = loop.create_task(self._write_kline_to_redis(rkey, copy))
                # Keep a strong reference until the task completes, otherwise
                # asyncio may garbage-collect it mid-flight.
                self._bg_tasks.add(task)
                task.add_done_callback(self._bg_tasks.discard)
            except Exception as e:
                logger.debug("KlineStream: 写入 Redis 调度失败 %s", e)

    async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
        """Best-effort write of the kline list to Redis with a 10-minute TTL."""
        try:
            if self._redis_cache:
                await self._redis_cache.set(rkey, data, ttl=600)
        except Exception as e:
            logger.debug("KlineStream: 写入 Redis 失败 %s", e)
async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
    """Shared mode: read the kline cache from Redis.

    Returns the last ``limit`` rows, or None when the cache is unavailable,
    ``limit`` is non-positive, or the stored list is missing/too short.
    """
    if redis_cache is None or limit <= 0:
        return None
    try:
        rkey = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}"
        rows = await redis_cache.get(rkey)
        if isinstance(rows, list) and rows and len(rows) >= limit:
            return rows[-limit:]
        return None
    except Exception as e:
        logger.debug("get_klines_from_redis: %s", e)
        return None
# Process-wide KlineStream instance (installed by start()/set_kline_stream_instance()).
_kline_stream_instance: Optional["KlineStream"] = None


def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the currently running KlineStream instance, or None if none started."""
    return _kline_stream_instance
def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Install *instance* as the process-wide KlineStream (called from main).

    Pass None to clear the global instance.
    """
    global _kline_stream_instance
    _kline_stream_instance = instance