""" 数据管理:查询 DB 交易、从币安拉取订单/成交,供策略分析与导出。 仅管理员可用。 """ import asyncio from pathlib import Path from fastapi import APIRouter, Query, Depends, HTTPException from typing import Optional from api.auth_deps import get_admin_user from database.models import Trade, Account from datetime import datetime, timezone, timedelta router = APIRouter(prefix="/api/admin/data", tags=["数据管理"]) BEIJING_TZ = timezone(timedelta(hours=8)) def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]): now = datetime.now(BEIJING_TZ) end_ts = int(now.timestamp()) start_ts = None if period: if period == "today": today = now.replace(hour=0, minute=0, second=0, microsecond=0) start_ts = int(today.timestamp()) elif period == "1d": start_ts = end_ts - 24 * 3600 elif period == "7d": start_ts = end_ts - 7 * 24 * 3600 elif period == "30d": start_ts = end_ts - 30 * 24 * 3600 elif period == "week": days = now.weekday() week_start = (now - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0) start_ts = int(week_start.timestamp()) elif period == "month": month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) start_ts = int(month_start.timestamp()) if start_date: try: s = start_date if len(start_date) > 10 else f"{start_date} 00:00:00" dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ) start_ts = int(dt.timestamp()) except ValueError: pass if end_date: try: s = end_date if len(end_date) > 10 else f"{end_date} 23:59:59" dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ) end_ts = int(dt.timestamp()) except ValueError: pass if start_ts is None: start_ts = end_ts - 7 * 24 * 3600 # 默认 7 天 return start_ts, end_ts def _compute_binance_stats(data: list, data_type: str) -> dict: """计算用于策略分析的统计数据(成交/订单原始字段均已保留,导出 JSON 含全部)""" stats = {"count": len(data)} valid = [r for r in data if isinstance(r, dict) and "_error" not in r] if not valid: return stats if data_type == "trades": pnls = [] commissions = [] quote_qtys = [] by_symbol = {} wins, losses = 0, 0 maker_count, taker_count = 0, 0 for r in valid: sym = r.get("_symbol") or r.get("symbol") or "-" p = float(r.get("realizedPnl") or 0) c = float(r.get("commission") or 0) qq = float(r.get("quoteQty") or 0) pnls.append(p) commissions.append(c) if qq: quote_qtys.append(qq) if p > 0: wins += 1 elif p < 0: losses += 1 if r.get("maker"): maker_count += 1 else: taker_count += 1 by_symbol[sym] = by_symbol.get(sym, {"count": 0, "pnl": 0.0, "commission": 0.0, "quoteQty": 0.0}) by_symbol[sym]["count"] += 1 by_symbol[sym]["pnl"] += p by_symbol[sym]["commission"] += c by_symbol[sym]["quoteQty"] += qq stats["total_realized_pnl"] = round(sum(pnls), 4) stats["total_commission"] = round(sum(commissions), 4) stats["net_pnl"] = round(stats["total_realized_pnl"] - stats["total_commission"], 4) stats["win_count"] = wins stats["loss_count"] = losses stats["win_rate"] = round(100 * wins / (wins + losses), 1) if (wins + losses) > 0 else 0 stats["avg_pnl_per_trade"] = round(sum(pnls) / len(pnls), 4) if pnls else 0 stats["total_quote_qty"] = round(sum(quote_qtys), 2) stats["maker_count"] = maker_count stats["taker_count"] = taker_count stats["by_symbol"] = { k: { "count": v["count"], "pnl": round(v["pnl"], 4), "commission": round(v["commission"], 4), "quoteQty": round(v["quoteQty"], 2), } for k, v in sorted(by_symbol.items()) } by_hour = {} by_weekday = {} weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] for r in valid: t = r.get("time") or r.get("trade_time") or 0 if t: dt = datetime.fromtimestamp(t / 1000, tz=BEIJING_TZ) h = dt.hour wd = dt.weekday() by_hour[h] = by_hour.get(h, {"count": 0, "pnl": 0.0}) by_hour[h]["count"] += 1 by_hour[h]["pnl"] += float(r.get("realizedPnl") or 0) by_weekday[wd] = by_weekday.get(wd, {"count": 0, "pnl": 0.0}) by_weekday[wd]["count"] += 1 by_weekday[wd]["pnl"] += float(r.get("realizedPnl") or 0) stats["by_hour"] = {str(k): {"count": v["count"], "pnl": round(v["pnl"], 4)} for k, v in sorted(by_hour.items())} stats["by_weekday"] = {weekday_names[k]: {"count": v["count"], "pnl": round(v["pnl"], 4)} for k, v in sorted(by_weekday.items())} else: by_status = {} by_type = {} by_symbol = {} filled_count = 0 for r in valid: status = r.get("status") or "UNKNOWN" typ = r.get("type") or r.get("origType") or "UNKNOWN" sym = r.get("_symbol") or r.get("symbol") or "-" by_status[status] = by_status.get(status, 0) + 1 by_type[typ] = by_type.get(typ, 0) + 1 by_symbol[sym] = by_symbol.get(sym, 0) + 1 if status == "FILLED": filled_count += 1 stats["by_status"] = by_status stats["by_type"] = by_type stats["by_symbol"] = dict(sorted(by_symbol.items())) stats["filled_count"] = filled_count return stats async def _get_active_symbols_from_income(binance_client, start_ms: int, end_ms: int) -> list: """ 通过收益历史 API 获取该时间段内有交易活动的交易对,避免全量遍历 250+ 交易对。 一次 API 调用(weight 100)即可拿到有成交/盈亏的 symbol 列表,大幅减少后续 trades/orders 的请求数。 """ try: symbols = set() current_end = end_ms for _ in range(10): # 最多分页 10 次(单次最多 1000 条) rows = await binance_client.futures_income_history( startTime=start_ms, endTime=current_end, limit=1000, recvWindow=20000, ) if not rows: break for r in rows: sym = (r.get("symbol") or "").strip() if sym and sym.endswith("USDT"): symbols.add(sym) if len(rows) < 1000: break oldest = min(r.get("time", current_end) for r in rows) current_end = oldest - 1 if current_end < start_ms: break await asyncio.sleep(0.15) return sorted(symbols) except Exception: return [] @router.get("/accounts") async def list_accounts(_admin=Depends(get_admin_user), active_only: bool = Query(False)): """获取账号列表,供数据管理选择。active_only=true 时仅返回 status=active 的账号""" rows = Account.list_all() accounts = [{"id": r["id"], "name": r.get("name") or f"Account {r['id']}", "status": r.get("status") or "active"} for r in (rows or [])] if active_only: accounts = [a for a in accounts if (a.get("status") or "").lower() == "active"] return {"accounts": accounts} @router.get("/trades") async def query_db_trades( _admin=Depends(get_admin_user), account_id: int = Query(..., ge=1, description="账号 ID"), period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"), date: Optional[str] = Query(None, description="YYYY-MM-DD,指定日期(等同于 start_date=end_date)"), start_date: Optional[str] = Query(None), end_date: Optional[str] = Query(None), symbol: Optional[str] = Query(None), time_filter: str = Query("created", description="created/entry/exit"), reconciled_only: Optional[str] = Query(None), limit: int = Query(500, ge=1, le=2000), ): """ 查询 DB 交易记录(管理员可指定任意账号) """ sd, ed = start_date, end_date if date: sd, ed = date, date _reconciled = str(reconciled_only or "").lower() in ("true", "1", "yes") start_ts, end_ts = _get_timestamp_range(period or "today", sd, ed) trades = Trade.get_all( start_timestamp=start_ts, end_timestamp=end_ts, symbol=symbol, status=None, account_id=account_id, time_filter=time_filter, limit=limit, reconciled_only=_reconciled, include_sync=True, ) out = [] for t in trades: row = dict(t) for k, v in row.items(): if hasattr(v, "isoformat"): row[k] = v.isoformat() out.append(row) return {"total": len(out), "trades": out} def _enrich_trades_with_derived(trades: list) -> list: """补充推算字段:入场价、交易小时、星期,便于策略分析""" result = [] for r in trades: out = dict(r) t = r.get("time") or 0 if t: dt = datetime.fromtimestamp(t / 1000, tz=BEIJING_TZ) out["_trade_hour"] = dt.hour out["_trade_weekday"] = dt.weekday() out["_trade_date"] = dt.strftime("%Y-%m-%d") pnl = float(r.get("realizedPnl") or 0) qty = float(r.get("qty") or 0) price = float(r.get("price") or 0) side = (r.get("side") or "").upper() if qty and pnl != 0 and side: if side == "SELL": out["_approx_entry_price"] = round(price - pnl / qty, 8) else: out["_approx_entry_price"] = round(price + pnl / qty, 8) else: out["_approx_entry_price"] = None result.append(out) return result def _binance_row_to_api_format(row: dict, data_type: str) -> dict: """将 DB 行转换为前端/导出期望的币安 API 格式""" if data_type == "trades": return { "id": row.get("trade_id"), "orderId": row.get("order_id"), "symbol": row.get("symbol"), "_symbol": row.get("symbol"), "side": row.get("side"), "positionSide": row.get("position_side"), "price": str(row.get("price") or ""), "qty": str(row.get("qty") or ""), "quoteQty": str(row.get("quote_qty") or ""), "realizedPnl": str(row.get("realized_pnl") or ""), "commission": str(row.get("commission") or ""), "commissionAsset": row.get("commission_asset"), "buyer": bool(row.get("buyer")), "maker": bool(row.get("maker")), "time": row.get("trade_time"), } else: return { "orderId": row.get("order_id"), "clientOrderId": row.get("client_order_id"), "symbol": row.get("symbol"), "_symbol": row.get("symbol"), "side": row.get("side"), "type": row.get("type"), "origType": row.get("orig_type"), "status": row.get("status"), "price": str(row.get("price") or ""), "avgPrice": str(row.get("avg_price") or ""), "origQty": str(row.get("orig_qty") or ""), "executedQty": str(row.get("executed_qty") or ""), "cumQty": str(row.get("cum_qty") or ""), "cumQuote": str(row.get("cum_quote") or ""), "stopPrice": str(row.get("stop_price") or "") if row.get("stop_price") else "", "reduceOnly": bool(row.get("reduce_only")), "positionSide": row.get("position_side"), "time": row.get("order_time"), "updateTime": row.get("update_time"), } @router.post("/binance-fetch") async def query_binance_data_from_db( _admin=Depends(get_admin_user), account_id: int = Query(..., ge=1), symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则全部"), data_type: str = Query("trades", description="orders 或 trades"), days: int = Query(7, ge=0, le=7), ): """ 从 DB 查询已同步的币安订单/成交(由定时任务 scripts/sync_binance_orders.py 拉取入库) """ from database.connection import db now = datetime.now(BEIJING_TZ) end_ts = int(now.timestamp()) if days == 0: today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) start_ts = int(today_start.timestamp()) else: start_ts = end_ts - days * 24 * 3600 start_ms = start_ts * 1000 end_ms = end_ts * 1000 symbol_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()] try: if data_type == "trades": q = """SELECT * FROM binance_trades WHERE account_id = %s AND trade_time >= %s AND trade_time <= %s""" params = [account_id, start_ms, end_ms] if symbol_list: q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")" params.extend(symbol_list) q += " ORDER BY trade_time DESC LIMIT 5000" else: q = """SELECT * FROM binance_orders WHERE account_id = %s AND order_time >= %s AND order_time <= %s""" params = [account_id, start_ms, end_ms] if symbol_list: q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")" params.extend(symbol_list) q += " ORDER BY order_time DESC LIMIT 5000" rows = db.execute_query(q, params) except Exception as e: raise HTTPException(status_code=500, detail=f"查询失败(请确认已执行 add_binance_sync_tables.sql 并运行过同步脚本): {e}") all_data = [_binance_row_to_api_format(dict(r), data_type) for r in (rows or [])] if data_type == "trades": all_data = _enrich_trades_with_derived(all_data) symbols_queried = len(symbol_list) if symbol_list else len({(r or {}).get("symbol") for r in (rows or []) if (r or {}).get("symbol")}) stats = _compute_binance_stats(all_data, data_type) return { "total": len(all_data), "data_type": data_type, "symbols_queried": symbols_queried, "stats": stats, "data": all_data, "source": "db", }