在后端 API 的 `query_db_trades` 函数中,将 `reconciled_only` 参数类型从布尔值更改为可选字符串,并在查询逻辑中添加了相应的处理。同时,在前端 `DataManagement` 组件中,初始化 `dbDate` 为当前日期,并优化了参数构建逻辑,以确保在请求时正确传递日期和对账状态。这些改动提升了数据查询的灵活性与准确性。
185 lines
6.7 KiB
Python
185 lines
6.7 KiB
Python
"""
Data management: query trades stored in the DB and pull orders/fills from Binance, for strategy analysis and export.

Admin only.
"""
|
||
import asyncio
|
||
from pathlib import Path
|
||
|
||
from fastapi import APIRouter, Query, Depends, HTTPException
|
||
from typing import Optional
|
||
|
||
from api.auth_deps import get_admin_user
|
||
from database.models import Trade, Account
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# Router for the admin data-management endpoints; every route registered on it
# is served under the /api/admin/data prefix.
router = APIRouter(prefix="/api/admin/data", tags=["数据管理"])
|
||
|
||
# Fixed UTC+8 offset; used in this module as the reference timezone for "now"
# and for interpreting date strings supplied by the caller.
BEIJING_TZ = timezone(timedelta(hours=8))
|
||
|
||
|
||
def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]):
    """Resolve a ``(start_ts, end_ts)`` pair of Unix timestamps in seconds.

    All wall-clock values are interpreted in ``BEIJING_TZ``.

    Args:
        period: One of ``today``, ``1d``, ``7d``, ``30d``, ``week`` or
            ``month``. Any other value (or ``None``) contributes nothing.
        start_date: ``YYYY-MM-DD`` or ``YYYY-MM-DD HH:MM:SS``. A date-only
            value means the start of that day. Overrides the start derived
            from ``period``.
        end_date: Same formats as ``start_date``. A date-only value means
            23:59:59 of that day. Overrides the default end, which is "now".

    Returns:
        ``(start_ts, end_ts)`` as integers. When neither ``period`` nor
        ``start_date`` yields a start, it defaults to 7 days before the end.

    Note:
        Date strings that fail to parse are ignored silently, and the rolling
        windows (``1d``/``7d``/``30d``) are measured back from "now", not from
        ``end_date``.
    """
    day_seconds = 24 * 3600
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}

    def _parse(text, clock_suffix):
        # Inputs of at most 10 characters are treated as date-only and get the
        # given time of day appended. Returns None for unparseable input.
        full = text if len(text) > 10 else text + clock_suffix
        try:
            parsed = datetime.strptime(full, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ)
            return int(parsed.timestamp())
        except ValueError:
            return None

    current = datetime.now(BEIJING_TZ)
    upper = int(current.timestamp())
    lower = None

    # Period-derived start; calendar periods snap to local midnight.
    rolling_days = {"1d": 1, "7d": 7, "30d": 30}
    if period in rolling_days:
        lower = upper - rolling_days[period] * day_seconds
    elif period == "today":
        lower = int(current.replace(**midnight).timestamp())
    elif period == "week":
        # weekday() is 0 on Monday, so this steps back to the week's Monday.
        monday = current - timedelta(days=current.weekday())
        lower = int(monday.replace(**midnight).timestamp())
    elif period == "month":
        lower = int(current.replace(day=1, **midnight).timestamp())

    # Explicit dates win over whatever the period produced.
    if start_date:
        explicit_start = _parse(start_date, " 00:00:00")
        if explicit_start is not None:
            lower = explicit_start
    if end_date:
        explicit_end = _parse(end_date, " 23:59:59")
        if explicit_end is not None:
            upper = explicit_end

    if lower is None:
        lower = upper - 7 * day_seconds  # default window: 7 days
    return lower, upper
|
||
|
||
|
||
@router.get("/accounts")
async def list_accounts(_admin=Depends(get_admin_user)):
    """Return all accounts so the data-management UI can offer an account picker.

    Returns:
        ``{"accounts": [...]}`` where each entry has ``id``, ``name`` and
        ``status``. A missing or empty name falls back to ``"Account <id>"``
        and a missing or empty status falls back to ``"active"``.
    """
    accounts = []
    for record in Account.list_all() or []:
        account_id = record["id"]
        accounts.append(
            {
                "id": account_id,
                "name": record.get("name") or f"Account {account_id}",
                "status": record.get("status") or "active",
            }
        )
    return {"accounts": accounts}
