feat(config, market_scanner, strategy): 增强多账号支持与并发控制

在 `config.py` 中新增多账号扫描配置,支持并发数和错峰扫描设置。更新 `market_scanner.py` 以根据配置动态调整并发请求数,优化资源使用。修改 `strategy.py` 以实现多账号错峰扫描,避免低配服务器的 CPU 过载,提升系统稳定性和效率。
This commit is contained in:
薇薇安 2026-02-16 18:28:38 +08:00
parent 0fb42a5f24
commit 857128bca9
5 changed files with 136 additions and 12 deletions

View File

@ -0,0 +1,90 @@
# 多账号与低配服务器负载优化
针对 **2 CPU / 4 GB** 等低配服务器、多账号交易进程同时运行时的 CPU 打满问题,说明主要负载来源与可调参数。
---
## 一、负载从哪里来
### 1. 每个账号 = 一个独立进程
- 每个 `ATS_ACCOUNT_ID` 一个 Python 进程(如 4 个账号 = 4 个进程)。
- 每个进程内:**市场扫描**、**持仓同步**、**User Data Stream**、若为 Leader 还有 **Ticker/K线/BookTicker WS** 及写 Redis。
### 2. CPU 最重的部分:市场扫描
- **扫描阶段**:对「初步筛选」后的几十个交易对,**并发**做「拉 K 线 + 算技术指标RSI/MACD/布林/ATR/EMA」。
- 指标是纯 Python 循环,**每个 symbol 算 5+ 个指标**,单次扫描就有大量计算。
- **并发度**:由 `SCAN_CONCURRENT_SYMBOLS` 控制「同时分析几个 symbol」默认已改为 **2**(原 3
- **多进程叠加**:若 4 个进程**同时**开始扫描,相当于 4 × 2 = **8 路**同时在算指标2 核很容易打满。
因此:**扫描并发**和**多账号是否同时扫**是 CPU 的两大主要来源。
### 3. 其他相对较轻的部分
| 模块 | 说明 | 负载 |
|------|------|------|
| User Data Stream | 每进程 1 个 WS收订单/持仓/余额 | 低 |
| 市场 WSTicker/K线/Book | 仅 **Leader 进程**建连接并写 Redis其余进程只从 Redis 读 | Leader 中,其余低 |
| Redis 刷新 | 每 2 秒从 Redis 拉 Ticker/Book 到本地 | 低 |
| 持仓同步 | 每 `POSITION_SYNC_INTERVAL`(如 60s一次 REST 同步 | 低~中(与持仓数有关) |
| 配置重载 | 每轮扫描前从 Redis 重载配置 | 低 |
| exchange_info / 行情 API | 已优先走 DB/Redis 缓存,未命中才请求 API | 中(主要在未命中时) |
---
## 二、已做的可调项(直接生效)
### 1. 扫描并发:`SCAN_CONCURRENT_SYMBOLS`
- **含义**:单进程内,**同时**分析多少个交易对K 线 + 指标)。
- **默认**`2`(适合 2 CPU 4G、多账号
- **建议**
- 2 CPU 4G、多账号**2**
- 单账号或 4 CPU+:可试 **35**
- **配置**:在 `trading_config``config.py``SCAN_CONCURRENT_SYMBOLS` 中设置数字110
### 2. 多账号错峰:`SCAN_STAGGER_BY_ACCOUNT` + `SCAN_STAGGER_SEC`
- **含义**:按 `ATS_ACCOUNT_ID` 延迟**首次**扫描开始时间,避免多进程在同一时刻一起扫。
- **默认**`SCAN_STAGGER_BY_ACCOUNT=True``SCAN_STAGGER_SEC=60`。
- **效果**:例如 4 个账号,则 account 1 立即扫2 延迟 60s3 延迟 120s4 延迟 180s之后仍按各自 `SCAN_INTERVAL` 循环,自然错开。
- **配置**
- `SCAN_STAGGER_BY_ACCOUNT``true`/`false`
- `SCAN_STAGGER_SEC`:每多一个账号增加的延迟秒数(建议 60120
---
## 三、还可调节的项(按需)
| 配置项 | 说明 | 建议2 CPU 4G |
|--------|------|------------------|
| **SCAN_INTERVAL** | 扫描间隔(秒) | 90015 分钟)或 180030 分钟),减少扫描频率 |
| **MAX_SCAN_SYMBOLS** | 参与扫描的最大交易对数 | 200300减少进入「详细分析」的 symbol 数 |
| **MIN_CHANGE_PERCENT** / **MIN_VOLUME_24H** | 初步筛选更严 | 略提高可减少 pre_filtered 数量,从而减少指标计算量 |
| **POSITION_SYNC_INTERVAL** | 持仓同步间隔(秒) | 60 或 120略增可减 REST 调用 |
| **同一台机账号数** | 2 CPU 4G 上跑的进程数 | 建议 ≤ 34若仍卡可先跑 2 个账号 |
---
## 四、推荐组合2 CPU 4G、多账号
- `SCAN_CONCURRENT_SYMBOLS` = **2**
- `SCAN_STAGGER_BY_ACCOUNT` = **true**
- `SCAN_STAGGER_SEC` = **60**
- `SCAN_INTERVAL` = **900**(或 1800 若可接受更慢扫描)
- `MAX_SCAN_SYMBOLS` = **200****300**(可选)
若仍 CPU 偏高,可再:
- 将 `SCAN_CONCURRENT_SYMBOLS` 降为 **1**,或
- 同一台机器只跑 **2 个**账号,其余账号迁到另一台或扩容后再加。
---
## 五、如何确认是否生效
- 看日志:
- 多账号错峰应出现类似「多账号错峰account_id=2延迟 60 秒后开始首次扫描」。
- 扫描并发:无单独日志,可通过「扫描耗时」和 CPU 曲线判断。
- 用 `top`/`htop` 观察:多进程同时扫描时 CPU 应比「错峰 + 并发 2」时明显更低、更平稳。

View File

@ -215,6 +215,10 @@ DEFAULT_TRADING_CONFIG = {
'SCAN_INTERVAL': 900, # 扫描间隔15分钟900秒快速验证模式提高扫描频率以增加交易机会
'SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC': 12, # 单个交易对「详细分析」超时(秒);已并行拉取主周期/确认周期K线12秒通常够用网络慢可调大18~25
'SCAN_KLINE_FETCH_TIMEOUT_SEC': 8, # K线拉取单独超时超时则返回降级结果仅涨跌幅/成交量),不拖满整分析超时
# 多账号/低配服务器(如 2 CPU 4G降低并发与错峰扫描避免 CPU 打满
'SCAN_CONCURRENT_SYMBOLS': 2, # 扫描时同时分析多少个交易对2 CPU 4G 多账号建议 2单账号可 35
'SCAN_STAGGER_BY_ACCOUNT': True, # 多账号时按 account_id 错峰首次扫描,避免多进程同时扫
'SCAN_STAGGER_SEC': 60, # 每多一个账号延迟的秒数account_id-1 * 此值),如 4 账号则 0/60/120/180 秒后开扫
'KLINE_INTERVAL': '1h',
'PRIMARY_INTERVAL': '4h', # 主周期4小时过滤噪音
'CONFIRM_INTERVAL': '1d', # 确认周期日线,看大趋势

View File

@ -121,13 +121,33 @@ logger = logging.getLogger(__name__)
def _asyncio_exception_handler(loop, context):
"""自定义 asyncio 异常处理器:将 WebSocket 关闭时的 ping 写入异常降级为 DEBUG避免刷 ERROR。"""
"""
自定义 asyncio 异常处理器 WebSocket 关闭时的 ping 写入异常静默处理
避免刷Task exception was never retrieved ConnectionResetError 堆栈
"""
exc = context.get("exception")
if exc is not None and isinstance(exc, ConnectionResetError):
msg = str(exc).lower()
if "closing transport" in msg or "cannot write" in msg:
msg = str(exc).lower() if exc else ""
# 1) WebSocket 关闭时 aiohttp 的 ping 往已关闭的 transport 写会抛 ConnectionResetError / OSError
if exc is not None and msg:
if "closing transport" in msg or "cannot write to closing transport" in msg:
logger.debug("WebSocket 连接关闭时 ping 已结束(可忽略): %s", exc)
return
if isinstance(exc, (ConnectionResetError, OSError, BrokenPipeError)) and "write" in msg:
# 其他「连接已关仍写入」类错误也视为可忽略
logger.debug("连接关闭时写入被拒绝(可忽略): %s", exc)
return
# 2) 根据任务 coroutine 判断aiohttp WebSocketWriter.ping 产生的未检索异常
future = context.get("future")
if future is not None and exc is not None:
try:
coro = getattr(future, "get_coro", lambda: getattr(future, "_coro", None))()
if coro is not None:
qual = (getattr(coro, "__qualname__", "") or getattr(coro, "__name__", "")) or ""
if "ping" in qual.lower() and "websocket" in qual.lower():
logger.debug("WebSocket ping 任务异常(连接已关闭,可忽略): %s", exc)
return
except Exception:
pass
# 其他「Task exception was never retrieved」按 asyncio 默认方式记录
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.error("Task exception was never retrieved")

View File

@ -149,13 +149,13 @@ class MarketScanner:
logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e)
# 只对符合条件的交易对进行详细分析获取K线和技术指标
# ⚠️ 并发数说明:
# - 这是单个账户扫描时,同时分析多少个交易对(不是用户进程数)
# - 并发数5单用户时扫描更快15-25秒
# - 并发数3多用户时系统更稳定4个账户最多12个并发请求
# - 如果只有一个账户建议保持5如果后续增加用户可以降低到3
# - 由于中间数据K线、技术指标已经缓存实际API请求会大大减少
semaphore = asyncio.Semaphore(3) # 最多5个并发请求单用户建议5多用户建议3
# 并发数由 SCAN_CONCURRENT_SYMBOLS 控制2 CPU 4G 多账号建议 2单账号可 35
concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2)
try:
concurrent = max(1, min(10, int(concurrent)))
except (TypeError, ValueError):
concurrent = 2
semaphore = asyncio.Semaphore(concurrent)
analysis_timeout = float(cfg.get('SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC', 18) or 18)
if analysis_timeout < 5:

View File

@ -75,7 +75,17 @@ class TradingStrategy:
# 启动定期同步任务(独立于市场扫描)
self._sync_task = asyncio.create_task(self._periodic_sync_positions())
logger.info("定期持仓同步任务已启动")
# 多账号错峰:避免多进程同时扫描导致 CPU 打满2 CPU 4G 等低配服务器)
import os
stagger_enabled = config.TRADING_CONFIG.get('SCAN_STAGGER_BY_ACCOUNT', False)
stagger_sec = int(config.TRADING_CONFIG.get('SCAN_STAGGER_SEC', 60) or 60)
account_id = int(os.getenv('ATS_ACCOUNT_ID') or os.getenv('ACCOUNT_ID') or 1)
if stagger_enabled and account_id > 1 and stagger_sec > 0:
delay = (account_id - 1) * stagger_sec
logger.info(f"多账号错峰account_id={account_id},延迟 {delay} 秒后开始首次扫描")
await asyncio.sleep(delay)
try:
while self.running:
# 0. 定期从Redis重新加载配置确保配置修改能即时生效