feat(kline_stream): 优化 Redis 数据处理逻辑与内存管理

在 `kline_stream.py` 中改进了 Redis 数据处理逻辑,避免每条消息都从 Redis 获取全量数据,减少内存占用。通过复用待处理列表,提升了性能并降低了内存使用。更新了缓存管理机制,确保在有 Redis 时优先使用其进行数据存储,进一步优化了系统的内存使用效率与稳定性。
This commit is contained in:
薇薇安 2026-02-19 09:23:53 +08:00
parent 95867e90f8
commit be43ec1c33

View File

@ -356,16 +356,24 @@ class KlineStream:
"0", # ignore
]
# ⚠️ 全用 Redis有 Redis 时只写 Redis,不写进程内存;无 Redis 时才写进程内存
# ⚠️ 全用 Redis有 Redis 时只写 Redis。关键:不要每条消息都从 Redis GET 全量并复制,否则内存暴涨。
if self._redis_cache:
# 有 Redis从 Redis 读取当前数据,更新后写回 Redis不占进程内存
try:
rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
# 从 Redis 读取当前 K 线列表(如果存在)
existing = await self._redis_cache.get(rkey)
cache_list = list(existing) if existing and isinstance(existing, list) else []
# 复用 pending 中已有列表,避免每条消息都 Redis GET + list() 复制(原逻辑导致 400+ 次/秒分配,内存涨到 1.2G
if rkey in self._redis_write_pending:
cache_list = self._redis_write_pending[rkey][0]
else:
# 仅在新 key 或刚 flush 后做一次 Redis GET
existing = await self._redis_cache.get(rkey)
cache_list = list(existing) if existing and isinstance(existing, list) else []
if len(self._redis_write_pending) >= self._redis_write_pending_max_size:
oldest_key = min(self._redis_write_pending.keys(),
key=lambda k: self._redis_write_pending[k][1])
del self._redis_write_pending[oldest_key]
self._redis_write_pending[rkey] = (cache_list, time.monotonic())
# 更新 K 线数据
# 原地更新 K 线
if x:
cache_list.append(kline_rest_format)
limit = _kline_cache_limit.get(key, 50)
@ -376,16 +384,8 @@ class KlineStream:
cache_list[-1] = kline_rest_format
else:
cache_list.append(kline_rest_format)
# 标记需要写入 Redis批量写入限制队列大小避免无限增长
if len(self._redis_write_pending) >= self._redis_write_pending_max_size:
# 队列已满,删除最旧的项
oldest_key = min(self._redis_write_pending.keys(),
key=lambda k: self._redis_write_pending[k][1])
del self._redis_write_pending[oldest_key]
self._redis_write_pending[rkey] = (cache_list, time.monotonic())
# 批量写入 Redis每 2 秒一次)
now = time.monotonic()
if now - self._last_redis_batch_write >= 2.0:
async with self._batch_write_lock: