diff --git a/docs/内存优化_修复K线缓存.md b/docs/内存优化_修复K线缓存.md index 76ecf0f..ea2edf1 100644 --- a/docs/内存优化_修复K线缓存.md +++ b/docs/内存优化_修复K线缓存.md @@ -93,11 +93,24 @@ cd backend - K 线缓存进程内存应该**接近 0**(只有降级时的少量数据) - 内存占用应该**稳定**,不再持续增长 +## 补充:Ticker24h / BookTicker 进程内存(本次一并修复) + +**原因**:Leader 进程里 `_ticker_24h_cache`、`_book_ticker_cache` 一直随 WS 消息更新,全市场 200+ 交易对常驻进程内存,且每次写 Redis 前还 `dict(_ticker_24h_cache)` 做完整拷贝,加重内存和分配。 + +**修改**(`ticker_24h_stream.py` / `book_ticker_stream.py`): +- **有 Redis 时**:WS 只做「从 Redis 读出 → 合并本批/本条 → 写回 Redis」,**不再更新** `_ticker_24h_cache` / `_book_ticker_cache`。 +- 进程内缓存只由 **refresh 循环**(每 2 秒从 Redis 拉一次)回填,并限制最多 **500 条**,避免无限增长。 +- **无 Redis 时**:仍只写进程内存,并同样限制 500 条。 + +这样 Leader 不再在进程里常驻全量 ticker/bookTicker,也不为写 Redis 做整份拷贝。 + +--- + ## 其他可能的内存增长源 -如果重启后内存仍在增长,检查: +若重启后内存仍涨,可再查: -1. **WebSocket 消息队列**:检查是否有消息堆积 -2. **日志**:检查日志文件大小 -3. **数据库连接**:检查连接池是否正常释放 -4. **其他缓存**:检查 `_price_cache`、`_symbol_info_cache` 等是否正常清理 +1. **WebSocket 消息队列**:是否有任务/消息堆积 +2. **日志**:日志文件与 Redis 日志 handler 是否过大 +3. **数据库连接**:连接池是否及时释放 +4. **其他缓存**:`_price_cache`、`_symbol_info_cache` 等是否有上限并定期淘汰 diff --git a/trading_system/book_ticker_stream.py b/trading_system/book_ticker_stream.py index 591caf8..f3a8458 100644 --- a/trading_system/book_ticker_stream.py +++ b/trading_system/book_ticker_stream.py @@ -18,8 +18,10 @@ except ImportError: KEY_BOOK_TICKER = "market:book_ticker" # 最优挂单缓存:symbol -> { bidPrice, bidQty, askPrice, askQty, time } +# 有 Redis 时由 refresh 循环从 Redis 回填,不在此无限累积 _book_ticker_cache: Dict[str, Dict[str, Any]] = {} _book_ticker_updated_at: float = 0.0 +_BOOK_TICKER_CACHE_MAX_KEYS = 500 # 进程内存最多保留 500 个 def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]: @@ -159,7 +161,6 @@ class BookTickerStream: data = json.loads(raw) except Exception: return - # 可能是单条对象或组合流格式 if isinstance(data, dict) and "stream" in data: ticker_data = data.get("data", {}) else: @@ -173,7 +174,7 @@ class BookTickerStream: return try: - _book_ticker_cache[s] = { + item = { "symbol": s, "bidPrice": float(ticker_data.get("b", 0)), "bidQty": float(ticker_data.get("B", 0)), @@ -184,26 +185,38 @@ class BookTickerStream: except (TypeError, ValueError): return - _book_ticker_updated_at = time.monotonic() - logger.debug(f"BookTickerStream: 已更新 {s} bid={_book_ticker_cache[s]['bidPrice']:.4f} ask={_book_ticker_cache[s]['askPrice']:.4f}") + # 有 Redis 时只写 Redis,不写进程内存(由 refresh 循环回填) if self._redis_cache: try: - loop = asyncio.get_event_loop() - copy = dict(_book_ticker_cache) - loop.create_task(self._write_book_ticker_to_redis(copy)) + asyncio.get_event_loop().create_task( + self._merge_and_write_book_ticker_to_redis(s, item) + ) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e) + return + _book_ticker_cache[s] = item + _book_ticker_updated_at = time.monotonic() + if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS: + keys = list(_book_ticker_cache.keys()) + for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]: + del _book_ticker_cache[k] + logger.debug(f"BookTickerStream: 已更新 {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}") - async def _write_book_ticker_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None: + async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None: + """从 Redis 读出、合并单条、写回,不占用进程内存常驻""" try: - if self._redis_cache: - await self._redis_cache.set(KEY_BOOK_TICKER, data, ttl=30) + if not self._redis_cache: + return + existing = await self._redis_cache.get(KEY_BOOK_TICKER) + merged = dict(existing) if isinstance(existing, dict) else {} + merged[symbol] = item + await self._redis_cache.set(KEY_BOOK_TICKER, merged, ttl=30) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 失败 %s", e) async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: - """非 Leader 或共用模式:定期从 Redis 拉取 bookTicker 到本地缓存。所有进程可调用。""" + """定期从 Redis 拉取 bookTicker 到本地缓存(Leader 与非 Leader 都跑),并限制条数。""" global _book_ticker_cache, _book_ticker_updated_at if redis_cache is None: return @@ -213,8 +226,12 @@ async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: fl data = await redis_cache.get(KEY_BOOK_TICKER) if data and isinstance(data, dict): _book_ticker_cache.update(data) + if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS: + keys = list(_book_ticker_cache.keys()) + for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]: + del _book_ticker_cache[k] _book_ticker_updated_at = time.monotonic() - logger.debug("BookTicker: 从 Redis 刷新 %s 个交易对", len(data)) + logger.debug("BookTicker: 从 Redis 刷新 %s 个交易对", len(_book_ticker_cache)) except asyncio.CancelledError: break except Exception as e: diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py index d2ce962..6b58788 100644 --- a/trading_system/ticker_24h_stream.py +++ b/trading_system/ticker_24h_stream.py @@ -18,8 +18,10 @@ except ImportError: KEY_TICKER_24H = "market:ticker_24h" # 全市场 24h ticker 缓存:symbol -> { symbol, price, volume, changePercent, ts } +# 有 Redis 时由 refresh 循环从 Redis 回填,不在此无限累积 _ticker_24h_cache: Dict[str, Dict[str, Any]] = {} _ticker_24h_updated_at: float = 0.0 +_TICKER_24H_CACHE_MAX_KEYS = 500 # 进程内存最多保留 500 个,避免无限增长 def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]: @@ -118,15 +120,14 @@ class Ticker24hStream: data = json.loads(raw) except Exception: return - # 可能是单条对象(stream 名)或数组;文档说是数组 if isinstance(data, list): arr = data elif isinstance(data, dict): - # 组合流格式 { "stream": "!ticker@arr", "data": [ ... ] } arr = data.get("data") if isinstance(data.get("data"), list) else [data] else: return now_ms = int(time.time() * 1000) + new_items = {} for t in arr: if not isinstance(t, dict): continue @@ -136,37 +137,52 @@ class Ticker24hStream: try: price = float(t.get("c") or t.get("lastPrice") or 0) change_pct = float(t.get("P") or t.get("priceChangePercent") or 0) - # 成交量:优先 quoteVolume(USDT),文档可能为 q 或 quoteVolume vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0) except (TypeError, ValueError): continue - _ticker_24h_cache[s] = { + new_items[s] = { "symbol": s, "price": price, "volume": vol, "changePercent": change_pct, "ts": now_ms, } - _ticker_24h_updated_at = time.monotonic() - logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") + if not new_items: + return + # 有 Redis 时只写 Redis,不写进程内存(由 refresh 循环从 Redis 回填,避免双重占用) if self._redis_cache: try: - loop = asyncio.get_event_loop() - copy = dict(_ticker_24h_cache) - loop.create_task(self._write_ticker_24h_to_redis(copy)) + asyncio.get_event_loop().create_task( + self._merge_and_write_ticker_24h_to_redis(new_items) + ) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e) + return + # 无 Redis 时才写进程内存 + for s, v in new_items.items(): + _ticker_24h_cache[s] = v + _ticker_24h_updated_at = time.monotonic() + if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS: + keys = list(_ticker_24h_cache.keys()) + for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]: + del _ticker_24h_cache[k] + logger.debug(f"Ticker24hStream: 已更新 {len(new_items)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") - async def _write_ticker_24h_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None: + async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None: + """从 Redis 读出、合并新数据、写回,不占用进程内存常驻""" try: - if self._redis_cache: - await self._redis_cache.set(KEY_TICKER_24H, data, ttl=120) + if not self._redis_cache: + return + existing = await self._redis_cache.get(KEY_TICKER_24H) + merged = dict(existing) if isinstance(existing, dict) else {} + merged.update(new_items) + await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e) async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: - """非 Leader 或共用模式:定期从 Redis 拉取 24h ticker 到本地缓存。所有进程可调用。""" + """定期从 Redis 拉取 24h ticker 到本地缓存(Leader 与非 Leader 都跑,避免进程内常驻全量)。""" global _ticker_24h_cache, _ticker_24h_updated_at if redis_cache is None: return @@ -176,8 +192,12 @@ async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: flo data = await redis_cache.get(KEY_TICKER_24H) if data and isinstance(data, dict): _ticker_24h_cache.update(data) + if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS: + keys = list(_ticker_24h_cache.keys()) + for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]: + del _ticker_24h_cache[k] _ticker_24h_updated_at = time.monotonic() - logger.debug("Ticker24h: 从 Redis 刷新 %s 个交易对", len(data)) + logger.debug("Ticker24h: 从 Redis 刷新 %s 个交易对", len(_ticker_24h_cache)) except asyncio.CancelledError: break except Exception as e: