auto_trade_sys/scripts/fix_trade_records.py
薇薇安 4c7cd86fb0 1
2026-02-13 08:15:09 +08:00

282 lines
12 KiB
Python

import asyncio
import logging
import os
import sys
import time
from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Dict, Any, Optional
# Add project root and backend to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'backend'))
from backend.database.connection import db
from backend.database.models import Trade, Account
from backend.config_manager import ConfigManager
from trading_system.binance_client import BinanceClient
from binance import AsyncClient
# Root logging setup: INFO threshold, each record rendered as
# "timestamp - logger name - level - message".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
async def get_fills_with_retry(client, symbol, start_time_ms, retries=3,
                               delay=2.0, timeout=None):
    """Fetch account fills for ``symbol``, retrying when the request fails.

    Args:
        client: Wrapper whose ``client`` attribute exposes the awaitable
            ``futures_account_trades`` method.
        symbol: Symbol forwarded to the request.
        start_time_ms: Value forwarded as ``startTime`` (milliseconds, per
            the parameter name).
        retries: Maximum number of attempts; must be at least 1.
        delay: Seconds to sleep between a failed attempt and the next one.
        timeout: Optional per-attempt timeout in seconds. ``None`` (the
            default) waits for the request without a time limit.

    Returns:
        Whatever ``futures_account_trades`` returns; the request asks for
        at most 100 records.

    Raises:
        ValueError: If ``retries`` is smaller than 1 (previously this case
            silently returned ``None``).
        Exception: The error raised by the final attempt, re-raised as is.
    """
    if retries < 1:
        raise ValueError(f"retries must be >= 1, got {retries}")

    for attempt in range(1, retries + 1):
        try:
            return await asyncio.wait_for(
                client.client.futures_account_trades(
                    symbol=symbol,
                    startTime=start_time_ms,
                    limit=100,
                ),
                timeout=timeout,
            )
        except Exception as e:
            if attempt == retries:
                # Bare ``raise`` keeps the original traceback intact.
                raise
            logger.warning(f"Error fetching fills for {symbol} (Attempt {attempt}/{retries}): {e}. Retrying...")
            await asyncio.sleep(delay)
def _summarize_closing_fills(closing_fills, side, entry_price):
    """Aggregate closing fills into the exit metrics written to a trade row.

    Args:
        closing_fills: Non-empty list of fill dicts with ``time``, ``qty``,
            ``price``, ``realizedPnl``, ``commission`` and
            ``commissionAsset`` keys.
        side: Opening side of the trade (``'BUY'`` or anything else, which
            is treated as a short).
        entry_price: Entry price as a float.

    Returns:
        Dict with ``exit_time_s``, ``exit_price``, ``pnl``, ``pnl_percent``,
        ``commission`` and ``commission_asset``.
    """
    # Newest fill first: its timestamp and commission asset represent the exit.
    ordered = sorted(closing_fills, key=lambda f: f['time'], reverse=True)
    latest = ordered[0]

    total_qty = sum(float(f['qty']) for f in ordered)
    total_value = sum(float(f['qty']) * float(f['price']) for f in ordered)
    # Quantity-weighted average price; 0 when no quantity was reported.
    avg_exit_price = total_value / total_qty if total_qty > 0 else 0

    pnl_percent = 0
    if entry_price > 0:
        if side == 'BUY':
            pnl_percent = ((avg_exit_price - entry_price) / entry_price) * 100
        else:
            pnl_percent = ((entry_price - avg_exit_price) / entry_price) * 100

    return {
        # Integer division avoids a float round trip on millisecond stamps.
        'exit_time_s': int(latest['time']) // 1000,
        'exit_price': avg_exit_price,
        'pnl': sum(float(f['realizedPnl']) for f in ordered),
        'pnl_percent': pnl_percent,
        'commission': sum(float(f['commission']) for f in ordered),
        'commission_asset': latest['commissionAsset'],
    }


