在后端 API 中新增按小时和星期的交易统计功能,优化 `_compute_binance_stats` 函数以支持更细致的统计分析。同时,新增 `_enrich_trades_with_derived` 函数,补充交易记录的推算字段,包括入场价、交易小时和星期,提升策略分析的便利性。前端 `DataManagement` 组件更新,展示按小时和星期的统计信息,增强用户对交易数据的可视化理解。
375 lines
15 KiB
Python
375 lines
15 KiB
Python
"""
|
||
数据管理:查询 DB 交易、从币安拉取订单/成交,供策略分析与导出。
|
||
仅管理员可用。
|
||
"""
|
||
import asyncio
|
||
from pathlib import Path
|
||
|
||
from fastapi import APIRouter, Query, Depends, HTTPException
|
||
from typing import Optional
|
||
|
||
from api.auth_deps import get_admin_user
|
||
from database.models import Trade, Account
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
router = APIRouter(prefix="/api/admin/data", tags=["数据管理"])
|
||
|
||
BEIJING_TZ = timezone(timedelta(hours=8))
|
||
|
||
|
||
def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]):
|
||
now = datetime.now(BEIJING_TZ)
|
||
end_ts = int(now.timestamp())
|
||
start_ts = None
|
||
|
||
if period:
|
||
if period == "today":
|
||
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
start_ts = int(today.timestamp())
|
||
elif period == "1d":
|
||
start_ts = end_ts - 24 * 3600
|
||
elif period == "7d":
|
||
start_ts = end_ts - 7 * 24 * 3600
|
||
elif period == "30d":
|
||
start_ts = end_ts - 30 * 24 * 3600
|
||
elif period == "week":
|
||
days = now.weekday()
|
||
week_start = (now - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
start_ts = int(week_start.timestamp())
|
||
elif period == "month":
|
||
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||
start_ts = int(month_start.timestamp())
|
||
|
||
if start_date:
|
||
try:
|
||
s = start_date if len(start_date) > 10 else f"{start_date} 00:00:00"
|
||
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ)
|
||
start_ts = int(dt.timestamp())
|
||
except ValueError:
|
||
pass
|
||
if end_date:
|
||
try:
|
||
s = end_date if len(end_date) > 10 else f"{end_date} 23:59:59"
|
||
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ)
|
||
end_ts = int(dt.timestamp())
|
||
except ValueError:
|
||
pass
|
||
|
||
if start_ts is None:
|
||
start_ts = end_ts - 7 * 24 * 3600 # 默认 7 天
|
||
return start_ts, end_ts
|
||
|
||
|
||
def _compute_binance_stats(data: list, data_type: str) -> dict:
|
||
"""计算用于策略分析的统计数据(成交/订单原始字段均已保留,导出 JSON 含全部)"""
|
||
stats = {"count": len(data)}
|
||
valid = [r for r in data if isinstance(r, dict) and "_error" not in r]
|
||
if not valid:
|
||
return stats
|
||
|
||
if data_type == "trades":
|
||
pnls = []
|
||
commissions = []
|
||
quote_qtys = []
|
||
by_symbol = {}
|
||
wins, losses = 0, 0
|
||
maker_count, taker_count = 0, 0
|
||
for r in valid:
|
||
sym = r.get("_symbol") or r.get("symbol") or "-"
|
||
p = float(r.get("realizedPnl") or 0)
|
||
c = float(r.get("commission") or 0)
|
||
qq = float(r.get("quoteQty") or 0)
|
||
pnls.append(p)
|
||
commissions.append(c)
|
||
if qq:
|
||
quote_qtys.append(qq)
|
||
if p > 0:
|
||
wins += 1
|
||
elif p < 0:
|
||
losses += 1
|
||
if r.get("maker"):
|
||
maker_count += 1
|
||
else:
|
||
taker_count += 1
|
||
by_symbol[sym] = by_symbol.get(sym, {"count": 0, "pnl": 0.0, "commission": 0.0, "quoteQty": 0.0})
|
||
by_symbol[sym]["count"] += 1
|
||
by_symbol[sym]["pnl"] += p
|
||
by_symbol[sym]["commission"] += c
|
||
by_symbol[sym]["quoteQty"] += qq
|
||
|
||
stats["total_realized_pnl"] = round(sum(pnls), 4)
|
||
stats["total_commission"] = round(sum(commissions), 4)
|
||
stats["net_pnl"] = round(stats["total_realized_pnl"] - stats["total_commission"], 4)
|
||
stats["win_count"] = wins
|
||
stats["loss_count"] = losses
|
||
stats["win_rate"] = round(100 * wins / (wins + losses), 1) if (wins + losses) > 0 else 0
|
||
stats["avg_pnl_per_trade"] = round(sum(pnls) / len(pnls), 4) if pnls else 0
|
||
stats["total_quote_qty"] = round(sum(quote_qtys), 2)
|
||
stats["maker_count"] = maker_count
|
||
stats["taker_count"] = taker_count
|
||
stats["by_symbol"] = {
|
||
k: {
|
||
"count": v["count"],
|
||
"pnl": round(v["pnl"], 4),
|
||
"commission": round(v["commission"], 4),
|
||
"quoteQty": round(v["quoteQty"], 2),
|
||
}
|
||
for k, v in sorted(by_symbol.items())
|
||
}
|
||
|
||
by_hour = {}
|
||
by_weekday = {}
|
||
weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
|
||
for r in valid:
|
||
t = r.get("time") or r.get("trade_time") or 0
|
||
if t:
|
||
dt = datetime.fromtimestamp(t / 1000, tz=BEIJING_TZ)
|
||
h = dt.hour
|
||
wd = dt.weekday()
|
||
by_hour[h] = by_hour.get(h, {"count": 0, "pnl": 0.0})
|
||
by_hour[h]["count"] += 1
|
||
by_hour[h]["pnl"] += float(r.get("realizedPnl") or 0)
|
||
by_weekday[wd] = by_weekday.get(wd, {"count": 0, "pnl": 0.0})
|
||
by_weekday[wd]["count"] += 1
|
||
by_weekday[wd]["pnl"] += float(r.get("realizedPnl") or 0)
|
||
stats["by_hour"] = {str(k): {"count": v["count"], "pnl": round(v["pnl"], 4)} for k, v in sorted(by_hour.items())}
|
||
stats["by_weekday"] = {weekday_names[k]: {"count": v["count"], "pnl": round(v["pnl"], 4)} for k, v in sorted(by_weekday.items())}
|
||
else:
|
||
by_status = {}
|
||
by_type = {}
|
||
by_symbol = {}
|
||
filled_count = 0
|
||
for r in valid:
|
||
status = r.get("status") or "UNKNOWN"
|
||
typ = r.get("type") or r.get("origType") or "UNKNOWN"
|
||
sym = r.get("_symbol") or r.get("symbol") or "-"
|
||
by_status[status] = by_status.get(status, 0) + 1
|
||
by_type[typ] = by_type.get(typ, 0) + 1
|
||
by_symbol[sym] = by_symbol.get(sym, 0) + 1
|
||
if status == "FILLED":
|
||
filled_count += 1
|
||
stats["by_status"] = by_status
|
||
stats["by_type"] = by_type
|
||
stats["by_symbol"] = dict(sorted(by_symbol.items()))
|
||
stats["filled_count"] = filled_count
|
||
|
||
return stats
|
||
|
||
|
||
async def _get_active_symbols_from_income(binance_client, start_ms: int, end_ms: int) -> list:
|
||
"""
|
||
通过收益历史 API 获取该时间段内有交易活动的交易对,避免全量遍历 250+ 交易对。
|
||
一次 API 调用(weight 100)即可拿到有成交/盈亏的 symbol 列表,大幅减少后续 trades/orders 的请求数。
|
||
"""
|
||
try:
|
||
symbols = set()
|
||
current_end = end_ms
|
||
for _ in range(10): # 最多分页 10 次(单次最多 1000 条)
|
||
rows = await binance_client.futures_income_history(
|
||
startTime=start_ms,
|
||
endTime=current_end,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
if not rows:
|
||
break
|
||
for r in rows:
|
||
sym = (r.get("symbol") or "").strip()
|
||
if sym and sym.endswith("USDT"):
|
||
symbols.add(sym)
|
||
if len(rows) < 1000:
|
||
break
|
||
oldest = min(r.get("time", current_end) for r in rows)
|
||
current_end = oldest - 1
|
||
if current_end < start_ms:
|
||
break
|
||
await asyncio.sleep(0.15)
|
||
return sorted(symbols)
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
@router.get("/accounts")
|
||
async def list_accounts(_admin=Depends(get_admin_user), active_only: bool = Query(False)):
|
||
"""获取账号列表,供数据管理选择。active_only=true 时仅返回 status=active 的账号"""
|
||
rows = Account.list_all()
|
||
accounts = [{"id": r["id"], "name": r.get("name") or f"Account {r['id']}", "status": r.get("status") or "active"} for r in (rows or [])]
|
||
if active_only:
|
||
accounts = [a for a in accounts if (a.get("status") or "").lower() == "active"]
|
||
return {"accounts": accounts}
|
||
|
||
|
||
@router.get("/trades")
|
||
async def query_db_trades(
|
||
_admin=Depends(get_admin_user),
|
||
account_id: int = Query(..., ge=1, description="账号 ID"),
|
||
period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"),
|
||
date: Optional[str] = Query(None, description="YYYY-MM-DD,指定日期(等同于 start_date=end_date)"),
|
||
start_date: Optional[str] = Query(None),
|
||
end_date: Optional[str] = Query(None),
|
||
symbol: Optional[str] = Query(None),
|
||
time_filter: str = Query("created", description="created/entry/exit"),
|
||
reconciled_only: Optional[str] = Query(None),
|
||
limit: int = Query(500, ge=1, le=2000),
|
||
):
|
||
"""
|
||
查询 DB 交易记录(管理员可指定任意账号)
|
||
"""
|
||
sd, ed = start_date, end_date
|
||
if date:
|
||
sd, ed = date, date
|
||
_reconciled = str(reconciled_only or "").lower() in ("true", "1", "yes")
|
||
start_ts, end_ts = _get_timestamp_range(period or "today", sd, ed)
|
||
trades = Trade.get_all(
|
||
start_timestamp=start_ts,
|
||
end_timestamp=end_ts,
|
||
symbol=symbol,
|
||
status=None,
|
||
account_id=account_id,
|
||
time_filter=time_filter,
|
||
limit=limit,
|
||
reconciled_only=_reconciled,
|
||
include_sync=True,
|
||
)
|
||
out = []
|
||
for t in trades:
|
||
row = dict(t)
|
||
for k, v in row.items():
|
||
if hasattr(v, "isoformat"):
|
||
row[k] = v.isoformat()
|
||
out.append(row)
|
||
return {"total": len(out), "trades": out}
|
||
|
||
|
||
def _enrich_trades_with_derived(trades: list) -> list:
|
||
"""补充推算字段:入场价、交易小时、星期,便于策略分析"""
|
||
result = []
|
||
for r in trades:
|
||
out = dict(r)
|
||
t = r.get("time") or 0
|
||
if t:
|
||
dt = datetime.fromtimestamp(t / 1000, tz=BEIJING_TZ)
|
||
out["_trade_hour"] = dt.hour
|
||
out["_trade_weekday"] = dt.weekday()
|
||
out["_trade_date"] = dt.strftime("%Y-%m-%d")
|
||
pnl = float(r.get("realizedPnl") or 0)
|
||
qty = float(r.get("qty") or 0)
|
||
price = float(r.get("price") or 0)
|
||
side = (r.get("side") or "").upper()
|
||
if qty and pnl != 0 and side:
|
||
if side == "SELL":
|
||
out["_approx_entry_price"] = round(price - pnl / qty, 8)
|
||
else:
|
||
out["_approx_entry_price"] = round(price + pnl / qty, 8)
|
||
else:
|
||
out["_approx_entry_price"] = None
|
||
result.append(out)
|
||
return result
|
||
|
||
|
||
def _binance_row_to_api_format(row: dict, data_type: str) -> dict:
|
||
"""将 DB 行转换为前端/导出期望的币安 API 格式"""
|
||
if data_type == "trades":
|
||
return {
|
||
"id": row.get("trade_id"),
|
||
"orderId": row.get("order_id"),
|
||
"symbol": row.get("symbol"),
|
||
"_symbol": row.get("symbol"),
|
||
"side": row.get("side"),
|
||
"positionSide": row.get("position_side"),
|
||
"price": str(row.get("price") or ""),
|
||
"qty": str(row.get("qty") or ""),
|
||
"quoteQty": str(row.get("quote_qty") or ""),
|
||
"realizedPnl": str(row.get("realized_pnl") or ""),
|
||
"commission": str(row.get("commission") or ""),
|
||
"commissionAsset": row.get("commission_asset"),
|
||
"buyer": bool(row.get("buyer")),
|
||
"maker": bool(row.get("maker")),
|
||
"time": row.get("trade_time"),
|
||
}
|
||
else:
|
||
return {
|
||
"orderId": row.get("order_id"),
|
||
"clientOrderId": row.get("client_order_id"),
|
||
"symbol": row.get("symbol"),
|
||
"_symbol": row.get("symbol"),
|
||
"side": row.get("side"),
|
||
"type": row.get("type"),
|
||
"origType": row.get("orig_type"),
|
||
"status": row.get("status"),
|
||
"price": str(row.get("price") or ""),
|
||
"avgPrice": str(row.get("avg_price") or ""),
|
||
"origQty": str(row.get("orig_qty") or ""),
|
||
"executedQty": str(row.get("executed_qty") or ""),
|
||
"cumQty": str(row.get("cum_qty") or ""),
|
||
"cumQuote": str(row.get("cum_quote") or ""),
|
||
"stopPrice": str(row.get("stop_price") or "") if row.get("stop_price") else "",
|
||
"reduceOnly": bool(row.get("reduce_only")),
|
||
"positionSide": row.get("position_side"),
|
||
"time": row.get("order_time"),
|
||
"updateTime": row.get("update_time"),
|
||
}
|
||
|
||
|
||
@router.post("/binance-fetch")
|
||
async def query_binance_data_from_db(
|
||
_admin=Depends(get_admin_user),
|
||
account_id: int = Query(..., ge=1),
|
||
symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则全部"),
|
||
data_type: str = Query("trades", description="orders 或 trades"),
|
||
days: int = Query(7, ge=0, le=7),
|
||
):
|
||
"""
|
||
从 DB 查询已同步的币安订单/成交(由定时任务 scripts/sync_binance_orders.py 拉取入库)
|
||
"""
|
||
from database.connection import db
|
||
|
||
now = datetime.now(BEIJING_TZ)
|
||
end_ts = int(now.timestamp())
|
||
if days == 0:
|
||
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
start_ts = int(today_start.timestamp())
|
||
else:
|
||
start_ts = end_ts - days * 24 * 3600
|
||
start_ms = start_ts * 1000
|
||
end_ms = end_ts * 1000
|
||
|
||
symbol_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()]
|
||
|
||
try:
|
||
if data_type == "trades":
|
||
q = """SELECT * FROM binance_trades
|
||
WHERE account_id = %s AND trade_time >= %s AND trade_time <= %s"""
|
||
params = [account_id, start_ms, end_ms]
|
||
if symbol_list:
|
||
q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")"
|
||
params.extend(symbol_list)
|
||
q += " ORDER BY trade_time DESC LIMIT 5000"
|
||
else:
|
||
q = """SELECT * FROM binance_orders
|
||
WHERE account_id = %s AND order_time >= %s AND order_time <= %s"""
|
||
params = [account_id, start_ms, end_ms]
|
||
if symbol_list:
|
||
q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")"
|
||
params.extend(symbol_list)
|
||
q += " ORDER BY order_time DESC LIMIT 5000"
|
||
|
||
rows = db.execute_query(q, params)
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"查询失败(请确认已执行 add_binance_sync_tables.sql 并运行过同步脚本): {e}")
|
||
|
||
all_data = [_binance_row_to_api_format(dict(r), data_type) for r in (rows or [])]
|
||
if data_type == "trades":
|
||
all_data = _enrich_trades_with_derived(all_data)
|
||
symbols_queried = len(symbol_list) if symbol_list else len({(r or {}).get("symbol") for r in (rows or []) if (r or {}).get("symbol")})
|
||
stats = _compute_binance_stats(all_data, data_type)
|
||
|
||
return {
|
||
"total": len(all_data),
|
||
"data_type": data_type,
|
||
"symbols_queried": symbols_queried,
|
||
"stats": stats,
|
||
"data": all_data,
|
||
"source": "db",
|
||
}
|