This commit is contained in:
薇薇安 2026-02-14 18:06:10 +08:00
parent 19371a8e60
commit 16cf4f2157
7 changed files with 127 additions and 155 deletions

View File

@ -648,98 +648,64 @@ async def fetch_live_positions_pnl(account_id: int):
pass
@router.get("/positions")
async def get_realtime_positions(account_id: int = Depends(get_account_id)):
"""获取实时持仓数据"""
async def fetch_realtime_positions(account_id: int):
"""
获取指定账号的币安实时持仓列表与仪表板/GET /positions 一致
每条持仓会尝试关联本账号下的 DB 记录开仓时间止损止盈entry_order_id
失败时返回 []不抛异常便于仪表板回退到 DB 列表
"""
client = None
try:
logger.info(f"get_realtime_positions: 请求的 account_id={account_id}")
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
logger.info(f"get_realtime_positions: 获取到的 account_id={account_id}, status={status}, api_key exists={bool(api_key)}")
logger.info(f"get_realtime_positions: API Key 前4位={api_key[:4] if api_key and len(api_key) >= 4 else 'N/A'}, 后4位=...{api_key[-4:] if api_key and len(api_key) >= 4 else 'N/A'}")
logger.info(f"尝试获取实时持仓数据 (testnet={use_testnet}, account_id={account_id})")
if not api_key or not api_secret:
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(
status_code=400,
detail=error_msg
)
# 导入BinanceClient
logger.debug(f"fetch_realtime_positions(account_id={account_id}): 无 API 密钥,返回空列表")
return []
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
# 确保传递了正确的 api_key 和 api_secret避免 BinanceClient 从 config 读取
client = BinanceClient(
api_key=api_key, # 明确传递,避免从 config 读取
api_secret=api_secret, # 明确传递,避免从 config 读取
testnet=use_testnet
)
logger.info(f"BinanceClient 创建成功 (account_id={account_id})")
logger.info("连接币安API获取持仓...")
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
await client.connect()
positions = await client.get_open_positions()
logger.info(f"获取到 {len(positions)} 个持仓")
# 并发获取所有持仓的挂单信息包括普通挂单和Algo挂单
open_orders_map = {}
try:
position_symbols = [p.get('symbol') for p in positions if float(p.get('positionAmt', 0)) != 0]
if position_symbols:
# 定义获取函数同时获取普通挂单和Algo挂单
async def fetch_both_orders(symbol):
try:
# 并发调用两个接口
t1 = client.get_open_orders(symbol)
t2 = client.futures_get_open_algo_orders(symbol, algo_type="CONDITIONAL")
res = await asyncio.gather(t1, t2, return_exceptions=True)
orders = []
# 1. 普通订单
if isinstance(res[0], list):
orders.extend(res[0])
else:
logger.warning(f"获取 {symbol} 普通挂单失败: {res[0]}")
# 2. Algo订单 (需要标准化为普通订单格式)
if isinstance(res[1], list):
for algo in res[1]:
# 标准化 Algo Order 结构
orders.append({
'orderId': algo.get('algoId'),
'type': algo.get('orderType'), # Algo订单使用 orderType
'type': algo.get('orderType'),
'side': algo.get('side'),
'stopPrice': algo.get('triggerPrice'), # Algo订单使用 triggerPrice
'price': 0, # 通常为0
'stopPrice': algo.get('triggerPrice'),
'price': 0,
'origType': algo.get('algoType'),
'reduceOnly': algo.get('reduceOnly'),
'status': 'NEW', # 列表中都是生效中的
'status': 'NEW',
'_is_algo': True
})
else:
logger.warning(f"获取 {symbol} Algo挂单失败: {res[1]}")
return orders
except Exception as e:
logger.error(f"获取 {symbol} 订单组合失败: {e}")
logger.debug(f"获取 {symbol} 订单失败: {e}")
return []
logger.info(f"正在获取挂单信息(包含Algo): {position_symbols}")
tasks = [fetch_both_orders(sym) for sym in position_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
for sym, orders in zip(position_symbols, results):
if isinstance(orders, list):
# 过滤出止盈止损单
conditional_orders = []
for o in orders:
o_type = o.get('type')
@ -756,50 +722,23 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
})
if conditional_orders:
open_orders_map[sym] = conditional_orders
else:
logger.warning(f"获取 {sym} 挂单失败: {orders}")
except Exception as e:
logger.error(f"批量获取挂单失败: {e}")
logger.debug(f"批量获取挂单失败: {e}")
# 格式化持仓数据
formatted_positions = []
for pos in positions:
position_amt = float(pos.get('positionAmt', 0))
if position_amt == 0:
continue
entry_price = float(pos.get('entryPrice', 0))
mark_price = float(pos.get('markPrice', 0))
mark_price = float(pos.get('markPrice', 0)) or entry_price
unrealized_pnl = float(pos.get('unRealizedProfit', 0))
if mark_price == 0:
mark_price = entry_price
# === 名义/保证金口径说明(与币安展示更接近)===
# - 币安的名义价值/仓位价值通常随标记价(markPrice)变动
# - DB 中的 notional_usdt/margin_usdt 通常是“开仓时”写入,用于复盘/统计
# - 若发生部分止盈/减仓:币安 positionAmt 会变小,但 DB 里的 notional/margin 可能仍是“原始开仓量”
# → 会出现:数量=6.8,但名义/保证金像是 13.6 的两倍(与你反馈一致)
#
# 因此:实时持仓展示统一使用“当前数量×标记价”的实时名义/保证金,
# 并额外返回 original_* 字段保留 DB 开仓口径,避免混用导致误解。
# 兼容旧字段entry_value_usdt 仍保留(但它是按入场价计算的名义)
entry_value_usdt = abs(position_amt) * entry_price
leverage = float(pos.get('leverage', 1))
if leverage <= 0:
leverage = 1.0
# 当前持仓名义价值USDT按标记价
leverage = max(1.0, float(pos.get('leverage', 1)))
notional_usdt_live = abs(position_amt) * mark_price
# 当前持仓保证金USDT名义/杠杆
margin_usdt_live = notional_usdt_live / leverage
pnl_percent = 0
if margin_usdt_live > 0:
pnl_percent = (unrealized_pnl / margin_usdt_live) * 100
# 尝试从数据库获取开仓时间、止损止盈价格(以及交易规模字段)
pnl_percent = (unrealized_pnl / margin_usdt_live * 100) if margin_usdt_live > 0 else 0
entry_time = None
stop_loss_price = None
take_profit_price = None
@ -810,11 +749,11 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
db_notional_usdt = None
entry_order_id = None
entry_order_type = None
id = None
try:
from database.models import Trade
db_trades = Trade.get_by_symbol(pos.get('symbol'), status='open')
db_trades = Trade.get_by_symbol(pos.get('symbol'), status='open', account_id=account_id)
if db_trades:
# 找到匹配的交易记录(优先通过 entry_price 近似匹配;否则取最新一条 open 记录兜底)
matched = None
for db_trade in db_trades:
try:
@ -825,7 +764,6 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
continue
if matched is None:
matched = db_trades[0]
entry_time = matched.get('entry_time')
stop_loss_price = matched.get('stop_loss_price')
take_profit_price = matched.get('take_profit_price')
@ -835,10 +773,10 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
db_margin_usdt = matched.get('margin_usdt')
db_notional_usdt = matched.get('notional_usdt')
entry_order_id = matched.get('entry_order_id')
id = matched.get('id')
except Exception as e:
logger.debug(f"获取数据库信息失败: {e}")
# 如果数据库中有 entry_order_id尝试从币安查询订单类型LIMIT/MARKET
if entry_order_id:
try:
info = await client.client.futures_get_order(symbol=pos.get('symbol'), orderId=int(entry_order_id))
@ -846,48 +784,37 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
entry_order_type = info.get("type")
except Exception:
entry_order_type = None
# 如果没有从数据库获取到止损止盈价格,前端会自己计算
# 注意:数据库可能没有存储止损止盈价格,这是正常的
formatted_positions.append({
"id": id,
"symbol": pos.get('symbol'),
"side": "BUY" if position_amt > 0 else "SELL",
"quantity": abs(position_amt),
"entry_price": entry_price,
# 兼容旧字段entry_value_usdt 仍保留(前端已有使用)
"entry_value_usdt": entry_value_usdt,
# 实时展示字段:与币安更一致(按当前数量×标记价)
"notional_usdt": notional_usdt_live,
"margin_usdt": margin_usdt_live,
# 额外返回“开仓记录口径”(用于排查部分止盈/减仓导致的不一致)
"original_notional_usdt": db_notional_usdt,
"original_margin_usdt": db_margin_usdt,
"mark_price": mark_price,
"pnl": unrealized_pnl,
"pnl_percent": pnl_percent, # 基于保证金的盈亏百分比
"pnl_percent": pnl_percent,
"leverage": int(pos.get('leverage', 1)),
"entry_time": entry_time, # 开仓时间
"stop_loss_price": stop_loss_price, # 止损价格(如果可用)
"take_profit_price": take_profit_price, # 止盈价格(如果可用)
"entry_time": entry_time,
"stop_loss_price": stop_loss_price,
"take_profit_price": take_profit_price,
"take_profit_1": take_profit_1,
"take_profit_2": take_profit_2,
"atr": atr_value,
"entry_order_id": entry_order_id,
"entry_order_type": entry_order_type, # LIMIT / MARKET用于仪表板展示“限价/市价”)
"open_orders": open_orders_map.get(pos.get('symbol'), []), # 实时挂单信息
"entry_order_type": entry_order_type,
"open_orders": open_orders_map.get(pos.get('symbol'), []),
})
logger.info(f"格式化后 {len(formatted_positions)} 个有效持仓")
return formatted_positions
except HTTPException:
raise
except Exception as e:
error_msg = f"获取持仓数据失败: {str(e)}"
logger.error(error_msg, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
logger.warning(f"fetch_realtime_positions(account_id={account_id}) 失败: {e}", exc_info=True)
return []
finally:
# 确保断开连接(避免连接泄漏)
try:
if client is not None:
await client.disconnect()
@ -895,6 +822,18 @@ async def get_realtime_positions(account_id: int = Depends(get_account_id)):
pass
@router.get("/positions")
async def get_realtime_positions(account_id: int = Depends(get_account_id)):
"""获取实时持仓数据(币安实际持仓,并关联本账号 DB 记录)"""
api_key, api_secret, _, _ = Account.get_credentials(account_id)
if not api_key or not api_secret:
raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id}")
result = await fetch_realtime_positions(account_id)
if result is None:
raise HTTPException(status_code=500, detail="获取持仓数据失败")
return result
@router.post("/positions/{symbol}/close")
async def close_position(symbol: str, account_id: int = Depends(get_account_id)):
"""手动平仓指定交易对的持仓"""

