Modified the admin dashboard statistics to retrieve account snapshots from the last 30 days instead of just 1 day, ensuring more comprehensive data. Additionally, introduced a new data source option for trades, allowing users to select between 'binance' and 'local' records, with appropriate handling for each source. Updated the frontend components to reflect these changes and improve user experience in managing trade data.
496 lines
22 KiB
Python
496 lines
22 KiB
Python
"""
|
||
统计分析API
|
||
"""
|
||
from fastapi import APIRouter, Query, Header, Depends
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime, timedelta
|
||
import logging
|
||
|
||
project_root = Path(__file__).parent.parent.parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
sys.path.insert(0, str(project_root / 'backend'))
|
||
|
||
from database.models import AccountSnapshot, Trade, MarketScan, TradingSignal, Account, TradeStats
|
||
from fastapi import HTTPException
|
||
from api.auth_deps import get_account_id, get_admin_user
|
||
from typing import Dict, Any
|
||
|
||
logger = logging.getLogger(__name__)
|
||
router = APIRouter()
|
||
|
||
|
||
@router.get("/admin/dashboard")
|
||
async def get_admin_dashboard_stats(user: Dict[str, Any] = Depends(get_admin_user)):
|
||
"""获取管理员仪表板数据:总资产来自各账号快照汇总(不调币安),总盈亏为最近7天聚合已实现盈亏。"""
|
||
try:
|
||
accounts = Account.list_all()
|
||
stats = []
|
||
total_assets = 0.0
|
||
active_accounts = 0
|
||
for acc in accounts:
|
||
aid = acc["id"]
|
||
# 取最近 30 天内的快照,再取最新一条,避免“仅 1 天”导致无数据
|
||
snapshots = AccountSnapshot.get_recent(30, account_id=aid)
|
||
acc_stat = {
|
||
"id": aid,
|
||
"name": acc["name"],
|
||
"status": acc["status"],
|
||
"total_balance": 0,
|
||
"total_pnl": 0,
|
||
"open_positions": 0,
|
||
}
|
||
if snapshots:
|
||
snap = snapshots[0]
|
||
acc_stat["total_balance"] = snap.get("total_balance", 0)
|
||
acc_stat["total_pnl"] = snap.get("total_pnl", 0)
|
||
acc_stat["open_positions"] = snap.get("open_positions", 0)
|
||
total_assets += float(acc_stat["total_balance"])
|
||
if acc["status"] == "active":
|
||
active_accounts += 1
|
||
stats.append(acc_stat)
|
||
total_pnl_7d = 0.0
|
||
try:
|
||
global_symbols = TradeStats.get_global_symbol_stats(days=7)
|
||
for row in global_symbols:
|
||
total_pnl_7d += float(row.get("net_pnl") or 0)
|
||
except Exception as e:
|
||
logger.debug(f"获取全局7天净盈亏失败: {e}")
|
||
return {
|
||
"summary": {
|
||
"total_accounts": len(accounts),
|
||
"active_accounts": active_accounts,
|
||
"total_assets_usdt": round(total_assets, 2),
|
||
"total_pnl_usdt": round(total_pnl_7d, 2),
|
||
},
|
||
"accounts": stats,
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取管理员仪表板数据失败: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/admin/overall-trade-stats")
|
||
async def get_admin_overall_trade_stats(
|
||
days: int = Query(7, ge=1, le=90),
|
||
user: Dict[str, Any] = Depends(get_admin_user),
|
||
):
|
||
"""管理员:全账号最近 N 天整体订单统计。"""
|
||
try:
|
||
by_symbol_raw = TradeStats.get_global_symbol_stats(days=days)
|
||
by_hour_raw = TradeStats.get_global_hourly_stats(days=days)
|
||
by_symbol = []
|
||
for row in by_symbol_raw:
|
||
tc = int(row.get("trade_count") or 0)
|
||
win_count = int(row.get("win_count") or 0)
|
||
loss_count = int(row.get("loss_count") or 0)
|
||
net_pnl = float(row.get("net_pnl") or 0)
|
||
win_rate = (100.0 * win_count / tc) if tc > 0 else 0.0
|
||
by_symbol.append({
|
||
"symbol": (row.get("symbol") or "").strip(),
|
||
"trade_count": tc,
|
||
"win_count": win_count,
|
||
"loss_count": loss_count,
|
||
"net_pnl": round(net_pnl, 4),
|
||
"win_rate_pct": round(win_rate, 1),
|
||
})
|
||
by_symbol = [x for x in by_symbol if x["symbol"]]
|
||
by_symbol.sort(key=lambda x: (-x["net_pnl"], -x["trade_count"]))
|
||
hourly_agg = [{"hour": h, "trade_count": 0, "net_pnl": 0.0} for h in range(24)]
|
||
for row in by_hour_raw:
|
||
h = row.get("hour")
|
||
if h is not None and 0 <= int(h) <= 23:
|
||
hi = int(h)
|
||
hourly_agg[hi]["trade_count"] = int(row.get("trade_count") or 0)
|
||
hourly_agg[hi]["net_pnl"] = round(float(row.get("net_pnl") or 0), 4)
|
||
total_trade_count = sum(x["trade_count"] for x in by_symbol)
|
||
total_win = sum(x["win_count"] for x in by_symbol)
|
||
total_loss = sum(x["loss_count"] for x in by_symbol)
|
||
total_net_pnl = sum(x["net_pnl"] for x in by_symbol)
|
||
suggestions = _build_suggestions(by_symbol)
|
||
return {
|
||
"days": days,
|
||
"summary": {
|
||
"trade_count": total_trade_count,
|
||
"win_count": total_win,
|
||
"loss_count": total_loss,
|
||
"net_pnl": round(total_net_pnl, 4),
|
||
},
|
||
"by_symbol": by_symbol,
|
||
"hourly_agg": hourly_agg,
|
||
"suggestions": suggestions,
|
||
}
|
||
except Exception as e:
|
||
logger.exception("get_admin_overall_trade_stats 失败")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
def _aggregate_daily_by_symbol(daily: list) -> list:
|
||
"""将 daily(按 date+symbol)聚合成按 symbol 的汇总。"""
|
||
from collections import defaultdict
|
||
agg = defaultdict(lambda: {"trade_count": 0, "win_count": 0, "loss_count": 0, "net_pnl": 0.0})
|
||
for row in daily:
|
||
sym = (row.get("symbol") or "").strip()
|
||
if not sym:
|
||
continue
|
||
agg[sym]["trade_count"] += int(row.get("trade_count") or 0)
|
||
agg[sym]["win_count"] += int(row.get("win_count") or 0)
|
||
agg[sym]["loss_count"] += int(row.get("loss_count") or 0)
|
||
try:
|
||
agg[sym]["net_pnl"] += float(row.get("net_pnl") or 0)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
out = []
|
||
for symbol, v in agg.items():
|
||
tc = v["trade_count"]
|
||
win_rate = (100.0 * v["win_count"] / tc) if tc > 0 else 0.0
|
||
out.append({
|
||
"symbol": symbol,
|
||
"trade_count": tc,
|
||
"win_count": v["win_count"],
|
||
"loss_count": v["loss_count"],
|
||
"net_pnl": round(v["net_pnl"], 4),
|
||
"win_rate_pct": round(win_rate, 1),
|
||
})
|
||
return sorted(out, key=lambda x: (-x["net_pnl"], -x["trade_count"]))
|
||
|
||
|
||
def _aggregate_hourly(by_hour: list) -> list:
|
||
"""将 by_hour(按 date+hour)聚合成按 hour 0-23 的汇总。"""
|
||
from collections import defaultdict
|
||
agg = defaultdict(lambda: {"trade_count": 0, "net_pnl": 0.0})
|
||
for row in by_hour:
|
||
h = row.get("hour")
|
||
if h is None:
|
||
continue
|
||
try:
|
||
h = int(h)
|
||
except (TypeError, ValueError):
|
||
continue
|
||
if 0 <= h <= 23:
|
||
agg[h]["trade_count"] += int(row.get("trade_count") or 0)
|
||
try:
|
||
agg[h]["net_pnl"] += float(row.get("net_pnl") or 0)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return [{"hour": h, "trade_count": agg[h]["trade_count"], "net_pnl": round(agg[h]["net_pnl"], 4)} for h in range(24)]
|
||
|
||
|
||
def _build_suggestions(by_symbol: list) -> dict:
|
||
"""
|
||
根据按交易对汇总生成白名单/黑名单建议(仅展示,不自动改策略)。
|
||
- 黑名单:净亏且笔数多 → 建议降权或观察
|
||
- 白名单:净盈且胜率较高、笔数足够 → 可优先考虑
|
||
"""
|
||
blacklist = []
|
||
whitelist = []
|
||
for row in by_symbol:
|
||
sym = row.get("symbol", "")
|
||
tc = int(row.get("trade_count") or 0)
|
||
net_pnl = float(row.get("net_pnl") or 0)
|
||
win_rate = float(row.get("win_rate_pct") or 0)
|
||
if tc < 2:
|
||
continue
|
||
if net_pnl < 0:
|
||
blacklist.append({
|
||
"symbol": sym,
|
||
"trade_count": tc,
|
||
"net_pnl": round(net_pnl, 2),
|
||
"win_rate_pct": round(win_rate, 1),
|
||
"suggestion": "近期净亏且笔数较多,建议降权或观察后再开仓",
|
||
})
|
||
elif net_pnl > 0 and win_rate >= 50:
|
||
whitelist.append({
|
||
"symbol": sym,
|
||
"trade_count": tc,
|
||
"net_pnl": round(net_pnl, 2),
|
||
"win_rate_pct": round(win_rate, 1),
|
||
"suggestion": "近期净盈且胜率尚可,可优先考虑",
|
||
})
|
||
return {"blacklist": blacklist, "whitelist": whitelist}
|
||
|
||
|
||
@router.get("/trade-stats")
|
||
async def get_trade_stats(
|
||
days: int = Query(7, ge=1, le=90),
|
||
account_id: int = Depends(get_account_id),
|
||
):
|
||
"""获取交易统计:最近 N 天按交易对、按小时聚合(来自 trade_stats_daily / trade_stats_time_bucket)。
|
||
返回原始 daily/by_hour、按交易对汇总 by_symbol、按小时汇总 hourly_agg、以及白名单/黑名单建议。"""
|
||
try:
|
||
daily = TradeStats.get_daily_stats(account_id=account_id, days=days)
|
||
by_hour = TradeStats.get_hourly_stats(account_id=account_id, days=days)
|
||
by_symbol = _aggregate_daily_by_symbol(daily)
|
||
hourly_agg = _aggregate_hourly(by_hour)
|
||
suggestions = _build_suggestions(by_symbol)
|
||
return {
|
||
"days": days,
|
||
"daily": daily,
|
||
"by_hour": by_hour,
|
||
"by_symbol": by_symbol,
|
||
"hourly_agg": hourly_agg,
|
||
"suggestions": suggestions,
|
||
}
|
||
except Exception as e:
|
||
logger.exception("get_trade_stats 失败")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/performance")
|
||
async def get_performance_stats(
|
||
days: int = Query(7, ge=1, le=365),
|
||
account_id: int = Depends(get_account_id),
|
||
):
|
||
"""获取性能统计"""
|
||
try:
|
||
# 账户快照
|
||
snapshots = AccountSnapshot.get_recent(days, account_id=account_id)
|
||
|
||
# 交易统计(时间范围 + limit 防内存暴增)
|
||
start_ts = int((datetime.now() - timedelta(days=days)).timestamp())
|
||
end_ts = int(datetime.now().timestamp())
|
||
trades = Trade.get_all(
|
||
start_timestamp=start_ts,
|
||
end_timestamp=end_ts,
|
||
account_id=account_id,
|
||
time_filter="exit",
|
||
limit=10000,
|
||
)
|
||
|
||
return {
|
||
"snapshots": snapshots,
|
||
"trades": trades,
|
||
"period": f"Last {days} days"
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/dashboard")
|
||
async def get_dashboard_data(account_id: int = Depends(get_account_id)):
|
||
"""获取仪表板数据"""
|
||
logger.info("=" * 60)
|
||
logger.info(f"获取仪表板数据 - account_id={account_id}")
|
||
logger.info("=" * 60)
|
||
try:
|
||
account_data = None
|
||
account_error = None
|
||
|
||
# 优先请求币安实时余额;失败时(如 -1003 IP 封禁)再回退到数据库快照
|
||
try:
|
||
from api.routes.account import get_realtime_account_data
|
||
account_data = await get_realtime_account_data(account_id=account_id)
|
||
if account_data and account_data.get('total_balance') is not None:
|
||
logger.info("使用币安实时账户数据")
|
||
else:
|
||
account_data = None
|
||
account_error = "实时余额返回为空"
|
||
except Exception as live_err:
|
||
account_error = str(live_err)
|
||
logger.warning(f"获取实时账户数据失败 (account_id={account_id}),回退到数据库快照: {live_err}")
|
||
|
||
# 实时请求失败或无数据时,使用数据库快照
|
||
if not account_data or account_data.get('total_balance') is None:
|
||
try:
|
||
snapshots = AccountSnapshot.get_recent(1, account_id=account_id)
|
||
if snapshots:
|
||
account_data = {
|
||
"total_balance": snapshots[0].get('total_balance', 0),
|
||
"available_balance": snapshots[0].get('available_balance', 0),
|
||
"total_position_value": snapshots[0].get('total_position_value', 0),
|
||
"total_pnl": snapshots[0].get('total_pnl', 0),
|
||
"open_positions": snapshots[0].get('open_positions', 0)
|
||
}
|
||
logger.info("使用数据库快照作为账户数据")
|
||
else:
|
||
if not account_data:
|
||
account_data = {}
|
||
account_data.setdefault("total_balance", 0)
|
||
account_data.setdefault("available_balance", 0)
|
||
account_data.setdefault("total_position_value", 0)
|
||
account_data.setdefault("total_pnl", 0)
|
||
account_data.setdefault("open_positions", 0)
|
||
logger.warning("数据库中没有账户快照数据,仪表板显示 0;交易进程会定期写入快照")
|
||
except Exception as db_error:
|
||
logger.error(f"从数据库获取账户快照失败: {db_error}")
|
||
if not account_data:
|
||
account_data = {
|
||
"total_balance": 0,
|
||
"available_balance": 0,
|
||
"total_position_value": 0,
|
||
"total_pnl": 0,
|
||
"open_positions": 0
|
||
}
|
||
|
||
# 获取持仓数据:优先「币安实时持仓」(含本系统下的挂单),失败时回退到数据库列表
|
||
open_trades = []
|
||
positions_error = None
|
||
try:
|
||
try:
|
||
from api.routes.account import fetch_realtime_positions
|
||
open_trades = await fetch_realtime_positions(account_id)
|
||
except Exception as fetch_err:
|
||
logger.warning(f"获取币安实时持仓失败,回退到数据库列表: {fetch_err}")
|
||
open_trades = []
|
||
if not open_trades:
|
||
db_trades = Trade.get_all(status='open', account_id=account_id, limit=500)
|
||
for trade in db_trades:
|
||
entry_value_usdt = float(trade.get('quantity', 0)) * float(trade.get('entry_price', 0))
|
||
leverage = float(trade.get('leverage', 1))
|
||
pnl = float(trade.get('pnl', 0))
|
||
margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt
|
||
pnl_percent = (pnl / margin * 100) if margin > 0 else 0
|
||
open_trades.append({
|
||
**trade,
|
||
'entry_value_usdt': entry_value_usdt,
|
||
'mark_price': trade.get('entry_price', 0),
|
||
'pnl': pnl,
|
||
'pnl_percent': pnl_percent
|
||
})
|
||
try:
|
||
from api.routes.account import fetch_live_positions_pnl
|
||
live_list = await fetch_live_positions_pnl(account_id)
|
||
by_symbol = {p["symbol"]: p for p in live_list}
|
||
for t in open_trades:
|
||
sym = t.get("symbol")
|
||
if sym and sym in by_symbol:
|
||
lp = by_symbol[sym]
|
||
t["mark_price"] = lp.get("mark_price", t.get("entry_price"))
|
||
t["pnl"] = lp.get("pnl", 0)
|
||
t["pnl_percent"] = lp.get("pnl_percent", 0)
|
||
except Exception as merge_err:
|
||
logger.debug(f"合并实时持仓盈亏失败: {merge_err}")
|
||
logger.info(f"使用数据库记录作为持仓数据: {len(open_trades)} 个持仓")
|
||
else:
|
||
logger.info(f"使用币安实时持仓作为列表: {len(open_trades)} 个持仓")
|
||
except Exception as db_error:
|
||
logger.error(f"从数据库获取持仓记录失败: {db_error}")
|
||
|
||
# 最近的扫描记录
|
||
recent_scans = []
|
||
try:
|
||
recent_scans = MarketScan.get_recent(10)
|
||
except Exception as e:
|
||
logger.error(f"获取扫描记录失败: {e}")
|
||
|
||
# 最近的信号
|
||
recent_signals = []
|
||
try:
|
||
recent_signals = TradingSignal.get_recent(20)
|
||
except Exception as e:
|
||
logger.error(f"获取交易信号失败: {e}")
|
||
|
||
# 计算仓位占比信息
|
||
position_stats = None
|
||
if account_data:
|
||
try:
|
||
from database.models import TradingConfig
|
||
total_balance = float(account_data.get('total_balance', 0))
|
||
max_total_position_percent = float(TradingConfig.get_value('MAX_TOTAL_POSITION_PERCENT', 0.30, account_id=account_id))
|
||
|
||
# 名义仓位(notional)与保证金占用(margin)是两个口径:
|
||
# - 名义仓位可以 > 100%(高杠杆下非常正常)
|
||
# - MAX_TOTAL_POSITION_PERCENT 在当前系统语义里是“保证金占用比例”
|
||
total_notional_value = float(account_data.get('total_position_value', 0))
|
||
|
||
# 优先使用 account_data 里的 total_margin_value;如果没有则从 open_trades 汇总兜底
|
||
total_margin_value = account_data.get('total_margin_value', None)
|
||
try:
|
||
total_margin_value = float(total_margin_value) if total_margin_value is not None else None
|
||
except Exception:
|
||
total_margin_value = None
|
||
|
||
if total_margin_value is None:
|
||
total_margin_value = 0.0
|
||
for t in open_trades or []:
|
||
try:
|
||
mv = t.get("margin_usdt", None)
|
||
if mv is None:
|
||
# fallback:名义/杠杆
|
||
nv = float(t.get("notional_usdt", 0) or 0)
|
||
lv = float(t.get("leverage", 0) or 0)
|
||
if lv <= 0:
|
||
lv = 1.0
|
||
mv = nv / lv
|
||
total_margin_value += float(mv or 0)
|
||
except Exception:
|
||
continue
|
||
|
||
# 当前仓位占比(保证金口径,与你的 MAX_TOTAL_POSITION_PERCENT 对齐)
|
||
current_margin_percent = (total_margin_value / total_balance * 100) if total_balance > 0 else 0
|
||
# 名义占比(仅用于参考)
|
||
current_notional_percent = (total_notional_value / total_balance * 100) if total_balance > 0 else 0
|
||
|
||
# 最大允许保证金(USDT)
|
||
max_margin_value = total_balance * max_total_position_percent
|
||
|
||
position_stats = {
|
||
# 兼容旧字段:current_position_percent 现在代表“保证金占比”
|
||
"current_position_percent": round(current_margin_percent, 2),
|
||
"current_position_percent_type": "margin",
|
||
"current_notional_percent": round(current_notional_percent, 2),
|
||
"max_position_percent": round(max_total_position_percent * 100, 2),
|
||
# 兼容旧字段:max_position_value/total_position_value 现在代表“保证金(USDT)”
|
||
"max_position_value": round(max_margin_value, 2),
|
||
"total_balance": round(total_balance, 2),
|
||
"total_position_value": round(total_margin_value, 2),
|
||
# 额外信息:名义价值(USDT),用于解释“名义占比可能>100%”
|
||
"total_notional_value": round(total_notional_value, 2),
|
||
"total_margin_value": round(total_margin_value, 2),
|
||
}
|
||
except Exception as e:
|
||
logger.warning(f"计算仓位占比信息失败: {e}")
|
||
|
||
# 获取交易配置(用于前端显示止损止盈等参数)
|
||
trading_config = {}
|
||
try:
|
||
from database.models import TradingConfig
|
||
config_keys = ['STOP_LOSS_PERCENT', 'TAKE_PROFIT_PERCENT', 'LEVERAGE', 'MAX_POSITION_PERCENT']
|
||
for key in config_keys:
|
||
config = TradingConfig.get(key, account_id=account_id)
|
||
if config:
|
||
trading_config[key] = {
|
||
'value': TradingConfig._convert_value(config['config_value'], config['config_type']),
|
||
'type': config['config_type']
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"获取交易配置失败: {e}")
|
||
|
||
# 本系统持仓数 = 数据库 status=open 条数,与下方「当前持仓」列表一致;币安持仓数 = 接口/快照中的 open_positions,可能与币安页面一致
|
||
open_trades_count = len(open_trades)
|
||
result = {
|
||
"account": account_data,
|
||
"open_trades": open_trades,
|
||
"open_trades_count": open_trades_count, # 本系统持仓数,与列表条数一致
|
||
"recent_scans": recent_scans,
|
||
"recent_signals": recent_signals,
|
||
"position_stats": position_stats,
|
||
"trading_config": trading_config, # 添加交易配置
|
||
"_debug": { # 添加调试信息
|
||
"account_id": account_id,
|
||
"account_data_total_balance": account_data.get('total_balance', 'N/A') if account_data else 'N/A',
|
||
"open_trades_count": open_trades_count,
|
||
}
|
||
}
|
||
|
||
# 如果有错误,在响应中包含错误信息(但不影响返回)
|
||
if account_error or positions_error:
|
||
result["warnings"] = {}
|
||
if account_error:
|
||
result["warnings"]["account"] = account_error
|
||
if positions_error:
|
||
result["warnings"]["positions"] = positions_error
|
||
|
||
logger.info(f"返回仪表板数据:")
|
||
logger.info(f" - account_id: {account_id}")
|
||
logger.info(f" - total_balance: {account_data.get('total_balance', 'N/A') if account_data else 'N/A'}")
|
||
logger.info(f" - available_balance: {account_data.get('available_balance', 'N/A') if account_data else 'N/A'}")
|
||
logger.info(f" - open_trades count: {len(open_trades)}")
|
||
if open_trades and len(open_trades) > 0:
|
||
logger.info(f" - 第一个持仓: {open_trades[0].get('symbol', 'N/A')}")
|
||
logger.info("=" * 60)
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取仪表板数据失败: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=f"获取仪表板数据失败: {str(e)}")
|