auto_trade_sys/backend/api/routes/data_management.py
薇薇安 3b0526f392 feat(data_management): 增强交易数据统计与推算功能
在后端 API 中新增按小时和星期的交易统计功能,优化 `_compute_binance_stats` 函数以支持更细致的统计分析。同时,新增 `_enrich_trades_with_derived` 函数,补充交易记录的推算字段,包括入场价、交易小时和星期,提升策略分析的便利性。前端 `DataManagement` 组件更新,展示按小时和星期的统计信息,增强用户对交易数据的可视化理解。
2026-02-22 11:16:33 +08:00

375 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据管理:查询 DB 交易、从币安拉取订单/成交,供策略分析与导出。
仅管理员可用。
"""
import asyncio
from pathlib import Path
from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Optional
from api.auth_deps import get_admin_user
from database.models import Trade, Account
from datetime import datetime, timezone, timedelta
# Router collecting the data-management endpoints; every route below is served under /api/admin/data.
router = APIRouter(prefix="/api/admin/data", tags=["数据管理"])
# Fixed UTC+8 offset used by this module for day boundaries and hour/weekday bucketing.
BEIJING_TZ = timezone(timedelta(hours=8))
def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]):
    """Resolve a query window into ``(start_ts, end_ts)`` Unix timestamps in seconds.

    The window ends "now" and starts according to ``period``; explicit
    ``start_date`` / ``end_date`` values then override the respective bound.
    All calendar boundaries are evaluated in ``BEIJING_TZ``.

    Args:
        period: ``today``, ``1d``, ``7d``, ``30d``, ``week`` (since Monday
            00:00) or ``month`` (since the 1st, 00:00). Any other non-empty
            value is ignored with a warning.
        start_date: ``YYYY-MM-DD`` (taken as 00:00:00) or
            ``YYYY-MM-DD HH:MM:SS``. Surrounding whitespace and an ISO ``T``
            separator are accepted.
        end_date: Same formats; a date-only value is taken as 23:59:59.

    Returns:
        ``(start_ts, end_ts)``. When neither ``period`` nor ``start_date``
        determines the start, it defaults to seven days before ``end_ts``.
    """
    import logging  # function-scope import keeps this block self-contained

    log = logging.getLogger(__name__)
    day_seconds = 24 * 3600

    def parse(raw, default_time):
        # Normalise "2026-02-22T10:00:00" and padded input to the one format we parse.
        text = raw.strip().replace("T", " ")
        if len(text) <= 10:
            text = f"{text} {default_time}"
        parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        return int(parsed.replace(tzinfo=BEIJING_TZ).timestamp())

    now = datetime.now(BEIJING_TZ)
    end_ts = int(now.timestamp())
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    rolling_days = {"1d": 1, "7d": 7, "30d": 30}

    start_ts = None
    if period == "today":
        start_ts = int(midnight.timestamp())
    elif period in rolling_days:
        start_ts = end_ts - rolling_days[period] * day_seconds
    elif period == "week":
        # weekday() is 0 on Monday, so this lands on Monday 00:00 of the current week.
        start_ts = int((midnight - timedelta(days=now.weekday())).timestamp())
    elif period == "month":
        start_ts = int(midnight.replace(day=1).timestamp())
    elif period:
        log.warning("Ignoring unknown period %r", period)

    # Explicit dates win over the period. A malformed date is still ignored
    # (best effort, as before), but no longer silently.
    if start_date:
        try:
            start_ts = parse(start_date, "00:00:00")
        except ValueError:
            log.warning("Ignoring unparseable start_date %r", start_date)
    if end_date:
        try:
            end_ts = parse(end_date, "23:59:59")
        except ValueError:
            log.warning("Ignoring unparseable end_date %r", end_date)

    if start_ts is None:
        start_ts = end_ts - 7 * day_seconds  # default window: 7 days
    return start_ts, end_ts
def _stat_float(value) -> float:
    """Coerce a numeric field (number, numeric string, None, "") to float.

    Missing or malformed values count as 0.0 so that a single bad row cannot
    abort the whole statistics computation.
    """
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _trade_stats(rows: list) -> dict:
    """Aggregate PnL, commission, volume, maker/taker, per-symbol and per-time stats for fills."""
    pnls, commissions, quote_qtys = [], [], []
    by_symbol, by_hour, by_weekday = {}, {}, {}
    wins = losses = makers = 0
    weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]

    for row in rows:
        symbol = row.get("_symbol") or row.get("symbol") or "-"
        pnl = _stat_float(row.get("realizedPnl"))
        commission = _stat_float(row.get("commission"))
        quote_qty = _stat_float(row.get("quoteQty"))

        pnls.append(pnl)
        commissions.append(commission)
        quote_qtys.append(quote_qty)
        if pnl > 0:
            wins += 1
        elif pnl < 0:
            losses += 1
        if row.get("maker"):
            makers += 1

        per_symbol = by_symbol.setdefault(
            symbol, {"count": 0, "pnl": 0.0, "commission": 0.0, "quoteQty": 0.0}
        )
        per_symbol["count"] += 1
        per_symbol["pnl"] += pnl
        per_symbol["commission"] += commission
        per_symbol["quoteQty"] += quote_qty

        # Timestamps are in milliseconds; rows without a usable one are left
        # out of the hour/weekday buckets only.
        ts_ms = _stat_float(row.get("time") or row.get("trade_time"))
        if not ts_ms:
            continue
        try:
            traded_at = datetime.fromtimestamp(ts_ms / 1000, tz=BEIJING_TZ)
        except (OverflowError, OSError, ValueError):
            continue
        for key, buckets in ((traded_at.hour, by_hour), (traded_at.weekday(), by_weekday)):
            slot = buckets.setdefault(key, {"count": 0, "pnl": 0.0})
            slot["count"] += 1
            slot["pnl"] += pnl

    total_pnl = round(sum(pnls), 4)
    total_commission = round(sum(commissions), 4)
    decided = wins + losses
    return {
        "total_realized_pnl": total_pnl,
        "total_commission": total_commission,
        "net_pnl": round(total_pnl - total_commission, 4),
        "win_count": wins,
        "loss_count": losses,
        # Break-even fills (pnl == 0) are excluded from the win-rate denominator.
        "win_rate": round(100 * wins / decided, 1) if decided > 0 else 0,
        "avg_pnl_per_trade": round(sum(pnls) / len(pnls), 4) if pnls else 0,
        "total_quote_qty": round(sum(quote_qtys), 2),
        "maker_count": makers,
        "taker_count": len(rows) - makers,
        "by_symbol": {
            name: {
                "count": agg["count"],
                "pnl": round(agg["pnl"], 4),
                "commission": round(agg["commission"], 4),
                "quoteQty": round(agg["quoteQty"], 2),
            }
            for name, agg in sorted(by_symbol.items())
        },
        "by_hour": {
            str(hour): {"count": agg["count"], "pnl": round(agg["pnl"], 4)}
            for hour, agg in sorted(by_hour.items())
        },
        "by_weekday": {
            weekday_names[day]: {"count": agg["count"], "pnl": round(agg["pnl"], 4)}
            for day, agg in sorted(by_weekday.items())
        },
    }


def _order_stats(rows: list) -> dict:
    """Count orders by status, by type and by symbol."""
    by_status, by_type, by_symbol = {}, {}, {}
    for row in rows:
        status = row.get("status") or "UNKNOWN"
        order_type = row.get("type") or row.get("origType") or "UNKNOWN"
        symbol = row.get("_symbol") or row.get("symbol") or "-"
        by_status[status] = by_status.get(status, 0) + 1
        by_type[order_type] = by_type.get(order_type, 0) + 1
        by_symbol[symbol] = by_symbol.get(symbol, 0) + 1
    return {
        "by_status": by_status,
        "by_type": by_type,
        "by_symbol": dict(sorted(by_symbol.items())),
        "filled_count": by_status.get("FILLED", 0),
    }


def _compute_binance_stats(data: list, data_type: str) -> dict:
    """Compute summary statistics for strategy analysis.

    Args:
        data: Rows in Binance API shape. Entries that are not dicts, or that
            carry an ``_error`` key, are counted in ``count`` but skipped in
            every other statistic.
        data_type: ``"trades"`` selects fill statistics (PnL, commission,
            win rate, maker/taker, per symbol, per hour and per weekday in
            ``BEIJING_TZ``); any other value selects order statistics
            (by status, by type, by symbol, filled count).

    Returns:
        A dict that always contains ``count``; the remaining keys are present
        only when at least one usable row exists.

    NOTE(review): commissions are summed without looking at
    ``commissionAsset``, so ``total_commission`` / ``net_pnl`` are only
    meaningful if all commissions are in the same asset as the PnL — confirm.
    """
    stats = {"count": len(data)}
    valid = [row for row in data if isinstance(row, dict) and "_error" not in row]
    if valid:
        stats.update(_trade_stats(valid) if data_type == "trades" else _order_stats(valid))
    return stats
async def _get_active_symbols_from_income(
    binance_client,
    start_ms: int,
    end_ms: int,
    *,
    max_pages: int = 10,
    page_size: int = 1000,
    page_delay: float = 0.15,
) -> list:
    """Collect the USDT symbols that appear in the income history of a time window.

    This lets callers query trades/orders only for symbols that actually had
    activity instead of iterating over every listed symbol.

    Args:
        binance_client: Object exposing an awaitable
            ``futures_income_history(startTime, endTime, limit, recvWindow)``
            that returns a list of dicts with ``symbol`` and ``time`` keys.
        start_ms: Window start, in milliseconds.
        end_ms: Window end, in milliseconds.
        max_pages: Upper bound on the number of requests.
        page_size: ``limit`` sent with each request; a shorter response ends
            the pagination.
        page_delay: Seconds to pause between consecutive requests.

    Returns:
        Sorted list of symbols ending in ``USDT``. This is best effort: if a
        request fails, the symbols gathered from earlier pages are still
        returned (an empty list when the very first request fails).

    NOTE(review): pagination walks backwards by moving ``endTime`` below the
    oldest row of each page, which assumes each response holds the newest rows
    of the window — confirm against the income-history endpoint's ordering.
    """
    import logging  # function-scope import keeps this block self-contained

    symbols = set()
    current_end = end_ms
    try:
        for _ in range(max_pages):
            rows = await binance_client.futures_income_history(
                startTime=start_ms,
                endTime=current_end,
                limit=page_size,
                recvWindow=20000,
            )
            if not rows:
                break
            for row in rows:
                symbol = (row.get("symbol") or "").strip()
                if symbol.endswith("USDT"):
                    symbols.add(symbol)
            if len(rows) < page_size:
                break  # short page: nothing further to fetch
            oldest = min(row.get("time", current_end) for row in rows)
            current_end = oldest - 1
            if current_end < start_ms:
                break
            await asyncio.sleep(page_delay)
    except Exception:
        # Deliberately broad: this is an optimisation helper and must not take
        # the caller down. Keep what was collected so far and record the failure.
        logging.getLogger(__name__).warning(
            "Income history lookup failed; returning %d symbol(s) collected so far",
            len(symbols),
            exc_info=True,
        )
    return sorted(symbols)
@router.get("/accounts")
async def list_accounts(_admin=Depends(get_admin_user), active_only: bool = Query(False)):
    """Return the accounts selectable in the data-management UI.

    Each entry carries ``id``, ``name`` (falling back to ``Account <id>``) and
    ``status`` (falling back to ``active``). With ``active_only=true`` only
    entries whose status is ``active`` (case-insensitive) are returned.
    """
    accounts = []
    for record in Account.list_all() or []:
        accounts.append(
            {
                "id": record["id"],
                "name": record.get("name") or f"Account {record['id']}",
                "status": record.get("status") or "active",
            }
        )

    if not active_only:
        return {"accounts": accounts}

    active = [entry for entry in accounts if (entry.get("status") or "").lower() == "active"]
    return {"accounts": active}
@router.get("/trades")
async def query_db_trades(
    _admin=Depends(get_admin_user),
    account_id: int = Query(..., ge=1, description="账号 ID"),
    period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"),
    date: Optional[str] = Query(None, description="YYYY-MM-DD指定日期等同于 start_date=end_date"),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    symbol: Optional[str] = Query(None),
    time_filter: str = Query("created", description="created/entry/exit"),
    reconciled_only: Optional[str] = Query(None),
    limit: int = Query(500, ge=1, le=2000),
):
    """Query trade records stored in the DB for the given account.

    ``date`` is shorthand for ``start_date == end_date``. When no ``period``
    is supplied, ``today`` is used. Values exposing ``isoformat`` (dates,
    datetimes) are converted to ISO strings in the response.
    """
    # A single `date` pins both ends of the range to that day.
    range_start, range_end = (date, date) if date else (start_date, end_date)
    only_reconciled = str(reconciled_only or "").lower() in ("true", "1", "yes")
    start_ts, end_ts = _get_timestamp_range(period or "today", range_start, range_end)

    records = Trade.get_all(
        start_timestamp=start_ts,
        end_timestamp=end_ts,
        symbol=symbol,
        status=None,
        account_id=account_id,
        time_filter=time_filter,
        limit=limit,
        reconciled_only=only_reconciled,
        include_sync=True,
    )

    serialized = [
        {
            column: (value.isoformat() if hasattr(value, "isoformat") else value)
            for column, value in dict(record).items()
        }
        for record in records
    ]
    return {"total": len(serialized), "trades": serialized}
def _enrich_trades_with_derived(trades: list) -> list:
    """Return copies of the trades with derived analysis fields added.

    Added keys:
        ``_trade_hour``, ``_trade_weekday``, ``_trade_date``: derived from the
            millisecond ``time`` field in ``BEIJING_TZ``; only present when
            ``time`` is set.
        ``_approx_entry_price``: ``price - realizedPnl / qty`` for a SELL,
            ``price + realizedPnl / qty`` otherwise (presumably a SELL closes
            a long and a BUY closes a short — confirm). ``None`` when the
            value cannot be derived: zero quantity, zero PnL, missing side,
            or a missing/non-positive price.

    The input dicts are not modified.
    """
    enriched = []
    for trade in trades:
        item = dict(trade)

        ts_ms = trade.get("time") or 0
        if ts_ms:
            traded_at = datetime.fromtimestamp(ts_ms / 1000, tz=BEIJING_TZ)
            item["_trade_hour"] = traded_at.hour
            item["_trade_weekday"] = traded_at.weekday()
            item["_trade_date"] = traded_at.strftime("%Y-%m-%d")

        pnl = float(trade.get("realizedPnl") or 0)
        qty = float(trade.get("qty") or 0)
        price = float(trade.get("price") or 0)
        side = (trade.get("side") or "").upper()

        approx_entry = None
        # Without a real fill price the formula would yield a meaningless
        # (typically negative) number, so require price > 0 as well.
        if qty and pnl != 0 and side and price > 0:
            per_unit = pnl / qty
            approx_entry = round(price - per_unit if side == "SELL" else price + per_unit, 8)
        item["_approx_entry_price"] = approx_entry

        enriched.append(item)
    return enriched
def _binance_row_to_api_format(row: dict, data_type: str) -> dict:
    """Convert a DB row into the Binance-API-shaped dict used by the frontend/export.

    Args:
        row: A row from the synced trades or orders table (snake_case columns).
        data_type: ``"trades"`` produces the trade shape; any other value
            produces the order shape.

    Returns:
        A dict with Binance-style camelCase keys. Numeric columns are rendered
        as strings; a NULL/missing column becomes ``""`` while a stored zero
        is kept (e.g. ``"0"``) instead of being erased. ``stopPrice`` keeps its
        previous behaviour of being ``""`` for both missing and zero values.
    """

    def text(column):
        # Only NULL/missing maps to ""; zero is real data and must survive.
        value = row.get(column)
        return "" if value is None else str(value)

    symbol = row.get("symbol")
    if data_type == "trades":
        return {
            "id": row.get("trade_id"),
            "orderId": row.get("order_id"),
            "symbol": symbol,
            "_symbol": symbol,
            "side": row.get("side"),
            "positionSide": row.get("position_side"),
            "price": text("price"),
            "qty": text("qty"),
            "quoteQty": text("quote_qty"),
            "realizedPnl": text("realized_pnl"),
            "commission": text("commission"),
            "commissionAsset": row.get("commission_asset"),
            "buyer": bool(row.get("buyer")),
            "maker": bool(row.get("maker")),
            "time": row.get("trade_time"),
        }
    return {
        "orderId": row.get("order_id"),
        "clientOrderId": row.get("client_order_id"),
        "symbol": symbol,
        "_symbol": symbol,
        "side": row.get("side"),
        "type": row.get("type"),
        "origType": row.get("orig_type"),
        "status": row.get("status"),
        "price": text("price"),
        "avgPrice": text("avg_price"),
        "origQty": text("orig_qty"),
        "executedQty": text("executed_qty"),
        "cumQty": text("cum_qty"),
        "cumQuote": text("cum_quote"),
        # The original explicitly blanked a falsy stop price; kept as is.
        "stopPrice": text("stop_price") if row.get("stop_price") else "",
        "reduceOnly": bool(row.get("reduce_only")),
        "positionSide": row.get("position_side"),
        "time": row.get("order_time"),
        "updateTime": row.get("update_time"),
    }
@router.post("/binance-fetch")
async def query_binance_data_from_db(
    _admin=Depends(get_admin_user),
    account_id: int = Query(..., ge=1),
    symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则全部"),
    data_type: str = Query("trades", description="orders 或 trades"),
    days: int = Query(7, ge=0, le=7),
):
    """Return Binance orders/trades that were previously synced into the DB.

    The rows are expected to have been stored by the scheduled sync job
    (scripts/sync_binance_orders.py); this endpoint does not contact Binance.

    Args:
        account_id: Account whose rows are queried.
        symbols: Comma-separated symbols (case-insensitive); empty means all.
        data_type: ``trades`` reads ``binance_trades``; any other value reads
            ``binance_orders``.
        days: Look-back window in days; ``0`` means since today 00:00 in
            ``BEIJING_TZ``.

    Returns:
        Dict with ``total``, ``data_type``, ``symbols_queried``, ``stats``,
        ``data`` (at most 5000 rows, newest first) and ``source`` (``"db"``).

    Raises:
        HTTPException: 500 when the DB query fails.
    """
    from database.connection import db

    now = datetime.now(BEIJING_TZ)
    end_ts = int(now.timestamp())
    if days == 0:
        start_ts = int(now.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
    else:
        start_ts = end_ts - days * 24 * 3600
    # The synced tables store times in milliseconds.
    start_ms, end_ms = start_ts * 1000, end_ts * 1000

    symbol_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()]

    # Table and column names come from this fixed mapping, never from user
    # input; every user-supplied value is passed as a bound parameter.
    if data_type == "trades":
        table, time_column = "binance_trades", "trade_time"
    else:
        table, time_column = "binance_orders", "order_time"
    query = (
        f"SELECT * FROM {table} "
        f"WHERE account_id = %s AND {time_column} >= %s AND {time_column} <= %s"
    )
    params = [account_id, start_ms, end_ms]
    if symbol_list:
        placeholders = ",".join(["%s"] * len(symbol_list))
        query += f" AND symbol IN ({placeholders})"
        params.extend(symbol_list)
    query += f" ORDER BY {time_column} DESC LIMIT 5000"

    try:
        rows = db.execute_query(query, params)
    except Exception as e:
        # Chain the cause so the original DB error stays visible in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"查询失败(请确认已执行 add_binance_sync_tables.sql 并运行过同步脚本): {e}",
        ) from e

    all_data = [_binance_row_to_api_format(dict(r), data_type) for r in (rows or [])]
    if data_type == "trades":
        all_data = _enrich_trades_with_derived(all_data)

    if symbol_list:
        symbols_queried = len(symbol_list)
    else:
        # No filter given: report how many distinct symbols the result contains.
        found = {(r or {}).get("symbol") for r in (rows or [])}
        symbols_queried = len({name for name in found if name})

    return {
        "total": len(all_data),
        "data_type": data_type,
        "symbols_queried": symbols_queried,
        "stats": _compute_binance_stats(all_data, data_type),
        "data": all_data,
        "source": "db",
    }