# auto_trade_sys/backend/api/routes/stats.py
#
# NOTE: the lines originally here were repository web-viewer residue, not
# source code: "495 lines / 22 KiB / Python / Raw Blame History" and a banner
# warning that this file contains ambiguous Unicode characters (characters
# that might be confused with other characters). They are kept as comments
# so the module remains valid Python.

"""
统计分析API
"""
from fastapi import APIRouter, Query, Header, Depends
import sys
from pathlib import Path
from datetime import datetime, timedelta
import logging
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
from database.models import AccountSnapshot, Trade, MarketScan, TradingSignal, Account, TradeStats
from fastapi import HTTPException
from api.auth_deps import get_account_id, get_admin_user
from typing import Dict, Any
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/admin/dashboard")
async def get_admin_dashboard_stats(user: Dict[str, Any] = Depends(get_admin_user)):
    """Return the admin dashboard overview across all accounts.

    Per-account figures come from the most recent ``AccountSnapshot`` row of
    each account. ``total_pnl_usdt`` in the summary is the sum of ``net_pnl``
    over ``TradeStats.get_global_symbol_stats(days=7)``.

    Args:
        user: Admin user resolved by the ``get_admin_user`` dependency; only
            used to gate access.

    Returns:
        A dict with ``summary`` (account counts, total assets, 7-day net PnL)
        and ``accounts`` (one stats entry per account).

    Raises:
        HTTPException: 500 when listing accounts or reading snapshots fails.
    """
    try:
        accounts = Account.list_all() or []
        stats = []
        total_assets = 0.0
        active_accounts = 0
        for acc in accounts:
            aid = acc["id"]
            snapshots = AccountSnapshot.get_recent(1, account_id=aid)
            latest = snapshots[0] if snapshots else {}
            # `or 0` also covers keys that are present but hold None, which
            # would otherwise make float() raise and fail the whole dashboard.
            acc_stat = {
                "id": aid,
                "name": acc["name"],
                "status": acc["status"],
                "total_balance": latest.get("total_balance") or 0,
                "total_pnl": latest.get("total_pnl") or 0,
                "open_positions": latest.get("open_positions") or 0,
            }
            total_assets += float(acc_stat["total_balance"])
            if acc["status"] == "active":
                active_accounts += 1
            stats.append(acc_stat)

        # Best effort: a failure here must not break the dashboard, but it is
        # logged at warning level because the summary then reports 0 PnL.
        total_pnl_7d = 0.0
        try:
            global_symbols = TradeStats.get_global_symbol_stats(days=7)
            for row in global_symbols or []:
                total_pnl_7d += float(row.get("net_pnl") or 0)
        except Exception as e:
            logger.warning(f"获取全局7天净盈亏失败: {e}")

        return {
            "summary": {
                "total_accounts": len(accounts),
                "active_accounts": active_accounts,
                "total_assets_usdt": round(total_assets, 2),
                "total_pnl_usdt": round(total_pnl_7d, 2),
            },
            "accounts": stats,
        }
    except Exception as e:
        logger.error(f"获取管理员仪表板数据失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/overall-trade-stats")
async def get_admin_overall_trade_stats(
    days: int = Query(7, ge=1, le=90),
    user: Dict[str, Any] = Depends(get_admin_user),
):
    """Admin view: overall trade statistics across all accounts for the last N days.

    Args:
        days: Look-back window in days (1-90).
        user: Admin user resolved by the ``get_admin_user`` dependency; only
            used to gate access.

    Returns:
        A dict with ``days``, a ``summary`` of totals, ``by_symbol`` (sorted
        by net PnL then trade count, both descending), ``hourly_agg`` (24
        buckets, hour 0-23) and ``suggestions`` (blacklist / whitelist).

    Raises:
        HTTPException: 500 when the statistics cannot be loaded.
    """
    try:
        by_symbol_raw = TradeStats.get_global_symbol_stats(days=days)
        by_hour_raw = TradeStats.get_global_hourly_stats(days=days)

        # Reuse the same aggregation helpers as the per-account endpoint.
        # They accumulate rows instead of overwriting, so the result is
        # correct whether or not the model layer already groups by
        # symbol / hour, and rows without a symbol are dropped.
        by_symbol = _aggregate_daily_by_symbol(by_symbol_raw or [])
        hourly_agg = _aggregate_hourly(by_hour_raw or [])

        summary = {
            "trade_count": sum(x["trade_count"] for x in by_symbol),
            "win_count": sum(x["win_count"] for x in by_symbol),
            "loss_count": sum(x["loss_count"] for x in by_symbol),
            "net_pnl": round(sum(x["net_pnl"] for x in by_symbol), 4),
        }
        return {
            "days": days,
            "summary": summary,
            "by_symbol": by_symbol,
            "hourly_agg": hourly_agg,
            "suggestions": _build_suggestions(by_symbol),
        }
    except Exception as e:
        logger.exception("get_admin_overall_trade_stats 失败")
        raise HTTPException(status_code=500, detail=str(e))
def _aggregate_daily_by_symbol(daily: list) -> list:
    """Collapse per-(date, symbol) rows into one summary row per symbol.

    Args:
        daily: Iterable of dict rows carrying ``symbol``, ``trade_count``,
            ``win_count``, ``loss_count`` and ``net_pnl``. ``None`` is
            treated as empty.

    Returns:
        A list of dicts (``symbol``, ``trade_count``, ``win_count``,
        ``loss_count``, ``net_pnl`` rounded to 4 places, ``win_rate_pct``
        rounded to 1 place), sorted by net PnL and then trade count, both
        descending. Rows with an empty symbol are skipped.
    """
    import math
    from collections import defaultdict

    def _count(value) -> int:
        # A malformed counter contributes 0 instead of aborting the whole
        # aggregation, matching how a malformed net_pnl is treated.
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _amount(value) -> float:
        try:
            number = float(value or 0)
        except (TypeError, ValueError):
            return 0.0
        # NaN / inf would poison the running sum and is not JSON compliant.
        return number if math.isfinite(number) else 0.0

    totals = defaultdict(
        lambda: {"trade_count": 0, "win_count": 0, "loss_count": 0, "net_pnl": 0.0}
    )
    for row in daily or []:
        symbol = str(row.get("symbol") or "").strip()
        if not symbol:
            continue
        bucket = totals[symbol]
        bucket["trade_count"] += _count(row.get("trade_count"))
        bucket["win_count"] += _count(row.get("win_count"))
        bucket["loss_count"] += _count(row.get("loss_count"))
        bucket["net_pnl"] += _amount(row.get("net_pnl"))

    summary = []
    for symbol, bucket in totals.items():
        trades = bucket["trade_count"]
        win_rate = (100.0 * bucket["win_count"] / trades) if trades > 0 else 0.0
        summary.append({
            "symbol": symbol,
            "trade_count": trades,
            "win_count": bucket["win_count"],
            "loss_count": bucket["loss_count"],
            "net_pnl": round(bucket["net_pnl"], 4),
            "win_rate_pct": round(win_rate, 1),
        })
    return sorted(summary, key=lambda x: (-x["net_pnl"], -x["trade_count"]))
def _aggregate_hourly(by_hour: list) -> list:
    """Collapse per-(date, hour) rows into 24 buckets, one per hour 0-23.

    Args:
        by_hour: Iterable of dict rows carrying ``hour``, ``trade_count``
            and ``net_pnl``. ``None`` is treated as empty.

    Returns:
        A list of exactly 24 dicts ordered by hour, each with ``hour``,
        ``trade_count`` and ``net_pnl`` (rounded to 4 places). Rows whose
        hour is missing, not convertible to int, or outside 0-23 are skipped.
    """
    import math

    def _count(value) -> int:
        # A malformed counter contributes 0 instead of aborting the whole
        # aggregation, matching how a malformed net_pnl is treated.
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _amount(value) -> float:
        try:
            number = float(value or 0)
        except (TypeError, ValueError):
            return 0.0
        # NaN / inf would poison the running sum and is not JSON compliant.
        return number if math.isfinite(number) else 0.0

    buckets = [{"hour": hour, "trade_count": 0, "net_pnl": 0.0} for hour in range(24)]
    for row in by_hour or []:
        raw_hour = row.get("hour")
        if raw_hour is None:
            continue
        try:
            hour = int(raw_hour)
        except (TypeError, ValueError, OverflowError):
            continue
        if not 0 <= hour <= 23:
            continue
        buckets[hour]["trade_count"] += _count(row.get("trade_count"))
        buckets[hour]["net_pnl"] += _amount(row.get("net_pnl"))

    for bucket in buckets:
        bucket["net_pnl"] = round(bucket["net_pnl"], 4)
    return buckets
