auto_trade_sys/scripts/sync_binance_orders.py
薇薇安 3b0526f392 feat(data_management): 增强交易数据统计与推算功能
在后端 API 中新增按小时和星期的交易统计功能,优化 `_compute_binance_stats` 函数以支持更细致的统计分析。同时,新增 `_enrich_trades_with_derived` 函数,补充交易记录的推算字段,包括入场价、交易小时和星期,提升策略分析的便利性。前端 `DataManagement` 组件更新,展示按小时和星期的统计信息,增强用户对交易数据的可视化理解。
2026-02-22 11:16:33 +08:00

270 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
定时任务:从币安拉取各账号最近 6 小时的订单/成交数据,去重写入 DB。
供 crontab 定时执行,如: 0 */3 * * * cd /path/to/project && python scripts/sync_binance_orders.py
用法:
python scripts/sync_binance_orders.py # 所有有效账号,最近 6 小时
python scripts/sync_binance_orders.py -a 2 # 指定账号
python scripts/sync_binance_orders.py --hours 12 # 拉取最近 12 小时
"""
import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Directory two levels above this file; used as the root for import paths.
proj = Path(__file__).resolve().parent.parent
# Put the optional backend/ directory and the root itself at the front of
# sys.path so the lazy imports inside the functions below (database.*,
# trading_system.*) can be resolved when the script is run directly.
if (proj / "backend").exists():
    sys.path.insert(0, str(proj / "backend"))
sys.path.insert(0, str(proj))
# Fixed UTC+8 offset used for every timestamp this script computes or prints.
BEIJING_TZ = timezone(timedelta(hours=8))
async def _get_active_symbols(client, start_ms: int, end_ms: int) -> list:
    """Return the sorted symbols ending in "USDT" found in the income history.

    Queries ``client.client.futures_income_history`` for the window
    ``[start_ms, end_ms]`` (millisecond timestamps), at most 10 requests of
    1000 rows each. After a full page, the next request ends one millisecond
    before the oldest ``time`` value seen so far.

    Best-effort: any exception raised while paging yields an empty list, and
    symbols collected up to that point are discarded.
    """
    page_size = 1000
    found = set()
    window_end = end_ms
    try:
        for _page in range(10):
            batch = await client.client.futures_income_history(
                startTime=start_ms,
                endTime=window_end,
                limit=page_size,
                recvWindow=20000,
            )
            if not batch:
                break
            stripped_names = ((row.get("symbol") or "").strip() for row in batch)
            found.update(name for name in stripped_names if name.endswith("USDT"))
            # A short page means there is nothing further to fetch.
            if len(batch) < page_size:
                break
            # Step the window back to just before the oldest row of this page.
            window_end = min(row.get("time", window_end) for row in batch) - 1
            if window_end < start_ms:
                break
            # Pause between consecutive page requests.
            await asyncio.sleep(0.4)
    except Exception:
        return []
    return sorted(found)
def _to_float_or_none(value):
    """Return ``float(value)`` for a truthy value, otherwise ``None``.

    A non-empty string such as ``"0"`` is truthy and converts to ``0.0``;
    only ``None``, ``""`` and numeric zero map to ``None``.
    """
    return float(value) if value else None


def _trade_row_params(account_id, symbol, row):
    """Map one account-trade dict to the column tuple used by ``binance_trades``."""
    realized_pnl = row.get("realizedPnl")
    commission = row.get("commission")
    return (
        account_id,
        symbol,
        int(row.get("id", 0) or 0),
        int(row.get("orderId", 0) or 0),
        row.get("side") or "",
        row.get("positionSide") or None,
        float(row.get("price", 0) or 0),
        float(row.get("qty", 0) or 0),
        _to_float_or_none(row.get("quoteQty")),
        # These two keep an explicit zero: only a missing/None value is NULL.
        None if realized_pnl is None else float(realized_pnl or 0),
        None if commission is None else float(commission or 0),
        row.get("commissionAsset") or None,
        1 if row.get("buyer") else 0,
        1 if row.get("maker") else 0,
        int(row.get("time", 0) or 0),
    )


def _order_row_params(account_id, symbol, row):
    """Map one order dict to the column tuple used by ``binance_orders``."""
    update_time = row.get("updateTime")
    return (
        account_id,
        symbol,
        int(row.get("orderId", 0) or 0),
        row.get("clientOrderId") or None,
        row.get("side") or "",
        row.get("type") or None,
        row.get("origType") or None,
        row.get("status") or "",
        _to_float_or_none(row.get("price")),
        _to_float_or_none(row.get("avgPrice")),
        _to_float_or_none(row.get("origQty")),
        _to_float_or_none(row.get("executedQty")),
        _to_float_or_none(row.get("cumQty")),
        _to_float_or_none(row.get("cumQuote")),
        _to_float_or_none(row.get("stopPrice")),
        1 if row.get("reduceOnly") else 0,
        row.get("positionSide") or None,
        int(row.get("time", 0) or 0),
        int(update_time) if update_time else None,
    )


async def sync_account(account_id: int, hours: int = 6) -> tuple:
    """Fetch one account's recent futures trades and orders and write them to the DB.

    Args:
        account_id: ID passed to ``Account.get_credentials`` and stored with
            every row.
        hours: Size of the look-back window, counted back from now.

    Returns:
        ``(trades, orders, err)``. On success ``err`` is ``None`` and the two
        counts are the number of rows submitted to the database. Because the
        statements are ``INSERT IGNORE``, rows that already exist are counted
        too, so these are upper bounds on what was actually inserted. On
        failure ``err`` is a message; if only the orders write failed, the
        trades count is still reported.

    Notes:
        Each symbol is queried once with ``limit=1000`` and no further paging.
        A per-symbol fetch error is treated as "no rows" (best-effort).
    """
    from database.models import Account
    from database.connection import db
    from trading_system.binance_client import BinanceClient

    api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
    if not api_key or not api_secret:
        return 0, 0, "未配置 API 密钥"

    end_ms = int(datetime.now(BEIJING_TZ).timestamp() * 1000)
    start_ms = end_ms - hours * 3600 * 1000

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    # connect() sits inside the try/finally so that a client which failed
    # half-way through connecting is still closed.
    try:
        try:
            await client.connect()
        except Exception as e:
            return 0, 0, str(e)

        sym_list = await _get_active_symbols(client, start_ms, end_ms)
        if not sym_list:
            # Nothing found in the income history: fall back to every USDT pair.
            sym_list = await client.get_all_usdt_pairs()
        if not sym_list:
            return 0, 0, "无法获取交易对列表"

        # At most two requests in flight; each holds its slot for an extra
        # 0.35 s after finishing to throttle the request rate.
        sem = asyncio.Semaphore(2)

        async def _fetch_rows(method_name, sym):
            async with sem:
                try:
                    api_call = getattr(client.client, method_name)
                    rows = await api_call(
                        symbol=sym,
                        startTime=start_ms,
                        endTime=end_ms,
                        limit=1000,
                        recvWindow=20000,
                    )
                    return rows or []
                except Exception:
                    # Best-effort: a failing symbol contributes no rows.
                    return []
                finally:
                    await asyncio.sleep(0.35)

        trades_chunks = await asyncio.gather(
            *[_fetch_rows("futures_account_trades", s) for s in sym_list]
        )
        orders_chunks = await asyncio.gather(
            *[_fetch_rows("futures_get_all_orders", s) for s in sym_list]
        )

        # Prefer the symbol that was queried; fall back to the row's own field.
        trade_params = [
            _trade_row_params(account_id, sym or r.get("symbol"), r)
            for sym, rows in zip(sym_list, trades_chunks)
            for r in rows
        ]
        order_params = [
            _order_row_params(account_id, sym or r.get("symbol"), r)
            for sym, rows in zip(sym_list, orders_chunks)
            for r in rows
        ]

        # INSERT IGNORE leaves de-duplication to the tables' unique keys.
        trade_sql = """INSERT IGNORE INTO binance_trades
            (account_id, symbol, trade_id, order_id, side, position_side, price, qty, quote_qty,
            realized_pnl, commission, commission_asset, buyer, maker, trade_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        order_sql = """INSERT IGNORE INTO binance_orders
            (account_id, symbol, order_id, client_order_id, side, type, orig_type, status,
            price, avg_price, orig_qty, executed_qty, cum_qty, cum_quote, stop_price,
            reduce_only, position_side, order_time, update_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

        if trade_params:
            try:
                db.execute_many(trade_sql, trade_params)
            except Exception as e:
                return 0, 0, f"写入 trades 失败: {e}"
        if order_params:
            try:
                db.execute_many(order_sql, order_params)
            except Exception as e:
                return len(trade_params), 0, f"写入 orders 失败: {e}"
        return len(trade_params), len(order_params), None
    finally:
        inner = getattr(client, "client", None)
        if inner:
            try:
                await inner.close_connection()
            except Exception as e:
                # Cleanup must not replace the result computed above with an
                # exception; report the failure and carry on.
                print(f" 关闭连接失败 (account_id={account_id}): {e}", file=sys.stderr)
def main():
    """Command-line entry point: sync every eligible account in sequence.

    Parses the CLI options, selects the accounts whose status is "active"
    (optionally narrowed to ``--account``), skips those without API
    credentials, then runs ``sync_account`` for each one with an optional
    pause between accounts. Progress is printed to stdout.

    Exits with status 1 if the project models cannot be imported, with status
    0 if there is nothing to sync, and via ``parser.error`` (status 2) if
    ``--hours`` is not positive.
    """
    parser = argparse.ArgumentParser(description="同步币安订单/成交到 DB(供 crontab 定时执行)")
    parser.add_argument("-a", "--account", type=int, default=None, help="指定账号 ID,不传则同步所有有效账号")
    parser.add_argument("--hours", type=int, default=6, help="拉取最近 N 小时,默认 6")
    parser.add_argument("--delay-between-accounts", type=int, default=90, help="多账号时,每个账号之间等待秒数,默认 90,避免触发币安限频")
    args = parser.parse_args()

    hours = args.hours
    if hours <= 0:
        # A zero or negative window would make the start time >= the end time.
        parser.error("--hours 必须为正整数")
    delay_between = max(0, args.delay_between_accounts)

    try:
        from database.models import Account
    except ImportError as e:
        print(f"导入失败: {e}")
        sys.exit(1)

    rows = Account.list_all()
    accounts = [
        r for r in (rows or [])
        if (r.get("status") or "active").lower() == "active" and r.get("id")
    ]
    # Compare against None so that an explicitly passed ID of 0 still filters.
    if args.account is not None:
        accounts = [a for a in accounts if a["id"] == args.account]

    # Drop accounts that have no API key/secret configured.
    to_sync = []
    skipped = []
    for acc in accounts:
        aid = acc["id"]
        api_key, api_secret, _, _ = Account.get_credentials(aid)
        if api_key and api_secret:
            to_sync.append(acc)
        else:
            skipped.append(acc.get("name") or f"账号{aid}")
    if skipped:
        print(f"跳过无 API 密钥的账号: {', '.join(skipped)}")
    if not to_sync:
        print("无可同步账号(需已配置 API 密钥)")
        sys.exit(0)

    print(f"同步 {len(to_sync)} 个账号,最近 {hours} 小时,开始时间 {datetime.now(BEIJING_TZ).isoformat()}")
    sys.stdout.flush()

    async def run_all():
        for i, acc in enumerate(to_sync):
            if i > 0 and delay_between > 0:
                print(f" 等待 {delay_between} 秒后同步下一账号(避免限频)...")
                sys.stdout.flush()
                await asyncio.sleep(delay_between)
            aid = acc["id"]
            name = acc.get("name") or f"账号{aid}"
            try:
                tr, ord_cnt, err = await sync_account(aid, hours)
            except Exception as exc:
                # One account's unexpected error must not stop the remaining
                # accounts from being synced.
                tr, ord_cnt, err = 0, 0, f"{type(exc).__name__}: {exc}"
            if err:
                print(f" {name} (id={aid}): 失败 {err}")
            else:
                print(f" {name} (id={aid}): trades {tr} 条, orders {ord_cnt}")

    asyncio.run(run_all())
    print(f"同步完成 {datetime.now(BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}")
    sys.stdout.flush()
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()