diff --git a/trading_system/book_ticker_stream.py b/trading_system/book_ticker_stream.py index 480ed37..14feb4e 100644 --- a/trading_system/book_ticker_stream.py +++ b/trading_system/book_ticker_stream.py @@ -12,6 +12,17 @@ from typing import Dict, Optional, Any logger = logging.getLogger(__name__) + +def _task_done_callback(task: asyncio.Task) -> None: + """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("BookTickerStream background task error: %s", e) + + try: from .market_ws_leader import KEY_BOOK_TICKER except ImportError: @@ -212,9 +223,10 @@ class BookTickerStream: # 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增 if self._redis_cache: try: - asyncio.get_event_loop().create_task( + t = asyncio.create_task( self._merge_and_write_book_ticker_to_redis_serialized(s, item) ) + t.add_done_callback(_task_done_callback) except Exception as e: logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e) return diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 8d09c53..3d38280 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -12,6 +12,17 @@ from typing import Dict, List, Optional, Any, Tuple logger = logging.getLogger(__name__) + +def _task_done_callback(task: asyncio.Task) -> None: + """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("KlineStream background task error: %s", e) + + try: from .market_ws_leader import KEY_KLINE_PREFIX except ImportError: @@ -250,7 +261,8 @@ class KlineStream: pass # ⚠️ 优化:异步处理消息,避免阻塞事件循环 # 使用信号量限制并发处理任务数量,避免任务堆积导致负载上升 - asyncio.create_task(self._handle_message_with_limit(raw)) + t = asyncio.create_task(self._handle_message_with_limit(raw)) + t.add_done_callback(_task_done_callback) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): break except asyncio.CancelledError: @@ -461,7 +473,8 @@ class KlineStream: 同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async) """ # 为了兼容性保留,但实际应该使用异步版本 - asyncio.create_task(self._handle_message_async(raw)) + t = asyncio.create_task(self._handle_message_async(raw)) + t.add_done_callback(_task_done_callback) async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None: try: diff --git a/trading_system/market_ws_leader.py b/trading_system/market_ws_leader.py index e715e02..1d91a53 100644 --- a/trading_system/market_ws_leader.py +++ b/trading_system/market_ws_leader.py @@ -25,6 +25,16 @@ _is_leader: bool = False _leader_task: Optional[asyncio.Task] = None +def _task_done_callback(task: asyncio.Task) -> None: + """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("market_ws_leader background task error: %s", e) + + def _leader_identity() -> dict: return { "pid": os.getpid(), @@ -85,7 +95,8 @@ def release_market_ws_leader(redis_cache: Any) -> None: if redis_cache is None or getattr(redis_cache, "redis", None) is None: return try: - asyncio.get_event_loop().create_task(_delete_leader_key(redis_cache)) + t = asyncio.create_task(_delete_leader_key(redis_cache)) + t.add_done_callback(_task_done_callback) except Exception: pass diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py index ad4b88a..2c85c6c 100644 --- a/trading_system/ticker_24h_stream.py +++ b/trading_system/ticker_24h_stream.py @@ -12,6 +12,17 @@ from typing import Dict, Optional, Any logger = logging.getLogger(__name__) + +def _task_done_callback(task: asyncio.Task) -> None: + """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("Ticker24hStream background task error: %s", e) + + try: from .market_ws_leader import KEY_TICKER_24H except ImportError: @@ -175,9 +186,10 @@ class Ticker24hStream: # 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增 if self._redis_cache: try: - asyncio.get_event_loop().create_task( + t = asyncio.create_task( self._merge_and_write_ticker_24h_to_redis_serialized(new_items) ) + t.add_done_callback(_task_done_callback) except Exception as e: logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e) return diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index f16faac..d462a10 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -13,6 +13,17 @@ from typing import Dict, List, Optional, Any logger = logging.getLogger(__name__) + +def _task_done_callback(task: asyncio.Task) -> None: + """消费 fire-and-forget 任务异常,避免 'Task exception was never retrieved'""" + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("Background task finished with error: %s", e) + + # 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions _position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...) # 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量 @@ -632,7 +643,8 @@ class UserDataStream: } if redis_cache: if asset == "USDT": - asyncio.create_task(self._write_balance_to_redis(asset, balance_data)) + t = asyncio.create_task(self._write_balance_to_redis(asset, balance_data)) + t.add_done_callback(_task_done_callback) else: _balance_updates_cache[asset] = balance_data logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}") @@ -661,7 +673,8 @@ class UserDataStream: except Exception: pass if redis_cache and positions_list: - asyncio.create_task(self._write_positions_to_redis(positions_list)) + t = asyncio.create_task(self._write_positions_to_redis(positions_list)) + t.add_done_callback(_task_done_callback) logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}") async def _write_balance_to_redis(self, asset: str, balance_data: Dict):