def _build_suggestions(
    by_symbol: list,
    *,
    min_trades: int = 2,
    min_win_rate_pct: float = 50.0,
) -> dict:
    """Build blacklist / whitelist suggestions from per-symbol summaries.

    The result is advisory only; nothing in this function changes strategy
    configuration.

    - Blacklist: symbols with a negative net PnL.
    - Whitelist: symbols with a positive net PnL and a win rate of at least
      ``min_win_rate_pct``.

    Symbols with fewer than ``min_trades`` trades are ignored for both lists.

    Args:
        by_symbol: Rows with ``symbol``, ``trade_count``, ``net_pnl`` and
            ``win_rate_pct``. ``None`` is treated as empty.
        min_trades: Minimum trade count for a symbol to be considered.
        min_win_rate_pct: Minimum win rate (percent) for the whitelist.

    Returns:
        ``{"blacklist": [...], "whitelist": [...]}``; entries keep the input
        order and carry ``symbol``, ``trade_count``, ``net_pnl`` (2 places),
        ``win_rate_pct`` (1 place) and a human-readable ``suggestion``.
    """
    blacklist = []
    whitelist = []
    for row in by_symbol or []:
        trade_count = int(row.get("trade_count") or 0)
        if trade_count < min_trades:
            continue
        net_pnl = float(row.get("net_pnl") or 0)
        win_rate = float(row.get("win_rate_pct") or 0)
        entry = {
            "symbol": row.get("symbol", ""),
            "trade_count": trade_count,
            "net_pnl": round(net_pnl, 2),
            "win_rate_pct": round(win_rate, 1),
        }
        if net_pnl < 0:
            entry["suggestion"] = "近期净亏且笔数较多,建议降权或观察后再开仓"
            blacklist.append(entry)
        elif net_pnl > 0 and win_rate >= min_win_rate_pct:
            entry["suggestion"] = "近期净盈且胜率尚可,可优先考虑"
            whitelist.append(entry)
    return {"blacklist": blacklist, "whitelist": whitelist}
@router.get("/trade-stats")
async def get_trade_stats(
    days: int = Query(7, ge=1, le=90),
    account_id: int = Depends(get_account_id),
):
    """Return trade statistics for one account over the last N days.

    The response carries the raw rows from ``TradeStats.get_daily_stats`` and
    ``TradeStats.get_hourly_stats`` (``daily`` / ``by_hour``), their roll-ups
    per symbol (``by_symbol``) and per hour of day (``hourly_agg``), plus
    whitelist / blacklist ``suggestions`` derived from the per-symbol roll-up.

    Raises:
        HTTPException: 500 when loading or aggregating the statistics fails.
    """
    try:
        daily_rows = TradeStats.get_daily_stats(account_id=account_id, days=days)
        hourly_rows = TradeStats.get_hourly_stats(account_id=account_id, days=days)

        symbol_summary = _aggregate_daily_by_symbol(daily_rows)
        payload = {"days": days, "daily": daily_rows, "by_hour": hourly_rows}
        payload["by_symbol"] = symbol_summary
        payload["hourly_agg"] = _aggregate_hourly(hourly_rows)
        payload["suggestions"] = _build_suggestions(symbol_summary)
        return payload
    except Exception as e:
        logger.exception("get_trade_stats 失败")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/performance")
async def get_performance_stats(
    days: int = Query(7, ge=1, le=365),
    account_id: int = Depends(get_account_id),
):
    """Return account snapshots and trades for the performance view.

    Args:
        days: Look-back window in days (1-365).
        account_id: Account resolved by the ``get_account_id`` dependency.

    Returns:
        A dict with ``snapshots``, ``trades`` (filtered by exit time within
        the window, capped at 10000 rows) and a human-readable ``period``.

    Raises:
        HTTPException: 500 when snapshots or trades cannot be loaded.
    """
    try:
        # NOTE(review): `days` is passed as the first positional argument of
        # get_recent(); other callers in this file pass 1 to get the latest
        # snapshot, so this may be a row limit rather than a day count -
        # confirm against AccountSnapshot.get_recent.
        snapshots = AccountSnapshot.get_recent(days, account_id=account_id)

        # Sample the clock once so both ends of the window are consistent.
        now = datetime.now()
        start_ts = int((now - timedelta(days=days)).timestamp())
        end_ts = int(now.timestamp())

        # The time range plus the limit keep memory bounded.
        trades = Trade.get_all(
            start_timestamp=start_ts,
            end_timestamp=end_ts,
            account_id=account_id,
            time_filter="exit",
            limit=10000,
        )
        return {
            "snapshots": snapshots,
            "trades": trades,
            "period": f"Last {days} days"
        }
    except Exception as e:
        # Log like the sibling routes do; previously the failure was turned
        # into a 500 without leaving any trace in the server log.
        logger.exception("get_performance_stats 失败")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/dashboard")
