feat(database): 添加交易统计模型和聚合逻辑

在数据库模型中新增了 `TradeStats` 类,包含交易统计功能,支持按交易对和日期聚合数据。实现了从 `binance_trades` 和 `trades` 表中提取交易数据的逻辑,并创建了相应的统计表 `trade_stats_daily` 和 `trade_stats_time_bucket`。此改动旨在增强交易数据分析能力,为后续的风险控制和决策提供支持。
This commit is contained in:
薇薇安 2026-02-26 20:08:46 +08:00
parent 30c5635570
commit e2e7effca2
2 changed files with 280 additions and 0 deletions

View File

@ -1245,6 +1245,247 @@ class Trade:
return False, None
class TradeStats:
"""
交易统计按交易对+日期按小时聚合写入 trade_stats_daily / trade_stats_time_bucket
数据源优先 binance_trades定时同步的币安成交无表或无数据时回退到 trades
"""
@staticmethod
def _binance_trades_exists():
try:
db.execute_one("SELECT 1 FROM binance_trades LIMIT 1")
return True
except Exception:
return False
@staticmethod
def _ensure_tables():
db.execute_update(
"""
CREATE TABLE IF NOT EXISTS trade_stats_daily (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
account_id INT NOT NULL,
trade_date DATE NOT NULL,
symbol VARCHAR(50) NOT NULL,
trade_count INT NOT NULL,
win_count INT NOT NULL,
loss_count INT NOT NULL,
gross_pnl DECIMAL(20,8) NOT NULL,
net_pnl DECIMAL(20,8) NOT NULL,
total_commission DECIMAL(20,8) NOT NULL,
avg_pnl_per_trade DECIMAL(20,8) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uniq_account_date_symbol (account_id, trade_date, symbol),
KEY idx_trade_date (trade_date),
KEY idx_symbol (symbol)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
)
db.execute_update(
"""
CREATE TABLE IF NOT EXISTS trade_stats_time_bucket (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
account_id INT NOT NULL,
trade_date DATE NOT NULL,
hour TINYINT NOT NULL,
trade_count INT NOT NULL,
win_count INT NOT NULL,
loss_count INT NOT NULL,
gross_pnl DECIMAL(20,8) NOT NULL,
net_pnl DECIMAL(20,8) NOT NULL,
total_commission DECIMAL(20,8) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uniq_account_date_hour (account_id, trade_date, hour),
KEY idx_trade_date (trade_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
)
@staticmethod
def _aggregate_from_binance_trades(aid: int, start_ts: int, end_ts: int):
"""从 binance_trades 聚合,返回 (daily, hourly) 或 (None, None)。"""
from datetime import datetime, timezone
start_ms = start_ts * 1000
end_ms = end_ts * 1000
try:
rows = db.execute_query(
"""SELECT symbol, trade_time, realized_pnl, commission
FROM binance_trades
WHERE account_id = %s AND trade_time >= %s AND trade_time <= %s
LIMIT 100000""",
(aid, start_ms, end_ms),
)
except Exception as e:
logger.debug(f"[TradeStats] 查询 binance_trades 失败: {e}")
return None, None
if not rows:
return None, None
def to_date_hour(ts_ms):
try:
dt = datetime.fromtimestamp(int(ts_ms) // 1000, timezone.utc).astimezone(BEIJING_TZ)
return dt.date(), dt.hour
except Exception:
return None, None
daily, hourly = {}, {}
for r in rows:
try:
tm = r.get("trade_time")
if tm is None:
continue
date, hour = to_date_hour(tm)
if date is None:
continue
sym = (r.get("symbol") or "").strip()
if not sym:
continue
pnl = float(r.get("realized_pnl") or 0)
comm = float(r.get("commission") or 0)
dkey, hkey = (date, sym), (date, hour)
if dkey not in daily:
daily[dkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0}
daily[dkey]["trade_count"] += 1
if pnl > 0:
daily[dkey]["win_count"] += 1
elif pnl < 0:
daily[dkey]["loss_count"] += 1
daily[dkey]["gross_pnl"] += pnl
daily[dkey]["total_commission"] += comm
if hkey not in hourly:
hourly[hkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0}
hourly[hkey]["trade_count"] += 1
if pnl > 0:
hourly[hkey]["win_count"] += 1
elif pnl < 0:
hourly[hkey]["loss_count"] += 1
hourly[hkey]["gross_pnl"] += pnl
hourly[hkey]["total_commission"] += comm
except Exception as e:
logger.debug(f"[TradeStats] 处理行失败: {e}")
return daily, hourly
@staticmethod
def _aggregate_from_trades(aid: int, start_ts: int, end_ts: int):
"""从 trades 表聚合,返回 (daily, hourly)。"""
from datetime import datetime, timezone
try:
rows = Trade.get_all(
start_timestamp=start_ts, end_timestamp=end_ts, account_id=aid,
time_filter="exit", limit=100000, reconciled_only=False,
)
except Exception as e:
logger.debug(f"[TradeStats] Trade.get_all 失败: {e}")
return {}, {}
if not rows:
return {}, {}
def to_date_hour(ts):
try:
dt = datetime.fromtimestamp(int(ts), timezone.utc).astimezone(BEIJING_TZ)
return dt.date(), dt.hour
except Exception:
return None, None
daily, hourly = {}, {}
for r in rows:
try:
ts = r.get("exit_time") or r.get("entry_time")
if not ts:
continue
date, hour = to_date_hour(ts)
if date is None:
continue
sym = (r.get("symbol") or "").strip()
if not sym:
continue
pnl = float(r.get("pnl") or 0)
comm = float(r.get("commission") or 0) if "commission" in r else 0
dkey, hkey = (date, sym), (date, hour)
if dkey not in daily:
daily[dkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0}
daily[dkey]["trade_count"] += 1
if pnl > 0:
daily[dkey]["win_count"] += 1
elif pnl < 0:
daily[dkey]["loss_count"] += 1
daily[dkey]["gross_pnl"] += pnl
daily[dkey]["total_commission"] += comm
if hkey not in hourly:
hourly[hkey] = {"trade_count": 0, "win_count": 0, "loss_count": 0, "gross_pnl": 0.0, "total_commission": 0.0}
hourly[hkey]["trade_count"] += 1
if pnl > 0:
hourly[hkey]["win_count"] += 1
elif pnl < 0:
hourly[hkey]["loss_count"] += 1
hourly[hkey]["gross_pnl"] += pnl
hourly[hkey]["total_commission"] += comm
except Exception as e:
logger.debug(f"[TradeStats] 处理 trades 行失败: {e}")
return daily, hourly
@staticmethod
def _upsert_stats(aid: int, daily: dict, hourly: dict):
"""把 daily/hourly 聚合结果写入 trade_stats_daily / trade_stats_time_bucket。"""
for (trade_date, symbol), v in daily.items():
tc = v["trade_count"]
net = v["gross_pnl"] - v["total_commission"]
avg = (net / tc) if tc > 0 else 0.0
try:
db.execute_update(
"""INSERT INTO trade_stats_daily (
account_id, trade_date, symbol, trade_count, win_count, loss_count,
gross_pnl, net_pnl, total_commission, avg_pnl_per_trade
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
trade_count = VALUES(trade_count), win_count = VALUES(win_count),
loss_count = VALUES(loss_count), gross_pnl = VALUES(gross_pnl),
net_pnl = VALUES(net_pnl), total_commission = VALUES(total_commission),
avg_pnl_per_trade = VALUES(avg_pnl_per_trade)""",
(aid, trade_date, symbol, tc, v["win_count"], v["loss_count"],
v["gross_pnl"], net, v["total_commission"], avg),
)
except Exception as e:
logger.warning(f"[TradeStats] 写入 daily 失败 {symbol} {trade_date}: {e}")
for (trade_date, hour), v in hourly.items():
net = v["gross_pnl"] - v["total_commission"]
try:
db.execute_update(
"""INSERT INTO trade_stats_time_bucket (
account_id, trade_date, hour, trade_count, win_count, loss_count,
gross_pnl, net_pnl, total_commission
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
trade_count = VALUES(trade_count), win_count = VALUES(win_count),
loss_count = VALUES(loss_count), gross_pnl = VALUES(gross_pnl),
net_pnl = VALUES(net_pnl), total_commission = VALUES(total_commission)""",
(aid, trade_date, int(hour), v["trade_count"], v["win_count"], v["loss_count"],
v["gross_pnl"], net, v["total_commission"]),
)
except Exception as e:
logger.warning(f"[TradeStats] 写入 time_bucket 失败 {trade_date} h={hour}: {e}")
@staticmethod
def aggregate_recent_days(days: int = 7, account_id: int = None):
"""聚合最近 N 天到统计表。优先 binance_trades无数据则用 trades。"""
if days <= 0:
return
TradeStats._ensure_tables()
aid = int(account_id or DEFAULT_ACCOUNT_ID)
now_ts = get_beijing_time()
start_ts = now_ts - int(days) * 86400
daily, hourly = None, None
if TradeStats._binance_trades_exists():
daily, hourly = TradeStats._aggregate_from_binance_trades(aid, start_ts, now_ts)
if daily is None or hourly is None:
daily, hourly = TradeStats._aggregate_from_trades(aid, start_ts, now_ts)
if daily or hourly:
TradeStats._upsert_stats(aid, daily or {}, hourly or {})
class AccountSnapshot:
"""账户快照模型"""

View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
定时任务将最近 N 天的交易数据聚合到 trade_stats_daily / trade_stats_time_bucket
优先从 binance_trades 读取需先跑 sync_binance_orders.py无数据时用 trades
用法:
python scripts/aggregate_trade_stats.py # 默认 7 天、默认账号
python scripts/aggregate_trade_stats.py -d 30 # 最近 30 天
python scripts/aggregate_trade_stats.py -a 2 # 指定账号
"""
import argparse
import sys
from pathlib import Path
proj = Path(__file__).resolve().parent.parent
if (proj / "backend").exists():
sys.path.insert(0, str(proj / "backend"))
sys.path.insert(0, str(proj))
def main():
parser = argparse.ArgumentParser(description="聚合交易统计到 trade_stats_* 表")
parser.add_argument("-a", "--account", type=int, default=None, help="账号 ID不传则用默认")
parser.add_argument("-d", "--days", type=int, default=7, help="聚合最近 N 天,默认 7")
args = parser.parse_args()
if args.days <= 0:
print("days 须 > 0")
sys.exit(1)
try:
from database.models import TradeStats
TradeStats.aggregate_recent_days(days=args.days, account_id=args.account)
print(f"已聚合最近 {args.days} 天统计 (account_id={args.account or 'default'})")
except Exception as e:
print(f"聚合失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()