auto_trade_sys/backend/api/routes/data_management.py
薇薇安 aaef73c2b3 feat(data_management): 优化数据管理功能与API接口
在后端 API 中新增 `_get_active_symbols_from_income` 函数,通过收益历史 API 获取有交易活动的交易对,减少后续请求数。更新 `fetch_binance_data` 函数以支持动态获取交易对,并优化前端 `DataManagement` 组件,确保仅显示状态为 active 的账号。调整 API 服务以支持可选参数 `activeOnly`,提升数据查询的灵活性与用户体验。
2026-02-22 10:43:37 +08:00

243 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据管理:查询 DB 交易、从币安拉取订单/成交,供策略分析与导出。
仅管理员可用。
"""
import asyncio
from pathlib import Path
from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Optional
from api.auth_deps import get_admin_user
from database.models import Trade, Account
from datetime import datetime, timezone, timedelta
router = APIRouter(prefix="/api/admin/data", tags=["数据管理"])
BEIJING_TZ = timezone(timedelta(hours=8))
def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_date: Optional[str]):
now = datetime.now(BEIJING_TZ)
end_ts = int(now.timestamp())
start_ts = None
if period:
if period == "today":
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_ts = int(today.timestamp())
elif period == "1d":
start_ts = end_ts - 24 * 3600
elif period == "7d":
start_ts = end_ts - 7 * 24 * 3600
elif period == "30d":
start_ts = end_ts - 30 * 24 * 3600
elif period == "week":
days = now.weekday()
week_start = (now - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0)
start_ts = int(week_start.timestamp())
elif period == "month":
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start_ts = int(month_start.timestamp())
if start_date:
try:
s = start_date if len(start_date) > 10 else f"{start_date} 00:00:00"
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ)
start_ts = int(dt.timestamp())
except ValueError:
pass
if end_date:
try:
s = end_date if len(end_date) > 10 else f"{end_date} 23:59:59"
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BEIJING_TZ)
end_ts = int(dt.timestamp())
except ValueError:
pass
if start_ts is None:
start_ts = end_ts - 7 * 24 * 3600 # 默认 7 天
return start_ts, end_ts
async def _get_active_symbols_from_income(binance_client, start_ms: int, end_ms: int) -> list:
"""
通过收益历史 API 获取该时间段内有交易活动的交易对,避免全量遍历 250+ 交易对。
一次 API 调用weight 100即可拿到有成交/盈亏的 symbol 列表,大幅减少后续 trades/orders 的请求数。
"""
try:
symbols = set()
current_end = end_ms
for _ in range(10): # 最多分页 10 次(单次最多 1000 条)
rows = await binance_client.futures_income_history(
startTime=start_ms,
endTime=current_end,
limit=1000,
recvWindow=20000,
)
if not rows:
break
for r in rows:
sym = (r.get("symbol") or "").strip()
if sym and sym.endswith("USDT"):
symbols.add(sym)
if len(rows) < 1000:
break
oldest = min(r.get("time", current_end) for r in rows)
current_end = oldest - 1
if current_end < start_ms:
break
await asyncio.sleep(0.15)
return sorted(symbols)
except Exception:
return []
@router.get("/accounts")
async def list_accounts(_admin=Depends(get_admin_user), active_only: bool = Query(False)):
"""获取账号列表供数据管理选择。active_only=true 时仅返回 status=active 的账号"""
rows = Account.list_all()
accounts = [{"id": r["id"], "name": r.get("name") or f"Account {r['id']}", "status": r.get("status") or "active"} for r in (rows or [])]
if active_only:
accounts = [a for a in accounts if (a.get("status") or "").lower() == "active"]
return {"accounts": accounts}
@router.get("/trades")
async def query_db_trades(
_admin=Depends(get_admin_user),
account_id: int = Query(..., ge=1, description="账号 ID"),
period: Optional[str] = Query(None, description="today/1d/7d/30d/week/month"),
date: Optional[str] = Query(None, description="YYYY-MM-DD指定日期等同于 start_date=end_date"),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
symbol: Optional[str] = Query(None),
time_filter: str = Query("created", description="created/entry/exit"),
reconciled_only: Optional[str] = Query(None),
limit: int = Query(500, ge=1, le=2000),
):
"""
查询 DB 交易记录(管理员可指定任意账号)
"""
sd, ed = start_date, end_date
if date:
sd, ed = date, date
_reconciled = str(reconciled_only or "").lower() in ("true", "1", "yes")
start_ts, end_ts = _get_timestamp_range(period or "today", sd, ed)
trades = Trade.get_all(
start_timestamp=start_ts,
end_timestamp=end_ts,
symbol=symbol,
status=None,
account_id=account_id,
time_filter=time_filter,
limit=limit,
reconciled_only=_reconciled,
include_sync=True,
)
out = []
for t in trades:
row = dict(t)
for k, v in row.items():
if hasattr(v, "isoformat"):
row[k] = v.isoformat()
out.append(row)
return {"total": len(out), "trades": out}
@router.post("/binance-fetch")
async def fetch_binance_data(
_admin=Depends(get_admin_user),
account_id: int = Query(..., ge=1),
symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则拉取该时间段内全部交易对的订单/成交"),
data_type: str = Query("trades", description="orders 或 trades"),
days: int = Query(7, ge=0, le=7),
):
"""
从币安拉取订单/成交记录(需账号已配置 API
"""
try:
import sys
proj = Path(__file__).resolve().parents[3] # backend/api/routes -> project root
if str(proj) not in sys.path:
sys.path.insert(0, str(proj))
from trading_system.binance_client import BinanceClient
except ImportError as e:
raise HTTPException(status_code=500, detail=f"导入失败: {e}")
api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
if not api_key or not api_secret:
raise HTTPException(status_code=400, detail="该账号未配置 API 密钥")
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
try:
await client.connect()
except Exception as e:
raise HTTPException(status_code=502, detail=f"连接币安失败: {e}")
try:
now = datetime.now(BEIJING_TZ)
end_ms = int(now.timestamp() * 1000)
if days == 0:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_ms = int(today_start.timestamp() * 1000)
else:
start_ms = end_ms - days * 24 * 3600 * 1000
sym_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()]
if not sym_list:
sym_list = await _get_active_symbols_from_income(client.client, start_ms, end_ms)
if not sym_list:
sym_list = await client.get_all_usdt_pairs()
if not sym_list:
raise HTTPException(status_code=500, detail="无法获取交易对列表,请手动指定交易对")
sem = asyncio.Semaphore(5)
async def _fetch_one(sym: str):
async with sem:
try:
if data_type == "trades":
rows = await client.client.futures_account_trades(
symbol=sym,
startTime=start_ms,
endTime=end_ms,
limit=1000,
recvWindow=20000,
)
else:
rows = await client.client.futures_get_all_orders(
symbol=sym,
startTime=start_ms,
endTime=end_ms,
limit=1000,
recvWindow=20000,
)
if isinstance(rows, list):
for r in rows:
r["_symbol"] = sym
return rows
except Exception as e:
return [{"_symbol": sym, "_error": str(e)}]
finally:
await asyncio.sleep(0.12)
tasks = [_fetch_one(sym) for sym in sym_list]
chunks = await asyncio.gather(*tasks)
all_data = []
for ch in chunks:
all_data.extend(ch)
time_key = "time" if (all_data and "time" in (all_data[0] or {})) else "updateTime"
all_data.sort(key=lambda x: x.get(time_key, 0), reverse=True)
return {
"total": len(all_data),
"data_type": data_type,
"symbols_queried": len(sym_list),
"data": all_data,
}
finally:
if client.client:
await client.client.close_connection()