Modified the admin dashboard statistics to retrieve account snapshots from the last 30 days instead of just 1 day, ensuring more comprehensive data. Additionally, introduced a new data source option for trades, allowing users to select between 'binance' and 'local' records, with appropriate handling for each source. Updated the frontend components to reflect these changes and improve user experience in managing trade data.
269 lines
10 KiB
Python
269 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
定时任务:从币安拉取各账号最近 6 小时的订单/成交数据,去重写入 DB。
|
||
供 crontab 定时执行,建议每 3 小时的第 0 分钟执行(如 0 */3 * * *),存在一定延时性。
|
||
用法:
|
||
python scripts/sync_binance_orders.py # 所有有效账号,最近 6 小时
|
||
python scripts/sync_binance_orders.py -a 2 # 指定账号
|
||
python scripts/sync_binance_orders.py -h 12 # 拉取最近 12 小时
|
||
"""
|
||
import argparse
|
||
import asyncio
|
||
import os
|
||
import sys
|
||
from datetime import datetime, timezone, timedelta
|
||
from pathlib import Path
|
||
|
||
proj = Path(__file__).resolve().parent.parent
|
||
if (proj / "backend").exists():
|
||
sys.path.insert(0, str(proj / "backend"))
|
||
sys.path.insert(0, str(proj))
|
||
|
||
BEIJING_TZ = timezone(timedelta(hours=8))
|
||
|
||
|
||
async def _get_active_symbols(client, start_ms: int, end_ms: int) -> list:
|
||
try:
|
||
symbols = set()
|
||
current_end = end_ms
|
||
for _ in range(10):
|
||
rows = await client.client.futures_income_history(
|
||
startTime=start_ms,
|
||
endTime=current_end,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
if not rows:
|
||
break
|
||
for r in rows:
|
||
sym = (r.get("symbol") or "").strip()
|
||
if sym and sym.endswith("USDT"):
|
||
symbols.add(sym)
|
||
if len(rows) < 1000:
|
||
break
|
||
oldest = min(r.get("time", current_end) for r in rows)
|
||
current_end = oldest - 1
|
||
if current_end < start_ms:
|
||
break
|
||
await asyncio.sleep(0.4)
|
||
return sorted(symbols)
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
async def sync_account(account_id: int, hours: int = 6) -> tuple:
|
||
"""同步单个账号的 trades 和 orders,返回 (trades_ins, orders_ins, err)"""
|
||
from database.models import Account
|
||
from database.connection import db
|
||
from trading_system.binance_client import BinanceClient
|
||
|
||
api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
|
||
if not api_key or not api_secret:
|
||
return 0, 0, "未配置 API 密钥"
|
||
|
||
now = datetime.now(BEIJING_TZ)
|
||
end_ms = int(now.timestamp() * 1000)
|
||
start_ms = end_ms - hours * 3600 * 1000
|
||
|
||
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
|
||
try:
|
||
await client.connect()
|
||
except Exception as e:
|
||
return 0, 0, str(e)
|
||
|
||
try:
|
||
sym_list = await _get_active_symbols(client, start_ms, end_ms)
|
||
if not sym_list:
|
||
sym_list = await client.get_all_usdt_pairs()
|
||
if not sym_list:
|
||
return 0, 0, "无法获取交易对列表"
|
||
|
||
sem = asyncio.Semaphore(2)
|
||
|
||
async def _fetch_trades(sym):
|
||
async with sem:
|
||
try:
|
||
rows = await client.client.futures_account_trades(
|
||
symbol=sym,
|
||
startTime=start_ms,
|
||
endTime=end_ms,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
return rows or []
|
||
except Exception:
|
||
return []
|
||
finally:
|
||
await asyncio.sleep(0.35)
|
||
|
||
async def _fetch_orders(sym):
|
||
async with sem:
|
||
try:
|
||
rows = await client.client.futures_get_all_orders(
|
||
symbol=sym,
|
||
startTime=start_ms,
|
||
endTime=end_ms,
|
||
limit=1000,
|
||
recvWindow=20000,
|
||
)
|
||
return rows or []
|
||
except Exception:
|
||
return []
|
||
finally:
|
||
await asyncio.sleep(0.35)
|
||
|
||
trades_chunks = await asyncio.gather(*[_fetch_trades(s) for s in sym_list])
|
||
orders_chunks = await asyncio.gather(*[_fetch_orders(s) for s in sym_list])
|
||
|
||
all_trades = []
|
||
for sym, rows in zip(sym_list, trades_chunks):
|
||
for r in rows:
|
||
r["_symbol"] = sym
|
||
all_trades.append(r)
|
||
|
||
all_orders = []
|
||
for sym, rows in zip(sym_list, orders_chunks):
|
||
for r in rows:
|
||
r["_symbol"] = sym
|
||
all_orders.append(r)
|
||
|
||
# 写入 DB(INSERT IGNORE 去重)
|
||
trade_sql = """INSERT IGNORE INTO binance_trades
|
||
(account_id, symbol, trade_id, order_id, side, position_side, price, qty, quote_qty,
|
||
realized_pnl, commission, commission_asset, buyer, maker, trade_time)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
|
||
trade_params = []
|
||
for r in all_trades:
|
||
sym = r.get("_symbol") or r.get("symbol")
|
||
trade_params.append((
|
||
account_id,
|
||
sym,
|
||
int(r.get("id", 0) or 0),
|
||
int(r.get("orderId", 0) or 0),
|
||
r.get("side") or "",
|
||
r.get("positionSide") or None,
|
||
float(r.get("price", 0) or 0),
|
||
float(r.get("qty", 0) or 0),
|
||
float(r.get("quoteQty", 0) or 0) if r.get("quoteQty") else None,
|
||
float(r.get("realizedPnl", 0) or 0) if r.get("realizedPnl") is not None else None,
|
||
float(r.get("commission", 0) or 0) if r.get("commission") is not None else None,
|
||
r.get("commissionAsset") or None,
|
||
1 if r.get("buyer") else 0,
|
||
1 if r.get("maker") else 0,
|
||
int(r.get("time", 0) or 0),
|
||
))
|
||
|
||
order_sql = """INSERT IGNORE INTO binance_orders
|
||
(account_id, symbol, order_id, client_order_id, side, type, orig_type, status,
|
||
price, avg_price, orig_qty, executed_qty, cum_qty, cum_quote, stop_price,
|
||
reduce_only, position_side, order_time, update_time)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
|
||
order_params = []
|
||
for r in all_orders:
|
||
sym = r.get("_symbol") or r.get("symbol")
|
||
order_params.append((
|
||
account_id,
|
||
sym,
|
||
int(r.get("orderId", 0) or 0),
|
||
r.get("clientOrderId") or None,
|
||
r.get("side") or "",
|
||
r.get("type") or None,
|
||
r.get("origType") or None,
|
||
r.get("status") or "",
|
||
float(r.get("price", 0) or 0) if r.get("price") else None,
|
||
float(r.get("avgPrice", 0) or 0) if r.get("avgPrice") else None,
|
||
float(r.get("origQty", 0) or 0) if r.get("origQty") else None,
|
||
float(r.get("executedQty", 0) or 0) if r.get("executedQty") else None,
|
||
float(r.get("cumQty", 0) or 0) if r.get("cumQty") else None,
|
||
float(r.get("cumQuote", 0) or 0) if r.get("cumQuote") else None,
|
||
float(r.get("stopPrice", 0) or 0) if r.get("stopPrice") else None,
|
||
1 if r.get("reduceOnly") else 0,
|
||
r.get("positionSide") or None,
|
||
int(r.get("time", 0) or 0),
|
||
int(r.get("updateTime", 0) or 0) if r.get("updateTime") else None,
|
||
))
|
||
|
||
trades_ins = 0
|
||
orders_ins = 0
|
||
if trade_params:
|
||
try:
|
||
db.execute_many(trade_sql, trade_params)
|
||
trades_ins = len(trade_params) # INSERT IGNORE 无法直接得到实际插入数,这里用传入数近似
|
||
except Exception as e:
|
||
return 0, 0, f"写入 trades 失败: {e}"
|
||
if order_params:
|
||
try:
|
||
db.execute_many(order_sql, order_params)
|
||
orders_ins = len(order_params)
|
||
except Exception as e:
|
||
return len(trade_params), 0, f"写入 orders 失败: {e}"
|
||
|
||
return len(trade_params), len(order_params), None
|
||
finally:
|
||
if client.client:
|
||
await client.client.close_connection()
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="同步币安订单/成交到 DB(供 crontab 定时执行)")
|
||
parser.add_argument("-a", "--account", type=int, default=None, help="指定账号 ID,不传则同步所有有效账号")
|
||
parser.add_argument("--hours", type=int, default=6, help="拉取最近 N 小时,默认 6")
|
||
parser.add_argument("--delay-between-accounts", type=int, default=90, help="多账号时,每个账号之间等待秒数,默认 90(避免触发币安限频)")
|
||
args = parser.parse_args()
|
||
hours = args.hours
|
||
delay_between = max(0, args.delay_between_accounts)
|
||
|
||
try:
|
||
from database.models import Account
|
||
except ImportError as e:
|
||
print(f"导入失败: {e}")
|
||
sys.exit(1)
|
||
|
||
rows = Account.list_all()
|
||
accounts = [r for r in (rows or []) if (r.get("status") or "active").lower() == "active" and r.get("id")]
|
||
if args.account:
|
||
accounts = [a for a in accounts if a["id"] == args.account]
|
||
|
||
# 过滤掉未配置 API 密钥的账号
|
||
to_sync = []
|
||
skipped = []
|
||
for acc in accounts:
|
||
aid = acc["id"]
|
||
api_key, api_secret, _, _ = Account.get_credentials(aid)
|
||
if api_key and api_secret:
|
||
to_sync.append(acc)
|
||
else:
|
||
skipped.append(acc.get("name") or f"账号{aid}")
|
||
|
||
if skipped:
|
||
print(f"跳过无 API 密钥的账号: {', '.join(skipped)}")
|
||
|
||
if not to_sync:
|
||
print("无可同步账号(需已配置 API 密钥)")
|
||
sys.exit(0)
|
||
|
||
print(f"同步 {len(to_sync)} 个账号,最近 {hours} 小时,开始时间 {datetime.now(BEIJING_TZ).isoformat()}")
|
||
sys.stdout.flush()
|
||
|
||
async def run_all():
|
||
for i, acc in enumerate(to_sync):
|
||
if i > 0 and delay_between > 0:
|
||
print(f" 等待 {delay_between} 秒后同步下一账号(避免限频)...")
|
||
sys.stdout.flush()
|
||
await asyncio.sleep(delay_between)
|
||
aid = acc["id"]
|
||
name = acc.get("name") or f"账号{aid}"
|
||
tr, ord_cnt, err = await sync_account(aid, hours)
|
||
if err:
|
||
print(f" {name} (id={aid}): 失败 {err}")
|
||
else:
|
||
print(f" {name} (id={aid}): trades {tr} 条, orders {ord_cnt} 条")
|
||
|
||
asyncio.run(run_all())
|
||
print(f"同步完成 {datetime.now(BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}")
|
||
sys.stdout.flush()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|