feat(user_data_stream): 增强订单和算法更新事件的日志记录

在 `user_data_stream.py` 中为 `ORDER_TRADE_UPDATE` 和 `ALGO_UPDATE` 事件添加了日志记录功能,确保在接收到相关推送时能够记录事件信息。这一改进提升了系统的可追踪性和调试能力。
This commit is contained in:
薇薇安 2026-02-21 17:09:41 +08:00
parent e1759a7f4c
commit fc6c31dd5d
4 changed files with 298 additions and 0 deletions

View File

@ -0,0 +1,103 @@
# DB 与币安订单对账说明
## 一、查询系统今日落入 DB 的单子
### 1. 命令行脚本(推荐)
```bash
# 今日、默认账号、按创建时间(落库时间)
python scripts/query_trades_today.py
# 指定账号
python scripts/query_trades_today.py --account 2
# 指定日期
python scripts/query_trades_today.py --date 2026-02-21
# 按入场时间筛选
python scripts/query_trades_today.py --time-filter entry
# 仅可对账记录(有开仓/平仓订单号)
python scripts/query_trades_today.py --reconciled-only
# 导出到 JSON 文件
python scripts/query_trades_today.py -o today_trades.json
```
### 2. API 接口
```
GET /api/trades?period=today&time_filter=created&reconciled_only=false
```
- `period=today`:今天
- `time_filter=created`:按创建时间(落库时间),便于对照「何时写入 DB」
- `time_filter=entry`:按入场时间
- `time_filter=exit`:按平仓时间
- `reconciled_only=false`:包含所有记录(含取消、无订单号)
### 3. 前端导出
交易记录页面 → 选择「今天」→ 导出 JSON / Excel。
---
## 二、币安订单推送日志
系统会将收到的 **ORDER_TRADE_UPDATE**、**ALGO_UPDATE** 写入日志,便于与 DB 对照。
### 日志路径
```
{项目根}/logs/binance_order_events.log
```
### 格式
每行一条 JSON例如
```json
{"ts":1737500000000,"event_type":"ORDER_TRADE_UPDATE","account_id":1,"E":1737500000123,"symbol":"BTCUSDT","orderId":123456,"clientOrderId":"SYS_1737500000_abcd","event":"TRADE","status":"FILLED","reduceOnly":false,"avgPrice":"95000","executedQty":"0.01","realizedPnl":"0"}
```
### 字段说明
| 字段 | 说明 |
|------|------|
| ts | 本机接收时间戳 |
| event_type | ORDER_TRADE_UPDATE / ALGO_UPDATE |
| account_id | 账号 ID |
| E | 币安事件时间(毫秒) |
| symbol | 交易对 |
| orderId | 币安订单号 |
| clientOrderId | 自定义订单号(系统前缀) |
| event | NEW/TRADE/CANCELED |
| status | NEW/FILLED/CANCELED 等 |
| reduceOnly | 是否只减仓(平仓单) |
| avgPrice/executedQty | 成交价/成交量FILLED 时) |
| realizedPnl | 实现盈亏(平仓时) |
| algoId/triggeredOrderId | ALGO_UPDATE 专用 |
### 对账用法
```bash
# 查看今天收到的所有订单推送
grep "ORDER_TRADE_UPDATE" logs/binance_order_events.log
# 查看 FILLED 成交
grep '"status":"FILLED"' logs/binance_order_events.log
# 按 clientOrderId 对照
grep "SYS_1737500000_abcd" logs/binance_order_events.log
```
---
## 三、对账流程建议
1. **查 DB 今日记录**`python scripts/query_trades_today.py -o db_today.json`
2. **查币安推送日志**`tail -f logs/binance_order_events.log` 或 `grep "ORDER_TRADE_UPDATE" logs/binance_order_events.log`
3. **对照**:用 `clientOrderId``orderId` 关联 DB 记录与推送日志,确认:
- DB 有 pending 且收到 FILLED 推送 → 应更新为 open
- DB 有 open 且收到 reduceOnly FILLED → 应更新 exit_order_id
- 收到推送但 DB 无对应记录 → 可能漏建或为手动单

View File

@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
查询今日落入 DB 的交易记录按创建时间 created_at便于与币安对账时对照
用法:
python scripts/query_trades_today.py # 今日,默认账号
python scripts/query_trades_today.py --account 2 # 账号 2
python scripts/query_trades_today.py --date 2026-02-21 # 指定日期
python scripts/query_trades_today.py --time-filter entry # 按入场时间
python scripts/query_trades_today.py -o today_trades.json # 导出 JSON
"""
import argparse
import json
import os
import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta
# 添加 backend 到路径
backend = Path(__file__).resolve().parent.parent / "backend"
if backend.exists():
sys.path.insert(0, str(backend))
BEIJING_TZ = timezone(timedelta(hours=8))
def get_today_range(date_str: str = None):
"""返回当日 00:00 和 23:59:59 的 Unix 时间戳"""
if date_str:
try:
dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=BEIJING_TZ)
except ValueError:
dt = datetime.now(BEIJING_TZ)
else:
dt = datetime.now(BEIJING_TZ)
start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
end = dt.replace(hour=23, minute=59, second=59, microsecond=999999)
return int(start.timestamp()), int(end.timestamp())
def main():
parser = argparse.ArgumentParser(description="查询今日落入 DB 的交易记录")
parser.add_argument("--account", "-a", type=int, default=None, help="账号 ID默认从 ATS_ACCOUNT_ID 或 1")
parser.add_argument("--date", "-d", type=str, default=None, help="日期 YYYY-MM-DD默认今天")
parser.add_argument("--time-filter", "-t", choices=["created", "entry", "exit"], default="created",
help="时间筛选created=创建时间(落库), entry=入场时间, exit=平仓时间")
parser.add_argument("--reconciled-only", action="store_true", help="仅可对账记录")
parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件")
args = parser.parse_args()
account_id = args.account
if account_id is None:
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
start_ts, end_ts = get_today_range(args.date)
date_label = args.date or datetime.now(BEIJING_TZ).strftime("%Y-%m-%d")
try:
from database.models import Trade
except ImportError as e:
print(f"无法导入 Trade 模型: {e}")
print("请确保在项目根目录运行,且 backend 可访问")
sys.exit(1)
trades = Trade.get_all(
start_timestamp=start_ts,
end_timestamp=end_ts,
symbol=None,
status=None,
account_id=account_id,
time_filter=args.time_filter,
limit=2000,
reconciled_only=args.reconciled_only,
include_sync=True,
)
# 序列化 datetime
def _serialize(obj):
if hasattr(obj, "isoformat"):
return obj.isoformat()
return obj
out = []
for t in trades:
row = dict(t)
for k, v in row.items():
if hasattr(v, "isoformat"):
row[k] = v.isoformat()
out.append(row)
print(f"账号 {account_id} | 日期 {date_label} | 按{args.time_filter} | 共 {len(out)}")
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(out, f, ensure_ascii=False, indent=2)
print(f"已导出到 {args.output}")
else:
print(json.dumps(out, ensure_ascii=False, indent=2, default=str))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,84 @@
"""
币安订单推送事件日志 ORDER_TRADE_UPDATE / ALGO_UPDATE 等写入日志文件便于与 DB 对账
每行一条 JSON event_typeaccount_id时间关键字段方便 grep / 脚本分析
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_LOG_DIR: Optional[Path] = None
_LOG_PATH: Optional[Path] = None
def _get_log_path() -> Optional[Path]:
global _LOG_DIR, _LOG_PATH
if _LOG_PATH is not None:
return _LOG_PATH
try:
root = Path(__file__).resolve().parent.parent
_LOG_DIR = root / "logs"
_LOG_DIR.mkdir(parents=True, exist_ok=True)
_LOG_PATH = _LOG_DIR / "binance_order_events.log"
return _LOG_PATH
except Exception as e:
logger.debug(f"binance_order_event_logger 无法创建日志路径: {e}")
return None
def _default_serializer(obj):
if hasattr(obj, "isoformat"):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def log_order_event(account_id: int, event_type: str, event_time_ms: Optional[int], data: Dict[str, Any]) -> None:
"""
将币安订单推送事件写入日志文件每行一条 JSON
Args:
account_id: 账号 ID
event_type: ORDER_TRADE_UPDATE / ALGO_UPDATE
event_time_ms: 事件时间毫秒
data: 原始 payload o a 部分会提取关键字段
"""
path = _get_log_path()
if not path:
return
try:
# 提取关键字段便于对账
row = {
"ts": int(time.time() * 1000),
"event_type": event_type,
"account_id": account_id,
"E": event_time_ms,
}
if event_type == "ORDER_TRADE_UPDATE" and data:
o = data
row["symbol"] = (o.get("s") or "").strip()
row["orderId"] = o.get("i")
row["clientOrderId"] = (o.get("c") or "").strip() or None
row["event"] = (o.get("x") or "").strip() # NEW/TRADE/CANCELED
row["status"] = (o.get("X") or "").strip() # NEW/FILLED/CANCELED
row["reduceOnly"] = o.get("R") is True
if row["status"] == "FILLED":
row["avgPrice"] = o.get("ap")
row["executedQty"] = o.get("z")
row["realizedPnl"] = o.get("rp")
elif event_type == "ALGO_UPDATE" and data:
o = data
row["symbol"] = (o.get("s") or "").strip()
row["algoId"] = o.get("i")
row["algoStatus"] = (o.get("X") or "").strip() # TRIGGERED/FINISHED
row["triggeredOrderId"] = o.get("ai") # 触发后的普通订单 id
else:
row["raw"] = data
line = json.dumps(row, ensure_ascii=False, default=_default_serializer) + "\n"
with open(path, "a", encoding="utf-8") as f:
f.write(line)
except Exception as e:
logger.debug(f"binance_order_event_logger 写入失败: {e}")

View File

@ -463,12 +463,22 @@ class UserDataStream:
return True
if e == "ORDER_TRADE_UPDATE":
logger.debug(f"UserDataStream: 收到 ORDER_TRADE_UPDATE 推送")
try:
from .binance_order_event_logger import log_order_event
log_order_event(self.account_id, "ORDER_TRADE_UPDATE", data.get("E"), data.get("o") or {})
except Exception:
pass
self._on_order_trade_update(data.get("o") or {}, event_time_ms=data.get("E"))
elif e == "ACCOUNT_UPDATE":
logger.debug(f"UserDataStream: 收到 ACCOUNT_UPDATE 推送")
self._on_account_update(data.get("a") or {})
elif e == "ALGO_UPDATE":
logger.debug(f"UserDataStream: 收到 ALGO_UPDATE 推送")
try:
from .binance_order_event_logger import log_order_event
log_order_event(self.account_id, "ALGO_UPDATE", data.get("E"), data.get("o") or {})
except Exception:
pass
self._on_algo_update(data.get("o") or {})
else:
logger.debug(f"UserDataStream: 收到未知事件类型: {e}")