feat(redis_integration): share market data streams across processes

Introduce Redis cache support in `binance_client`, `kline_stream`, `book_ticker_stream`, and `ticker_24h_stream`: the leader process writes market data to Redis and all other processes read from Redis, improving data-access efficiency. Related logic was updated for stability and consistency in multi-process deployments, with strengthened exception handling and logging for traceability.
This commit is contained in:
薇薇安 2026-02-16 17:44:10 +08:00
parent 249aec917a
commit 43e993034f
8 changed files with 497 additions and 38 deletions


@@ -0,0 +1,113 @@
# Shared WebSockets and Binance Rate-Limit Assessment

## 1. Binance Futures WS Limits (Summary)

- **Base URL**: `wss://fstream.binance.com`
- **Single stream**: `/ws/<streamName>`
- **Combined stream**: `/stream?streams=<streamName1>/<streamName2>/...`
- **Max subscriptions per connection**: **1024 streams**
- **Subscription rate limit**: **at most 10 subscribe messages per second**; exceeding it causes a disconnect, and **an IP that disconnects repeatedly may be banned**
- **Connection lifetime**: a single connection lasts at most 24 hours; reconnect before it expires
- **Ping/Pong**: the server pings roughly every 3 minutes; the client must reply with a pong within 10 minutes (the current aiohttp heartbeat already satisfies this)
---
## 2. Current WS Usage (Per Process)

The system currently runs **one process per account** (e.g. supervisor launches multiple trading_system processes keyed by `ATS_ACCOUNT_ID`). Within each process:

| Purpose | Connections/process | Subscription | Public? | Shareable across accounts? |
|------|-------------|----------|----------|----------------|
| **UserDataStream** | 1 | listenKey (orders/positions/balance) | No (requires listenKey) | **No**, one per account |
| **Ticker24hStream** | 1 | `!ticker@arr` (market-wide 24h tickers) | Yes | **Yes**, one market-wide copy suffices |
| **BookTickerStream** | 1 | `!bookTicker` (market-wide best bid/ask) | Yes | **Yes**, one market-wide copy suffices |
| **KlineStream** | 1 | `/stream` + dynamic `SUBSCRIBE` to `<symbol>@kline_<interval>` | Yes | **Yes**, subscribe on demand, ≤1024 streams per connection |
| **Position price monitor** (position_manager) | **1 per position** | one `/ws/<symbol>@ticker` per connection | Yes | **Yes**, can be merged into one combined stream |

In total: **4 fixed connections per account, plus one connection per open position**.
---
## 3. Connections and Subscriptions with Multiple Accounts (Current Architecture)

Let **N** be the number of accounts and **M** the maximum positions per account (e.g. 10):

- **UserDataStream**: N connections (1 per account), **cannot be reduced**
- **Ticker24hStream**: N connections → **1** if shared
- **BookTickerStream**: N connections → **1** if shared
- **KlineStream**: N connections, each dynamically subscribing to several `symbol@kline_interval` streams → **1** if shared; the total stream count is the union of (symbol, interval) pairs used by all accounts
- **Position monitor**: N × M connections (one `/ws/<symbol>@ticker` per position) → **1 connection** if merged into a combined stream; the stream count is the union of symbols held across all accounts, which must stay **≤1024**

**Are we currently over the limits?**

- **1024 streams/connection**: a single KlineStream subscribes to the symbol×interval pairs used by scanning/strategies, typically tens to a few hundred, far below 1024; if position monitoring is merged into one combined stream, the symbol count is likewise far below 1024. **Normal usage will not exceed it.**
- **10 subscribe messages/second**: during scanning, KlineStream may call `subscribe` for many symbol×interval pairs in quick succession; under high concurrency this can send many subscribe messages in a short burst, risking **more than 10 per second**, hence disconnects and eventually an IP ban. **Rate limiting is required** (see the implementation below).
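As a sanity check, the connection counts above reduce to two small formulas (function names here are illustrative, not part of the codebase):

```python
def connections_current(n_accounts: int, max_positions: int) -> int:
    """Per account: UserData + Ticker24h + BookTicker + Kline = 4 fixed
    connections, plus one /ws/<symbol>@ticker per open position."""
    return n_accounts * (4 + max_positions)

def connections_shared(n_accounts: int) -> int:
    """After sharing: N UserData connections (cannot be reduced) +
    1 shared Ticker24h + 1 shared BookTicker + 1 shared Kline +
    1 combined position-ticker stream."""
    return n_accounts + 3 + 1
```

With N=5 accounts at M=10 positions each, the current layout needs 70 connections; sharing cuts that to 9.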
---
## 4. Which Streams Can Be Shared

### 1. Strongly recommended to share (public, one market-wide copy suffices)

- **Ticker24hStream** (`!ticker@arr`)
  - Market-wide 24h tickers, account-independent.
  - **Suggestion**: with multiple processes on one machine, have a single "market-data process" (or the main process) hold the one connection and write into Redis/shared cache, with the other processes reading only the cache; if Redis is already in use, let exactly one process pull the WS and update the cache.
- **BookTickerStream** (`!bookTicker`)
  - Market-wide best bid/ask, account-independent.
  - **Suggestion**: same as above, one shared connection plus a shared cache.

### 2. Recommended to share (reduces connections and total streams)

- **KlineStream** (`<symbol>@kline_<interval>`)
  - Dynamically subscribed per (symbol, interval); at most 1024 streams per connection.
  - With one shared connection across accounts, the subscription set is the union of (symbol, interval) pairs used by all accounts, usually still far below 1024.
  - **Suggestion**: run KlineStream in only one process and let the others read the kline cache via Redis/shared memory, or deploy a dedicated shared "kline WS service" for all processes.
- **Position monitor** (`<symbol>@ticker`)
  - Currently one `/ws/<symbol>@ticker` per position; with many accounts and positions this grows to N×M connections.
  - **Suggestion**: switch to **one combined stream** `/stream?streams=s1@ticker/s2@ticker/...` carrying every monitored symbol (total symbols < 1024). Multiple accounts can share this single connection (subscribing to the union of all accounts' position symbols), with each account consuming only the symbols it cares about.

### 3. Cannot be shared

- **UserDataStream** (listenKey)
  - Bound to an account: each account has its own listenKey and its own connection. **One connection per account is mandatory.**
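The combined-stream URL from suggestion 2 can be assembled and bounds-checked like this (a minimal sketch; `build_combined_ticker_url` is an illustrative name, and the base URL and 1024 cap come from the limits summarized above):

```python
BASE_URL = "wss://fstream.binance.com"
MAX_STREAMS_PER_CONNECTION = 1024  # Binance futures WS limit

def build_combined_ticker_url(symbols: list[str]) -> str:
    """Build one /stream?streams=... URL covering every monitored symbol."""
    # Deduplicate (the union across accounts) and use Binance's lowercase stream names
    names = sorted({f"{s.lower()}@ticker" for s in symbols})
    if len(names) > MAX_STREAMS_PER_CONNECTION:
        raise ValueError(f"{len(names)} streams exceeds the 1024-per-connection limit")
    return f"{BASE_URL}/stream?streams=" + "/".join(names)
```

Each process can then filter incoming messages down to the symbols its own account holds.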
---
## 5. Rate Limiting and Implementation Notes

### 1. ≤10 subscribe messages per second (mandatory)

- **KlineStream** sends a SUBSCRIBE on the same connection for every `subscribe(symbol, interval)` call.
- When scanning requests many symbol×interval pairs concurrently, multiple subscribe messages go out in a short burst and can easily exceed **10 per second**.
- **Implementation**: rate-limit subscriptions inside KlineStream, e.g. track the number of subscribes sent in the last second and, once 10 have been sent, sleep until the 1-second window rolls over before sending more; or queue them and drain at most 8-10 per second.
- Rate-limiting logic has been added to KlineStream (see the next section).

### 2. 1024 streams per connection

- Both KlineStream and the position monitor are currently far below 1024 streams per connection. Only if large numbers of symbol×interval pairs are pre-subscribed, or positions grow substantially, does the per-connection count need checking against the 1024 cap (a subscription counter with an upper-bound check would suffice).

### 3. 24-hour connection lifetime

- Every existing WS already reconnects on disconnect, which satisfies the documented 24-hour limit; keep as-is.
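The sliding-window idea in point 1 can be sketched in isolation (class and method names are illustrative; this mirrors the `_subscribe_times` bookkeeping the commit adds to `KlineStream`):

```python
import asyncio
import time

class SubscribeRateLimiter:
    """Allow at most `limit` sends per `window` seconds (sliding window)."""

    def __init__(self, limit: int = 10, window: float = 1.0):
        self.limit = limit
        self.window = window
        self._times: list[float] = []  # monotonic timestamps of recent sends

    async def acquire(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have left the window
        self._times = [t for t in self._times if now - t < self.window]
        while len(self._times) >= self.limit:
            # Sleep until the oldest send leaves the window, then re-check
            await asyncio.sleep(max(0.01, self._times[0] + self.window - time.monotonic()))
            now = time.monotonic()
            self._times = [t for t in self._times if now - t < self.window]
        self._times.append(time.monotonic())
```

Call `await limiter.acquire()` immediately before each SUBSCRIBE send; the first 10 sends in a window pass through instantly and the 11th blocks until the window rolls over.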
---
## 6. Conclusions and Risks

| Item | Currently at risk? | Notes |
|------|------------------|------|
| 1024 streams/connection | No | Kline and position-monitor subscription counts are far below 1024 |
| 10 subscribes/second | **Yes (at risk)** | KlineStream may burst subscribes during scanning; rate limiting added |
| Connection count | No | Grows linearly with accounts; sharing reduces it substantially |
| 24h / Ping-Pong | No | Reconnect + heartbeat already in place |

**Implemented in this commit:**

1. **KlineStream subscription rate limiting**: SUBSCRIBE messages are capped at ≤10 per second.
2. **Shared Ticker24h, BookTicker, and KlineStream across processes/accounts**:
   - Redis-based leader election (`market_ws_leader`): only the **leader** process opens these three WS connections and writes the data to Redis.
   - Non-leader processes open none of the three; they obtain data via **Redis refresh tasks** (Ticker24h/BookTicker pulled from Redis every 2 seconds) and by **reading Redis inside get_klines**.
   - Config flag `USE_SHARED_MARKET_WS` (default true): sharing is enabled whenever Redis is available; when disabled, every process opens its own connections.
3. **Position monitor**: unchanged, still one `/ws/<symbol>@ticker` per position per process; merging into a single shared combined stream is left for later (not implemented).
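The leader election in point 2 boils down to a Redis `SET key value NX EX ttl`. A minimal sketch of that pattern, using an in-memory stand-in for the Redis client so it runs without a server (`FakeRedis` and `try_acquire_leader` are illustrative; the key and TTL match the commit's `KEY_LEADER` / `LEADER_TTL_SEC`):

```python
import json
import os
import time

class FakeRedis:
    """In-memory stand-in mimicking redis-py's `set(..., nx=True, ex=ttl)`."""

    def __init__(self):
        self._data: dict[str, tuple[str, float]] = {}  # key -> (value, expires_at)

    def set(self, key, value, ex=None, nx=False):
        now = time.monotonic()
        alive = key in self._data and self._data[key][1] > now
        if nx and alive:
            return None  # redis-py returns None when NX fails
        self._data[key] = (value, now + (ex or float("inf")))
        return True

def try_acquire_leader(redis, key="market_ws_leader", ttl=30) -> bool:
    """SET NX EX: exactly one caller wins until the TTL lapses or the leader renews."""
    identity = json.dumps({"pid": os.getpid(), "ts": time.time()})
    return redis.set(key, identity, ex=ttl, nx=True) is True
```

The winner re-issues the SET (without NX) every `LEADER_RENEW_INTERVAL_SEC` to keep the key alive; if it dies, the TTL expires and another process's NX set succeeds.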


@@ -564,6 +564,17 @@ class BinanceClient:
        Returns:
            List of klines
        """
        # 0. Multi-process sharing: read the klines the leader wrote to Redis
        #    (avoids non-leader processes, which have no WS, hitting REST directly)
        try:
            from .market_ws_leader import use_shared_market_ws
            from .kline_stream import get_klines_from_redis
            if use_shared_market_ws(self.redis_cache):
                shared = await get_klines_from_redis(self.redis_cache, symbol, interval, limit)
                if shared and len(shared) >= limit:
                    logger.debug(f"Got {symbol} klines from shared Redis: {interval} x{limit}")
                    return shared
        except Exception as e:
            logger.debug(f"Failed to read shared klines from Redis: {e}")
        # 1. Prefer the WS cache (updated in real time, no REST request)
        try:
            from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh


@@ -1,6 +1,7 @@
"""
Best bid/ask WebSocket subscription (!bookTicker); maintains a market-wide best bid/ask cache.
Used for slippage estimation and entry-price optimization to improve execution.
Supports multi-process sharing: the leader writes to Redis, and every process can update its local cache from Redis via refresh_book_ticker_from_redis_loop.
Docs: updated every 5s, pushing the best quote (highest bid, lowest ask) for all symbols.
"""
import asyncio
@@ -11,6 +12,11 @@ from typing import Dict, Optional, Any

logger = logging.getLogger(__name__)

try:
    from .market_ws_leader import KEY_BOOK_TICKER
except ImportError:
    KEY_BOOK_TICKER = "market:book_ticker"

# Best bid/ask cache: symbol -> { bidPrice, bidQty, askPrice, askQty, time }
_book_ticker_cache: Dict[str, Dict[str, Any]] = {}
_book_ticker_updated_at: float = 0.0
@@ -75,10 +81,11 @@ def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float]:

class BookTickerStream:
    """Subscribes to futures !bookTicker and keeps _book_ticker_cache updated. As leader, optionally writes to Redis for other processes to read."""

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
@@ -179,3 +186,36 @@
        _book_ticker_updated_at = time.monotonic()
        logger.debug(f"BookTickerStream: updated {s} bid={_book_ticker_cache[s]['bidPrice']:.4f} ask={_book_ticker_cache[s]['askPrice']:.4f}")
        if self._redis_cache:
            try:
                loop = asyncio.get_event_loop()
                copy = dict(_book_ticker_cache)
                loop.create_task(self._write_book_ticker_to_redis(copy))
            except Exception as e:
                logger.debug("BookTickerStream: failed to schedule Redis write %s", e)

    async def _write_book_ticker_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None:
        try:
            if self._redis_cache:
                await self._redis_cache.set(KEY_BOOK_TICKER, data, ttl=30)
        except Exception as e:
            logger.debug("BookTickerStream: Redis write failed %s", e)


async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
    """Non-leader (or shared) mode: periodically pull bookTicker data from Redis into the local cache. Any process may run this."""
    global _book_ticker_cache, _book_ticker_updated_at
    if redis_cache is None:
        return
    while True:
        try:
            await asyncio.sleep(interval_sec)
            data = await redis_cache.get(KEY_BOOK_TICKER)
            if data and isinstance(data, dict):
                _book_ticker_cache.update(data)
                _book_ticker_updated_at = time.monotonic()
                logger.debug("BookTicker: refreshed %s symbols from Redis", len(data))
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.debug("BookTicker: Redis refresh failed %s", e)


@@ -411,6 +411,9 @@ ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '30'))
# When fetching positions, filter out those whose notional value is below this (USDT); lower it or set to 0 if results disagree with the Binance dashboard
POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0'))

# Multi-process sharing of market WS: on by default when Redis is available; only the leader opens connections, the rest read from Redis
USE_SHARED_MARKET_WS = os.getenv('USE_SHARED_MARKET_WS', 'true').lower() in ('1', 'true', 'yes')

# Redis cache config (database first, falling back to environment variables and defaults)
REDIS_URL = _get_config_value('REDIS_URL', os.getenv('REDIS_URL', 'redis://localhost:6379'))
REDIS_USE_TLS = _get_config_value('REDIS_USE_TLS', False) if _get_config_value('REDIS_USE_TLS') is not None else os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'


@@ -1,6 +1,7 @@
"""
Kline WebSocket subscription (<symbol>@kline_<interval>); maintains a kline cache.
Preferred by get_klines over REST pulls: fewer timeouts, real-time indicator updates.
Supports multi-process sharing: the leader writes market:kline:{symbol}:{interval} to Redis; non-leaders can read it via get_klines_from_redis.
Docs: pushed every 250ms; only the latest candle is updated (x=false means the candle is still open, x=true means it has closed).
"""
import asyncio
@@ -11,6 +12,11 @@ from typing import Dict, List, Optional, Any, Tuple

logger = logging.getLogger(__name__)

try:
    from .market_ws_leader import KEY_KLINE_PREFIX
except ImportError:
    KEY_KLINE_PREFIX = "market:kline:"

# Kline cache: { (symbol, interval): [kline1, kline2, ...] }, keeping at most `limit` candles
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
@@ -39,13 +45,19 @@ def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0)

class KlineStream:
    """Subscribes to futures kline streams and keeps _kline_cache updated. Supports dynamic subscribe/unsubscribe."""

    # Binance limit: at most 10 subscribe messages per second; exceeding it causes a disconnect
    _SUBSCRIBE_RATE_LIMIT = 10  # messages per second
    _SUBSCRIBE_WINDOW_SEC = 1.0

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        self._subscribed: Dict[Tuple[str, str], bool] = {}  # (symbol, interval) -> subscribed?
        self._subscription_lock = asyncio.Lock()
        self._subscribe_times: List[float] = []  # send timestamps in the current window, for rate limiting

    def _ws_base_url(self) -> str:
        if self.testnet:
@@ -98,6 +110,15 @@
                await asyncio.sleep(0.1)
        if not self._ws:
            return False
        # Binance limit: at most 10 subscribe messages per second; exceeding it causes a disconnect
        now = time.monotonic()
        self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
        while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT:
            wait_until = self._subscribe_times[0] + self._SUBSCRIBE_WINDOW_SEC
            wait_sec = max(0.01, wait_until - time.monotonic())
            await asyncio.sleep(wait_sec)
            now = time.monotonic()
            self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC]
        stream_name = f"{symbol.lower()}@kline_{interval}"
        try:
            await self._ws.send_json({
@@ -105,6 +126,7 @@
                "params": [stream_name],
                "id": int(time.time() * 1000) % 1000000,
            })
            self._subscribe_times.append(time.monotonic())
            self._subscribed[key] = True
            _kline_cache_limit[key] = limit
            logger.debug(f"KlineStream: subscribed {symbol} {interval}")
@@ -246,6 +268,37 @@
        _kline_cache_updated_at[key] = time.monotonic()
        logger.debug(f"KlineStream: updated {s} {i} (closed={x}), cache size {len(cache_list)}")
        if self._redis_cache:
            try:
                loop = asyncio.get_event_loop()
                copy = list(cache_list)
                rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
                loop.create_task(self._write_kline_to_redis(rkey, copy))
            except Exception as e:
                logger.debug("KlineStream: failed to schedule Redis write %s", e)

    async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None:
        try:
            if self._redis_cache:
                await self._redis_cache.set(rkey, data, ttl=600)
        except Exception as e:
            logger.debug("KlineStream: Redis write failed %s", e)


async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]:
    """Shared mode: read the kline cache from Redis; returns None on a miss."""
    if redis_cache is None or limit <= 0:
        return None
    try:
        s, i = symbol.upper(), interval.lower()
        rkey = f"{KEY_KLINE_PREFIX}{s}:{i}"
        data = await redis_cache.get(rkey)
        if data and isinstance(data, list) and len(data) >= limit:
            return data[-limit:]
        return None
    except Exception as e:
        logger.debug("get_klines_from_redis: %s", e)
        return None

# Global KlineStream instance


@@ -263,6 +263,7 @@ async def main():
    ticker_24h_stream = None
    kline_stream = None
    book_ticker_stream = None
    market_ws_refresh_tasks = []
    try:
        # 1. Initialize the Binance client
        logger.info("Initializing Binance client...")
@@ -351,46 +352,93 @@
            logger.warning("⚠ User Data Stream not started; orders and positions will rely on REST sync only")
            user_data_stream = None

        # 3.0 Shared market WS across processes: leader election; only the leader opens connections, non-leaders read from Redis
        ticker_24h_stream = None
        kline_stream = None
        book_ticker_stream = None
        market_ws_refresh_tasks = []
        if getattr(client, "redis_cache", None):
            try:
                await client.redis_cache.connect()
            except Exception:
                pass
        try:
            from .market_ws_leader import (
                use_shared_market_ws,
                try_acquire_market_ws_leader,
                is_market_ws_leader,
                run_leader_renew_loop,
                release_market_ws_leader,
                stop_leader_renew_loop,
            )
            from .ticker_24h_stream import Ticker24hStream, refresh_ticker_24h_from_redis_loop
            from .book_ticker_stream import BookTickerStream, refresh_book_ticker_from_redis_loop
            from .kline_stream import KlineStream

            use_testnet = getattr(config, "USE_TESTNET", False)
            redis_cache = getattr(client, "redis_cache", None)
            if use_shared_market_ws(redis_cache):
                is_leader = await try_acquire_market_ws_leader(redis_cache)
                if is_leader:
                    await run_leader_renew_loop(redis_cache)
                    try:
                        ticker_24h_stream = Ticker24hStream(testnet=use_testnet, redis_cache=redis_cache)
                        if await ticker_24h_stream.start():
                            logger.info("✓ 24h ticker WS started (leader; writes to Redis for shared use)")
                        else:
                            ticker_24h_stream = None
                    except Exception as e:
                        logger.debug(f"Failed to start 24h ticker WS: {e}")
                    try:
                        kline_stream = KlineStream(testnet=use_testnet, redis_cache=redis_cache)
                        if await kline_stream.start():
                            logger.info("✓ Kline WS started (leader; writes to Redis for shared use)")
                        else:
                            kline_stream = None
                    except Exception as e:
                        logger.debug(f"Failed to start kline WS: {e}")
                    try:
                        book_ticker_stream = BookTickerStream(testnet=use_testnet, redis_cache=redis_cache)
                        if await book_ticker_stream.start():
                            logger.info("✓ Book ticker WS started (leader; writes to Redis for shared use)")
                        else:
                            book_ticker_stream = None
                    except Exception as e:
                        logger.debug(f"Failed to start BookTicker WS: {e}")
                else:
                    logger.info("✓ This process is not the leader; it will only read Ticker24h/BookTicker/kline caches from Redis")
                    t_refresh = asyncio.create_task(refresh_ticker_24h_from_redis_loop(redis_cache, 2.0))
                    b_refresh = asyncio.create_task(refresh_book_ticker_from_redis_loop(redis_cache, 2.0))
                    market_ws_refresh_tasks = [t_refresh, b_refresh]
            else:
                is_leader = True
                try:
                    ticker_24h_stream = Ticker24hStream(testnet=use_testnet)
                    if await ticker_24h_stream.start():
                        logger.info("✓ 24h ticker WS started (scanning will prefer the WS cache)")
                    else:
                        ticker_24h_stream = None
                except Exception as e:
                    logger.debug(f"Failed to start 24h ticker WS: {e}")
                try:
                    kline_stream = KlineStream(testnet=use_testnet)
                    if await kline_stream.start():
                        logger.info("✓ Kline WS started (indicators will prefer the WS cache)")
                    else:
                        kline_stream = None
                except Exception as e:
                    logger.debug(f"Failed to start kline WS: {e}")
                try:
                    book_ticker_stream = BookTickerStream(testnet=use_testnet)
                    if await book_ticker_stream.start():
                        logger.info("✓ Book ticker WS started (for slippage estimation and price optimization)")
                    else:
                        book_ticker_stream = None
                except Exception as e:
                    logger.debug(f"Failed to start BookTicker WS: {e}")
        except Exception as e:
            logger.debug(f"Market WS startup error: {e}")
            ticker_24h_stream = kline_stream = book_ticker_stream = None

        # 4. Initialize modules
        logger.info("Initializing trading modules...")
@@ -497,6 +545,21 @@
                logger.info("BookTicker Stream stopped")
            except Exception as e:
                logger.debug(f"Error stopping BookTicker Stream: {e}")
        for t in market_ws_refresh_tasks:
            try:
                t.cancel()
                await t
            except asyncio.CancelledError:
                pass
            except Exception:
                pass
        try:
            from .market_ws_leader import is_market_ws_leader, release_market_ws_leader, stop_leader_renew_loop
            if is_market_ws_leader() and client and getattr(client, "redis_cache", None):
                release_market_ws_leader(client.redis_cache)
                stop_leader_renew_loop()
        except Exception:
            pass
        if client:
            await client.disconnect()
        logger.info("Program exited")


@@ -0,0 +1,136 @@
"""
Multi-process / multi-account sharing of market WS: leader election + shared cache.
The leader process maintains the three connections (Ticker24h / BookTicker / KlineStream) and writes to Redis;
non-leader processes open no connections and only read from Redis. Requires Redis.
"""
import asyncio
import json
import logging
import os
import socket
import time
from typing import Optional, Any

logger = logging.getLogger(__name__)

# Redis keys
KEY_LEADER = "market_ws_leader"
KEY_TICKER_24H = "market:ticker_24h"
KEY_BOOK_TICKER = "market:book_ticker"
KEY_KLINE_PREFIX = "market:kline:"

LEADER_TTL_SEC = 30
LEADER_RENEW_INTERVAL_SEC = 15

_is_leader: bool = False
_leader_task: Optional[asyncio.Task] = None


def _leader_identity() -> dict:
    return {
        "pid": os.getpid(),
        "host": socket.gethostname() or "unknown",
        "ts": time.time(),
    }


def is_market_ws_leader() -> bool:
    """Whether this process is the market WS leader (local flag only; decides whether to start the three streams)."""
    return _is_leader


async def try_acquire_market_ws_leader(redis_cache: Any) -> bool:
    """
    Try to become the market WS leader (Redis SET NX EX).
    On success, this process should start Ticker24h / BookTicker / KlineStream and write to Redis.
    """
    global _is_leader
    if redis_cache is None or getattr(redis_cache, "redis", None) is None:
        return False
    try:
        raw = await redis_cache.redis.set(
            KEY_LEADER,
            json.dumps(_leader_identity()),
            ex=LEADER_TTL_SEC,
            nx=True,
        )
        _is_leader = raw is True
        if _is_leader:
            logger.info("✓ This process was elected market WS leader (shared Ticker24h/BookTicker/Kline connections)")
        return _is_leader
    except Exception as e:
        logger.debug("Failed to acquire market WS leadership: %s", e)
        return False


async def renew_market_ws_leader(redis_cache: Any) -> bool:
    """Leader renews periodically so the TTL never lapses and the key never gets claimed by others."""
    if not _is_leader or redis_cache is None or getattr(redis_cache, "redis", None) is None:
        return False
    try:
        await redis_cache.redis.set(
            KEY_LEADER,
            json.dumps(_leader_identity()),
            ex=LEADER_TTL_SEC,
        )
        return True
    except Exception as e:
        logger.debug("Failed to renew market WS leadership: %s", e)
        return False


def release_market_ws_leader(redis_cache: Any) -> None:
    """Release leadership voluntarily (optional) so another process can take over sooner."""
    global _is_leader
    _is_leader = False
    if redis_cache is None or getattr(redis_cache, "redis", None) is None:
        return
    try:
        asyncio.get_event_loop().create_task(_delete_leader_key(redis_cache))
    except Exception:
        pass


async def _delete_leader_key(redis_cache: Any) -> None:
    try:
        await redis_cache.redis.delete(KEY_LEADER)
    except Exception:
        pass


async def run_leader_renew_loop(redis_cache: Any) -> None:
    """Renewal loop run in the leader process; returns immediately for non-leaders."""
    global _leader_task
    if not _is_leader:
        return
    if _leader_task is not None:
        return

    async def _loop():
        while _is_leader:
            await asyncio.sleep(LEADER_RENEW_INTERVAL_SEC)
            if not _is_leader:
                break
            ok = await renew_market_ws_leader(redis_cache)
            if not ok:
                logger.warning("Market WS leader renewal failed; another process may have taken over")

    _leader_task = asyncio.create_task(_loop())
    logger.debug("Market WS leader renewal task started")


def stop_leader_renew_loop() -> None:
    global _leader_task
    if _leader_task is not None:
        _leader_task.cancel()
        _leader_task = None


def use_shared_market_ws(redis_cache: Any) -> bool:
    """Whether shared market WS is enabled: True when Redis is present and the config flag is on."""
    if redis_cache is None or getattr(redis_cache, "redis", None) is None:
        return False
    try:
        from . import config
        return bool(getattr(config, "USE_SHARED_MARKET_WS", True))
    except Exception:
        return False


@@ -1,6 +1,7 @@
"""
24h ticker WebSocket subscription (!ticker@arr); maintains a market-wide ticker cache.
Preferred by market_scanner to avoid batched REST get_all_tickers_24h (fewer requests and timeouts).
Supports multi-process sharing: the leader writes to Redis, and every process can update its local cache from Redis via refresh_ticker_24h_from_redis_loop.
Docs: only symbols whose data changed appear in each push, once per second.
"""
import asyncio
@@ -11,6 +12,11 @@ from typing import Dict, Optional, Any

logger = logging.getLogger(__name__)

try:
    from .market_ws_leader import KEY_TICKER_24H
except ImportError:
    KEY_TICKER_24H = "market:ticker_24h"

# Market-wide 24h ticker cache: symbol -> { symbol, price, volume, changePercent, ts }
_ticker_24h_cache: Dict[str, Dict[str, Any]] = {}
_ticker_24h_updated_at: float = 0.0
@@ -34,10 +40,11 @@ def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool:

class Ticker24hStream:
    """Subscribes to futures !ticker@arr and keeps _ticker_24h_cache updated. As leader, optionally writes to Redis for other processes to read."""

    def __init__(self, testnet: bool = False, redis_cache: Any = None):
        self.testnet = testnet
        self._redis_cache = redis_cache
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
@@ -142,3 +149,36 @@
            }
        _ticker_24h_updated_at = time.monotonic()
        logger.debug(f"Ticker24hStream: updated {len(arr)} entries, cache holds {len(_ticker_24h_cache)} symbols")
        if self._redis_cache:
            try:
                loop = asyncio.get_event_loop()
                copy = dict(_ticker_24h_cache)
                loop.create_task(self._write_ticker_24h_to_redis(copy))
            except Exception as e:
                logger.debug("Ticker24hStream: failed to schedule Redis write %s", e)

    async def _write_ticker_24h_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None:
        try:
            if self._redis_cache:
                await self._redis_cache.set(KEY_TICKER_24H, data, ttl=120)
        except Exception as e:
            logger.debug("Ticker24hStream: Redis write failed %s", e)


async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None:
    """Non-leader (or shared) mode: periodically pull the 24h tickers from Redis into the local cache. Any process may run this."""
    global _ticker_24h_cache, _ticker_24h_updated_at
    if redis_cache is None:
        return
    while True:
        try:
            await asyncio.sleep(interval_sec)
            data = await redis_cache.get(KEY_TICKER_24H)
            if data and isinstance(data, dict):
                _ticker_24h_cache.update(data)
                _ticker_24h_updated_at = time.monotonic()
                logger.debug("Ticker24h: refreshed %s symbols from Redis", len(data))
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.debug("Ticker24h: Redis refresh failed %s", e)