diff --git a/backend/检查内存问题.sh b/backend/检查内存问题.sh new file mode 100755 index 0000000..a3c9993 --- /dev/null +++ b/backend/检查内存问题.sh @@ -0,0 +1,93 @@ +#!/bin/bash +# 检查交易服务内存问题 + +echo "=== 交易服务内存问题诊断 ===" +echo "" + +# 1. 查看交易服务进程的详细内存信息 +echo "📊 交易服务进程内存详情:" +TRADING_PID=$(ps aux | grep "trading_system.main" | grep -v grep | awk '{print $2}') +if [ -z "$TRADING_PID" ]; then + echo " ⚠️ 未找到交易服务进程" + exit 1 +fi + +echo "进程 PID: $TRADING_PID" +ps -p $TRADING_PID -o pid,vsz,rss,%mem,cmd +echo "" + +# 2. 查看进程的内存映射(找出占用大的区域) +echo "📈 进程内存映射(前 20 行,按大小排序):" +if [ -f "/proc/$TRADING_PID/smaps" ]; then + cat /proc/$TRADING_PID/smaps 2>/dev/null | awk '/^Size:/ {size=$2} /^Rss:/ {rss=$2} /^Pss:/ {pss=$2} /^Name:/ {if (rss > 1024) print size" KB (RSS: "rss" KB) - " $2}' | sort -rn | head -20 || echo " 无法读取内存映射(需要 root 权限)" +else + echo " 无法访问 /proc/$TRADING_PID/smaps" +fi +echo "" + +# 3. 查看交易服务日志中的内存相关错误 +echo "🔍 检查交易服务日志:" +LOG_DIRS=( + "../trading_system/logs" + "logs" + "/www/wwwroot/autosys_new/trading_system/logs" +) + +for LOG_DIR in "${LOG_DIRS[@]}"; do + if [ -d "$LOG_DIR" ]; then + echo "检查目录: $LOG_DIR" + # 查找内存相关错误 + find "$LOG_DIR" -name "*.log" -type f -mtime -1 2>/dev/null | while read logfile; do + echo " 文件: $logfile" + # 查找内存错误 + grep -i "memory\|oom\|out of memory\|memoryerror\|memory leak" "$logfile" 2>/dev/null | tail -5 || echo " 未找到内存相关错误" + # 查找最近的错误 + tail -50 "$logfile" 2>/dev/null | grep -i "error\|exception\|failed" | tail -5 || echo " 未找到错误" + done + break + fi +done +echo "" + +# 4. 查看系统内存压力 +echo "💾 系统内存压力:" +free -h +echo "" +echo "内存使用率:" +free | awk 'NR==2{printf "已用: %.1f%%\n", $3*100/$2}' +echo "" + +# 5. 检查是否有 swap 使用(如果有说明内存不足) +echo "🔄 Swap 使用情况:" +free | awk 'NR==3{if ($3 > 0) print "⚠️ Swap 正在使用: " $3 " KB (内存不足)"; else print "✓ Swap 未使用"}' +echo "" + +# 6. 查看最近的交易服务输出 +echo "📝 最近的交易服务输出(最后 30 行):" +for LOG_DIR in "${LOG_DIRS[@]}"; do + if [ -d "$LOG_DIR" ]; then + find "$LOG_DIR" -name "trading_*.log" -o -name "*.out.log" -type f 2>/dev/null | head -1 | while read logfile; do + if [ -f "$logfile" ]; then + tail -30 "$logfile" 2>/dev/null + break + fi + done + break + fi +done +echo "" + +echo "=== 诊断完成 ===" +echo "" +echo "💡 可能的原因:" +echo " 1. K线数据缓存过大(market_scanner 加载了太多历史K线)" +echo " 2. 持仓数据或订单数据在内存中累积" +echo " 3. WebSocket 连接或消息队列占用过多内存" +echo " 4. 数据库查询结果集太大(未使用 LIMIT)" +echo " 5. 内存泄漏(某个数据结构不断增长)" +echo "" +echo "💡 临时解决方案:" +echo " 1. 重启交易服务(释放内存)" +echo " 2. 检查配置中的缓存大小限制" +echo " 3. 减少扫描的交易对数量" +echo " 4. 检查是否有大量未关闭的数据库连接" diff --git a/backend/诊断负载.sh b/backend/诊断负载.sh index 983b9cd..ec7c129 100755 --- a/backend/诊断负载.sh +++ b/backend/诊断负载.sh @@ -21,16 +21,26 @@ echo "" # 4. 查看 Python 进程(交易服务) echo "🐍 Python 进程(交易服务):" -ps aux | grep -E "python.*trading|python.*main|uvicorn" | grep -v grep | awk '{printf "PID: %-6s CPU: %-5s MEM: %-5s CMD: %s\n", $2, $3"%", $4"%", $11" "$12" "$13" "$14}' +PYTHON_PROCS=$(ps aux | grep -E "python.*trading|python.*main|uvicorn" | grep -v grep) +if [ -z "$PYTHON_PROCS" ]; then + echo " ⚠️ 未发现交易服务进程(服务可能未运行)" +else + echo "$PYTHON_PROCS" | awk '{printf "PID: %-6s CPU: %-5s MEM: %-5s CMD: %s\n", $2, $3"%", $4"%", $11" "$12" "$13" "$14}' +fi echo "" # 5. 检查是否有同步操作在运行 echo "🔄 检查同步操作:" if [ -f "logs/api.log" ]; then - echo "最近的同步日志(最后 10 行):" - tail -100 logs/api.log | grep -i "同步\|sync.*binance\|sync_trades" | tail -10 || echo " 未找到同步日志" + SYNC_LOGS=$(tail -100 logs/api.log | grep -i "同步\|sync.*binance\|sync_trades" | tail -10) + if [ -z "$SYNC_LOGS" ]; then + echo " 未找到同步日志(可能未执行同步操作)" + else + echo "最近的同步日志(最后 10 行):" + echo "$SYNC_LOGS" + fi else - echo " 日志文件不存在" + echo " ⚠️ 日志文件不存在(backend 服务可能未运行)" fi echo "" @@ -74,17 +84,26 @@ echo "" echo "=== 诊断完成 ===" echo "" -echo "💡 建议:" -echo " 1. 如果 CPU 占用高的是 Python 进程,可能是:" +echo "💡 说明:" +echo " - 此脚本可以在交易服务未运行时使用,用于检查系统整体负载" +echo " - 如果交易服务正在运行,会显示更详细的进程和日志信息" +echo "" +echo "💡 如果负载高,可能原因:" +echo " 1. Python 进程(交易服务)占用高:" echo " - 市场扫描正在运行(计算技术指标)" echo " - 订单同步正在运行(从币安拉取大量订单)" echo " - 数据库查询慢(检查慢查询日志)" echo "" -echo " 2. 如果内存占用高,检查是否有内存泄漏" +echo " 2. 其他进程占用高:" +echo " - 检查 top/htop 查看具体是哪个进程" +echo " - 可能是系统更新、备份等后台任务" echo "" -echo " 3. 如果 I/O 等待高,可能是数据库查询慢或磁盘慢" +echo " 3. 内存占用高:检查是否有内存泄漏" echo "" -echo " 4. 临时降负载方法:" -echo " - 暂停市场扫描(在配置中设置 SCAN_ENABLED=False)" -echo " - 等待同步操作完成(不要手动取消)" -echo " - 重启交易服务(如果进程异常)" +echo " 4. I/O 等待高:可能是数据库查询慢或磁盘慢" +echo "" +echo "💡 临时降负载方法:" +echo " - 暂停市场扫描(在配置中设置 SCAN_ENABLED=False)" +echo " - 等待同步操作完成(不要手动取消)" +echo " - 重启交易服务(如果进程异常)" +echo " - 降低扫描并发(设置 SCAN_CONCURRENT_SYMBOLS=1)" diff --git a/docs/内存问题排查与优化.md b/docs/内存问题排查与优化.md new file mode 100644 index 0000000..51c12f9 --- /dev/null +++ b/docs/内存问题排查与优化.md @@ -0,0 +1,129 @@ +# 内存问题排查与优化指南 + +## 问题症状 + +- **交易服务启动后卡死** +- **内存占用过高**:交易服务进程占用 55%+ 内存(2.1GB+) +- **系统可用内存不足**:只剩 1GB 左右 + +## 快速诊断 + +运行内存诊断脚本: +```bash +cd backend +./检查内存问题.sh +``` + +## 已做的优化(本次) + +### 1. K 线缓存清理机制 + +**问题**:`_kline_cache` 全局字典会无限增长,每个 symbol 的 K 线数据一直保留在内存中。 + +**优化**: +- 添加缓存大小限制:最多保留 **200 个** (symbol, interval) 的缓存 +- 定期清理:每 **5 分钟**清理一次过期缓存(超过 10 分钟未更新的) +- 自动触发:当缓存达到 80% 容量时立即清理 +- 优先清理:按更新时间排序,保留最新的,清理最旧的 + +**效果**:防止 K 线缓存无限增长,减少内存占用。 + +## 临时解决方案 + +### 方法 1:重启交易服务(立即生效) + +```bash +# 通过 supervisor +supervisorctl restart auto_sys_* + +# 或手动 +pkill -f "python.*trading_system.main" +# 然后重新启动 +``` + +**注意**:重启会释放内存,但问题可能再次出现。 + +### 方法 2:减少扫描的交易对数量 + +在配置中设置: +```json +{ + "SCAN_MAX_SYMBOLS": 20 +} +``` + +限制每次扫描最多分析 20 个交易对,减少 K 线缓存占用。 + +### 方法 3:降低扫描频率 + +```json +{ + "SCAN_INTERVAL_SEC": 600 +} +``` + +将扫描间隔从默认值增加到 10 分钟,减少缓存更新频率。 + +### 方法 4:禁用 K 线 WebSocket(使用 REST) + +如果内存问题持续,可以禁用 K 线 WebSocket,改用 REST API(但会增加 API 调用频率): + +在配置中设置: +```json +{ + "USE_SHARED_MARKET_WS": false +} +``` + +## 长期优化建议 + +### 1. 监控内存使用 + +定期检查交易服务进程的内存占用: +```bash +ps aux | grep "trading_system.main" | awk '{print "PID:", $2, "MEM:", $4"%", "RSS:", $6/1024"MB"}' +``` + +### 2. 限制订阅数量 + +确保 `SCAN_MAX_SYMBOLS` 不要太大(建议 ≤ 30),避免订阅过多 symbol。 + +### 3. 检查其他内存占用源 + +除了 K 线缓存,还要检查: +- **持仓数据缓存**:`_position_updates_cache` +- **余额缓存**:`_balance_updates_cache` +- **数据库连接池**:确保连接及时释放 +- **WebSocket 消息队列**:避免消息堆积 + +### 4. 考虑升级服务器 + +如果内存问题持续且无法通过优化解决,考虑: +- 升级到 **4 CPU / 8GB** 服务器 +- 或使用 **Redis** 存储 K 线缓存(而不是进程内存) + +## 正常内存范围 + +对于 **2 CPU 4GB** 服务器: + +- **Backend 服务**:100-200MB(正常) +- **交易服务(空闲)**:300-500MB(正常) +- **交易服务(扫描中)**:500-800MB(可接受) +- **交易服务(内存泄漏)**:> 1.5GB(异常,需要排查) + +当前 **2.1GB** 明显异常,需要排查。 + +## 排查步骤 + +1. **运行内存诊断脚本**:`./检查内存问题.sh` +2. **查看交易服务日志**:查找内存相关错误 +3. **检查缓存大小**:确认 K 线缓存是否过多 +4. **检查订阅数量**:确认是否订阅了过多 symbol +5. **检查数据库查询**:确认是否有大量未释放的连接 + +## 预防措施 + +1. **定期重启**:每天或每周重启一次交易服务,释放内存 +2. **监控告警**:设置内存使用率告警(> 80%) +3. **限制配置**:确保 `SCAN_MAX_SYMBOLS` 和 `SCAN_CONCURRENT_SYMBOLS` 合理 +4. **定期检查**:每周运行一次内存诊断脚本 diff --git a/trading_system/kline_stream.py b/trading_system/kline_stream.py index 1eed6f4..261247e 100644 --- a/trading_system/kline_stream.py +++ b/trading_system/kline_stream.py @@ -21,6 +21,10 @@ except ImportError: _kline_cache: Dict[Tuple[str, str], List[List]] = {} _kline_cache_updated_at: Dict[Tuple[str, str], float] = {} _kline_cache_limit: Dict[Tuple[str, str], int] = {} # 每个 (symbol, interval) 的 limit +# ⚠️ 内存优化:限制缓存总大小,避免内存无限增长(2 CPU 4G 服务器) +_MAX_CACHE_ENTRIES = 200 # 最多保留 200 个 (symbol, interval) 的缓存 +_CACHE_CLEANUP_INTERVAL_SEC = 300 # 每 5 分钟清理一次过期缓存 +_CACHE_MAX_AGE_SEC = 600 # 缓存超过 10 分钟未更新则清理 def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]: @@ -42,6 +46,42 @@ def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) return (time.monotonic() - updated) <= max_age_sec +def _cleanup_stale_kline_cache(max_age_sec: float = None): + """ + 清理过期的 K 线缓存,防止内存无限增长。 + 优先清理最久未更新的缓存。 + """ + global _kline_cache, _kline_cache_updated_at, _kline_cache_limit + if max_age_sec is None: + max_age_sec = _CACHE_MAX_AGE_SEC + + now = time.monotonic() + to_remove = [] + + # 找出过期的缓存 + for key, updated_at in _kline_cache_updated_at.items(): + if (now - updated_at) > max_age_sec: + to_remove.append(key) + + # 如果缓存条目过多,清理最久未更新的(即使未过期) + if len(_kline_cache) > _MAX_CACHE_ENTRIES: + # 按更新时间排序,保留最新的 _MAX_CACHE_ENTRIES 个 + sorted_keys = sorted(_kline_cache_updated_at.items(), key=lambda x: x[1], reverse=True) + keep_keys = {k for k, _ in sorted_keys[:_MAX_CACHE_ENTRIES]} + for key in list(_kline_cache.keys()): + if key not in keep_keys: + to_remove.append(key) + + # 清理 + for key in to_remove: + _kline_cache.pop(key, None) + _kline_cache_updated_at.pop(key, None) + _kline_cache_limit.pop(key, None) + + if to_remove: + logger.debug(f"KlineStream: 已清理 {len(to_remove)} 个过期缓存条目(当前缓存数: {len(_kline_cache)})") + + class KlineStream: """订阅合约 K线流,持续更新 _kline_cache。支持动态订阅/取消订阅。""" @@ -83,6 +123,8 @@ class KlineStream: self._running = True _kline_stream_instance = self self._task = asyncio.create_task(self._run_ws()) + # ⚠️ 内存优化:启动定期清理任务,防止缓存无限增长 + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) logger.info("KlineStream: 已启动(支持动态订阅)") return True @@ -97,6 +139,12 @@ class KlineStream: except asyncio.CancelledError: pass self._task = None + if hasattr(self, '_cleanup_task') and self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass if self._ws: try: await self._ws.close() @@ -217,6 +265,18 @@ class KlineStream: if not self._running: break + async def _cleanup_loop(self): + """定期清理过期缓存,防止内存无限增长""" + while self._running: + try: + await asyncio.sleep(_CACHE_CLEANUP_INTERVAL_SEC) + if self._running: + _cleanup_stale_kline_cache() + except asyncio.CancelledError: + break + except Exception as e: + logger.debug(f"KlineStream: 清理缓存时出错: {e}") + async def _handle_message_with_limit(self, raw: str): """ 带并发限制的消息处理包装器 @@ -308,6 +368,9 @@ class KlineStream: cache_list.append(kline_rest_format) _kline_cache_updated_at[key] = time.monotonic() + # ⚠️ 内存优化:定期清理过期缓存,防止内存无限增长 + if len(_kline_cache) > _MAX_CACHE_ENTRIES * 0.8: # 达到 80% 时触发清理 + _cleanup_stale_kline_cache() # ⚠️ 优化:减少日志输出频率,避免大量消息时日志负载过高 # 只在 DEBUG 级别或每 100 条消息记录一次 if logger.isEnabledFor(logging.DEBUG):