From fc6c31dd5df643f903af50d178ef95a66db205da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Sat, 21 Feb 2026 17:09:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(user=5Fdata=5Fstream):=20=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E8=AE=A2=E5=8D=95=E5=92=8C=E7=AE=97=E6=B3=95=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E4=BA=8B=E4=BB=B6=E7=9A=84=E6=97=A5=E5=BF=97=E8=AE=B0?= =?UTF-8?q?=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 `user_data_stream.py` 中为 `ORDER_TRADE_UPDATE` 和 `ALGO_UPDATE` 事件添加了日志记录功能,确保在接收到相关推送时能够记录事件信息。这一改进提升了系统的可追踪性和调试能力。 --- docs/DB与币安订单对账说明.md | 103 +++++++++++++++++++ scripts/query_trades_today.py | 101 ++++++++++++++++++ trading_system/binance_order_event_logger.py | 84 +++++++++++++++ trading_system/user_data_stream.py | 10 ++ 4 files changed, 298 insertions(+) create mode 100644 docs/DB与币安订单对账说明.md create mode 100644 scripts/query_trades_today.py create mode 100644 trading_system/binance_order_event_logger.py diff --git a/docs/DB与币安订单对账说明.md b/docs/DB与币安订单对账说明.md new file mode 100644 index 0000000..54de34b --- /dev/null +++ b/docs/DB与币安订单对账说明.md @@ -0,0 +1,103 @@ +# DB 与币安订单对账说明 + +## 一、查询系统今日落入 DB 的单子 + +### 1. 命令行脚本(推荐) + +```bash +# 今日、默认账号、按创建时间(落库时间) +python scripts/query_trades_today.py + +# 指定账号 +python scripts/query_trades_today.py --account 2 + +# 指定日期 +python scripts/query_trades_today.py --date 2026-02-21 + +# 按入场时间筛选 +python scripts/query_trades_today.py --time-filter entry + +# 仅可对账记录(有开仓/平仓订单号) +python scripts/query_trades_today.py --reconciled-only + +# 导出到 JSON 文件 +python scripts/query_trades_today.py -o today_trades.json +``` + +### 2. API 接口 + +``` +GET /api/trades?period=today&time_filter=created&reconciled_only=false +``` + +- `period=today`:今天 +- `time_filter=created`:按创建时间(落库时间),便于对照「何时写入 DB」 +- `time_filter=entry`:按入场时间 +- `time_filter=exit`:按平仓时间 +- `reconciled_only=false`:包含所有记录(含取消、无订单号) + +### 3. 前端导出 + +交易记录页面 → 选择「今天」→ 导出 JSON / Excel。 + +--- + +## 二、币安订单推送日志 + +系统会将收到的 **ORDER_TRADE_UPDATE**、**ALGO_UPDATE** 写入日志,便于与 DB 对照。 + +### 日志路径 + +``` +{项目根}/logs/binance_order_events.log +``` + +### 格式 + +每行一条 JSON,例如: + +```json +{"ts":1737500000000,"event_type":"ORDER_TRADE_UPDATE","account_id":1,"E":1737500000123,"symbol":"BTCUSDT","orderId":123456,"clientOrderId":"SYS_1737500000_abcd","event":"TRADE","status":"FILLED","reduceOnly":false,"avgPrice":"95000","executedQty":"0.01","realizedPnl":"0"} +``` + +### 字段说明 + +| 字段 | 说明 | +|------|------| +| ts | 本机接收时间戳 | +| event_type | ORDER_TRADE_UPDATE / ALGO_UPDATE | +| account_id | 账号 ID | +| E | 币安事件时间(毫秒) | +| symbol | 交易对 | +| orderId | 币安订单号 | +| clientOrderId | 自定义订单号(系统前缀) | +| event | NEW/TRADE/CANCELED | +| status | NEW/FILLED/CANCELED 等 | +| reduceOnly | 是否只减仓(平仓单) | +| avgPrice/executedQty | 成交价/成交量(FILLED 时) | +| realizedPnl | 实现盈亏(平仓时) | +| algoId/triggeredOrderId | ALGO_UPDATE 专用 | + +### 对账用法 + +```bash +# 查看今天收到的所有订单推送 +grep "ORDER_TRADE_UPDATE" logs/binance_order_events.log + +# 查看 FILLED 成交 +grep '"status":"FILLED"' logs/binance_order_events.log + +# 按 clientOrderId 对照 +grep "SYS_1737500000_abcd" logs/binance_order_events.log +``` + +--- + +## 三、对账流程建议 + +1. **查 DB 今日记录**:`python scripts/query_trades_today.py -o db_today.json` +2. **查币安推送日志**:`tail -f logs/binance_order_events.log` 或 `grep "ORDER_TRADE_UPDATE" logs/binance_order_events.log` +3. **对照**:用 `clientOrderId` 或 `orderId` 关联 DB 记录与推送日志,确认: + - DB 有 pending 且收到 FILLED 推送 → 应更新为 open + - DB 有 open 且收到 reduceOnly FILLED → 应更新 exit_order_id + - 收到推送但 DB 无对应记录 → 可能漏建或为手动单 diff --git a/scripts/query_trades_today.py b/scripts/query_trades_today.py new file mode 100644 index 0000000..9ae0eb7 --- /dev/null +++ b/scripts/query_trades_today.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +查询今日落入 DB 的交易记录(按创建时间 created_at,便于与币安对账时对照)。 + +用法: + python scripts/query_trades_today.py # 今日,默认账号 + python scripts/query_trades_today.py --account 2 # 账号 2 + python scripts/query_trades_today.py --date 2026-02-21 # 指定日期 + python scripts/query_trades_today.py --time-filter entry # 按入场时间 + python scripts/query_trades_today.py -o today_trades.json # 导出 JSON +""" +import argparse +import json +import os +import sys +from pathlib import Path +from datetime import datetime, timezone, timedelta + +# 添加 backend 到路径 +backend = Path(__file__).resolve().parent.parent / "backend" +if backend.exists(): + sys.path.insert(0, str(backend)) + +BEIJING_TZ = timezone(timedelta(hours=8)) + + +def get_today_range(date_str: str = None): + """返回当日 00:00 和 23:59:59 的 Unix 时间戳""" + if date_str: + try: + dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=BEIJING_TZ) + except ValueError: + dt = datetime.now(BEIJING_TZ) + else: + dt = datetime.now(BEIJING_TZ) + start = dt.replace(hour=0, minute=0, second=0, microsecond=0) + end = dt.replace(hour=23, minute=59, second=59, microsecond=999999) + return int(start.timestamp()), int(end.timestamp()) + + +def main(): + parser = argparse.ArgumentParser(description="查询今日落入 DB 的交易记录") + parser.add_argument("--account", "-a", type=int, default=None, help="账号 ID,默认从 ATS_ACCOUNT_ID 或 1") + parser.add_argument("--date", "-d", type=str, default=None, help="日期 YYYY-MM-DD,默认今天") + parser.add_argument("--time-filter", "-t", choices=["created", "entry", "exit"], default="created", + help="时间筛选:created=创建时间(落库), entry=入场时间, exit=平仓时间") + parser.add_argument("--reconciled-only", action="store_true", help="仅可对账记录") + parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件") + args = parser.parse_args() + + account_id = args.account + if account_id is None: + account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") + + start_ts, end_ts = get_today_range(args.date) + date_label = args.date or datetime.now(BEIJING_TZ).strftime("%Y-%m-%d") + + try: + from database.models import Trade + except ImportError as e: + print(f"无法导入 Trade 模型: {e}") + print("请确保在项目根目录运行,且 backend 可访问") + sys.exit(1) + + trades = Trade.get_all( + start_timestamp=start_ts, + end_timestamp=end_ts, + symbol=None, + status=None, + account_id=account_id, + time_filter=args.time_filter, + limit=2000, + reconciled_only=args.reconciled_only, + include_sync=True, + ) + + # 序列化 datetime + def _serialize(obj): + if hasattr(obj, "isoformat"): + return obj.isoformat() + return obj + + out = [] + for t in trades: + row = dict(t) + for k, v in row.items(): + if hasattr(v, "isoformat"): + row[k] = v.isoformat() + out.append(row) + + print(f"账号 {account_id} | 日期 {date_label} | 按{args.time_filter} | 共 {len(out)} 条") + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + json.dump(out, f, ensure_ascii=False, indent=2) + print(f"已导出到 {args.output}") + else: + print(json.dumps(out, ensure_ascii=False, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/trading_system/binance_order_event_logger.py b/trading_system/binance_order_event_logger.py new file mode 100644 index 0000000..56bef3c --- /dev/null +++ b/trading_system/binance_order_event_logger.py @@ -0,0 +1,84 @@ +""" +币安订单推送事件日志:将 ORDER_TRADE_UPDATE / ALGO_UPDATE 等写入日志文件,便于与 DB 对账。 +每行一条 JSON,含 event_type、account_id、时间、关键字段,方便 grep / 脚本分析。 +""" +import json +import logging +import os +import time +from pathlib import Path +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + +_LOG_DIR: Optional[Path] = None +_LOG_PATH: Optional[Path] = None + + +def _get_log_path() -> Optional[Path]: + global _LOG_DIR, _LOG_PATH + if _LOG_PATH is not None: + return _LOG_PATH + try: + root = Path(__file__).resolve().parent.parent + _LOG_DIR = root / "logs" + _LOG_DIR.mkdir(parents=True, exist_ok=True) + _LOG_PATH = _LOG_DIR / "binance_order_events.log" + return _LOG_PATH + except Exception as e: + logger.debug(f"binance_order_event_logger 无法创建日志路径: {e}") + return None + + +def _default_serializer(obj): + if hasattr(obj, "isoformat"): + return obj.isoformat() + raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") + + +def log_order_event(account_id: int, event_type: str, event_time_ms: Optional[int], data: Dict[str, Any]) -> None: + """ + 将币安订单推送事件写入日志文件(每行一条 JSON)。 + + Args: + account_id: 账号 ID + event_type: ORDER_TRADE_UPDATE / ALGO_UPDATE 等 + event_time_ms: 事件时间(毫秒) + data: 原始 payload(如 o 或 a 部分),会提取关键字段 + """ + path = _get_log_path() + if not path: + return + try: + # 提取关键字段便于对账 + row = { + "ts": int(time.time() * 1000), + "event_type": event_type, + "account_id": account_id, + "E": event_time_ms, + } + if event_type == "ORDER_TRADE_UPDATE" and data: + o = data + row["symbol"] = (o.get("s") or "").strip() + row["orderId"] = o.get("i") + row["clientOrderId"] = (o.get("c") or "").strip() or None + row["event"] = (o.get("x") or "").strip() # NEW/TRADE/CANCELED + row["status"] = (o.get("X") or "").strip() # NEW/FILLED/CANCELED + row["reduceOnly"] = o.get("R") is True + if row["status"] == "FILLED": + row["avgPrice"] = o.get("ap") + row["executedQty"] = o.get("z") + row["realizedPnl"] = o.get("rp") + elif event_type == "ALGO_UPDATE" and data: + o = data + row["symbol"] = (o.get("s") or "").strip() + row["algoId"] = o.get("i") + row["algoStatus"] = (o.get("X") or "").strip() # TRIGGERED/FINISHED + row["triggeredOrderId"] = o.get("ai") # 触发后的普通订单 id + else: + row["raw"] = data + line = json.dumps(row, ensure_ascii=False, default=_default_serializer) + "\n" + with open(path, "a", encoding="utf-8") as f: + f.write(line) + except Exception as e: + logger.debug(f"binance_order_event_logger 写入失败: {e}") diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index c593933..a5fd4c3 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -463,12 +463,22 @@ class UserDataStream: return True if e == "ORDER_TRADE_UPDATE": logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送") + try: + from .binance_order_event_logger import log_order_event + log_order_event(self.account_id, "ORDER_TRADE_UPDATE", data.get("E"), data.get("o") or {}) + except Exception: + pass self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E")) elif e == "ACCOUNT_UPDATE": logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送") self._on_account_update(data.get("a") or {}) elif e == "ALGO_UPDATE": logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送") + try: + from .binance_order_event_logger import log_order_event + log_order_event(self.account_id, "ALGO_UPDATE", data.get("E"), data.get("o") or {}) + except Exception: + pass self._on_algo_update(data.get("o") or {}) else: logger.debug(f"UserDataStream: 收到未知事件类型: {e}")