"""Maintenance script that reconciles closed trades in the DB with Binance fills.

For every active account with stored API keys it:

1. repairs closed trades whose ``exit_time`` is earlier than ``entry_time``
   (see ``fix_time_inversion_trades``), and
2. backfills commission / realized PnL for recently closed trades that have
   no commission recorded (see ``backfill_commissions``).
"""
import asyncio
import logging
import os
import sys
import time
from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Dict, Any, Optional

# Add project root and backend to path (must happen before the project imports)
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(_PROJECT_ROOT)
sys.path.append(os.path.join(_PROJECT_ROOT, 'backend'))

from backend.database.connection import db
from backend.database.models import Trade, Account
from backend.config_manager import ConfigManager
from trading_system.binance_client import BinanceClient
from binance import AsyncClient

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def get_fills_with_retry(client, symbol, start_time_ms, retries=3, timeout=None):
    """Fetch futures account fills for ``symbol``, retrying on failure.

    Args:
        client: Wrapper object exposing the Binance async client as
            ``client.client``.
        symbol: Symbol passed to ``futures_account_trades``.
        start_time_ms: Value passed as ``startTime`` (the callers in this
            file pass entry time in milliseconds).
        retries: Maximum number of attempts.
        timeout: Optional per-attempt timeout in seconds; ``None`` waits
            without a limit.

    Returns:
        Whatever ``futures_account_trades`` returns (at most 100 fills are
        requested), or ``None`` if ``retries`` is 0.

    Raises:
        Exception: The error from the final attempt, with its original
            traceback, once all attempts have failed.
    """
    for attempt in range(1, retries + 1):
        try:
            return await asyncio.wait_for(
                client.client.futures_account_trades(
                    symbol=symbol,
                    startTime=start_time_ms,
                    limit=100
                ),
                timeout=timeout
            )
        except Exception as e:
            if attempt == retries:
                # Bare raise keeps the original traceback intact.
                raise
            logger.warning(f"Error fetching fills for {symbol} (Attempt {attempt}/{retries}): {e}. Retrying...")
            await asyncio.sleep(2)


def _summarize_closing_fills(closing_fills):
    """Aggregate closing fills into exit time, PnL, commission and average price.

    Sorts ``closing_fills`` in place, newest first, and returns a dict with
    keys ``exit_time_s``, ``pnl``, ``commission``, ``commission_asset`` and
    ``avg_exit_price``. ``closing_fills`` must be non-empty.
    """
    closing_fills.sort(key=lambda x: x['time'], reverse=True)
    latest_fill = closing_fills[0]

    total_qty = sum(float(f['qty']) for f in closing_fills)
    total_value = sum(float(f['qty']) * float(f['price']) for f in closing_fills)

    return {
        # Fill times are divided by 1000 to match the second-based DB column.
        'exit_time_s': int(latest_fill['time'] / 1000),
        'pnl': sum(float(f['realizedPnl']) for f in closing_fills),
        'commission': sum(float(f['commission']) for f in closing_fills),
        # The asset of the newest fill is used for the whole sum.
        'commission_asset': latest_fill['commissionAsset'],
        # Quantity-weighted average price; 0 when no quantity was filled.
        'avg_exit_price': total_value / total_qty if total_qty > 0 else 0,
    }


def _pnl_percent(side, entry_price, exit_price):
    """Return the percentage price move in the trade's favour (0 if entry_price <= 0)."""
    if entry_price <= 0:
        return 0
    if side == 'BUY':
        return ((exit_price - entry_price) / entry_price) * 100
    return ((entry_price - exit_price) / entry_price) * 100


async def fix_time_inversion_trades(client, account_id):
    """Fix closed trades of ``account_id`` where exit_time < entry_time.

    For each such trade the fills after the entry time are fetched:

    * If fills on the opposite side with non-zero realized PnL exist, the
      trade's exit data is rewritten from them via ``Trade.update_exit``.
    * Otherwise, if Binance still reports a non-zero position for the
      symbol, the trade is reverted to ``open``.
    * Otherwise only a warning is logged.

    Errors while processing one trade are logged and do not stop the loop.
    """
    logger.info("Scanning for time-inverted trades...")

    # Query invalid trades
    sql = """
        SELECT * FROM trades
        WHERE status = 'closed'
        AND exit_time < entry_time
        AND account_id = %s
    """
    trades = db.execute_query(sql, (account_id,))

    if not trades:
        logger.info("No time-inverted trades found.")
        return

    for trade in trades:
        symbol = trade['symbol']
        trade_id = trade['id']
        entry_time = trade['entry_time']
        exit_time = trade['exit_time']
        side = trade['side']  # OPEN side (BUY/SELL)

        logger.info(f"Found invalid trade #{trade_id} {symbol}: Entry={entry_time}, Exit={exit_time} (Diff: {exit_time - entry_time}s)")

        # 1. Check actual position on Binance
        # Convert entry_time to ms for Binance API
        start_time_ms = int(entry_time * 1000)

        try:
            # Fetch user trades (fills) after entry time with retry
            fills = await get_fills_with_retry(client, symbol, start_time_ms)

            # Filter for closing trades
            # If entry side was BUY, we look for SELL. If SELL, look for BUY.
            close_side = 'SELL' if side == 'BUY' else 'BUY'
            closing_fills = [
                f for f in fills
                if f['side'] == close_side and float(f['realizedPnl']) != 0
            ]

            if not closing_fills:
                # No closing trades found. Position might still be open or we missed it?
                # Check current position risk
                positions = await client.get_open_positions()
                current_pos = next((p for p in positions if p['symbol'] == symbol), None)

                if current_pos and float(current_pos['positionAmt']) != 0:
                    logger.info(f" -> Position for {symbol} is actually OPEN on Binance. Reverting status.")
                    # Revert to open
                    db.execute_update(
                        "UPDATE trades SET status='open', exit_time=NULL, exit_price=NULL, pnl=NULL, pnl_percent=NULL, exit_reason=NULL WHERE id=%s",
                        (trade_id,)
                    )
                else:
                    logger.warning(" -> Position closed on Binance but no closing fills found after entry time. Maybe closed exactly at entry or data issue?")
                continue

            # Found closing fills. Calculate metrics.
            # We assume all closing fills after entry correspond to this trade.
            summary = _summarize_closing_fills(closing_fills)
            pnl_percent = _pnl_percent(
                side, float(trade['entry_price']), summary['avg_exit_price']
            )

            logger.info(f" -> Found closing fills. Updating Trade #{trade_id}:")
            logger.info(f" New Exit Time: {summary['exit_time_s']} (was {exit_time})")
            logger.info(f" Realized PnL: {summary['pnl']}")
            logger.info(f" Commission: {summary['commission']} {summary['commission_asset']}")

            # Update DB
            Trade.update_exit(
                trade_id=trade_id,
                exit_price=summary['avg_exit_price'],
                exit_reason='sync_fix',
                pnl=summary['pnl'],  # Use realized PnL as PnL
                pnl_percent=pnl_percent,
                exit_time_ts=summary['exit_time_s'],
                realized_pnl=summary['pnl'],
                commission=summary['commission'],
                commission_asset=summary['commission_asset']
            )

        except Exception as e:
            # logger.exception keeps the traceback for post-mortem debugging.
            logger.exception(f"Error processing trade #{trade_id}: {e}")


