在多个流处理模块中引入 `_task_done_callback` 函数,确保在异步任务完成后能够捕获并记录异常,避免未处理的任务异常导致的潜在问题。此改动提升了系统的稳定性和错误处理能力,确保在执行异步操作时能够更好地管理任务状态。
148 lines
4.4 KiB
Python
148 lines
4.4 KiB
Python
"""
|
||
市场行情 WS 多进程/多账户共用:选主 + 共享缓存。
|
||
Leader 进程负责维持 Ticker24h / BookTicker / KlineStream 三条连接并写入 Redis;
|
||
非 Leader 进程只从 Redis 读取,不建连接。需配合 Redis 使用。
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import socket
|
||
import time
|
||
from typing import Optional, Any
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Redis 键
|
||
KEY_LEADER = "market_ws_leader"
|
||
KEY_TICKER_24H = "market:ticker_24h"
|
||
KEY_BOOK_TICKER = "market:book_ticker"
|
||
KEY_KLINE_PREFIX = "market:kline:"
|
||
LEADER_TTL_SEC = 30
|
||
LEADER_RENEW_INTERVAL_SEC = 15
|
||
|
||
_is_leader: bool = False
|
||
_leader_task: Optional[asyncio.Task] = None
|
||
|
||
|
||
def _task_done_callback(task: asyncio.Task) -> None:
|
||
"""消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'"""
|
||
try:
|
||
task.result()
|
||
except asyncio.CancelledError:
|
||
pass
|
||
except Exception as e:
|
||
logger.debug("market_ws_leader background task error: %s", e)
|
||
|
||
|
||
def _leader_identity() -> dict:
|
||
return {
|
||
"pid": os.getpid(),
|
||
"host": socket.gethostname() or "unknown",
|
||
"ts": time.time(),
|
||
}
|
||
|
||
|
||
def is_market_ws_leader() -> bool:
|
||
"""当前进程是否为市场 WS 的 Leader(仅本地标记,用于决定是否启动三条流)。"""
|
||
return _is_leader
|
||
|
||
|
||
async def try_acquire_market_ws_leader(redis_cache: Any) -> bool:
|
||
"""
|
||
尝试成为市场 WS Leader。使用 Redis SET NX EX。
|
||
若成功,当前进程应启动 Ticker24h / BookTicker / KlineStream 并写入 Redis。
|
||
"""
|
||
global _is_leader
|
||
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
||
return False
|
||
try:
|
||
raw = await redis_cache.redis.set(
|
||
KEY_LEADER,
|
||
json.dumps(_leader_identity()),
|
||
ex=LEADER_TTL_SEC,
|
||
nx=True,
|
||
)
|
||
_is_leader = raw is True
|
||
if _is_leader:
|
||
logger.info("✓ 本进程已当选市场 WS Leader(Ticker24h/BookTicker/Kline 共用连接)")
|
||
return _is_leader
|
||
except Exception as e:
|
||
logger.debug("尝试获取市场 WS Leader 失败: %s", e)
|
||
return False
|
||
|
||
|
||
async def renew_market_ws_leader(redis_cache: Any) -> bool:
|
||
"""Leader 定期续期,避免 TTL 到期被抢。"""
|
||
if not _is_leader or redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
||
return False
|
||
try:
|
||
await redis_cache.redis.set(
|
||
KEY_LEADER,
|
||
json.dumps(_leader_identity()),
|
||
ex=LEADER_TTL_SEC,
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
logger.debug("续期市场 WS Leader 失败: %s", e)
|
||
return False
|
||
|
||
|
||
def release_market_ws_leader(redis_cache: Any) -> None:
|
||
"""主动释放 Leader(可选,便于其他进程尽快接管)。"""
|
||
global _is_leader
|
||
_is_leader = False
|
||
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
||
return
|
||
try:
|
||
t = asyncio.create_task(_delete_leader_key(redis_cache))
|
||
t.add_done_callback(_task_done_callback)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
async def _delete_leader_key(redis_cache: Any) -> None:
|
||
try:
|
||
await redis_cache.redis.delete(KEY_LEADER)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
async def run_leader_renew_loop(redis_cache: Any) -> None:
|
||
"""在 Leader 进程中运行的续期循环;非 Leader 直接返回。"""
|
||
global _leader_task
|
||
if not _is_leader:
|
||
return
|
||
if _leader_task is not None:
|
||
return
|
||
|
||
async def _loop():
|
||
while _is_leader:
|
||
await asyncio.sleep(LEADER_RENEW_INTERVAL_SEC)
|
||
if not _is_leader:
|
||
break
|
||
ok = await renew_market_ws_leader(redis_cache)
|
||
if not ok:
|
||
logger.warning("市场 WS Leader 续期失败,可能被其他进程接管")
|
||
|
||
_leader_task = asyncio.create_task(_loop())
|
||
logger.debug("市场 WS Leader 续期任务已启动")
|
||
|
||
|
||
def stop_leader_renew_loop() -> None:
|
||
global _leader_task
|
||
if _leader_task is not None:
|
||
_leader_task.cancel()
|
||
_leader_task = None
|
||
|
||
|
||
def use_shared_market_ws(redis_cache: Any) -> bool:
|
||
"""是否启用「共用市场 WS」:有 Redis 且已配置启用时返回 True。"""
|
||
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
||
return False
|
||
try:
|
||
from . import config
|
||
return bool(getattr(config, "USE_SHARED_MARKET_WS", True))
|
||
except Exception:
|
||
return False
|