diff --git a/backend/database/models.py b/backend/database/models.py index 7e9b02c..fa7274c 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -1245,6 +1245,247 @@ class Trade: return False, None +class TradeStats: + """ + 交易统计:按交易对+日期、按小时聚合,写入 trade_stats_daily / trade_stats_time_bucket。 + 数据源优先 binance_trades(定时同步的币安成交),无表或无数据时回退到 trades。 + """ + + @staticmethod + def _binance_trades_exists(): + try: + db.execute_one("SELECT 1 FROM binance_trades LIMIT 1") + return True + except Exception: + return False + + @staticmethod + def _ensure_tables(): + db.execute_update( + """ + CREATE TABLE IF NOT EXISTS trade_stats_daily ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + account_id INT NOT NULL, + trade_date DATE NOT NULL, + symbol VARCHAR(50) NOT NULL, + trade_count INT NOT NULL, + win_count INT NOT NULL, + loss_count INT NOT NULL, + gross_pnl DECIMAL(20,8) NOT NULL, + net_pnl DECIMAL(20,8) NOT NULL, + total_commission DECIMAL(20,8) NOT NULL, + avg_pnl_per_trade DECIMAL(20,8) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_account_date_symbol (account_id, trade_date, symbol), + KEY idx_trade_date (trade_date), + KEY idx_symbol (symbol) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + ) + db.execute_update( + """ + CREATE TABLE IF NOT EXISTS trade_stats_time_bucket ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + account_id INT NOT NULL, + trade_date DATE NOT NULL, + hour TINYINT NOT NULL, + trade_count INT NOT NULL, + win_count INT NOT NULL, + loss_count INT NOT NULL, + gross_pnl DECIMAL(20,8) NOT NULL, + net_pnl DECIMAL(20,8) NOT NULL, + total_commission DECIMAL(20,8) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_account_date_hour (account_id, trade_date, hour), + KEY idx_trade_date (trade_date) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + ) + + @staticmethod + def _aggregate_from_binance_trades(aid: int, start_ts: int, end_ts: int): + """从 binance_trades 聚合,返回 (daily, hourly) 或 (None, None)。""" + from datetime import datetime, timezone + start_ms = start_ts * 1000 + end_ms = end_ts * 1000 + try: + rows = db.execute_query( + """SELECT symbol, trade_time, realized_pnl, commission + FROM binance_trades + WHERE account_id = %s AND trade_time >= %s AND trade_time <= %s + LIMIT 100000""", + (aid, start_ms, end_ms), + ) + except Exception as e: + logger.debug(f"[TradeStats] 查询 binance_trades 失败: {e}") + return None, None + if not rows: + return None, None + + def to_date_hour(ts_ms): + try: + dt = datetime.fromtimestamp(int(ts_ms) // 1000, timezone.utc).astimezone(BEIJING_TZ) + return dt.date(), dt.hour + except Exception: + return None, None + + daily, hourly = {}, {} + for r in rows: + try: + tm = r.get("trade_time") + if tm is None: + continue + date, hour = to_date_hour(tm) + if date is None: + continue + sym = (r.get("symbol") or "").strip() + if not sym: + continue + pnl = float(r.get("realized_pnl") or 0) + comm = float(r.get("commission") or 0) + dkey, hkey = (date, sym), (date, hour) + if dkey not in daily: + daily[dkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0} + daily[dkey]["trade_count"] += 1 + if pnl > 0: + daily[dkey]["win_count"] += 1 + elif pnl < 0: + daily[dkey]["loss_count"] += 1 + daily[dkey]["gross_pnl"] += pnl + daily[dkey]["total_commission"] += comm + if hkey not in hourly: + hourly[hkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0} + hourly[hkey]["trade_count"] += 1 + if pnl > 0: + hourly[hkey]["win_count"] += 1 + elif pnl < 0: + hourly[hkey]["loss_count"] += 1 + hourly[hkey]["gross_pnl"] += pnl + hourly[hkey]["total_commission"] += comm + except Exception as e: + logger.debug(f"[TradeStats] 处理行失败: {e}") + return daily, hourly + + @staticmethod + def _aggregate_from_trades(aid: int, start_ts: int, end_ts: int): + """从 trades 表聚合,返回 (daily, hourly)。""" + from datetime import datetime, timezone + try: + rows = Trade.get_all( + start_timestamp=start_ts, end_timestamp=end_ts, account_id=aid, + time_filter="exit", limit=100000, reconciled_only=False, + ) + except Exception as e: + logger.debug(f"[TradeStats] Trade.get_all 失败: {e}") + return {}, {} + if not rows: + return {}, {} + + def to_date_hour(ts): + try: + dt = datetime.fromtimestamp(int(ts), timezone.utc).astimezone(BEIJING_TZ) + return dt.date(), dt.hour + except Exception: + return None, None + + daily, hourly = {}, {} + for r in rows: + try: + ts = r.get("exit_time") or r.get("entry_time") + if not ts: + continue + date, hour = to_date_hour(ts) + if date is None: + continue + sym = (r.get("symbol") or "").strip() + if not sym: + continue + pnl = float(r.get("pnl") or 0) + comm = float(r.get("commission") or 0) if "commission" in r else 0 + dkey, hkey = (date, sym), (date, hour) + if dkey not in daily: + daily[dkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0} + daily[dkey]["trade_count"] += 1 + if pnl > 0: + daily[dkey]["win_count"] += 1 + elif pnl < 0: + daily[dkey]["loss_count"] += 1 + daily[dkey]["gross_pnl"] += pnl + daily[dkey]["total_commission"] += comm + if hkey not in hourly: + hourly[hkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0} + hourly[hkey]["trade_count"] += 1 + if pnl > 0: + hourly[hkey]["win_count"] += 1 + elif pnl < 0: + hourly[hkey]["loss_count"] += 1 + hourly[hkey]["gross_pnl"] += pnl + hourly[hkey]["total_commission"] += comm + except Exception as e: + logger.debug(f"[TradeStats] 处理 trades 行失败: {e}") + return daily, hourly + + @staticmethod + def _upsert_stats(aid: int, daily: dict, hourly: dict): + """把 daily/hourly 聚合结果写入 trade_stats_daily / trade_stats_time_bucket。""" + for (trade_date, symbol), v in daily.items(): + tc = v["trade_count"] + net = v["gross_pnl"] - v["total_commission"] + avg = (net / tc) if tc > 0 else 0.0 + try: + db.execute_update( + """INSERT INTO trade_stats_daily ( + account_id, trade_date, symbol, trade_count, win_count, loss_count, + gross_pnl, net_pnl, total_commission, avg_pnl_per_trade + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + trade_count = VALUES(trade_count), win_count = VALUES(win_count), + loss_count = VALUES(loss_count), gross_pnl = VALUES(gross_pnl), + net_pnl = VALUES(net_pnl), total_commission = VALUES(total_commission), + avg_pnl_per_trade = VALUES(avg_pnl_per_trade)""", + (aid, trade_date, symbol, tc, v["win_count"], v["loss_count"], + v["gross_pnl"], net, v["total_commission"], avg), + ) + except Exception as e: + logger.warning(f"[TradeStats] 写入 daily 失败 {symbol} {trade_date}: {e}") + for (trade_date, hour), v in hourly.items(): + net = v["gross_pnl"] - v["total_commission"] + try: + db.execute_update( + """INSERT INTO trade_stats_time_bucket ( + account_id, trade_date, hour, trade_count, win_count, loss_count, + gross_pnl, net_pnl, total_commission + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + trade_count = VALUES(trade_count), win_count = VALUES(win_count), + loss_count = VALUES(loss_count), gross_pnl = VALUES(gross_pnl), + net_pnl = VALUES(net_pnl), total_commission = VALUES(total_commission)""", + (aid, trade_date, int(hour), v["trade_count"], v["win_count"], v["loss_count"], + v["gross_pnl"], net, v["total_commission"]), + ) + except Exception as e: + logger.warning(f"[TradeStats] 写入 time_bucket 失败 {trade_date} h={hour}: {e}") + + @staticmethod + def aggregate_recent_days(days: int = 7, account_id: int = None): + """聚合最近 N 天到统计表。优先 binance_trades,无数据则用 trades。""" + if days <= 0: + return + TradeStats._ensure_tables() + aid = int(account_id or DEFAULT_ACCOUNT_ID) + now_ts = get_beijing_time() + start_ts = now_ts - int(days) * 86400 + daily, hourly = None, None + if TradeStats._binance_trades_exists(): + daily, hourly = TradeStats._aggregate_from_binance_trades(aid, start_ts, now_ts) + if daily is None or hourly is None: + daily, hourly = TradeStats._aggregate_from_trades(aid, start_ts, now_ts) + if daily or hourly: + TradeStats._upsert_stats(aid, daily or {}, hourly or {}) + + class AccountSnapshot: """账户快照模型""" diff --git a/scripts/aggregate_trade_stats.py b/scripts/aggregate_trade_stats.py new file mode 100644 index 0000000..e03ae9f --- /dev/null +++ b/scripts/aggregate_trade_stats.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +""" +定时任务:将最近 N 天的交易数据聚合到 trade_stats_daily / trade_stats_time_bucket。 +优先从 binance_trades 读取(需先跑 sync_binance_orders.py),无数据时用 trades 表。 + +用法: + python scripts/aggregate_trade_stats.py # 默认 7 天、默认账号 + python scripts/aggregate_trade_stats.py -d 30 # 最近 30 天 + python scripts/aggregate_trade_stats.py -a 2 # 指定账号 +""" +import argparse +import sys +from pathlib import Path + +proj = Path(__file__).resolve().parent.parent +if (proj / "backend").exists(): + sys.path.insert(0, str(proj / "backend")) +sys.path.insert(0, str(proj)) + + +def main(): + parser = argparse.ArgumentParser(description="聚合交易统计到 trade_stats_* 表") + parser.add_argument("-a", "--account", type=int, default=None, help="账号 ID,不传则用默认") + parser.add_argument("-d", "--days", type=int, default=7, help="聚合最近 N 天,默认 7") + args = parser.parse_args() + if args.days <= 0: + print("days 须 > 0") + sys.exit(1) + try: + from database.models import TradeStats + TradeStats.aggregate_recent_days(days=args.days, account_id=args.account) + print(f"已聚合最近 {args.days} 天统计 (account_id={args.account or 'default'})") + except Exception as e: + print(f"聚合失败: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main()