#!/usr/bin/env python3 """ 查询今日落入 DB 的交易记录(按创建时间 created_at,便于与币安对账时对照)。 用法: python scripts/query_trades_today.py # 今日,默认账号 python scripts/query_trades_today.py --account 2 # 账号 2 python scripts/query_trades_today.py --date 2026-02-21 # 指定日期 python scripts/query_trades_today.py --time-filter entry # 按入场时间 python scripts/query_trades_today.py -o today_trades.json # 导出 JSON """ import argparse import json import os import sys from pathlib import Path from datetime import datetime, timezone, timedelta # 添加 backend 到路径 backend = Path(__file__).resolve().parent.parent / "backend" if backend.exists(): sys.path.insert(0, str(backend)) BEIJING_TZ = timezone(timedelta(hours=8)) def _debug_db(account_id: int, start_ts: int, end_ts: int, time_filter: str, date_label: str): """无结果时输出诊断:DB 概况、查询时间范围、是否有 trades 表 created_at 等""" try: from database.connection import db # 数据库名 db_name = getattr(db, "database", "?") print(f"\n[调试] 数据库: {db_name} (来自 DB_NAME 环境变量)") print(f"[调试] 查询时间范围: {start_ts} ~ {end_ts} ({date_label} 00:00 ~ 23:59 北京)") # 该账号总记录数(不限日期) try: r = db.execute_one( "SELECT COUNT(*) as c FROM trades WHERE account_id = %s", (account_id,), ) total = r.get("c", 0) if r else 0 print(f"[调试] 账号 {account_id} trades 总记录数: {total}") except Exception as e: print(f"[调试] 查询总记录数失败: {e}") # created_at / entry_time 范围 try: r = db.execute_one( """SELECT MIN(created_at) as min_c, MAX(created_at) as max_c, MIN(entry_time) as min_e, MAX(entry_time) as max_e FROM trades WHERE account_id = %s""", (account_id,), ) if r: for k, v in r.items(): if v is not None: dt = datetime.fromtimestamp(int(v), tz=BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S") print(f"[调试] {k}: {v} -> {dt}") except Exception as e: print(f"[调试] 查询时间范围失败(可能无 created_at): {e}") # 按 entry 查当天是否有记录(辅助判断) if time_filter == "created": try: r2 = db.execute_one( "SELECT COUNT(*) as c FROM trades WHERE account_id = %s AND entry_time >= %s AND entry_time <= %s", (account_id, start_ts, end_ts), ) c2 = r2.get("c", 0) if r2 else 0 print(f"[调试] 同一天按 entry_time 筛选: {c2} 条") except Exception: pass except ImportError as e: print(f"[调试] 无法导入 database: {e}") def get_today_range(date_str: str = None): """返回当日 00:00 和 23:59:59 的 Unix 时间戳""" if date_str: try: dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=BEIJING_TZ) except ValueError: dt = datetime.now(BEIJING_TZ) else: dt = datetime.now(BEIJING_TZ) start = dt.replace(hour=0, minute=0, second=0, microsecond=0) end = dt.replace(hour=23, minute=59, second=59, microsecond=999999) return int(start.timestamp()), int(end.timestamp()) def main(): parser = argparse.ArgumentParser(description="查询今日落入 DB 的交易记录") parser.add_argument("--account", "-a", type=int, default=None, help="账号 ID,默认从 ATS_ACCOUNT_ID 或 1") parser.add_argument("--date", "-d", type=str, default=None, help="日期 YYYY-MM-DD,默认今天") parser.add_argument("--time-filter", "-t", choices=["created", "entry", "exit"], default="created", help="时间筛选:created=创建时间(落库), entry=入场时间, exit=平仓时间") parser.add_argument("--reconciled-only", action="store_true", help="仅可对账记录") parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件") parser.add_argument("--debug", action="store_true", help="无结果时输出诊断信息") args = parser.parse_args() account_id = args.account if account_id is None: account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1") start_ts, end_ts = get_today_range(args.date) date_label = args.date or datetime.now(BEIJING_TZ).strftime("%Y-%m-%d") try: from database.models import Trade except ImportError as e: print(f"无法导入 Trade 模型: {e}") print("请确保在项目根目录运行,且 backend 可访问") sys.exit(1) trades = Trade.get_all( start_timestamp=start_ts, end_timestamp=end_ts, symbol=None, status=None, account_id=account_id, time_filter=args.time_filter, limit=2000, reconciled_only=args.reconciled_only, include_sync=True, ) # 序列化 datetime def _serialize(obj): if hasattr(obj, "isoformat"): return obj.isoformat() return obj out = [] for t in trades: row = dict(t) for k, v in row.items(): if hasattr(v, "isoformat"): row[k] = v.isoformat() out.append(row) print(f"账号 {account_id} | 日期 {date_label} | 按{args.time_filter} | 共 {len(out)} 条") if len(out) == 0: if args.debug: _debug_db(account_id, start_ts, end_ts, args.time_filter, date_label) else: print("提示: 加 --debug 可查看数据库概况与时间范围,便于排查") if args.output: with open(args.output, "w", encoding="utf-8") as f: json.dump(out, f, ensure_ascii=False, indent=2) print(f"已导出到 {args.output}") else: print(json.dumps(out, ensure_ascii=False, indent=2, default=str)) if __name__ == "__main__": main()