diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 18cec7c..8d09c53 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -356,16 +356,24 @@ class KlineStream: "0", # ignore ] - # ⚠️ 全用 Redis:有 Redis 时只写 Redis,不写进程内存;无 Redis 时才写进程内存 + # ⚠️ 全用 Redis:有 Redis 时只写 Redis。关键:不要每条消息都从 Redis GET 全量并复制,否则内存暴涨。 if self._redis_cache: - # 有 Redis:从 Redis 读取当前数据,更新后写回 Redis,不占进程内存 try: rkey = f"{KEY_KLINE_PREFIX}{s}:{i}" - # 从 Redis 读取当前 K 线列表(如果存在) - existing = await self._redis_cache.get(rkey) - cache_list = list(existing) if existing and isinstance(existing, list) else [] + # 复用 pending 中已有列表,避免每条消息都 Redis GET + list() 复制(原逻辑导致 400+ 次/秒分配,内存涨到 1.2G) + if rkey in self._redis_write_pending: + cache_list = self._redis_write_pending[rkey][0] + else: + # 仅在新 key 或刚 flush 后做一次 Redis GET + existing = await self._redis_cache.get(rkey) + cache_list = list(existing) if existing and isinstance(existing, list) else [] + if len(self._redis_write_pending) >= self._redis_write_pending_max_size: + oldest_key = min(self._redis_write_pending.keys(), + key=lambda k: self._redis_write_pending[k][1]) + del self._redis_write_pending[oldest_key] + self._redis_write_pending[rkey] = (cache_list, time.monotonic()) - # 更新 K 线数据 + # 原地更新 K 线 if x: cache_list.append(kline_rest_format) limit = _kline_cache_limit.get(key, 50) @@ -376,16 +384,8 @@ class KlineStream: cache_list[-1] = kline_rest_format else: cache_list.append(kline_rest_format) - - # 标记需要写入 Redis(批量写入),限制队列大小避免无限增长 - if len(self._redis_write_pending) >= self._redis_write_pending_max_size: - # 队列已满,删除最旧的项 - oldest_key = min(self._redis_write_pending.keys(), - key=lambda k: self._redis_write_pending[k][1]) - del self._redis_write_pending[oldest_key] self._redis_write_pending[rkey] = (cache_list, time.monotonic()) - # 批量写入 Redis(每 2 秒一次) now = time.monotonic() if now - self._last_redis_batch_write >= 2.0: async with self._batch_write_lock: