""" FastAPI应用主入口 """ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from api.routes import config, trades, stats, dashboard, account, recommendations, system, accounts, auth, admin, public, data_management import os import sys import logging from pathlib import Path from logging.handlers import RotatingFileHandler # 加载.env文件 try: from dotenv import load_dotenv backend_dir = Path(__file__).parent.parent project_root = backend_dir.parent env_file = backend_dir / '.env' if not env_file.exists(): env_file = project_root / '.env' if env_file.exists(): load_dotenv(env_file) else: load_dotenv(project_root / '.env', override=False) except ImportError: pass except Exception: pass # 配置日志 def setup_logging(): """配置日志系统""" # 获取日志级别 log_level = os.getenv('LOG_LEVEL', 'INFO').upper() log_level_map = { 'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'WARNING': logging.WARNING, 'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL } level = log_level_map.get(log_level, logging.INFO) # 创建日志目录 backend_dir = Path(__file__).parent.parent log_dir = backend_dir / 'logs' log_dir.mkdir(exist_ok=True) # 日志文件路径 log_file = log_dir / 'api.log' # 配置根日志记录器 root_logger = logging.getLogger() root_logger.setLevel(level) # 清除现有的处理器 root_logger.handlers.clear() # 设置日志时间格式为北京时间(UTC+8) from datetime import datetime, timezone, timedelta class BeijingTimeFormatter(logging.Formatter): """使用北京时间的日志格式化器""" def formatTime(self, record, datefmt=None): # 转换为北京时间(UTC+8) beijing_tz = timezone(timedelta(hours=8)) dt = datetime.fromtimestamp(record.created, tz=beijing_tz) if datefmt: return dt.strftime(datefmt) return dt.strftime('%Y-%m-%d %H:%M:%S') # 创建格式器(使用北京时间) formatter = BeijingTimeFormatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 文件处理器(带轮转) file_handler = RotatingFileHandler( log_file, maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, encoding='utf-8' ) file_handler.setLevel(level) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(level) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) # 追加:将 ERROR 日志写入 Redis(不影响现有文件/控制台日志) try: from api.redis_log_handler import RedisErrorLogHandler, RedisLogConfig redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true" redis_username = os.getenv("REDIS_USERNAME", None) redis_password = os.getenv("REDIS_PASSWORD", None) ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required") ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) redis_cfg = RedisLogConfig( redis_url=redis_url, use_tls=redis_use_tls, username=redis_username, password=redis_password, ssl_cert_reqs=ssl_cert_reqs, ssl_ca_certs=ssl_ca_certs, service="backend", ) redis_handler = RedisErrorLogHandler(redis_cfg) # 让 handler 自己按组筛选(error/warning/info),这里只需要放宽到 INFO redis_handler.setLevel(logging.INFO) root_logger.addHandler(redis_handler) # 诊断:启动时快速检测一次 Redis 可用性(失败不影响运行) try: client = redis_handler._get_redis() # noqa: SLF001(仅用于诊断) if client is None: logging.getLogger(__name__).warning(f"⚠ Redis 日志写入未启用。REDIS_URL={redis_url}") else: logging.getLogger(__name__).info(f"✓ Redis 日志写入已启用。REDIS_URL={redis_url}") except Exception: pass except Exception: pass # 设置第三方库的日志级别 logging.getLogger('uvicorn').setLevel(logging.WARNING) logging.getLogger('uvicorn.access').setLevel(logging.WARNING) return log_file # 初始化日志 log_file = setup_logging() logger = logging.getLogger(__name__) logger.info(f"日志系统已初始化,日志文件: {log_file}") logger.info(f"日志级别: {os.getenv('LOG_LEVEL', 'INFO')}") # 检查 redis-py 是否可用(redis-py 4.2+ 同时支持同步和异步,可替代aioredis) try: import redis # type: ignore # 检查是否是 redis-py 4.2+(支持异步) if hasattr(redis, 'asyncio'): logger.info(f"✓ redis-py 已安装 (版本: {redis.__version__ if hasattr(redis, '__version__') else '未知'}),支持同步和异步客户端") logger.info(" - redis.Redis: 同步客户端(用于config_manager)") logger.info(" - redis.asyncio.Redis: 异步客户端(用于trading_system,可替代aioredis)") else: logger.warning("⚠ redis-py 版本可能过低,建议升级到 4.2+ 以获得异步支持") except ImportError as e: import sys logger.warning("⚠ redis-py 未安装,Redis/Valkey 缓存将不可用") logger.warning(f" Python 路径: {sys.executable}") logger.warning(f" 导入错误: {e}") logger.warning(" 提示: 请运行 'pip install redis>=4.2.0' 安装 redis-py") logger.warning(" 注意: redis-py 4.2+ 同时支持同步和异步,无需安装 aioredis") logger.warning(" 或者运行 'pip install -r backend/requirements.txt' 安装所有依赖") app = FastAPI( title="Auto Trade System API", version="1.0.0", description="币安自动交易系统API", redirect_slashes=False # 禁用自动重定向,避免307重定向问题 ) # 现货推荐定时扫描间隔(秒),默认 15 分钟;设为 0 关闭定时扫描 SPOT_SCAN_INTERVAL_SEC = int(os.getenv("SPOT_SCAN_INTERVAL_SEC", "900")) async def _spot_scan_loop(): """后台循环:每隔 SPOT_SCAN_INTERVAL_SEC 执行一次现货扫描并写入 Redis。""" if SPOT_SCAN_INTERVAL_SEC <= 0: logger.info("现货推荐定时扫描已关闭(SPOT_SCAN_INTERVAL_SEC=0)") return import asyncio backend_dir = Path(__file__).resolve().parent.parent sys.path.insert(0, str(backend_dir)) try: from spot_scanner import run_spot_scan_and_cache except Exception as e: logger.warning("现货扫描模块加载失败,跳过定时任务: %s", e) return logger.info("现货推荐定时扫描已启动,间隔 %d 秒", SPOT_SCAN_INTERVAL_SEC) while True: try: await run_spot_scan_and_cache(ttl_sec=900) except Exception as e: logger.warning("现货扫描执行失败: %s", e) await asyncio.sleep(SPOT_SCAN_INTERVAL_SEC) # 启动时:确保存在一个初始管理员(通过环境变量配置) @app.on_event("startup") async def _ensure_initial_admin(): try: import os from database.models import User, UserAccountMembership from api.auth_utils import hash_password username = (os.getenv("ATS_ADMIN_USERNAME") or "admin").strip() password = (os.getenv("ATS_ADMIN_PASSWORD") or "").strip() if not password: # 不强制创建,避免你忘记改默认密码导致安全风险 # 你可以设置 ATS_ADMIN_PASSWORD 后重启后端自动创建 logger.warning("未设置 ATS_ADMIN_PASSWORD,跳过自动创建初始管理员") return u = User.get_by_username(username) if not u: uid = User.create(username=username, password_hash=hash_password(password), role="admin", status="active") # 默认给管理员绑定 account_id=1(default) try: UserAccountMembership.add(int(uid), 1, role="owner") except Exception: pass logger.info(f"✓ 已创建初始管理员用户: {username} (id={uid})") else: # 若已存在但不是 admin,则提升为 admin(可注释掉更保守) if (u.get("role") or "user") != "admin": try: User.set_role(int(u["id"]), "admin") logger.warning(f"已将用户 {username} 提升为 admin") except Exception: pass except Exception as e: logger.warning(f"初始化管理员失败(可忽略): {e}") # 启动现货推荐定时扫描(后台任务) try: import asyncio asyncio.create_task(_spot_scan_loop()) except Exception as e: logger.warning("启动现货扫描定时任务失败(可忽略): %s", e) # CORS配置(允许React前端访问) # 默认包含:本地开发端口、主前端域名、推荐查看器域名 cors_origins_str = os.getenv('CORS_ORIGINS', 'http://localhost:3000,http://localhost:3001,http://localhost:5173,http://as.deepx1.com,http://asapi.deepx1.com,http://r.deepx1.com,https://r.deepx1.com,http://asapi-new.deepx1.com') cors_origins = [origin.strip() for origin in cors_origins_str.split(',') if origin.strip()] logger.info(f"CORS允许的源: {cors_origins}") app.add_middleware( CORSMiddleware, allow_origins=cors_origins, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"], allow_headers=["*"], expose_headers=["*"], ) # 注册路由 app.include_router(config.router, prefix="/api/config", tags=["配置管理"]) app.include_router(auth.router, tags=["auth"]) app.include_router(admin.router) app.include_router(accounts.router, prefix="/api/accounts", tags=["账号管理"]) app.include_router(trades.router, prefix="/api/trades", tags=["交易记录"]) app.include_router(stats.router, prefix="/api/stats", tags=["统计分析"]) app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表板"]) app.include_router(account.router, prefix="/api/account", tags=["账户数据"]) app.include_router(recommendations.router, tags=["交易推荐"]) app.include_router(system.router, tags=["系统控制"]) app.include_router(data_management.router) app.include_router(public.router) @app.get("/") async def root(): return { "message": "Auto Trade System API", "version": "1.0.0", "docs": "/docs" } @app.get("/api/health") async def health(): return {"status": "ok"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8001)