import os import re import subprocess import json import time from pathlib import Path from typing import Any, Dict, Optional, Tuple from fastapi import APIRouter, HTTPException, Header, Depends, BackgroundTasks from pydantic import BaseModel import logging logger = logging.getLogger(__name__) # 路由统一挂在 /api/system 下,前端直接调用 /api/system/... router = APIRouter(prefix="/api/system") # 管理员鉴权(JWT;未启用登录时兼容 X-Admin-Token) from api.auth_deps import require_system_admin # noqa: E402 from database.models import Account # noqa: E402 LOG_GROUPS = ("error", "warning", "info") # 后端服务启动时间(用于前端展示“运行多久/是否已重启”) _BACKEND_STARTED_AT_MS: int = int(time.time() * 1000) # 系统元信息存储(优先 Redis;用于记录重启时间等) def _system_meta_prefix() -> str: return (os.getenv("SYSTEM_META_PREFIX", "ats:system").strip() or "ats:system") def _system_meta_key(name: str) -> str: name = (name or "").strip() return f"{_system_meta_prefix()}:{name}" def _system_meta_read(name: str) -> Optional[Dict[str, Any]]: client = _get_redis_client_for_logs() if client is None: return None try: raw = client.get(_system_meta_key(name)) if not raw: return None return json.loads(raw) except Exception: return None def _system_meta_write(name: str, payload: Dict[str, Any], ttl_sec: int = 30 * 24 * 3600) -> None: client = _get_redis_client_for_logs() if client is None: return try: client.setex(_system_meta_key(name), int(ttl_sec), json.dumps(payload, ensure_ascii=False)) except Exception: return # 避免 Redis 异常刷屏(前端可能自动刷新) _last_logs_redis_err_ts: float = 0.0 def _logs_prefix() -> str: return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs") def _logs_key_for_group(group: str) -> str: group = (group or "error").strip().lower() # 兼容旧配置:REDIS_LOG_LIST_KEY 仅用于 error if group == "error": legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip() if legacy: return legacy env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip() if env_key: return env_key return f"{_logs_prefix()}:{group}" def _logs_config_key() -> str: 
return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config") def _logs_stats_prefix() -> str: return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added") def _beijing_yyyymmdd() -> str: from datetime import datetime, timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) return datetime.now(tz=beijing_tz).strftime("%Y%m%d") def _default_logs_config() -> Dict[str, Any]: return { "max_len": {"error": 2000, "warning": 2000, "info": 2000}, "enabled": {"error": True, "warning": True, "info": True}, "dedupe_consecutive": True, "include_debug_in_info": False, "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS}, "config_key": _logs_config_key(), "stats_prefix": _logs_stats_prefix(), } def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]: cfg = defaults for g in LOG_GROUPS: v = redis_hash.get(f"max_len:{g}") if v is not None: try: n = int(str(v).strip()) if n > 0: cfg["max_len"][g] = n except Exception: pass ev = redis_hash.get(f"enabled:{g}") if ev is not None: s = str(ev).strip().lower() cfg["enabled"][g] = s in ("1", "true", "yes", "y", "on") for k in ("dedupe_consecutive", "include_debug_in_info"): vv = redis_hash.get(k) if vv is not None: s = str(vv).strip().lower() cfg[k] = s in ("1", "true", "yes", "y", "on") return cfg def _read_logs_config(client) -> Dict[str, Any]: defaults = _default_logs_config() try: raw = client.hgetall(_logs_config_key()) or {} return _merge_logs_config(defaults, raw) except Exception: return defaults def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]: mapping: Dict[str, str] = {} for g in LOG_GROUPS: mapping[f"max_len:{g}"] = str(int(cfg["max_len"][g])) mapping[f"enabled:{g}"] = "1" if cfg["enabled"][g] else "0" mapping["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0" mapping["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0" # 注意:AWS 
Valkey/Redis 集群模式下,MULTI/EXEC 不能跨 slot # 这里会同时操作多个 key(config hash + 3 个 list),所以必须禁用 transaction pipe = client.pipeline(transaction=False) pipe.hset(_logs_config_key(), mapping=mapping) for g in LOG_GROUPS: key = _logs_key_for_group(g) max_len = int(cfg["max_len"][g]) if max_len > 0: pipe.ltrim(key, 0, max_len - 1) pipe.execute() return cfg class LogsConfigUpdate(BaseModel): max_len: Optional[Dict[str, int]] = None enabled: Optional[Dict[str, bool]] = None dedupe_consecutive: Optional[bool] = None include_debug_in_info: Optional[bool] = None def _beijing_time_str() -> str: from datetime import datetime, timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) return datetime.now(tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S") @router.post("/logs/test-write") async def logs_test_write( _admin: Dict[str, Any] = Depends(require_system_admin), ) -> Dict[str, Any]: """ 写入 3 条测试日志到 Redis(error/warning/info),用于验证“是否写入到同一台 Redis、同一组 key”。 """ client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法写入测试日志") cfg = _read_logs_config(client) now_ms = int(__import__("time").time() * 1000) time_str = _beijing_time_str() entries = { "error": {"level": "ERROR", "message": f"[TEST] backend 写入 error 测试日志 @ {time_str}"}, "warning": {"level": "WARNING", "message": f"[TEST] backend 写入 warning 测试日志 @ {time_str}"}, "info": {"level": "INFO", "message": f"[TEST] backend 写入 info 测试日志 @ {time_str}"}, } day = _beijing_yyyymmdd() stats_prefix = _logs_stats_prefix() # 集群模式下禁用 transaction,避免 CROSSSLOT pipe = client.pipeline(transaction=False) for g in LOG_GROUPS: key = _logs_key_for_group(g) max_len = int(cfg["max_len"][g]) entry = { "ts": now_ms, "time": time_str, "service": "backend", "level": entries[g]["level"], "logger": "api.routes.system", "message": entries[g]["message"], "hostname": os.getenv("HOSTNAME", ""), "signature": f"backend|{entries[g]['level']}|test|{entries[g]['message']}", "count": 1, } pipe.lpush(key, 
json.dumps(entry, ensure_ascii=False)) if max_len > 0: pipe.ltrim(key, 0, max_len - 1) pipe.incr(f"{stats_prefix}:{day}:{g}", 1) pipe.expire(f"{stats_prefix}:{day}:{g}", 14 * 24 * 3600) pipe.execute() # 返回写入后的 LLEN,便于你确认 # 单 key LLEN 查询也不需要 transaction pipe2 = client.pipeline(transaction=False) for g in LOG_GROUPS: pipe2.llen(_logs_key_for_group(g)) llens = pipe2.execute() return { "message": "ok", "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS}, "llen": {g: int(llens[i] or 0) for i, g in enumerate(LOG_GROUPS)}, "note": "如果你在前端仍看不到,说明前端请求的后端实例/Redis key/筛选条件不一致。", } @router.post("/trading/trigger-scan") async def trigger_scan( _admin: Dict[str, Any] = Depends(require_system_admin), ) -> Dict[str, Any]: """ 触发手动扫描: 通过设置 Redis 信号(ats:trigger-scan),通知所有运行中的 strategy 进程立即执行扫描。 """ client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法触发扫描") try: # 设置触发信号(当前时间戳),让 strategy 检测到变化 import time ts = int(time.time()) # 使用 setex 设置 600秒过期,防止永久残留(虽然 strategy 只关心变化,但过期是个好习惯) # 注意:strategy 端比较的是时间戳大小,所以只要比上一次大即可 client.setex("ats:trigger-scan", 600, str(ts)) return { "message": "已发送扫描触发信号", "timestamp": ts } except Exception as e: logger.error(f"触发扫描失败: {e}") raise HTTPException(status_code=500, detail=f"触发扫描失败: {e}") def _get_redis_client_for_logs(): """ 获取 Redis 客户端(优先复用 config_manager 的连接;失败则自行创建)。 返回:redis.Redis 或 None """ # 1) 复用 config_manager(避免重复连接) try: import config_manager # backend/config_manager.py(已负责加载 .env) cm = getattr(config_manager, "config_manager", None) if cm is not None: redis_client = getattr(cm, "_redis_client", None) redis_connected = getattr(cm, "_redis_connected", False) if redis_client is not None and redis_connected: try: redis_client.ping() return redis_client except Exception: pass except Exception: pass # 2) 自行创建 try: import redis # type: ignore redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true" 
redis_username = os.getenv("REDIS_USERNAME", None) redis_password = os.getenv("REDIS_PASSWORD", None) ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required") ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) kwargs: Dict[str, Any] = { "decode_responses": True, "username": redis_username, "password": redis_password, "socket_connect_timeout": 1, "socket_timeout": 1, } if redis_url.startswith("rediss://") or redis_use_tls: kwargs["ssl_cert_reqs"] = ssl_cert_reqs if ssl_ca_certs: kwargs["ssl_ca_certs"] = ssl_ca_certs if ssl_cert_reqs == "none": kwargs["ssl_check_hostname"] = False elif ssl_cert_reqs == "required": kwargs["ssl_check_hostname"] = True else: kwargs["ssl_check_hostname"] = False client = redis.from_url(redis_url, **kwargs) client.ping() return client except Exception as e: # 把错误尽量打到 api.log(你现在看的文件) global _last_logs_redis_err_ts import time as _t now = _t.time() if now - _last_logs_redis_err_ts > 30: _last_logs_redis_err_ts = now logger.warning(f"日志模块 Redis 连接失败。REDIS_URL={os.getenv('REDIS_URL', '')} err={e}") return None @router.get("/logs") async def get_logs( limit: int = 200, group: str = "error", start: int = 0, service: Optional[str] = None, level: Optional[str] = None, _admin: Dict[str, Any] = Depends(require_system_admin), ) -> Dict[str, Any]: """ 从 Redis List 读取最新日志(默认 group=error -> ats:logs:error)。 参数: - limit: 返回条数(最大 2000) - group: 日志分组(error / warning / info) - service: 过滤(backend / trading_system) - level: 过滤(ERROR / CRITICAL ...) 
""" if limit <= 0: limit = 200 if limit > 20000: limit = 20000 if start < 0: start = 0 # 定义管理员不需要关注的日志模式(噪声过滤) IGNORED_PATTERNS = [ "API密钥未配置", "请在配置界面设置该账号的BINANCE_API_KEY", ] group = (group or "error").strip().lower() if group not in LOG_GROUPS: raise HTTPException(status_code=400, detail=f"非法 group:{group}(可选:{', '.join(LOG_GROUPS)})") list_key = _logs_key_for_group(group) client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志") try: llen_total = int(client.llen(list_key) or 0) except Exception as e: raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}") if llen_total <= 0: return { "group": group, "key": list_key, "start": start, "limit": limit, "llen_total": 0, "next_start": start, "has_more": False, "count": 0, "items": [], } # 分页扫描:为了支持 service/level 过滤,这里会向后多取一些直到凑够 limit 或到末尾 # 保护:最多扫描 limit*10 条,避免过滤太严格导致无限扫描 max_scan = min(llen_total, start + limit * 10) pos = start scanned = 0 items: list[Dict[str, Any]] = [] try: while len(items) < limit and pos < llen_total and pos < max_scan: chunk_size = min(500, limit, max_scan - pos) end = pos + chunk_size - 1 raw_batch = client.lrange(list_key, pos, end) scanned += len(raw_batch or []) for raw in raw_batch or []: try: obj = raw if isinstance(raw, bytes): obj = raw.decode("utf-8", errors="ignore") if not isinstance(obj, str): continue parsed = json.loads(obj) if not isinstance(parsed, dict): continue if service and str(parsed.get("service")) != service: continue if level and str(parsed.get("level")) != level: continue # 噪声过滤 msg = str(parsed.get("message", "")) if any(p in msg for p in IGNORED_PATTERNS): continue items.append(parsed) if len(items) >= limit: break except Exception: continue pos = end + 1 except Exception as e: raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}") return { "group": group, "key": list_key, "start": start, "limit": limit, "llen_total": llen_total, "scanned": scanned, "next_start": pos, "has_more": 
pos < llen_total, "count": len(items), "items": items, } @router.get("/logs/overview") async def logs_overview(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览") try: cfg = _read_logs_config(client) day = _beijing_yyyymmdd() stats_prefix = _logs_stats_prefix() # 集群模式下禁用 transaction,避免 CROSSSLOT pipe = client.pipeline(transaction=False) for g in LOG_GROUPS: pipe.llen(_logs_key_for_group(g)) for g in LOG_GROUPS: pipe.get(f"{stats_prefix}:{day}:{g}") res = pipe.execute() llen_vals = res[: len(LOG_GROUPS)] added_vals = res[len(LOG_GROUPS) :] llen: Dict[str, int] = {} added_today: Dict[str, int] = {} for i, g in enumerate(LOG_GROUPS): try: llen[g] = int(llen_vals[i] or 0) except Exception: llen[g] = 0 try: added_today[g] = int(added_vals[i] or 0) except Exception: added_today[g] = 0 return { "config": cfg, "stats": { "day": day, "llen": llen, "added_today": added_today, }, "meta": { "redis_url": os.getenv("REDIS_URL", ""), "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS}, }, } except HTTPException: raise except Exception as e: logger.error(f"logs_overview 失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"logs_overview failed: {e}") @router.put("/logs/config") async def update_logs_config( payload: LogsConfigUpdate, _admin: Dict[str, Any] = Depends(require_system_admin), ) -> Dict[str, Any]: client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置") cfg = _read_logs_config(client) if payload.max_len: for g, v in payload.max_len.items(): gg = (g or "").strip().lower() if gg not in LOG_GROUPS: continue try: n = int(v) if n < 100: n = 100 if n > 20000: n = 20000 cfg["max_len"][gg] = n except Exception: continue if payload.enabled: for g, v in payload.enabled.items(): gg = (g or "").strip().lower() if gg not in LOG_GROUPS: continue 
cfg["enabled"][gg] = bool(v) if payload.dedupe_consecutive is not None: cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive) if payload.include_debug_in_info is not None: cfg["include_debug_in_info"] = bool(payload.include_debug_in_info) cfg = _write_logs_config_and_trim(client, cfg) return {"message": "ok", "config": cfg} def _require_admin(token: Optional[str], provided: Optional[str]) -> None: """ 可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN,则要求请求携带 X-Admin-Token。 生产环境强烈建议通过 Nginx 额外做鉴权 / IP 白名单。 """ if not token: return if not provided or provided != token: raise HTTPException(status_code=401, detail="Unauthorized") # # 注意:require_system_admin 已迁移到 api.auth_deps,避免导入不一致导致 uvicorn 启动失败 def _build_supervisorctl_cmd(args: list[str]) -> list[str]: supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl") supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip() use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").lower() == "true" # 如果没显式配置 SUPERVISOR_CONF,就尝试自动探测常见路径(宝塔/系统) if not supervisor_conf: candidates = [ "/www/server/panel/plugin/supervisor/supervisord.conf", "/www/server/panel/plugin/supervisor/supervisor.conf", "/etc/supervisor/supervisord.conf", "/etc/supervisord.conf", ] for p in candidates: try: if Path(p).exists(): supervisor_conf = p break except Exception: continue cmd: list[str] = [] if use_sudo: # 需要你在 sudoers 配置 NOPASSWD(sudo -n 才不会卡住) cmd += ["sudo", "-n"] cmd += [supervisorctl_path] if supervisor_conf: cmd += ["-c", supervisor_conf] cmd += args return cmd def _run_supervisorctl(args: list[str]) -> str: cmd = _build_supervisorctl_cmd(args) try: res = subprocess.run(cmd, capture_output=True, text=True, timeout=10) except subprocess.TimeoutExpired: raise RuntimeError("supervisorctl 超时(10s)") out = (res.stdout or "").strip() err = (res.stderr or "").strip() combined = "\n".join([s for s in [out, err] if s]).strip() # supervisorctl 约定: # - status 在存在 STOPPED/FATAL 等进程时可能返回 exit=3,但输出仍然有效 ok_rc = {0} if args and args[0] == "status": 
ok_rc.add(3) if res.returncode not in ok_rc: raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})") return combined or out def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]: """ 典型输出: - auto_sys RUNNING pid 1234, uptime 0:10:00 - auto_sys STOPPED Not started """ if "RUNNING" in raw: m = re.search(r"\bpid\s+(\d+)\b", raw) pid = int(m.group(1)) if m else None return True, pid, "RUNNING" for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]: if state in raw: return False, None, state return False, None, "UNKNOWN" def _list_supervisor_process_names(status_all_raw: str) -> list[str]: names: list[str] = [] if not status_all_raw: return names for ln in status_all_raw.splitlines(): s = (ln or "").strip() if not s: continue # 每行格式: ... name = s.split(None, 1)[0].strip() if name: names.append(name) return names def _get_program_name() -> str: # 你给的宝塔配置是 [program:auto_sys] return os.getenv("SUPERVISOR_TRADING_PROGRAM", "auto_sys").strip() or "auto_sys" def _select_best_process_name(program: str, status_all_raw: str) -> Optional[str]: """ 从 `supervisorctl status` 全量输出中,找到最匹配的真实进程名。 兼容 supervisor 的 group:process 格式,例如:auto_sys:auto_sys_00 """ if not status_all_raw: return None lines = [ln.strip() for ln in status_all_raw.splitlines() if ln.strip()] names: list[str] = [] for ln in lines: name = ln.split(None, 1)[0].strip() if name: names.append(name) # 精确优先:program / program_00 / program:program_00 preferred = [program, f"{program}_00", f"{program}:{program}_00"] for cand in preferred: if cand in names: return cand # 次优:任意以 program_ 开头 for name in names: if name.startswith(program + "_"): return name # 次优:任意以 program: 开头 for name in names: if name.startswith(program + ":"): return name return None def _status_with_fallback(program: str) -> Tuple[str, Optional[str], Optional[str]]: """ - 优先 `status ` - 若 no such process:返回全量 status,并尝试解析真实 name(例如 auto_sys:auto_sys_00) 返回:(raw, resolved_name, status_all) """ 
try: raw = _run_supervisorctl(["status", program]) return raw, program, None except Exception as e: msg = str(e).lower() if "no such process" not in msg: raise status_all = _run_supervisorctl(["status"]) resolved = _select_best_process_name(program, status_all) if resolved: try: raw = _run_supervisorctl(["status", resolved]) return raw, resolved, status_all except Exception: # 兜底:至少把全量输出返回,方便你确认真实进程名 return status_all, None, status_all return status_all, None, status_all def _action_with_fallback(action: str, program: str) -> Tuple[str, Optional[str], Optional[str]]: """ 对 start/stop/restart 做兜底:如果 program 不存在,尝试解析真实 name 再执行。 返回:(output, resolved_name, status_all) """ try: out = _run_supervisorctl([action, program]) return out, program, None except Exception as e: msg = str(e).lower() if "no such process" not in msg: raise status_all = _run_supervisorctl(["status"]) resolved = _select_best_process_name(program, status_all) if not resolved: # 没找到就把全量输出带上,方便定位 raise RuntimeError(f"no such process: {program}. 
当前 supervisor 进程列表:\n{status_all}") out = _run_supervisorctl([action, resolved]) return out, resolved, status_all def _run_fix_script(): """Run the fix_trade_records.py script in a subprocess""" try: script_path = Path(__file__).parent.parent.parent.parent / "scripts" / "fix_trade_records.py" if not script_path.exists(): logger.error(f"Fix script not found at {script_path}") return logger.info(f"Starting trade record fix script: {script_path}") # Ensure project root is in PYTHONPATH env = os.environ.copy() project_root = Path(__file__).parent.parent.parent.parent env["PYTHONPATH"] = f"{env.get('PYTHONPATH', '')}:{project_root}" process = subprocess.Popen( ["python3", str(script_path)], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = process.communicate() if process.returncode == 0: logger.info(f"Trade record fix completed successfully:\n{stdout}") else: logger.error(f"Trade record fix failed (exit code {process.returncode}):\n{stderr}") except Exception as e: logger.error(f"Error running trade record fix script: {e}") @router.post("/fix-trade-records") async def fix_trade_records( background_tasks: BackgroundTasks, _admin: Dict[str, Any] = Depends(require_system_admin) ): """ Trigger the trade record fix script (time inversion & commission backfill). Runs in background. 
""" background_tasks.add_task(_run_fix_script) return {"message": "Trade fix task started in background"} @router.post("/clear-cache") async def clear_cache( _admin: Dict[str, Any] = Depends(require_system_admin), x_account_id: Optional[int] = Header(default=None, alias="X-Account-Id"), ) -> Dict[str, Any]: """ 清理配置缓存(Redis Hash: trading_config),并从数据库回灌到 Redis。 """ try: import config_manager account_id = int(x_account_id or 1) cm = None if hasattr(config_manager, "ConfigManager") and hasattr(config_manager.ConfigManager, "for_account"): cm = config_manager.ConfigManager.for_account(account_id) else: cm = getattr(config_manager, "config_manager", None) if cm is None: raise HTTPException(status_code=500, detail="config_manager 未初始化") deleted_keys: list[str] = [] # 1) 清 backend 本地 cache try: cm._cache = {} except Exception: pass # 2) 清 Redis 缓存 key(Hash: trading_config) try: redis_client = getattr(cm, "_redis_client", None) redis_connected = getattr(cm, "_redis_connected", False) if redis_client is not None and redis_connected: try: redis_client.ping() except Exception: redis_connected = False if redis_client is not None and redis_connected: try: key = getattr(cm, "_redis_hash_key", "trading_config") redis_client.delete(key) deleted_keys.append(str(key)) # 兼容:老 key(仅 default 账号) legacy = getattr(cm, "_legacy_hash_key", None) if legacy and legacy != key: redis_client.delete(legacy) deleted_keys.append(str(legacy)) except Exception as e: logger.warning(f"删除 Redis key 失败: {e}") # 可选:实时推荐缓存(如果存在) try: redis_client.delete("recommendations:realtime") deleted_keys.append("recommendations:realtime") except Exception: pass except Exception as e: logger.warning(f"清 Redis 缓存失败: {e}") # 3) 立刻从 DB 回灌到 Redis(避免 trading_system 读到空) try: cm.reload() except Exception as e: logger.warning(f"回灌配置到 Redis 失败(仍可能使用DB/本地cache): {e}") return { "message": "缓存已清理并回灌", "deleted_keys": deleted_keys, "note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。", } except HTTPException: raise except 
Exception as e: logger.error(f"清理缓存失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.get("/trading/services") async def list_trading_services(_admin: Dict[str, Any] = Depends(require_system_admin)): """获取所有交易服务状态(包括所有账号)""" try: # 获取所有 supervisor 进程状态 status_all = _run_supervisorctl(["status"]) services = [] summary = {"total": 0, "running": 0, "stopped": 0, "unknown": 0} # 解析每一行 # 格式通常是: name state description for line in status_all.splitlines(): line = line.strip() if not line: continue parts = line.split(None, 2) if len(parts) < 2: continue name = parts[0] state = parts[1] desc = parts[2] if len(parts) > 2 else "" # 只关注 auto_sys 开头的服务 if name.startswith("auto_sys"): is_running = state == "RUNNING" pid = None if is_running: # Parse PID from desc: "pid 1234, uptime ..." m = re.search(r"pid\s+(\d+)", desc) if m: pid = int(m.group(1)) services.append({ "program": name, "state": state, "running": is_running, "pid": pid, "description": desc }) summary["total"] += 1 if is_running: summary["running"] += 1 elif state in ["STOPPED", "EXITED", "FATAL"]: summary["stopped"] += 1 else: summary["unknown"] += 1 return { "summary": summary, "services": services, "raw": status_all } except Exception as e: # supervisor 未安装/未运行时(如 unix socket 不存在)避免刷 ERROR,改为 WARNING 并返回友好说明 err_msg = str(e).strip() if not err_msg: err_msg = repr(e) is_supervisor_unavailable = ( "no such file" in err_msg.lower() or "connection refused" in err_msg.lower() or "sock" in err_msg.lower() or "unix://" in err_msg.lower() ) if is_supervisor_unavailable: logger.warning(f"列出服务失败(supervisor 未运行或不可用): {err_msg}") return { "summary": {"total": 0, "running": 0, "stopped": 0, "unknown": 0}, "services": [], "error": "supervisor 未安装或未运行,请检查 supervisord 或配置 SUPERVISOR_CONF" } logger.error(f"列出服务失败: {e}") return { "summary": {"total": 0, "running": 0, "stopped": 0, "unknown": 0}, "services": [], "error": err_msg } @router.get("/trading/status") async def trading_status(_admin: 
Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: program = _get_program_name() try: raw, resolved_name, status_all = _status_with_fallback(program) running, pid, state = _parse_supervisor_status(raw) meta = _system_meta_read("trading:last_restart") or {} return { "mode": "supervisor", "program": program, "resolved_name": resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all, "meta": meta, } except Exception as e: raise HTTPException( status_code=500, detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO", ) @router.post("/trading/start") async def trading_start(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: program = _get_program_name() try: out, resolved_name, status_all = _action_with_fallback("start", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) return { "message": "交易系统已启动(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or status_all, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}") @router.post("/trading/stop") async def trading_stop(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: program = _get_program_name() try: out, resolved_name, status_all = _action_with_fallback("stop", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) return { "message": "交易系统已停止(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or 
status_all, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}") @router.post("/trading/restart") async def trading_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: program = _get_program_name() try: requested_at = _beijing_time_str() requested_at_ms = int(time.time() * 1000) out, resolved_name, status_all = _action_with_fallback("restart", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) # 记录交易系统重启时间(用于前端展示) _system_meta_write( "trading:last_restart", { "requested_at": requested_at, "requested_at_ms": requested_at_ms, "pid": pid, "program": resolved_name2 or resolved_name or program, }, ) return { "message": "交易系统已重启(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or status_all, }, "meta": { "requested_at": requested_at, "requested_at_ms": requested_at_ms, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}") @router.post("/trading/stop-all") async def trading_stop_all( _admin: Dict[str, Any] = Depends(require_system_admin), prefix: str = "auto_sys_acc", include_default: bool = False, ) -> Dict[str, Any]: """ 一键停止所有账号交易进程(supervisor)。 """ try: prefix = (prefix or "auto_sys_acc").strip() if not prefix: prefix = "auto_sys_acc" # 先读取全量 status,拿到有哪些进程 status_all = _run_supervisorctl(["status"]) names = _list_supervisor_process_names(status_all) targets: list[str] = [] for n in names: if n.startswith(prefix): targets.append(n) if include_default: default_prog = _get_program_name() if default_prog and default_prog not in targets and default_prog in names: targets.append(default_prog) if not targets: return { "message": "未找到可停止的交易进程", "prefix": prefix, "include_default": 
include_default, "count": 0, "targets": [], "status_all": status_all, } results: list[Dict[str, Any]] = [] ok = 0 failed = 0 for prog in targets: try: out = _run_supervisorctl(["stop", prog]) raw = _run_supervisorctl(["status", prog]) running, pid, state = _parse_supervisor_status(raw) results.append( { "program": prog, "ok": True, "output": out, "status": {"running": running, "pid": pid, "state": state, "raw": raw}, } ) ok += 1 except Exception as e: failed += 1 results.append({"program": prog, "ok": False, "error": str(e)}) return { "message": "已发起批量停止", "prefix": prefix, "include_default": include_default, "count": len(targets), "ok": ok, "failed": failed, "targets": targets, "results": results, } except Exception as e: raise HTTPException(status_code=500, detail=f"批量停止失败: {e}") @router.post("/trading/restart-all") async def trading_restart_all( _admin: Dict[str, Any] = Depends(require_system_admin), prefix: str = "auto_sys_acc", include_default: bool = False, do_update: bool = True, ) -> Dict[str, Any]: """ 一键重启所有账号交易进程(supervisor)。 - 默认重启所有以 auto_sys_acc 开头的 program(例如 auto_sys_acc1/2/3...) 
- 可选 include_default=true:同时包含 SUPERVISOR_TRADING_PROGRAM(默认 auto_sys) - 可选 do_update=true:先执行 supervisorctl reread/update 再重启(确保新 ini 生效) """ try: prefix = (prefix or "auto_sys_acc").strip() if not prefix: prefix = "auto_sys_acc" # 先读取全量 status,拿到有哪些进程 status_all = _run_supervisorctl(["status"]) names = _list_supervisor_process_names(status_all) targets: list[str] = [] skipped_disabled: list[Dict[str, Any]] = [] for n in names: if n.startswith(prefix): # 若能解析出 account_id,则跳过 disabled 的账号 try: m = re.match(rf"^{re.escape(prefix)}(\d+)$", n) if m: aid = int(m.group(1)) row = Account.get(aid) st = (row.get("status") if isinstance(row, dict) else None) or "active" if str(st).strip().lower() != "active": skipped_disabled.append({"program": n, "account_id": aid, "status": st}) continue except Exception: # 解析失败/查库失败:不影响批量重启流程 pass targets.append(n) if include_default: default_prog = _get_program_name() if default_prog and default_prog not in targets and default_prog in names: targets.append(default_prog) if not targets: return { "message": "未找到可重启的交易进程", "prefix": prefix, "include_default": include_default, "count": 0, "targets": [], "status_all": status_all, "skipped_disabled": skipped_disabled, } reread_out = "" update_out = "" if do_update: try: reread_out = _run_supervisorctl(["reread"]) except Exception as e: reread_out = f"failed: {e}" try: update_out = _run_supervisorctl(["update"]) except Exception as e: update_out = f"failed: {e}" results: list[Dict[str, Any]] = [] ok = 0 failed = 0 for prog in targets: try: out = _run_supervisorctl(["restart", prog]) raw = _run_supervisorctl(["status", prog]) running, pid, state = _parse_supervisor_status(raw) results.append( { "program": prog, "ok": True, "output": out, "status": {"running": running, "pid": pid, "state": state, "raw": raw}, } ) ok += 1 except Exception as e: failed += 1 results.append({"program": prog, "ok": False, "error": str(e)}) return { "message": "已发起批量重启", "prefix": prefix, "include_default": 
                include_default,
            "do_update": do_update,
            "count": len(targets),
            "ok": ok,
            "failed": failed,
            "reread": reread_out,
            "update": update_out,
            "targets": targets,
            "results": results,
            "skipped_disabled": skipped_disabled,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"批量重启失败: {e}")


@router.get("/market-overview")
async def market_overview(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Market overview: fetch data from Binance public endpoints that corresponds
    to the strategy filters. Rendered on the global-config page so users can
    check whether the current strategy scheme matches market conditions.
    """
    # Import get_market_overview with fallbacks: plain module name, then
    # package-qualified, then after pushing the backend dir onto sys.path.
    try:
        from market_overview import get_market_overview
    except ImportError:
        try:
            from backend.market_overview import get_market_overview
        except ImportError:
            import sys

            backend_dir = Path(__file__).parent.parent.parent
            if str(backend_dir) not in sys.path:
                sys.path.insert(0, str(backend_dir))
            from market_overview import get_market_overview

    data = get_market_overview()

    # Load the current strategy config for comparison; if the config manager is
    # unavailable, keep the defaults initialized below.
    beta_enabled = False
    beta_threshold = -0.005
    market_scheme = "normal"
    try:
        from config_manager import GlobalStrategyConfigManager

        mgr = GlobalStrategyConfigManager()
        beta_enabled = str(mgr.get("BETA_FILTER_ENABLED", "true")).lower() in ("true", "1", "yes")
        try:
            beta_threshold = float(mgr.get("BETA_FILTER_THRESHOLD", -0.005))
        except (TypeError, ValueError):
            pass
        market_scheme = str(mgr.get("MARKET_SCHEME", "normal")).strip().lower() or "normal"
    except Exception:
        pass

    # Decide whether the market-beta filter would trigger (kept consistent with
    # strategy._check_beta_filter): it trips when any of the BTC/ETH 15m/1h
    # change percentages falls below the percentage threshold.
    threshold_pct = beta_threshold * 100
    triggered = False
    if beta_enabled:
        for key in ["btc_15m_change_pct", "btc_1h_change_pct", "eth_15m_change_pct", "eth_1h_change_pct"]:
            val = data.get(key)
            if val is not None and val < threshold_pct:
                triggered = True
                break
    data["config"] = {
        "BETA_FILTER_ENABLED": beta_enabled,
        "BETA_FILTER_THRESHOLD": beta_threshold,
        "BETA_FILTER_THRESHOLD_PCT": round(threshold_pct, 2),
        "MARKET_SCHEME": market_scheme,
    }
    data["beta_filter_triggered"] = triggered

    # Strategy execution overview: the active scheme and per-option execution
    # status as human-readable text. Same import-fallback dance as above.
    get_strategy_execution_overview = None
    try:
        from market_overview import get_strategy_execution_overview
    except ImportError:
        try:
            from backend.market_overview import get_strategy_execution_overview
        except ImportError:
            pass
    if get_strategy_execution_overview is None:
        # Last resort: add the backend directory to sys.path and retry.
        # NOTE(review): this branch uses .resolve() while the fallback above
        # does not — presumably intentional, but worth confirming.
        try:
            import sys

            backend_dir = Path(__file__).resolve().parent.parent.parent
            if str(backend_dir) not in sys.path:
                sys.path.insert(0, str(backend_dir))
            from market_overview import get_strategy_execution_overview
        except ImportError:
            pass
    if get_strategy_execution_overview is not None:
        try:
            data["strategy_execution_overview"] = get_strategy_execution_overview()
        except Exception as e:
            data["strategy_execution_overview"] = {"sections": [{"title": "加载失败", "content": str(e)}]}
    else:
        data["strategy_execution_overview"] = {
            "sections": [{"title": "策略执行概览暂不可用", "content": "请确认后端已重启并已部署最新代码;若已重启仍无数据,请检查 backend/market_overview.py 与 config_manager 是否可正常导入。"}]
        }
    return data


@router.get("/backend/status")
async def backend_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Report the backend service status (the current uvicorn process).

    Notes:
    - pid is os.getpid() (the current FastAPI process)
    - last_restart metadata is read from Redis (when available)
    """
    meta = _system_meta_read("backend:last_restart") or {}
    return {
        "running": True,
        "pid": os.getpid(),
        "started_at_ms": _BACKEND_STARTED_AT_MS,
        "started_at": _beijing_time_str(),
        "meta": meta,
    }


@router.post("/backend/restart")
async def backend_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Restart the backend service (uvicorn).

    How it works:
    - the backend is normally launched via nohup uvicorn ...
    &; this endpoint launches backend/restart.sh in the background, which:
      1) greps for the uvicorn api.main:app process and kills it
      2) then runs backend/start.sh to bring up a new process

    Note:
    - the script is delayed by 1s so this request can return first.
    """
    backend_dir = Path(__file__).parent.parent.parent  # backend/
    restart_script = backend_dir / "restart.sh"
    if not restart_script.exists():
        raise HTTPException(status_code=500, detail=f"找不到重启脚本: {restart_script}")
    requested_at = _beijing_time_str()
    requested_at_ms = int(time.time() * 1000)
    cur_pid = os.getpid()
    # Record the restart request (best effort, Redis-backed) so
    # /backend/status can report when the last restart was requested.
    _system_meta_write(
        "backend:last_restart",
        {
            "requested_at": requested_at,
            "requested_at_ms": requested_at_ms,
            "pid_before": cur_pid,
            "script": str(restart_script),
        },
    )
    # Background execution: sleep 1 before restarting so the current request
    # can be returned to the client.
    cmd = ["bash", "-lc", f"sleep 1; '{restart_script}'"]
    try:
        subprocess.Popen(
            cmd,
            cwd=str(backend_dir),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Detach into a new session so the child survives the restart.
            start_new_session=True,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"启动重启脚本失败: {e}")
    return {
        "message": "已发起后端重启(1s 后执行)",
        "pid_before": cur_pid,
        "requested_at": requested_at,
        "requested_at_ms": requested_at_ms,
        "script": str(restart_script),
        "note": "重启期间接口可能短暂不可用,页面可等待 3-5 秒后刷新状态。",
    }


