auto_trade_sys/backend/api/main.py
薇薇安 3389e0aafc feat(recommendations): 添加现货推荐扫描与API支持
在后端API中新增现货推荐扫描功能,定时将数据写入Redis缓存,并提供相应的API接口以获取现货推荐。前端组件更新以支持现货推荐的展示与切换,提升用户体验与决策支持。此改动为用户提供了实时的现货推荐信息,增强了系统的功能性与灵活性。
2026-02-25 08:40:52 +08:00

287 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI应用主入口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.routes import config, trades, stats, dashboard, account, recommendations, system, accounts, auth, admin, public, data_management
import os
import sys
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler
# 加载.env文件
try:
from dotenv import load_dotenv
backend_dir = Path(__file__).parent.parent
project_root = backend_dir.parent
env_file = backend_dir / '.env'
if not env_file.exists():
env_file = project_root / '.env'
if env_file.exists():
load_dotenv(env_file)
else:
load_dotenv(project_root / '.env', override=False)
except ImportError:
pass
except Exception:
pass
# 配置日志
def setup_logging():
"""配置日志系统"""
# 获取日志级别
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
log_level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
level = log_level_map.get(log_level, logging.INFO)
# 创建日志目录
backend_dir = Path(__file__).parent.parent
log_dir = backend_dir / 'logs'
log_dir.mkdir(exist_ok=True)
# 日志文件路径
log_file = log_dir / 'api.log'
# 配置根日志记录器
root_logger = logging.getLogger()
root_logger.setLevel(level)
# 清除现有的处理器
root_logger.handlers.clear()
# 设置日志时间格式为北京时间UTC+8
from datetime import datetime, timezone, timedelta
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
# 转换为北京时间UTC+8
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
# 创建格式器(使用北京时间)
formatter = BeijingTimeFormatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 文件处理器(带轮转)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# 追加:将 ERROR 日志写入 Redis不影响现有文件/控制台日志)
try:
from api.redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true"
redis_username = os.getenv("REDIS_USERNAME", None)
redis_password = os.getenv("REDIS_PASSWORD", None)
ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required")
ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)
redis_cfg = RedisLogConfig(
redis_url=redis_url,
use_tls=redis_use_tls,
username=redis_username,
password=redis_password,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
service="backend",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
# 让 handler 自己按组筛选error/warning/info这里只需要放宽到 INFO
redis_handler.setLevel(logging.INFO)
root_logger.addHandler(redis_handler)
# 诊断:启动时快速检测一次 Redis 可用性(失败不影响运行)
try:
client = redis_handler._get_redis() # noqa: SLF001仅用于诊断
if client is None:
logging.getLogger(__name__).warning(f"⚠ Redis 日志写入未启用。REDIS_URL={redis_url}")
else:
logging.getLogger(__name__).info(f"✓ Redis 日志写入已启用。REDIS_URL={redis_url}")
except Exception:
pass
except Exception:
pass
# 设置第三方库的日志级别
logging.getLogger('uvicorn').setLevel(logging.WARNING)
logging.getLogger('uvicorn.access').setLevel(logging.WARNING)
return log_file
# 初始化日志
log_file = setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"日志系统已初始化,日志文件: {log_file}")
logger.info(f"日志级别: {os.getenv('LOG_LEVEL', 'INFO')}")
# 检查 redis-py 是否可用redis-py 4.2+ 同时支持同步和异步可替代aioredis
try:
import redis # type: ignore
# 检查是否是 redis-py 4.2+(支持异步)
if hasattr(redis, 'asyncio'):
logger.info(f"✓ redis-py 已安装 (版本: {redis.__version__ if hasattr(redis, '__version__') else '未知'}),支持同步和异步客户端")
logger.info(" - redis.Redis: 同步客户端用于config_manager")
logger.info(" - redis.asyncio.Redis: 异步客户端用于trading_system可替代aioredis")
else:
logger.warning("⚠ redis-py 版本可能过低,建议升级到 4.2+ 以获得异步支持")
except ImportError as e:
import sys
logger.warning("⚠ redis-py 未安装Redis/Valkey 缓存将不可用")
logger.warning(f" Python 路径: {sys.executable}")
logger.warning(f" 导入错误: {e}")
logger.warning(" 提示: 请运行 'pip install redis>=4.2.0' 安装 redis-py")
logger.warning(" 注意: redis-py 4.2+ 同时支持同步和异步,无需安装 aioredis")
logger.warning(" 或者运行 'pip install -r backend/requirements.txt' 安装所有依赖")
app = FastAPI(
title="Auto Trade System API",
version="1.0.0",
description="币安自动交易系统API",
redirect_slashes=False # 禁用自动重定向避免307重定向问题
)
# 现货推荐定时扫描间隔(秒),默认 15 分钟;设为 0 关闭定时扫描
SPOT_SCAN_INTERVAL_SEC = int(os.getenv("SPOT_SCAN_INTERVAL_SEC", "900"))
async def _spot_scan_loop():
"""后台循环:每隔 SPOT_SCAN_INTERVAL_SEC 执行一次现货扫描并写入 Redis。"""
if SPOT_SCAN_INTERVAL_SEC <= 0:
logger.info("现货推荐定时扫描已关闭SPOT_SCAN_INTERVAL_SEC=0")
return
import asyncio
backend_dir = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(backend_dir))
try:
from spot_scanner import run_spot_scan_and_cache
except Exception as e:
logger.warning("现货扫描模块加载失败,跳过定时任务: %s", e)
return
logger.info("现货推荐定时扫描已启动,间隔 %d", SPOT_SCAN_INTERVAL_SEC)
while True:
try:
await run_spot_scan_and_cache(ttl_sec=900)
except Exception as e:
logger.warning("现货扫描执行失败: %s", e)
await asyncio.sleep(SPOT_SCAN_INTERVAL_SEC)
# 启动时:确保存在一个初始管理员(通过环境变量配置)
@app.on_event("startup")
async def _ensure_initial_admin():
try:
import os
from database.models import User, UserAccountMembership
from api.auth_utils import hash_password
username = (os.getenv("ATS_ADMIN_USERNAME") or "admin").strip()
password = (os.getenv("ATS_ADMIN_PASSWORD") or "").strip()
if not password:
# 不强制创建,避免你忘记改默认密码导致安全风险
# 你可以设置 ATS_ADMIN_PASSWORD 后重启后端自动创建
logger.warning("未设置 ATS_ADMIN_PASSWORD跳过自动创建初始管理员")
return
u = User.get_by_username(username)
if not u:
uid = User.create(username=username, password_hash=hash_password(password), role="admin", status="active")
# 默认给管理员绑定 account_id=1default
try:
UserAccountMembership.add(int(uid), 1, role="owner")
except Exception:
pass
logger.info(f"✓ 已创建初始管理员用户: {username} (id={uid})")
else:
# 若已存在但不是 admin则提升为 admin可注释掉更保守
if (u.get("role") or "user") != "admin":
try:
User.set_role(int(u["id"]), "admin")
logger.warning(f"已将用户 {username} 提升为 admin")
except Exception:
pass
except Exception as e:
logger.warning(f"初始化管理员失败(可忽略): {e}")
# 启动现货推荐定时扫描(后台任务)
try:
import asyncio
asyncio.create_task(_spot_scan_loop())
except Exception as e:
logger.warning("启动现货扫描定时任务失败(可忽略): %s", e)
# CORS配置允许React前端访问
# 默认包含:本地开发端口、主前端域名、推荐查看器域名
cors_origins_str = os.getenv('CORS_ORIGINS', 'http://localhost:3000,http://localhost:3001,http://localhost:5173,http://as.deepx1.com,http://asapi.deepx1.com,http://r.deepx1.com,https://r.deepx1.com,http://asapi-new.deepx1.com')
cors_origins = [origin.strip() for origin in cors_origins_str.split(',') if origin.strip()]
logger.info(f"CORS允许的源: {cors_origins}")
app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
allow_headers=["*"],
expose_headers=["*"],
)
# 注册路由
app.include_router(config.router, prefix="/api/config", tags=["配置管理"])
app.include_router(auth.router, tags=["auth"])
app.include_router(admin.router)
app.include_router(accounts.router, prefix="/api/accounts", tags=["账号管理"])
app.include_router(trades.router, prefix="/api/trades", tags=["交易记录"])
app.include_router(stats.router, prefix="/api/stats", tags=["统计分析"])
app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表板"])
app.include_router(account.router, prefix="/api/account", tags=["账户数据"])
app.include_router(recommendations.router, tags=["交易推荐"])
app.include_router(system.router, tags=["系统控制"])
app.include_router(data_management.router)
app.include_router(public.router)
@app.get("/")
async def root():
return {
"message": "Auto Trade System API",
"version": "1.0.0",
"docs": "/docs"
}
@app.get("/api/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)