#!/usr/bin/env python3
"""
Scheduled job: pull each account's recent orders/trades from Binance and write
them to the DB, de-duplicated via INSERT IGNORE.

Intended to be run from crontab, e.g.:
  0 */3 * * * cd /path/to/project && python scripts/sync_binance_orders.py

Usage:
  python scripts/sync_binance_orders.py              # all active accounts, last 6 hours
  python scripts/sync_binance_orders.py -a 2         # a single account
  python scripts/sync_binance_orders.py --hours 12   # last 12 hours

Note: the hour window is set with ``--hours``; ``-h`` is argparse's help flag.
"""
import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path

# Make the project root (and its backend/ directory, when present) importable,
# so the function-level project imports below resolve when run as a script.
proj = Path(__file__).resolve().parent.parent
if (proj / "backend").exists():
    sys.path.insert(0, str(proj / "backend"))
sys.path.insert(0, str(proj))

BEIJING_TZ = timezone(timedelta(hours=8))

# Page size passed as ``limit`` to every Binance request in this script.
_PAGE_LIMIT = 1000
# ``recvWindow`` value passed to every Binance request in this script.
_RECV_WINDOW = 20000

_TRADE_SQL = """INSERT IGNORE INTO binance_trades
    (account_id, symbol, trade_id, order_id, side, position_side, price, qty, quote_qty,
     realized_pnl, commission, commission_asset, buyer, maker, trade_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

_ORDER_SQL = """INSERT IGNORE INTO binance_orders
    (account_id, symbol, order_id, client_order_id, side, type, orig_type, status,
     price, avg_price, orig_qty, executed_qty, cum_qty, cum_quote, stop_price,
     reduce_only, position_side, order_time, update_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""


def _float_or_none(value):
    """Return ``float(value)``, or None when *value* is falsy (missing, "", 0)."""
    return float(value) if value else None


def _float_if_present(value):
    """Return None only when *value* is None; other falsy values become 0.0."""
    return float(value or 0) if value is not None else None


def _trade_row(account_id, symbol, r):
    """Build the parameter tuple for one ``binance_trades`` row.

    Args:
        account_id: Local account ID the trade belongs to.
        symbol: Symbol to store for the row.
        r: One trade dict as returned by the API call.

    Returns:
        A 15-tuple ordered to match the column list of ``_TRADE_SQL``.
    """
    return (
        account_id,
        symbol,
        int(r.get("id") or 0),
        int(r.get("orderId") or 0),
        r.get("side") or "",
        r.get("positionSide") or None,
        float(r.get("price") or 0),
        float(r.get("qty") or 0),
        _float_or_none(r.get("quoteQty")),
        _float_if_present(r.get("realizedPnl")),
        _float_if_present(r.get("commission")),
        r.get("commissionAsset") or None,
        1 if r.get("buyer") else 0,
        1 if r.get("maker") else 0,
        int(r.get("time") or 0),
    )


def _order_row(account_id, symbol, r):
    """Build the parameter tuple for one ``binance_orders`` row.

    Args:
        account_id: Local account ID the order belongs to.
        symbol: Symbol to store for the row.
        r: One order dict as returned by the API call.

    Returns:
        A 19-tuple ordered to match the column list of ``_ORDER_SQL``.
    """
    update_time = r.get("updateTime")
    return (
        account_id,
        symbol,
        int(r.get("orderId") or 0),
        r.get("clientOrderId") or None,
        r.get("side") or "",
        r.get("type") or None,
        r.get("origType") or None,
        r.get("status") or "",
        _float_or_none(r.get("price")),
        _float_or_none(r.get("avgPrice")),
        _float_or_none(r.get("origQty")),
        _float_or_none(r.get("executedQty")),
        _float_or_none(r.get("cumQty")),
        _float_or_none(r.get("cumQuote")),
        _float_or_none(r.get("stopPrice")),
        1 if r.get("reduceOnly") else 0,
        r.get("positionSide") or None,
        int(r.get("time") or 0),
        int(update_time) if update_time else None,
    )


async def _get_active_symbols(client, start_ms: int, end_ms: int) -> list:
    """Collect the USDT symbols that appear in the income history of a window.

    Args:
        client: Wrapper whose ``.client`` exposes ``futures_income_history``.
        start_ms: Window start, epoch milliseconds.
        end_ms: Window end, epoch milliseconds.

    Returns:
        Sorted list of distinct symbols ending in "USDT". Returns an empty list
        on any error, which makes the caller fall back to the full pair list.
    """
    try:
        symbols = set()
        current_end = end_ms
        # At most 10 pages per call.
        for _ in range(10):
            rows = await client.client.futures_income_history(
                startTime=start_ms,
                endTime=current_end,
                limit=_PAGE_LIMIT,
                recvWindow=_RECV_WINDOW,
            )
            if not rows:
                break
            for r in rows:
                sym = (r.get("symbol") or "").strip()
                if sym and sym.endswith("USDT"):
                    symbols.add(sym)
            if len(rows) < _PAGE_LIMIT:
                break
            # Page backwards: the next request ends just before the oldest row seen.
            # NOTE(review): this assumes a full page holds the most recent rows of
            # the window — confirm against the endpoint's documented ordering.
            oldest = min(r.get("time", current_end) for r in rows)
            current_end = oldest - 1
            if current_end < start_ms:
                break
            await asyncio.sleep(0.4)
        return sorted(symbols)
    except Exception:
        # Deliberate best-effort: this is only an optimisation over scanning all pairs.
        return []


async def sync_account(account_id: int, hours: int = 6) -> tuple:
    """Sync one account's trades and orders for the last *hours* hours.

    Args:
        account_id: Local account ID whose credentials are looked up.
        hours: Size of the look-back window in hours.

    Returns:
        ``(trades_count, orders_count, err)``. ``err`` is None on success and a
        message string otherwise. The counts are the number of rows submitted,
        not the number actually inserted, because INSERT IGNORE does not report
        how many rows were skipped as duplicates.
    """
    from database.models import Account
    from database.connection import db
    from trading_system.binance_client import BinanceClient

    api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id)
    if not api_key or not api_secret:
        return 0, 0, "未配置 API 密钥"

    now = datetime.now(BEIJING_TZ)
    end_ms = int(now.timestamp() * 1000)
    start_ms = end_ms - hours * 3600 * 1000

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    try:
        await client.connect()
    except Exception as e:
        return 0, 0, str(e)

    try:
        sym_list = await _get_active_symbols(client, start_ms, end_ms)
        if not sym_list:
            sym_list = await client.get_all_usdt_pairs()
        if not sym_list:
            return 0, 0, "无法获取交易对列表"

        # At most two requests in flight; each holds its slot for an extra pause.
        sem = asyncio.Semaphore(2)

        async def _fetch_rows(method_name, sym):
            """Fetch one symbol's rows; a failed symbol yields an empty list."""
            async with sem:
                try:
                    api_call = getattr(client.client, method_name)
                    rows = await api_call(
                        symbol=sym,
                        startTime=start_ms,
                        endTime=end_ms,
                        limit=_PAGE_LIMIT,
                        recvWindow=_RECV_WINDOW,
                    )
                    # NOTE(review): no pagination here — a symbol with more than
                    # _PAGE_LIMIT rows in the window would presumably be truncated.
                    return rows or []
                except Exception:
                    # Deliberate best-effort: one bad symbol must not fail the account.
                    return []
                finally:
                    await asyncio.sleep(0.35)

        # All trades are fetched before any orders, as two separate batches.
        trades_chunks = await asyncio.gather(
            *[_fetch_rows("futures_account_trades", s) for s in sym_list]
        )
        orders_chunks = await asyncio.gather(
            *[_fetch_rows("futures_get_all_orders", s) for s in sym_list]
        )

        # The requested symbol takes precedence over the row's own "symbol" field.
        trade_params = []
        for sym, rows in zip(sym_list, trades_chunks):
            for r in rows:
                trade_params.append(_trade_row(account_id, sym or r.get("symbol"), r))

        order_params = []
        for sym, rows in zip(sym_list, orders_chunks):
            for r in rows:
                order_params.append(_order_row(account_id, sym or r.get("symbol"), r))

        # Write to the DB; INSERT IGNORE de-duplicates rows that already exist.
        trades_ins = 0
        orders_ins = 0
        if trade_params:
            try:
                db.execute_many(_TRADE_SQL, trade_params)
                trades_ins = len(trade_params)
            except Exception as e:
                return 0, 0, f"写入 trades 失败: {e}"
        if order_params:
            try:
                db.execute_many(_ORDER_SQL, order_params)
                orders_ins = len(order_params)
            except Exception as e:
                return trades_ins, 0, f"写入 orders 失败: {e}"
        return trades_ins, orders_ins, None
    finally:
        if client.client:
            await client.client.close_connection()


