feat(redis_cache, binance_client, market_scanner, position_manager, ticker_24h_stream, book_ticker_stream): introduce a Redis cache layer to optimize data reads and memory management

Implement a Redis cache mechanism across several modules, reading data from Redis first to reduce in-process memory usage. Update `binance_client.py`, `market_scanner.py`, `position_manager.py`, `ticker_24h_stream.py`, and `book_ticker_stream.py` so that Redis, when available, is preferred for data storage, with a fallback to the in-memory cache. Adjust the cache-management logic to cap the in-process cache at 500 entries, preventing unbounded memory growth. This change improves data-access efficiency, optimizes memory usage, and strengthens overall performance and stability.
薇薇安 2026-02-19 00:45:56 +08:00
parent f4feea6b87
commit 95867e90f8
6 changed files with 149 additions and 64 deletions

View File

@@ -0,0 +1,35 @@
# Memory Assessment for a Single-Account Trading Service, and Offloading to Redis
## 1. Reasonable memory range for a single account
| Component | Estimated range | Notes |
|----------|----------|------|
| Python interpreter + asyncio | 50–80 MB | process baseline |
| aiohttp + WebSocket connections x4 | 20–50 MB | UserData / K-line / Ticker24h / BookTicker |
| Redis client connections and buffers | 5–15 MB | remote Valkey |
| Small per-module caches (bounded) | 5–20 MB | prices, symbol_info, positions, etc.; entry counts already capped |
| Strategy / scanner / position state | 10–30 MB | active_positions, top_symbols, etc. |
| Allocator fragmentation, memory not returned to the OS | 50–150 MB | common in Python |
| **Total (reasonable range)** | **250–400 MB** | single account, normal operation |
Conclusion: **roughly 250–400 MB** per account is normal; **500 MB is on the high side** but still acceptable; a sustained climb past 700 MB is abnormal.
Your current "extra ~500 MB" (about 720 MB with the service stopped, about 1223 MB with it running) → the trading process accounts for roughly **500 MB**, which is on the high side but acceptable. The goal is to push this pressure to remote Redis and bring the process down to a stable **~300 MB**.
## 2. Remaining sources of in-process memory
- **Ticker24h / BookTicker**: the refresh loop pulls the full dataset (up to 500 entries) from Redis back into the process every 2 seconds, keeping a complete resident copy (~200+ entries × several KB each). This is the main optimization target.
- **K-lines**: the Leader already writes only to Redis; the process holds just a little metadata plus the pending-write queue, so its share is already small.
- **Positions / balances**: with Redis available, these are written only to Redis; almost nothing stays in the process.
- **Prices / symbol_info**: with Redis, in-process writes happen only as a fallback, and the entry count is capped; small share.
Therefore, switching Ticker24h/BookTicker to "read on demand from Redis only, with no full in-process backfill" is the most effective Redis offload available right now.
## 3. Implemented: Ticker24h/BookTicker run entirely on Redis, with no full in-process copy
- **Write**: the Leader is unchanged: on each WS update it writes only to Redis, and also writes `market:ticker_24h:updated_at` / `market:book_ticker:updated_at` (timestamps the refresh loop uses to judge freshness).
- **Read**:
  - **Ticker24h**: `get_tickers_24h_from_redis(redis_cache)`, `await`ed by market_scanner when `is_ticker_24h_cache_fresh()` is True; all data comes from Redis.
  - **BookTicker**: `get_book_ticker_from_redis(redis_cache, symbol)`, `await`ed by position_manager and binance_client whenever a redis_cache is available.
- **Refresh loop**: reads only `updated_at` from Redis and writes it back into `_ticker_24h_updated_at` / `_book_ticker_updated_at`; it **no longer** pulls the full dataset, so with Redis available `_ticker_24h_cache` / `_book_ticker_cache` stay empty.
- **In-process**: the 200–500 ticker/book_ticker entries are no longer kept; per-account process memory should drop by another ~50–150 MB and settle around **300–400 MB**, with the load carried by remote Redis (a sketch of the resulting read path follows this list).
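A minimal sketch of the resulting consumer-side read path, using the helpers added in the diffs below; the `resolve_tickers_24h` wrapper itself is illustrative, and the `client` is assumed to expose `redis_cache` and `get_all_tickers_24h()` as at the actual call sites:

```python
from ticker_24h_stream import (
    get_tickers_24h_cache,
    get_tickers_24h_from_redis,
    is_ticker_24h_cache_fresh,
)

async def resolve_tickers_24h(client):
    """Illustrative wrapper for the read path described above."""
    # Freshness is driven by the refresh loop, which mirrors the Redis
    # market:ticker_24h:updated_at timestamp into _ticker_24h_updated_at.
    if is_ticker_24h_cache_fresh(max_age_sec=120.0):
        redis_cache = getattr(client, "redis_cache", None)
        if redis_cache:
            # The full set is fetched on demand and never kept resident.
            return await get_tickers_24h_from_redis(redis_cache)
        return get_tickers_24h_cache()  # in-memory fallback without Redis
    # Stale or never updated: fall back to a batched REST request.
    return await client.get_all_tickers_24h()
```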

View File

@@ -1768,11 +1768,13 @@ class BinanceClient:
         # Fetch the symbol's precision info
         symbol_info = await self.get_symbol_info(symbol)
-        # Get the current price to compute the notional value (prefer bookTicker to estimate the execution price, for accuracy)
+        # Get the current price to compute the notional value (prefer bookTicker to estimate the execution price; with Redis, read from Redis so no process memory is held)
         if price is None:
             # Prefer the best quote for the estimate (askPrice for buys, bidPrice for sells)
             try:
-                from .book_ticker_stream import get_book_ticker
-                book = get_book_ticker(symbol)
+                from .book_ticker_stream import get_book_ticker, get_book_ticker_from_redis
+                if self.redis_cache:
+                    book = await get_book_ticker_from_redis(self.redis_cache, symbol)
+                else:
+                    book = get_book_ticker(symbol)
                 if book:
                     if side == "BUY":

View File

@@ -17,28 +17,40 @@ try:
 except ImportError:
     KEY_BOOK_TICKER = "market:book_ticker"
 # Best-quote cache: symbol -> { bidPrice, bidQty, askPrice, askQty, time }
-# With Redis, the refresh loop backfills this from Redis; it does not grow without bound here
+KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at"
+# No longer a full in-process copy: with Redis we keep only the "last updated" time and read data from Redis on demand
 _book_ticker_cache: Dict[str, Dict[str, Any]] = {}
 _book_ticker_updated_at: float = 0.0
-_BOOK_TICKER_CACHE_MAX_KEYS = 500  # keep at most 500 entries in process memory
+_BOOK_TICKER_CACHE_MAX_KEYS = 500
 def get_book_ticker_cache() -> Dict[str, Dict[str, Any]]:
-    """Return the current best-quote cache."""
+    """Fallback without Redis: return the in-process cache."""
     return dict(_book_ticker_cache)
 def get_book_ticker(symbol: str) -> Optional[Dict[str, Any]]:
-    """Return the best quote for a symbol, or None when not cached."""
+    """Synchronous read: uses the in-process cache when there is no Redis; with Redis, use get_book_ticker_from_redis."""
     return _book_ticker_cache.get(symbol.upper())
 def is_book_ticker_cache_fresh(max_age_sec: float = 10.0) -> bool:
-    """Whether the cache is non-empty and was updated within the last max_age_sec seconds."""
-    if not _book_ticker_cache:
-        return False
-    return (time.monotonic() - _book_ticker_updated_at) <= max_age_sec
+    """Whether usable data exists (with Redis, the refresh loop keeps _book_ticker_updated_at current)."""
+    return _book_ticker_updated_at > 0 and (time.time() - _book_ticker_updated_at) <= max_age_sec
+async def get_book_ticker_from_redis(redis_cache: Any, symbol: str) -> Optional[Dict[str, Any]]:
+    """Read one symbol's best quote from Redis on demand; no full copy is kept in-process."""
+    if not redis_cache:
+        return get_book_ticker(symbol)
+    try:
+        data = await redis_cache.get(KEY_BOOK_TICKER)
+        if isinstance(data, dict):
+            return data.get(symbol.upper())
+    except Exception as e:
+        logger.debug("get_book_ticker_from_redis: %s", e)
+    return get_book_ticker(symbol)
 def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]:
@@ -203,7 +215,7 @@ class BookTickerStream:
             logger.debug(f"BookTickerStream: updated {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}")
     async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None:
-        """Read from Redis, merge the single entry, and write back; nothing stays resident in process memory."""
+        """Read from Redis, merge the single entry, write back, and record the update time so refresh can pull just the timestamp."""
         try:
             if not self._redis_cache:
                 return
@@ -211,28 +223,28 @@ class BookTickerStream:
             merged = dict(existing) if isinstance(existing, dict) else {}
             merged[symbol] = item
             await self._redis_cache.set(KEY_BOOK_TICKER, merged, ttl=30)
+            await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30)
         except Exception as e:
             logger.debug("BookTickerStream: Redis write failed %s", e)
 async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
-    """Periodically pull bookTicker from Redis into the local cache (runs on both Leader and non-Leader), with a capped entry count."""
-    global _book_ticker_cache, _book_ticker_updated_at
+    """Pull only the update timestamp from Redis, never the full dataset, keeping ~500 entries out of process memory."""
+    global _book_ticker_updated_at
     if redis_cache is None:
         return
     while True:
         try:
             await asyncio.sleep(interval_sec)
-            data = await redis_cache.get(KEY_BOOK_TICKER)
-            if data and isinstance(data, dict):
-                _book_ticker_cache.update(data)
-                if len(_book_ticker_cache) > _BOOK_TICKER_CACHE_MAX_KEYS:
-                    keys = list(_book_ticker_cache.keys())
-                    for k in keys[_BOOK_TICKER_CACHE_MAX_KEYS:]:
-                        del _book_ticker_cache[k]
-                _book_ticker_updated_at = time.monotonic()
-                logger.debug("BookTicker: refreshed %s symbols from Redis", len(_book_ticker_cache))
+            raw = await redis_cache.get(KEY_BOOK_TICKER_UPDATED_AT)
+            if raw is not None:
+                try:
+                    t = float(raw)
+                    _book_ticker_updated_at = t if t > 0 else time.time()
+                except (TypeError, ValueError):
+                    _book_ticker_updated_at = time.time()
+                logger.debug("BookTicker: synced update time from Redis (no full in-process cache)")
         except asyncio.CancelledError:
             break
         except Exception as e:
-            logger.debug("BookTicker: refresh from Redis failed %s", e)
+            logger.debug("BookTicker: failed to refresh update time from Redis %s", e)

View File

@@ -90,19 +90,31 @@ class MarketScanner:
         if excluded_count > 0:
             logger.info(f"Excluded {excluded_count} major coins; {len(symbols)} symbols remain (altcoin focus)")
-        # Prefer the 24h-ticker WebSocket cache, avoiding batched REST requests and timeouts; fall back to REST when missing or stale
+        # Prefer reading the 24h ticker from Redis (no full in-process copy, easing memory); fall back to REST when missing or stale
         all_tickers = None
         try:
             try:
-                from .ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh
+                from .ticker_24h_stream import (
+                    get_tickers_24h_cache,
+                    get_tickers_24h_from_redis,
+                    is_ticker_24h_cache_fresh,
+                )
             except ImportError:
-                from ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh
+                from ticker_24h_stream import (
+                    get_tickers_24h_cache,
+                    get_tickers_24h_from_redis,
+                    is_ticker_24h_cache_fresh,
+                )
             if is_ticker_24h_cache_fresh(max_age_sec=120.0):
-                all_tickers = get_tickers_24h_cache()
+                redis_cache = getattr(self.client, "redis_cache", None)
+                if redis_cache:
+                    all_tickers = await get_tickers_24h_from_redis(redis_cache)
+                else:
+                    all_tickers = get_tickers_24h_cache()
             if all_tickers:
-                logger.info(f"Using the 24h ticker WS cache ({len(all_tickers)} symbols); skipping the batched REST request")
+                logger.info(f"Using the 24h ticker cache ({len(all_tickers)} symbols, from Redis/WS); skipping the batched REST request")
         except Exception as e:
-            logger.debug(f"Failed to read the 24h ticker WS cache: {e}")
+            logger.debug(f"Failed to read the 24h ticker cache: {e}")
         if not all_tickers:
             logger.info(f"Batch-fetching 24h market data for {len(symbols)} symbols...")
             all_tickers = await self.client.get_all_tickers_24h()

View File

@@ -1594,31 +1594,44 @@ class PositionManager:
         # Get the current price (when not provided): prefer the WS caches (bookTicker/ticker24h) → position markPrice → REST ticker
         if current_price is None:
             try:
-                # 1) Prefer WS (best-quote midpoint, or the 24h ticker cache) to avoid REST timeouts
+                # 1) Prefer Redis/WS (best-quote midpoint, or the 24h ticker); no full in-process copy
                 try:
                     try:
-                        from .book_ticker_stream import get_book_ticker
+                        from .book_ticker_stream import get_book_ticker, get_book_ticker_from_redis
                     except ImportError:
-                        from book_ticker_stream import get_book_ticker
+                        from book_ticker_stream import get_book_ticker, get_book_ticker_from_redis
-                    book = get_book_ticker(symbol)
+                    redis_cache = getattr(self.client, "redis_cache", None)
+                    if redis_cache:
+                        book = await get_book_ticker_from_redis(redis_cache, symbol)
+                    else:
+                        book = get_book_ticker(symbol)
                     if book and float(book.get("bidPrice", 0)) > 0 and float(book.get("askPrice", 0)) > 0:
                         mid = (float(book["bidPrice"]) + float(book["askPrice"])) / 2
                         current_price = mid
-                        logger.debug(f"{symbol} current price from bookTicker WS: {current_price}")
+                        logger.debug(f"{symbol} current price from bookTicker: {current_price}")
                 except Exception:
                     pass
                 if current_price is None:
                     try:
                         try:
-                            from .ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh
+                            from .ticker_24h_stream import (
+                                get_tickers_24h_cache,
+                                get_tickers_24h_from_redis,
+                                is_ticker_24h_cache_fresh,
+                            )
                         except ImportError:
-                            from ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh
+                            from ticker_24h_stream import (
+                                get_tickers_24h_cache,
+                                get_tickers_24h_from_redis,
+                                is_ticker_24h_cache_fresh,
+                            )
                         if is_ticker_24h_cache_fresh(max_age_sec=120):
-                            tickers = get_tickers_24h_cache()
-                            t = tickers.get(symbol)
+                            redis_cache = getattr(self.client, "redis_cache", None)
+                            tickers = await get_tickers_24h_from_redis(redis_cache) if redis_cache else get_tickers_24h_cache()
+                            t = tickers.get(symbol) if tickers else None
                             if t and t.get("price"):
                                 current_price = float(t["price"])
-                                logger.debug(f"{symbol} current price from ticker24h WS: {current_price}")
+                                logger.debug(f"{symbol} current price from ticker24h: {current_price}")
                     except Exception:
                         pass
                 # 2) Position mark price (MARK_PRICE), consistent with the stop-loss trigger basis
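Condensed, the hunk above implements the following fallback chain; this is a sketch only, and `_mark_price` / `_rest_ticker_price` are hypothetical stand-ins for the markPrice and REST steps outside the visible hunk:

```python
async def resolve_current_price(self, symbol: str):
    """Sketch: bookTicker mid → ticker24h price → markPrice → REST ticker."""
    redis_cache = getattr(self.client, "redis_cache", None)

    # 1) Best-quote midpoint, read on demand from Redis when available.
    book = await get_book_ticker_from_redis(redis_cache, symbol) if redis_cache else get_book_ticker(symbol)
    if book and float(book.get("bidPrice", 0)) > 0 and float(book.get("askPrice", 0)) > 0:
        return (float(book["bidPrice"]) + float(book["askPrice"])) / 2

    # 2) 24h ticker, used only while the shared updated_at timestamp is fresh.
    if is_ticker_24h_cache_fresh(max_age_sec=120):
        tickers = await get_tickers_24h_from_redis(redis_cache) if redis_cache else get_tickers_24h_cache()
        t = tickers.get(symbol) if tickers else None
        if t and t.get("price"):
            return float(t["price"])

    # 3) Position mark price, then 4) REST ticker as the last resort.
    return await self._mark_price(symbol) or await self._rest_ticker_price(symbol)
```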

View File

@@ -17,28 +17,39 @@ try:
 except ImportError:
     KEY_TICKER_24H = "market:ticker_24h"
 # Full-market 24h ticker cache: symbol -> { symbol, price, volume, changePercent, ts }
-# With Redis, the refresh loop backfills this from Redis; it does not grow without bound here
+KEY_TICKER_24H_UPDATED_AT = "market:ticker_24h:updated_at"
+# No longer a full in-process copy: with Redis we keep only the "last updated" time and read all data from Redis on demand
 _ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
 _ticker_24h_updated_at: float = 0.0
-_TICKER_24H_CACHE_MAX_KEYS = 500  # keep at most 500 entries in process memory to avoid unbounded growth
+_TICKER_24H_CACHE_MAX_KEYS = 500  # only used when there is no Redis
 def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]:
-    """Return the current 24h ticker cache (structure-compatible with get_all_tickers_24h)."""
+    """Fallback without Redis: return the in-process cache; with Redis, use get_tickers_24h_from_redis."""
     return dict(_ticker_24h_cache)
 def get_tickers_24h_cache_updated_at() -> float:
-    """Return the cache's last update time (monotonic; 0 if never updated)."""
+    """Return the cache's last update time (written back by refresh from the Redis updated_at)."""
     return _ticker_24h_updated_at
 def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:
-    """Whether the cache is non-empty and was updated within the last max_age_sec seconds."""
-    if not _ticker_24h_cache:
-        return False
-    return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec
+    """Whether usable data exists (with Redis, refresh sets _ticker_24h_updated_at from Redis's time.time())."""
+    return _ticker_24h_updated_at > 0 and (time.time() - _ticker_24h_updated_at) <= max_age_sec
+async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, Any]]:
+    """Read the full 24h ticker set from Redis on demand; nothing is kept in-process, easing memory."""
+    if not redis_cache:
+        return get_tickers_24h_cache()
+    try:
+        data = await redis_cache.get(KEY_TICKER_24H)
+        return dict(data) if isinstance(data, dict) else {}
+    except Exception as e:
+        logger.debug("get_tickers_24h_from_redis: %s", e)
+    return get_tickers_24h_cache()
 class Ticker24hStream:
@@ -169,7 +180,7 @@ class Ticker24hStream:
             logger.debug(f"Ticker24hStream: updated {len(new_items)} entries; cache now holds {len(_ticker_24h_cache)} symbols")
     async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None:
-        """Read from Redis, merge the new data, and write back; nothing stays resident in process memory."""
+        """Read from Redis, merge the new data, write back, and record the update time so refresh pulls just the timestamp, not the full set."""
         try:
             if not self._redis_cache:
                 return
@@ -177,28 +188,28 @@ class Ticker24hStream:
             merged = dict(existing) if isinstance(existing, dict) else {}
             merged.update(new_items)
             await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120)
+            await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120)
         except Exception as e:
             logger.debug("Ticker24hStream: Redis write failed %s", e)
 async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
-    """Periodically pull the 24h ticker from Redis into the local cache (runs on both Leader and non-Leader; avoids keeping the full set resident)."""
-    global _ticker_24h_cache, _ticker_24h_updated_at
+    """Pull only the update timestamp from Redis, never the full dataset, keeping ~500 entries out of process memory."""
+    global _ticker_24h_updated_at
     if redis_cache is None:
         return
     while True:
         try:
             await asyncio.sleep(interval_sec)
-            data = await redis_cache.get(KEY_TICKER_24H)
-            if data and isinstance(data, dict):
-                _ticker_24h_cache.update(data)
-                if len(_ticker_24h_cache) > _TICKER_24H_CACHE_MAX_KEYS:
-                    keys = list(_ticker_24h_cache.keys())
-                    for k in keys[_TICKER_24H_CACHE_MAX_KEYS:]:
-                        del _ticker_24h_cache[k]
-                logger.debug("Ticker24h: refreshed %s symbols from Redis", len(_ticker_24h_cache))
+            raw = await redis_cache.get(KEY_TICKER_24H_UPDATED_AT)
+            if raw is not None:
+                try:
+                    t = float(raw)
+                    _ticker_24h_updated_at = t if t > 0 else time.time()
+                except (TypeError, ValueError):
+                    _ticker_24h_updated_at = time.time()
+                logger.debug("Ticker24h: synced update time from Redis (no full in-process cache)")
         except asyncio.CancelledError:
             break
         except Exception as e:
-            logger.debug("Ticker24h: refresh from Redis failed %s", e)
+            logger.debug("Ticker24h: failed to refresh update time from Redis %s", e)