diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py
index f628b49..1eed6f4 100644
--- a/trading_system/kline_stream.py
+++ b/trading_system/kline_stream.py
@@ -61,6 +61,11 @@ class KlineStream:
         # ⚠️ 订阅统计:用于监控订阅数量,避免订阅过多导致负载上升
         self._subscription_count = 0  # 当前订阅数量
         self._max_subscriptions = 500  # 最大订阅数量(币安限制:单个连接最多 1024 个流,我们设置 500 作为安全阈值)
+        # ⚠️ 消息处理优化:限制并发处理任务数量,避免任务堆积
+        self._message_semaphore = asyncio.Semaphore(50)  # 最多同时处理 50 条消息
+        self._redis_write_pending: Dict[str, tuple] = {}  # Redis 写入待处理队列:{rkey: (data, timestamp)}
+        self._last_redis_batch_write = 0.0  # 上次批量写入 Redis 的时间
+        self._batch_write_lock = asyncio.Lock()  # 批量写入锁,避免并发写入
 
     def _ws_base_url(self) -> str:
         if self.testnet:
@@ -176,7 +181,7 @@ class KlineStream:
                         break
                     if msg.type == aiohttp.WSMsgType.TEXT:
                         raw = msg.data
-                        # 处理订阅响应({"result": null, "id": ...})或K线数据
+                        # ⚠️ 优化:快速过滤订阅响应,避免不必要的处理
                         try:
                             data = json.loads(raw)
                             if isinstance(data, dict) and "result" in data:
@@ -184,7 +189,9 @@ class KlineStream:
                                 continue
                         except Exception:
                             pass
-                        self._handle_message(raw)
+                        # ⚠️ 优化:异步处理消息,避免阻塞事件循环
+                        # 使用信号量限制并发处理任务数量,避免任务堆积导致负载上升
+                        asyncio.create_task(self._handle_message_with_limit(raw))
                     elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
                         break
             except asyncio.CancelledError:
@@ -206,7 +213,19 @@ class KlineStream:
                 if not self._running:
                     break
 
-    def _handle_message(self, raw: str):
+    async def _handle_message_with_limit(self, raw: str):
+        """
+        带并发限制的消息处理包装器
+        """
+        async with self._message_semaphore:
+            await self._handle_message_async(raw)
+
+    async def _handle_message_async(self, raw: str):
+        """
+        异步处理 WebSocket 消息(优化:避免阻塞事件循环)
+
+        ⚠️ 优化:改为异步方法,避免同步处理大量消息时阻塞事件循环
+        """
         global _kline_cache, _kline_cache_updated_at
         try:
             data = json.loads(raw)
@@ -285,15 +304,53 @@ class KlineStream:
                 cache_list.append(kline_rest_format)
 
             _kline_cache_updated_at[key] = time.monotonic()
-            logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根")
+            # ⚠️ 优化:减少日志输出频率,避免大量消息时日志负载过高
+            # 只在 DEBUG 级别或每 100 条消息记录一次
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根")
+
+            # ⚠️ 优化:批量写入 Redis,减少写入频率
+            # 使用延迟写入机制,避免每条消息都写入 Redis(每 2 秒批量写入一次)
             if self._redis_cache:
                 try:
-                    loop = asyncio.get_event_loop()
                     copy = list(cache_list)
                     rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
-                    loop.create_task(self._write_kline_to_redis(rkey, copy))
+                    # 标记需要写入 Redis,但不立即写入
+                    self._redis_write_pending[rkey] = (copy, time.monotonic())
+
+                    # ⚠️ 优化:使用锁避免并发批量写入
+                    now = time.monotonic()
+                    if now - self._last_redis_batch_write >= 2.0:
+                        async with self._batch_write_lock:
+                            # 双重检查:锁等待期间时间已流逝,必须重新取 time.monotonic(),
+                            # 否则复用陈旧的 now 会使检查恒为真,失去双重检查的意义
+                            if time.monotonic() - self._last_redis_batch_write >= 2.0:
+                                await self._batch_write_redis()
+                                self._last_redis_batch_write = time.monotonic()
                 except Exception as e:
-                    logger.debug("KlineStream: 写入 Redis 调度失败 %s", e)
+                    # 静默失败,避免日志过多
+                    pass
+
+    async def _batch_write_redis(self):
+        """批量写入 Redis,减少写入频率"""
+        if not self._redis_write_pending:
+            return
+        try:
+            pending = dict(self._redis_write_pending)
+            self._redis_write_pending.clear()
+            for rkey, (data, _) in pending.items():
+                try:
+                    await self._redis_cache.set(rkey, data, ttl=600)
+                except Exception:
+                    pass
+        except Exception:
+            pass
+
+    def _handle_message(self, raw: str):
+        """
+        同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async)
+        """
+        # 为了兼容性保留,但实际应该使用异步版本
+        asyncio.create_task(self._handle_message_async(raw))
 
     async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
         try: