From 43e993034f8c94e492f66456e79191ee3ec0e2a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Mon, 16 Feb 2026 17:44:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(redis=5Fintegration):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=A4=9A=E8=BF=9B=E7=A8=8B=E5=85=B1=E7=94=A8=E5=B8=82=E5=9C=BA?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `binance_client`、`kline_stream`、`book_ticker_stream` 和 `ticker_24h_stream` 中引入 Redis 缓存支持,允许 Leader 进程写入数据,其他进程从 Redis 读取,提升数据获取效率。更新了相关逻辑以确保在多进程环境下的稳定性和一致性,同时增强了异常处理和日志记录,确保系统的可追溯性。 --- docs/WebSocket共用与限频评估.md | 113 ++++++++++++++++++++ trading_system/binance_client.py | 11 ++ trading_system/book_ticker_stream.py | 44 +++++++- trading_system/config.py | 3 + trading_system/kline_stream.py | 55 +++++++++- trading_system/main.py | 129 +++++++++++++++++------ trading_system/market_ws_leader.py | 136 +++++++++++++++++++++++++ trading_system/ticker_24h_stream.py | 44 +++++++- 8 files changed, 497 insertions(+), 38 deletions(-) create mode 100644 docs/WebSocket共用与限频评估.md create mode 100644 trading_system/market_ws_leader.py diff --git a/docs/WebSocket共用与限频评估.md b/docs/WebSocket共用与限频评估.md new file mode 100644 index 0000000..1dd8038 --- /dev/null +++ b/docs/WebSocket共用与限频评估.md @@ -0,0 +1,113 @@ +# WebSocket 共用与币安限频评估 + +## 一、币安合约 WS 限制(摘要) + +- **Base URL**:`wss://fstream.binance.com` +- **单 stream**:`/ws/` +- **组合 stream**:`/stream?streams=///...` +- **单连接最多订阅**:**1024 个 Streams** +- **订阅限速**:**每秒最多 10 条订阅消息**,超限会断连;**反复断连的 IP 可能被屏蔽** +- **连接有效期**:单连接不超过 24 小时,需断线重连 +- **Ping/Pong**:服务端约 3 分钟发 ping,客户端需在 10 分钟内回复 pong(当前 aiohttp heartbeat 已满足) + +--- + +## 二、当前系统 WS 使用方式(按进程) + +当前为 **一进程一账户**(如 supervisor 按 `ATS_ACCOUNT_ID` 起多个 trading_system 进程),每个进程内: + +| 用途 | 连接数/进程 | 订阅内容 | 是否公开 | 可否多账户共用 | +|------|-------------|----------|----------|----------------| +| **UserDataStream** | 1 | listenKey(订单/持仓/余额) | 否(需 listenKey) | **否**,每账户必须独立 | +| **Ticker24hStream** | 1 | `!ticker@arr`(全市场 24h ticker) | 是 | **可共用**,全市场一份即可 | +| **BookTickerStream** | 1 | `!bookTicker`(全市场最优买卖) | 是 | **可共用**,全市场一份即可 | +| **KlineStream** | 1 | `/stream` + 动态 `SUBSCRIBE`:`@kline_` | 是 | **可共用**,按需订阅,单连接 ≤1024 streams | +| **持仓价格监控**(position_manager) | **每持仓 1 条** | 每连接 `/ws/@ticker` | 是 | **可共用**,可合并为一条组合流 | + +即:**每账户 4 条固定连接 + 持仓数个单独连接**。 + +--- + +## 三、多账户时的连接与订阅量(当前架构) + +设账户数为 **N**,单账户最大持仓数为 **M**(如 10): + +- **UserDataStream**:N 条(每账户 1,**不能省**) +- **Ticker24hStream**:N 条 → 若共用为 **1 条** +- **BookTickerStream**:N 条 → 若共用为 **1 条** +- **KlineStream**:N 条,每连接动态订阅若干 `symbol@kline_interval` → 若共用为 **1 条**,总 streams 数为「所有账户用到的 (symbol, interval) 并集」 +- **持仓监控**:N × M 条(每持仓一个 `/ws/@ticker`)→ 若合并为一条组合流,为 **1 条连接**,streams 数为「所有账户持仓的 symbol 并集」,且 **≤1024** + +**当前是否超限:** + +- **1024 streams/连接**:单条 KlineStream 当前按「扫描/策略用到的 symbol×interval」订阅,通常几十~小几百级,远低于 1024;持仓监控若合并为一条组合流,symbol 数一般也远小于 1024。**正常使用不会超**。 +- **10 条订阅/秒**:KlineStream 在扫描时可能对多个 symbol×interval 连续调用 `subscribe`,若并发高会短时间多发订阅消息,有**超过 10 条/秒**的风险,存在被断连、进而 IP 被限的风险。**需要限速**(见下文实现)。 + +--- + +## 四、可共用的流与建议 + +### 1. 强烈建议共用(公开、全市场一份即可) + +- **Ticker24hStream**(`!ticker@arr`) + - 全市场 24h 行情,与账户无关。 + - **建议**:单机多进程时,可由一个「行情进程」或主进程单独起 1 条连接,写入 Redis/共享缓存,其他进程只读缓存;或若已用 Redis 等共享缓存,仅一个进程负责拉 WS 并更新缓存。 + +- **BookTickerStream**(`!bookTicker`) + - 全市场最优买卖,与账户无关。 + - **建议**:同上,共用 1 条连接 + 共享缓存。 + +### 2. 建议共用(减少连接与 stream 总数) + +- **KlineStream**(`@kline_`) + - 按 (symbol, interval) 动态订阅,单连接最多 1024 streams。 + - 多账户共用 1 条连接时,订阅集合为「各账户用到的 (symbol, interval) 并集」,通常仍远小于 1024。 + - **建议**:单机多进程时,可只在一个进程内起 KlineStream,其他进程通过 Redis/共享内存读 K 线缓存;或部署一个共享「K 线 WS 服务」供多进程使用。 + +- **持仓监控**(`@ticker`) + - 当前实现为「每持仓 1 条 `/ws/@ticker`」,多账户×多持仓会变成 N×M 条连接。 + - **建议**:改为**一条组合流**:`/stream?streams=s1@ticker/s2@ticker/...`,把所有需要监控的 symbol 放在一起,总 symbol 数 <1024 即可。多账户可共用这一条连接(订阅「所有账户持仓 symbol 的并集」),每个账户只消费自己关心的 symbol 即可。 + +### 3. 不能共用 + +- **UserDataStream**(listenKey) + - 与账户绑定,每账户独立 listenKey、独立连接。**必须每账户 1 条连接**。 + +--- + +## 五、限频与实现建议 + +### 1. 订阅消息 ≤10 条/秒(必须遵守) + +- **KlineStream** 在 `subscribe(symbol, interval)` 时向同一连接发送 SUBSCRIBE。 +- 若扫描阶段并发请求多个 symbol×interval,会在短时间连续发送多条订阅,容易超过 **10 条/秒**。 +- **实现**:在 KlineStream 内做**订阅限速**:例如维护「最近 1 秒内已发送的订阅次数」,若已达 10 次则 sleep 到「满 1 秒」后再发;或批量排队,按每秒最多 8~10 条发送。 +- 已在代码中为 KlineStream 增加限速逻辑(见下节)。 + +### 2. 单连接 1024 streams + +- 当前 KlineStream 与持仓监控的 stream 数量均远低于 1024,只需在后续若「预订阅」大量 symbol×interval 或大量持仓时,确保单连接订阅数 ≤1024 即可(可做订阅数统计与上限检查)。 + +### 3. 连接 24 小时 + +- 现有各 WS 均有断线重连,满足「不超过 24 小时」的文档要求;保持即可。 + +--- + +## 六、结论与风险 + +| 项目 | 当前是否可能超限 | 说明 | +|------|------------------|------| +| 单连接 1024 streams | 否 | 当前 K 线与持仓监控订阅量远低于 1024 | +| 10 条订阅/秒 | **是(有风险)** | KlineStream 扫描时可能短时大量 subscribe,已加限速 | +| 连接数 | 否 | 多账户时连接数线性增长,共用后可显著下降 | +| 24h / Ping-Pong | 否 | 已重连 + heartbeat | + +**已实现:** + +1. **KlineStream 订阅限速**:发送 SUBSCRIBE 时控制在 ≤10 条/秒。 +2. **多进程/多账户共用 Ticker24h、BookTicker、KlineStream**: + - 使用 Redis 选主(`market_ws_leader`):仅 **Leader** 进程建立上述三条 WS 连接并写入 Redis。 + - 非 Leader 进程不建这三条连接,通过 **Redis 刷新任务**(Ticker24h/BookTicker 每 2 秒从 Redis 拉取)和 **get_klines 时读 Redis** 获取数据。 + - 配置项 `USE_SHARED_MARKET_WS`(默认 true):有 Redis 时启用共用;关闭则每进程独立建连接。 +3. **持仓监控**:仍为每进程按持仓建多条 `/ws/@ticker`;后续可改为单连接组合流并共享(未实现)。 diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 1b53d4a..17b697e 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -564,6 +564,17 @@ class BinanceClient: Returns: K线数据列表 """ + # 0. 多进程共用:从 Redis 读 Leader 写入的 K 线(避免非 Leader 进程无 WS 时直接打 REST) + try: + from .market_ws_leader import use_shared_market_ws + from .kline_stream import get_klines_from_redis + if use_shared_market_ws(self.redis_cache): + shared = await get_klines_from_redis(self.redis_cache, symbol, interval, limit) + if shared and len(shared) >= limit: + logger.debug(f"从共用 Redis 获取 {symbol} K线: {interval} x{limit}") + return shared + except Exception as e: + logger.debug(f"读取共用 K线 Redis 失败: {e}") # 1. 优先从 WS 缓存读取(实时更新,无 REST 请求) try: from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh diff --git a/trading_system/book_ticker_stream.py b/trading_system/book_ticker_stream.py index 3831180..591caf8 100644 --- a/trading_system/book_ticker_stream.py +++ b/trading_system/book_ticker_stream.py @@ -1,6 +1,7 @@ """ 最优挂单 WebSocket 流:订阅 !bookTicker,维护全市场最优买/卖价缓存。 用于滑点估算、入场价格优化,提升交易执行效果。 +支持多进程共用:Leader 写 Redis,所有进程通过 refresh_book_ticker_from_redis_loop 从 Redis 更新本地缓存。 文档:更新速度 5s,推送所有交易对的最优挂单(最高买单、最低卖单)。 """ import asyncio @@ -11,6 +12,11 @@ from typing import Dict, Optional, Any logger = logging.getLogger(__name__) +try: + from .market_ws_leader import KEY_BOOK_TICKER +except ImportError: + KEY_BOOK_TICKER = "market:book_ticker" + # 最优挂单缓存:symbol -> { bidPrice, bidQty, askPrice, askQty, time } _book_ticker_cache: Dict[str, Dict[str, Any]] = {} _book_ticker_updated_at: float = 0.0 @@ -75,10 +81,11 @@ def estimate_slippage(symbol: str, side: str, quantity: float) -> Optional[float class BookTickerStream: - """订阅合约 !bookTicker,持续更新 _book_ticker_cache。无需 listenKey,公开行情。""" + """订阅合约 !bookTicker,持续更新 _book_ticker_cache。Leader 时可选写 Redis 供多进程读。""" - def __init__(self, testnet: bool = False): + def __init__(self, testnet: bool = False, redis_cache: Any = None): self.testnet = testnet + self._redis_cache = redis_cache self._ws = None self._task: Optional[asyncio.Task] = None self._running = False @@ -179,3 +186,36 @@ class BookTickerStream: _book_ticker_updated_at = time.monotonic() logger.debug(f"BookTickerStream: 已更新 {s} bid={_book_ticker_cache[s]['bidPrice']:.4f} ask={_book_ticker_cache[s]['askPrice']:.4f}") + if self._redis_cache: + try: + loop = asyncio.get_event_loop() + copy = dict(_book_ticker_cache) + loop.create_task(self._write_book_ticker_to_redis(copy)) + except Exception as e: + logger.debug("BookTickerStream: 写入 Redis 调度失败 %s", e) + + async def _write_book_ticker_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None: + try: + if self._redis_cache: + await self._redis_cache.set(KEY_BOOK_TICKER, data, ttl=30) + except Exception as e: + logger.debug("BookTickerStream: 写入 Redis 失败 %s", e) + + +async def refresh_book_ticker_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: + """非 Leader 或共用模式:定期从 Redis 拉取 bookTicker 到本地缓存。所有进程可调用。""" + global _book_ticker_cache, _book_ticker_updated_at + if redis_cache is None: + return + while True: + try: + await asyncio.sleep(interval_sec) + data = await redis_cache.get(KEY_BOOK_TICKER) + if data and isinstance(data, dict): + _book_ticker_cache.update(data) + _book_ticker_updated_at = time.monotonic() + logger.debug("BookTicker: 从 Redis 刷新 %s 个交易对", len(data)) + except asyncio.CancelledError: + break + except Exception as e: + logger.debug("BookTicker: 从 Redis 刷新失败 %s", e) diff --git a/trading_system/config.py b/trading_system/config.py index 9a71d91..711a0e9 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -411,6 +411,9 @@ ALGO_ORDER_TIMEOUT_SEC = int(os.getenv('ALGO_ORDER_TIMEOUT_SEC', '30')) # 获取持仓时过滤掉名义价值低于此值的仓位(USDT),与币安仪表板不一致时可调低或设为 0 POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0')) +# 市场 WS 多进程共用:有 Redis 时默认开启,仅 Leader 进程建连接,其余从 Redis 读 +USE_SHARED_MARKET_WS = os.getenv('USE_SHARED_MARKET_WS', 'true').lower() in ('1', 'true', 'yes') + # Redis 缓存配置(优先从数据库,回退到环境变量和默认值) REDIS_URL = _get_config_value('REDIS_URL', os.getenv('REDIS_URL', 'redis://localhost:6379')) REDIS_USE_TLS = _get_config_value('REDIS_USE_TLS', False) if _get_config_value('REDIS_USE_TLS') is not None else os.getenv('REDIS_USE_TLS', 'False').lower() == 'true' diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 9ecbdb2..7b941dc 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -1,6 +1,7 @@ """ K线 WebSocket 流:订阅 @kline_,维护K线缓存。 供 get_klines 优先使用,替代 REST 拉取,减少超时、实时更新技术指标。 +支持多进程共用:Leader 写 Redis(market:kline:{symbol}:{interval}),非 Leader 可通过 get_klines_from_redis 读。 文档:推送间隔 250ms,仅推送最新一根K线的更新;x=false 表示K线未完结,x=true 表示已完结。 """ import asyncio @@ -11,6 +12,11 @@ from typing import Dict, List, Optional, Any, Tuple logger = logging.getLogger(__name__) +try: + from .market_ws_leader import KEY_KLINE_PREFIX +except ImportError: + KEY_KLINE_PREFIX = "market:kline:" + # K线缓存:{ (symbol, interval): [kline1, kline2, ...] },最多保留 limit 根 _kline_cache: Dict[Tuple[str, str], List[List]] = {} _kline_cache_updated_at: Dict[Tuple[str, str], float] = {} @@ -39,13 +45,19 @@ def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) class KlineStream: """订阅合约 K线流,持续更新 _kline_cache。支持动态订阅/取消订阅。""" - def __init__(self, testnet: bool = False): + # 币安限制:每秒最多 10 条订阅消息,超限会断连 + _SUBSCRIBE_RATE_LIMIT = 10 # 条/秒 + _SUBSCRIBE_WINDOW_SEC = 1.0 + + def __init__(self, testnet: bool = False, redis_cache: Any = None): self.testnet = testnet + self._redis_cache = redis_cache self._ws = None self._task: Optional[asyncio.Task] = None self._running = False self._subscribed: Dict[Tuple[str, str], bool] = {} # (symbol, interval) -> 是否已订阅 self._subscription_lock = asyncio.Lock() + self._subscribe_times: List[float] = [] # 最近一次窗口内的发送时间,用于限速 def _ws_base_url(self) -> str: if self.testnet: @@ -98,6 +110,15 @@ class KlineStream: await asyncio.sleep(0.1) if not self._ws: return False + # 币安限制:每秒最多 10 条订阅消息,超限会断连 + now = time.monotonic() + self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC] + while len(self._subscribe_times) >= self._SUBSCRIBE_RATE_LIMIT: + wait_until = self._subscribe_times[0] + self._SUBSCRIBE_WINDOW_SEC + wait_sec = max(0.01, wait_until - time.monotonic()) + await asyncio.sleep(wait_sec) + now = time.monotonic() + self._subscribe_times = [t for t in self._subscribe_times if now - t < self._SUBSCRIBE_WINDOW_SEC] stream_name = f"{symbol.lower()}@kline_{interval}" try: await self._ws.send_json({ @@ -105,6 +126,7 @@ class KlineStream: "params": [stream_name], "id": int(time.time() * 1000) % 1000000, }) + self._subscribe_times.append(time.monotonic()) self._subscribed[key] = True _kline_cache_limit[key] = limit logger.debug(f"KlineStream: 已订阅 {symbol} {interval}") @@ -246,6 +268,37 @@ class KlineStream: _kline_cache_updated_at[key] = time.monotonic() logger.debug(f"KlineStream: 已更新 {s} {i} (完结={x}),缓存 {len(cache_list)} 根") + if self._redis_cache: + try: + loop = asyncio.get_event_loop() + copy = list(cache_list) + rkey = f"{KEY_KLINE_PREFIX}{s}:{i}" + loop.create_task(self._write_kline_to_redis(rkey, copy)) + except Exception as e: + logger.debug("KlineStream: 写入 Redis 调度失败 %s", e) + + async def _write_kline_to_redis(self, rkey: str, data: List[List]) -> None: + try: + if self._redis_cache: + await self._redis_cache.set(rkey, data, ttl=600) + except Exception as e: + logger.debug("KlineStream: 写入 Redis 失败 %s", e) + + +async def get_klines_from_redis(redis_cache: Any, symbol: str, interval: str, limit: int) -> Optional[List[List]]: + """共用模式:从 Redis 读取 K 线缓存;未命中返回 None。""" + if redis_cache is None or limit <= 0: + return None + try: + s, i = symbol.upper(), interval.lower() + rkey = f"{KEY_KLINE_PREFIX}{s}:{i}" + data = await redis_cache.get(rkey) + if data and isinstance(data, list) and len(data) >= limit: + return data[-limit:] + return None + except Exception as e: + logger.debug("get_klines_from_redis: %s", e) + return None # 全局 KlineStream 实例 diff --git a/trading_system/main.py b/trading_system/main.py index 8e6a991..e09ea55 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -263,6 +263,7 @@ async def main(): ticker_24h_stream = None kline_stream = None book_ticker_stream = None + market_ws_refresh_tasks = [] try: # 1. 初始化币安客户端 logger.info("初始化币安客户端...") @@ -351,46 +352,93 @@ async def main(): logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓") user_data_stream = None - # 3.1 启动 24h ticker WS 流(扫描时优先用缓存,避免批量 REST 与超时) - try: - from .ticker_24h_stream import Ticker24hStream - use_testnet = getattr(config, "USE_TESTNET", False) - ticker_24h_stream = Ticker24hStream(testnet=use_testnet) - if await ticker_24h_stream.start(): - logger.info("✓ 24h ticker WS 已启动(扫描将优先使用 WS 缓存)") - else: - ticker_24h_stream = None - except Exception as e: - logger.debug(f"启动 24h ticker WS 失败(将使用 REST): {e}") - ticker_24h_stream = None - - # 3.2 启动 K线 WS 流(技术指标实时更新,减少 REST 超时) + # 3.0 市场 WS 多进程共用:选主 + 仅 Leader 建连接,非 Leader 从 Redis 读 + ticker_24h_stream = None kline_stream = None + book_ticker_stream = None + market_ws_refresh_tasks = [] + if getattr(client, "redis_cache", None): + try: + await client.redis_cache.connect() + except Exception: + pass try: + from .market_ws_leader import ( + use_shared_market_ws, + try_acquire_market_ws_leader, + is_market_ws_leader, + run_leader_renew_loop, + release_market_ws_leader, + stop_leader_renew_loop, + ) + from .ticker_24h_stream import Ticker24hStream, refresh_ticker_24h_from_redis_loop + from .book_ticker_stream import BookTickerStream, refresh_book_ticker_from_redis_loop from .kline_stream import KlineStream use_testnet = getattr(config, "USE_TESTNET", False) - kline_stream = KlineStream(testnet=use_testnet) - if await kline_stream.start(): - logger.info("✓ K线 WS 已启动(技术指标将优先使用 WS 缓存,减少超时)") - else: - kline_stream = None - except Exception as e: - logger.debug(f"启动 K线 WS 失败(将使用 REST): {e}") - kline_stream = None + redis_cache = getattr(client, "redis_cache", None) - # 3.3 启动最优挂单 WS 流(滑点估算,优化入场价格) - book_ticker_stream = None - try: - from .book_ticker_stream import BookTickerStream - use_testnet = getattr(config, "USE_TESTNET", False) - book_ticker_stream = BookTickerStream(testnet=use_testnet) - if await book_ticker_stream.start(): - logger.info("✓ 最优挂单 WS 已启动(用于滑点估算与价格优化)") + if use_shared_market_ws(redis_cache): + is_leader = await try_acquire_market_ws_leader(redis_cache) + if is_leader: + await run_leader_renew_loop(redis_cache) + try: + ticker_24h_stream = Ticker24hStream(testnet=use_testnet, redis_cache=redis_cache) + if await ticker_24h_stream.start(): + logger.info("✓ 24h ticker WS 已启动(Leader,写入 Redis 供多进程共用)") + else: + ticker_24h_stream = None + except Exception as e: + logger.debug(f"启动 24h ticker WS 失败: {e}") + try: + kline_stream = KlineStream(testnet=use_testnet, redis_cache=redis_cache) + if await kline_stream.start(): + logger.info("✓ K线 WS 已启动(Leader,写入 Redis 供多进程共用)") + else: + kline_stream = None + except Exception as e: + logger.debug(f"启动 K线 WS 失败: {e}") + try: + book_ticker_stream = BookTickerStream(testnet=use_testnet, redis_cache=redis_cache) + if await book_ticker_stream.start(): + logger.info("✓ 最优挂单 WS 已启动(Leader,写入 Redis 供多进程共用)") + else: + book_ticker_stream = None + except Exception as e: + logger.debug(f"启动 BookTicker WS 失败: {e}") + else: + logger.info("✓ 本进程为非 Leader,将仅从 Redis 读取 Ticker24h/BookTicker/K线 缓存") + t_refresh = asyncio.create_task(refresh_ticker_24h_from_redis_loop(redis_cache, 2.0)) + b_refresh = asyncio.create_task(refresh_book_ticker_from_redis_loop(redis_cache, 2.0)) + market_ws_refresh_tasks = [t_refresh, b_refresh] else: - book_ticker_stream = None + is_leader = True + try: + ticker_24h_stream = Ticker24hStream(testnet=use_testnet) + if await ticker_24h_stream.start(): + logger.info("✓ 24h ticker WS 已启动(扫描将优先使用 WS 缓存)") + else: + ticker_24h_stream = None + except Exception as e: + logger.debug(f"启动 24h ticker WS 失败: {e}") + try: + kline_stream = KlineStream(testnet=use_testnet) + if await kline_stream.start(): + logger.info("✓ K线 WS 已启动(技术指标将优先使用 WS 缓存)") + else: + kline_stream = None + except Exception as e: + logger.debug(f"启动 K线 WS 失败: {e}") + try: + book_ticker_stream = BookTickerStream(testnet=use_testnet) + if await book_ticker_stream.start(): + logger.info("✓ 最优挂单 WS 已启动(用于滑点估算与价格优化)") + else: + book_ticker_stream = None + except Exception as e: + logger.debug(f"启动 BookTicker WS 失败: {e}") except Exception as e: - logger.debug(f"启动最优挂单 WS 失败: {e}") - book_ticker_stream = None + logger.debug(f"市场 WS 启动异常: {e}") + ticker_24h_stream = kline_stream = book_ticker_stream = None # 4. 初始化各个模块 logger.info("初始化交易模块...") @@ -497,6 +545,21 @@ async def main(): logger.info("BookTicker Stream 已停止") except Exception as e: logger.debug(f"停止 BookTicker Stream 时异常: {e}") + for t in market_ws_refresh_tasks: + try: + t.cancel() + await t + except asyncio.CancelledError: + pass + except Exception: + pass + try: + from .market_ws_leader import is_market_ws_leader, release_market_ws_leader, stop_leader_renew_loop + if is_market_ws_leader() and client and getattr(client, "redis_cache", None): + release_market_ws_leader(client.redis_cache) + stop_leader_renew_loop() + except Exception: + pass if client: await client.disconnect() logger.info("程序已退出") diff --git a/trading_system/market_ws_leader.py b/trading_system/market_ws_leader.py new file mode 100644 index 0000000..e715e02 --- /dev/null +++ b/trading_system/market_ws_leader.py @@ -0,0 +1,136 @@ +""" +市场行情 WS 多进程/多账户共用:选主 + 共享缓存。 +Leader 进程负责维持 Ticker24h / BookTicker / KlineStream 三条连接并写入 Redis; +非 Leader 进程只从 Redis 读取,不建连接。需配合 Redis 使用。 +""" +import asyncio +import json +import logging +import os +import socket +import time +from typing import Optional, Any + +logger = logging.getLogger(__name__) + +# Redis 键 +KEY_LEADER = "market_ws_leader" +KEY_TICKER_24H = "market:ticker_24h" +KEY_BOOK_TICKER = "market:book_ticker" +KEY_KLINE_PREFIX = "market:kline:" +LEADER_TTL_SEC = 30 +LEADER_RENEW_INTERVAL_SEC = 15 + +_is_leader: bool = False +_leader_task: Optional[asyncio.Task] = None + + +def _leader_identity() -> dict: + return { + "pid": os.getpid(), + "host": socket.gethostname() or "unknown", + "ts": time.time(), + } + + +def is_market_ws_leader() -> bool: + """当前进程是否为市场 WS 的 Leader(仅本地标记,用于决定是否启动三条流)。""" + return _is_leader + + +async def try_acquire_market_ws_leader(redis_cache: Any) -> bool: + """ + 尝试成为市场 WS Leader。使用 Redis SET NX EX。 + 若成功,当前进程应启动 Ticker24h / BookTicker / KlineStream 并写入 Redis。 + """ + global _is_leader + if redis_cache is None or getattr(redis_cache, "redis", None) is None: + return False + try: + raw = await redis_cache.redis.set( + KEY_LEADER, + json.dumps(_leader_identity()), + ex=LEADER_TTL_SEC, + nx=True, + ) + _is_leader = raw is True + if _is_leader: + logger.info("✓ 本进程已当选市场 WS Leader(Ticker24h/BookTicker/Kline 共用连接)") + return _is_leader + except Exception as e: + logger.debug("尝试获取市场 WS Leader 失败: %s", e) + return False + + +async def renew_market_ws_leader(redis_cache: Any) -> bool: + """Leader 定期续期,避免 TTL 到期被抢。""" + if not _is_leader or redis_cache is None or getattr(redis_cache, "redis", None) is None: + return False + try: + await redis_cache.redis.set( + KEY_LEADER, + json.dumps(_leader_identity()), + ex=LEADER_TTL_SEC, + ) + return True + except Exception as e: + logger.debug("续期市场 WS Leader 失败: %s", e) + return False + + +def release_market_ws_leader(redis_cache: Any) -> None: + """主动释放 Leader(可选,便于其他进程尽快接管)。""" + global _is_leader + _is_leader = False + if redis_cache is None or getattr(redis_cache, "redis", None) is None: + return + try: + asyncio.get_event_loop().create_task(_delete_leader_key(redis_cache)) + except Exception: + pass + + +async def _delete_leader_key(redis_cache: Any) -> None: + try: + await redis_cache.redis.delete(KEY_LEADER) + except Exception: + pass + + +async def run_leader_renew_loop(redis_cache: Any) -> None: + """在 Leader 进程中运行的续期循环;非 Leader 直接返回。""" + global _leader_task + if not _is_leader: + return + if _leader_task is not None: + return + + async def _loop(): + while _is_leader: + await asyncio.sleep(LEADER_RENEW_INTERVAL_SEC) + if not _is_leader: + break + ok = await renew_market_ws_leader(redis_cache) + if not ok: + logger.warning("市场 WS Leader 续期失败,可能被其他进程接管") + + _leader_task = asyncio.create_task(_loop()) + logger.debug("市场 WS Leader 续期任务已启动") + + +def stop_leader_renew_loop() -> None: + global _leader_task + if _leader_task is not None: + _leader_task.cancel() + _leader_task = None + + +def use_shared_market_ws(redis_cache: Any) -> bool: + """是否启用「共用市场 WS」:有 Redis 且已配置启用时返回 True。""" + if redis_cache is None or getattr(redis_cache, "redis", None) is None: + return False + try: + from . import config + return bool(getattr(config, "USE_SHARED_MARKET_WS", True)) + except Exception: + return False diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py index 01a2299..d2ce962 100644 --- a/trading_system/ticker_24h_stream.py +++ b/trading_system/ticker_24h_stream.py @@ -1,6 +1,7 @@ """ 24 小时行情 WebSocket 流:订阅 !ticker@arr,维护全市场 ticker 缓存。 供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h,减少请求与超时。 +支持多进程共用:Leader 写 Redis,所有进程可通过 refresh_ticker_24h_from_redis_loop 从 Redis 更新本地缓存。 文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。 """ import asyncio @@ -11,6 +12,11 @@ from typing import Dict, Optional, Any logger = logging.getLogger(__name__) +try: + from .market_ws_leader import KEY_TICKER_24H +except ImportError: + KEY_TICKER_24H = "market:ticker_24h" + # 全市场 24h ticker 缓存:symbol -> { symbol, price, volume, changePercent, ts } _ticker_24h_cache: Dict[str, Dict[str, Any]] = {} _ticker_24h_updated_at: float = 0.0 @@ -34,10 +40,11 @@ def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool: class Ticker24hStream: - """订阅合约 !ticker@arr,持续更新 _ticker_24h_cache。无需 listenKey,公开行情。""" + """订阅合约 !ticker@arr,持续更新 _ticker_24h_cache。Leader 时可选写 Redis 供多进程读。""" - def __init__(self, testnet: bool = False): + def __init__(self, testnet: bool = False, redis_cache: Any = None): self.testnet = testnet + self._redis_cache = redis_cache self._ws = None self._task: Optional[asyncio.Task] = None self._running = False @@ -142,3 +149,36 @@ class Ticker24hStream: } _ticker_24h_updated_at = time.monotonic() logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对") + if self._redis_cache: + try: + loop = asyncio.get_event_loop() + copy = dict(_ticker_24h_cache) + loop.create_task(self._write_ticker_24h_to_redis(copy)) + except Exception as e: + logger.debug("Ticker24hStream: 写入 Redis 调度失败 %s", e) + + async def _write_ticker_24h_to_redis(self, data: Dict[str, Dict[str, Any]]) -> None: + try: + if self._redis_cache: + await self._redis_cache.set(KEY_TICKER_24H, data, ttl=120) + except Exception as e: + logger.debug("Ticker24hStream: 写入 Redis 失败 %s", e) + + +async def refresh_ticker_24h_from_redis_loop(redis_cache: Any, interval_sec: float = 2.0) -> None: + """非 Leader 或共用模式:定期从 Redis 拉取 24h ticker 到本地缓存。所有进程可调用。""" + global _ticker_24h_cache, _ticker_24h_updated_at + if redis_cache is None: + return + while True: + try: + await asyncio.sleep(interval_sec) + data = await redis_cache.get(KEY_TICKER_24H) + if data and isinstance(data, dict): + _ticker_24h_cache.update(data) + _ticker_24h_updated_at = time.monotonic() + logger.debug("Ticker24h: 从 Redis 刷新 %s 个交易对", len(data)) + except asyncio.CancelledError: + break + except Exception as e: + logger.debug("Ticker24h: 从 Redis 刷新失败 %s", e)