From be43ec1c334a105b36307232d85befe159732fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Thu, 19 Feb 2026 09:23:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(kline=5Fstream):=20=E4=BC=98=E5=8C=96=20Re?= =?UTF-8?q?dis=20=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E4=B8=8E=E5=86=85=E5=AD=98=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `kline_stream.py` 中改进了 Redis 数据处理逻辑,避免每条消息都从 Redis 获取全量数据,减少内存占用。通过复用待处理列表,提升了性能并降低了内存使用。更新了缓存管理机制,确保在有 Redis 时优先使用其进行数据存储,进一步优化了系统的内存使用效率与稳定性。 --- trading_system/kline_stream.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 18cec7c..8d09c53 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -356,16 +356,24 @@ class KlineStream: "0", # ignore ] - # ⚠️ 全用 Redis:有 Redis 时只写 Redis,不写进程内存;无 Redis 时才写进程内存 + # ⚠️ 全用 Redis:有 Redis 时只写 Redis。关键:不要每条消息都从 Redis GET 全量并复制,否则内存暴涨。 if self._redis_cache: - # 有 Redis:从 Redis 读取当前数据,更新后写回 Redis,不占进程内存 try: rkey = f"{KEY_KLINE_PREFIX}{s}:{i}" - # 从 Redis 读取当前 K 线列表(如果存在) - existing = await self._redis_cache.get(rkey) - cache_list = list(existing) if existing and isinstance(existing, list) else [] + # 复用 pending 中已有列表,避免每条消息都 Redis GET + list() 复制(原逻辑导致 400+ 次/秒分配,内存涨到 1.2G) + if rkey in self._redis_write_pending: + cache_list = self._redis_write_pending[rkey][0] + else: + # 仅在新 key 或刚 flush 后做一次 Redis GET + existing = await self._redis_cache.get(rkey) + cache_list = list(existing) if existing and isinstance(existing, list) else [] + if len(self._redis_write_pending) >= self._redis_write_pending_max_size: + oldest_key = min(self._redis_write_pending.keys(), + key=lambda k: self._redis_write_pending[k][1]) + del self._redis_write_pending[oldest_key] + self._redis_write_pending[rkey] = (cache_list, time.monotonic()) - # 更新 K 线数据 + # 原地更新 K 线 if x: cache_list.append(kline_rest_format) limit = _kline_cache_limit.get(key, 50) @@ -376,16 +384,8 @@ class KlineStream: cache_list[-1] = kline_rest_format else: cache_list.append(kline_rest_format) - - # 标记需要写入 Redis(批量写入),限制队列大小避免无限增长 - if len(self._redis_write_pending) >= self._redis_write_pending_max_size: - # 队列已满,删除最旧的项 - oldest_key = min(self._redis_write_pending.keys(), - key=lambda k: self._redis_write_pending[k][1]) - del self._redis_write_pending[oldest_key] self._redis_write_pending[rkey] = (cache_list, time.monotonic()) - # 批量写入 Redis(每 2 秒一次) now = time.monotonic() if now - self._last_redis_batch_write >= 2.0: async with self._batch_write_lock: