auto_trade_sys/scripts/fix_trade_records.py
薇薇安 20788e4b77 1
2026-02-11 11:38:27 +08:00

252 lines
10 KiB
Python

import asyncio
import logging
import os
import sys
from datetime import datetime, timedelta
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from trading_system.binance_client import BinanceClient
from binance import AsyncClient
from backend.database.connection import db
from backend.database.models import Trade, Account
# Set up logging for the script: INFO threshold, each record prefixed with
# its timestamp and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
def _summarize_closing_fills(closing_fills):
    """Aggregate closing fills into the exit figures recorded on a trade.

    Args:
        closing_fills: Non-empty list of fill dicts. Each must provide
            ``time``, ``qty``, ``price``, ``realizedPnl``, ``commission``
            and ``commissionAsset``.

    Returns:
        dict with keys ``exit_time_s`` (latest fill time divided by 1000),
        ``pnl`` (sum of realized PnL), ``commission`` (sum of commissions),
        ``commission_asset`` (taken from the latest fill) and
        ``avg_exit_price`` (quantity-weighted price, 0 when total qty is 0).
    """
    # Newest first, so index 0 is the fill that finished the close.
    newest_first = sorted(closing_fills, key=lambda fill: fill['time'], reverse=True)
    latest_fill = newest_first[0]

    total_qty = sum(float(fill['qty']) for fill in newest_first)
    total_value = sum(float(fill['qty']) * float(fill['price']) for fill in newest_first)

    return {
        'exit_time_s': int(latest_fill['time'] / 1000),
        'pnl': sum(float(fill['realizedPnl']) for fill in newest_first),
        'commission': sum(float(fill['commission']) for fill in newest_first),
        'commission_asset': latest_fill['commissionAsset'],
        'avg_exit_price': total_value / total_qty if total_qty > 0 else 0,
    }


def _directional_pnl_percent(side, entry_price, exit_price):
    """Return the percentage move from entry to exit, signed by open side.

    A ``'BUY'`` open profits when the exit is above the entry; any other side
    is treated as a short. Returns 0 when ``entry_price`` is not positive.
    """
    if entry_price <= 0:
        return 0
    if side == 'BUY':
        return ((exit_price - entry_price) / entry_price) * 100
    return ((entry_price - exit_price) / entry_price) * 100


