""" 数据管理:查询 DB 交易、从币安拉取订单/成交,供策略分析与导出。 仅管理员可用。 """ import asyncio from pathlib import Path from fastapi import APIRouter, Query, Depends, HTTPException from typing import Optional from api.auth_deps import get_admin_user from database.models import Trade, Account from datetime import datetime, timezone, timedelta router = APIRouter(prefix="/api/admin/data", tags=["数据管理"]) BEIJING_TZ = timezone(timedelta(hours=8)) def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]): now = datetime.now(BEIJING_TZ) end_ts = int(now.timestamp()) start_ts = None if period: if period == "today": today = now.replace(hour=0, minute=0, second=0, microsecond=0) start_ts = int(today.timestamp()) elif period == "1d": start_ts = end_ts - 24 * 3600 elif period == "7d": start_ts = end_ts - 7 * 24 * 3600 elif period == "30d": start_ts = end_ts - 30 * 24 * 3600 elif period == "week": days = now.weekday() week_start = (now - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0) start_ts = int(week_start.timestamp()) elif period == "month": month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) start_ts = int(month_start.timestamp()) if start_date: try: s = start_date if len(start_date) > 10 else f"{start_date} 00:00:00" dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ) start_ts = int(dt.timestamp()) except ValueError: pass if end_date: try: s = end_date if len(end_date) > 10 else f"{end_date} 23:59:59" dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ) end_ts = int(dt.timestamp()) except ValueError: pass if start_ts is None: start_ts = end_ts - 7 * 24 * 3600 # 默认 7 天 return start_ts, end_ts @router.get("/accounts") async def list_accounts(_admin=Depends(get_admin_user)): """获取所有账号列表,供数据管理选择""" rows = Account.list_all() accounts = [{"id": r["id"], "name": r.get("name") or f"Account {r['id']}", "status": r.get("status") or "active"} for r in (rows or [])] return {"accounts": accounts} @router.get("/trades") async def query_db_trades( _admin=Depends(get_admin_user), account_id: int = Query(..., ge=1, description="账号 ID"), period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"), date: Optional[str] = Query(None, description="YYYY-MM-DD,指定日期(等同于 start_date=end_date)"), start_date: Optional[str] = Query(None), end_date: Optional[str] = Query(None), symbol: Optional[str] = Query(None), time_filter: str = Query("created", description="created/entry/exit"), reconciled_only: bool = Query(False), limit: int = Query(500, ge=1, le=2000), ): """ 查询 DB 交易记录(管理员可指定任意账号) """ sd, ed = start_date, end_date if date: sd, ed = date, date start_ts, end_ts = _get_timestamp_range(period or "today", sd, ed) trades = Trade.get_all( start_timestamp=start_ts, end_timestamp=end_ts, symbol=symbol, status=None, account_id=account_id, time_filter=time_filter, limit=limit, reconciled_only=reconciled_only, include_sync=True, ) out = [] for t in trades: row = dict(t) for k, v in row.items(): if hasattr(v, "isoformat"): row[k] = v.isoformat() out.append(row) return {"total": len(out), "trades": out} @router.post("/binance-fetch") async def fetch_binance_data( _admin=Depends(get_admin_user), account_id: int = Query(..., ge=1), symbols: str = Query(..., description="交易对,逗号分隔,如 ASTERUSDT,FILUSDT"), data_type: str = Query("trades", description="orders 或 trades"), days: int = Query(7, ge=1, le=7), ): """ 从币安拉取订单/成交记录(需账号已配置 API) """ try: import sys proj = Path(__file__).resolve().parents[3] # backend/api/routes -> project root if str(proj) not in sys.path: sys.path.insert(0, str(proj)) from trading_system.binance_client import BinanceClient except ImportError as e: raise HTTPException(status_code=500, detail=f"导入失败: {e}") api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id) if not api_key or not api_secret: raise HTTPException(status_code=400, detail="该账号未配置 API 密钥") client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet) try: await client.connect() except Exception as e: raise HTTPException(status_code=502, detail=f"连接币安失败: {e}") try: sym_list = [s.strip().upper() for s in symbols.split(",") if s.strip()] if not sym_list: raise HTTPException(status_code=400, detail="请指定至少一个交易对") end_ms = int(datetime.now(BEIJING_TZ).timestamp() * 1000) start_ms = end_ms - days * 24 * 3600 * 1000 all_data = [] for sym in sym_list: try: if data_type == "trades": rows = await client.client.futures_account_trades( symbol=sym, startTime=start_ms, endTime=end_ms, limit=1000, recvWindow=20000, ) else: rows = await client.client.futures_get_all_orders( symbol=sym, startTime=start_ms, endTime=end_ms, limit=1000, recvWindow=20000, ) if isinstance(rows, list): for r in rows: r["_symbol"] = sym all_data.extend(rows) except Exception as e: all_data.append({"_symbol": sym, "_error": str(e)}) await asyncio.sleep(0.2) time_key = "time" if (all_data and "time" in (all_data[0] or {})) else "updateTime" all_data.sort(key=lambda x: x.get(time_key, 0), reverse=True) return {"total": len(all_data), "data_type": data_type, "data": all_data} finally: if client.client: await client.client.close_connection()