feat(async_handling): 添加任务完成回调以处理异步任务异常
在多个流处理模块中引入 `_task_done_callback` 函数,确保在异步任务完成后能够捕获并记录异常,避免未处理的任务异常导致的潜在问题。此改动提升了系统的稳定性和错误处理能力,确保在执行异步操作时能够更好地管理任务状态。
This commit is contained in:
parent
cddcf35481
commit
d42cee2f1a
|
|
@ -12,6 +12,17 @@ from typing import Dict, Optional, Any
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Drain a fire-and-forget task's outcome once it completes.

    Calling ``task.result()`` marks the task's exception (if any) as
    retrieved, which suppresses asyncio's "Task exception was never
    retrieved" warning. Cancellation is treated as a normal shutdown
    path; any other exception is recorded at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        # Expected during shutdown/cancellation — nothing to report.
        pass
    except Exception as exc:
        # Deliberately broad: this is the last-chance sink for background tasks.
        logger.debug("BookTickerStream background task error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .market_ws_leader import KEY_BOOK_TICKER
|
from .market_ws_leader import KEY_BOOK_TICKER
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -212,9 +223,10 @@ class BookTickerStream:
|
||||||
# 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增
|
# 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增
|
||||||
if self._redis_cache:
|
if self._redis_cache:
|
||||||
try:
|
try:
|
||||||
asyncio.get_event_loop().create_task(
|
t = asyncio.create_task(
|
||||||
self._merge_and_write_book_ticker_to_redis_serialized(s, item)
|
self._merge_and_write_book_ticker_to_redis_serialized(s, item)
|
||||||
)
|
)
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e)
|
logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,17 @@ from typing import Dict, List, Optional, Any, Tuple
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Drain a fire-and-forget task's outcome once it completes.

    Calling ``task.result()`` marks the task's exception (if any) as
    retrieved, which suppresses asyncio's "Task exception was never
    retrieved" warning. Cancellation is treated as a normal shutdown
    path; any other exception is recorded at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        # Expected during shutdown/cancellation — nothing to report.
        pass
    except Exception as exc:
        # Deliberately broad: this is the last-chance sink for background tasks.
        logger.debug("KlineStream background task error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .market_ws_leader import KEY_KLINE_PREFIX
|
from .market_ws_leader import KEY_KLINE_PREFIX
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -250,7 +261,8 @@ class KlineStream:
|
||||||
pass
|
pass
|
||||||
# ⚠️ 优化:异步处理消息,避免阻塞事件循环
|
# ⚠️ 优化:异步处理消息,避免阻塞事件循环
|
||||||
# 使用信号量限制并发处理任务数量,避免任务堆积导致负载上升
|
# 使用信号量限制并发处理任务数量,避免任务堆积导致负载上升
|
||||||
asyncio.create_task(self._handle_message_with_limit(raw))
|
t = asyncio.create_task(self._handle_message_with_limit(raw))
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
|
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
|
||||||
break
|
break
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
|
@ -461,7 +473,8 @@ class KlineStream:
|
||||||
同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async)
|
同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async)
|
||||||
"""
|
"""
|
||||||
# 为了兼容性保留,但实际应该使用异步版本
|
# 为了兼容性保留,但实际应该使用异步版本
|
||||||
asyncio.create_task(self._handle_message_async(raw))
|
t = asyncio.create_task(self._handle_message_async(raw))
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
|
|
||||||
async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
|
async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,16 @@ _is_leader: bool = False
|
||||||
_leader_task: Optional[asyncio.Task] = None
|
_leader_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Drain a fire-and-forget task's outcome once it completes.

    Calling ``task.result()`` marks the task's exception (if any) as
    retrieved, which suppresses asyncio's "Task exception was never
    retrieved" warning. Cancellation is treated as a normal shutdown
    path; any other exception is recorded at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        # Expected during shutdown/cancellation — nothing to report.
        pass
    except Exception as exc:
        # Deliberately broad: this is the last-chance sink for background tasks.
        logger.debug("market_ws_leader background task error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
def _leader_identity() -> dict:
|
def _leader_identity() -> dict:
|
||||||
return {
|
return {
|
||||||
"pid": os.getpid(),
|
"pid": os.getpid(),
|
||||||
|
|
@ -85,7 +95,8 @@ def release_market_ws_leader(redis_cache: Any) -> None:
|
||||||
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
if redis_cache is None or getattr(redis_cache, "redis", None) is None:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
asyncio.get_event_loop().create_task(_delete_leader_key(redis_cache))
|
t = asyncio.create_task(_delete_leader_key(redis_cache))
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,17 @@ from typing import Dict, Optional, Any
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Drain a fire-and-forget task's outcome once it completes.

    Calling ``task.result()`` marks the task's exception (if any) as
    retrieved, which suppresses asyncio's "Task exception was never
    retrieved" warning. Cancellation is treated as a normal shutdown
    path; any other exception is recorded at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        # Expected during shutdown/cancellation — nothing to report.
        pass
    except Exception as exc:
        # Deliberately broad: this is the last-chance sink for background tasks.
        logger.debug("Ticker24hStream background task error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .market_ws_leader import KEY_TICKER_24H
|
from .market_ws_leader import KEY_TICKER_24H
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -175,9 +186,10 @@ class Ticker24hStream:
|
||||||
# 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增
|
# 有 Redis 时只写 Redis;串行化合并,避免多任务同时拉全量导致 Leader 进程内存暴增
|
||||||
if self._redis_cache:
|
if self._redis_cache:
|
||||||
try:
|
try:
|
||||||
asyncio.get_event_loop().create_task(
|
t = asyncio.create_task(
|
||||||
self._merge_and_write_ticker_24h_to_redis_serialized(new_items)
|
self._merge_and_write_ticker_24h_to_redis_serialized(new_items)
|
||||||
)
|
)
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e)
|
logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,17 @@ from typing import Dict, List, Optional, Any
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Drain a fire-and-forget task's outcome once it completes.

    Calling ``task.result()`` marks the task's exception (if any) as
    retrieved, which suppresses asyncio's "Task exception was never
    retrieved" warning. Cancellation is treated as a normal shutdown
    path; any other exception is recorded at debug level.
    """
    try:
        task.result()
    except asyncio.CancelledError:
        # Expected during shutdown/cancellation — nothing to report.
        pass
    except Exception as exc:
        # Deliberately broad: this is the last-chance sink for background tasks.
        logger.debug("Background task finished with error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
# 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
|
# 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
|
||||||
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
|
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
|
||||||
# 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
|
# 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
|
||||||
|
|
@ -632,7 +643,8 @@ class UserDataStream:
|
||||||
}
|
}
|
||||||
if redis_cache:
|
if redis_cache:
|
||||||
if asset == "USDT":
|
if asset == "USDT":
|
||||||
asyncio.create_task(self._write_balance_to_redis(asset, balance_data))
|
t = asyncio.create_task(self._write_balance_to_redis(asset, balance_data))
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
else:
|
else:
|
||||||
_balance_updates_cache[asset] = balance_data
|
_balance_updates_cache[asset] = balance_data
|
||||||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
|
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
|
||||||
|
|
@ -661,7 +673,8 @@ class UserDataStream:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
if redis_cache and positions_list:
|
if redis_cache and positions_list:
|
||||||
asyncio.create_task(self._write_positions_to_redis(positions_list))
|
t = asyncio.create_task(self._write_positions_to_redis(positions_list))
|
||||||
|
t.add_done_callback(_task_done_callback)
|
||||||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
|
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
|
||||||
|
|
||||||
async def _write_balance_to_redis(self, asset: str, balance_data: Dict):
|
async def _write_balance_to_redis(self, asset: str, balance_data: Dict):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user