auto_trade_sys/scripts/query_trades_today.py
薇薇安 e40f5c797f 1
2026-02-21 17:14:03 +08:00

156 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
查询今日落入 DB 的交易记录(按创建时间 created_at便于与币安对账时对照
用法:
python scripts/query_trades_today.py # 今日,默认账号
python scripts/query_trades_today.py --account 2 # 账号 2
python scripts/query_trades_today.py --date 2026-02-21 # 指定日期
python scripts/query_trades_today.py --time-filter entry # 按入场时间
python scripts/query_trades_today.py -o today_trades.json # 导出 JSON
"""
import argparse
import json
import os
import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta
# 添加 backend 到路径
backend = Path(__file__).resolve().parent.parent / "backend"
if backend.exists():
sys.path.insert(0, str(backend))
BEIJING_TZ = timezone(timedelta(hours=8))
def _debug_db(account_id: int, start_ts: int, end_ts: int, time_filter: str, date_label: str):
"""无结果时输出诊断DB 概况、查询时间范围、是否有 trades 表 created_at 等"""
try:
from database.connection import db
# 数据库名
db_name = getattr(db, "database", "?")
print(f"\n[调试] 数据库: {db_name} (来自 DB_NAME 环境变量)")
print(f"[调试] 查询时间范围: {start_ts} ~ {end_ts} ({date_label} 00:00 ~ 23:59 北京)")
# 该账号总记录数(不限日期)
try:
r = db.execute_one(
"SELECT COUNT(*) as c FROM trades WHERE account_id = %s",
(account_id,),
)
total = r.get("c", 0) if r else 0
print(f"[调试] 账号 {account_id} trades 总记录数: {total}")
except Exception as e:
print(f"[调试] 查询总记录数失败: {e}")
# created_at / entry_time 范围
try:
r = db.execute_one(
"""SELECT MIN(created_at) as min_c, MAX(created_at) as max_c,
MIN(entry_time) as min_e, MAX(entry_time) as max_e
FROM trades WHERE account_id = %s""",
(account_id,),
)
if r:
for k, v in r.items():
if v is not None:
dt = datetime.fromtimestamp(int(v), tz=BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")
print(f"[调试] {k}: {v} -> {dt}")
except Exception as e:
print(f"[调试] 查询时间范围失败(可能无 created_at): {e}")
# 按 entry 查当天是否有记录(辅助判断)
if time_filter == "created":
try:
r2 = db.execute_one(
"SELECT COUNT(*) as c FROM trades WHERE account_id = %s AND entry_time >= %s AND entry_time <= %s",
(account_id, start_ts, end_ts),
)
c2 = r2.get("c", 0) if r2 else 0
print(f"[调试] 同一天按 entry_time 筛选: {c2}")
except Exception:
pass
except ImportError as e:
print(f"[调试] 无法导入 database: {e}")
def get_today_range(date_str: str = None):
"""返回当日 00:00 和 23:59:59 的 Unix 时间戳"""
if date_str:
try:
dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=BEIJING_TZ)
except ValueError:
dt = datetime.now(BEIJING_TZ)
else:
dt = datetime.now(BEIJING_TZ)
start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
end = dt.replace(hour=23, minute=59, second=59, microsecond=999999)
return int(start.timestamp()), int(end.timestamp())
def main():
parser = argparse.ArgumentParser(description="查询今日落入 DB 的交易记录")
parser.add_argument("--account", "-a", type=int, default=None, help="账号 ID默认从 ATS_ACCOUNT_ID 或 1")
parser.add_argument("--date", "-d", type=str, default=None, help="日期 YYYY-MM-DD默认今天")
parser.add_argument("--time-filter", "-t", choices=["created", "entry", "exit"], default="created",
help="时间筛选created=创建时间(落库), entry=入场时间, exit=平仓时间")
parser.add_argument("--reconciled-only", action="store_true", help="仅可对账记录")
parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件")
parser.add_argument("--debug", action="store_true", help="无结果时输出诊断信息")
args = parser.parse_args()
account_id = args.account
if account_id is None:
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
start_ts, end_ts = get_today_range(args.date)
date_label = args.date or datetime.now(BEIJING_TZ).strftime("%Y-%m-%d")
try:
from database.models import Trade
except ImportError as e:
print(f"无法导入 Trade 模型: {e}")
print("请确保在项目根目录运行,且 backend 可访问")
sys.exit(1)
trades = Trade.get_all(
start_timestamp=start_ts,
end_timestamp=end_ts,
symbol=None,
status=None,
account_id=account_id,
time_filter=args.time_filter,
limit=2000,
reconciled_only=args.reconciled_only,
include_sync=True,
)
# 序列化 datetime
def _serialize(obj):
if hasattr(obj, "isoformat"):
return obj.isoformat()
return obj
out = []
for t in trades:
row = dict(t)
for k, v in row.items():
if hasattr(v, "isoformat"):
row[k] = v.isoformat()
out.append(row)
print(f"账号 {account_id} | 日期 {date_label} | 按{args.time_filter} | 共 {len(out)}")
if len(out) == 0:
if args.debug:
_debug_db(account_id, start_ts, end_ts, args.time_filter, date_label)
else:
print("提示: 加 --debug 可查看数据库概况与时间范围,便于排查")
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(out, f, ensure_ascii=False, indent=2)
print(f"已导出到 {args.output}")
else:
print(json.dumps(out, ensure_ascii=False, indent=2, default=str))
if __name__ == "__main__":
main()