auto_trade_sys/trading_system/kline_stream.py
薇薇安 d42cee2f1a feat(async_handling): add a task-done callback to handle async task exceptions
Introduce a `_task_done_callback` function across the stream-handling modules so that exceptions from completed async tasks are caught and logged instead of surfacing as unretrieved task exceptions. This improves stability and error handling by keeping better track of task state during async operations.
2026-02-23 15:43:13 +08:00

"""
K线 WebSocket 流:订阅 <symbol>@kline_<interval>维护K线缓存。
供 get_klines 优先使用,替代 REST 拉取,减少超时、实时更新技术指标。
支持多进程共用Leader 写 Redismarket:kline:{symbol}:{interval}),非 Leader 可通过 get_klines_from_redis 读。
文档:推送间隔 250ms仅推送最新一根K线的更新x=false 表示K线未完结x=true 表示已完结。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
def _task_done_callback(task: asyncio.Task) -> None:
"""消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'"""
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug("KlineStream background task error: %s", e)
try:
from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
KEY_KLINE_PREFIX = "market:kline:"
try:
from .redis_ttl import TTL_KLINE_STREAM
except ImportError:
TTL_KLINE_STREAM = 600
# Kline cache: { (symbol, interval): [kline1, kline2, ...] }, keeping at most `limit` rows each
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
_kline_cache_limit: Dict[Tuple[str, str], int] = {}  # per-(symbol, interval) limit
# ⚠️ Memory optimization: cap total cache size to avoid unbounded growth (2-CPU / 4G server).
# Note: the primary store is Redis; process memory is only a small transient cache.
_MAX_CACHE_ENTRIES = 50  # keep at most 50 (symbol, interval) entries (reduces per-process memory)
_CACHE_CLEANUP_INTERVAL_SEC = 300  # run cleanup every 5 minutes
_CACHE_MAX_AGE_SEC = 300  # drop entries not updated for 5 minutes (aggressive; prefer Redis)
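# Rough sizing (an illustrative estimate, not from the original code): 50 entries x
# ~100 rows (limit * 2 with the default limit=50) x 12 short string fields is on the
# order of a few MB of process memory, which is why Redis remains the primary store.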
def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
"""
从缓存返回K线数据与 REST get_klines 格式兼容)。
⚠️ 内存优化:优先从进程内存读取(快速),但进程内存缓存已限制大小,主要数据在 Redis。
未订阅或数据不足时返回 None。
"""
key = (symbol.upper(), interval.lower())
cached = _kline_cache.get(key)
if not cached or len(cached) < limit:
return None
    # return the last `limit` rows
return cached[-limit:]
def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
"""缓存是否在 max_age_sec 秒内更新过。"""
key = (symbol.upper(), interval.lower())
updated = _kline_cache_updated_at.get(key, 0)
if updated == 0:
return False
return (time.monotonic() - updated) <= max_age_sec
def _cleanup_stale_kline_cache(max_age_sec: Optional[float] = None):
    """
    Evict stale kline cache entries to prevent unbounded memory growth.
    The least recently updated entries are evicted first.
    """
global _kline_cache, _kline_cache_updated_at, _kline_cache_limit
if max_age_sec is None:
max_age_sec = _CACHE_MAX_AGE_SEC
now = time.monotonic()
to_remove = []
    # collect expired entries
for key, updated_at in _kline_cache_updated_at.items():
if (now - updated_at) > max_age_sec:
to_remove.append(key)
    # if there are too many entries, also evict the least recently updated (even if not yet expired)
    if len(_kline_cache) > _MAX_CACHE_ENTRIES:
        # sort by update time; keep the newest _MAX_CACHE_ENTRIES entries
sorted_keys = sorted(_kline_cache_updated_at.items(), key=lambda x: x[1], reverse=True)
keep_keys = {k for k, _ in sorted_keys[:_MAX_CACHE_ENTRIES]}
for key in list(_kline_cache.keys()):
if key not in keep_keys:
to_remove.append(key)
    # evict
for key in to_remove:
_kline_cache.pop(key, None)
_kline_cache_updated_at.pop(key, None)
_kline_cache_limit.pop(key, None)
if to_remove:
logger.debug(f"KlineStream: 已清理 {len(to_remove)} 个过期缓存条目(当前缓存数: {len(_kline_cache)}")
class KlineStream:
"""订阅合约 K线流持续更新 _kline_cache。支持动态订阅/取消订阅。"""
# 币安限制:每秒最多 10 条订阅消息,超限会断连
_SUBSCRIBE_RATE_LIMIT = 10 # 条/秒
_SUBSCRIBE_WINDOW_SEC = 1.0
def __init__(self, testnet: bool = False, redis_cache: Any = None):
self.testnet = testnet
self._redis_cache = redis_cache
self._ws = None
self._task: Optional[asyncio.Task] = None
self._running = False
        self._subscribed: Dict[Tuple[str, str], bool] = {}  # (symbol, interval) -> subscribed?
        self._subscription_lock = asyncio.Lock()
        self._subscribe_times: List[float] = []  # send times within the current window, for rate limiting
        self._cleanup_task: Optional[asyncio.Task] = None
        # ⚠️ Subscription stats: monitor the subscription count to avoid load from over-subscribing
        self._subscription_count = 0  # current number of subscriptions
        self._max_subscriptions = 500  # cap (Binance allows up to 1024 streams per connection; 500 is our safety threshold)
        # ⚠️ Message-handling optimization: bound concurrent handler tasks to avoid task pile-up
        self._message_semaphore = asyncio.Semaphore(50)  # handle at most 50 messages concurrently
        # ⚠️ Redis write queue: bounded to avoid unbounded growth
        self._redis_write_pending: Dict[str, tuple] = {}  # pending Redis writes: {rkey: (data, timestamp)}
        self._redis_write_pending_max_size = 100  # keep at most 100 pending items; evict the oldest beyond that
        self._last_redis_batch_write = 0.0  # time of the last batched Redis write
        self._batch_write_lock = asyncio.Lock()  # batch-write lock to prevent concurrent flushes
def _ws_base_url(self) -> str:
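        """Binance USDT-M futures WebSocket base endpoint (testnet vs. mainnet)."""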
if self.testnet:
return "wss://stream.binancefuture.com"
return "wss://fstream.binance.com"
async def start(self) -> bool:
global _kline_stream_instance
if self._running:
return True
self._running = True
_kline_stream_instance = self
self._task = asyncio.create_task(self._run_ws())
        # ⚠️ Memory optimization: start the periodic cleanup task to prevent unbounded cache growth
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        logger.info("KlineStream: started (dynamic subscriptions supported)")
return True
async def stop(self):
global _kline_stream_instance
self._running = False
_kline_stream_instance = None
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            self._cleanup_task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("KlineStream: 已停止")
async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
"""
订阅指定 symbol 和 interval 的K线流若 WS 未连接则等待连接后订阅)。
⚠️ 优化:检查订阅数量限制,避免订阅过多导致负载上升。
"""
symbol = symbol.upper()
interval = interval.lower()
key = (symbol, interval)
async with self._subscription_lock:
if self._subscribed.get(key):
return True
if not self._running:
return False
            # ⚠️ Optimization: enforce the subscription cap to avoid load from over-subscribing
current_count = len(self._subscribed)
if current_count >= self._max_subscriptions:
                logger.warning(
                    f"KlineStream: subscription cap reached ({current_count}/{self._max_subscriptions}), "
                    f"skipping {symbol} {interval} (prefer the shared cache or the REST API)"
                )
                return False
            # wait for the WS connection (up to 5 seconds)
for _ in range(50):
if self._ws:
break
await asyncio.sleep(0.1)
if not self._ws:
return False
            # Binance limit: at most 10 subscribe messages per second; exceeding it drops the connection
now = time.monotonic()
self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
wait_until = self._subscribe_times[0] + self._SUBSCRIBE_WINDOW_SEC
wait_sec = max(0.01, wait_until - time.monotonic())
await asyncio.sleep(wait_sec)
now = time.monotonic()
self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
stream_name = f"{symbol.lower()}@kline_{interval}"
try:
await self._ws.send_json({
"method": "SUBSCRIBE",
"params": [stream_name],
"id": int(time.time() * 1000) % 1000000,
})
self._subscribe_times.append(time.monotonic())
self._subscribed[key] = True
self._subscription_count = len(self._subscribed)
_kline_cache_limit[key] = limit
logger.debug(f"KlineStream: 已订阅 {symbol} {interval}(当前订阅数: {self._subscription_count}/{self._max_subscriptions}")
return True
except (ConnectionResetError, OSError) as e:
msg = str(e).lower()
if "closing transport" in msg or "cannot write" in msg:
logger.debug("KlineStream: 连接关闭中,订阅 %s %s 跳过(重连后将自动重试)", symbol, interval)
else:
logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
return False
except Exception as e:
logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
return False
async def _run_ws(self):
import aiohttp
        # use the combined stream endpoint, which supports dynamic subscriptions
url = f"{self._ws_base_url()}/stream"
while self._running:
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
) as ws:
self._ws = ws
logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
raw = msg.data
                                # ⚠️ Optimization: fast-filter subscription responses to avoid needless handling
try:
data = json.loads(raw)
if isinstance(data, dict) and "result" in data:
                                        # subscription acknowledgement; ignore
continue
except Exception:
pass
                                # ⚠️ Optimization: handle the message asynchronously so the event loop is not blocked.
                                # A semaphore bounds concurrent handler tasks so pile-up cannot drive up load.
t = asyncio.create_task(self._handle_message_with_limit(raw))
t.add_done_callback(_task_done_callback)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
except asyncio.CancelledError:
break
except Exception as e:
err_msg = getattr(e, "message", str(e)) or repr(e)
err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS error %s: %s, reconnecting in 10s",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
await asyncio.sleep(10)
self._ws = None
            # clear subscription state on reconnect; streams must be re-subscribed
async with self._subscription_lock:
self._subscribed.clear()
self._subscription_count = 0
if not self._running:
break
async def _cleanup_loop(self):
"""定期清理过期缓存,防止内存无限增长"""
while self._running:
try:
await asyncio.sleep(_CACHE_CLEANUP_INTERVAL_SEC)
if self._running:
_cleanup_stale_kline_cache()
except asyncio.CancelledError:
break
except Exception as e:
logger.debug(f"KlineStream: 清理缓存时出错: {e}")
async def _handle_message_with_limit(self, raw: str):
"""
带并发限制的消息处理包装器
"""
async with self._message_semaphore:
await self._handle_message_async(raw)
async def _handle_message_async(self, raw: str):
"""
异步处理 WebSocket 消息(优化:避免阻塞事件循环)
⚠️ 优化:改为异步方法,避免同步处理大量消息时阻塞事件循环
"""
global _kline_cache, _kline_cache_updated_at
try:
data = json.loads(raw)
except Exception:
return
        # combined stream format: { "stream": "btcusdt@kline_1h", "data": {...} }
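        # Example kline event after json.loads (abridged to the fields parsed below):
        #   {"stream": "btcusdt@kline_1h",
        #    "data": {"e": "kline",
        #             "k": {"s": "BTCUSDT", "i": "1h", "t": 1700000000000, "T": 1700003599999,
        #                   "o": "...", "h": "...", "l": "...", "c": "...", "v": "...",
        #                   "q": "...", "n": 123, "x": false}}}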
if isinstance(data, dict) and "stream" in data:
stream = data.get("stream", "")
kline_data = data.get("data", {})
else:
kline_data = data
stream = ""
if not isinstance(kline_data, dict) or kline_data.get("e") != "kline":
return
k = kline_data.get("k")
if not isinstance(k, dict):
return
s = (k.get("s") or "").strip().upper()
i = (k.get("i") or "").strip().lower()
if not s or not i:
return
key = (s, i)
if key not in self._subscribed:
return
        # convert to REST format: [open_time, open, high, low, close, volume, close_time, quote_volume, trades, ...]
try:
t = int(k.get("t", 0))
o = float(k.get("o", 0))
h = float(k.get("h", 0))
l = float(k.get("l", 0))
c = float(k.get("c", 0))
v = float(k.get("v", 0))
T = int(k.get("T", 0))
q = float(k.get("q", 0))
n = int(k.get("n", 0))
x = k.get("x", False) # 是否完结
except (TypeError, ValueError):
return
kline_rest_format = [
t, # open_time
str(o), # open
str(h), # high
str(l), # low
str(c), # close
str(v), # volume
T, # close_time
str(q), # quote_volume
n, # trades
"0", # taker_buy_base_volume
"0", # taker_buy_quote_volume
"0", # ignore
]
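        # Note: indices 9-11 (taker-buy base/quote volume and the trailing field) are
        # zero-filled here rather than mapped from the stream payload, so consumers
        # must not rely on those columns.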
        # ⚠️ Redis-first: when Redis is available, write only to Redis. Crucially, do not
        # GET the full list from Redis and copy it for every message, or memory balloons.
if self._redis_cache:
try:
rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
                # Reuse the list already held in pending, avoiding a Redis GET + list() copy per
                # message (the old logic caused 400+ allocations/sec and drove memory to 1.2G)
if rkey in self._redis_write_pending:
cache_list = self._redis_write_pending[rkey][0]
else:
                    # only GET from Redis for a new key, or right after a flush
existing = await self._redis_cache.get(rkey)
cache_list = list(existing) if existing and isinstance(existing, list) else []
if len(self._redis_write_pending) >= self._redis_write_pending_max_size:
oldest_key = min(self._redis_write_pending.keys(),
key=lambda k: self._redis_write_pending[k][1])
del self._redis_write_pending[oldest_key]
self._redis_write_pending[rkey] = (cache_list, time.monotonic())
                # update the kline in place
if x:
cache_list.append(kline_rest_format)
limit = _kline_cache_limit.get(key, 50)
if len(cache_list) > limit * 2:
cache_list[:] = cache_list[-limit:]
else:
if cache_list:
cache_list[-1] = kline_rest_format
else:
cache_list.append(kline_rest_format)
self._redis_write_pending[rkey] = (cache_list, time.monotonic())
now = time.monotonic()
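                # Double-checked flush: re-test the interval under the lock so concurrent
                # handlers do not trigger redundant batch writes in the same 2s window.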
if now - self._last_redis_batch_write >= 2.0:
async with self._batch_write_lock:
if now - self._last_redis_batch_write >= 2.0:
await self._batch_write_redis()
self._last_redis_batch_write = time.monotonic()
            except Exception as e:
                logger.debug(f"KlineStream: Redis handling failed, falling back to process memory: {e}")
                # fall back to process memory when Redis fails
if key not in _kline_cache:
_kline_cache[key] = []
cache_list = _kline_cache[key]
if x:
cache_list.append(kline_rest_format)
limit = _kline_cache_limit.get(key, 50)
if len(cache_list) > limit * 2:
cache_list[:] = cache_list[-limit:]
else:
if cache_list:
cache_list[-1] = kline_rest_format
else:
cache_list.append(kline_rest_format)
_kline_cache_updated_at[key] = time.monotonic()
else:
            # no Redis: write to process memory only
if key not in _kline_cache:
_kline_cache[key] = []
cache_list = _kline_cache[key]
if x:
cache_list.append(kline_rest_format)
limit = _kline_cache_limit.get(key, 50)
if len(cache_list) > limit * 2:
cache_list[:] = cache_list[-limit:]
else:
if cache_list:
cache_list[-1] = kline_rest_format
else:
cache_list.append(kline_rest_format)
_kline_cache_updated_at[key] = time.monotonic()
if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8:
_cleanup_stale_kline_cache()
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}, Redis={bool(self._redis_cache)})")
async def _batch_write_redis(self):
"""批量写入 Redis写入成功后从进程内存移除以 Redis 为主、基本不占服务器内存"""
global _kline_cache, _kline_cache_updated_at, _kline_cache_limit
if not self._redis_write_pending:
return
try:
pending = dict(self._redis_write_pending)
self._redis_write_pending.clear()
for rkey, (data, _) in pending.items():
try:
await self._redis_cache.set(rkey, data, ttl=TTL_KLINE_STREAM)
                    # drop from process memory once written to Redis, to avoid holding it twice
if rkey.startswith(KEY_KLINE_PREFIX):
suffix = rkey[len(KEY_KLINE_PREFIX):]
if ":" in suffix:
s, i = suffix.split(":", 1)
key = (s.upper(), i.lower())
_kline_cache.pop(key, None)
_kline_cache_updated_at.pop(key, None)
_kline_cache_limit.pop(key, None)
except Exception:
pass
except Exception:
pass
def _handle_message(self, raw: str):
"""
同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async
"""
# 为了兼容性保留,但实际应该使用异步版本
t = asyncio.create_task(self._handle_message_async(raw))
t.add_done_callback(_task_done_callback)
async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
try:
if self._redis_cache:
await self._redis_cache.set(rkey, data, ttl=TTL_KLINE_STREAM)
except Exception as e:
logger.debug("KlineStream: 写入 Redis 失败 %s", e)
async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
"""共用模式:从 Redis 读取 K 线缓存;未命中返回 None。"""
if redis_cache is None or limit <= 0:
return None
try:
s, i = symbol.upper(), interval.lower()
rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
data = await redis_cache.get(rkey)
if data and isinstance(data, list) and len(data) >= limit:
return data[-limit:]
return None
except Exception as e:
logger.debug("get_klines_from_redis: %s", e)
return None
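# Non-leader read path (a sketch; `redis_cache` is assumed to be the same async client
# the leader writes with):
#   rows = await get_klines_from_redis(redis_cache, "BTCUSDT", "1h", 50)
#   if rows is None:  # miss -> fall back to REST get_klines
#       ...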
# global KlineStream instance
_kline_stream_instance: Optional["KlineStream"] = None
def get_kline_stream_instance() -> Optional["KlineStream"]:
"""返回当前运行的 KlineStream 实例(未启动时为 None"""
return _kline_stream_instance
def set_kline_stream_instance(instance: Optional["KlineStream"]):
"""设置全局 KlineStream 实例(由 main 调用)。"""
global _kline_stream_instance
_kline_stream_instance = instance
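

# Minimal manual smoke test (a sketch, not part of the module's public API; assumes
# network access to the Binance futures testnet and runs without Redis):
if __name__ == "__main__":
    async def _demo():
        stream = KlineStream(testnet=True)
        await stream.start()
        await stream.subscribe("BTCUSDT", "1m", limit=5)
        await asyncio.sleep(10)  # let a few kline updates arrive
        # only the live kline is cached at first, so ask for a single row
        print(get_klines_from_cache("BTCUSDT", "1m", limit=1))
        await stream.stop()

    asyncio.run(_demo())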