feat(book_ticker_stream, ticker_24h_stream): introduce a serialization lock for the Redis write path
Added a serialization lock in `book_ticker_stream.py` and `ticker_24h_stream.py` so that concurrent merges during Redis writes can no longer pile up multiple full-snapshot copies in memory. The merge logic now also caps the number of USDT symbols kept in Redis, preventing unbounded key growth. This improves memory management and system stability.
parent f1e2cabc01
commit 22901abe39
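Both files apply the same pattern: a lazily created module-level asyncio.Lock serializes the read-merge-write cycle against Redis, so only one coroutine ever holds the full ticker snapshot at a time. A minimal standalone sketch of that pattern (the cache object with async get/set and a ttl keyword mirrors the diff's _redis_cache interface; all names here are illustrative, not the project's):

import asyncio
from typing import Any, Dict, Optional

_merge_lock: Optional[asyncio.Lock] = None


def _get_merge_lock() -> asyncio.Lock:
    # Create the lock lazily so it is constructed while an event loop exists.
    global _merge_lock
    if _merge_lock is None:
        _merge_lock = asyncio.Lock()
    return _merge_lock


async def merge_and_write(cache: Any, key: str, symbol: str, item: Dict[str, Any]) -> None:
    # Serialize the whole read-merge-write cycle: only one coroutine
    # materializes the full snapshot dict at any moment.
    async with _get_merge_lock():
        existing = await cache.get(key)
        merged = dict(existing) if isinstance(existing, dict) else {}
        merged[symbol] = item
        await cache.set(key, merged, ttl=30)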
book_ticker_stream.py

@@ -19,6 +19,18 @@ except ImportError:
 
 KEY_BOOK_TICKER_UPDATED_AT = "market:book_ticker:updated_at"
 
+# Serialize the Leader's Redis writes so concurrent merges don't pile up multiple full-snapshot copies in memory
+_redis_book_ticker_merge_lock: Optional[asyncio.Lock] = None
+_REDIS_BOOK_TICKER_MAX_KEYS = 600
+
+
+def _get_book_ticker_merge_lock() -> asyncio.Lock:
+    global _redis_book_ticker_merge_lock
+    if _redis_book_ticker_merge_lock is None:
+        _redis_book_ticker_merge_lock = asyncio.Lock()
+    return _redis_book_ticker_merge_lock
+
+
 # No longer keep the full snapshot in-process: with Redis, only the "last updated" time is kept and data is read from Redis on demand
 _book_ticker_cache: Dict[str, Dict[str, Any]] = {}
 _book_ticker_updated_at: float = 0.0
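Two details of the getter are worth noting: creating the Lock lazily avoids building it at import time, before any event loop is running (on older Python an asyncio.Lock could bind to the wrong loop); and the check-then-assign needs no extra guard because there is no await between the check and the assignment, so coroutines on the same loop cannot interleave there. A quick illustration:

lock = _get_book_ticker_merge_lock()            # first call creates the Lock
assert lock is _get_book_ticker_merge_lock()    # every later call returns the same instance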
@@ -197,11 +209,11 @@ class BookTickerStream:
         except (TypeError, ValueError):
             return
 
-        # With Redis, write only to Redis, not to process memory (the refresh loop backfills it)
+        # With Redis, write only to Redis; serialize the merge so concurrent tasks don't each pull the full snapshot and balloon the Leader process's memory
         if self._redis_cache:
             try:
                 asyncio.get_event_loop().create_task(
-                    self._merge_and_write_book_ticker_to_redis(s, item)
+                    self._merge_and_write_book_ticker_to_redis_serialized(s, item)
                 )
             except Exception as e:
                 logger.debug("BookTickerStream: failed to schedule Redis write %s", e)
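The scheduling call keeps the existing asyncio.get_event_loop() idiom. Since this handler runs inside the event loop (it schedules a task on it), asyncio.get_running_loop(), recommended since Python 3.7, would express the same intent more strictly. A sketch of the equivalent call, assuming the handler is always invoked from async context:

try:
    asyncio.get_running_loop().create_task(
        self._merge_and_write_book_ticker_to_redis_serialized(s, item)
    )
except RuntimeError:
    # get_running_loop() raises RuntimeError outside a running loop;
    # in that case there is nothing to schedule on.
    pass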
@@ -214,15 +226,25 @@ class BookTickerStream:
                 del _book_ticker_cache[k]
         logger.debug(f"BookTickerStream: updated {s} bid={item['bidPrice']:.4f} ask={item['askPrice']:.4f}")
 
+    async def _merge_and_write_book_ticker_to_redis_serialized(self, symbol: str, item: Dict[str, Any]) -> None:
+        """Serialized: only one merge runs at a time, keeping multiple full-snapshot copies from piling up in memory."""
+        lock = _get_book_ticker_merge_lock()
+        async with lock:
+            await self._merge_and_write_book_ticker_to_redis(symbol, item)
+
     async def _merge_and_write_book_ticker_to_redis(self, symbol: str, item: Dict[str, Any]) -> None:
-        """Read from Redis, merge the single entry, write back, and record the update time so refresh pulls only the timestamp"""
+        """Read from Redis, merge the single entry, write back, and record the update time. Cap the key count to keep Redis from bloating."""
         try:
             if not self._redis_cache:
                 return
             existing = await self._redis_cache.get(KEY_BOOK_TICKER)
             merged = dict(existing) if isinstance(existing, dict) else {}
             merged[symbol] = item
-            await self._redis_cache.set(KEY_BOOK_TICKER, merged, ttl=30)
+            usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")}
+            if len(usdt_only) > _REDIS_BOOK_TICKER_MAX_KEYS:
+                keys = list(usdt_only.keys())[-_REDIS_BOOK_TICKER_MAX_KEYS:]
+                usdt_only = {k: usdt_only[k] for k in keys}
+            await self._redis_cache.set(KEY_BOOK_TICKER, usdt_only, ttl=30)
             await self._redis_cache.set(KEY_BOOK_TICKER_UPDATED_AT, time.time(), ttl=30)
         except Exception as e:
             logger.debug("BookTickerStream: Redis write failed %s", e)
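One subtlety of the cap: Python dicts preserve insertion order, so taking the last _REDIS_BOOK_TICKER_MAX_KEYS keys keeps the most recently *inserted* symbols, and updating an existing key does not move it to the end. Eviction therefore falls on the earliest-inserted symbols, not the least recently updated ones. A small demonstration:

merged = {"AAAUSDT": 1, "BBBUSDT": 2, "CCCUSDT": 3}
merged["AAAUSDT"] = 99                  # update in place; position unchanged
keys = list(merged.keys())[-2:]         # cap at 2, same slice as the diff
print({k: merged[k] for k in keys})     # {'BBBUSDT': 2, 'CCCUSDT': 3}; AAAUSDT evicted despite the fresh update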
ticker_24h_stream.py
@@ -52,6 +52,18 @@ async def get_tickers_24h_from_redis(redis_cache: Any) -> Dict[str, Dict[str, An
     return get_tickers_24h_cache()
 
 
+# Serialize the Leader's Redis writes so concurrent merges don't pile up multiple full-snapshot copies in memory (800MB+ in a single process)
+_REDIS_MERGE_LOCK: Optional[asyncio.Lock] = None
+_REDIS_TICKER_24H_MAX_KEYS = 600  # Max number of USDT symbols kept in Redis, preventing unbounded key growth
+
+
+def _get_ticker_24h_merge_lock() -> asyncio.Lock:
+    global _REDIS_MERGE_LOCK
+    if _REDIS_MERGE_LOCK is None:
+        _REDIS_MERGE_LOCK = asyncio.Lock()
+    return _REDIS_MERGE_LOCK
+
+
 class Ticker24hStream:
     """Subscribe to the futures !ticker@arr stream and keep _ticker_24h_cache updated. As Leader, optionally write to Redis for multi-process reads."""
 
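The lock setup here mirrors book_ticker_stream.py apart from naming (_REDIS_MERGE_LOCK vs. _redis_book_ticker_merge_lock). If the duplication grows, the serialized-wrapper pattern could be factored into a small decorator; a hypothetical helper, not part of this commit:

import asyncio
from typing import Any, Awaitable, Callable


def serialized(lock_getter: Callable[[], asyncio.Lock]):
    """Wrap an async function so all calls are serialized on one lock."""
    def decorate(fn: Callable[..., Awaitable[None]]):
        async def wrapper(*args: Any, **kwargs: Any) -> None:
            async with lock_getter():
                await fn(*args, **kwargs)
        return wrapper
    return decorate

Both _merge_and_write_*_serialized methods would then reduce to applying the decorator to the unserialized merge.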
@@ -160,11 +172,11 @@ class Ticker24hStream:
         }
         if not new_items:
             return
-        # With Redis, write only to Redis, not to process memory (the refresh loop backfills from Redis, avoiding double occupancy)
+        # With Redis, write only to Redis; serialize the merge so concurrent tasks don't each pull the full snapshot and balloon the Leader process's memory
         if self._redis_cache:
             try:
                 asyncio.get_event_loop().create_task(
-                    self._merge_and_write_ticker_24h_to_redis(new_items)
+                    self._merge_and_write_ticker_24h_to_redis_serialized(new_items)
                 )
             except Exception as e:
                 logger.debug("Ticker24hStream: failed to schedule Redis write %s", e)
@@ -179,15 +191,26 @@ class Ticker24hStream:
                 del _ticker_24h_cache[k]
         logger.debug(f"Ticker24hStream: updated {len(new_items)} entries; cache now holds {len(_ticker_24h_cache)} symbols")
 
+    async def _merge_and_write_ticker_24h_to_redis_serialized(self, new_items: Dict[str, Dict[str, Any]]) -> None:
+        """Serialized: only one merge runs at a time, keeping multiple full-snapshot copies from piling up in memory."""
+        lock = _get_ticker_24h_merge_lock()
+        async with lock:
+            await self._merge_and_write_ticker_24h_to_redis(new_items)
+
     async def _merge_and_write_ticker_24h_to_redis(self, new_items: Dict[str, Dict[str, Any]]) -> None:
-        """Read from Redis, merge the new entries, write back, and record the update time so refresh pulls only the timestamp, not the full snapshot"""
+        """Read from Redis, merge the new entries, write back, and record the update time so refresh pulls only the timestamp, not the full snapshot. Cap the key count to keep Redis from bloating."""
         try:
             if not self._redis_cache:
                 return
             existing = await self._redis_cache.get(KEY_TICKER_24H)
             merged = dict(existing) if isinstance(existing, dict) else {}
             merged.update(new_items)
-            await self._redis_cache.set(KEY_TICKER_24H, merged, ttl=120)
+            # Keep only USDT symbols and cap the count, preventing unbounded key growth
+            usdt_only = {k: v for k, v in merged.items() if isinstance(k, str) and k.endswith("USDT")}
+            if len(usdt_only) > _REDIS_TICKER_24H_MAX_KEYS:
+                keys = list(usdt_only.keys())[-_REDIS_TICKER_24H_MAX_KEYS:]
+                usdt_only = {k: usdt_only[k] for k in keys}
+            await self._redis_cache.set(KEY_TICKER_24H, usdt_only, ttl=120)
             await self._redis_cache.set(KEY_TICKER_24H_UPDATED_AT, time.time(), ttl=120)
         except Exception as e:
             logger.debug("Ticker24hStream: Redis write failed %s", e)
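The *_UPDATED_AT keys exist so readers can poll a cheap scalar instead of the full hash, as the docstrings note. A hypothetical reader-side refresh built on that contract (refresh_if_stale and last_seen are illustrative names, not from this commit):

async def refresh_if_stale(redis_cache: Any, last_seen: float) -> float:
    # Poll only the timestamp; pull the full snapshot when it has advanced.
    updated_at = await redis_cache.get(KEY_TICKER_24H_UPDATED_AT)
    if isinstance(updated_at, (int, float)) and updated_at > last_seen:
        snapshot = await redis_cache.get(KEY_TICKER_24H)  # full pull only on change
        if isinstance(snapshot, dict):
            _ticker_24h_cache.update(snapshot)
        return float(updated_at)
    return last_seen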