async def get_dashboard_data(account_id: int = Depends(get_account_id)):
    """Return everything the per-account dashboard needs in one response.

    Each section is loaded best-effort so that one failing source does not
    blank the whole dashboard:

    - ``account``: live balance via ``get_realtime_account_data``; falls back
      to the latest ``AccountSnapshot`` row, then to zeros.
    - ``open_trades``: live positions via ``fetch_realtime_positions``; falls
      back to DB trades with ``status='open'``, enriched with live PnL when
      ``fetch_live_positions_pnl`` succeeds.
    - ``recent_scans`` / ``recent_signals``: latest 10 scans and 20 signals.
    - ``position_stats``: margin-based and notional-based exposure relative
      to the balance, or ``None`` if it cannot be computed.
    - ``trading_config``: a few config values for display.
    - ``warnings``: present only when the live account or position fetch
      failed; contains the error text under ``account`` / ``positions``.

    Raises:
        HTTPException: 500 on an unexpected failure outside the guarded
            sections.
    """
    logger.info("=" * 60)
    logger.info(f"获取仪表板数据 - account_id={account_id}")
    logger.info("=" * 60)
    try:
        account_data = None
        account_error = None
        # Prefer the live exchange balance; on failure (e.g. a -1003 IP ban)
        # fall back to the database snapshot below.
        try:
            from api.routes.account import get_realtime_account_data
            account_data = await get_realtime_account_data(account_id=account_id)
            if account_data and account_data.get('total_balance') is not None:
                logger.info("使用币安实时账户数据")
            else:
                account_data = None
                account_error = "实时余额返回为空"
        except Exception as live_err:
            account_error = str(live_err)
            logger.warning(f"获取实时账户数据失败 (account_id={account_id}),回退到数据库快照: {live_err}")

        # Live request failed or returned nothing: use the database snapshot.
        if not account_data or account_data.get('total_balance') is None:
            try:
                snapshots = AccountSnapshot.get_recent(1, account_id=account_id)
                if snapshots:
                    account_data = {
                        "total_balance": snapshots[0].get('total_balance', 0),
                        "available_balance": snapshots[0].get('available_balance', 0),
                        "total_position_value": snapshots[0].get('total_position_value', 0),
                        "total_pnl": snapshots[0].get('total_pnl', 0),
                        "open_positions": snapshots[0].get('open_positions', 0)
                    }
                    logger.info("使用数据库快照作为账户数据")
                else:
                    if not account_data:
                        account_data = {}
                    account_data.setdefault("total_balance", 0)
                    account_data.setdefault("available_balance", 0)
                    account_data.setdefault("total_position_value", 0)
                    account_data.setdefault("total_pnl", 0)
                    account_data.setdefault("open_positions", 0)
                    logger.warning("数据库中没有账户快照数据,仪表板显示 0交易进程会定期写入快照")
            except Exception as db_error:
                logger.error(f"从数据库获取账户快照失败: {db_error}")
                if not account_data:
                    account_data = {
                        "total_balance": 0,
                        "available_balance": 0,
                        "total_position_value": 0,
                        "total_pnl": 0,
                        "open_positions": 0
                    }

        # Positions: prefer live exchange positions; fall back to DB rows.
        open_trades = []
        positions_error = None
        try:
            try:
                from api.routes.account import fetch_realtime_positions
                fetched = await fetch_realtime_positions(account_id)
                # Guard against a None return so the list operations and the
                # len() calls further down cannot fail.
                open_trades = list(fetched) if fetched else []
            except Exception as fetch_err:
                # Record the failure so the response can surface it under
                # warnings["positions"], mirroring the account section.
                positions_error = str(fetch_err)
                logger.warning(f"获取币安实时持仓失败,回退到数据库列表: {fetch_err}")
                open_trades = []
            if not open_trades:
                db_trades = Trade.get_all(status='open', account_id=account_id, limit=500)
                for trade in db_trades or []:
                    # `or` defaults also cover columns that are present but
                    # NULL, which would otherwise abort the whole fallback.
                    entry_value_usdt = float(trade.get('quantity') or 0) * float(trade.get('entry_price') or 0)
                    leverage = float(trade.get('leverage') or 1)
                    pnl = float(trade.get('pnl') or 0)
                    margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt
                    pnl_percent = (pnl / margin * 100) if margin > 0 else 0
                    open_trades.append({
                        **trade,
                        'entry_value_usdt': entry_value_usdt,
                        # No live price here; the entry price is a placeholder
                        # until the live PnL merge below replaces it.
                        'mark_price': trade.get('entry_price', 0),
                        'pnl': pnl,
                        'pnl_percent': pnl_percent
                    })
                # Best effort: overlay live mark price / PnL onto the DB rows.
                try:
                    from api.routes.account import fetch_live_positions_pnl
                    live_list = await fetch_live_positions_pnl(account_id)
                    by_symbol = {p["symbol"]: p for p in live_list}
                    for t in open_trades:
                        sym = t.get("symbol")
                        if sym and sym in by_symbol:
                            lp = by_symbol[sym]
                            t["mark_price"] = lp.get("mark_price", t.get("entry_price"))
                            t["pnl"] = lp.get("pnl", 0)
                            t["pnl_percent"] = lp.get("pnl_percent", 0)
                except Exception as merge_err:
                    logger.debug(f"合并实时持仓盈亏失败: {merge_err}")
                logger.info(f"使用数据库记录作为持仓数据: {len(open_trades)} 个持仓")
            else:
                logger.info(f"使用币安实时持仓作为列表: {len(open_trades)} 个持仓")
        except Exception as db_error:
            positions_error = str(db_error)
            logger.error(f"从数据库获取持仓记录失败: {db_error}")

        # Most recent market scans.
        recent_scans = []
        try:
            recent_scans = MarketScan.get_recent(10)
        except Exception as e:
            logger.error(f"获取扫描记录失败: {e}")

        # Most recent trading signals.
        recent_signals = []
        try:
            recent_signals = TradingSignal.get_recent(20)
        except Exception as e:
            logger.error(f"获取交易信号失败: {e}")

        # Exposure statistics relative to the account balance.
        position_stats = None
        if account_data:
            try:
                from database.models import TradingConfig
                total_balance = float(account_data.get('total_balance') or 0)
                max_total_position_percent = float(TradingConfig.get_value('MAX_TOTAL_POSITION_PERCENT', 0.30, account_id=account_id))
                # Notional value and margin used are two different measures:
                # - notional can exceed 100% of balance (normal with leverage)
                # - MAX_TOTAL_POSITION_PERCENT is treated here as a cap on the
                #   share of balance used as margin
                total_notional_value = float(account_data.get('total_position_value') or 0)
                # Prefer total_margin_value from account_data; otherwise sum
                # it up from open_trades as a fallback.
                total_margin_value = account_data.get('total_margin_value', None)
                try:
                    total_margin_value = float(total_margin_value) if total_margin_value is not None else None
                except Exception:
                    total_margin_value = None
                if total_margin_value is None:
                    total_margin_value = 0.0
                    for t in open_trades or []:
                        try:
                            mv = t.get("margin_usdt", None)
                            if mv is None:
                                # Fallback: margin = notional / leverage.
                                # NOTE(review): rows built by the DB fallback
                                # above set 'entry_value_usdt' but not
                                # 'notional_usdt'; unless the DB row already
                                # has that key they contribute 0 - confirm.
                                nv = float(t.get("notional_usdt", 0) or 0)
                                lv = float(t.get("leverage", 0) or 0)
                                if lv <= 0:
                                    lv = 1.0
                                mv = nv / lv
                            total_margin_value += float(mv or 0)
                        except Exception:
                            # Skip positions with unusable numbers.
                            continue
                # Current exposure by margin (aligned with MAX_TOTAL_POSITION_PERCENT).
                current_margin_percent = (total_margin_value / total_balance * 100) if total_balance > 0 else 0
                # Exposure by notional value (for reference only).
                current_notional_percent = (total_notional_value / total_balance * 100) if total_balance > 0 else 0
                # Maximum allowed margin in USDT.
                max_margin_value = total_balance * max_total_position_percent
                position_stats = {
                    # Legacy field: current_position_percent now means margin share.
                    "current_position_percent": round(current_margin_percent, 2),
                    "current_position_percent_type": "margin",
                    "current_notional_percent": round(current_notional_percent, 2),
                    "max_position_percent": round(max_total_position_percent * 100, 2),
                    # Legacy fields: max_position_value / total_position_value
                    # now carry margin amounts (USDT).
                    "max_position_value": round(max_margin_value, 2),
                    "total_balance": round(total_balance, 2),
                    "total_position_value": round(total_margin_value, 2),
                    # Extra: notional value (USDT), to explain why the
                    # notional share can exceed 100%.
                    "total_notional_value": round(total_notional_value, 2),
                    "total_margin_value": round(total_margin_value, 2),
                }
            except Exception as e:
                logger.warning(f"计算仓位占比信息失败: {e}")

        # Trading config shown by the frontend (stop loss, take profit, ...).
        trading_config = {}
        try:
            from database.models import TradingConfig
            config_keys = ['STOP_LOSS_PERCENT', 'TAKE_PROFIT_PERCENT', 'LEVERAGE', 'MAX_POSITION_PERCENT']
            for key in config_keys:
                config = TradingConfig.get(key, account_id=account_id)
                if config:
                    trading_config[key] = {
                        'value': TradingConfig._convert_value(config['config_value'], config['config_type']),
                        'type': config['config_type']
                    }
        except Exception as e:
            logger.debug(f"获取交易配置失败: {e}")

        # open_trades_count always equals the length of the open_trades list
        # returned below; account["open_positions"] comes from the exchange
        # call or the snapshot and may differ from it.
        open_trades_count = len(open_trades)
        result = {
            "account": account_data,
            "open_trades": open_trades,
            "open_trades_count": open_trades_count,  # matches len(open_trades)
            "recent_scans": recent_scans,
            "recent_signals": recent_signals,
            "position_stats": position_stats,
            "trading_config": trading_config,
            "_debug": {  # diagnostic information
                "account_id": account_id,
                "account_data_total_balance": account_data.get('total_balance', 'N/A') if account_data else 'N/A',
                "open_trades_count": open_trades_count,
            }
        }
        # Surface non-fatal errors in the response without failing it.
        if account_error or positions_error:
            result["warnings"] = {}
            if account_error:
                result["warnings"]["account"] = account_error
            if positions_error:
                result["warnings"]["positions"] = positions_error

        logger.info("返回仪表板数据:")
        logger.info(f" - account_id: {account_id}")
        logger.info(f" - total_balance: {account_data.get('total_balance', 'N/A') if account_data else 'N/A'}")
        logger.info(f" - available_balance: {account_data.get('available_balance', 'N/A') if account_data else 'N/A'}")
        logger.info(f" - open_trades count: {len(open_trades)}")
        if open_trades:
            logger.info(f" - 第一个持仓: {open_trades[0].get('symbol', 'N/A')}")
        logger.info("=" * 60)
        return result
    except Exception as e:
        logger.error(f"获取仪表板数据失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"获取仪表板数据失败: {str(e)}")