async def _repair_inverted_trade(client, trade):
    """Repair one trade row whose exit_time precedes its entry_time."""
    symbol = trade['symbol']
    trade_id = trade['id']
    entry_time = trade['entry_time']
    exit_time = trade['exit_time']
    side = trade['side']  # OPEN side (BUY/SELL)
    logger.info(f"Found invalid trade #{trade_id} {symbol}: Entry={entry_time}, Exit={exit_time} (Diff: {exit_time - entry_time}s)")

    # entry_time is multiplied up to milliseconds for the fills request.
    start_time_ms = int(entry_time * 1000)
    fills = await get_fills_with_retry(client, symbol, start_time_ms) or []

    # A closing fill is on the opposite side and carries non-zero realized PnL.
    close_side = 'SELL' if side == 'BUY' else 'BUY'
    closing_fills = [
        f for f in fills
        if f['side'] == close_side and float(f['realizedPnl']) != 0
    ]

    if not closing_fills:
        # No closing fills: decide between "still open" and "cannot tell".
        positions = await client.get_open_positions()
        current_pos = next((p for p in positions if p['symbol'] == symbol), None)
        if current_pos and float(current_pos['positionAmt']) != 0:
            logger.info(f" -> Position for {symbol} is actually OPEN on Binance. Reverting status.")
            db.execute_update(
                "UPDATE trades SET status='open', exit_time=NULL, exit_price=NULL, pnl=NULL, pnl_percent=NULL, exit_reason=NULL WHERE id=%s",
                (trade_id,)
            )
        else:
            logger.warning(" -> Position closed on Binance but no closing fills found after entry time. Maybe closed exactly at entry or data issue?")
        return

    # NOTE(review): every closing fill returned after entry_time is attributed
    # to this trade. If the symbol was traded again afterwards, later closes
    # may be included; bounding by the trade's quantity would need a column
    # this function does not currently read.
    summary = _summarize_closing_fills(closing_fills, side, float(trade['entry_price']))

    logger.info(f" -> Found closing fills. Updating Trade #{trade_id}:")
    logger.info(f" New Exit Time: {summary['exit_time_s']} (was {exit_time})")
    logger.info(f" Realized PnL: {summary['pnl']}")
    logger.info(f" Commission: {summary['commission']} {summary['commission_asset']}")

    Trade.update_exit(
        trade_id=trade_id,
        exit_price=summary['exit_price'],
        exit_reason='sync_fix',
        pnl=summary['pnl'],  # Use realized PnL as PnL
        pnl_percent=summary['pnl_percent'],
        exit_time_ts=summary['exit_time_s'],
        realized_pnl=summary['pnl'],
        commission=summary['commission'],
        commission_asset=summary['commission_asset']
    )


async def fix_time_inversion_trades(client, account_id):
    """Fix closed trades of ``account_id`` where exit_time < entry_time.

    For each such row the fills after the entry time are fetched. When
    closing fills exist, the row's exit fields are rewritten from them; when
    none exist and the position is still reported open, the row is reverted
    to ``status='open'``; otherwise only a warning is logged.

    Args:
        client: Exchange wrapper providing ``client.futures_account_trades``
            (through ``get_fills_with_retry``) and ``get_open_positions``.
        account_id: Account whose trades are scanned.

    A failure on one trade is logged with its traceback and does not stop
    the remaining trades from being processed.
    """
    logger.info("Scanning for time-inverted trades...")
    # Query invalid trades
    sql = """
SELECT * FROM trades
WHERE status = 'closed'
AND exit_time < entry_time
AND account_id = %s
"""
    trades = db.execute_query(sql, (account_id,))
    if not trades:
        logger.info("No time-inverted trades found.")
        return

    for trade in trades:
        # The whole per-trade step is guarded so a malformed row cannot
        # abort the scan.
        try:
            await _repair_inverted_trade(client, trade)
        except Exception as e:
            logger.exception(f"Error processing trade #{trade.get('id')}: {e}")
