在后端 API 中新增数据管理路由,支持从币安拉取订单和成交记录的功能。前端应用中引入数据管理组件,并在路由中添加相应的链接。更新了 API 服务,提供获取账号列表和查询 DB 交易的接口,增强了系统的数据处理能力与用户体验。
148 lines
5.3 KiB
Python
148 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
从币安拉取最近订单/成交记录,供策略分析或与 DB 对照。
|
||
当 DB 记录缺失时,可用此脚本直接查币安数据。
|
||
|
||
用法:
|
||
python scripts/fetch_binance_orders.py --account 2
|
||
python scripts/fetch_binance_orders.py --account 2 --symbol BTCUSDT --days 7
|
||
python scripts/fetch_binance_orders.py --account 2 --type orders -o binance_orders.json
|
||
python scripts/fetch_binance_orders.py --account 2 --symbols ASTERUSDT,FILUSDT,PENGUUSDT
|
||
"""
|
||
import argparse
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
proj = Path(__file__).resolve().parent.parent
|
||
if (proj / "backend").exists():
|
||
sys.path.insert(0, str(proj / "backend"))
|
||
sys.path.insert(0, str(proj))
|
||
|
||
BEIJING_TZ = timezone(timedelta(hours=8))
|
||
|
||
|
||
def _serialize(obj):
|
||
if hasattr(obj, "isoformat"):
|
||
return obj.isoformat()
|
||
if isinstance(obj, datetime):
|
||
return obj.isoformat()
|
||
return obj
|
||
|
||
|
||
async def main():
|
||
parser = argparse.ArgumentParser(description="从币安拉取订单/成交记录")
|
||
parser.add_argument("--account", "-a", type=int, default=None,
|
||
help="账号 ID,默认 ATS_ACCOUNT_ID 或 1")
|
||
parser.add_argument("--symbol", "-s", type=str, default=None,
|
||
help="交易对,如 BTCUSDT(单 symbol)")
|
||
parser.add_argument("--symbols", type=str, default=None,
|
||
help="多个交易对,逗号分隔,如 ASTERUSDT,FILUSDT")
|
||
parser.add_argument("--days", "-d", type=int, default=7,
|
||
help="拉取最近 N 天,默认 7(币安单次最多 7 天)")
|
||
parser.add_argument("--type", "-t", choices=["orders", "trades"], default="trades",
|
||
help="orders=订单列表, trades=成交记录(策略分析推荐 trades)")
|
||
parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件")
|
||
args = parser.parse_args()
|
||
|
||
account_id = args.account or int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
|
||
|
||
symbols = []
|
||
if args.symbol:
|
||
symbols = [s.strip().upper() for s in args.symbol.split(",") if s.strip()]
|
||
elif args.symbols:
|
||
symbols = [s.strip().upper() for s in args.symbols.split(",") if s.strip()]
|
||
|
||
if not symbols:
|
||
print("请指定 --symbol 或 --symbols(逗号分隔)")
|
||
print(" 示例: --symbol BTCUSDT 或 --symbols ASTERUSDT,FILUSDT,PENGUUSDT")
|
||
sys.exit(1)
|
||
|
||
try:
|
||
from database.models import Account
|
||
from trading_system.binance_client import BinanceClient
|
||
except ImportError as e:
|
||
print(f"导入失败: {e}")
|
||
print("请确保在项目根目录运行,且 backend 可访问")
|
||
sys.exit(1)
|
||
|
||
api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
|
||
if not api_key or not api_secret:
|
||
print(f"账号 {account_id} 未配置 API 密钥")
|
||
sys.exit(1)
|
||
|
||
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
|
||
try:
|
||
await client.connect()
|
||
except Exception as e:
|
||
print(f"连接币安失败: {e}")
|
||
sys.exit(1)
|
||
|
||
end_ms = int(datetime.now(BEIJING_TZ).timestamp() * 1000)
|
||
start_ms = end_ms - args.days * 24 * 3600 * 1000
|
||
|
||
all_data = []
|
||
for sym in symbols:
|
||
try:
|
||
if args.type == "trades":
|
||
rows = await client.client.futures_account_trades(
|
||
symbol=sym,
|
||
startTime=start_ms,
|
||
endTime=end_ms,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
for r in rows:
|
||
r["_symbol"] = sym
|
||
all_data.extend(rows)
|
||
print(f" {sym}: 成交 {len(rows)} 条")
|
||
else:
|
||
rows = await client.client.futures_get_all_orders(
|
||
symbol=sym,
|
||
startTime=start_ms,
|
||
endTime=end_ms,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
for r in rows:
|
||
r["_symbol"] = sym
|
||
all_data.extend(rows)
|
||
print(f" {sym}: 订单 {len(rows)} 条")
|
||
except Exception as e:
|
||
print(f" {sym}: 失败 {e}")
|
||
await asyncio.sleep(0.2)
|
||
|
||
# 按时间排序
|
||
if all_data:
|
||
time_key = "time" if "time" in all_data[0] else "updateTime"
|
||
all_data.sort(key=lambda x: x.get(time_key, 0), reverse=True)
|
||
|
||
# 转换大数/日期
|
||
out = []
|
||
for r in all_data:
|
||
row = dict(r)
|
||
for k, v in list(row.items()):
|
||
if isinstance(v, (datetime,)):
|
||
row[k] = v.isoformat()
|
||
out.append(row)
|
||
|
||
print(f"\n账号 {account_id} | 类型 {args.type} | 共 {len(out)} 条")
|
||
if args.output:
|
||
with open(args.output, "w", encoding="utf-8") as f:
|
||
json.dump(out, f, ensure_ascii=False, indent=2, default=_serialize)
|
||
print(f"已导出到 {args.output}")
|
||
else:
|
||
print(json.dumps(out[:50], ensure_ascii=False, indent=2, default=_serialize))
|
||
if len(out) > 50:
|
||
print(f"... 仅显示前 50 条,共 {len(out)} 条。可用 -o 导出全部")
|
||
|
||
if client.client:
|
||
await client.client.close_connection()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|