feat(trades, database, binance_client, position_manager, risk_manager): 优化交易记录查询与内存管理

在 `trades.py` 中为获取所有有记录的交易对添加了限制条数的逻辑,避免全表加载。`models.py` 中调整了查询逻辑,未传递 limit 时使用默认上限以防内存暴增。`binance_client.py` 中为交易对信息缓存添加了最大大小限制,确保内存使用合理。`position_manager.py` 和 `risk_manager.py` 中的交易记录查询也进行了条数限制,提升了系统的稳定性与性能。此更新有助于优化内存管理与查询效率。
This commit is contained in:
薇薇安 2026-02-21 00:53:32 +08:00
parent 6f9e55aaee
commit 174943722a
5 changed files with 25 additions and 10 deletions

View File

@ -434,9 +434,9 @@ async def sync_trades_from_binance(
symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")}) symbol_list = list({t.get("symbol") for t in (trades_in_range or []) if t.get("symbol")})
logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对") logger.info(f"从 DB 查询到 {len(trades_in_range or [])} 条记录,涉及 {len(symbol_list)} 个交易对")
# 如果时间范围内没有记录,再尝试「所有有记录的 symbol」 # 如果时间范围内没有记录,再尝试「所有有记录的 symbol」(限制条数避免全表加载)
if not symbol_list: if not symbol_list:
all_trades = Trade.get_all(account_id=aid) all_trades = Trade.get_all(account_id=aid, limit=5000)
symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")}) symbol_list = list({t.get("symbol") for t in (all_trades or []) if t.get("symbol")})
logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)}") logger.info(f"获取到所有有记录的 symbol: {len(symbol_list)}")

View File

@ -1134,10 +1134,13 @@ class Trade:
query += " ORDER BY " + time_col + " DESC, id DESC" query += " ORDER BY " + time_col + " DESC, id DESC"
else: else:
query += " ORDER BY COALESCE(exit_time, entry_time) DESC, id DESC" query += " ORDER BY COALESCE(exit_time, entry_time) DESC, id DESC"
if limit is not None and limit > 0: # 未传 limit 时使用默认上限,防止全表加载导致内存暴增
query += " LIMIT %s" _limit = limit
params.append(int(limit)) if _limit is None or _limit <= 0:
logger.debug(f"查询交易记录: time_filter={time_filter}, limit={limit}, reconciled_only={reconciled_only}, include_sync={include_sync}") _limit = 20000
query += " LIMIT %s"
params.append(int(_limit))
logger.debug(f"查询交易记录: time_filter={time_filter}, limit={_limit}, reconciled_only={reconciled_only}, include_sync={include_sync}")
result = db.execute_query(query, params) result = db.execute_query(query, params)
return result return result

View File

@ -205,7 +205,8 @@ class BinanceClient:
self.client: Optional[AsyncClient] = None self.client: Optional[AsyncClient] = None
self.socket_manager: Optional[BinanceSocketManager] = None self.socket_manager: Optional[BinanceSocketManager] = None
self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息 self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息(有上限,防止内存无限增长)
self._symbol_info_cache_max_size = 200
self._last_request_time = {} # 记录每个API端点的最后请求时间 self._last_request_time = {} # 记录每个API端点的最后请求时间
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制 self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数 self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
@ -1405,6 +1406,10 @@ class BinanceClient:
await self.redis_cache.set(cache_key, info, ttl=3600) await self.redis_cache.set(cache_key, info, ttl=3600)
# Redis 写入成功则不占进程内存 # Redis 写入成功则不占进程内存
else: else:
if len(self._symbol_info_cache) >= self._symbol_info_cache_max_size and symbol not in self._symbol_info_cache:
_oldest = next(iter(self._symbol_info_cache), None)
if _oldest is not None:
self._symbol_info_cache.pop(_oldest, None)
self._symbol_info_cache[symbol] = info self._symbol_info_cache[symbol] = info
logger.debug(f"从 DB 缓存解析 {symbol} 交易对信息") logger.debug(f"从 DB 缓存解析 {symbol} 交易对信息")
return info return info
@ -1489,6 +1494,10 @@ class BinanceClient:
wrote = await self.redis_cache.set(cache_key, info, ttl=3600) wrote = await self.redis_cache.set(cache_key, info, ttl=3600)
if not wrote: if not wrote:
if len(self._symbol_info_cache) >= self._symbol_info_cache_max_size and symbol not in self._symbol_info_cache:
_oldest = next(iter(self._symbol_info_cache), None)
if _oldest is not None:
self._symbol_info_cache.pop(_oldest, None)
self._symbol_info_cache[symbol] = info self._symbol_info_cache[symbol] = info
logger.debug(f"获取 {symbol} 精度信息: {info}") logger.debug(f"获取 {symbol} 精度信息: {info}")
return info return info

View File

@ -2412,8 +2412,8 @@ class PositionManager:
binance_symbols = {p['symbol'] for p in binance_positions} binance_symbols = {p['symbol'] for p in binance_positions}
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})") logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
# 2. 获取数据库中状态为open的交易记录仅当前账号 # 2. 获取数据库中状态为open的交易记录仅当前账号,限制条数防内存暴增
db_open_trades = Trade.get_all(status='open', account_id=self.account_id) db_open_trades = Trade.get_all(status='open', account_id=self.account_id, limit=500)
db_open_symbols = {t['symbol'] for t in db_open_trades} db_open_symbols = {t['symbol'] for t in db_open_trades}
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else ''})") logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else ''})")

View File

@ -891,10 +891,13 @@ class RiskManager:
from database.models import Trade from database.models import Trade
# 查询最近N+1次已平仓的交易多查一次确保能判断是否连续 # 查询最近N+1次已平仓的交易多查一次确保能判断是否连续
# 限制 limit 避免单 symbol 拉取全表导致内存暴增2 CPU 4G 多账号场景)
need_count = max_consecutive + 10
recent_trades = Trade.get_all( recent_trades = Trade.get_all(
symbol=symbol, symbol=symbol,
status='closed', status='closed',
account_id=int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1) account_id=int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1),
limit=min(need_count, 200),
) )
# 按平仓时间倒序排序(最新的在前) # 按平仓时间倒序排序(最新的在前)