diff --git a/trading_system/book_ticker_stream.py b/trading_system/book_ticker_stream.py index 3548532..480ed37 100644 --- a/trading_system/book_ticker_stream.py +++ b/trading_system/book_ticker_stream.py @@ -19,6 +19,18 @@ except ImportError: KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at" +# Leader 写 Redis 时串行化,避免并发 merge 导致多份全量缓存在内存堆积 +_redis_book_ticker_merge_lock: Optional[asyncio.Lock] = None +_REDIS_BOOK_TICKER_MAX_KEYS = 600 + + +def _get_book_ticker_merge_lock() -> asyncio.Lock: + global _redis_book_ticker_merge_lock + if _redis_book_ticker_merge_lock is None: + _redis_book_ticker_merge_lock = asyncio.Lock() + return _redis_book_ticker_merge_lock + + # 进程内不再保留全量:有 Redis 时只维护「最后更新时间」,数据从 Redis 按需读 _book_ticker_cache: Dict[str, Dict[str, Any]] = {} _book_ticker_updated_at: float = 0.0 @@ -197,11 +209,11 @@ class BookTickerStream: except (TypeError, ValueError): return - # 有 Redis 时只写 Redis,不写进程内存(由 refresh 循环回填) + # 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增 if self._redis_cache: try: asyncio.get_event_loop().create_task( - self._merge_and_write_book_ticker_to_redis(s, item) + self._merge_and_write_book_ticker_to_redis_serialized(s, item) ) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e) @@ -214,15 +226,25 @@ class BookTickerStream: del _book_ticker_cache[k] logger.debug(f"BookTickerStream: 已更新 {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}") + async def _merge_and_write_book_ticker_to_redis_serialized(self, symbol: str, item: Dict[str, Any]) -> None: + """串行化:同一时刻仅一个 merge 执行,避免多份全量缓存在内存堆积。""" + lock = _get_book_ticker_merge_lock() + async with lock: + await self._merge_and_write_book_ticker_to_redis(symbol, item) + async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None: - """从 Redis 读出、合并单条、写回,并写更新时间供 refresh 只拉时间""" + """从 Redis 读出、合并单条、写回,并写更新时间。限制 key 数量防 Redis 膨胀。""" try: if not self._redis_cache: return existing = await self._redis_cache.get(KEY_BOOK_TICKER) merged = dict(existing) if isinstance(existing, dict) else {} merged[symbol] = item - await self._redis_cache.set(KEY_BOOK_TICKER, merged, ttl=30) + usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")} + if len(usdt_only) > _REDIS_BOOK_TICKER_MAX_KEYS: + keys = list(usdt_only.keys())[-_REDIS_BOOK_TICKER_MAX_KEYS:] + usdt_only = {k: usdt_only[k] for k in keys} + await self._redis_cache.set(KEY_BOOK_TICKER, usdt_only, ttl=30) await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 失败 %s", e) diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py index e643682..ad4b88a 100644 --- a/trading_system/ticker_24h_stream.py +++ b/trading_system/ticker_24h_stream.py @@ -52,6 +52,18 @@ async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, An return get_tickers_24h_cache() +# Leader 写 Redis 时串行化,避免并发 merge 导致多份全量缓存在内存堆积(单进程 800MB+) +_REDIS_MERGE_LOCK: Optional[asyncio.Lock] = None +_REDIS_TICKER_24H_MAX_KEYS = 600 # Redis 内最多保留 USDT 数量,防止 key 无限增长 + + +def _get_ticker_24h_merge_lock() -> asyncio.Lock: + global _REDIS_MERGE_LOCK + if _REDIS_MERGE_LOCK is None: + _REDIS_MERGE_LOCK = asyncio.Lock() + return _REDIS_MERGE_LOCK + + class Ticker24hStream: """订阅合约 !ticker@arr,持续更新 _ticker_24h_cache。Leader 时可选写 Redis 供多进程读。""" @@ -160,11 +172,11 @@ class Ticker24hStream: } if not new_items: return - # 有 Redis 时只写 Redis,不写进程内存(由 refresh 循环从 Redis 回填,避免双重占用) + # 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增 if self._redis_cache: try: asyncio.get_event_loop().create_task( - self._merge_and_write_ticker_24h_to_redis(new_items) + self._merge_and_write_ticker_24h_to_redis_serialized(new_items) ) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e) @@ -179,15 +191,26 @@ class Ticker24hStream: del _ticker_24h_cache[k] logger.debug(f"Ticker24hStream: 已更新 {len(new_items)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") + async def _merge_and_write_ticker_24h_to_redis_serialized(self, new_items: Dict[str, Dict[str, Any]]) -> None: + """串行化:同一时刻仅一个 merge 执行,避免多份全量缓存在内存堆积。""" + lock = _get_ticker_24h_merge_lock() + async with lock: + await self._merge_and_write_ticker_24h_to_redis(new_items) + async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None: - """从 Redis 读出、合并新数据、写回,并写更新时间供 refresh 只拉时间不拉全量""" + """从 Redis 读出、合并新数据、写回,并写更新时间供 refresh 只拉时间不拉全量。限制 key 数量防 Redis 膨胀。""" try: if not self._redis_cache: return existing = await self._redis_cache.get(KEY_TICKER_24H) merged = dict(existing) if isinstance(existing, dict) else {} merged.update(new_items) - await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120) + # 只保留 USDT 且限制数量,防止 key 无限增长 + usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")} + if len(usdt_only) > _REDIS_TICKER_24H_MAX_KEYS: + keys = list(usdt_only.keys())[-_REDIS_TICKER_24H_MAX_KEYS:] + usdt_only = {k: usdt_only[k] for k in keys} + await self._redis_cache.set(KEY_TICKER_24H, usdt_only, ttl=120) await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e)