"""
Shared market-data WebSocket for multiple processes/accounts: leader election + shared cache.

The leader process keeps the Ticker24h / BookTicker / KlineStream connections
alive and writes to Redis; non-leader processes only read from Redis and open
no connections. Requires Redis.
"""
import asyncio
import json
import logging
import os
import socket
import time
from typing import Optional, Any

logger = logging.getLogger(__name__)

# Redis keys
KEY_LEADER = "market_ws_leader"
KEY_TICKER_24H = "market:ticker_24h"
KEY_BOOK_TICKER = "market:book_ticker"
KEY_KLINE_PREFIX = "market:kline:"

LEADER_TTL_SEC = 30
LEADER_RENEW_INTERVAL_SEC = 15

# Random token generated once per interpreter. Combined with pid and host it
# tells "our" leader record apart from one written by another process, even
# when pid/hostname collide (containers, pid reuse after a restart).
_PROCESS_TOKEN = os.urandom(8).hex()

# Identity fields that must all match for a stored leader record to be ours.
# "ts" is deliberately excluded because it changes on every write.
_IDENTITY_MATCH_FIELDS = ("pid", "host", "token")

_is_leader: bool = False
_leader_task: Optional[asyncio.Task] = None


def _task_done_callback(task: asyncio.Task) -> None:
    """Consume the outcome of a fire-and-forget task.

    Retrieving the result prevents asyncio's
    'Task exception was never retrieved' warning; cancellation is ignored
    and any other exception is logged at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.debug("market_ws_leader background task error: %s", e)


def _leader_identity() -> dict:
    """Build the record stored under KEY_LEADER for the current process."""
    return {
        "pid": os.getpid(),
        "host": socket.gethostname() or "unknown",
        "ts": time.time(),
        "token": _PROCESS_TOKEN,
    }


def _has_redis(redis_cache: Any) -> bool:
    """Return True when *redis_cache* exposes a non-None ``redis`` client."""
    return redis_cache is not None and getattr(redis_cache, "redis", None) is not None


def _is_own_identity(raw: Any) -> bool:
    """Return True if *raw* (the value read from KEY_LEADER) was written by this process.

    *raw* may be ``None`` (key missing), ``str`` or ``bytes``; anything that
    is not a JSON object matching our pid/host/token is treated as foreign.
    """
    if raw is None:
        return False
    try:
        stored = json.loads(raw)
    except (TypeError, ValueError):
        return False
    if not isinstance(stored, dict):
        return False
    mine = _leader_identity()
    return all(stored.get(field) == mine[field] for field in _IDENTITY_MATCH_FIELDS)


def is_market_ws_leader() -> bool:
    """Return whether this process is the market WS leader.

    This is only the local flag; it is used to decide whether to start the
    three streams.
    """
    return _is_leader


async def try_acquire_market_ws_leader(redis_cache: Any) -> bool:
    """Try to become the market WS leader using Redis ``SET NX EX``.

    On success the current process is expected to start
    Ticker24h / BookTicker / KlineStream and write to Redis.

    Calling this again from the process that already holds the key keeps
    leadership (the NX write fails on our own key, so ownership is verified
    with a GET instead of demoting ourselves).

    Returns:
        True if this process is the leader after the call, False otherwise
        (including when Redis is unavailable or the call raised).
    """
    global _is_leader
    if not _has_redis(redis_cache):
        return False
    redis = redis_cache.redis
    try:
        raw = await redis.set(
            KEY_LEADER,
            json.dumps(_leader_identity()),
            ex=LEADER_TTL_SEC,
            nx=True,
        )
        acquired = bool(raw)
        if acquired:
            logger.info("✓ 本进程已当选市场 WS Leader(Ticker24h/BookTicker/Kline 共用连接)")
        elif _is_leader:
            # NX failed, but we believed we were leader: the existing key may
            # simply be our own. Only give up if someone else owns it.
            acquired = _is_own_identity(await redis.get(KEY_LEADER))
        _is_leader = acquired
        return _is_leader
    except Exception as e:
        logger.debug("尝试获取市场 WS Leader 失败: %s", e)
        return False


async def renew_market_ws_leader(redis_cache: Any) -> bool:
    """Refresh the leader key's TTL so it does not expire and get taken over.

    The key is only rewritten when it is still ours. If another process owns
    it, the local leader flag is cleared and the key is left untouched. If the
    key has expired, it is reclaimed with ``SET NX`` so a concurrent acquirer
    is not overwritten.

    Returns:
        True if the key is (still) held by this process after the call;
        False if we are not leader, Redis is unavailable, leadership was
        lost, or the call raised (in which case the local flag is unchanged).
    """
    global _is_leader
    if not _is_leader or not _has_redis(redis_cache):
        return False
    redis = redis_cache.redis
    try:
        current = await redis.get(KEY_LEADER)
        payload = json.dumps(_leader_identity())
        if current is None:
            # Key expired: reclaim only if nobody grabbed it in the meantime.
            reclaimed = await redis.set(KEY_LEADER, payload, ex=LEADER_TTL_SEC, nx=True)
            if not reclaimed:
                _is_leader = False
                return False
            return True
        if not _is_own_identity(current):
            # Another process took over; never overwrite its record.
            _is_leader = False
            return False
        # NOTE: GET + SET is not atomic; the remaining race window is the
        # round-trip between the two commands.
        await redis.set(KEY_LEADER, payload, ex=LEADER_TTL_SEC)
        return True
    except Exception as e:
        logger.debug("续期市场 WS Leader 失败: %s", e)
        return False


def release_market_ws_leader(redis_cache: Any) -> None:
    """Voluntarily give up leadership (optional; lets another process take over sooner).

    The local flag is always cleared. The Redis key is deleted in a
    background task, and only when this process was the leader, so a
    non-leader calling this cannot remove the real leader's key. Must be
    called with a running event loop for the delete to be scheduled.
    """
    global _is_leader
    was_leader = _is_leader
    _is_leader = False
    if not was_leader or not _has_redis(redis_cache):
        return
    coro = _delete_leader_key(redis_cache)
    try:
        task = asyncio.create_task(coro)
    except RuntimeError as e:
        # No running event loop: close the coroutine so it is not reported
        # as "never awaited"; the key will simply expire via its TTL.
        coro.close()
        logger.debug("释放市场 WS Leader 失败: %s", e)
        return
    task.add_done_callback(_task_done_callback)


async def _delete_leader_key(redis_cache: Any) -> None:
    """Delete KEY_LEADER if it still belongs to this process (best effort)."""
    try:
        current = await redis_cache.redis.get(KEY_LEADER)
        if _is_own_identity(current):
            await redis_cache.redis.delete(KEY_LEADER)
    except Exception as e:
        logger.debug("删除市场 WS Leader 键失败: %s", e)


async def run_leader_renew_loop(redis_cache: Any) -> None:
    """Start the background renewal loop in the leader process.

    Returns immediately when this process is not the leader or when a
    renewal task is already running. A previously finished task is replaced,
    so the loop can be restarted after leadership was lost and re-acquired.
    """
    global _leader_task
    if not _is_leader:
        return
    if _leader_task is not None and not _leader_task.done():
        return

    async def _loop() -> None:
        while _is_leader:
            await asyncio.sleep(LEADER_RENEW_INTERVAL_SEC)
            # Leadership may have been released while sleeping.
            if not _is_leader:
                break
            ok = await renew_market_ws_leader(redis_cache)
            if not ok:
                logger.warning("市场 WS Leader 续期失败,可能被其他进程接管")

    _leader_task = asyncio.create_task(_loop())
    _leader_task.add_done_callback(_task_done_callback)
    logger.debug("市场 WS Leader 续期任务已启动")


def stop_leader_renew_loop() -> None:
    """Cancel the renewal task, if any, and forget it."""
    global _leader_task
    if _leader_task is not None:
        _leader_task.cancel()
        _leader_task = None


def use_shared_market_ws(redis_cache: Any) -> bool:
    """Return whether the shared market WS feature is enabled.

    True only when a Redis client is available and the package ``config``
    module's ``USE_SHARED_MARKET_WS`` is truthy (defaults to True when the
    attribute is absent). Any error while loading the config yields False.
    """
    if not _has_redis(redis_cache):
        return False
    try:
        from . import config
        return bool(getattr(config, "USE_SHARED_MARKET_WS", True))
    except Exception:
        return False