From a498520c5146fa45a3d655556ec9bb3705709864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Thu, 19 Feb 2026 00:26:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(kline=5Fstream):=20=E4=BC=98=E5=8C=96=20Re?= =?UTF-8?q?dis=20=E5=86=99=E5=85=A5=E9=80=BB=E8=BE=91=E4=B8=8E=E5=86=85?= =?UTF-8?q?=E5=AD=98=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `kline_stream.py` 中增强了 Redis 写入机制,限制待处理队列大小以防止无限增长,并在 Redis 处理失败时降级到进程内存。更新了缓存管理逻辑,确保在有 Redis 时优先使用 Redis 进行数据存储,提升了系统的内存使用效率与稳定性。同时,调整了日志记录以减少高负载时的输出频率。此改动进一步优化了消息处理与系统性能。 --- docs/内存优化_修复K线缓存.md | 103 +++++++++++++++++++++++++ trading_system/kline_stream.py | 105 ++++++++++++++++---------- 2 files changed, 168 insertions(+), 40 deletions(-) create mode 100644 docs/内存优化_修复K线缓存.md diff --git a/docs/内存优化_修复K线缓存.md b/docs/内存优化_修复K线缓存.md new file mode 100644 index 0000000..76ecf0f --- /dev/null +++ b/docs/内存优化_修复K线缓存.md @@ -0,0 +1,103 @@ +# 内存优化:修复 K 线缓存仍占用进程内存的问题 + +## 问题 + +即使配置了 Redis,K 线数据仍然在进程内存中累积,导致内存持续增长。 + +## 根本原因 + +在 `kline_stream.py` 的 `_handle_message_async` 中,即使有 Redis,代码仍然**先更新进程内存 `_kline_cache`**,然后再批量写入 Redis。这导致: + +1. **数据双重存储**:进程内存和 Redis 都有数据 +2. **批量写入延迟**:在批量写入之前(最多 2 秒),数据一直留在进程内存 +3. **写入失败时数据残留**:如果 Redis 写入失败,数据不会从进程内存删除 + +## 修复方案 + +### 1. 修改 K 线处理逻辑 + +**文件**: `trading_system/kline_stream.py` + +**修改前**: +- 先更新 `_kline_cache`(进程内存) +- 然后标记需要写入 Redis +- 批量写入 Redis 后删除进程内存中的 key + +**修改后**: +- **有 Redis 时**:直接从 Redis 读取当前数据,更新后写回 Redis,**完全不写进程内存** +- **无 Redis 时**:才写进程内存(降级) + +### 2. 限制 Redis 写入队列大小 + +`_redis_write_pending` 字典可能无限增长,添加了大小限制: +- 最多保留 **100 个**待写入项 +- 超出时删除最旧的项 + +### 3. 删除重复定义 + +修复了代码中重复定义的变量(复制粘贴错误)。 + +## 修改详情 + +### `_handle_message_async` 方法 + +```python +# 修改前:总是先写进程内存 +if key not in _kline_cache: + _kline_cache[key] = [] +cache_list = _kline_cache[key] +# ... 更新 cache_list ... +# 然后标记写入 Redis + +# 修改后:有 Redis 时只写 Redis +if self._redis_cache: + # 从 Redis 读取 + existing = await self._redis_cache.get(rkey) + cache_list = list(existing) if existing else [] + # 更新 cache_list + # 标记写入 Redis(不写进程内存) +else: + # 无 Redis 时才写进程内存 + ... +``` + +## 预期效果 + +- **进程内存占用大幅降低**:有 Redis 时,K 线数据不再存储在进程内存中 +- **内存增长停止**:不再有数据在进程内存中累积 + +## 需要重启服务 + +**重要**:修改后需要重启交易服务才能生效。 + +```bash +# 重启交易服务 +pkill -f "python.*trading_system.main" +# 然后重新启动 +``` + +## 验证 + +重启后,检查内存使用: + +```bash +# 查看进程内存 +ps aux | grep "trading_system.main" | awk '{print "MEM:", $4"%", "RSS:", $6/1024"MB"}' + +# 或使用诊断脚本 +cd backend +./检查内存问题.sh +``` + +正常情况下,有 Redis 时: +- K 线缓存进程内存应该**接近 0**(只有降级时的少量数据) +- 内存占用应该**稳定**,不再持续增长 + +## 其他可能的内存增长源 + +如果重启后内存仍在增长,检查: + +1. **WebSocket 消息队列**:检查是否有消息堆积 +2. **日志**:检查日志文件大小 +3. **数据库连接**:检查连接池是否正常释放 +4. **其他缓存**:检查 `_price_cache`、`_symbol_info_cache` 等是否正常清理 diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index ba1af65..18cec7c 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -112,11 +112,9 @@ class KlineStream: self._max_subscriptions = 500 # 最大订阅数量(币安限制:单个连接最多 1024 个流,我们设置 500 作为安全阈值) # ⚠️ 消息处理优化:限制并发处理任务数量,避免任务堆积 self._message_semaphore = asyncio.Semaphore(50) # 最多同时处理 50 条消息 + # ⚠️ Redis 写入队列:限制大小,避免无限增长 self._redis_write_pending: Dict[str, tuple] = {} # Redis 写入待处理队列:{rkey: (data, timestamp)} - self._last_redis_batch_write = 0.0 # 上次批量写入 Redis 的时间 - # ⚠️ 消息处理优化:限制并发处理任务数量,避免任务堆积 - self._message_semaphore = asyncio.Semaphore(50) # 最多同时处理 50 条消息 - self._redis_write_pending: Dict[str, tuple] = {} # Redis 写入待处理队列:{rkey: (data, timestamp)} + self._redis_write_pending_max_size = 100 # 最多保留 100 个待写入项,超出时丢弃最旧的 self._last_redis_batch_write = 0.0 # 上次批量写入 Redis 的时间 self._batch_write_lock = asyncio.Lock() # 批量写入锁,避免并发写入 @@ -358,53 +356,80 @@ class KlineStream: "0", # ignore ] - # 更新缓存:若 x=true(完结),追加新K线;若 x=false(未完结),更新最后一根 - if key not in _kline_cache: - _kline_cache[key] = [] - cache_list = _kline_cache[key] - - if x: - # K线完结:追加新K线 - cache_list.append(kline_rest_format) - limit = _kline_cache_limit.get(key, 50) - if len(cache_list) > limit * 2: - cache_list[:] = cache_list[-limit:] - else: - # K线未完结:更新最后一根(或追加第一根) - if cache_list: - cache_list[-1] = kline_rest_format - else: - cache_list.append(kline_rest_format) - - _kline_cache_updated_at[key] = time.monotonic() - # ⚠️ 内存优化:定期清理过期缓存,防止内存无限增长 - if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8: # 达到 80% 时触发清理 - _cleanup_stale_kline_cache() - # ⚠️ 优化:减少日志输出频率,避免大量消息时日志负载过高 - # 只在 DEBUG 级别或每 100 条消息记录一次 - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根") - - # ⚠️ 优化:批量写入 Redis,减少写入频率 - # 使用延迟写入机制,避免每条消息都写入 Redis(每 2 秒批量写入一次) + # ⚠️ 全用 Redis:有 Redis 时只写 Redis,不写进程内存;无 Redis 时才写进程内存 if self._redis_cache: + # 有 Redis:从 Redis 读取当前数据,更新后写回 Redis,不占进程内存 try: - copy = list(cache_list) rkey = f"{KEY_KLINE_PREFIX}{s}:{i}" - # 标记需要写入 Redis,但不立即写入 - self._redis_write_pending[rkey] = (copy, time.monotonic()) + # 从 Redis 读取当前 K 线列表(如果存在) + existing = await self._redis_cache.get(rkey) + cache_list = list(existing) if existing and isinstance(existing, list) else [] - # ⚠️ 优化:使用锁避免并发批量写入 + # 更新 K 线数据 + if x: + cache_list.append(kline_rest_format) + limit = _kline_cache_limit.get(key, 50) + if len(cache_list) > limit * 2: + cache_list[:] = cache_list[-limit:] + else: + if cache_list: + cache_list[-1] = kline_rest_format + else: + cache_list.append(kline_rest_format) + + # 标记需要写入 Redis(批量写入),限制队列大小避免无限增长 + if len(self._redis_write_pending) >= self._redis_write_pending_max_size: + # 队列已满,删除最旧的项 + oldest_key = min(self._redis_write_pending.keys(), + key=lambda k: self._redis_write_pending[k][1]) + del self._redis_write_pending[oldest_key] + self._redis_write_pending[rkey] = (cache_list, time.monotonic()) + + # 批量写入 Redis(每 2 秒一次) now = time.monotonic() if now - self._last_redis_batch_write >= 2.0: async with self._batch_write_lock: - # 双重检查,避免重复写入 if now - self._last_redis_batch_write >= 2.0: await self._batch_write_redis() self._last_redis_batch_write = time.monotonic() except Exception as e: - # 静默失败,避免日志过多 - pass + logger.debug(f"KlineStream: Redis 处理失败,降级到进程内存: {e}") + # Redis 失败时降级到进程内存 + if key not in _kline_cache: + _kline_cache[key] = [] + cache_list = _kline_cache[key] + if x: + cache_list.append(kline_rest_format) + limit = _kline_cache_limit.get(key, 50) + if len(cache_list) > limit * 2: + cache_list[:] = cache_list[-limit:] + else: + if cache_list: + cache_list[-1] = kline_rest_format + else: + cache_list.append(kline_rest_format) + _kline_cache_updated_at[key] = time.monotonic() + else: + # 无 Redis:只写进程内存 + if key not in _kline_cache: + _kline_cache[key] = [] + cache_list = _kline_cache[key] + if x: + cache_list.append(kline_rest_format) + limit = _kline_cache_limit.get(key, 50) + if len(cache_list) > limit * 2: + cache_list[:] = cache_list[-limit:] + else: + if cache_list: + cache_list[-1] = kline_rest_format + else: + cache_list.append(kline_rest_format) + _kline_cache_updated_at[key] = time.monotonic() + if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8: + _cleanup_stale_kline_cache() + + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}, Redis={bool(self._redis_cache)})") async def _batch_write_redis(self): """批量写入 Redis;写入成功后从进程内存移除,以 Redis 为主、基本不占服务器内存"""