auto_trade_sys/trading_system/market_ws_leader.py
薇薇安 d42cee2f1a feat(async_handling): 添加任务完成回调以处理异步任务异常
在多个流处理模块中引入 `_task_done_callback` 函数,确保在异步任务完成后能够捕获并记录异常,避免未处理的任务异常导致的潜在问题。此改动提升了系统的稳定性和错误处理能力,确保在执行异步操作时能够更好地管理任务状态。
2026-02-23 15:43:13 +08:00

148 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场行情 WS 多进程/多账户共用:选主 + 共享缓存。
Leader 进程负责维持 Ticker24h / BookTicker / KlineStream 三条连接并写入 Redis
非 Leader 进程只从 Redis 读取,不建连接。需配合 Redis 使用。
"""
import asyncio
import json
import logging
import os
import socket
import time
from typing import Optional, Any
logger = logging.getLogger(__name__)
# Redis 键
KEY_LEADER = "market_ws_leader"
KEY_TICKER_24H = "market:ticker_24h"
KEY_BOOK_TICKER = "market:book_ticker"
KEY_KLINE_PREFIX = "market:kline:"
LEADER_TTL_SEC = 30
LEADER_RENEW_INTERVAL_SEC = 15
_is_leader: bool = False
_leader_task: Optional[asyncio.Task] = None
def _task_done_callback(task: asyncio.Task) -> None:
"""消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'"""
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug("market_ws_leader background task error: %s", e)
def _leader_identity() -> dict:
return {
"pid": os.getpid(),
"host": socket.gethostname() or "unknown",
"ts": time.time(),
}
def is_market_ws_leader() -> bool:
"""当前进程是否为市场 WS 的 Leader仅本地标记用于决定是否启动三条流"""
return _is_leader
async def try_acquire_market_ws_leader(redis_cache: Any) -> bool:
"""
尝试成为市场 WS Leader。使用 Redis SET NX EX。
若成功,当前进程应启动 Ticker24h / BookTicker / KlineStream 并写入 Redis。
"""
global _is_leader
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
return False
try:
raw = await redis_cache.redis.set(
KEY_LEADER,
json.dumps(_leader_identity()),
ex=LEADER_TTL_SEC,
nx=True,
)
_is_leader = raw is True
if _is_leader:
logger.info("✓ 本进程已当选市场 WS LeaderTicker24h/BookTicker/Kline 共用连接)")
return _is_leader
except Exception as e:
logger.debug("尝试获取市场 WS Leader 失败: %s", e)
return False
async def renew_market_ws_leader(redis_cache: Any) -> bool:
"""Leader 定期续期,避免 TTL 到期被抢。"""
if not _is_leader or redis_cache is None or getattr(redis_cache, "redis", None) is None:
return False
try:
await redis_cache.redis.set(
KEY_LEADER,
json.dumps(_leader_identity()),
ex=LEADER_TTL_SEC,
)
return True
except Exception as e:
logger.debug("续期市场 WS Leader 失败: %s", e)
return False
def release_market_ws_leader(redis_cache: Any) -> None:
"""主动释放 Leader可选便于其他进程尽快接管"""
global _is_leader
_is_leader = False
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
return
try:
t = asyncio.create_task(_delete_leader_key(redis_cache))
t.add_done_callback(_task_done_callback)
except Exception:
pass
async def _delete_leader_key(redis_cache: Any) -> None:
try:
await redis_cache.redis.delete(KEY_LEADER)
except Exception:
pass
async def run_leader_renew_loop(redis_cache: Any) -> None:
"""在 Leader 进程中运行的续期循环;非 Leader 直接返回。"""
global _leader_task
if not _is_leader:
return
if _leader_task is not None:
return
async def _loop():
while _is_leader:
await asyncio.sleep(LEADER_RENEW_INTERVAL_SEC)
if not _is_leader:
break
ok = await renew_market_ws_leader(redis_cache)
if not ok:
logger.warning("市场 WS Leader 续期失败,可能被其他进程接管")
_leader_task = asyncio.create_task(_loop())
logger.debug("市场 WS Leader 续期任务已启动")
def stop_leader_renew_loop() -> None:
global _leader_task
if _leader_task is not None:
_leader_task.cancel()
_leader_task = None
def use_shared_market_ws(redis_cache: Any) -> bool:
"""是否启用「共用市场 WS」有 Redis 且已配置启用时返回 True。"""
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
return False
try:
from . import config
return bool(getattr(config, "USE_SHARED_MARKET_WS", True))
except Exception:
return False