From 95867e90f87eb3397aa1381607bff62c0619d676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Thu, 19 Feb 2026 00:45:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(redis=5Fcache,=20binance=5Fclient,=20marke?= =?UTF-8?q?t=5Fscanner,=20position=5Fmanager,=20ticker=5F24h=5Fstream,=20b?= =?UTF-8?q?ook=5Fticker=5Fstream):=20=E5=BC=95=E5=85=A5=20Redis=20?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E6=9C=BA=E5=88=B6=E4=BB=A5=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=AF=BB=E5=8F=96=E4=B8=8E=E5=86=85=E5=AD=98?= =?UTF-8?q?=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在多个模块中实现 Redis 缓存机制,优先从 Redis 读取数据,减少进程内存占用。更新 `binance_client.py`、`market_scanner.py`、`position_manager.py`、`ticker_24h_stream.py` 和 `book_ticker_stream.py`,确保在有 Redis 时优先使用其进行数据存储,降级到内存缓存。调整缓存管理逻辑,限制进程内缓存的最大条数为 500,避免内存无限增长。此改动提升了数据访问效率,优化了内存使用,增强了系统的整体性能与稳定性。 --- docs/单账号内存评估与Redis减压.md | 35 +++++++++++++ trading_system/binance_client.py | 10 ++-- trading_system/book_ticker_stream.py | 56 +++++++++++++-------- trading_system/market_scanner.py | 24 ++++++--- trading_system/position_manager.py | 33 ++++++++---- trading_system/ticker_24h_stream.py | 55 ++++++++++++-------- 6 files changed, 149 insertions(+), 64 deletions(-) create mode 100644 docs/单账号内存评估与Redis减压.md diff --git a/docs/单账号内存评估与Redis减压.md b/docs/单账号内存评估与Redis减压.md new file mode 100644 index 0000000..0f06dd4 --- /dev/null +++ b/docs/单账号内存评估与Redis减压.md @@ -0,0 +1,35 @@ +# 单账号交易服务内存评估与 Redis 减压 + +## 1. 单账号合理内存区间 + +| 组成部分 | 预估范围 | 说明 | +|----------|----------|------| +| Python 解释器 + asyncio | 50–80 MB | 进程基础 | +| aiohttp + WebSocket 连接 x4 | 20–50 MB | UserData / K线 / Ticker24h / BookTicker | +| Redis 客户端连接与缓冲 | 5–15 MB | 远端 Valkey | +| 各模块小缓存(有上限) | 5–20 MB | 价格、symbol_info、持仓等,已限制条数 | +| 策略/扫描/持仓状态 | 10–30 MB | active_positions、top_symbols 等 | +| 分配器碎片、未归还 OS | 50–150 MB | Python 常见 | +| **合计(合理区间)** | **250–400 MB** | 单账号、设计正常时 | + +结论:单账号**约 250–400 MB** 属正常;**500 MB 偏上**但尚可接受;若持续升到 700 MB+ 则偏异常。 + +你当前「多了约 500 MB」:无服务约 720 MB,开服务约 1223 MB → 交易进程约 **500 MB**,处于偏上、可接受范围。目标是通过 Redis 把压力放到远端,把进程压到 **300 MB 左右** 并稳定。 + +## 2. 当前仍占进程内存的来源 + +- **Ticker24h / BookTicker**:refresh 循环每 2 秒从 Redis 拉全量(最多 500 条)回进程,进程内常驻一份完整拷贝(约 200+ 条 × 若干 KB),是主要可优化点。 +- **K 线**:Leader 已只写 Redis,进程内仅少量元数据和待写队列,占比已较小。 +- **持仓/余额**:有 Redis 时只写 Redis,进程内几乎不保留。 +- **价格 / symbol_info**:有 Redis 时仅降级写,条数有上限,占比小。 + +因此,**把 Ticker24h/BookTicker 改为“只从 Redis 按需读、进程内不再保留全量回填”**,是当前最有效的 Redis 减压手段。 + +## 3. 已实现:Ticker24h/BookTicker 完全用 Redis,进程内不保留全量 + +- **写**:Leader 不变,WS 收到后只写 Redis,并写入 `market:ticker_24h:updated_at` / `market:book_ticker:updated_at`(时间戳,供 refresh 判断是否新鲜)。 +- **读**: + - **Ticker24h**:`get_tickers_24h_from_redis(redis_cache)`,由 market_scanner 在 `is_ticker_24h_cache_fresh()` 为 True 时 `await` 调用,数据全部来自 Redis。 + - **BookTicker**:`get_book_ticker_from_redis(redis_cache, symbol)`,由 position_manager、binance_client 在有 redis_cache 时 `await` 调用。 +- **refresh 循环**:只从 Redis 读取 `updated_at` 并写回 `_ticker_24h_updated_at` / `_book_ticker_updated_at`,**不再**拉全量数据,进程内 `_ticker_24h_cache` / `_book_ticker_cache` 在有 Redis 时保持为空。 +- **进程内**:不再保留 200~500 条 ticker/book_ticker,单账号进程内存预计可再降约 50~150MB,稳定在约 **300~400MB**,压力由远端 Redis 承担。 diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index eeba5f7..3e523b1 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -1768,12 +1768,14 @@ class BinanceClient: # 获取交易对精度信息 symbol_info = await self.get_symbol_info(symbol) - # 获取当前价格以计算名义价值(优先用 bookTicker 估算执行价,提升准确性) + # 获取当前价格以计算名义价值(优先用 bookTicker 估算执行价;有 Redis 时从 Redis 读,不占进程内存) if price is None: - # 优先用最优挂单估算(买单用 askPrice,卖单用 bidPrice) try: - from .book_ticker_stream import get_book_ticker - book = get_book_ticker(symbol) + from .book_ticker_stream import get_book_ticker, get_book_ticker_from_redis + if self.redis_cache: + book = await get_book_ticker_from_redis(self.redis_cache, symbol) + else: + book = get_book_ticker(symbol) if book: if side == "BUY": estimated_price = float(book.get("askPrice", 0)) diff --git a/trading_system/book_ticker_stream.py b/trading_system/book_ticker_stream.py index f3a8458..3548532 100644 --- a/trading_system/book_ticker_stream.py +++ b/trading_system/book_ticker_stream.py @@ -17,28 +17,40 @@ try: except ImportError: KEY_BOOK_TICKER = "market:book_ticker" -# 最优挂单缓存:symbol -> { bidPrice, bidQty, askPrice, askQty, time } -# 有 Redis 时由 refresh 循环从 Redis 回填,不在此无限累积 +KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at" + +# 进程内不再保留全量:有 Redis 时只维护「最后更新时间」,数据从 Redis 按需读 _book_ticker_cache: Dict[str, Dict[str, Any]] = {} _book_ticker_updated_at: float = 0.0 -_BOOK_TICKER_CACHE_MAX_KEYS = 500 # 进程内存最多保留 500 个 +_BOOK_TICKER_CACHE_MAX_KEYS = 500 def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]: - """返回当前最优挂单缓存。""" + """无 Redis 降级时返回进程内缓存。""" return dict(_book_ticker_cache) def get_book_ticker(symbol: str) -> Optional[Dict[str, Any]]: - """获取指定交易对的最优挂单;无缓存时返回 None。""" + """同步读:无 Redis 时用进程内缓存;有 Redis 时应使用 get_book_ticker_from_redis。""" return _book_ticker_cache.get(symbol.upper()) def is_book_ticker_cache_fresh(max_age_sec: float = 10.0) -> bool: - """缓存是否在 max_age_sec 秒内更新过且非空。""" - if not _book_ticker_cache: - return False - return (time.monotonic() - _book_ticker_updated_at) <= max_age_sec + """是否有可用数据(有 Redis 时由 refresh 更新 _book_ticker_updated_at)。""" + return _book_ticker_updated_at > 0 and (time.time() - _book_ticker_updated_at) <= max_age_sec + + +async def get_book_ticker_from_redis(redis_cache: Any, symbol: str) -> Optional[Dict[str, Any]]: + """从 Redis 按需读取单 symbol 最优挂单,进程内不保留全量。""" + if not redis_cache: + return get_book_ticker(symbol) + try: + data = await redis_cache.get(KEY_BOOK_TICKER) + if isinstance(data, dict): + return data.get(symbol.upper()) + except Exception as e: + logger.debug("get_book_ticker_from_redis: %s", e) + return get_book_ticker(symbol) def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]: @@ -203,7 +215,7 @@ class BookTickerStream: logger.debug(f"BookTickerStream: 已更新 {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}") async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None: - """从 Redis 读出、合并单条、写回,不占用进程内存常驻""" + """从 Redis 读出、合并单条、写回,并写更新时间供 refresh 只拉时间""" try: if not self._redis_cache: return @@ -211,28 +223,28 @@ class BookTickerStream: merged = dict(existing) if isinstance(existing, dict) else {} merged[symbol] = item await self._redis_cache.set(KEY_BOOK_TICKER, merged, ttl=30) + await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 失败 %s", e) async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: - """定期从 Redis 拉取 bookTicker 到本地缓存(Leader 与非 Leader 都跑),并限制条数。""" - global _book_ticker_cache, _book_ticker_updated_at + """只从 Redis 拉取「更新时间」,不拉全量,进程内不保留 500 条以省内存。""" + global _book_ticker_updated_at if redis_cache is None: return while True: try: await asyncio.sleep(interval_sec) - data = await redis_cache.get(KEY_BOOK_TICKER) - if data and isinstance(data, dict): - _book_ticker_cache.update(data) - if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS: - keys = list(_book_ticker_cache.keys()) - for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]: - del _book_ticker_cache[k] - _book_ticker_updated_at = time.monotonic() - logger.debug("BookTicker: 从 Redis 刷新 %s 个交易对", len(_book_ticker_cache)) + raw = await redis_cache.get(KEY_BOOK_TICKER_UPDATED_AT) + if raw is not None: + try: + t = float(raw) + _book_ticker_updated_at = t if t > 0 else time.time() + except (TypeError, ValueError): + _book_ticker_updated_at = time.time() + logger.debug("BookTicker: 已同步 Redis 更新时间(进程内不缓存全量)") except asyncio.CancelledError: break except Exception as e: - logger.debug("BookTicker: 从 Redis 刷新失败 %s", e) + logger.debug("BookTicker: 从 Redis 刷新更新时间失败 %s", e) diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index b8dc1ce..bbd81ab 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -90,19 +90,31 @@ class MarketScanner: if excluded_count > 0: logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)") - # 优先从 24h ticker WebSocket 缓存读取,避免批量 REST 请求与超时;无/过期再走 REST + # 优先从 Redis 读 24h ticker(进程内不保留全量,减轻内存);无/过期再 REST all_tickers = None try: try: - from .ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + from .ticker_24h_stream import ( + get_tickers_24h_cache, + get_tickers_24h_from_redis, + is_ticker_24h_cache_fresh, + ) except ImportError: - from ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + from ticker_24h_stream import ( + get_tickers_24h_cache, + get_tickers_24h_from_redis, + is_ticker_24h_cache_fresh, + ) if is_ticker_24h_cache_fresh(max_age_sec=120.0): - all_tickers = get_tickers_24h_cache() + redis_cache = getattr(self.client, "redis_cache", None) + if redis_cache: + all_tickers = await get_tickers_24h_from_redis(redis_cache) + else: + all_tickers = get_tickers_24h_cache() if all_tickers: - logger.info(f"使用 24h ticker WS 缓存({len(all_tickers)} 个交易对),跳过 REST 批量请求") + logger.info(f"使用 24h ticker 缓存({len(all_tickers)} 个交易对,来自 Redis/WS),跳过 REST 批量请求") except Exception as e: - logger.debug(f"读取 24h ticker WS 缓存失败: {e}") + logger.debug(f"读取 24h ticker 缓存失败: {e}") if not all_tickers: logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") all_tickers = await self.client.get_all_tickers_24h() diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index e02f8f5..722ff8e 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -1594,31 +1594,44 @@ class PositionManager: # 获取当前价格(如果未提供):优先 WS 缓存(bookTicker/ticker24h)→ 持仓 markPrice → REST ticker if current_price is None: try: - # 1) 优先 WS:最优挂单中点价或 24h ticker 缓存(避免 REST 超时) + # 1) 优先 Redis/WS:最优挂单中点价或 24h ticker(进程内不保留全量) try: try: - from .book_ticker_stream import get_book_ticker + from .book_ticker_stream import get_book_ticker, get_book_ticker_from_redis except ImportError: - from book_ticker_stream import get_book_ticker - book = get_book_ticker(symbol) + from book_ticker_stream import get_book_ticker, get_book_ticker_from_redis + redis_cache = getattr(self.client, "redis_cache", None) + if redis_cache: + book = await get_book_ticker_from_redis(redis_cache, symbol) + else: + book = get_book_ticker(symbol) if book and float(book.get("bidPrice", 0)) > 0 and float(book.get("askPrice", 0)) > 0: mid = (float(book["bidPrice"]) + float(book["askPrice"])) / 2 current_price = mid - logger.debug(f"{symbol} 从 bookTicker WS 获取当前价格: {current_price}") + logger.debug(f"{symbol} 从 bookTicker 获取当前价格: {current_price}") except Exception: pass if current_price is None: try: try: - from .ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + from .ticker_24h_stream import ( + get_tickers_24h_cache, + get_tickers_24h_from_redis, + is_ticker_24h_cache_fresh, + ) except ImportError: - from ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + from ticker_24h_stream import ( + get_tickers_24h_cache, + get_tickers_24h_from_redis, + is_ticker_24h_cache_fresh, + ) if is_ticker_24h_cache_fresh(max_age_sec=120): - tickers = get_tickers_24h_cache() - t = tickers.get(symbol) + redis_cache = getattr(self.client, "redis_cache", None) + tickers = await get_tickers_24h_from_redis(redis_cache) if redis_cache else get_tickers_24h_cache() + t = tickers.get(symbol) if tickers else None if t and t.get("price"): current_price = float(t["price"]) - logger.debug(f"{symbol} 从 ticker24h WS 获取当前价格: {current_price}") + logger.debug(f"{symbol} 从 ticker24h 获取当前价格: {current_price}") except Exception: pass # 2) 持仓标记价格(MARK_PRICE,与止损单触发基准一致) diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py index 6b58788..e643682 100644 --- a/trading_system/ticker_24h_stream.py +++ b/trading_system/ticker_24h_stream.py @@ -17,28 +17,39 @@ try: except ImportError: KEY_TICKER_24H = "market:ticker_24h" -# 全市场 24h ticker 缓存:symbol -> { symbol, price, volume, changePercent, ts } -# 有 Redis 时由 refresh 循环从 Redis 回填,不在此无限累积 +KEY_TICKER_24H_UPDATED_AT = "market:ticker_24h:updated_at" + +# 进程内不再保留全量:有 Redis 时只维护“最后更新时间”,数据全部从 Redis 按需读 _ticker_24h_cache: Dict[str, Dict[str, Any]] = {} _ticker_24h_updated_at: float = 0.0 -_TICKER_24H_CACHE_MAX_KEYS = 500 # 进程内存最多保留 500 个,避免无限增长 +_TICKER_24H_CACHE_MAX_KEYS = 500 # 仅无 Redis 时使用 def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]: - """返回当前 24h ticker 缓存(与 get_all_tickers_24h 结构兼容)。""" + """无 Redis 降级时返回进程内缓存;有 Redis 时应使用 get_tickers_24h_from_redis。""" return dict(_ticker_24h_cache) def get_tickers_24h_cache_updated_at() -> float: - """返回缓存最后更新时间(monotonic)。未更新过为 0。""" + """返回缓存最后更新时间(由 refresh 从 Redis 回写 updated_at)。""" return _ticker_24h_updated_at def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool: - """缓存是否在 max_age_sec 秒内更新过且非空。""" - if not _ticker_24h_cache: - return False - return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec + """是否有可用数据(有 Redis 时由 refresh 更新 _ticker_24h_updated_at 为 Redis 的 time.time)。""" + return _ticker_24h_updated_at > 0 and (time.time() - _ticker_24h_updated_at) <= max_age_sec + + +async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, Any]]: + """从 Redis 按需读取全量 24h ticker,进程内不保留,减轻内存。""" + if not redis_cache: + return get_tickers_24h_cache() + try: + data = await redis_cache.get(KEY_TICKER_24H) + return dict(data) if isinstance(data, dict) else {} + except Exception as e: + logger.debug("get_tickers_24h_from_redis: %s", e) + return get_tickers_24h_cache() class Ticker24hStream: @@ -169,7 +180,7 @@ class Ticker24hStream: logger.debug(f"Ticker24hStream: 已更新 {len(new_items)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None: - """从 Redis 读出、合并新数据、写回,不占用进程内存常驻""" + """从 Redis 读出、合并新数据、写回,并写更新时间供 refresh 只拉时间不拉全量""" try: if not self._redis_cache: return @@ -177,28 +188,28 @@ class Ticker24hStream: merged = dict(existing) if isinstance(existing, dict) else {} merged.update(new_items) await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120) + await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e) async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: - """定期从 Redis 拉取 24h ticker 到本地缓存(Leader 与非 Leader 都跑,避免进程内常驻全量)。""" - global _ticker_24h_cache, _ticker_24h_updated_at + """只从 Redis 拉取「更新时间」,不拉全量数据,进程内不保留 500 条以省内存。""" + global _ticker_24h_updated_at if redis_cache is None: return while True: try: await asyncio.sleep(interval_sec) - data = await redis_cache.get(KEY_TICKER_24H) - if data and isinstance(data, dict): - _ticker_24h_cache.update(data) - if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS: - keys = list(_ticker_24h_cache.keys()) - for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]: - del _ticker_24h_cache[k] - _ticker_24h_updated_at = time.monotonic() - logger.debug("Ticker24h: 从 Redis 刷新 %s 个交易对", len(_ticker_24h_cache)) + raw = await redis_cache.get(KEY_TICKER_24H_UPDATED_AT) + if raw is not None: + try: + t = float(raw) + _ticker_24h_updated_at = t if t > 0 else time.monotonic() + except (TypeError, ValueError): + _ticker_24h_updated_at = time.monotonic() + logger.debug("Ticker24h: 已同步 Redis 更新时间(进程内不缓存全量)") except asyncio.CancelledError: break except Exception as e: - logger.debug("Ticker24h: 从 Redis 刷新失败 %s", e) + logger.debug("Ticker24h: 从 Redis 刷新更新时间失败 %s", e)