async def fix_time_inversion_trades(client, account_id):
    """Repair closed trades whose recorded exit_time precedes entry_time.

    For each such trade the account's fills for the symbol are fetched from
    the entry time onward:

    * If opposite-side fills with a non-zero realized PnL exist, the trade's
      exit fields are rewritten from them via ``Trade.update_exit`` with
      ``exit_reason='sync_fix'``.
    * If none exist but the exchange still reports a non-zero position for
      the symbol, the trade is reverted to ``status='open'``.
    * Otherwise a warning is logged and the row is left unchanged.

    Errors while handling one trade are logged with a traceback and do not
    stop the remaining trades.

    Args:
        client: Exchange wrapper; must expose ``client.futures_account_trades``
            and ``get_open_positions``.
        account_id: Value bound to ``account_id`` in the ``trades`` query.

    Returns:
        None.
    """
    logger.info("Scanning for time-inverted trades...")

    # Query invalid trades (runtime text kept identical to the original).
    sql = (
        "\n"
        "SELECT * FROM trades\n"
        "WHERE status = 'closed'\n"
        "AND exit_time < entry_time\n"
        "AND account_id = %s\n"
    )
    trades = db.execute_query(sql, (account_id,))
    if not trades:
        logger.info("No time-inverted trades found.")
        # Return explicitly: a None result must not reach the loop below.
        return

    for trade in trades:
        symbol = trade['symbol']
        trade_id = trade['id']
        entry_time = trade['entry_time']
        exit_time = trade['exit_time']
        side = trade['side']  # OPEN side (BUY/SELL)

        logger.info(f"Found invalid trade #{trade_id} {symbol}: Entry={entry_time}, Exit={exit_time} (Diff: {exit_time - entry_time}s)")

        try:
            # entry_time is presumably Unix seconds (TODO confirm against the
            # schema); the fills endpoint takes startTime in milliseconds.
            # Converted inside the try so one bad row cannot abort the scan.
            start_time_ms = int(entry_time * 1000)

            # Fetch user trades (fills) after entry time.
            fills = await client.client.futures_account_trades(
                symbol=symbol,
                startTime=start_time_ms,
                limit=100
            )

            # A position opened with BUY is closed by SELL fills and vice
            # versa. realizedPnl arrives as a string, so compare numerically
            # to treat "0" and "0.00000000" alike.
            close_side = 'SELL' if side == 'BUY' else 'BUY'
            closing_fills = [
                f for f in fills
                if f['side'] == close_side and float(f['realizedPnl']) != 0
            ]

            if not closing_fills:
                # No closing fills: the position may still be open.
                positions = await client.get_open_positions()
                current_pos = next((p for p in positions if p['symbol'] == symbol), None)

                if current_pos and float(current_pos['positionAmt']) != 0:
                    logger.info(f" -> Position for {symbol} is actually OPEN on Binance. Reverting status.")
                    # Revert to open and clear every exit-related column.
                    db.execute_update(
                        "UPDATE trades SET status='open', exit_time=NULL, exit_price=NULL, pnl=NULL, pnl_percent=NULL, exit_reason=NULL WHERE id=%s",
                        (trade_id,)
                    )
                else:
                    logger.warning(" -> Position closed on Binance but no closing fills found after entry time. Maybe closed exactly at entry or data issue?")
                continue

            # NOTE(review): every closing fill returned since entry is
            # attributed to this trade. That is a simplification; later
            # trades on the same symbol would be folded in as well.
            summary = _summarize_closing_fills(closing_fills)
            actual_exit_time_s = summary['exit_time_s']
            total_pnl = summary['pnl']
            total_commission = summary['commission']
            commission_asset = summary['commission_asset']
            avg_exit_price = summary['avg_exit_price']

            entry_price = float(trade['entry_price'])
            pnl_percent = _directional_pnl_percent(side, entry_price, avg_exit_price)

            logger.info(f" -> Found closing fills. Updating Trade #{trade_id}:")
            logger.info(f" New Exit Time: {actual_exit_time_s} (was {exit_time})")
            logger.info(f" Realized PnL: {total_pnl}")
            logger.info(f" Commission: {total_commission} {commission_asset}")

            # Update DB
            Trade.update_exit(
                trade_id=trade_id,
                exit_price=avg_exit_price,
                exit_reason='sync_fix',
                pnl=total_pnl,  # Use realized PnL as PnL
                pnl_percent=pnl_percent,
                exit_time_ts=actual_exit_time_s,
                realized_pnl=total_pnl,
                commission=total_commission,
                commission_asset=commission_asset
            )
        except Exception as e:
            # Per-trade boundary: record the failure with its traceback and
            # move on to the next trade.
            logger.exception(f"Error processing trade #{trade_id}: {type(e).__name__}: {e}")
