From 857128bca981019b52a3ee9dcaa9c15eeb60cef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Mon, 16 Feb 2026 18:28:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(config,=20market=5Fscanner,=20strategy):?= =?UTF-8?q?=20=E5=A2=9E=E5=BC=BA=E5=A4=9A=E8=B4=A6=E5=8F=B7=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=B8=8E=E5=B9=B6=E5=8F=91=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `config.py` 中新增多账号扫描配置,支持并发数和错峰扫描设置。更新 `market_scanner.py` 以根据配置动态调整并发请求数,优化资源使用。修改 `strategy.py` 以实现多账号错峰扫描,避免低配服务器的 CPU 过载,提升系统稳定性和效率。 --- docs/多账号与低配服务器优化.md | 90 +++++++++++++++++++++++ trading_system/config.py | 4 + trading_system/main.py | 28 ++++++- trading_system/market_scanner.py | 14 ++-- trading_system/strategy.py | 12 ++- 5 files changed, 136 insertions(+), 12 deletions(-) create mode 100644 docs/多账号与低配服务器优化.md diff --git a/docs/多账号与低配服务器优化.md b/docs/多账号与低配服务器优化.md new file mode 100644 index 0000000..b19cc6d --- /dev/null +++ b/docs/多账号与低配服务器优化.md @@ -0,0 +1,90 @@ +# 多账号与低配服务器负载优化 + +针对 **2 CPU / 4 GB** 等低配服务器、多账号交易进程同时运行时的 CPU 打满问题,说明主要负载来源与可调参数。 + +--- + +## 一、负载从哪里来 + +### 1. 每个账号 = 一个独立进程 + +- 每个 `ATS_ACCOUNT_ID` 一个 Python 进程(如 4 个账号 = 4 个进程)。 +- 每个进程内:**市场扫描**、**持仓同步**、**User Data Stream**、若为 Leader 还有 **Ticker/K线/BookTicker WS** 及写 Redis。 + +### 2. CPU 最重的部分:市场扫描 + +- **扫描阶段**:对「初步筛选」后的几十个交易对,**并发**做「拉 K 线 + 算技术指标(RSI/MACD/布林/ATR/EMA)」。 +- 指标是纯 Python 循环,**每个 symbol 算 5+ 个指标**,单次扫描就有大量计算。 +- **并发度**:由 `SCAN_CONCURRENT_SYMBOLS` 控制「同时分析几个 symbol」;默认已改为 **2**(原 3)。 +- **多进程叠加**:若 4 个进程**同时**开始扫描,相当于 4 × 2 = **8 路**同时在算指标,2 核很容易打满。 + +因此:**扫描并发**和**多账号是否同时扫**是 CPU 的两大主要来源。 + +### 3. 其他相对较轻的部分 + +| 模块 | 说明 | 负载 | +|------|------|------| +| User Data Stream | 每进程 1 个 WS,收订单/持仓/余额 | 低 | +| 市场 WS(Ticker/K线/Book) | 仅 **Leader 进程**建连接并写 Redis,其余进程只从 Redis 读 | Leader 中,其余低 | +| Redis 刷新 | 每 2 秒从 Redis 拉 Ticker/Book 到本地 | 低 | +| 持仓同步 | 每 `POSITION_SYNC_INTERVAL`(如 60s)一次 REST 同步 | 低~中(与持仓数有关) | +| 配置重载 | 每轮扫描前从 Redis 重载配置 | 低 | +| exchange_info / 行情 API | 已优先走 DB/Redis 缓存,未命中才请求 API | 中(主要在未命中时) | + +--- + +## 二、已做的可调项(直接生效) + +### 1. 扫描并发:`SCAN_CONCURRENT_SYMBOLS` + +- **含义**:单进程内,**同时**分析多少个交易对(K 线 + 指标)。 +- **默认**:`2`(适合 2 CPU 4G、多账号)。 +- **建议**: + - 2 CPU 4G、多账号:**2**; + - 单账号或 4 CPU+:可试 **3~5**。 +- **配置**:在 `trading_config` 或 `config.py` 的 `SCAN_CONCURRENT_SYMBOLS` 中设置(数字,1~10)。 + +### 2. 多账号错峰:`SCAN_STAGGER_BY_ACCOUNT` + `SCAN_STAGGER_SEC` + +- **含义**:按 `ATS_ACCOUNT_ID` 延迟**首次**扫描开始时间,避免多进程在同一时刻一起扫。 +- **默认**:`SCAN_STAGGER_BY_ACCOUNT=True`,`SCAN_STAGGER_SEC=60`。 +- **效果**:例如 4 个账号,则 account 1 立即扫,2 延迟 60s,3 延迟 120s,4 延迟 180s;之后仍按各自 `SCAN_INTERVAL` 循环,自然错开。 +- **配置**: + - `SCAN_STAGGER_BY_ACCOUNT`:`true`/`false`; + - `SCAN_STAGGER_SEC`:每多一个账号增加的延迟秒数(建议 60~120)。 + +--- + +## 三、还可调节的项(按需) + +| 配置项 | 说明 | 建议(2 CPU 4G) | +|--------|------|------------------| +| **SCAN_INTERVAL** | 扫描间隔(秒) | 900(15 分钟)或 1800(30 分钟),减少扫描频率 | +| **MAX_SCAN_SYMBOLS** | 参与扫描的最大交易对数 | 200~300,减少进入「详细分析」的 symbol 数 | +| **MIN_CHANGE_PERCENT** / **MIN_VOLUME_24H** | 初步筛选更严 | 略提高可减少 pre_filtered 数量,从而减少指标计算量 | +| **POSITION_SYNC_INTERVAL** | 持仓同步间隔(秒) | 60 或 120,略增可减 REST 调用 | +| **同一台机账号数** | 2 CPU 4G 上跑的进程数 | 建议 ≤ 3~4;若仍卡可先跑 2 个账号 | + +--- + +## 四、推荐组合(2 CPU 4G、多账号) + +- `SCAN_CONCURRENT_SYMBOLS` = **2** +- `SCAN_STAGGER_BY_ACCOUNT` = **true** +- `SCAN_STAGGER_SEC` = **60** +- `SCAN_INTERVAL` = **900**(或 1800 若可接受更慢扫描) +- `MAX_SCAN_SYMBOLS` = **200**~**300**(可选) + +若仍 CPU 偏高,可再: + +- 将 `SCAN_CONCURRENT_SYMBOLS` 降为 **1**,或 +- 同一台机器只跑 **2 个**账号,其余账号迁到另一台或扩容后再加。 + +--- + +## 五、如何确认是否生效 + +- 看日志: + - 多账号错峰:应出现类似「多账号错峰:account_id=2,延迟 60 秒后开始首次扫描」。 + - 扫描并发:无单独日志,可通过「扫描耗时」和 CPU 曲线判断。 +- 用 `top`/`htop` 观察:多进程同时扫描时 CPU 应比「错峰 + 并发 2」时明显更低、更平稳。 diff --git a/trading_system/config.py b/trading_system/config.py index f65675b..ee53e93 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -215,6 +215,10 @@ DEFAULT_TRADING_CONFIG = { 'SCAN_INTERVAL': 900, # 扫描间隔15分钟(900秒),快速验证模式:提高扫描频率以增加交易机会 'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线,12秒通常够用;网络慢可调大(18~25) 'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时(秒);超时则返回降级结果(仅涨跌幅/成交量),不拖满整分析超时 + # 多账号/低配服务器(如 2 CPU 4G):降低并发与错峰扫描,避免 CPU 打满 + 'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对(2 CPU 4G 多账号建议 2,单账号可 3~5) + 'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时按 account_id 错峰首次扫描,避免多进程同时扫 + 'SCAN_STAGGER_SEC': 60, # 每多一个账号延迟的秒数(account_id-1 * 此值),如 4 账号则 0/60/120/180 秒后开扫 'KLINE_INTERVAL': '1h', 'PRIMARY_INTERVAL': '4h', # 主周期4小时,过滤噪音 'CONFIRM_INTERVAL': '1d', # 确认周期日线,看大趋势 diff --git a/trading_system/main.py b/trading_system/main.py index e09ea55..6b04914 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -121,13 +121,33 @@ logger = logging.getLogger(__name__) def _asyncio_exception_handler(loop, context): - """自定义 asyncio 异常处理器:将 WebSocket 关闭时的 ping 写入异常降级为 DEBUG,避免刷 ERROR。""" + """ + 自定义 asyncio 异常处理器:将 WebSocket 关闭时的 ping 写入异常静默处理, + 避免刷「Task exception was never retrieved」和 ConnectionResetError 堆栈。 + """ exc = context.get("exception") - if exc is not None and isinstance(exc, ConnectionResetError): - msg = str(exc).lower() - if "closing transport" in msg or "cannot write" in msg: + msg = str(exc).lower() if exc else "" + # 1) WebSocket 关闭时 aiohttp 的 ping 往已关闭的 transport 写会抛 ConnectionResetError / OSError + if exc is not None and msg: + if "closing transport" in msg or "cannot write to closing transport" in msg: logger.debug("WebSocket 连接关闭时 ping 已结束(可忽略): %s", exc) return + if isinstance(exc, (ConnectionResetError, OSError, BrokenPipeError)) and "write" in msg: + # 其他「连接已关仍写入」类错误也视为可忽略 + logger.debug("连接关闭时写入被拒绝(可忽略): %s", exc) + return + # 2) 根据任务 coroutine 判断:aiohttp WebSocketWriter.ping 产生的未检索异常 + future = context.get("future") + if future is not None and exc is not None: + try: + coro = getattr(future, "get_coro", lambda: getattr(future, "_coro", None))() + if coro is not None: + qual = (getattr(coro, "__qualname__", "") or getattr(coro, "__name__", "")) or "" + if "ping" in qual.lower() and "websocket" in qual.lower(): + logger.debug("WebSocket ping 任务异常(连接已关闭,可忽略): %s", exc) + return + except Exception: + pass # 其他「Task exception was never retrieved」按 asyncio 默认方式记录 asyncio_logger = logging.getLogger("asyncio") asyncio_logger.error("Task exception was never retrieved") diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 29b1f6a..50e6553 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -149,13 +149,13 @@ class MarketScanner: logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e) # 只对符合条件的交易对进行详细分析(获取K线和技术指标) - # ⚠️ 并发数说明: - # - 这是单个账户扫描时,同时分析多少个交易对(不是用户进程数) - # - 并发数5:单用户时扫描更快(15-25秒) - # - 并发数3:多用户时系统更稳定(4个账户最多12个并发请求) - # - 如果只有一个账户,建议保持5;如果后续增加用户,可以降低到3 - # - 由于中间数据(K线、技术指标)已经缓存,实际API请求会大大减少 - semaphore = asyncio.Semaphore(3) # 最多5个并发请求(单用户建议5,多用户建议3) + # 并发数由 SCAN_CONCURRENT_SYMBOLS 控制:2 CPU 4G 多账号建议 2,单账号可 3~5 + concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2) + try: + concurrent = max(1, min(10, int(concurrent))) + except (TypeError, ValueError): + concurrent = 2 + semaphore = asyncio.Semaphore(concurrent) analysis_timeout = float(cfg.get('SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC', 18) or 18) if analysis_timeout < 5: diff --git a/trading_system/strategy.py b/trading_system/strategy.py index ca759a5..ebcbefd 100644 --- a/trading_system/strategy.py +++ b/trading_system/strategy.py @@ -75,7 +75,17 @@ class TradingStrategy: # 启动定期同步任务(独立于市场扫描) self._sync_task = asyncio.create_task(self._periodic_sync_positions()) logger.info("定期持仓同步任务已启动") - + + # 多账号错峰:避免多进程同时扫描导致 CPU 打满(2 CPU 4G 等低配服务器) + import os + stagger_enabled = config.TRADING_CONFIG.get('SCAN_STAGGER_BY_ACCOUNT', False) + stagger_sec = int(config.TRADING_CONFIG.get('SCAN_STAGGER_SEC', 60) or 60) + account_id = int(os.getenv('ATS_ACCOUNT_ID') or os.getenv('ACCOUNT_ID') or 1) + if stagger_enabled and account_id > 1 and stagger_sec > 0: + delay = (account_id - 1) * stagger_sec + logger.info(f"多账号错峰:account_id={account_id},延迟 {delay} 秒后开始首次扫描") + await asyncio.sleep(delay) + try: while self.running: # 0. 定期从Redis重新加载配置(确保配置修改能即时生效)