|
||
|
||
|
||
@router.get("/trades")
async def query_db_trades(
    _admin=Depends(get_admin_user),
    account_id: int = Query(..., ge=1, description="账号 ID"),
    period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"),
    date: Optional[str] = Query(None, description="YYYY-MM-DD,指定日期(等同于 start_date=end_date)"),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    symbol: Optional[str] = Query(None),
    time_filter: str = Query("created", description="created/entry/exit"),
    reconciled_only: Optional[str] = Query(None),
    limit: int = Query(500, ge=1, le=2000),
):
    """Query trade records stored in the DB (an admin may target any account).

    Args:
        account_id: Account whose trades are queried.
        period: Named window, used as the base range; defaults to ``today``
            when omitted.
        date: Single day; when given it replaces both ``start_date`` and
            ``end_date``.
        start_date: Explicit range start, forwarded to ``_get_timestamp_range``.
        end_date: Explicit range end, forwarded to ``_get_timestamp_range``.
        symbol: Optional symbol filter, passed through to ``Trade.get_all``.
        time_filter: Passed through to ``Trade.get_all`` unchanged.
        reconciled_only: Textual flag; ``true``, ``1`` or ``yes``
            (case-insensitive) enable it, anything else disables it.
        limit: Maximum number of rows requested from ``Trade.get_all``.

    Returns:
        ``{"total": <row count>, "trades": [...]}``. Any value exposing an
        ``isoformat`` method is replaced by its ISO string.
    """
    # A single `date` takes precedence over an explicit start/end pair.
    range_start, range_end = (date, date) if date else (start_date, end_date)
    only_reconciled = str(reconciled_only or "").lower() in ("true", "1", "yes")
    start_ts, end_ts = _get_timestamp_range(period or "today", range_start, range_end)

    records = Trade.get_all(
        start_timestamp=start_ts,
        end_timestamp=end_ts,
        symbol=symbol,
        status=None,
        account_id=account_id,
        time_filter=time_filter,
        limit=limit,
        reconciled_only=only_reconciled,
        include_sync=True,
    )

    # Copy each record into a plain dict, stringifying date/time-like values.
    serialized = [
        {
            key: (value.isoformat() if hasattr(value, "isoformat") else value)
            for key, value in dict(record).items()
        }
        for record in records
    ]
    return {"total": len(serialized), "trades": serialized}
|
||
|
||
|
||
@router.post("/binance-fetch")
async def fetch_binance_data(
    _admin=Depends(get_admin_user),
    account_id: int = Query(..., ge=1),
    symbols: str = Query(..., description="交易对,逗号分隔,如 ASTERUSDT,FILUSDT"),
    data_type: str = Query("trades", description="orders 或 trades"),
    days: int = Query(7, ge=1, le=7),
):
    """Pull futures orders or fills from Binance for one account.

    The account must already have API credentials configured.

    Args:
        account_id: Account whose stored credentials are used.
        symbols: Comma-separated trading pairs; entries are trimmed and
            upper-cased, empty entries are dropped.
        data_type: ``"trades"`` fetches account trades; any other value
            fetches all orders.
        days: Size of the look-back window ending now, in days (1-7).

    Returns:
        ``{"total": ..., "data_type": ..., "data": [...]}``. Every fetched row
        is tagged with ``_symbol``. A symbol whose request fails does not
        abort the call; it contributes one ``{"_symbol", "_error"}`` entry.
        Rows are ordered descending by their ``time`` field, falling back to
        ``updateTime``; entries with neither sort last.

    Raises:
        HTTPException: 500 if the Binance client module cannot be imported,
            400 if the account has no API key/secret or no symbol was given,
            502 if connecting to Binance fails.
    """
    try:
        import sys

        # Three levels above this file's directory, which the layout
        # backend/api/routes places at the project root.
        proj = Path(__file__).resolve().parents[3]
        if str(proj) not in sys.path:
            sys.path.insert(0, str(proj))
        from trading_system.binance_client import BinanceClient
    except ImportError as e:
        raise HTTPException(status_code=500, detail=f"导入失败: {e}") from e

    api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
    if not api_key or not api_secret:
        raise HTTPException(status_code=400, detail="该账号未配置 API 密钥")

    # Validate the request before opening a network connection, so a request
    # that is going to be rejected never touches Binance.
    sym_list = [s.strip().upper() for s in symbols.split(",") if s.strip()]
    if not sym_list:
        raise HTTPException(status_code=400, detail="请指定至少一个交易对")

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    try:
        await client.connect()
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"连接币安失败: {e}") from e

    try:
        end_ms = int(datetime.now(BEIJING_TZ).timestamp() * 1000)
        start_ms = end_ms - days * 24 * 3600 * 1000

        all_data = []
        for sym in sym_list:
            try:
                if data_type == "trades":
                    fetch = client.client.futures_account_trades
                else:
                    fetch = client.client.futures_get_all_orders
                rows = await fetch(
                    symbol=sym,
                    startTime=start_ms,
                    endTime=end_ms,
                    limit=1000,
                    recvWindow=20000,
                )
                if isinstance(rows, list):
                    for r in rows:
                        r["_symbol"] = sym
                    all_data.extend(rows)
            except Exception as e:
                # Best effort: report the failure in-line and keep going with
                # the remaining symbols.
                all_data.append({"_symbol": sym, "_error": str(e)})
            # Short pause between symbols to space out the requests.
            await asyncio.sleep(0.2)

        # Resolve the timestamp per row. Inspecting only the first element is
        # wrong when that element is an error placeholder: the key would then
        # be "updateTime", which trade rows lack, leaving the list unsorted.
        all_data.sort(
            key=lambda row: row.get("time", row.get("updateTime", 0)),
            reverse=True,
        )

        return {"total": len(all_data), "data_type": data_type, "data": all_data}
    finally:
        if client.client:
            await client.client.close_connection()
|