#!/usr/bin/env python3
"""
Query the trade records written to the DB today (filtered by creation time
``created_at`` by default, which makes them easy to cross-check against
Binance during reconciliation).

Usage:
    python scripts/query_trades_today.py                        # today, default account
    python scripts/query_trades_today.py --account 2            # account 2
    python scripts/query_trades_today.py --date 2026-02-21      # a specific date
    python scripts/query_trades_today.py --time-filter entry    # filter by entry time
    python scripts/query_trades_today.py -o today_trades.json   # export to JSON
"""
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# Put the sibling ``backend`` directory (two levels up from this file, then
# ``backend``) at the front of the import path, but only when it exists.
backend = Path(__file__).resolve().parent.parent.joinpath("backend")
if backend.exists():
    sys.path.insert(0, str(backend))

# Fixed UTC+8 offset used for day boundaries and for formatting timestamps.
BEIJING_TZ = timezone(timedelta(seconds=8 * 3600))
|
||
|
||
|
||
def _debug_db(account_id: int, start_ts: int, end_ts: int, time_filter: str, date_label: str):
    """Print diagnostics about the ``trades`` table after an empty query result.

    The function only prints; it returns ``None``. Each probe runs in its own
    ``try`` block, so a failing query is reported and the remaining probes
    still execute.

    Probes, in order:
      1. database name (``db.database`` if present) and the queried window;
      2. total number of ``trades`` rows for the account, ignoring dates;
      3. min/max of ``created_at`` and ``entry_time`` for the account, shown
         both raw and converted to a ``BEIJING_TZ`` wall-clock string;
      4. only when ``time_filter == "created"``: how many rows fall inside
         the window when filtered by ``entry_time`` instead.

    Args:
        account_id: Account whose rows are inspected.
        start_ts: Start of the queried window (the caller passes Unix seconds).
        end_ts: End of the queried window, compared with ``<=``.
        time_filter: Filter mode used by the query that came back empty.
        date_label: Human-readable date shown next to the window.
    """
    # Only the import is guarded by ImportError; every later step has its own
    # broad handler, so nothing below can reach this one.
    try:
        from database.connection import db
    except ImportError as e:
        print(f"[调试] 无法导入 database: {e}")
        return

    db_name = getattr(db, "database", "?")
    print(f"\n[调试] 数据库: {db_name} (来自 DB_NAME 环境变量)")
    print(f"[调试] 查询时间范围: {start_ts} ~ {end_ts} ({date_label} 00:00 ~ 23:59 北京)")

    # Total rows for this account, regardless of date.
    try:
        row = db.execute_one(
            "SELECT COUNT(*) as c FROM trades WHERE account_id = %s",
            (account_id,),
        )
        total = row.get("c", 0) if row else 0
        print(f"[调试] 账号 {account_id} trades 总记录数: {total}")
    except Exception as e:
        print(f"[调试] 查询总记录数失败: {e}")

    # Overall created_at / entry_time span for this account.
    try:
        row = db.execute_one(
            """SELECT MIN(created_at) as min_c, MAX(created_at) as max_c,
                      MIN(entry_time) as min_e, MAX(entry_time) as max_e
               FROM trades WHERE account_id = %s""",
            (account_id,),
        )
        if row:
            for key, value in row.items():
                if value is None:
                    continue
                # Values are converted with int(); a non-numeric column value
                # raises here and is reported by the handler below.
                stamp = datetime.fromtimestamp(int(value), tz=BEIJING_TZ)
                dt = stamp.strftime("%Y-%m-%d %H:%M:%S")
                print(f"[调试] {key}: {value} -> {dt}")
    except Exception as e:
        print(f"[调试] 查询时间范围失败(可能无 created_at): {e}")

    # Cross-check: when the empty query filtered on created_at, count the rows
    # that the same window would match on entry_time.
    if time_filter == "created":
        try:
            row = db.execute_one(
                "SELECT COUNT(*) as c FROM trades WHERE account_id = %s AND entry_time >= %s AND entry_time <= %s",
                (account_id, start_ts, end_ts),
            )
            by_entry = row.get("c", 0) if row else 0
            print(f"[调试] 同一天按 entry_time 筛选: {by_entry} 条")
        except Exception as e:
            # Previously swallowed silently; report it like the other probes.
            print(f"[调试] 按 entry_time 筛选失败: {e}")