View File

@ -151,48 +151,47 @@ async def get_dashboard_data(account_id: int = Depends(get_account_id)):
"open_positions": 0
}
# 获取持仓数据(强制使用数据库记录)
# 获取持仓数据:优先「币安实时持仓」(含本系统下的挂单),失败时回退到数据库列表
open_trades = []
positions_error = None
try:
db_trades = Trade.get_all(status='open', account_id=account_id)
# 格式化数据库记录,添加 entry_value_usdt 字段
open_trades = []
for trade in db_trades:
entry_value_usdt = float(trade.get('quantity', 0)) * float(trade.get('entry_price', 0))
leverage = float(trade.get('leverage', 1))
pnl = float(trade.get('pnl', 0))
# 数据库中的pnl_percent是价格涨跌幅需要转换为收益率
# 收益率 = 盈亏 / 保证金
margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt
pnl_percent = (pnl / margin * 100) if margin > 0 else 0
formatted_trade = {
**trade,
'entry_value_usdt': entry_value_usdt,
'mark_price': trade.get('entry_price', 0), # 默认入场价,下面用实时数据覆盖
'pnl': pnl,
'pnl_percent': pnl_percent
}
open_trades.append(formatted_trade)
# 合并实时持仓盈亏mark_price / pnl / pnl_percent仪表板可显示浮盈浮亏
try:
from api.routes.account import fetch_live_positions_pnl
live_list = await fetch_live_positions_pnl(account_id)
by_symbol = {p["symbol"]: p for p in live_list}
for t in open_trades:
sym = t.get("symbol")
if sym and sym in by_symbol:
lp = by_symbol[sym]
t["mark_price"] = lp.get("mark_price", t.get("entry_price"))
t["pnl"] = lp.get("pnl", 0)
t["pnl_percent"] = lp.get("pnl_percent", 0)
if by_symbol:
logger.info(f"已合并 {len(by_symbol)} 个实时持仓盈亏到仪表板")
except Exception as merge_err:
logger.debug(f"合并实时持仓盈亏失败(仪表板仍显示持仓,盈亏为 0: {merge_err}")
logger.info(f"使用数据库记录作为持仓数据: {len(open_trades)} 个持仓")
from api.routes.account import fetch_realtime_positions
open_trades = await fetch_realtime_positions(account_id)
except Exception as fetch_err:
logger.warning(f"获取币安实时持仓失败,回退到数据库列表: {fetch_err}")
open_trades = []
if not open_trades:
db_trades = Trade.get_all(status='open', account_id=account_id)
for trade in db_trades:
entry_value_usdt = float(trade.get('quantity', 0)) * float(trade.get('entry_price', 0))
leverage = float(trade.get('leverage', 1))
pnl = float(trade.get('pnl', 0))
margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt
pnl_percent = (pnl / margin * 100) if margin > 0 else 0
open_trades.append({
**trade,
'entry_value_usdt': entry_value_usdt,
'mark_price': trade.get('entry_price', 0),
'pnl': pnl,
'pnl_percent': pnl_percent
})
try:
from api.routes.account import fetch_live_positions_pnl
live_list = await fetch_live_positions_pnl(account_id)
by_symbol = {p["symbol"]: p for p in live_list}
for t in open_trades:
sym = t.get("symbol")
if sym and sym in by_symbol:
lp = by_symbol[sym]
t["mark_price"] = lp.get("mark_price", t.get("entry_price"))
t["pnl"] = lp.get("pnl", 0)
t["pnl_percent"] = lp.get("pnl_percent", 0)
except Exception as merge_err:
logger.debug(f"合并实时持仓盈亏失败: {merge_err}")
logger.info(f"使用数据库记录作为持仓数据: {len(open_trades)} 个持仓")
else:
logger.info(f"使用币安实时持仓作为列表: {len(open_trades)} 个持仓")
except Exception as db_error:
logger.error(f"从数据库获取持仓记录失败: {db_error}")
@ -286,9 +285,12 @@ async def get_dashboard_data(account_id: int = Depends(get_account_id)):
except Exception as e:
logger.debug(f"获取交易配置失败: {e}")
# 本系统持仓数 = 数据库 status=open 条数,与下方「当前持仓」列表一致;币安持仓数 = 接口/快照中的 open_positions可能与币安页面一致
open_trades_count = len(open_trades)
result = {
"account": account_data,
"open_trades": open_trades,
"open_trades_count": open_trades_count, # 本系统持仓数,与列表条数一致
"recent_scans": recent_scans,
"recent_signals": recent_signals,
"position_stats": position_stats,
@ -296,7 +298,7 @@ async def get_dashboard_data(account_id: int = Depends(get_account_id)):
"_debug": { # 添加调试信息
"account_id": account_id,
"account_data_total_balance": account_data.get('total_balance', 'N/A') if account_data else 'N/A',
"open_trades_count": len(open_trades),
"open_trades_count": open_trades_count,
}
}

View File

@ -467,6 +467,12 @@
margin: 0;
}
.positions-subtitle {
margin: 0 0 10px 0;
font-size: 12px;
color: #6c757d;
}
.sltp-all-btn {
padding: 0.5rem 0.9rem;
background: #1f7aec;

View File

@ -408,7 +408,12 @@ const StatsDashboard = () => {
</div>
<div className="info-item">
<span className="label">持仓数量:</span>
<span className="value">{account.open_positions}</span>
<span className="value">
本系统 <strong>{openTrades.length}</strong>
{account.open_positions != null && Number(account.open_positions) !== openTrades.length && (
<> · 币安 <strong>{account.open_positions}</strong></>
)}
</span>
</div>
<div className="info-item">
<span className="label">持仓模式:</span>
@ -524,6 +529,11 @@ const StatsDashboard = () => {
</button>
</div>
</div>
{account?.open_positions != null && openTrades.length !== Number(account.open_positions) && (
<p className="positions-subtitle" title="仅展示本系统开仓并记录的持仓;币安上其它持仓(如手动开仓)不会出现在此列表">
仅本系统记录 <strong>{openTrades.length}</strong> 币安实际 <strong>{account.open_positions}</strong>
</p>
)}
<div className="entry-type-summary">
<span className="entry-type-badge limit">限价入场: {entryTypeCounts.limit}</span>
<span className="entry-type-badge market">市价入场: {entryTypeCounts.market}</span>

View File

@ -776,16 +776,18 @@ class BinanceClient:
self.client.futures_position_information(recvWindow=20000),
timeout=read_timeout
)
# 只保留真实持仓:非零且名义价值 >= 1 USDT避免灰尘持仓被当成“有仓”导致同步时批量创建假 manual_entry
min_notional = 1.0
# 只保留非零持仓,且名义价值 >= 配置阈值,避免灰尘持仓被当成“有仓”;与仪表板不一致时可调低 POSITION_MIN_NOTIONAL_USDT 或设为 0
min_notional = getattr(config, 'POSITION_MIN_NOTIONAL_USDT', 1.0)
open_positions = []
skipped_low = []
for pos in positions:
amt = float(pos['positionAmt'])
if amt == 0:
continue
entry_price = float(pos['entryPrice'])
notional = abs(amt) * entry_price
if notional < min_notional:
if min_notional > 0 and notional < min_notional:
skipped_low.append((pos['symbol'], round(notional, 4)))
continue
open_positions.append({
'symbol': pos['symbol'],
@ -795,6 +797,11 @@ class BinanceClient:
'unRealizedProfit': float(pos['unRealizedProfit']),
'leverage': int(pos['leverage'])
})
if skipped_low and logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"获取持仓: 过滤掉 {len(skipped_low)} 个名义价值 < {min_notional} USDT 的仓位 {skipped_low}"
"与仪表板不一致时可设 POSITION_MIN_NOTIONAL_USDT=0 或更小"
)
return open_positions
except (asyncio.TimeoutError, BinanceAPIException) as e:
last_error = e

View File

@ -391,6 +391,8 @@ CONNECTION_TIMEOUT = int(os.getenv('CONNECTION_TIMEOUT', '30')) # 连接超时
CONNECTION_RETRIES = int(os.getenv('CONNECTION_RETRIES', '3')) # 连接重试次数
# 仅用于 get_open_positions / get_recent_trades 等只读接口的单次等待时间,不影响下单/止损止盈的快速失败
READ_ONLY_REQUEST_TIMEOUT = int(os.getenv('READ_ONLY_REQUEST_TIMEOUT', '60'))
# 获取持仓时过滤掉名义价值低于此值的仓位USDT与币安仪表板不一致时可调低或设为 0
POSITION_MIN_NOTIONAL_USDT = float(os.getenv('POSITION_MIN_NOTIONAL_USDT', '1.0'))
# Redis 缓存配置(优先从数据库,回退到环境变量和默认值)
REDIS_URL = _get_config_value('REDIS_URL', os.getenv('REDIS_URL', 'redis://localhost:6379'))

View File

@ -2951,22 +2951,28 @@ class PositionManager:
logger.warning("客户端未初始化,无法启动实时监控")
return
# 获取当前所有持仓
# 获取当前所有持仓(与 sync 一致:仅本系统关心的持仓会进 active_positions
positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in positions}
active_symbols = set(self.active_positions.keys())
sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False)
logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else ''})")
# 为所有币安持仓启动监控即使不在active_positions中可能是手动开仓的
# 仅为本系统已有记录的持仓启动监控;若未开启「同步创建手动开仓记录」,则不为「仅币安有仓」创建临时记录或监控
only_binance = binance_symbols - active_symbols
if only_binance and not sync_create_manual:
logger.info(f"跳过 {len(only_binance)} 个仅币安持仓的监控SYNC_CREATE_MANUAL_ENTRY_RECORD=False: {', '.join(only_binance)}")
for position in positions:
symbol = position['symbol']
if symbol not in self._monitor_tasks:
# 如果不在active_positions中先创建记录
# 若不在 active_positions 且未开启「同步创建手动开仓记录」,不创建临时记录、不为其启动监控
if symbol not in self.active_positions:
if not sync_create_manual:
continue
logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...")
# 这里会通过sync_positions_with_binance来处理但先启动监控
try:
entry_price = position.get('entryPrice', 0)
position_amt = position['positionAmt']