async def backfill_commissions(client, account_id, *, lookback_hours=24, match_window_ms=5000):
    """Backfill commission and realized PnL on recently closed trades.

    Selects closed trades of ``account_id`` whose ``exit_time`` falls within
    the lookback window and whose ``commission`` is NULL or 0. For each one,
    fills for the symbol are fetched from the entry time onward and matched:

    * by ``exit_order_id`` when the trade row has one, otherwise
    * by timestamp, keeping fills within ``match_window_ms`` of the recorded
      exit time.

    When matching fills are found, ``commission``, ``commission_asset`` and
    ``realized_pnl`` are updated on the trade row. Errors on one trade are
    logged with a traceback and do not stop the remaining trades.

    Args:
        client: Exchange wrapper; must expose
            ``client.futures_account_trades``.
        account_id: Value bound to ``account_id`` in the ``trades`` query.
        lookback_hours: How far back (in hours, from now) to look for closed
            trades. Defaults to 24.
        match_window_ms: Maximum distance in milliseconds between a fill's
            ``time`` and the trade's exit time for the timestamp fallback.
            Defaults to 5000.

    Returns:
        None.
    """
    logger.info("Backfilling commissions for recent trades...")

    # Lower bound for exit_time, as a Unix timestamp in seconds.
    start_time = int((datetime.now() - timedelta(hours=lookback_hours)).timestamp())

    # Runtime text kept identical to the original query.
    sql = (
        "\n"
        "SELECT * FROM trades\n"
        "WHERE status = 'closed'\n"
        "AND account_id = %s\n"
        "AND exit_time > %s\n"
        "AND (commission IS NULL OR commission = 0)\n"
    )
    trades = db.execute_query(sql, (account_id, start_time))
    if not trades:
        logger.info("No recent trades with missing commissions found.")
        return

    logger.info(f"Found {len(trades)} trades to update commissions.")

    for trade in trades:
        symbol = trade['symbol']
        trade_id = trade['id']
        entry_time = trade['entry_time']

        try:
            # Fetch user trades (fills) after entry time; the endpoint takes
            # startTime in milliseconds.
            start_time_ms = int(entry_time * 1000)
            fills = await client.client.futures_account_trades(
                symbol=symbol,
                startTime=start_time_ms,
                limit=100
            )
            if not fills:
                continue

            # Only the closing side is attributed here. Matching on the exit
            # order id is exact; the timestamp window is a best-effort
            # fallback and may pick up unrelated fills on the same symbol.
            exit_order_id = trade.get('exit_order_id')
            related_fills = []
            if exit_order_id:
                related_fills = [f for f in fills if str(f['orderId']) == str(exit_order_id)]
            else:
                exit_time = trade['exit_time']
                if exit_time:
                    exit_time_ms = exit_time * 1000
                    related_fills = [
                        f for f in fills
                        if abs(f['time'] - exit_time_ms) < match_window_ms
                    ]

            if not related_fills:
                continue

            total_commission = sum(float(f['commission']) for f in related_fills)
            commission_asset = related_fills[0]['commissionAsset']
            realized_pnl = sum(float(f['realizedPnl']) for f in related_fills)

            logger.info(f"Updating Trade #{trade_id} {symbol}: Comm={total_commission} {commission_asset}, PnL={realized_pnl}")
            db.execute_update(
                "UPDATE trades SET commission=%s, commission_asset=%s, realized_pnl=%s WHERE id=%s",
                (total_commission, commission_asset, realized_pnl, trade_id)
            )
        except Exception as e:
            # Per-trade boundary: keep the traceback so failures can be
            # diagnosed, then continue with the next trade.
            logger.exception(f"Error backfilling trade #{trade_id}: {e}")
async def main():
    """Run both repair passes for every active account that has an API key.

    Accounts are taken from ``Account.list_all()`` and kept when their
    ``status`` is ``'active'`` and ``api_key_enc`` is truthy. For each one a
    ``BinanceClient`` is built, ``fix_time_inversion_trades`` and
    ``backfill_commissions`` are awaited, and the underlying connection is
    closed afterwards.

    A failure in one account is logged with its traceback and processing
    continues with the next account. Any error outside the per-account work
    is logged as "Script failed". Nothing is raised to the caller.

    Returns:
        None.
    """
    try:
        # Get all active accounts
        accounts = Account.list_all()
        active_accounts = [a for a in accounts if a['status'] == 'active' and a['api_key_enc']]

        if not active_accounts:
            logger.error("No active accounts with API keys found.")
            return

        failed_account_ids = []
        for account in active_accounts:
            account_id = account['id']
            account_name = account['name']
            logger.info(f"=== Processing Account: {account_name} (ID: {account_id}) ===")

            try:
                # Get credentials
                api_key, api_secret, testnet, _ = Account.get_credentials(account_id)
                if not api_key or not api_secret:
                    logger.warning(f"Skipping account {account_id} due to missing keys.")
                    continue

                # Init client
                client = BinanceClient(api_key, api_secret, testnet=testnet)
                # The wrapper may not have created its connection yet; the
                # repair functions call client.client directly, so make sure
                # one exists.
                if not client.client:
                    client.client = AsyncClient(api_key, api_secret, testnet=testnet)

                try:
                    await fix_time_inversion_trades(client, account_id)
                    await backfill_commissions(client, account_id)
                finally:
                    # Close connection properly, even if a pass failed.
                    if client.client:
                        await client.client.close_connection()
            except Exception as e:
                # Isolate accounts from each other: one bad credential or
                # network error must not skip the remaining accounts.
                failed_account_ids.append(account_id)
                logger.exception(f"Account {account_id} failed: {e}")

        logger.info("All accounts processed.")
        if failed_account_ids:
            logger.warning(f"Accounts that failed: {failed_account_ids}")
    except Exception as e:
        # Top-level boundary: keep the traceback so the failure is diagnosable.
        logger.exception(f"Script failed: {e}")
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and never closed the loop.
    asyncio.run(main())