def main():
    """Parse CLI arguments and sync every selected account sequentially.

    Exits with status 1 when the project models cannot be imported, and with
    status 0 when there is no account to sync. Per-account failures are printed
    and do not stop the remaining accounts.
    """
    parser = argparse.ArgumentParser(description="同步币安订单/成交到 DB(供 crontab 定时执行)")
    parser.add_argument("-a", "--account", type=int, default=None, help="指定账号 ID,不传则同步所有有效账号")
    parser.add_argument("--hours", type=int, default=6, help="拉取最近 N 小时,默认 6")
    parser.add_argument(
        "--delay-between-accounts",
        type=int,
        default=90,
        help="多账号时,每个账号之间等待秒数,默认 90(避免触发币安限频)",
    )
    args = parser.parse_args()
    hours = args.hours
    if hours <= 0:
        # A non-positive window would silently sync nothing and still report success.
        parser.error("--hours 必须为正整数")
    delay_between = max(0, args.delay_between_accounts)

    try:
        from database.models import Account
    except ImportError as e:
        print(f"导入失败: {e}")
        sys.exit(1)

    rows = Account.list_all()
    accounts = [
        r
        for r in (rows or [])
        if (r.get("status") or "active").lower() == "active" and r.get("id")
    ]
    # Compare against None so that an explicit "-a 0" is not treated as "no filter".
    if args.account is not None:
        accounts = [a for a in accounts if a["id"] == args.account]

    # Drop accounts that have no API key/secret configured.
    to_sync = []
    skipped = []
    for acc in accounts:
        aid = acc["id"]
        api_key, api_secret, _, _ = Account.get_credentials(aid)
        if api_key and api_secret:
            to_sync.append(acc)
        else:
            skipped.append(acc.get("name") or f"账号{aid}")

    if skipped:
        print(f"跳过无 API 密钥的账号: {', '.join(skipped)}")
    if not to_sync:
        print("无可同步账号(需已配置 API 密钥)")
        sys.exit(0)

    print(f"同步 {len(to_sync)} 个账号,最近 {hours} 小时,开始时间 {datetime.now(BEIJING_TZ).isoformat()}")
    sys.stdout.flush()

    async def run_all():
        """Sync the accounts one after another, pausing between them."""
        for i, acc in enumerate(to_sync):
            if i > 0 and delay_between > 0:
                print(f" 等待 {delay_between} 秒后同步下一账号(避免限频)...")
                sys.stdout.flush()
                await asyncio.sleep(delay_between)
            aid = acc["id"]
            name = acc.get("name") or f"账号{aid}"
            try:
                tr, ord_cnt, err = await sync_account(aid, hours)
            except Exception as e:
                # Top-level boundary: one account's unexpected failure must not
                # abort the remaining accounts.
                tr, ord_cnt, err = 0, 0, str(e) or repr(e)
            if err:
                print(f" {name} (id={aid}): 失败 {err}")
            else:
                print(f" {name} (id={aid}): trades {tr} 条, orders {ord_cnt} 条")

    asyncio.run(run_all())
    print(f"同步完成 {datetime.now(BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}")
    sys.stdout.flush()


if __name__ == "__main__":
    main()