async def backfill_commissions(client, account_id):
    """Fill in commission data for recently closed trades that lack it.

    Selects closed trades of ``account_id`` whose exit_time is within the
    last 24 hours and whose commission is NULL or 0, fetches fills for each
    trade's symbol starting at its entry time (up to three attempts, 10
    seconds each), picks the fills belonging to the exit, and writes the
    summed commission, the commission asset and the summed realized PnL
    back to the row.

    Args:
        client: Exchange wrapper whose ``client`` attribute exposes
            ``futures_account_trades``.
        account_id: Account whose trades are updated.
    """
    logger.info("Backfilling commissions for recent trades...")

    # Cut-off: epoch seconds 24 hours before now.
    cutoff_ts = int((datetime.now() - timedelta(hours=24)).timestamp())
    query = """
SELECT * FROM trades
WHERE status = 'closed'
AND account_id = %s
AND exit_time > %s
AND (commission IS NULL OR commission = 0)
"""
    pending = db.execute_query(query, (account_id, cutoff_ts))
    if not pending:
        logger.info("No recent trades with missing commissions found.")
        return

    logger.info(f"Found {len(pending)} trades to update commissions.")

    attempts_allowed = 3
    for row in pending:
        symbol = row['symbol']
        trade_id = row['id']
        entry_time = row['entry_time']
        try:
            since_ms = int(entry_time * 1000)

            # Fetch fills from the entry time onward; each attempt is capped
            # at 10 seconds and failures are retried after a 2 second pause.
            fills = None
            for attempt_no in range(1, attempts_allowed + 1):
                try:
                    request = client.client.futures_account_trades(
                        symbol=symbol,
                        startTime=since_ms,
                        limit=100
                    )
                    fills = await asyncio.wait_for(request, timeout=10.0)
                    break
                except Exception as e:
                    if attempt_no == attempts_allowed:
                        logger.error(f" ❌ Failed to fetch fills for {symbol} after {attempts_allowed} attempts: {e}")
                        break
                    logger.warning(f" ⚠️ Error fetching fills for {symbol} (Attempt {attempt_no}/{attempts_allowed}): {e}, retrying...")
                    await asyncio.sleep(2)

            # Nothing fetched (all attempts failed or no fills): skip trade.
            if not fills:
                continue

            # Prefer an exact match on the recorded exit order id; otherwise
            # fall back to fills within 5 seconds of the recorded exit time.
            order_id = row.get('exit_order_id')
            matched = []
            if order_id:
                wanted = str(order_id)
                matched = [f for f in fills if str(f['orderId']) == wanted]
            else:
                exit_time = row['exit_time']
                if exit_time:
                    exit_ms = exit_time * 1000
                    matched = [f for f in fills if abs(f['time'] - exit_ms) < 5000]

            if not matched:
                continue

            total_commission = sum(float(f['commission']) for f in matched)
            commission_asset = matched[0]['commissionAsset']
            realized_pnl = sum(float(f['realizedPnl']) for f in matched)
            logger.info(f"Updating Trade #{trade_id} {symbol}: Comm={total_commission} {commission_asset}, PnL={realized_pnl}")
            db.execute_update(
                "UPDATE trades SET commission=%s, commission_asset=%s, realized_pnl=%s WHERE id=%s",
                (total_commission, commission_asset, realized_pnl, trade_id)
            )
        except Exception as e:
            logger.error(f"Error backfilling trade #{trade_id}: {e}")
async def _process_account(account_id, account_name):
    """Run both repair passes for one account, always closing its client."""
    logger.info(f"=== Processing Account: {account_name} (ID: {account_id}) ===")

    # Get credentials
    api_key, api_secret, testnet, _ = Account.get_credentials(account_id)
    if not api_key or not api_secret:
        logger.warning(f"Skipping account {account_id} due to missing keys.")
        return

    client = BinanceClient(api_key, api_secret, testnet=testnet)
    # The wrapper may not have created its underlying client yet; make one.
    if not client.client:
        client.client = AsyncClient(api_key, api_secret, testnet=testnet)

    try:
        await fix_time_inversion_trades(client, account_id)
        await backfill_commissions(client, account_id)
    finally:
        # Close connection properly, even when a pass raised.
        if client.client:
            await client.client.close_connection()


async def main():
    """Repair trade records for every active account that has an API key.

    Accounts are processed one after another. A failure while handling one
    account is logged with its traceback and the remaining accounts are
    still processed; a failure while listing accounts is logged as a script
    failure. Nothing is returned and no exception escapes.
    """
    try:
        # Get all active accounts
        accounts = Account.list_all()
        active_accounts = [
            a for a in accounts
            if a['status'] == 'active' and a['api_key_enc']
        ]
        if not active_accounts:
            logger.error("No active accounts with API keys found.")
            return

        for account in active_accounts:
            account_id = account['id']
            try:
                await _process_account(account_id, account['name'])
            except Exception as e:
                # Isolate the failure so other accounts are still repaired.
                logger.exception(f"Account {account_id} failed: {e}")

        logger.info("All accounts processed.")
    except Exception as e:
        logger.exception(f"Script failed: {e}")
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards; asyncio.get_event_loop() without a running
    # loop is deprecated and left the loop unclosed.
    asyncio.run(main())