feat(kline_stream): 优化消息处理与Redis写入逻辑
在 `kline_stream.py` 中引入并发限制,使用信号量控制同时处理的消息数量,避免任务堆积导致性能下降。优化消息处理为异步方法,减少事件循环阻塞。增加批量写入Redis的机制,降低写入频率,提升系统性能与稳定性。同时,调整日志记录频率,减少高负载时的日志输出。此改动显著提升了消息处理效率与系统的响应能力。
This commit is contained in:
parent
c9d9836df5
commit
44458dca90
|
|
@ -61,6 +61,15 @@ class KlineStream:
|
||||||
# ⚠️ 订阅统计:用于监控订阅数量,避免订阅过多导致负载上升
|
# ⚠️ 订阅统计:用于监控订阅数量,避免订阅过多导致负载上升
|
||||||
self._subscription_count = 0 # 当前订阅数量
|
self._subscription_count = 0 # 当前订阅数量
|
||||||
self._max_subscriptions = 500 # 最大订阅数量(币安限制:单个连接最多 1024 个流,我们设置 500 作为安全阈值)
|
self._max_subscriptions = 500 # 最大订阅数量(币安限制:单个连接最多 1024 个流,我们设置 500 作为安全阈值)
|
||||||
|
# ⚠️ 消息处理优化:限制并发处理任务数量,避免任务堆积
|
||||||
|
self._message_semaphore = asyncio.Semaphore(50) # 最多同时处理 50 条消息
|
||||||
|
self._redis_write_pending: Dict[str, tuple] = {} # Redis 写入待处理队列:{rkey: (data, timestamp)}
|
||||||
|
self._last_redis_batch_write = 0.0 # 上次批量写入 Redis 的时间
|
||||||
|
# ⚠️ 消息处理优化:限制并发处理任务数量,避免任务堆积
|
||||||
|
self._message_semaphore = asyncio.Semaphore(50) # 最多同时处理 50 条消息
|
||||||
|
self._redis_write_pending: Dict[str, tuple] = {} # Redis 写入待处理队列:{rkey: (data, timestamp)}
|
||||||
|
self._last_redis_batch_write = 0.0 # 上次批量写入 Redis 的时间
|
||||||
|
self._batch_write_lock = asyncio.Lock() # 批量写入锁,避免并发写入
|
||||||
|
|
||||||
def _ws_base_url(self) -> str:
|
def _ws_base_url(self) -> str:
|
||||||
if self.testnet:
|
if self.testnet:
|
||||||
|
|
@ -176,7 +185,7 @@ class KlineStream:
|
||||||
break
|
break
|
||||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||||
raw = msg.data
|
raw = msg.data
|
||||||
# 处理订阅响应({"result": null, "id": ...})或K线数据
|
# ⚠️ 优化:快速过滤订阅响应,避免不必要的处理
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw)
|
data = json.loads(raw)
|
||||||
if isinstance(data, dict) and "result" in data:
|
if isinstance(data, dict) and "result" in data:
|
||||||
|
|
@ -184,7 +193,9 @@ class KlineStream:
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
self._handle_message(raw)
|
# ⚠️ 优化:异步处理消息,避免阻塞事件循环
|
||||||
|
# 使用信号量限制并发处理任务数量,避免任务堆积导致负载上升
|
||||||
|
asyncio.create_task(self._handle_message_with_limit(raw))
|
||||||
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
|
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
|
||||||
break
|
break
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
|
@ -206,7 +217,19 @@ class KlineStream:
|
||||||
if not self._running:
|
if not self._running:
|
||||||
break
|
break
|
||||||
|
|
||||||
def _handle_message(self, raw: str):
|
async def _handle_message_with_limit(self, raw: str):
|
||||||
|
"""
|
||||||
|
带并发限制的消息处理包装器
|
||||||
|
"""
|
||||||
|
async with self._message_semaphore:
|
||||||
|
await self._handle_message_async(raw)
|
||||||
|
|
||||||
|
async def _handle_message_async(self, raw: str):
|
||||||
|
"""
|
||||||
|
异步处理 WebSocket 消息(优化:避免阻塞事件循环)
|
||||||
|
|
||||||
|
⚠️ 优化:改为异步方法,避免同步处理大量消息时阻塞事件循环
|
||||||
|
"""
|
||||||
global _kline_cache, _kline_cache_updated_at
|
global _kline_cache, _kline_cache_updated_at
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw)
|
data = json.loads(raw)
|
||||||
|
|
@ -285,15 +308,53 @@ class KlineStream:
|
||||||
cache_list.append(kline_rest_format)
|
cache_list.append(kline_rest_format)
|
||||||
|
|
||||||
_kline_cache_updated_at[key] = time.monotonic()
|
_kline_cache_updated_at[key] = time.monotonic()
|
||||||
logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根")
|
# ⚠️ 优化:减少日志输出频率,避免大量消息时日志负载过高
|
||||||
|
# 只在 DEBUG 级别或每 100 条消息记录一次
|
||||||
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
|
logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根")
|
||||||
|
|
||||||
|
# ⚠️ 优化:批量写入 Redis,减少写入频率
|
||||||
|
# 使用延迟写入机制,避免每条消息都写入 Redis(每 2 秒批量写入一次)
|
||||||
if self._redis_cache:
|
if self._redis_cache:
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
copy = list(cache_list)
|
copy = list(cache_list)
|
||||||
rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
|
rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
|
||||||
loop.create_task(self._write_kline_to_redis(rkey, copy))
|
# 标记需要写入 Redis,但不立即写入
|
||||||
|
self._redis_write_pending[rkey] = (copy, time.monotonic())
|
||||||
|
|
||||||
|
# ⚠️ 优化:使用锁避免并发批量写入
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - self._last_redis_batch_write >= 2.0:
|
||||||
|
async with self._batch_write_lock:
|
||||||
|
# 双重检查,避免重复写入
|
||||||
|
if now - self._last_redis_batch_write >= 2.0:
|
||||||
|
await self._batch_write_redis()
|
||||||
|
self._last_redis_batch_write = time.monotonic()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("KlineStream: 写入 Redis 调度失败 %s", e)
|
# 静默失败,避免日志过多
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _batch_write_redis(self):
|
||||||
|
"""批量写入 Redis,减少写入频率"""
|
||||||
|
if not self._redis_write_pending:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
pending = dict(self._redis_write_pending)
|
||||||
|
self._redis_write_pending.clear()
|
||||||
|
for rkey, (data, _) in pending.items():
|
||||||
|
try:
|
||||||
|
await self._redis_cache.set(rkey, data, ttl=600)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _handle_message(self, raw: str):
|
||||||
|
"""
|
||||||
|
同步处理消息(保留用于兼容性,但推荐使用 _handle_message_async)
|
||||||
|
"""
|
||||||
|
# 为了兼容性保留,但实际应该使用异步版本
|
||||||
|
asyncio.create_task(self._handle_message_async(raw))
|
||||||
|
|
||||||
async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
|
async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user