auto_trade_sys/trading_system/binance_order_event_logger.py
薇薇安 fa7208f5f3 feat(binance_order_event_logger, user_data_stream): 优化算法更新字段兼容性
在 `binance_order_event_logger.py` 和 `user_data_stream.py` 中更新了 ALGO_UPDATE 事件的字段处理逻辑,增强了对不同字段名的兼容性,确保在获取算法 ID 和客户端算法 ID 时能够正确解析多种可能的字段。这一改进提升了系统在处理算法订单时的灵活性与准确性。
2026-02-21 22:45:40 +08:00

87 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
币安订单推送事件日志:将 ORDER_TRADE_UPDATE / ALGO_UPDATE 等写入日志文件,便于与 DB 对账。
每行一条 JSON含 event_type、account_id、时间、关键字段方便 grep / 脚本分析。
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_LOG_DIR: Optional[Path] = None
_LOG_PATH: Optional[Path] = None
def _get_log_path() -> Optional[Path]:
global _LOG_DIR, _LOG_PATH
if _LOG_PATH is not None:
return _LOG_PATH
try:
root = Path(__file__).resolve().parent.parent
_LOG_DIR = root / "logs"
_LOG_DIR.mkdir(parents=True, exist_ok=True)
_LOG_PATH = _LOG_DIR / "binance_order_events.log"
return _LOG_PATH
except Exception as e:
logger.debug(f"binance_order_event_logger 无法创建日志路径: {e}")
return None
def _default_serializer(obj):
if hasattr(obj, "isoformat"):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def log_order_event(account_id: int, event_type: str, event_time_ms: Optional[int], data: Dict[str, Any]) -> None:
"""
将币安订单推送事件写入日志文件(每行一条 JSON
Args:
account_id: 账号 ID
event_type: ORDER_TRADE_UPDATE / ALGO_UPDATE 等
event_time_ms: 事件时间(毫秒)
data: 原始 payload如 o 或 a 部分),会提取关键字段
"""
path = _get_log_path()
if not path:
return
try:
# 提取关键字段便于对账
row = {
"ts": int(time.time() * 1000),
"event_type": event_type,
"account_id": account_id,
"E": event_time_ms,
}
if event_type == "ORDER_TRADE_UPDATE" and data:
o = data
row["symbol"] = (o.get("s") or "").strip()
row["orderId"] = o.get("i")
row["clientOrderId"] = (o.get("c") or "").strip() or None
row["event"] = (o.get("x") or "").strip() # NEW/TRADE/CANCELED
row["status"] = (o.get("X") or "").strip() # NEW/FILLED/CANCELED
row["reduceOnly"] = o.get("R") is True
if row["status"] == "FILLED":
row["avgPrice"] = o.get("ap")
row["executedQty"] = o.get("z")
row["realizedPnl"] = o.get("rp")
elif event_type == "ALGO_UPDATE" and data:
o = data
row["symbol"] = (o.get("s") or "").strip()
# 币安文档aid=Algo Id兼容 i/algoIdcaid=Client Algo Id兼容 c/clientAlgoId
row["algoId"] = o.get("aid") or o.get("i") or o.get("algoId")
row["clientAlgoId"] = (o.get("caid") or o.get("c") or o.get("clientAlgoId") or "").strip() or None
row["algoStatus"] = (o.get("X") or "").strip() # TRIGGERED/FINISHED
row["triggeredOrderId"] = o.get("ai") or o.get("triggeredOrderId") or None
else:
row["raw"] = data
line = json.dumps(row, ensure_ascii=False, default=_default_serializer) + "\n"
with open(path, "a", encoding="utf-8") as f:
f.write(line)
except Exception as e:
logger.debug(f"binance_order_event_logger 写入失败: {e}")