|
||
|
||
|
||
def get_today_range(date_str: str = None) -> tuple:
    """Return the first and last Unix second of one calendar day in ``BEIJING_TZ``.

    Args:
        date_str: Day to cover, formatted ``YYYY-MM-DD``. ``None`` or an empty
            string selects the current date in ``BEIJING_TZ``.

    Returns:
        ``(start_ts, end_ts)`` as ints: the timestamps of 00:00:00 and
        23:59:59 of the selected day.

    An unparsable ``date_str`` still falls back to the current day, as it
    always has, but the fallback is now announced on stderr so a typo cannot
    silently query the wrong day.
    """
    day = None
    if date_str:
        try:
            day = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=BEIJING_TZ)
        except ValueError:
            print(
                f"警告: 无法解析日期 {date_str!r}(应为 YYYY-MM-DD),改为查询今天",
                file=sys.stderr,
            )
    if day is None:
        day = datetime.now(BEIJING_TZ)

    start = day.replace(hour=0, minute=0, second=0, microsecond=0)
    # int() truncates the fractional part, so end_ts is the 23:59:59 second.
    end = day.replace(hour=23, minute=59, second=59, microsecond=999999)
    return int(start.timestamp()), int(end.timestamp())
|
||
|
||
|
||
def main():
    """CLI entry point: fetch one day's trades for an account and print or export them.

    Steps:
      1. Parse the command-line options.
      2. Resolve the account id: ``--account``, else the ``ATS_ACCOUNT_ID`` or
         ``ACCOUNT_ID`` environment variable, else ``1``.
      3. Compute the day window with ``get_today_range`` and query
         ``Trade.get_all`` (up to 2000 rows, sync rows included).
      4. Convert every value exposing ``isoformat()`` to its ISO string.
      5. Print a one-line summary, then either write the rows to ``--output``
         or dump them to stdout as JSON.
      6. When nothing matched, run ``_debug_db`` if ``--debug`` was given,
         otherwise print a hint about ``--debug``.

    Exits with status 1 when ``database.models.Trade`` cannot be imported.
    """
    parser = argparse.ArgumentParser(description="查询今日落入 DB 的交易记录")
    parser.add_argument("--account", "-a", type=int, default=None, help="账号 ID,默认从 ATS_ACCOUNT_ID 或 1")
    parser.add_argument("--date", "-d", type=str, default=None, help="日期 YYYY-MM-DD,默认今天")
    parser.add_argument("--time-filter", "-t", choices=["created", "entry", "exit"], default="created",
                        help="时间筛选:created=创建时间(落库), entry=入场时间, exit=平仓时间")
    parser.add_argument("--reconciled-only", action="store_true", help="仅可对账记录")
    parser.add_argument("-o", "--output", type=str, help="导出到 JSON 文件")
    parser.add_argument("--debug", action="store_true", help="无结果时输出诊断信息")
    args = parser.parse_args()

    account_id = args.account
    if account_id is None:
        account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")

    start_ts, end_ts = get_today_range(args.date)
    # Build the label from the window actually queried rather than echoing the
    # raw --date text: get_today_range falls back to today on an unparsable
    # date, and the header must not name a day that was never queried.
    date_label = datetime.fromtimestamp(start_ts, tz=BEIJING_TZ).strftime("%Y-%m-%d")

    # Imported lazily so a missing backend yields a readable message, not a
    # traceback at module import time.
    try:
        from database.models import Trade
    except ImportError as e:
        print(f"无法导入 Trade 模型: {e}")
        print("请确保在项目根目录运行,且 backend 可访问")
        sys.exit(1)

    trades = Trade.get_all(
        start_timestamp=start_ts,
        end_timestamp=end_ts,
        symbol=None,
        status=None,
        account_id=account_id,
        time_filter=args.time_filter,
        limit=2000,
        reconciled_only=args.reconciled_only,
        include_sync=True,
    )

    # Date/time-like values (anything exposing isoformat) become ISO strings.
    out = [
        {k: (v.isoformat() if hasattr(v, "isoformat") else v) for k, v in dict(t).items()}
        for t in trades
    ]

    print(f"账号 {account_id} | 日期 {date_label} | 按{args.time_filter} | 共 {len(out)} 条")
    if not out:
        if args.debug:
            _debug_db(account_id, start_ts, end_ts, args.time_filter, date_label)
        else:
            print("提示: 加 --debug 可查看数据库概况与时间范围,便于排查")

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            # default=str matches the stdout branch below; without it a value
            # json cannot encode natively crashed only the file export.
            json.dump(out, f, ensure_ascii=False, indent=2, default=str)
        print(f"已导出到 {args.output}")
    else:
        print(json.dumps(out, ensure_ascii=False, indent=2, default=str))
|
||
|
||
|
||
# Run the CLI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|