auto_trade_sys/trading_system/kline_stream.py
薇薇安 43e993034f feat(redis_integration): 支持多进程共用市场数据流
在 `binance_client`、`kline_stream`、`book_ticker_stream` 和 `ticker_24h_stream` 中引入 Redis 缓存支持,允许 Leader 进程写入数据,其他进程从 Redis 读取,提升数据获取效率。更新了相关逻辑以确保在多进程环境下的稳定性和一致性,同时增强了异常处理和日志记录,确保系统的可追溯性。
2026-02-16 17:44:10 +08:00

317 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
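Kline WebSocket stream: subscribes to <symbol>@kline_<interval> and maintains a kline cache.
Used by get_klines in preference to REST fetching, to reduce timeouts and keep technical indicators updated in real time.
Supports sharing across processes: the Leader writes to Redis (market:kline:{symbol}:{interval}); non-Leader processes can read via get_klines_from_redis.
Docs: updates are pushed every 250ms and only for the latest kline; x=false means the kline is still open, x=true means it has closed.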
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
# Take the Redis key prefix from the market-WS leader module when it can be
# imported; otherwise fall back to a locally defined default.
try:
    from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
    KEY_KLINE_PREFIX = "market:kline:"
# Kline cache: { (symbol, interval): [kline1, kline2, ...] }.
# KlineStream._handle_message trims a list back to its limit once it grows
# past twice that limit.
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# time.monotonic() value of the most recent update for each (symbol, interval).
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
_kline_cache_limit: Dict[Tuple[str, str], int] = {} # limit for each (symbol, interval)
def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the last ``limit`` cached klines in the REST ``get_klines`` row format.

    Args:
        symbol: Trading symbol; matched case-insensitively (upper-cased for the key).
        interval: Kline interval; matched case-insensitively (lower-cased for the key).
        limit: Number of most recent rows wanted.

    Returns:
        A new list holding the last ``limit`` rows, or None when ``limit`` is not
        positive, the key has no cached data, or fewer than ``limit`` rows are cached.
    """
    # Without this guard ``cached[-0:]`` would return the whole cache and a
    # negative limit would slice from the wrong end; get_klines_from_redis
    # rejects such limits the same way.
    if limit <= 0:
        return None
    key = (symbol.upper(), interval.lower())
    cached = _kline_cache.get(key)
    if not cached or len(cached) < limit:
        return None
    return cached[-limit:]
def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """Report whether the (symbol, interval) cache was updated within ``max_age_sec`` seconds."""
    last_update = _kline_cache_updated_at.get((symbol.upper(), interval.lower()), 0)
    # A missing (or zero) timestamp means no update has been recorded for this key.
    if last_update == 0:
        return False
    age_sec = time.monotonic() - last_update
    return age_sec <= max_age_sec
class KlineStream:
    """Subscribe to futures kline streams and keep the module-level kline cache updated.

    A background task keeps one combined-stream WebSocket open; streams are
    added at runtime through :meth:`subscribe`. When a ``redis_cache`` is
    supplied, every cache update is also written to Redis under
    ``{KEY_KLINE_PREFIX}{SYMBOL}:{interval}``.

    The subscription table is cleared whenever the connection ends, so callers
    have to call :meth:`subscribe` again after a reconnect.
    """

    # Binance limit (per the original note): at most 10 subscription messages
    # per second; exceeding it gets the connection dropped.
    _SUBSCRIBE_RATE_LIMIT = 10  # messages per window
    _SUBSCRIBE_WINDOW_SEC = 1.0

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        """Create a stopped stream.

        Args:
            testnet: Use the testnet WebSocket host instead of the production one.
            redis_cache: Optional object with an awaitable ``set(key, value, ttl=...)``;
                when truthy, cache updates are mirrored to it.
        """
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # (symbol, interval) -> True once a SUBSCRIBE message has been sent.
        self._subscribed: Dict[Tuple[str, str], bool] = {}
        self._subscription_lock = asyncio.Lock()
        # Send times inside the current rate-limit window.
        self._subscribe_times: List[float] = []
        # Strong references to in-flight Redis write tasks so they are not
        # garbage-collected before they finish.
        self._redis_tasks: set = set()

    def _ws_base_url(self) -> str:
        """Return the WebSocket base URL for the configured environment."""
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Start the background connection task and register this instance globally.

        Returns:
            Always True (also when the stream was already running).
        """
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        logger.info("KlineStream: 已启动（支持动态订阅）")
        return True

    async def stop(self):
        """Stop the background task, close the socket and clear the global instance."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception as e:
                # Best effort: the socket may already be closed or half-torn-down.
                logger.debug("KlineStream: WS close failed: %s", e)
            self._ws = None
        logger.info("KlineStream: 已停止")

    async def _wait_for_subscribe_slot(self) -> None:
        """Sleep until one more SUBSCRIBE message fits in the rate-limit window."""
        window = self._SUBSCRIBE_WINDOW_SEC
        now = time.monotonic()
        self._subscribe_times = [t for t in self._subscribe_times if now - t < window]
        while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
            wait_until = self._subscribe_times[0] + window
            await asyncio.sleep(max(0.01, wait_until - time.monotonic()))
            now = time.monotonic()
            self._subscribe_times = [t for t in self._subscribe_times if now - t < window]

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream of ``symbol``/``interval``.

        Waits up to about 5 seconds for the WebSocket to be connected and
        respects the subscription rate limit.

        Args:
            symbol: Trading symbol (case-insensitive).
            interval: Kline interval (case-insensitive).
            limit: Number of rows the cache should retain for this key. The
                stored limit only ever grows, so a later caller asking for more
                rows is not starved by an earlier, smaller subscription.

        Returns:
            True when the stream is (or already was) subscribed; False when the
            stream is not running, no connection became available, or sending
            the SUBSCRIBE message failed.
        """
        symbol = symbol.upper()
        interval = interval.lower()
        key = (symbol, interval)
        async with self._subscription_lock:
            if self._subscribed.get(key):
                if limit > _kline_cache_limit.get(key, 0):
                    _kline_cache_limit[key] = limit
                return True
            if not self._running:
                return False
            # Wait for the WS connection (at most 50 * 0.1s).
            for _ in range(50):
                if self._ws:
                    break
                await asyncio.sleep(0.1)
            if not self._ws:
                return False
            await self._wait_for_subscribe_slot()
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await self._ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
            except (ConnectionResetError, OSError) as e:
                msg = str(e).lower()
                if "closing transport" in msg or "cannot write" in msg:
                    # The socket is going away; the caller has to subscribe again later.
                    logger.debug("KlineStream: 连接关闭中，订阅 %s %s 跳过（重连后将自动重试）", symbol, interval)
                else:
                    logger.warning("KlineStream: 订阅 %s %s 失败: %s", symbol, interval, e)
                return False
            except Exception as e:
                logger.warning("KlineStream: 订阅 %s %s 失败: %s", symbol, interval, e)
                return False
            self._subscribe_times.append(time.monotonic())
            self._subscribed[key] = True
            _kline_cache_limit[key] = max(limit, _kline_cache_limit.get(key, 0))
            logger.debug("KlineStream: 已订阅 %s %s", symbol, interval)
            return True

    async def _run_ws(self):
        """Connection loop: connect, dispatch text frames, reconnect until stopped."""
        import aiohttp

        # Combined-stream endpoint, which accepts runtime SUBSCRIBE messages.
        url = f"{self._ws_base_url()}/stream"
        closing_types = (
            aiohttp.WSMsgType.ERROR,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.CLOSE,
        )
        while self._running:
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): ``timeout`` receives a ClientTimeout, unchanged from
                    # the original; confirm this matches the installed aiohttp's
                    # ws_connect signature.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接（组合流，支持动态订阅）")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                # Subscription acks and non-kline payloads are
                                # filtered inside _handle_message (single JSON parse).
                                self._handle_message(msg.data)
                            elif msg.type in closing_types:
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s，10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            # Subscriptions do not survive a reconnect; callers must subscribe again.
            async with self._subscription_lock:
                self._subscribed.clear()
            if not self._running:
                break

    @staticmethod
    def _kline_to_rest_row(k: Dict[str, Any]) -> Optional[List]:
        """Convert a stream ``k`` payload into a REST-style kline row, or None if malformed."""
        try:
            open_time = int(k.get("t", 0))
            open_ = float(k.get("o", 0))
            high = float(k.get("h", 0))
            low = float(k.get("l", 0))
            close = float(k.get("c", 0))
            volume = float(k.get("v", 0))
            close_time = int(k.get("T", 0))
            quote_volume = float(k.get("q", 0))
            trades = int(k.get("n", 0))
        except (TypeError, ValueError):
            return None
        # NOTE(review): the taker-buy volumes are hard-coded to "0" rather than
        # read from the payload; confirm no consumer relies on them.
        return [
            open_time,          # open_time
            str(open_),         # open
            str(high),          # high
            str(low),           # low
            str(close),         # close
            str(volume),        # volume
            close_time,         # close_time
            str(quote_volume),  # quote_volume
            trades,             # trades
            "0",                # taker_buy_base_volume
            "0",                # taker_buy_quote_volume
            "0",                # ignore
        ]

    @staticmethod
    def _merge_kline(cache_list: List[List], row: List, limit: int) -> bool:
        """Merge ``row`` into ``cache_list`` by open time; return False if it was stale.

        A row with the same open time as the last cached row replaces it (this
        covers both in-progress updates and the final closed candle), a newer
        row is appended, and an older row is dropped.
        """
        if cache_list:
            try:
                last_open = int(cache_list[-1][0])
            except (TypeError, ValueError, IndexError, KeyError):
                last_open = None  # unknown shape: fall through and append
            if last_open is not None:
                if row[0] == last_open:
                    cache_list[-1] = row
                    return True
                if row[0] < last_open:
                    return False
        cache_list.append(row)
        keep = max(1, limit)  # a non-positive limit would otherwise disable trimming
        if len(cache_list) > keep * 2:
            cache_list[:] = cache_list[-keep:]
        return True

    def _handle_message(self, raw: str):
        """Parse one text frame and fold a kline update into the module cache.

        Frames that are not valid JSON, subscription acks (dicts containing
        ``"result"``), non-kline events, and klines for keys that are not
        subscribed are ignored. On a successful update the freshness timestamp
        is refreshed and, if a Redis cache is configured, a write is scheduled.
        """
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            return
        if isinstance(data, dict) and "result" in data:
            return  # reply to a SUBSCRIBE request
        # Combined-stream format: { "stream": "btcusdt@kline_1h", "data": {...} }
        if isinstance(data, dict) and "stream" in data:
            kline_data = data.get("data", {})
        else:
            kline_data = data
        if not isinstance(kline_data, dict) or kline_data.get("e") != "kline":
            return
        k = kline_data.get("k")
        if not isinstance(k, dict):
            return
        s = str(k.get("s") or "").strip().upper()
        i = str(k.get("i") or "").strip().lower()
        if not s or not i:
            return
        key = (s, i)
        if key not in self._subscribed:
            return
        row = self._kline_to_rest_row(k)
        if row is None:
            return
        x = k.get("x", False)  # True once the candle has closed
        cache_list = _kline_cache.setdefault(key, [])
        if not self._merge_kline(cache_list, row, _kline_cache_limit.get(key, 50)):
            return
        _kline_cache_updated_at[key] = time.monotonic()
        logger.debug("KlineStream: 已更新 %s %s (完结=%s)，缓存 %s", s, i, x, len(cache_list))
        if self._redis_cache:
            try:
                loop = asyncio.get_running_loop()
                snapshot = list(cache_list)
                rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
                task = loop.create_task(self._write_kline_to_redis(rkey, snapshot))
                # The event loop only keeps weak references to tasks.
                self._redis_tasks.add(task)
                task.add_done_callback(self._redis_tasks.discard)
            except Exception as e:
                logger.debug("KlineStream: 写入 Redis 调度失败 %s", e)

    async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
        """Write ``data`` to Redis under ``rkey`` with a 600s TTL; failures are only logged."""
        try:
            if self._redis_cache:
                await self._redis_cache.set(rkey, data, ttl=600)
        except Exception as e:
            logger.debug("KlineStream: 写入 Redis 失败 %s", e)
async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
    """Shared mode: read the cached klines for (symbol, interval) from Redis.

    Returns the last ``limit`` rows, or None on a miss: no cache object, a
    non-positive limit, a value that is not a list, fewer than ``limit`` rows,
    or any error while reading (logged at debug level).
    """
    if redis_cache is None or limit <= 0:
        return None
    try:
        redis_key = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}"
        rows = await redis_cache.get(redis_key)
        has_enough = bool(rows) and isinstance(rows, list) and len(rows) >= limit
        return rows[-limit:] if has_enough else None
    except Exception as e:
        logger.debug("get_klines_from_redis: %s", e)
        return None
# Module-wide KlineStream instance; None until one is started or registered.
_kline_stream_instance: Optional["KlineStream"] = None


def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the currently registered KlineStream instance, or None when there is none."""
    return _kline_stream_instance
def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Register ``instance`` as the module-wide KlineStream (intended to be called by main).

    Passing None clears the registration.
    """
    global _kline_stream_instance
    _kline_stream_instance = instance