@router.post("/backend/stop")
async def backend_stop(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Stop the backend service (uvicorn).

    Warning: once stopped, the API becomes unreachable — the service must be
    started again manually on the server!
    """
    backend_dir = Path(__file__).parent.parent.parent  # backend/
    stop_script = backend_dir / "stop.sh"
    if not stop_script.exists():
        raise HTTPException(status_code=500, detail=f"找不到停止脚本: {stop_script}")
    cur_pid = os.getpid()
    # Background execution: sleep 1 before stopping so the current request
    # can be returned to the client.
    cmd = ["bash", "-lc", f"sleep 1; '{stop_script}'"]
    try:
        subprocess.Popen(
            cmd,
            cwd=str(backend_dir),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"启动停止脚本失败: {e}")
    return {
        "message": "已发起后端停止(1s 后执行)",
        "pid_before": cur_pid,
        "script": str(stop_script),
        "warning": "后端服务停止后,Web 界面将无法访问,请手动在服务器启动!",
    }


def _recommendations_process_running() -> Tuple[bool, Optional[int]]:
    """Check whether the recommendations service process is running.

    Returns (running, pid). Tries pgrep first and falls back to ps, so it
    works across launch styles (supervisor, nohup, etc.).
    """
    # 1. Prefer pgrep (common on Linux/macOS). Try the most specific pattern
    #    first; take the first all-digit pid reported.
    for pattern in ["trading_system.recommendations_main", "recommendations_main1", "recommendations_main"]:
        try:
            result = subprocess.run(
                ["pgrep", "-f", pattern],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0 and result.stdout.strip():
                pids = [x for x in result.stdout.strip().split() if x.isdigit()]
                if pids:
                    return True, int(pids[0])
        except (FileNotFoundError, subprocess.TimeoutExpired, ValueError):
            # pgrep missing or misbehaving — abandon it entirely and use the
            # step-2 fallback below.
            break
    # 2. Fallback:
回退:ps + grep(pgrep 不可用或匹配失败时) try: result = subprocess.run( ["sh", "-c", "ps aux 2>/dev/null | grep -E 'recommendations_main1|recommendations_main|trading_system.recommendations' | grep -v grep | head -1"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0 and result.stdout.strip(): parts = result.stdout.strip().split() if len(parts) >= 2 and parts[1].isdigit(): return True, int(parts[1]) except (FileNotFoundError, subprocess.TimeoutExpired, ValueError): pass return False, None @router.get("/recommendations/status") async def recommendations_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: """查看推荐服务状态(recommendations_main 进程)""" running, pid = _recommendations_process_running() return { "running": running, "pid": pid, } @router.post("/recommendations/restart") async def recommendations_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: """重启推荐服务(recommendations_main)""" backend_dir = Path(__file__).parent.parent.parent restart_script = backend_dir / "restart_recommendations.sh" if not restart_script.exists(): raise HTTPException(status_code=500, detail=f"找不到重启脚本: {restart_script}") running_before, pid_before = _recommendations_process_running() cmd = ["bash", "-lc", f"sleep 1; '{restart_script}'"] try: subprocess.Popen( cmd, cwd=str(backend_dir), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) except Exception as e: raise HTTPException(status_code=500, detail=f"启动重启脚本失败: {e}") return { "message": "已发起推荐服务重启(1s 后执行)", "pid_before": pid_before, "running_before": running_before, "script": str(restart_script), } @router.post("/recommendations/stop") async def recommendations_stop(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: """停止推荐服务""" backend_dir = Path(__file__).parent.parent.parent stop_script = backend_dir / "stop_recommendations.sh" if not stop_script.exists(): raise HTTPException(status_code=500, detail=f"找不到停止脚本: 
{stop_script}") running_before, pid_before = _recommendations_process_running() cmd = ["bash", "-lc", f"sleep 1; '{stop_script}'"] try: subprocess.Popen( cmd, cwd=str(backend_dir), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) except Exception as e: raise HTTPException(status_code=500, detail=f"启动停止脚本失败: {e}") return { "message": "已发起推荐服务停止", "pid_before": pid_before, "running_before": running_before, } @router.post("/recommendations/start") async def recommendations_start(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]: """启动推荐服务(若已运行则跳过)""" running, pid = _recommendations_process_running() if running: return {"message": "推荐服务已在运行中", "pid": pid, "skipped": True} backend_dir = Path(__file__).parent.parent.parent start_script = backend_dir / "start_recommendations.sh" if not start_script.exists(): raise HTTPException(status_code=500, detail=f"找不到启动脚本: {start_script}") cmd = ["bash", "-lc", f"sleep 1; '{start_script}'"] try: subprocess.Popen( cmd, cwd=str(backend_dir), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) except Exception as e: raise HTTPException(status_code=500, detail=f"启动脚本执行失败: {e}") return {"message": "已发起推荐服务启动(1s 后执行)", "script": str(start_script)}