diff --git a/backend/api/routes/data_management.py b/backend/api/routes/data_management.py index 2e1cd9c..94b7307 100644 --- a/backend/api/routes/data_management.py +++ b/backend/api/routes/data_management.py @@ -60,6 +60,84 @@ def _get_timestamp_range(period: Optional[str], start_date: Optional[str], end_d return start_ts, end_ts +def _compute_binance_stats(data: list, data_type: str) -> dict: + """计算用于策略分析的统计数据(成交/订单原始字段均已保留,导出 JSON 含全部)""" + stats = {"count": len(data)} + valid = [r for r in data if isinstance(r, dict) and "_error" not in r] + if not valid: + return stats + + if data_type == "trades": + pnls = [] + commissions = [] + quote_qtys = [] + by_symbol = {} + wins, losses = 0, 0 + maker_count, taker_count = 0, 0 + for r in valid: + sym = r.get("_symbol") or r.get("symbol") or "-" + p = float(r.get("realizedPnl") or 0) + c = float(r.get("commission") or 0) + qq = float(r.get("quoteQty") or 0) + pnls.append(p) + commissions.append(c) + if qq: + quote_qtys.append(qq) + if p > 0: + wins += 1 + elif p < 0: + losses += 1 + if r.get("maker"): + maker_count += 1 + else: + taker_count += 1 + by_symbol[sym] = by_symbol.get(sym, {"count": 0, "pnl": 0.0, "commission": 0.0, "quoteQty": 0.0}) + by_symbol[sym]["count"] += 1 + by_symbol[sym]["pnl"] += p + by_symbol[sym]["commission"] += c + by_symbol[sym]["quoteQty"] += qq + + stats["total_realized_pnl"] = round(sum(pnls), 4) + stats["total_commission"] = round(sum(commissions), 4) + stats["net_pnl"] = round(stats["total_realized_pnl"] - stats["total_commission"], 4) + stats["win_count"] = wins + stats["loss_count"] = losses + stats["win_rate"] = round(100 * wins / (wins + losses), 1) if (wins + losses) > 0 else 0 + stats["avg_pnl_per_trade"] = round(sum(pnls) / len(pnls), 4) if pnls else 0 + stats["total_quote_qty"] = round(sum(quote_qtys), 2) + stats["maker_count"] = maker_count + stats["taker_count"] = taker_count + stats["by_symbol"] = { + k: { + "count": v["count"], + "pnl": round(v["pnl"], 4), + "commission": round(v["commission"], 4), + "quoteQty": round(v["quoteQty"], 2), + } + for k, v in sorted(by_symbol.items()) + } + else: + by_status = {} + by_type = {} + by_symbol = {} + filled_count = 0 + for r in valid: + status = r.get("status") or "UNKNOWN" + typ = r.get("type") or r.get("origType") or "UNKNOWN" + sym = r.get("_symbol") or r.get("symbol") or "-" + by_status[status] = by_status.get(status, 0) + 1 + by_type[typ] = by_type.get(typ, 0) + 1 + by_symbol[sym] = by_symbol.get(sym, 0) + 1 + if status == "FILLED": + filled_count += 1 + stats["by_status"] = by_status + stats["by_type"] = by_type + stats["by_symbol"] = dict(sorted(by_symbol.items())) + stats["filled_count"] = filled_count + + return stats + + async def _get_active_symbols_from_income(binance_client, start_ms: int, end_ms: int) -> list: """ 通过收益历史 API 获取该时间段内有交易活动的交易对,避免全量遍历 250+ 交易对。 @@ -145,98 +223,106 @@ async def query_db_trades( return {"total": len(out), "trades": out} +def _binance_row_to_api_format(row: dict, data_type: str) -> dict: + """将 DB 行转换为前端/导出期望的币安 API 格式""" + if data_type == "trades": + return { + "id": row.get("trade_id"), + "orderId": row.get("order_id"), + "symbol": row.get("symbol"), + "_symbol": row.get("symbol"), + "side": row.get("side"), + "positionSide": row.get("position_side"), + "price": str(row.get("price") or ""), + "qty": str(row.get("qty") or ""), + "quoteQty": str(row.get("quote_qty") or ""), + "realizedPnl": str(row.get("realized_pnl") or ""), + "commission": str(row.get("commission") or ""), + "commissionAsset": row.get("commission_asset"), + "buyer": bool(row.get("buyer")), + "maker": bool(row.get("maker")), + "time": row.get("trade_time"), + } + else: + return { + "orderId": row.get("order_id"), + "clientOrderId": row.get("client_order_id"), + "symbol": row.get("symbol"), + "_symbol": row.get("symbol"), + "side": row.get("side"), + "type": row.get("type"), + "origType": row.get("orig_type"), + "status": row.get("status"), + "price": str(row.get("price") or ""), + "avgPrice": str(row.get("avg_price") or ""), + "origQty": str(row.get("orig_qty") or ""), + "executedQty": str(row.get("executed_qty") or ""), + "cumQty": str(row.get("cum_qty") or ""), + "cumQuote": str(row.get("cum_quote") or ""), + "stopPrice": str(row.get("stop_price") or "") if row.get("stop_price") else "", + "reduceOnly": bool(row.get("reduce_only")), + "positionSide": row.get("position_side"), + "time": row.get("order_time"), + "updateTime": row.get("update_time"), + } + + @router.post("/binance-fetch") -async def fetch_binance_data( +async def query_binance_data_from_db( _admin=Depends(get_admin_user), account_id: int = Query(..., ge=1), - symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则拉取该时间段内全部交易对的订单/成交"), + symbols: Optional[str] = Query(None, description="交易对,逗号分隔;留空则全部"), data_type: str = Query("trades", description="orders 或 trades"), days: int = Query(7, ge=0, le=7), ): """ - 从币安拉取订单/成交记录(需账号已配置 API) + 从 DB 查询已同步的币安订单/成交(由定时任务 scripts/sync_binance_orders.py 拉取入库) """ - try: - import sys - proj = Path(__file__).resolve().parents[3] # backend/api/routes -> project root - if str(proj) not in sys.path: - sys.path.insert(0, str(proj)) - from trading_system.binance_client import BinanceClient - except ImportError as e: - raise HTTPException(status_code=500, detail=f"导入失败: {e}") + from database.connection import db - api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id) - if not api_key or not api_secret: - raise HTTPException(status_code=400, detail="该账号未配置 API 密钥") + now = datetime.now(BEIJING_TZ) + end_ts = int(now.timestamp()) + if days == 0: + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + start_ts = int(today_start.timestamp()) + else: + start_ts = end_ts - days * 24 * 3600 + start_ms = start_ts * 1000 + end_ms = end_ts * 1000 - client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet) - try: - await client.connect() - except Exception as e: - raise HTTPException(status_code=502, detail=f"连接币安失败: {e}") + symbol_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()] try: - now = datetime.now(BEIJING_TZ) - end_ms = int(now.timestamp() * 1000) - if days == 0: - today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - start_ms = int(today_start.timestamp() * 1000) + if data_type == "trades": + q = """SELECT * FROM binance_trades + WHERE account_id = %s AND trade_time >= %s AND trade_time <= %s""" + params = [account_id, start_ms, end_ms] + if symbol_list: + q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")" + params.extend(symbol_list) + q += " ORDER BY trade_time DESC LIMIT 5000" else: - start_ms = end_ms - days * 24 * 3600 * 1000 + q = """SELECT * FROM binance_orders + WHERE account_id = %s AND order_time >= %s AND order_time <= %s""" + params = [account_id, start_ms, end_ms] + if symbol_list: + q += " AND symbol IN (" + ",".join(["%s"] * len(symbol_list)) + ")" + params.extend(symbol_list) + q += " ORDER BY order_time DESC LIMIT 5000" - sym_list = [s.strip().upper() for s in (symbols or "").split(",") if s.strip()] - if not sym_list: - sym_list = await _get_active_symbols_from_income(client.client, start_ms, end_ms) - if not sym_list: - sym_list = await client.get_all_usdt_pairs() - if not sym_list: - raise HTTPException(status_code=500, detail="无法获取交易对列表,请手动指定交易对") + rows = db.execute_query(q, params) + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询失败(请确认已执行 add_binance_sync_tables.sql 并运行过同步脚本): {e}") - sem = asyncio.Semaphore(5) + all_data = [_binance_row_to_api_format(dict(r), data_type) for r in (rows or [])] + symbols_queried = len(symbol_list) if symbol_list else len({(r or {}).get("symbol") for r in (rows or []) if (r or {}).get("symbol")}) + stats = _compute_binance_stats(all_data, data_type) - async def _fetch_one(sym: str): - async with sem: - try: - if data_type == "trades": - rows = await client.client.futures_account_trades( - symbol=sym, - startTime=start_ms, - endTime=end_ms, - limit=1000, - recvWindow=20000, - ) - else: - rows = await client.client.futures_get_all_orders( - symbol=sym, - startTime=start_ms, - endTime=end_ms, - limit=1000, - recvWindow=20000, - ) - if isinstance(rows, list): - for r in rows: - r["_symbol"] = sym - return rows - except Exception as e: - return [{"_symbol": sym, "_error": str(e)}] - finally: - await asyncio.sleep(0.12) - - tasks = [_fetch_one(sym) for sym in sym_list] - chunks = await asyncio.gather(*tasks) - all_data = [] - for ch in chunks: - all_data.extend(ch) - - time_key = "time" if (all_data and "time" in (all_data[0] or {})) else "updateTime" - all_data.sort(key=lambda x: x.get(time_key, 0), reverse=True) - - return { - "total": len(all_data), - "data_type": data_type, - "symbols_queried": len(sym_list), - "data": all_data, - } - finally: - if client.client: - await client.client.close_connection() + return { + "total": len(all_data), + "data_type": data_type, + "symbols_queried": symbols_queried, + "stats": stats, + "data": all_data, + "source": "db", + } diff --git a/backend/database/add_binance_sync_tables.sql b/backend/database/add_binance_sync_tables.sql new file mode 100644 index 0000000..dbfcdc9 --- /dev/null +++ b/backend/database/add_binance_sync_tables.sql @@ -0,0 +1,56 @@ +-- 币安订单/成交同步表,供定时任务拉取后存储,数据管理从 DB 查询分析 +-- 执行: mysql -u user -p db_name < add_binance_sync_tables.sql + +USE `auto_trade_sys`; + +-- 币安成交记录(userTrades) +CREATE TABLE IF NOT EXISTS `binance_trades` ( + `id` BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT, + `account_id` INT UNSIGNED NOT NULL, + `symbol` VARCHAR(32) NOT NULL, + `trade_id` BIGINT UNSIGNED NOT NULL COMMENT '币安 trade id', + `order_id` BIGINT UNSIGNED NOT NULL, + `side` VARCHAR(10) NOT NULL, + `position_side` VARCHAR(10) DEFAULT NULL, + `price` DECIMAL(24, 8) NOT NULL, + `qty` DECIMAL(24, 8) NOT NULL, + `quote_qty` DECIMAL(24, 8) DEFAULT NULL, + `realized_pnl` DECIMAL(24, 8) DEFAULT NULL, + `commission` DECIMAL(24, 8) DEFAULT NULL, + `commission_asset` VARCHAR(20) DEFAULT NULL, + `buyer` TINYINT(1) DEFAULT NULL, + `maker` TINYINT(1) DEFAULT NULL, + `trade_time` BIGINT UNSIGNED NOT NULL COMMENT '成交时间戳毫秒', + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY `uk_account_trade` (`account_id`, `trade_id`), + INDEX `idx_account_time` (`account_id`, `trade_time`), + INDEX `idx_symbol_time` (`account_id`, `symbol`, `trade_time`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='币安成交记录(定时同步)'; + +-- 币安订单记录(allOrders) +CREATE TABLE IF NOT EXISTS `binance_orders` ( + `id` BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT, + `account_id` INT UNSIGNED NOT NULL, + `symbol` VARCHAR(32) NOT NULL, + `order_id` BIGINT UNSIGNED NOT NULL, + `client_order_id` VARCHAR(64) DEFAULT NULL, + `side` VARCHAR(10) NOT NULL, + `type` VARCHAR(32) DEFAULT NULL, + `orig_type` VARCHAR(32) DEFAULT NULL, + `status` VARCHAR(32) NOT NULL, + `price` DECIMAL(24, 8) DEFAULT NULL, + `avg_price` DECIMAL(24, 8) DEFAULT NULL, + `orig_qty` DECIMAL(24, 8) DEFAULT NULL, + `executed_qty` DECIMAL(24, 8) DEFAULT NULL, + `cum_qty` DECIMAL(24, 8) DEFAULT NULL, + `cum_quote` DECIMAL(24, 8) DEFAULT NULL, + `stop_price` DECIMAL(24, 8) DEFAULT NULL, + `reduce_only` TINYINT(1) DEFAULT NULL, + `position_side` VARCHAR(10) DEFAULT NULL, + `order_time` BIGINT UNSIGNED NOT NULL COMMENT '下单时间戳毫秒', + `update_time` BIGINT UNSIGNED DEFAULT NULL COMMENT '更新时间戳毫秒', + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY `uk_account_order` (`account_id`, `order_id`), + INDEX `idx_account_time` (`account_id`, `order_time`), + INDEX `idx_symbol_time` (`account_id`, `symbol`, `order_time`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='币安订单记录(定时同步)'; diff --git a/frontend/src/components/DataManagement.css b/frontend/src/components/DataManagement.css index 89973ab..a2bf9d2 100644 --- a/frontend/src/components/DataManagement.css +++ b/frontend/src/components/DataManagement.css @@ -66,6 +66,49 @@ font-size: 13px; } +.dm-stats { + margin-top: 12px; + padding: 12px; + background: #f8fafc; + border-radius: 8px; + border: 1px solid #e2e8f0; +} + +.dm-stats h4 { + margin: 0 0 10px 0; + font-size: 14px; + color: #334155; +} + +.dm-stats-grid { + display: flex; + flex-wrap: wrap; + gap: 16px 24px; +} + +.dm-stat-item { + display: flex; + flex-direction: column; + gap: 2px; +} + +.dm-stat-wide { + flex: 1 1 100%; +} + +.dm-stat-label { + font-size: 11px; + color: #64748b; + text-transform: uppercase; +} + +.dm-stat-profit { color: #059669; font-weight: 600; } +.dm-stat-loss { color: #dc2626; font-weight: 600; } +.dm-stat-symbols { display: flex; flex-wrap: wrap; gap: 8px 16px; font-size: 12px; } +.dm-stat-sym { background: #fff; padding: 2px 8px; border-radius: 4px; } +.dm-table .dm-profit { color: #059669; } +.dm-table .dm-loss { color: #dc2626; } + .dm-result { margin-top: 16px; } @@ -79,6 +122,14 @@ color: #555; } +.dm-source-tag { + font-size: 11px; + color: #64748b; + background: #e2e8f0; + padding: 2px 8px; + border-radius: 4px; +} + .dm-table-wrap { overflow-x: auto; max-height: 400px; diff --git a/frontend/src/components/DataManagement.jsx b/frontend/src/components/DataManagement.jsx index a20cf39..dfe9f75 100644 --- a/frontend/src/components/DataManagement.jsx +++ b/frontend/src/components/DataManagement.jsx @@ -85,7 +85,7 @@ export default function DataManagement() { const res = await api.postDataManagementFetchBinance(params) setBnResult(res) } catch (e) { - setBnError(e?.message || '拉取失败') + setBnError(e?.message || '查询失败') } finally { setBnLoading(false) } @@ -100,10 +100,19 @@ export default function DataManagement() { const exportBinance = () => { const items = bnResult?.data || bnResult?.items if (!items) return - const filename = `binance_${bnResult.data_type}_${bnAccountId}.json` - downloadJson({ total: bnResult.total, data_type: bnResult.data_type, data: items }, filename) + const filename = `binance_${bnResult.data_type}_${bnAccountId}_${new Date().toISOString().slice(0, 10)}.json` + const payload = { + meta: { total: bnResult.total, data_type: bnResult.data_type, symbols_queried: bnResult.symbols_queried, export_time: new Date().toISOString() }, + stats: bnResult.stats || {}, + data: items, + } + downloadJson(payload, filename) } + const bnData = bnResult?.data || bnResult?.items || [] + const bnStats = bnResult?.stats || {} + const isTrades = bnResult?.data_type === 'trades' + const today = new Date().toISOString().slice(0, 10) return ( @@ -192,9 +201,9 @@ export default function DataManagement() { )} - {/* 2. 从币安拉取 */} + {/* 2. 币安订单/成交(从 DB 查询,由定时任务 scripts/sync_binance_orders.py 同步) */}
-

从币安拉取

+

币安订单/成交查询

{bnError &&
{bnError}
} {bnResult && (
- 共 {bnResult.total} 条 - + 共 {bnResult.total} 条 · 查询 {bnResult.symbols_queried ?? '-'} 个交易对 + {bnResult.source === 'db' && 来自 DB(定时同步)} +
+ + {Object.keys(bnStats).length > 0 && ( +
+

统计数据

+
+ {isTrades && bnStats.total_realized_pnl !== undefined && ( + <> +
+ 总已实现盈亏 + = 0 ? 'dm-stat-profit' : 'dm-stat-loss'}>{bnStats.total_realized_pnl} +
+
+ 总手续费 + {bnStats.total_commission} +
+
+ 净盈亏 + = 0 ? 'dm-stat-profit' : 'dm-stat-loss'}>{bnStats.net_pnl} +
+
+ 胜 / 负 + {bnStats.win_count} / {bnStats.loss_count} +
+
+ 胜率 + {bnStats.win_rate}% +
+
+ 笔均盈亏 + {bnStats.avg_pnl_per_trade} +
+ {bnStats.total_quote_qty !== undefined && ( +
+ 总成交额(quoteQty) + {bnStats.total_quote_qty} +
+ )} + {bnStats.maker_count !== undefined && ( +
+ Maker / Taker + {bnStats.maker_count} / {bnStats.taker_count} +
+ )} + {bnStats.by_symbol && Object.keys(bnStats.by_symbol).length > 0 && ( +
+ 按交易对 +
+ {Object.entries(bnStats.by_symbol).map(([sym, v]) => ( + + {sym}: {typeof v === 'object' ? `${v.count}笔, 盈亏${v.pnl}, 成交额${v.quoteQty || v.commission || '-'}` : `${v}笔`} + + ))} +
+
+ )} + + )} + {!isTrades && (bnStats.by_status || bnStats.filled_count !== undefined) && ( + <> + {bnStats.filled_count !== undefined && ( +
+ 已成交笔数 + {bnStats.filled_count} +
+ )} +
+ 按状态 + {bnStats.by_status ? Object.entries(bnStats.by_status).map(([k, v]) => `${k}:${v}`).join(', ') : '-'} +
+ {bnStats.by_type && ( +
+ 按类型 + {Object.entries(bnStats.by_type).map(([k, v]) => `${k}:${v}`).join(', ')} +
+ )} + {bnStats.by_symbol && Object.keys(bnStats.by_symbol).length > 0 && ( +
+ 按交易对 +
+ {Object.entries(bnStats.by_symbol).slice(0, 20).map(([sym, v]) => ( + {sym}:{v}笔 + ))} + {Object.keys(bnStats.by_symbol).length > 20 && ...等{Object.keys(bnStats.by_symbol).length}个} +
+
+ )} + + )} +
+
+ )} +
+ - - - + {isTrades ? ( + <> + + + + + + + + + + ) : ( + <> + + + + + + + + + + )} - {((bnResult.data || bnResult.items) || []).slice(0, 100).map((r, i) => ( - + {bnData.slice(0, 100).map((r, i) => ( + + - - - + {isTrades ? ( + <> + + + + + + + + + + ) : ( + <> + + + + + + + + + + )} ))}
交易对id orderId sidepriceqtyrealizedPnlpositionSidepriceqtyquoteQtyrealizedPnlcommissionbuyermakertypestatuspriceorigQtyexecutedQtyavgPricecumQuotereduceOnlytime
{r._symbol || r.symbol}{r.id || '-'} {r.orderId} {r.side}{r.price}{r.qty}{r.realizedPnl}{r.positionSide || '-'}{r.price}{r.qty}{r.quoteQty}= 0 ? 'dm-profit' : 'dm-loss'}>{r.realizedPnl}{r.commission}{r.buyer ? '买' : '卖'}{r.maker ? '是' : '否'}{r.type || r.origType}{r.status}{r.price}{r.origQty}{r.executedQty}{r.avgPrice}{r.cumQuote || '-'}{r.reduceOnly ? '是' : '-'}{r.time ? new Date(r.time).toLocaleString() : r.updateTime ? new Date(r.updateTime).toLocaleString() : '-'}
- {((bnResult.data || bnResult.items)?.length || 0) > 100 && ( -
仅显示前 100 条,共 {bnResult.total} 条。导出可获取全部。
+ {bnData.length > 100 && ( +
仅显示前 100 条,共 {bnResult.total} 条。导出 JSON 含原始全部字段(symbol, id, orderId, side, price, qty, quoteQty, realizedPnl, commission 等)与统计。
)}
diff --git a/scripts/SYNC_BINANCE_README.md b/scripts/SYNC_BINANCE_README.md new file mode 100644 index 0000000..0a93612 --- /dev/null +++ b/scripts/SYNC_BINANCE_README.md @@ -0,0 +1,42 @@ +# 币安订单同步 + +## 1. 建表 + +首次使用需执行 SQL 建表: + +```bash +mysql -u user -p auto_trade_sys < backend/database/add_binance_sync_tables.sql +``` + +## 2. 定时任务脚本 + +`sync_binance_orders.py` 从币安拉取各账号最近 6 小时的订单/成交,去重写入 DB。 + +```bash +# 同步所有有效账号 +python scripts/sync_binance_orders.py + +# 指定账号 +python scripts/sync_binance_orders.py -a 2 + +# 拉取最近 12 小时 +python scripts/sync_binance_orders.py --hours 12 +``` + +## 3. Crontab 配置示例 + +每 3 小时执行一次(与 6 小时拉取窗口重叠,便于去重): + +```cron +0 */3 * * * cd /path/to/auto_trade_sys && /path/to/.venv/bin/python scripts/sync_binance_orders.py >> logs/sync_binance.log 2>&1 +``` + +或每 6 小时: + +```cron +0 */6 * * * cd /path/to/auto_trade_sys && /path/to/.venv/bin/python scripts/sync_binance_orders.py >> logs/sync_binance.log 2>&1 +``` + +## 4. 数据管理 + +管理后台「数据管理」-「币安订单/成交查询」从 DB 读取,不再调用币安 API。 diff --git a/scripts/sync_binance_orders.py b/scripts/sync_binance_orders.py new file mode 100644 index 0000000..5564e70 --- /dev/null +++ b/scripts/sync_binance_orders.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +定时任务:从币安拉取各账号最近 6 小时的订单/成交数据,去重写入 DB。 +供 crontab 定时执行,如: 0 */3 * * * cd /path/to/project && python scripts/sync_binance_orders.py + +用法: + python scripts/sync_binance_orders.py # 所有有效账号,最近 6 小时 + python scripts/sync_binance_orders.py -a 2 # 指定账号 + python scripts/sync_binance_orders.py -h 12 # 拉取最近 12 小时 +""" +import argparse +import asyncio +import os +import sys +from datetime import datetime, timezone, timedelta +from pathlib import Path + +proj = Path(__file__).resolve().parent.parent +if (proj / "backend").exists(): + sys.path.insert(0, str(proj / "backend")) +sys.path.insert(0, str(proj)) + +BEIJING_TZ = timezone(timedelta(hours=8)) + + +async def _get_active_symbols(client, start_ms: int, end_ms: int) -> list: + try: + symbols = set() + current_end = end_ms + for _ in range(10): + rows = await client.client.futures_income_history( + startTime=start_ms, + endTime=current_end, + limit=1000, + recvWindow=20000, + ) + if not rows: + break + for r in rows: + sym = (r.get("symbol") or "").strip() + if sym and sym.endswith("USDT"): + symbols.add(sym) + if len(rows) < 1000: + break + oldest = min(r.get("time", current_end) for r in rows) + current_end = oldest - 1 + if current_end < start_ms: + break + await asyncio.sleep(0.15) + return sorted(symbols) + except Exception: + return [] + + +async def sync_account(account_id: int, hours: int = 6) -> tuple: + """同步单个账号的 trades 和 orders,返回 (trades_ins, orders_ins, err)""" + from database.models import Account + from database.connection import db + from trading_system.binance_client import BinanceClient + + api_key, api_secret, use_testnet, _ = Account.get_credentials(account_id) + if not api_key or not api_secret: + return 0, 0, "未配置 API 密钥" + + now = datetime.now(BEIJING_TZ) + end_ms = int(now.timestamp() * 1000) + start_ms = end_ms - hours * 3600 * 1000 + + client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet) + try: + await client.connect() + except Exception as e: + return 0, 0, str(e) + + try: + sym_list = await _get_active_symbols(client, start_ms, end_ms) + if not sym_list: + sym_list = await client.get_all_usdt_pairs() + if not sym_list: + return 0, 0, "无法获取交易对列表" + + sem = asyncio.Semaphore(5) + + async def _fetch_trades(sym): + async with sem: + try: + rows = await client.client.futures_account_trades( + symbol=sym, + startTime=start_ms, + endTime=end_ms, + limit=1000, + recvWindow=20000, + ) + return rows or [] + except Exception: + return [] + finally: + await asyncio.sleep(0.12) + + async def _fetch_orders(sym): + async with sem: + try: + rows = await client.client.futures_get_all_orders( + symbol=sym, + startTime=start_ms, + endTime=end_ms, + limit=1000, + recvWindow=20000, + ) + return rows or [] + except Exception: + return [] + finally: + await asyncio.sleep(0.12) + + trades_chunks = await asyncio.gather(*[_fetch_trades(s) for s in sym_list]) + orders_chunks = await asyncio.gather(*[_fetch_orders(s) for s in sym_list]) + + all_trades = [] + for sym, rows in zip(sym_list, trades_chunks): + for r in rows: + r["_symbol"] = sym + all_trades.append(r) + + all_orders = [] + for sym, rows in zip(sym_list, orders_chunks): + for r in rows: + r["_symbol"] = sym + all_orders.append(r) + + # 写入 DB(INSERT IGNORE 去重) + trade_sql = """INSERT IGNORE INTO binance_trades + (account_id, symbol, trade_id, order_id, side, position_side, price, qty, quote_qty, + realized_pnl, commission, commission_asset, buyer, maker, trade_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""" + trade_params = [] + for r in all_trades: + sym = r.get("_symbol") or r.get("symbol") + trade_params.append(( + account_id, + sym, + int(r.get("id", 0) or 0), + int(r.get("orderId", 0) or 0), + r.get("side") or "", + r.get("positionSide") or None, + float(r.get("price", 0) or 0), + float(r.get("qty", 0) or 0), + float(r.get("quoteQty", 0) or 0) if r.get("quoteQty") else None, + float(r.get("realizedPnl", 0) or 0) if r.get("realizedPnl") is not None else None, + float(r.get("commission", 0) or 0) if r.get("commission") is not None else None, + r.get("commissionAsset") or None, + 1 if r.get("buyer") else 0, + 1 if r.get("maker") else 0, + int(r.get("time", 0) or 0), + )) + + order_sql = """INSERT IGNORE INTO binance_orders + (account_id, symbol, order_id, client_order_id, side, type, orig_type, status, + price, avg_price, orig_qty, executed_qty, cum_qty, cum_quote, stop_price, + reduce_only, position_side, order_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""" + order_params = [] + for r in all_orders: + sym = r.get("_symbol") or r.get("symbol") + order_params.append(( + account_id, + sym, + int(r.get("orderId", 0) or 0), + r.get("clientOrderId") or None, + r.get("side") or "", + r.get("type") or None, + r.get("origType") or None, + r.get("status") or "", + float(r.get("price", 0) or 0) if r.get("price") else None, + float(r.get("avgPrice", 0) or 0) if r.get("avgPrice") else None, + float(r.get("origQty", 0) or 0) if r.get("origQty") else None, + float(r.get("executedQty", 0) or 0) if r.get("executedQty") else None, + float(r.get("cumQty", 0) or 0) if r.get("cumQty") else None, + float(r.get("cumQuote", 0) or 0) if r.get("cumQuote") else None, + float(r.get("stopPrice", 0) or 0) if r.get("stopPrice") else None, + 1 if r.get("reduceOnly") else 0, + r.get("positionSide") or None, + int(r.get("time", 0) or 0), + int(r.get("updateTime", 0) or 0) if r.get("updateTime") else None, + )) + + trades_ins = 0 + orders_ins = 0 + if trade_params: + try: + db.execute_many(trade_sql, trade_params) + trades_ins = len(trade_params) # INSERT IGNORE 无法直接得到实际插入数,这里用传入数近似 + except Exception as e: + return 0, 0, f"写入 trades 失败: {e}" + if order_params: + try: + db.execute_many(order_sql, order_params) + orders_ins = len(order_params) + except Exception as e: + return len(trade_params), 0, f"写入 orders 失败: {e}" + + return len(trade_params), len(order_params), None + finally: + if client.client: + await client.client.close_connection() + + +def main(): + parser = argparse.ArgumentParser(description="同步币安订单/成交到 DB(供 crontab 定时执行)") + parser.add_argument("-a", "--account", type=int, default=None, help="指定账号 ID,不传则同步所有有效账号") + parser.add_argument("--hours", type=int, default=6, help="拉取最近 N 小时,默认 6") + args = parser.parse_args() + hours = args.hours + + try: + from database.models import Account + except ImportError as e: + print(f"导入失败: {e}") + sys.exit(1) + + rows = Account.list_all() + accounts = [r for r in (rows or []) if (r.get("status") or "active").lower() == "active" and r.get("id")] + if args.account: + accounts = [a for a in accounts if a["id"] == args.account] + if not accounts: + print("无有效账号") + sys.exit(0) + + print(f"同步 {len(accounts)} 个账号,最近 {hours} 小时,开始时间 {datetime.now(BEIJING_TZ).isoformat()}") + + async def run_all(): + for acc in accounts: + aid = acc["id"] + name = acc.get("name") or f"账号{aid}" + tr, ord_cnt, err = await sync_account(aid, hours) + if err: + print(f" {name} (id={aid}): 失败 {err}") + else: + print(f" {name} (id={aid}): trades {tr} 条, orders {ord_cnt} 条") + + asyncio.run(run_all()) + print("同步完成") + + +if __name__ == "__main__": + main()