async def backfill_commissions(client, account_id):
    """Backfill missing commission data for trades closed in the last 24 hours.

    Selects closed trades of ``account_id`` whose commission is NULL or 0,
    fetches fills after each trade's entry time, and writes commission,
    commission asset and realized PnL from the fills that match the trade's
    exit order id (or, when no id is stored, fall within 5 seconds of the
    stored exit time). Trades whose fills cannot be fetched or matched are
    skipped; errors for one trade are logged and do not stop the loop.
    """
    logger.info("Backfilling commissions for recent trades...")

    # Get recent closed trades (last 24h). Epoch arithmetic is used so the
    # window is exactly 24 hours even across a local DST change.
    start_time = int(time.time() - 24 * 3600)

    sql = """
        SELECT * FROM trades
        WHERE status = 'closed'
        AND account_id = %s
        AND exit_time > %s
        AND (commission IS NULL OR commission = 0)
    """
    trades = db.execute_query(sql, (account_id, start_time))

    if not trades:
        logger.info("No recent trades with missing commissions found.")
        return

    logger.info(f"Found {len(trades)} trades to update commissions.")

    max_retries = 3

    for trade in trades:
        symbol = trade['symbol']
        trade_id = trade['id']
        entry_time = trade['entry_time']

        try:
            # Fetch user trades (fills) after entry time
            start_time_ms = int(entry_time * 1000)

            # Shared retry helper, with a per-attempt timeout so a hung
            # request cannot stall the whole run.
            try:
                fills = await get_fills_with_retry(
                    client, symbol, start_time_ms,
                    retries=max_retries, timeout=10.0
                )
            except Exception as e:
                logger.error(f" ❌ Failed to fetch fills for {symbol} after {max_retries} attempts: {e}")
                continue

            if not fills:
                continue

            # Only the closing order's fills are counted (not the opening
            # ones), matching what `update_exit` stores.
            # Preferred: match the exit order ID if available.
            exit_order_id = trade.get('exit_order_id')
            related_fills = []

            if exit_order_id:
                related_fills = [f for f in fills if str(f['orderId']) == str(exit_order_id)]
            else:
                # Fallback: fills around exit time
                exit_time = trade['exit_time']
                if exit_time:
                    exit_time_ms = exit_time * 1000
                    related_fills = [f for f in fills if abs(f['time'] - exit_time_ms) < 5000]  # 5s window

            if not related_fills:
                continue

            total_commission = sum(float(f['commission']) for f in related_fills)
            commission_asset = related_fills[0]['commissionAsset']
            realized_pnl = sum(float(f['realizedPnl']) for f in related_fills)

            logger.info(f"Updating Trade #{trade_id} {symbol}: Comm={total_commission} {commission_asset}, PnL={realized_pnl}")

            db.execute_update(
                "UPDATE trades SET commission=%s, commission_asset=%s, realized_pnl=%s WHERE id=%s",
                (total_commission, commission_asset, realized_pnl, trade_id)
            )

        except Exception as e:
            # logger.exception keeps the traceback for post-mortem debugging.
            logger.exception(f"Error backfilling trade #{trade_id}: {e}")


async def main():
    """Run both repair passes for every active account that has API keys.

    Any exception escaping the account loop is logged (with traceback) and
    ends the run; it is not re-raised.
    """
    try:
        # Get all active accounts
        accounts = Account.list_all()
        active_accounts = [a for a in accounts if a['status'] == 'active' and a['api_key_enc']]

        if not active_accounts:
            logger.error("No active accounts with API keys found.")
            return

        for account in active_accounts:
            account_id = account['id']
            account_name = account['name']
            logger.info(f"=== Processing Account: {account_name} (ID: {account_id}) ===")

            # Get credentials
            api_key, api_secret, testnet, _ = Account.get_credentials(account_id)

            if not api_key or not api_secret:
                logger.warning(f"Skipping account {account_id} due to missing keys.")
                continue

            # Init client
            client = BinanceClient(api_key, api_secret, testnet=testnet)

            # Initialize connection
            if not client.client:
                client.client = AsyncClient(api_key, api_secret, testnet=testnet)

            try:
                await fix_time_inversion_trades(client, account_id)
                await backfill_commissions(client, account_id)
            finally:
                # Close connection properly
                if client.client:
                    await client.client.close_connection()

        logger.info("All accounts processed.")

    except Exception as e:
        logger.exception(f"Script failed: {e}")


if __name__ == "__main__":
    # asyncio.run creates and closes the event loop; calling
    # get_event_loop() with no running loop is deprecated.
    asyncio.run(main())