auto_trade_sys/scripts/sync_binance_orders.py
薇薇安 fc81a8d5d6 feat(data_management): 增强数据管理功能与统计分析
在后端 API 中新增 `_compute_binance_stats` 函数,用于计算交易和订单的统计数据,并更新 `query_db_trades` 函数以支持从数据库查询已同步的币安订单和成交记录。前端 `DataManagement` 组件进行了优化,新增统计数据显示功能,确保用户能够查看交易的盈亏、手续费、胜率等关键指标,提升了数据分析的可视化效果与用户体验。
2026-02-22 11:02:12 +08:00

247 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
定时任务:从币安拉取各账号最近 6 小时的订单/成交数据,去重写入 DB。
供 crontab 定时执行,如: 0 */3 * * * cd /path/to/project && python scripts/sync_binance_orders.py
用法:
python scripts/sync_binance_orders.py # 所有有效账号,最近 6 小时
python scripts/sync_binance_orders.py -a 2 # 指定账号
python scripts/sync_binance_orders.py --hours 12 # 拉取最近 12 小时
"""
import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Fixed UTC+8 offset; used below for the sync window end and the start banner.
BEIJING_TZ = timezone(timedelta(hours=8))

# This script is run standalone, so make the project packages importable.
# The project root is two directory levels above this file.
proj = Path(__file__).resolve().parents[1]
if (proj / "backend").exists():
    sys.path.insert(0, str(proj / "backend"))
# Inserted last so the project root ends up ahead of ``backend`` on sys.path.
sys.path.insert(0, str(proj))
async def _get_active_symbols(client, start_ms: int, end_ms: int) -> list:
    """Return the sorted USDT symbols that have income records in a time window.

    Income history is read through ``client.client.futures_income_history``
    in pages of 1000 rows, with at most 10 requests and a 0.15 s pause between
    requests.

    Args:
        client: Wrapper whose ``client`` attribute exposes an awaitable
            ``futures_income_history(**params)``.
        start_ms: Window start in epoch milliseconds.
        end_ms: Window end in epoch milliseconds.

    Returns:
        Sorted list of distinct, stripped symbols ending in ``"USDT"``.
        An empty list is returned when there are no matching rows or when any
        request fails, which lets the caller fall back to another symbol source.
    """
    page_size = 1000
    max_pages = 10
    found = set()
    window_start = start_ms
    try:
        for _ in range(max_pages):
            rows = await client.client.futures_income_history(
                startTime=window_start,
                endTime=end_ms,
                limit=page_size,
                recvWindow=20000,
            )
            if not rows:
                break
            for row in rows:
                symbol = (row.get("symbol") or "").strip()
                if symbol.endswith("USDT"):
                    found.add(symbol)
            if len(rows) < page_size:
                # A short page means the window has been read completely.
                break
            # NOTE(review): this assumes the endpoint serves a truncated window
            # oldest-first starting at startTime (confirm against the Binance
            # income-history docs). Under that ordering the unread rows are the
            # NEWER ones, so the window start moves forward; moving endTime
            # backwards would re-query an already-empty range.
            newest = max(int(row.get("time") or window_start) for row in rows)
            # Resume AT the newest timestamp so rows sharing that millisecond
            # are not skipped (duplicates are harmless in a set). Step past it
            # only when the whole page sits on one timestamp, to keep moving.
            next_start = newest if newest > window_start else newest + 1
            if next_start > end_ms:
                break
            window_start = next_start
            await asyncio.sleep(0.15)
    except Exception:
        # Deliberate best-effort: any failure yields "no symbols" so the
        # caller can use its fallback instead of aborting the sync.
        return []
    return sorted(found)
_TRADE_INSERT_SQL = """INSERT IGNORE INTO binance_trades
    (account_id, symbol, trade_id, order_id, side, position_side, price, qty, quote_qty,
     realized_pnl, commission, commission_asset, buyer, maker, trade_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

_ORDER_INSERT_SQL = """INSERT IGNORE INTO binance_orders
    (account_id, symbol, order_id, client_order_id, side, type, orig_type, status,
     price, avg_price, orig_qty, executed_qty, cum_qty, cum_quote, stop_price,
     reduce_only, position_side, order_time, update_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""


def _int_or_zero(value) -> int:
    """Coerce a field to int, treating missing/empty values as 0."""
    return int(value or 0)


def _float_or_zero(value) -> float:
    """Coerce a field to float, treating missing/empty values as 0.0."""
    return float(value or 0)


def _float_if_truthy(value):
    """Return ``float(value)`` for truthy values, otherwise ``None``."""
    return float(value) if value else None


def _float_unless_none(value):
    """Return ``None`` only for ``None``; other empty values become 0.0."""
    return None if value is None else float(value or 0)


def _trade_row(account_id: int, symbol, trade: dict) -> tuple:
    """Map one trade dict to the parameter tuple of ``_TRADE_INSERT_SQL``."""
    return (
        account_id,
        symbol or trade.get("symbol"),
        _int_or_zero(trade.get("id")),
        _int_or_zero(trade.get("orderId")),
        trade.get("side") or "",
        trade.get("positionSide") or None,
        _float_or_zero(trade.get("price")),
        _float_or_zero(trade.get("qty")),
        _float_if_truthy(trade.get("quoteQty")),
        _float_unless_none(trade.get("realizedPnl")),
        _float_unless_none(trade.get("commission")),
        trade.get("commissionAsset") or None,
        1 if trade.get("buyer") else 0,
        1 if trade.get("maker") else 0,
        _int_or_zero(trade.get("time")),
    )


def _order_row(account_id: int, symbol, order: dict) -> tuple:
    """Map one order dict to the parameter tuple of ``_ORDER_INSERT_SQL``."""
    update_time = order.get("updateTime")
    return (
        account_id,
        symbol or order.get("symbol"),
        _int_or_zero(order.get("orderId")),
        order.get("clientOrderId") or None,
        order.get("side") or "",
        order.get("type") or None,
        order.get("origType") or None,
        order.get("status") or "",
        _float_if_truthy(order.get("price")),
        _float_if_truthy(order.get("avgPrice")),
        _float_if_truthy(order.get("origQty")),
        _float_if_truthy(order.get("executedQty")),
        _float_if_truthy(order.get("cumQty")),
        _float_if_truthy(order.get("cumQuote")),
        _float_if_truthy(order.get("stopPrice")),
        1 if order.get("reduceOnly") else 0,
        order.get("positionSide") or None,
        _int_or_zero(order.get("time")),
        int(update_time) if update_time else None,
    )


async def sync_account(account_id: int, hours: int = 6) -> tuple:
    """Sync one account's recent trades and orders into the database.

    The window is the last ``hours`` hours up to now. Symbols come from
    ``_get_active_symbols`` and fall back to ``client.get_all_usdt_pairs()``
    when that returns nothing. For every symbol one page (up to 1000 rows) of
    trades and of orders is requested, at most 5 requests in flight, and the
    rows are written with ``INSERT IGNORE`` through ``db.execute_many``.

    Args:
        account_id: Account whose credentials are loaded via
            ``Account.get_credentials``.
        hours: Size of the look-back window in hours.

    Returns:
        ``(trades, orders, err)``. The counts are the numbers of rows
        submitted to the database, not the rows actually inserted, because
        ``INSERT IGNORE`` does not report that here. ``err`` is ``None`` on
        success and a message string on failure.
    """
    from database.models import Account
    from database.connection import db
    from trading_system.binance_client import BinanceClient

    api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
    if not api_key or not api_secret:
        return 0, 0, "未配置 API 密钥"

    end_ms = int(datetime.now(BEIJING_TZ).timestamp() * 1000)
    start_ms = end_ms - hours * 3600 * 1000

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    try:
        # connect() is inside the try/finally so that a connection which
        # fails half-way is still closed instead of leaking its session.
        try:
            await client.connect()
        except Exception as e:
            return 0, 0, str(e)

        sym_list = await _get_active_symbols(client, start_ms, end_ms)
        if not sym_list:
            sym_list = await client.get_all_usdt_pairs()
        if not sym_list:
            return 0, 0, "无法获取交易对列表"

        limiter = asyncio.Semaphore(5)

        async def _fetch_rows(method_name, symbol):
            """Fetch one page for ``symbol``; a failed request yields ``[]``."""
            async with limiter:
                try:
                    api_call = getattr(client.client, method_name)
                    rows = await api_call(
                        symbol=symbol,
                        startTime=start_ms,
                        endTime=end_ms,
                        limit=1000,
                        recvWindow=20000,
                    )
                    return rows or []
                except Exception:
                    # Best-effort per symbol: one failing symbol must not
                    # abort the whole account.
                    return []
                finally:
                    # Pause while still holding the slot to space out requests.
                    await asyncio.sleep(0.12)

        # NOTE(review): only a single page of up to 1000 rows per symbol is
        # requested; there is no follow-up paging for busier symbols.
        trades_chunks = await asyncio.gather(
            *[_fetch_rows("futures_account_trades", s) for s in sym_list]
        )
        orders_chunks = await asyncio.gather(
            *[_fetch_rows("futures_get_all_orders", s) for s in sym_list]
        )

        trade_params = []
        for sym, rows in zip(sym_list, trades_chunks):
            trade_params.extend(_trade_row(account_id, sym, r) for r in rows)

        order_params = []
        for sym, rows in zip(sym_list, orders_chunks):
            order_params.extend(_order_row(account_id, sym, r) for r in rows)

        # NOTE(review): INSERT IGNORE presumably leaves an already stored
        # order untouched, so a later status change of the same order would
        # not be reflected - confirm whether an upsert is wanted for orders.
        if trade_params:
            try:
                db.execute_many(_TRADE_INSERT_SQL, trade_params)
            except Exception as e:
                return 0, 0, f"写入 trades 失败: {e}"
        if order_params:
            try:
                db.execute_many(_ORDER_INSERT_SQL, order_params)
            except Exception as e:
                return len(trade_params), 0, f"写入 orders 失败: {e}"
        return len(trade_params), len(order_params), None
    finally:
        # The inner client attribute may not exist if connect() failed early.
        inner = getattr(client, "client", None)
        if inner:
            await inner.close_connection()
def main():
    """Command-line entry point: sync recent orders/trades for active accounts.

    Options:
        -a / --account: only sync the account with this id.
        --hours: size of the look-back window in hours (default 6, must be > 0).

    Accounts are processed one after another. A failure in one account is
    printed and does not stop the remaining accounts. The process exits with
    status 1 when the project models cannot be imported and with status 0
    when no matching active account exists.
    """
    parser = argparse.ArgumentParser(description="同步币安订单/成交到 DB供 crontab 定时执行)")
    parser.add_argument("-a", "--account", type=int, default=None, help="指定账号 ID不传则同步所有有效账号")
    parser.add_argument("--hours", type=int, default=6, help="拉取最近 N 小时,默认 6")
    args = parser.parse_args()
    hours = args.hours
    if hours <= 0:
        # A zero or negative window would query an empty/inverted time range.
        parser.error("--hours 必须为正整数")

    try:
        from database.models import Account
    except ImportError as e:
        print(f"导入失败: {e}")
        sys.exit(1)

    rows = Account.list_all()
    # A missing status is treated as "active".
    accounts = [
        r for r in (rows or [])
        if (r.get("status") or "active").lower() == "active" and r.get("id")
    ]
    # Compare against None so that an explicit id of 0 is still a filter.
    if args.account is not None:
        accounts = [a for a in accounts if a["id"] == args.account]
    if not accounts:
        print("无有效账号")
        sys.exit(0)

    print(f"同步 {len(accounts)} 个账号,最近 {hours} 小时,开始时间 {datetime.now(BEIJING_TZ).isoformat()}")

    async def run_all():
        for acc in accounts:
            aid = acc["id"]
            name = acc.get("name") or f"账号{aid}"
            try:
                tr, ord_cnt, err = await sync_account(aid, hours)
            except Exception as e:
                # Top-level boundary: report and continue with the next
                # account instead of aborting the whole run.
                print(f" {name} (id={aid}): 失败 {e}")
                continue
            if err:
                print(f" {name} (id={aid}): 失败 {err}")
            else:
                print(f" {name} (id={aid}): trades {tr} 条, orders {ord_cnt}")

    asyncio.run(run_all())
    print("同步完成")
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()