fix(position_manager): 优化代码结构和日志记录

在持仓管理模块中,调整了代码缩进和结构,提升了可读性和一致性。同时,增强了日志记录,确保在保存交易记录时提供更清晰的信息。这一改动旨在提升系统的稳定性和可维护性,确保交易策略的有效性与安全性。
This commit is contained in:
薇薇安 2026-02-26 09:49:58 +08:00
parent beafeb2707
commit d80d4559c5

View File

@ -603,7 +603,7 @@ class PositionManager:
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
except Exception:
entry_order_id = None
break
break
else:
logger.info(f"{symbol} [智能入场] 限价超时,但偏离{drift_ratio*100:.2f}%>{max_drift_ratio*100:.2f}%,取消并放弃本次交易")
try:
@ -611,7 +611,7 @@ class PositionManager:
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
return None
# 震荡/不允许市价兜底:尝试追价(减小 offset -> 更靠近当前价),但不突破追价上限
try:
@ -626,7 +626,7 @@ class PositionManager:
if side == "BUY":
cap = initial_limit * (1 + max_drift_ratio)
desired = min(desired, cap)
else:
else:
cap = initial_limit * (1 - max_drift_ratio)
desired = max(desired, cap)
@ -851,30 +851,30 @@ class PositionManager:
logger.info(f"{symbol} 已完善 pending 记录 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
if trade_id is None:
# 无 pending 或未匹配到:走新建(兜底)
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id
# 如果 REST 已获取到 entry_order_id直接写入否则留空等待 WS 推送或后续同步补全
trade_id = Trade.create(
symbol=symbol,
side=side,
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason,
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id, # REST 已获取则直接写入
client_order_id=fallback_client_order_id,
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
entry_context=entry_context,
account_id=self.account_id,
)
)
if entry_order_id:
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
else:
logger.warning(f"{symbol} 交易记录已保存但 entry_order_id 为空 (ID: {trade_id}),等待 WS 推送或后续同步补全")
# 如果有 client_order_id尝试通过 REST 查询订单号补全
@ -1084,39 +1084,39 @@ class PositionManager:
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time()
exit_dt = get_beijing_time()
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = position_info.get('strategyType', 'trend_following')
db_update_retries = 3
for db_attempt in range(db_update_retries):
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
updated = True
break
except Exception as e:
err_msg = str(e).strip() or f"{type(e).__name__}"
if db_attempt < db_update_retries - 1:
wait_sec = 2
logger.warning(
f"{symbol} [平仓] 更新数据库失败 (第 {db_attempt + 1}/{db_update_retries} 次): {err_msg}"
f"{wait_sec}秒后重试"
db_update_retries = 3
for db_attempt in range(db_update_retries):
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
await asyncio.sleep(wait_sec)
else:
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {err_msg}")
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
updated = True
break
except Exception as e:
err_msg = str(e).strip() or f"{type(e).__name__}"
if db_attempt < db_update_retries - 1:
wait_sec = 2
logger.warning(
f"{symbol} [平仓] 更新数据库失败 (第 {db_attempt + 1}/{db_update_retries} 次): {err_msg}"
f"{wait_sec}秒后重试"
)
await asyncio.sleep(wait_sec)
else:
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {err_msg}")
# 清理本地记录
await self._stop_position_monitoring(symbol)
@ -1192,7 +1192,7 @@ class PositionManager:
del self.active_positions[symbol]
logger.info(f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} (原因: {reason})")
return True
else:
else:
# place_order 返回 None可能是 -2022ReduceOnly rejected等竞态场景
# 兜底再查一次实时持仓如果已经为0则当作“已平仓”处理避免刷屏与误判失败
try:
@ -1244,13 +1244,13 @@ class PositionManager:
except Exception:
amt0 = None
if amt0 is not None and abs(amt0) <= 0:
try:
await self._stop_position_monitoring(symbol)
try:
await self._stop_position_monitoring(symbol)
except Exception:
pass
try:
if symbol in self.active_positions:
del self.active_positions[symbol]
if symbol in self.active_positions:
del self.active_positions[symbol]
except Exception:
pass
logger.warning(f"{symbol} [平仓] 异常后检查币安持仓已为0已清理本地记录")
@ -2895,14 +2895,14 @@ class PositionManager:
f"{symbol} [状态同步] ❌ 获取当前价格时KeyError: {key_error}, "
f"ticker数据: {ticker if 'ticker' in locals() else 'N/A'}"
)
continue
continue
except Exception as ticker_error:
logger.warning(
f"{symbol} [状态同步] 获取当前价格失败: "
f"错误类型={type(ticker_error).__name__}, 错误消息={str(ticker_error)}"
f"将保留open状态等待下次同步"
)
continue
continue
# 计算盈亏确保所有值都是float类型避免Decimal类型问题
try:
@ -2928,12 +2928,12 @@ class PositionManager:
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
else:
# 降级方案:使用价格差计算
if trade.get('side') == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
if trade.get('side') == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
logger.debug(
f"{symbol} [状态同步] 盈亏计算: "
@ -3002,7 +3002,7 @@ class PositionManager:
# 计算持仓时间和亏损比例(用于特征判断)
entry_time = trade.get("entry_time")
duration_minutes = None
duration_minutes = None
if entry_time and exit_time_ts:
try:
duration_minutes = (exit_time_ts - int(entry_time)) / 60.0
@ -3140,8 +3140,8 @@ class PositionManager:
xt = int(exit_time_ts) if exit_time_ts is not None else int(get_beijing_time().timestamp())
if et is not None and xt >= et:
duration_minutes = int((xt - et) / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = 'trend_following' # 默认策略类型
@ -3461,29 +3461,29 @@ class PositionManager:
)
elif sync_create_manual:
# 为手动开仓的持仓创建数据库记录并启动监控(仅当显式开启且未走上面的「补建系统单」时)
for symbol in missing_in_db:
try:
# 获取币安持仓详情
binance_position = next(
(p for p in binance_positions if p['symbol'] == symbol),
None
)
if not binance_position:
continue
position_amt = binance_position['positionAmt']
entry_price = binance_position['entryPrice']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
for symbol in missing_in_db:
try:
# 获取币安持仓详情
binance_position = next(
(p for p in binance_positions if p['symbol'] == symbol),
None
)
if not binance_position:
continue
position_amt = binance_position['positionAmt']
entry_price = binance_position['entryPrice']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
notional = (float(entry_price) * float(quantity)) if entry_price and quantity else 0
if notional < 1.0:
logger.debug(f"{symbol} [状态同步] 跳过灰尘持仓 (名义 {notional:.4f} USDT < 1),不创建记录")
continue
logger.info(
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
logger.info(
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
# 尽量从币安成交取 entry_order_id 与真实开仓时间limit=100、空时重试一次
entry_order_id = None
entry_time_ts = None
@ -3510,105 +3510,100 @@ class PositionManager:
except Exception:
pass
# 创建数据库记录(显式传入 account_id、真实开仓时间
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=binance_position.get('leverage', 10),
entry_reason='manual_entry', # 标记为手动开仓
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=binance_position.get('leverage', 10),
entry_reason='manual_entry', # 标记为手动开仓
entry_order_id=entry_order_id,
notional_usdt=notional,
margin_usdt=(notional / float(binance_position.get('leverage', 10) or 10)) if float(binance_position.get('leverage', 10) or 0) > 0 else None,
account_id=self.account_id,
entry_time=entry_time_ts,
)
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
# 创建本地持仓记录(用于监控)
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# ---------- 手动开仓补建 SL/TP 顺序:先读交易所 → 决定 SL/TP → 再写 position_info ----------
# 本分支不调用 _ensure_exchange_sltp_orders仅写入 active_positions后续由监控在价格推送时可能调用。
# 若此处用 risk_manager 初始止损写 position_info而交易所已是保本后续同步会覆盖。故先读交易所已达保本则采用。
sl_from_ex, tp_from_ex = await self._get_sltp_from_exchange(symbol, side)
breakeven = self._breakeven_stop_price(entry_price, side, None)
use_exchange_sl = False
if sl_from_ex is not None:
if side == 'BUY' and sl_from_ex >= breakeven:
use_exchange_sl = True
elif side == 'SELL' and sl_from_ex <= breakeven:
use_exchange_sl = True
# 计算止损止盈(缺失时用 risk_manager 基于保证金)
leverage = binance_position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
# ⚠️ 关键修复:配置值格式转换(兼容百分比形式和比例形式)
)
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
# 创建本地持仓记录(用于监控)
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# ---------- 手动开仓补建 SL/TP 顺序:先读交易所 → 决定 SL/TP → 再写 position_info ----------
sl_from_ex, tp_from_ex = await self._get_sltp_from_exchange(symbol, side)
breakeven = self._breakeven_stop_price(entry_price, side, None)
use_exchange_sl = False
if sl_from_ex is not None:
if side == 'BUY' and sl_from_ex >= breakeven:
use_exchange_sl = True
elif side == 'SELL' and sl_from_ex <= breakeven:
use_exchange_sl = True
# 计算止损止盈(缺失时用 risk_manager 基于保证金)
leverage = binance_position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
if stop_loss_pct_margin is not None and stop_loss_pct_margin > 1:
stop_loss_pct_margin = stop_loss_pct_margin / 100.0
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
# ⚠️ 关键修复:配置值格式转换(兼容百分比形式和比例形式)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
if take_profit_pct_margin is not None and take_profit_pct_margin > 1:
take_profit_pct_margin = take_profit_pct_margin / 100.0
# 如果配置中没有设置止盈则使用止损的2倍作为默认
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
if use_exchange_sl:
stop_loss_price = sl_from_ex
initial_stop_loss = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
logger.info(f" {symbol} [补建-手动] 使用交易所已有止损(保本/移动sl={stop_loss_price},不覆盖为初始止损 {initial_stop_loss}")
else:
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
initial_stop_loss = stop_loss_price
if tp_from_ex is not None:
take_profit_price = tp_from_ex
else:
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0, # 手动开仓,无法计算涨跌幅
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
if use_exchange_sl:
stop_loss_price = sl_from_ex
initial_stop_loss = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
logger.info(f" {symbol} [补建-手动] 使用交易所已有止损(保本/移动sl={stop_loss_price},不覆盖为初始止损 {initial_stop_loss}")
else:
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
initial_stop_loss = stop_loss_price
if tp_from_ex is not None:
take_profit_price = tp_from_ex
else:
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0,
'orderId': entry_order_id,
'tradeId': trade_id,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': initial_stop_loss,
'leverage': leverage,
'entryReason': 'manual_entry',
'entryTime': entry_time_ts if entry_time_ts is not None else get_beijing_time(), # 真实开仓时间(来自币安成交/订单)
'atr': None,
'maxProfit': 0.0,
'tradeId': trade_id,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': initial_stop_loss,
'leverage': leverage,
'entryReason': 'manual_entry',
'entryTime': entry_time_ts if entry_time_ts is not None else get_beijing_time(),
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False,
'breakevenStopSet': use_exchange_sl
}
self.active_positions[symbol] = position_info
# 启动WebSocket监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
}
self.active_positions[symbol] = position_info
# 启动WebSocket监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
logger.info(f"[账号{self.account_id}] {symbol} [状态同步] ✓ 已启动实时监控")
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
# 6. 同步挂单信息 (STOP_MARKET / TAKE_PROFIT_MARKET)
if self.active_positions:
@ -3732,24 +3727,24 @@ class PositionManager:
stop_loss_price = None
take_profit_price = None
if stop_loss_price is None or take_profit_price is None:
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
if stop_loss_pct_margin is not None and stop_loss_pct_margin > 1:
stop_loss_pct_margin = stop_loss_pct_margin / 100.0
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
if take_profit_pct_margin is not None and take_profit_pct_margin > 1:
take_profit_pct_margin = take_profit_pct_margin / 100.0
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = (stop_loss_pct_margin or 0.05) * 2.0
if stop_loss_price is None:
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin or 0.05
)
)
if take_profit_price is None:
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
entry_reason = 'manual_entry_temp' if sync_create_manual else 'sync_recovered'
position_info = {
@ -3801,7 +3796,7 @@ class PositionManager:
except Exception:
mp = None
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=mp or current_price)
except Exception as e:
except Exception as e:
logger.warning(f"{symbol} 补挂币安止盈止损失败(不影响监控): {e}")
# 重启后立即按当前价做一次保本/移动止损检查并同步,不依赖首条 WS 推送
try:
@ -4144,38 +4139,38 @@ class PositionManager:
protect_amount = max(margin * trailing_protect, self._min_protect_amount_for_fees(margin, leverage))
if position_info['side'] == 'BUY':
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
new_stop_loss = max(new_stop_loss, self._breakeven_stop_price(entry_price, 'BUY'))
new_stop_loss = max(new_stop_loss, self._breakeven_stop_price(entry_price, 'BUY'))
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"[账号{self.account_id}] {symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"[账号{self.account_id}] {symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
try:
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=current_price_float)
logger.info(f"[账号{self.account_id}] {symbol} [实时监控] 已同步移动止损至交易所")
except Exception as sync_e:
logger.warning(
f"[账号{self.account_id}] {symbol} [实时监控] 同步移动止损至交易所失败: {type(sync_e).__name__}: {sync_e}",
exc_info=False,
)
try:
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=current_price_float)
logger.info(f"[账号{self.account_id}] {symbol} [实时监控] 已同步移动止损至交易所")
except Exception as sync_e:
logger.warning(
f"[账号{self.account_id}] {symbol} [实时监控] 同步移动止损至交易所失败: {type(sync_e).__name__}: {sync_e}",
exc_info=False,
)
else: # SELL
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
new_stop_loss = min(new_stop_loss, self._breakeven_stop_price(entry_price, 'SELL'))
if new_stop_loss > position_info['stopLoss'] and pnl_amount > 0:
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
new_stop_loss = min(new_stop_loss, self._breakeven_stop_price(entry_price, 'SELL'))
if new_stop_loss > position_info['stopLoss'] and pnl_amount > 0:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"[账号{self.account_id}] {symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"[账号{self.account_id}] {symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
try:
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=current_price_float)
logger.info(f"[账号{self.account_id}] {symbol} [实时监控] 已同步移动止损至交易所")
except Exception as sync_e:
logger.warning(
f"[账号{self.account_id}] {symbol} [实时监控] 同步移动止损至交易所失败: {type(sync_e).__name__}: {sync_e}",
exc_info=False,
)
try:
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=current_price_float)
logger.info(f"[账号{self.account_id}] {symbol} [实时监控] 已同步移动止损至交易所")
except Exception as sync_e:
logger.warning(
f"[账号{self.account_id}] {symbol} [实时监控] 同步移动止损至交易所失败: {type(sync_e).__name__}: {sync_e}",
exc_info=False,
)
# 检查止损(基于保证金收益比)
# ⚠️ 重要:止损检查应该在时间锁之前,止损必须立即执行