auto_trade_sys/backend/api/routes/account.py
薇薇安 cbba86001a feat(spot_order): 添加现货下单API与前端支持
在后端API中新增现货下单功能,支持市价单和限价单的创建,并提供相应的错误处理机制。前端组件更新以支持现货下单的快速操作,允许用户选择现货市场并设置默认下单金额。此改动提升了用户体验,增强了交易系统的功能性与灵活性。
2026-02-25 08:53:39 +08:00

2071 lines
99 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
账户实时数据API - 从币安API获取实时账户和订单数据
"""
from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi import Query
import asyncio
import sys
from pathlib import Path
import logging
import time
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
sys.path.insert(0, str(project_root / 'trading_system'))
from database.models import TradingConfig, Account
from api.auth_deps import get_account_id
logger = logging.getLogger(__name__)
router = APIRouter()
async def _ensure_exchange_sltp_for_symbol(symbol: str, account_id: int = 1):
    """
    Place exchange-side stop-loss / take-profit protection orders for ``symbol``.

    This backs the "manual re-arm" endpoints and does not rely on the
    trading_system monitor task.  The flow is:

    1. read the live position for the symbol (direction is needed),
    2. read the account position mode (one-way / hedge),
    3. pick the net position (one-way) or the first non-zero leg (hedge / fallback),
    4. take SL/TP prices from the newest open trade in the DB, deriving any
       missing one from STOP_LOSS_PERCENT / TAKE_PROFIT_PERCENT,
    5. cancel existing conditional protection orders to avoid duplicates,
    6. place STOP_MARKET + TAKE_PROFIT_MARKET algo orders with closePosition=True,
    7. re-query open algo orders so the caller can confirm they are resting.

    Args:
        symbol: Futures symbol to protect.
        account_id: Account whose credentials and DB trades are used
            (falsy values are treated as 1).

    Returns:
        dict with the symbol, position side, position mode, the final trigger
        prices, the two raw order responses and the currently open protection
        orders.

    Raises:
        HTTPException: 400 when credentials are missing for an active account,
            when there is no live position, no open DB trade, or no usable
            SL/TP price.  Other exchange errors propagate unchanged.
    """
    # Read this account's private API credentials from the accounts table.
    account_id_int = int(account_id or 1)
    api_key, api_secret, use_testnet, status = Account.get_credentials(account_id_int)
    if (not api_key or not api_secret) and status == "active":
        logger.error(f"[account_id={account_id_int}] API密钥未配置")
        raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id_int}")

    # Reuse the trading system's BinanceClient (precision / position-mode handling).
    try:
        from binance_client import BinanceClient
    except ImportError:
        trading_system_path = project_root / 'trading_system'
        sys.path.insert(0, str(trading_system_path))
        from binance_client import BinanceClient

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    await client.connect()
    try:
        # 1) Current position (we need its direction).
        raw_positions = await client.client.futures_position_information(symbol=symbol)
        nonzero = []
        for p in raw_positions or []:
            try:
                amt = float(p.get("positionAmt", 0) or 0)
                if amt != 0:
                    nonzero.append((amt, p))
            except Exception:
                # Skip entries whose amount cannot be parsed.
                continue
        if not nonzero:
            raise HTTPException(status_code=400, detail=f"{symbol} 当前无持仓,无法补挂止盈止损")

        # 2) Position mode; None means "could not be read".
        dual_side = None
        try:
            mode_res = await client.client.futures_get_position_mode()
            if isinstance(mode_res, dict):
                dual_side = bool(mode_res.get("dualSidePosition"))
        except Exception:
            dual_side = None

        # 3) Net position (one-way) or the first non-zero leg (hedge / fallback).
        amt, p0 = nonzero[0]
        net_amt = sum(a for a, _ in nonzero)
        if dual_side is False:
            amt = net_amt
        side = "BUY" if amt > 0 else "SELL"
        mark_price = None
        try:
            mark_price = float(p0.get("markPrice", 0) or 0) or None
        except Exception:
            mark_price = None

        # 4) SL/TP prices from the DB open trade (newest first).  The lookup
        #    uses the same normalized account id as the credentials above.
        from database.models import Trade
        open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id_int) or []
        if not open_trades:
            raise HTTPException(status_code=400, detail=f"{symbol} 数据库无 open 交易记录,无法确定止损止盈价")
        try:
            # Prefer the newest open trade when several exist for the symbol.
            open_trades.sort(key=lambda x: int(x.get("id", 0) or 0), reverse=True)
        except Exception:
            # Best effort: keep the DB order if ids are not sortable.
            pass
        trade = open_trades[0]
        sl = trade.get("stop_loss_price")
        tp = trade.get("take_profit_2") or trade.get("take_profit_price") or trade.get("take_profit_1")
        try:
            sl = float(sl) if sl is not None else None
        except Exception:
            sl = None
        try:
            tp = float(tp) if tp is not None else None
        except Exception:
            tp = None

        # Legacy-DB compatibility: when a price is missing, derive it from
        # entry_price / quantity / leverage and the configured percentages.
        # Only the MISSING price is derived; a valid price stored in the DB is
        # kept as-is instead of being overwritten by the config fallback.
        if not sl or not tp:
            try:
                entry_price = float(trade.get("entry_price") or 0)
                qty = float(trade.get("quantity") or 0)
                lv = float(trade.get("leverage") or 0) or float(p0.get("leverage") or 0) or 10.0
                if entry_price <= 0 or qty <= 0 or lv <= 0:
                    raise ValueError("entry_price/quantity/leverage invalid")

                def _ratio(v, default):
                    try:
                        x = float(v)
                        # Tolerate values stored as a percentage (5 meaning 5%).
                        if x > 1:
                            x = x / 100.0
                        if x < 0:
                            x = default
                        return x
                    except Exception:
                        return default

                sl_pct = _ratio(TradingConfig.get_value("STOP_LOSS_PERCENT", 0.05), 0.05)
                tp_pct = _ratio(TradingConfig.get_value("TAKE_PROFIT_PERCENT", 0.15), 0.15)
                notional = entry_price * qty
                margin = notional / lv
                sl_amount = margin * sl_pct
                tp_amount = margin * tp_pct
                if side == "BUY":
                    fallback_sl = entry_price - (sl_amount / qty)
                    fallback_tp = entry_price + (tp_amount / qty)
                else:
                    fallback_sl = entry_price + (sl_amount / qty)
                    fallback_tp = entry_price - (tp_amount / qty)
                if not sl:
                    sl = fallback_sl
                if not tp:
                    tp = fallback_tp
                if not sl or not tp or sl <= 0 or tp <= 0:
                    raise ValueError("computed sl/tp invalid")
            except Exception as exc:
                raise HTTPException(
                    status_code=400,
                    detail=f"{symbol} 数据库缺少止损/止盈价,且无法回退计算,无法补挂",
                ) from exc

        # 5) Cancel old conditional protection orders to avoid duplicates.
        try:
            await client.cancel_open_algo_orders_by_order_types(
                symbol, {"STOP_MARKET", "TAKE_PROFIT_MARKET", "TRAILING_STOP_MARKET"}
            )
        except Exception as cancel_err:
            # Still best effort, but leave a trace: a failed cancel can leave
            # stale orders next to the ones placed below.
            logger.warning(f"{symbol} 取消旧保护单失败(将继续补挂): {cancel_err}")

        # 6) Place the protection orders (closePosition=True).
        symbol_info = await client.get_symbol_info(symbol)
        # Use the trading system's price formatter when it exists, otherwise
        # fall back to plain rounding.
        fmt_price = getattr(BinanceClient, "_format_price_str_with_rounding", None)

        def _fmt(price: float, rounding_mode: str) -> str:
            if fmt_price:
                return fmt_price(price, symbol_info, rounding_mode)  # type: ignore[misc]
            return str(round(float(price), int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8))

        # Nudge trigger prices off the mark price so they do not fire immediately.
        cp = float(mark_price) if mark_price else None
        tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
        pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
        min_step = tick if tick and tick > 0 else (10 ** (-pp) if pp and pp > 0 else 1e-8)
        sl_price = float(sl)
        tp_price = float(tp)
        if cp and cp > 0:
            # Stop-loss must sit on the losing side of the mark price.
            if side == "BUY" and sl_price >= cp:
                sl_price = max(0.0, cp - min_step)
            if side == "SELL" and sl_price <= cp:
                sl_price = cp + min_step
            # Take-profit must sit on the winning side of the mark price.
            if side == "BUY" and tp_price <= cp:
                tp_price = cp + min_step
            if side == "SELL" and tp_price >= cp:
                tp_price = max(0.0, cp - min_step)

        # Rounding: SL rounds UP for long / DOWN for short; TP the opposite.
        sl_round = "UP" if side == "BUY" else "DOWN"
        tp_round = "DOWN" if side == "BUY" else "UP"
        close_side = "SELL" if side == "BUY" else "BUY"

        # Algo conditional orders take triggerPrice (not stopPrice).
        sl_params = {
            "algoType": "CONDITIONAL",
            "symbol": symbol,
            "side": close_side,
            "type": "STOP_MARKET",
            "triggerPrice": _fmt(sl_price, sl_round),
            "closePosition": True,
            "workingType": "MARK_PRICE",
        }
        tp_params = {
            "algoType": "CONDITIONAL",
            "symbol": symbol,
            "side": close_side,
            "type": "TAKE_PROFIT_MARKET",
            "triggerPrice": _fmt(tp_price, tp_round),
            "closePosition": True,
            "workingType": "MARK_PRICE",
        }
        if dual_side is True:
            position_side = "LONG" if side == "BUY" else "SHORT"
            sl_params["positionSide"] = position_side
            tp_params["positionSide"] = position_side
        sl_order = await client.futures_create_algo_order(sl_params)
        tp_order = await client.futures_create_algo_order(tp_params)

        # 7) Re-query open algo orders to confirm they are resting (also used
        #    by the frontend for display / troubleshooting).
        open_orders = []
        try:
            oo = await client.futures_get_open_algo_orders(symbol=symbol, algo_type="CONDITIONAL")
            if isinstance(oo, list):
                for o in oo:
                    try:
                        if not isinstance(o, dict):
                            continue
                        otype2 = str(o.get("orderType") or o.get("type") or "").upper()
                        if otype2 in {"STOP_MARKET", "TAKE_PROFIT_MARKET", "TRAILING_STOP_MARKET", "STOP", "TAKE_PROFIT"}:
                            open_orders.append(
                                {
                                    "algoId": o.get("algoId"),
                                    "orderType": otype2,
                                    "side": o.get("side"),
                                    "triggerPrice": o.get("triggerPrice"),
                                    "workingType": o.get("workingType"),
                                    "positionSide": o.get("positionSide"),
                                    "closePosition": o.get("closePosition"),
                                    "algoStatus": o.get("algoStatus"),
                                    "updateTime": o.get("updateTime"),
                                }
                            )
                    except Exception:
                        continue
        except Exception:
            # Confirmation is informational only; the orders were already placed.
            open_orders = []

        return {
            "symbol": symbol,
            "position_side": side,
            "dual_side_position": dual_side,
            "stop_loss_price": sl_price,
            "take_profit_price": tp_price,
            "orders": {
                "stop_market": sl_order,
                "take_profit_market": tp_order,
            },
            "open_protection_orders": open_orders,
            "ui_hint": "在币安【U本位合约】里这类 STOP/TP 通常显示在【条件单/止盈止损/计划委托】而不一定在普通【当前委托(限价)】列表。",
        }
    finally:
        await client.disconnect()
@router.post("/positions/{symbol}/sltp/ensure")
async def ensure_position_sltp(symbol: str, account_id: int = Depends(get_account_id)):
    """
    Manually re-arm the SL/TP protection orders for one symbol on the exchange.

    HTTPExceptions from the helper pass through untouched; any other failure
    is logged with its traceback and reported as a 500.
    """
    try:
        outcome = await _ensure_exchange_sltp_for_symbol(symbol, account_id=int(account_id))
    except HTTPException:
        raise
    except Exception as exc:
        reason = str(exc) or repr(exc) or "unknown error"
        logger.error(f"{symbol} 补挂止盈止损失败: {reason}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"补挂止盈止损失败: {reason}")
    return outcome
@router.post("/positions/sltp/ensure-all")
async def ensure_all_positions_sltp(
    limit: int = Query(50, ge=1, le=200, description="最多处理多少个持仓symbol"),
    account_id: int = Depends(get_account_id),
):
    """
    Re-arm SL/TP protection orders for every symbol that currently has a position.

    The symbol list is read from the exchange first, then each symbol is
    handled independently so one failure does not stop the rest.

    Args:
        limit: Maximum number of distinct symbols to process.
        account_id: Account resolved by the auth dependency.

    Returns:
        dict with ``total`` (symbols attempted), ``ok`` / ``failed`` counts,
        per-symbol ``results`` and per-symbol ``errors``.

    Raises:
        HTTPException: 400 when an active account has no API credentials.
    """
    # Fetch the list of symbols that currently hold a position.
    api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
    if (not api_key or not api_secret) and status == "active":
        logger.error(f"[account_id={account_id}] API密钥未配置")
        raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id}")

    try:
        from binance_client import BinanceClient
    except ImportError:
        trading_system_path = project_root / 'trading_system'
        sys.path.insert(0, str(trading_system_path))
        from binance_client import BinanceClient

    client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
    await client.connect()
    try:
        positions = await client.get_open_positions()
        # In hedge mode one symbol can appear twice (LONG and SHORT legs).
        # De-duplicate, keeping first-seen order, so a symbol's protection
        # orders are not cancelled and re-placed twice in the same request.
        symbols = list(dict.fromkeys(
            p["symbol"] for p in (positions or []) if float(p.get("positionAmt", 0) or 0) != 0
        ))
    finally:
        await client.disconnect()

    symbols = symbols[: int(limit or 50)]
    results = []
    errors = []
    for sym in symbols:
        try:
            res = await _ensure_exchange_sltp_for_symbol(sym, account_id=account_id)
            results.append(
                {
                    "symbol": sym,
                    "ok": True,
                    "orders": res.get("orders"),
                    "open_protection_orders": res.get("open_protection_orders"),
                }
            )
        except HTTPException as he:
            errors.append(
                {
                    "symbol": sym,
                    "ok": False,
                    "status_code": getattr(he, "status_code", None),
                    "detail": getattr(he, "detail", None),
                }
            )
        except Exception as e:
            msg = str(e) or repr(e) or "unknown error"
            errors.append({"symbol": sym, "ok": False, "error": msg})

    return {
        "total": len(symbols),
        # Every entry in ``results`` is a success by construction.
        "ok": len(results),
        "failed": len(errors),
        "results": results,
        "errors": errors,
    }
async def get_realtime_account_data(account_id: int = 1):
    """
    Fetch a live account snapshot (balances, position totals, position mode)
    from the Binance API.

    Args:
        account_id: Account whose API credentials are read from the accounts table.

    Returns:
        dict with ``total_balance`` / ``margin_balance`` / ``wallet_balance`` /
        ``available_balance`` / ``unrealized_profit``, the notional and margin
        totals over open positions, ``total_pnl``, ``open_positions``, and the
        position mode (``position_mode`` / ``dual_side_position``; both None
        when the mode could not be read).

    Raises:
        HTTPException: 400 when credentials are missing; 500 for any other
            failure (the original error text is included in ``detail``).
    """
    logger.info("=" * 60)
    logger.info("开始获取实时账户数据")
    logger.info("=" * 60)
    # Tracked outside the try so the connection is always released in `finally`,
    # including when a balance / position call fails after connect.
    client = None
    try:
        # Step 1: read this account's private API credentials.
        logger.info(f"步骤1: 从accounts读取API配置... (account_id={account_id})")
        logger.info(f" - 请求的 account_id: {account_id}")
        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        logger.info(f" - 获取到的 account_id 状态: {status}")
        logger.info(f" - API密钥存在: {bool(api_key)}")
        if api_key:
            logger.info(f" - API密钥长度: {len(api_key)} 字符")
        else:
            logger.warning(" - API密钥为空!")
        logger.info(f" - API密钥Secret存在: {bool(api_secret)}")
        if api_secret:
            logger.info(f" - API密钥Secret长度: {len(api_secret)} 字符")
        else:
            logger.warning(" - API密钥Secret为空!")
        logger.info(f" - 使用测试网: {use_testnet}")
        if not api_key or not api_secret:
            error_msg = f"API密钥未配置account_id={account_id}请在配置界面设置该账号的BINANCE_API_KEY和BINANCE_API_SECRET"
            logger.error(f"[account_id={account_id}] API密钥未配置")
            logger.error(f"{error_msg}")
            raise HTTPException(
                status_code=400,
                detail=error_msg
            )

        # Step 2: import the trading system's BinanceClient.
        logger.info("步骤2: 导入BinanceClient...")
        try:
            from binance_client import BinanceClient
            logger.info(" ✓ 从当前路径导入BinanceClient成功")
        except ImportError as e:
            logger.warning(f" - 从当前路径导入失败: {e}")
            # Direct import failed; retry with trading_system on sys.path.
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            logger.info(f" - 添加路径到sys.path: {trading_system_path}")
            try:
                from binance_client import BinanceClient
                logger.info(" ✓ 从trading_system路径导入BinanceClient成功")
            except ImportError as e2:
                logger.error(f" ✗ 导入BinanceClient失败: {e2}")
                raise

        # Step 3: create the client.  Only a masked API key is logged; no part
        # of the API secret is ever written to the logs.
        logger.info("步骤3: 创建BinanceClient实例...")
        logger.info(f" - 使用的 account_id: {account_id}")
        logger.info(f" - API Key 前4位: {api_key[:4] if api_key and len(api_key) >= 4 else 'N/A'}...")
        logger.info(f" - API Key 后4位: ...{api_key[-4:] if api_key and len(api_key) >= 4 else 'N/A'}")
        logger.info(f" - testnet: {use_testnet}")
        client = BinanceClient(
            api_key=api_key,  # passed explicitly so the client does not read them from config
            api_secret=api_secret,
            testnet=use_testnet
        )
        logger.info(f" ✓ 客户端创建成功 (testnet={use_testnet}, account_id={account_id})")

        # Step 4: connect.
        logger.info("步骤4: 连接币安API...")
        try:
            await client.connect()
            logger.info(" ✓ 币安API连接成功")
        except Exception as e:
            logger.error(f" ✗ 币安API连接失败: {e}", exc_info=True)
            raise

        # Position mode (one-way / hedge), surfaced to the frontend; stays
        # None when it cannot be read.
        dual_side_position = None
        position_mode = None
        try:
            mode_res = await client.client.futures_get_position_mode()
            if isinstance(mode_res, dict):
                dual_side_position = bool(mode_res.get("dualSidePosition"))
                position_mode = "hedge" if dual_side_position else "one_way"
        except Exception as e:
            logger.warning(f"读取持仓模式失败(将显示为未知): {e}")

        # Step 5: account balance.
        logger.info("步骤5: 获取账户余额...")
        try:
            balance = await client.get_account_balance()
            logger.info(" ✓ 账户余额获取成功")
            logger.info(f" - 返回数据类型: {type(balance)}")
            logger.info(f" - 返回数据内容: {balance}")
            if balance:
                logger.info(f" - 总余额: {balance.get('total', 'N/A')} USDT")
                logger.info(f" - 可用余额: {balance.get('available', 'N/A')} USDT")
                logger.info(f" - 保证金: {balance.get('margin', 'N/A')} USDT")
                if balance.get('total', 0) == 0:
                    logger.warning(" ⚠ 账户余额为0可能是API权限问题或账户确实无余额")
            else:
                logger.warning(" ⚠ 返回的余额数据为空")
        except Exception as e:
            logger.error(f" ✗ 获取账户余额失败: {e}", exc_info=True)
            raise

        # Step 6: open positions.
        logger.info("步骤6: 获取持仓信息...")
        try:
            positions = await client.get_open_positions()
            logger.info(" ✓ 持仓信息获取成功")
            logger.info(f" - 返回数据类型: {type(positions)}")
            logger.info(f" - 持仓数量: {len(positions)}")
            if positions:
                logger.info(" - 持仓详情:")
                for i, pos in enumerate(positions[:5], 1):  # only the first five
                    logger.info(f" {i}. {pos.get('symbol', 'N/A')}: "
                                f"数量={pos.get('positionAmt', 0)}, "
                                f"入场价={pos.get('entryPrice', 0)}, "
                                f"盈亏={pos.get('unRealizedProfit', 0)}")
                if len(positions) > 5:
                    logger.info(f" ... 还有 {len(positions) - 5} 个持仓")
            else:
                logger.info(" - 当前无持仓")
        except Exception as e:
            logger.error(f" ✗ 获取持仓信息失败: {e}", exc_info=True)
            raise

        # Step 7: aggregate position statistics.
        logger.info("步骤7: 计算仓位统计...")
        # total_position_value: notional value at mark price (historical meaning).
        total_position_value = 0
        # total_margin_value: margin in use, closer to the risk-config semantics.
        total_margin_value = 0
        total_pnl = 0
        open_positions_count = 0
        for pos in positions:
            position_amt = float(pos.get('positionAmt', 0))
            if position_amt == 0:
                continue
            entry_price = float(pos.get('entryPrice', 0))
            mark_price = float(pos.get('markPrice', 0))
            unrealized_pnl = float(pos.get('unRealizedProfit', 0))
            if mark_price == 0:
                # No mark price available: fall back to the entry price.
                mark_price = entry_price
            position_value = abs(position_amt * mark_price)
            total_position_value += position_value
            # Rough margin estimate: notional / leverage (aligned with
            # MAX_TOTAL_POSITION_PERCENT rather than the exchange UI).
            try:
                lv = float(pos.get('leverage', 0) or 0)
                if lv <= 0:
                    lv = 1.0
            except Exception:
                lv = 1.0
            total_margin_value += (position_value / lv)
            total_pnl += unrealized_pnl
            open_positions_count += 1
            logger.debug(f" - {pos.get('symbol')}: 价值={position_value:.2f}, 盈亏={unrealized_pnl:.2f}")
        logger.info(" ✓ 仓位统计计算完成")
        logger.info(f" - 总名义仓位: {total_position_value:.2f} USDT")
        logger.info(f" - 总保证金占用(估算): {total_margin_value:.2f} USDT")
        logger.info(f" - 总盈亏: {total_pnl:.2f} USDT")
        logger.info(f" - 持仓数量: {open_positions_count}")

        # Build the response.  Balance fields:
        #   walletBalance    - wallet balance, excludes unrealized PnL
        #   marginBalance    - wallet balance + unrealized PnL (total equity)
        #   availableBalance - balance available for opening positions
        # marginBalance is reported as total_balance because it is what the
        # user sees as the "total balance".  The short keys ('total',
        # 'available', 'margin') are the fallbacks when the long ones are absent.
        wallet_balance = (
            balance.get('walletBalance') if balance and 'walletBalance' in balance
            else (balance.get('total', 0) if balance else 0)
        )
        available_balance = (
            balance.get('availableBalance') if balance and 'availableBalance' in balance
            else (balance.get('available', 0) if balance else 0)
        )
        margin_balance = (
            balance.get('marginBalance') if balance and 'marginBalance' in balance
            else (balance.get('margin', 0) if balance else 0)
        )
        unrealized_profit = balance.get('unrealizedProfit', 0) if balance else 0
        # Backward compatibility: no marginBalance -> fall back to 'total'.
        if margin_balance == 0 and balance and 'total' in balance:
            margin_balance = balance.get('total', 0)
        logger.info(f"构建返回结果:")
        logger.info(f" - wallet_balance (钱包余额,不包括未实现盈亏): {wallet_balance}")
        logger.info(f" - margin_balance (保证金余额,总权益,包括未实现盈亏): {margin_balance}")
        logger.info(f" - available_balance (可用余额): {available_balance}")
        logger.info(f" - unrealized_profit (未实现盈亏): {unrealized_profit}")
        result = {
            "total_balance": margin_balance,  # total equity, includes unrealized PnL
            "available_balance": available_balance,
            "margin_balance": margin_balance,
            "wallet_balance": wallet_balance,  # excludes unrealized PnL
            "unrealized_profit": unrealized_profit,
            # Notional exposure summed at mark price.
            "total_position_value": total_position_value,
            # Margin in use (notional / leverage, summed).
            "total_margin_value": total_margin_value,
            "total_pnl": total_pnl,
            "open_positions": open_positions_count,
            # Account position mode.
            "position_mode": position_mode,
            "dual_side_position": dual_side_position,
        }
        logger.info("=" * 60)
        logger.info(f"账户数据获取成功! (account_id={account_id})")
        logger.info(f"最终结果 - total_balance={result.get('total_balance', 'N/A')}, available_balance={result.get('available_balance', 'N/A')}, open_positions={result.get('open_positions', 'N/A')}")
        logger.info("=" * 60)
        return result
    except HTTPException as e:
        logger.error("=" * 60)
        logger.error(f"HTTP异常: {e.status_code} - {e.detail}")
        logger.error("=" * 60)
        raise
    except Exception as e:
        error_msg = f"获取账户数据失败: {str(e)}"
        logger.error("=" * 60)
        logger.error(f"异常类型: {type(e).__name__}")
        logger.error(f"错误信息: {error_msg}")
        logger.error("=" * 60, exc_info=True)
        raise HTTPException(status_code=500, detail=error_msg)
    finally:
        # Step 8: always release the exchange connection, on success and on
        # failure alike.  A disconnect error is logged, never raised.
        if client is not None:
            logger.info("步骤8: 断开币安API连接...")
            try:
                await client.disconnect()
                logger.info(" ✓ 连接已断开")
            except Exception as e:
                logger.warning(f" ⚠ 断开连接时出错: {e}")
@router.get("/realtime")
async def get_realtime_account(account_id: int = Depends(get_account_id)):
    """Return the live account snapshot for the authenticated account."""
    snapshot = await get_realtime_account_data(account_id=account_id)
    return snapshot
async def fetch_live_positions_pnl(account_id: int):
    """
    Return live PnL rows (symbol / mark_price / pnl / pnl_percent) for the
    account's open positions, for merging into the dashboard.

    Never raises: missing credentials or any failure yields an empty list.
    """

    def _pnl_row(raw):
        # One output row per non-flat position; None for a zero-size entry.
        size = float(raw.get('positionAmt', 0))
        if size == 0:
            return None
        entry = float(raw.get('entryPrice', 0))
        mark = float(raw.get('markPrice', 0)) or entry  # no mark price -> use entry
        upnl = float(raw.get('unRealizedProfit', 0))
        lev = max(1, float(raw.get('leverage', 1)))
        used_margin = (abs(size) * mark) / lev
        if used_margin > 0:
            pct = upnl / used_margin * 100
        else:
            pct = 0
        return {
            "symbol": raw.get("symbol"),
            "mark_price": mark,
            "pnl": upnl,
            "pnl_percent": pct,
        }

    client = None
    try:
        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if not (api_key and api_secret):
            return []

        try:
            from binance_client import BinanceClient
        except ImportError:
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient

        client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
        await client.connect()
        rows = []
        for raw in await client.get_open_positions():
            row = _pnl_row(raw)
            if row is not None:
                rows.append(row)
        return rows
    except Exception as e:
        logger.debug(f"fetch_live_positions_pnl(account_id={account_id}) 失败: {e}")
        return []
    finally:
        # Best-effort cleanup; a disconnect error must not surface to the caller.
        if client is not None:
            try:
                await client.disconnect()
            except Exception:
                pass
async def fetch_realtime_positions(account_id: int):
    """
    Return the account's live Binance positions (same data as GET /positions).

    Each position is enriched with the matching open trade from the DB (entry
    time, SL/TP prices, entry_order_id, ...) and with the protective orders
    currently resting on the exchange.  When the DB has no SL/TP, the prices
    are back-filled from those resting orders.

    Never raises: missing credentials or any failure yields ``[]`` so the
    dashboard can fall back to the DB list.
    """

    def _to_float(value, default=0.0):
        # Tolerant float conversion: None / unparsable values become `default`
        # instead of raising and aborting the whole order-map build.
        try:
            return float(value) if value is not None else default
        except (TypeError, ValueError):
            return default

    client = None
    try:
        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if not api_key or not api_secret:
            logger.debug(f"fetch_realtime_positions(account_id={account_id}): 无 API 密钥,返回空列表")
            return []

        try:
            from binance_client import BinanceClient
        except ImportError:
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient

        client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
        await client.connect()
        positions = await client.get_open_positions()

        # symbol -> list of protective (stop / take-profit) orders on the exchange.
        open_orders_map = {}
        try:
            # De-duplicate: in hedge mode a symbol can have two legs, and its
            # orders only need to be fetched once.
            position_symbols = list(dict.fromkeys(
                p.get('symbol') for p in positions if float(p.get('positionAmt', 0)) != 0
            ))
            if position_symbols:
                async def fetch_both_orders(symbol):
                    # Regular open orders plus conditional algo orders, with
                    # the algo orders mapped onto the regular-order shape.
                    try:
                        res = await asyncio.gather(
                            client.get_open_orders(symbol),
                            client.futures_get_open_algo_orders(symbol, algo_type="CONDITIONAL"),
                            return_exceptions=True,
                        )
                        orders = []
                        if isinstance(res[0], list):
                            orders.extend(res[0])
                        if isinstance(res[1], list):
                            for algo in res[1]:
                                orders.append({
                                    'orderId': algo.get('algoId'),
                                    'type': algo.get('orderType'),
                                    'side': algo.get('side'),
                                    'stopPrice': algo.get('triggerPrice'),
                                    'price': 0,
                                    'origType': algo.get('algoType'),
                                    'reduceOnly': algo.get('reduceOnly'),
                                    'closePosition': algo.get('closePosition'),
                                    'status': 'NEW',
                                    '_is_algo': True
                                })
                        return orders
                    except Exception as e:
                        logger.debug(f"获取 {symbol} 订单失败: {e}")
                        return []

                tasks = [fetch_both_orders(sym) for sym in position_symbols]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for sym, orders in zip(position_symbols, results):
                    if not isinstance(orders, list):
                        continue
                    conditional_orders = []
                    for o in orders:
                        if not isinstance(o, dict):
                            continue
                        o_type = o.get('type')
                        if o_type in ['STOP_MARKET', 'TAKE_PROFIT_MARKET', 'STOP', 'TAKE_PROFIT']:
                            conditional_orders.append({
                                'orderId': o.get('orderId'),
                                'type': o_type,
                                'side': o.get('side'),
                                'stopPrice': _to_float(o.get('stopPrice')),
                                'price': _to_float(o.get('price')),
                                'origType': o.get('origType'),
                                'reduceOnly': o.get('reduceOnly'),
                                'closePosition': o.get('closePosition'),
                                'status': o.get('status')
                            })
                    if conditional_orders:
                        open_orders_map[sym] = conditional_orders
        except Exception as e:
            logger.debug(f"批量获取挂单失败: {e}")

        formatted_positions = []
        for pos in positions:
            position_amt = float(pos.get('positionAmt', 0))
            if position_amt == 0:
                continue
            symbol = pos.get('symbol')
            entry_price = float(pos.get('entryPrice', 0))
            mark_price = float(pos.get('markPrice', 0)) or entry_price
            unrealized_pnl = float(pos.get('unRealizedProfit', 0))
            entry_value_usdt = abs(position_amt) * entry_price
            leverage = max(1.0, float(pos.get('leverage', 1)))
            notional_usdt_live = abs(position_amt) * mark_price
            margin_usdt_live = notional_usdt_live / leverage
            pnl_percent = (unrealized_pnl / margin_usdt_live * 100) if margin_usdt_live > 0 else 0

            # DB-sourced fields; all stay None when no open trade is found.
            entry_time = None
            created_at = None
            stop_loss_price = None
            take_profit_price = None
            take_profit_1 = None
            take_profit_2 = None
            atr_value = None
            db_margin_usdt = None
            db_notional_usdt = None
            entry_order_id = None
            entry_order_type = None
            trade_id = None
            try:
                from database.models import Trade
                db_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
                if db_trades:
                    # Prefer the trade whose entry price matches the live
                    # position; otherwise fall back to the first open trade.
                    matched = None
                    for db_trade in db_trades:
                        try:
                            if abs(float(db_trade.get('entry_price', 0)) - entry_price) < 0.01:
                                matched = db_trade
                                break
                        except Exception:
                            continue
                    if matched is None:
                        matched = db_trades[0]
                    entry_time = matched.get('entry_time')
                    # Creation time: tolerate both created_at and create_at columns.
                    created_at = matched.get('created_at') if matched.get('created_at') is not None else matched.get('create_at')
                    stop_loss_price = matched.get('stop_loss_price')
                    take_profit_price = matched.get('take_profit_price')
                    take_profit_1 = matched.get('take_profit_1')
                    take_profit_2 = matched.get('take_profit_2')
                    atr_value = matched.get('atr')
                    db_margin_usdt = matched.get('margin_usdt')
                    db_notional_usdt = matched.get('notional_usdt')
                    entry_order_id = matched.get('entry_order_id')
                    trade_id = matched.get('id')
            except Exception as e:
                logger.debug(f"获取数据库信息失败: {e}")

            if entry_order_id:
                try:
                    info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id))
                    if isinstance(info, dict):
                        entry_order_type = info.get("type")
                except Exception:
                    entry_order_type = None

            symbol_orders = open_orders_map.get(symbol, [])
            # Back-fill SL/TP from exchange orders when the DB has none, so a
            # position is not shown as unprotected while orders are resting.
            if symbol_orders and (stop_loss_price is None or take_profit_price is None or take_profit_1 is None):
                sl_prices = []
                tp_prices = []
                for o in symbol_orders:
                    # closePosition orders are close-only by definition; accept
                    # either flag so the back-fill does not depend on the
                    # exchange also echoing reduceOnly for them.
                    if not (o.get('reduceOnly') or o.get('closePosition')):
                        continue
                    t = o.get('type')
                    sp = _to_float(o.get('stopPrice'), None)
                    # A missing / zero trigger price carries no information.
                    if sp is None or sp <= 0:
                        continue
                    if t == 'STOP_MARKET':
                        sl_prices.append(sp)
                    elif t in ('TAKE_PROFIT_MARKET', 'TAKE_PROFIT'):
                        tp_prices.append(sp)
                if sl_prices and stop_loss_price is None:
                    stop_loss_price = sl_prices[0]
                if tp_prices:
                    # Nearest target to the entry becomes TP1, the next one TP2.
                    tp_prices.sort(key=lambda p: abs(p - entry_price))
                    if take_profit_1 is None:
                        take_profit_1 = tp_prices[0]
                    if len(tp_prices) > 1 and take_profit_2 is None:
                        take_profit_2 = tp_prices[1]
                    if take_profit_price is None:
                        take_profit_price = take_profit_1

            formatted_positions.append({
                "id": trade_id,
                "symbol": symbol,
                "side": "BUY" if position_amt > 0 else "SELL",
                "quantity": abs(position_amt),
                "entry_price": entry_price,
                "entry_value_usdt": entry_value_usdt,
                "notional_usdt": notional_usdt_live,
                "margin_usdt": margin_usdt_live,
                "original_notional_usdt": db_notional_usdt,
                "original_margin_usdt": db_margin_usdt,
                "mark_price": mark_price,
                "pnl": unrealized_pnl,
                "pnl_percent": pnl_percent,
                "leverage": int(pos.get('leverage', 1)),
                "entry_time": entry_time,
                "created_at": created_at,
                "stop_loss_price": stop_loss_price,
                "take_profit_price": take_profit_price,
                "take_profit_1": take_profit_1,
                "take_profit_2": take_profit_2,
                "atr": atr_value,
                "entry_order_id": entry_order_id,
                "entry_order_type": entry_order_type,
                "open_orders": symbol_orders,
            })
        return formatted_positions
    except Exception as e:
        logger.warning(f"fetch_realtime_positions(account_id={account_id}) 失败: {e}", exc_info=True)
        return []
    finally:
        # Best-effort cleanup; a disconnect error must not surface to the caller.
        try:
            if client is not None:
                await client.disconnect()
        except Exception:
            pass
@router.get("/positions")
async def get_realtime_positions(account_id: int = Depends(get_account_id)):
    """
    Return the live exchange positions for the authenticated account, each
    joined with the account's matching DB trade record.

    Responds 400 when the account has no API credentials and 500 when the
    fetch helper reports no result at all.
    """
    api_key, api_secret, _, _ = Account.get_credentials(account_id)
    if not (api_key and api_secret):
        raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id}")

    live_positions = await fetch_realtime_positions(account_id)
    if live_positions is not None:
        return live_positions
    raise HTTPException(status_code=500, detail="获取持仓数据失败")
@router.post("/positions/{symbol}/close")
async def close_position(symbol: str, account_id: int = Depends(get_account_id)):
"""手动平仓指定交易对的持仓"""
try:
logger.info(f"=" * 60)
logger.info(f"收到平仓请求: {symbol}")
logger.info(f"=" * 60)
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 导入必要的模块
try:
from binance_client import BinanceClient
logger.info("✓ 成功导入交易系统模块")
except ImportError as import_error:
logger.warning(f"首次导入失败: {import_error}尝试从trading_system路径导入")
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
logger.info("✓ 从trading_system路径导入成功")
# 导入数据库模型
from database.models import Trade
# 创建客户端
logger.info(f"创建BinanceClient (testnet={use_testnet})...")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info("连接币安API...")
await client.connect()
logger.info("✓ 币安API连接成功")
try:
# 检查币安是否有持仓(使用原始 position_information确保能拿到 positionSide 以处理 -4061
logger.info(f"检查 {symbol} 在币安的持仓状态...")
# 读取持仓模式dualSidePosition=True => 对冲模式(必须传 positionSide=LONG/SHORT
dual_side = None
try:
mode_res = await client.client.futures_get_position_mode()
if isinstance(mode_res, dict):
dual_side = bool(mode_res.get("dualSidePosition"))
except Exception as e:
logger.warning(f"读取持仓模式失败(将按单向模式兜底): {e}")
dual_side = None
raw_positions = await client.client.futures_position_information(symbol=symbol)
nonzero_positions = []
for p in raw_positions or []:
try:
amt = float(p.get("positionAmt", 0))
except Exception:
continue
if abs(amt) > 0:
nonzero_positions.append((amt, p))
# 兼容旧逻辑:如果原始接口异常,回退到封装方法
if not nonzero_positions:
try:
positions = await client.get_open_positions()
position = next((p for p in positions if p['symbol'] == symbol and float(p['positionAmt']) != 0), None)
if position:
nonzero_positions = [(float(position["positionAmt"]), {"positionAmt": position["positionAmt"]})]
except Exception:
nonzero_positions = []
if not nonzero_positions:
logger.warning(f"{symbol} 币安账户中没有持仓,可能已被平仓")
# 检查数据库中是否有未平仓的记录,如果有则更新(仅当前账号)
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
if open_trades:
trade = open_trades[0]
# 获取当前价格作为平仓价格
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(trade['entry_price'])
# 计算盈亏
entry_price = float(trade['entry_price'])
quantity = float(trade['quantity'])
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
# 更新数据库
Trade.update_exit(
trade_id=trade['id'],
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None
)
logger.info(f"✓ 已更新数据库记录(币安无持仓但数据库有记录)")
return {
"message": f"{symbol} 平仓操作完成(币安账户中没有持仓,可能已被平仓)",
"symbol": symbol,
"status": "closed"
}
# 获取交易对精度信息,调整数量精度(平仓不要向上补 minQty避免超过持仓数量
symbol_info = None
try:
symbol_info = await client.get_symbol_info(symbol)
except Exception:
symbol_info = None
def _adjust_close_qty(qty: float) -> float:
if qty is None:
return 0.0
q = float(qty)
if not symbol_info:
return q
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = float(symbol_info.get('stepSize', 0) or 0)
if step_size and step_size > 0:
# 向下取整,避免超过持仓
q = float(int(q / step_size)) * step_size
else:
q = round(q, quantity_precision)
q = round(q, quantity_precision)
return q
# 组装平仓订单(对冲模式可能同币种有 LONG/SHORT 两个仓位,这里一并平掉)
orders = []
order_ids = []
# 如果 dual_side 无法读取,按 raw_positions 是否包含 positionSide 来推断
if dual_side is None:
if any(isinstance(p, dict) and (p.get("positionSide") in ("LONG", "SHORT")) for _, p in nonzero_positions):
dual_side = True
else:
dual_side = False
logger.info(f"{symbol} 持仓模式: {'HEDGE(对冲)' if dual_side else 'ONE-WAY(单向)'}")
# 构造待平仓列表:[(positionSide, amt)]
to_close = []
if dual_side:
for amt, p in nonzero_positions:
ps = (p.get("positionSide") or "").upper()
if ps not in ("LONG", "SHORT"):
ps = "LONG" if amt > 0 else "SHORT"
to_close.append((ps, amt))
else:
# 单向模式只应存在一个净仓位;如果有多个,按合计处理
net_amt = sum([amt for amt, _ in nonzero_positions])
if abs(net_amt) > 0:
to_close.append(("BOTH", net_amt))
logger.info(f"✓ 币安账户中 {symbol} 待平仓: {to_close}")
for ps, amt in to_close:
side = 'SELL' if float(amt) > 0 else 'BUY'
quantity = abs(float(amt))
quantity = _adjust_close_qty(quantity)
if quantity <= 0:
logger.warning(f"{symbol} 平仓数量调整后为0跳过该仓位: positionSide={ps}, amt={amt}")
continue
order_params = {
"symbol": symbol,
"side": side,
"type": "MARKET",
"quantity": quantity,
}
# 对冲模式必须传 positionSide=LONG/SHORT并且某些账户会 -1106因此这里不再传 reduceOnly
if dual_side and ps in ("LONG", "SHORT"):
order_params["positionSide"] = ps
else:
# 单向模式用 reduceOnly 防止反向开仓
order_params["reduceOnly"] = True
logger.info(
f"开始执行平仓下单: {symbol} side={side} qty={quantity} "
f"positionSide={order_params.get('positionSide')} reduceOnly={order_params.get('reduceOnly')}"
)
try:
order = await client.client.futures_create_order(**order_params)
if not order:
raise RuntimeError("币安API返回 None")
orders.append(order)
oid = order.get("orderId")
if oid:
order_ids.append(oid)
except Exception as order_error:
error_msg = f"{symbol} 平仓失败:下单异常 - {str(order_error)}"
logger.error(error_msg)
logger.error(f" 错误类型: {type(order_error).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=error_msg)
if not orders:
raise HTTPException(status_code=400, detail=f"{symbol} 无可平仓的有效仓位数量调整后为0或无持仓")
logger.info(f"{symbol} 平仓订单已提交: {order_ids}")
# 等待订单成交,获取实际成交价格
import asyncio
await asyncio.sleep(1)
# 获取订单详情(可能多个订单,按订单号分别取价)
exit_prices = {}
exit_commissions = {}
exit_realized_pnls = {}
exit_commission_assets = {}
# 新增:获取最近成交记录以计算佣金和实际盈亏
try:
# 等待一小段时间确保成交记录已生成
await asyncio.sleep(1)
recent_trades = await client.get_recent_trades(symbol, limit=20)
except Exception as e:
logger.warning(f"获取最近成交记录失败: {e}")
recent_trades = []
for oid in order_ids:
try:
# 1. 获取价格
order_info = await client.client.futures_get_order(symbol=symbol, orderId=oid)
if order_info:
p = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0))
if p <= 0 and order_info.get('fills'):
total_qty = 0
total_value = 0
for fill in order_info.get('fills', []):
qty = float(fill.get('qty', 0))
price = float(fill.get('price', 0))
total_qty += qty
total_value += qty * price
if total_qty > 0:
p = total_value / total_qty
if p > 0:
exit_prices[oid] = p
# 2. 计算佣金和实际盈亏(从 recent_trades 匹配)
related_trades = [t for t in recent_trades if str(t.get('orderId')) == str(oid)]
if related_trades:
total_realized_pnl = 0.0
total_commission = 0.0
commission_assets = set()
for t in related_trades:
total_realized_pnl += float(t.get('realizedPnl', 0))
total_commission += float(t.get('commission', 0))
commission_assets.add(t.get('commissionAsset'))
exit_realized_pnls[oid] = total_realized_pnl
exit_commissions[oid] = total_commission
exit_commission_assets[oid] = "/".join(commission_assets) if commission_assets else None
except Exception as e:
logger.warning(f"获取订单详情失败 (orderId={oid}): {e}")
# 兜底:如果无法获取订单价格,使用当前价格
fallback_exit_price = None
try:
ticker = await client.get_ticker_24h(symbol)
fallback_exit_price = float(ticker['price']) if ticker else None
except Exception:
fallback_exit_price = None
# 更新数据库记录
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
if open_trades:
# 对冲模式可能有多条 tradeBUY/LONG 和 SELL/SHORT尽量按方向匹配订单更新
used_order_ids = set()
for trade in open_trades:
try:
entry_price = float(trade['entry_price'])
trade_quantity = float(trade['quantity'])
except Exception:
continue
# 选择一个未使用的 orderId如果只有一个就复用
chosen_oid = None
for oid in order_ids:
if oid not in used_order_ids:
chosen_oid = oid
break
if chosen_oid is None and order_ids:
chosen_oid = order_ids[0]
if chosen_oid:
used_order_ids.add(chosen_oid)
exit_price = exit_prices.get(chosen_oid) if chosen_oid else None
if not exit_price:
exit_price = fallback_exit_price or entry_price
# 计算盈亏(数据库侧依旧按名义盈亏;收益率展示用保证金口径在前端/统计里另算)
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * trade_quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * trade_quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade['id'],
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=chosen_oid,
realized_pnl=exit_realized_pnls.get(chosen_oid),
commission=exit_commissions.get(chosen_oid),
commission_asset=exit_commission_assets.get(chosen_oid)
)
logger.info(f"✓ 已更新数据库记录 trade_id={trade['id']} order_id={chosen_oid} (盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)")
logger.info(f"{symbol} 平仓成功")
return {
"message": f"{symbol} 平仓成功",
"symbol": symbol,
"status": "closed"
}
finally:
logger.info("断开币安API连接...")
await client.disconnect()
logger.info("✓ 已断开连接")
except HTTPException:
raise
except Exception as e:
error_msg = f"平仓失败: {str(e)}"
logger.error("=" * 60)
logger.error(f"平仓操作异常: {error_msg}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error("=" * 60, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
@router.post("/positions/close-all")
async def close_all_positions(account_id: int = Depends(get_account_id)):
    """Close every open futures position of the account with market orders.

    For each position with a non-zero ``positionAmt`` a reduce-only MARKET
    order is placed in the opposite direction. When the order is accepted,
    every open ``Trade`` row of that symbol (restricted to ``account_id``)
    is marked as closed with ``exit_reason='manual'``.

    Exit price resolution, in order: the order's ``avgPrice``/``price``,
    then the current ticker price, then (per trade) the trade's own entry
    price, so that a zero price is never persisted.

    Args:
        account_id: Account whose credentials and trades are used; resolved
            through the ``get_account_id`` dependency.

    Returns:
        A dict with ``message``, ``closed``, ``failed`` and a per-symbol
        ``results`` list.

    Raises:
        HTTPException: 400 when the API key/secret are missing for an
            active account, 500 on any unexpected failure.
    """
    try:
        logger.info("=" * 60)
        logger.info("收到一键全平请求")
        logger.info("=" * 60)

        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if (not api_key or not api_secret) and status == "active":
            error_msg = f"API密钥未配置account_id={account_id}"
            logger.warning(f"[account_id={account_id}] {error_msg}")
            raise HTTPException(status_code=400, detail=error_msg)

        # Import the trading-system client; retry after adding its directory
        # to sys.path when the first import fails.
        try:
            from binance_client import BinanceClient
            logger.info("✓ 成功导入交易系统模块")
        except ImportError as import_error:
            logger.warning(f"首次导入失败: {import_error}尝试从trading_system路径导入")
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient
            logger.info("✓ 从trading_system路径导入成功")

        # Database model used to close the matching trade records.
        from database.models import Trade

        logger.info(f"创建BinanceClient (testnet={use_testnet})...")
        client = BinanceClient(
            api_key=api_key,
            api_secret=api_secret,
            testnet=use_testnet
        )
        logger.info("连接币安API...")
        await client.connect()
        logger.info("✓ 币安API连接成功")

        try:
            positions = await client.get_open_positions()
            if not positions:
                logger.info("当前没有持仓")
                return {
                    "message": "当前没有持仓",
                    "closed": 0,
                    "failed": 0,
                    "results": []
                }

            logger.info(f"发现 {len(positions)} 个持仓,开始逐一平仓...")
            results = []
            closed_count = 0
            failed_count = 0

            for position in positions:
                symbol = position.get('symbol')
                # `or 0` keeps a None amount from aborting the whole request.
                position_amt = float(position.get('positionAmt') or 0)
                if abs(position_amt) <= 0:
                    continue

                try:
                    logger.info(f"开始平仓 {symbol} (数量: {position_amt})...")
                    # A long is closed by selling, a short by buying.
                    side = 'SELL' if position_amt > 0 else 'BUY'
                    order = await client.place_order(
                        symbol=symbol,
                        side=side,
                        order_type='MARKET',
                        quantity=abs(position_amt),
                        reduce_only=True
                    )

                    if order and order.get('orderId'):
                        logger.info(f"{symbol} 平仓订单已提交: {order.get('orderId')}")

                        # Price from the order response; missing/None/invalid
                        # values are treated as "unknown" (0) instead of raising.
                        try:
                            exit_price = (
                                float(order.get('avgPrice') or 0)
                                or float(order.get('price') or 0)
                            )
                        except (TypeError, ValueError):
                            exit_price = 0.0

                        if exit_price <= 0:
                            # The order is already accepted by the exchange, so a
                            # ticker failure must not turn this close into "failed".
                            try:
                                ticker = await client.get_ticker_24h(symbol)
                                exit_price = float(ticker['price']) if ticker else 0.0
                            except Exception as price_error:
                                logger.warning(f"{symbol} 获取当前价格失败: {price_error}")
                                exit_price = 0.0

                        # Update the database records of this account only.
                        open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
                        for trade in open_trades or []:
                            try:
                                entry_price = float(trade['entry_price'])
                                quantity = float(trade['quantity'])
                            except (TypeError, ValueError, KeyError):
                                # Same policy as the single-symbol close handler:
                                # skip rows whose numbers cannot be parsed.
                                continue

                            # Last resort: book the close at the entry price rather
                            # than persisting exit_price=0 and a bogus PnL.
                            trade_exit_price = exit_price if exit_price > 0 else entry_price

                            if trade['side'] == 'BUY':
                                price_diff = trade_exit_price - entry_price
                            else:
                                price_diff = entry_price - trade_exit_price
                            pnl = price_diff * quantity
                            pnl_percent = (price_diff / entry_price) * 100 if entry_price > 0 else 0.0

                            Trade.update_exit(
                                trade_id=trade['id'],
                                exit_price=trade_exit_price,
                                exit_reason='manual',
                                pnl=pnl,
                                pnl_percent=pnl_percent,
                                exit_order_id=order.get('orderId')
                            )
                            logger.info(f"✓ 已更新数据库记录 trade_id={trade['id']} (盈亏: {pnl:.2f} USDT)")

                        closed_count += 1
                        results.append({
                            "symbol": symbol,
                            "status": "success",
                            "order_id": order.get('orderId'),
                            "message": f"{symbol} 平仓成功"
                        })
                    else:
                        logger.warning(f"{symbol} 平仓订单提交失败")
                        failed_count += 1
                        results.append({
                            "symbol": symbol,
                            "status": "failed",
                            "message": f"{symbol} 平仓失败: 订单未提交"
                        })
                except Exception as e:
                    # One failing symbol must not stop the remaining closes.
                    logger.error(f"{symbol} 平仓失败: {e}")
                    failed_count += 1
                    results.append({
                        "symbol": symbol,
                        "status": "failed",
                        "message": f"{symbol} 平仓失败: {str(e)}"
                    })

            logger.info(f"一键全平完成: 成功 {closed_count} / 失败 {failed_count}")
            return {
                "message": f"一键全平完成: 成功 {closed_count} / 失败 {failed_count}",
                "closed": closed_count,
                "failed": failed_count,
                "results": results
            }
        finally:
            logger.info("断开币安API连接...")
            await client.disconnect()
            logger.info("✓ 已断开连接")
    except HTTPException:
        raise
    except Exception as e:
        error_msg = f"一键全平失败: {str(e)}"
        logger.error("=" * 60)
        logger.error(f"一键全平操作异常: {error_msg}")
        logger.error(f"错误类型: {type(e).__name__}")
        logger.error("=" * 60, exc_info=True)
        raise HTTPException(status_code=500, detail=error_msg)
@router.post("/positions/{symbol}/open")
async def open_position_from_recommendation(
    symbol: str,
    entry_price: float = Query(..., description="入场价格"),
    stop_loss_price: float = Query(..., description="止损价格"),
    direction: str = Query(..., description="交易方向: BUY 或 SELL"),
    notional_usdt: float = Query(..., description="下单名义价值USDT"),
    leverage: int = Query(10, description="杠杆倍数"),
    account_id: int = Depends(get_account_id)
):
    """Manually open a futures position from a recommendation using a LIMIT order.

    The order quantity is ``notional_usdt / entry_price`` adjusted to the
    symbol's precision. After submitting, the order status is polled once
    per second for up to 30 seconds.

    Args:
        symbol: Trading pair taken from the URL path.
        entry_price: Limit price of the entry order; must be > 0.
        stop_loss_price: Stop-loss price; validated (> 0) and logged only.
        direction: ``BUY`` or ``SELL``.
        notional_usdt: Desired notional value of the order; must be > 0.
        leverage: Leverage applied to the symbol before ordering; must be >= 1.
        account_id: Account whose credentials are used.

    Returns:
        ``status='filled'`` with fill details when the order fills within the
        polling window, otherwise ``status='pending'`` with the order id.

    Raises:
        HTTPException: 400 for invalid input, missing credentials, missing
            symbol info, an invalid adjusted quantity, insufficient notional,
            or an order that ended CANCELED/EXPIRED/REJECTED; 500 otherwise.
    """
    try:
        logger.info("=" * 60)
        logger.info(f"收到手动开仓请求: {symbol}")
        logger.info(f" 入场价: {entry_price}, 止损价: {stop_loss_price}")
        logger.info(f" 方向: {direction}, 名义价值: {notional_usdt} USDT, 杠杆: {leverage}x")
        logger.info("=" * 60)

        if direction not in ('BUY', 'SELL'):
            raise HTTPException(status_code=400, detail="交易方向必须是 BUY 或 SELL")
        if notional_usdt <= 0:
            raise HTTPException(status_code=400, detail="下单名义价值必须大于0")
        if entry_price <= 0 or stop_loss_price <= 0:
            raise HTTPException(status_code=400, detail="入场价和止损价必须大于0")
        # Leverage is used as a divisor after the fill; reject it up front so
        # a bad value can never fail the request once an order has executed.
        if leverage < 1:
            raise HTTPException(status_code=400, detail="杠杆倍数必须大于等于1")

        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if (not api_key or not api_secret) and status == "active":
            error_msg = f"API密钥未配置account_id={account_id}"
            logger.warning(f"[account_id={account_id}] {error_msg}")
            raise HTTPException(status_code=400, detail=error_msg)

        # Import the trading-system client; retry after extending sys.path.
        try:
            from binance_client import BinanceClient
        except ImportError:
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient

        logger.info(f"创建BinanceClient (testnet={use_testnet})...")
        client = BinanceClient(
            api_key=api_key,
            api_secret=api_secret,
            testnet=use_testnet
        )
        logger.info("连接币安API...")
        await client.connect()
        logger.info("✓ 币安API连接成功")

        try:
            await client.set_leverage(symbol, leverage)
            logger.info(f"✓ 已设置杠杆: {leverage}x")

            symbol_info = await client.get_symbol_info(symbol)
            if not symbol_info:
                raise HTTPException(status_code=400, detail=f"无法获取 {symbol} 的交易对信息")

            # quantity = notional value / entry price
            quantity = notional_usdt / entry_price
            logger.info(f"计算下单数量: {quantity:.8f} (名义价值: {notional_usdt} USDT / 入场价: {entry_price})")

            adjusted_quantity = client._adjust_quantity_precision(quantity, symbol_info)
            if adjusted_quantity <= 0:
                raise HTTPException(status_code=400, detail=f"调整后的数量无效: {adjusted_quantity}")
            logger.info(f"调整后的数量: {adjusted_quantity:.8f}")

            # Reject orders below the symbol's minimum notional (default 5.0).
            min_notional = symbol_info.get('minNotional', 5.0)
            actual_notional = adjusted_quantity * entry_price
            if actual_notional < min_notional:
                raise HTTPException(
                    status_code=400,
                    detail=f"订单名义价值不足: {actual_notional:.2f} USDT < 最小要求: {min_notional:.2f} USDT"
                )

            logger.info(f"开始下 limit 订单: {symbol} {direction} {adjusted_quantity} @ {entry_price}")
            order = await client.place_order(
                symbol=symbol,
                side=direction,
                quantity=adjusted_quantity,
                order_type='LIMIT',
                price=entry_price,
                reduce_only=False
            )
            if not order:
                raise HTTPException(status_code=500, detail="下单失败币安API返回None")

            order_id = order.get('orderId')
            logger.info(f"✓ 订单已提交: orderId={order_id}")

            # Poll for the fill, once per second, at most 30 times.
            max_polls = 30
            filled_order = None
            for attempt in range(max_polls):
                await asyncio.sleep(1)
                # Only the status query is best-effort. The terminal-status
                # check stays outside this try so its HTTPException is not
                # swallowed by the generic handler.
                try:
                    order_status = await client.client.futures_get_order(symbol=symbol, orderId=order_id)
                except Exception as e:
                    if attempt == max_polls - 1:  # last attempt
                        logger.warning(f"订单状态查询失败或未成交: {e}")
                    continue

                order_state = (order_status or {}).get('status')
                if order_state == 'FILLED':
                    filled_order = order_status
                    logger.info(f"✓ 订单已成交: orderId={order_id}")
                    break
                if order_state in ('CANCELED', 'EXPIRED', 'REJECTED'):
                    raise HTTPException(status_code=400, detail=f"订单未成交,状态: {order_state}")

            if not filled_order:
                logger.warning(f"订单 {order_id} 在30秒内未成交但订单已提交")
                return {
                    "message": f"{symbol} 订单已提交但未成交(请稍后检查)",
                    "symbol": symbol,
                    "order_id": order_id,
                    "status": "pending"
                }

            # Fill details; fall back to the requested values when the
            # exchange response omits them or reports zero.
            avg_price = float(filled_order.get('avgPrice') or 0) or entry_price
            executed_qty = float(filled_order.get('executedQty', adjusted_quantity))

            # Notional value and margin actually used by the fill.
            actual_notional = executed_qty * avg_price
            actual_margin = actual_notional / leverage

            # NOTE: this endpoint currently neither persists a Trade record
            # nor places protective stop-loss/take-profit orders (both were
            # present only as commented-out code), so trade_id is reported as
            # None and stop_loss_price is not stored anywhere by this handler.
            return {
                "message": f"{symbol} 开仓成功",
                "symbol": symbol,
                "order_id": order_id,
                "trade_id": None,
                "quantity": executed_qty,
                "entry_price": avg_price,
                "notional_usdt": actual_notional,
                "margin_usdt": actual_margin,
                "status": "filled"
            }
        finally:
            logger.info("断开币安API连接...")
            await client.disconnect()
            logger.info("✓ 已断开连接")
    except HTTPException:
        raise
    except Exception as e:
        error_msg = f"开仓失败: {str(e)}"
        logger.error("=" * 60)
        logger.error(f"开仓操作异常: {error_msg}")
        logger.error(f"错误类型: {type(e).__name__}")
        logger.error("=" * 60, exc_info=True)
        raise HTTPException(status_code=500, detail=error_msg)
@router.post("/spot/order")
async def place_spot_order(
    symbol: str = Query(..., description="交易对,如 BTCUSDT"),
    side: str = Query("BUY", description="BUY 或 SELL"),
    quote_order_qty: float = Query(..., description="下单金额USDT市价单时使用"),
    order_type: str = Query("MARKET", description="MARKET 或 LIMIT"),
    price: float = Query(None, description="限价单价格LIMIT 时必填"),
    account_id: int = Depends(get_account_id),
):
    """Place a spot order (MARKET or LIMIT) on the account's spot wallet.

    MARKET orders are sized by quote amount (``quoteOrderQty``). LIMIT orders
    are GTC and sized as ``quote_order_qty / price`` rounded to 8 decimals.

    Args:
        symbol: Trading pair, e.g. ``BTCUSDT``.
        side: ``BUY`` or ``SELL``.
        quote_order_qty: Order amount in quote currency; must be > 0.
        order_type: ``MARKET`` or ``LIMIT`` (case-insensitive).
        price: Limit price; required and > 0 for LIMIT orders.
        account_id: Account whose API credentials are used.

    Returns:
        A dict with ``message``, ``symbol``, ``order_id``,
        ``client_order_id`` and ``status`` taken from the exchange response.

    Raises:
        HTTPException: 400 for invalid parameters or missing credentials,
            500 when the order request fails.
    """
    try:
        if side not in ("BUY", "SELL"):
            raise HTTPException(status_code=400, detail="side 必须是 BUY 或 SELL")

        # Validate the order type explicitly: previously any unknown value was
        # silently treated as LIMIT (or crashed on a missing price).
        normalized_type = (order_type or "").upper()
        if normalized_type not in ("MARKET", "LIMIT"):
            raise HTTPException(status_code=400, detail="order_type 必须是 MARKET 或 LIMIT")

        if quote_order_qty <= 0:
            raise HTTPException(status_code=400, detail="下单金额必须大于 0")
        if normalized_type == "LIMIT" and (price is None or price <= 0):
            raise HTTPException(status_code=400, detail="限价单必须填写 price")

        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if (not api_key or not api_secret) and status == "active":
            raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id}")

        # Import the trading-system client; retry after extending sys.path.
        try:
            from binance_client import BinanceClient
        except ImportError:
            trading_system_path = project_root / "trading_system"
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient

        client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
        await client.connect()
        try:
            # Spot orders go through the underlying client's create_order.
            if normalized_type == "MARKET":
                order = await client.client.create_order(
                    symbol=symbol,
                    side=side,
                    type="MARKET",
                    quoteOrderQty=quote_order_qty,
                )
            else:
                # LIMIT needs a base quantity: estimate it from amount / price.
                qty = quote_order_qty / price
                order = await client.client.create_order(
                    symbol=symbol,
                    side=side,
                    type="LIMIT",
                    timeInForce="GTC",
                    quantity=round(qty, 8),
                    price=price,
                )
            return {
                "message": f"现货订单已提交: {symbol} {side}",
                "symbol": symbol,
                "order_id": order.get("orderId"),
                "client_order_id": order.get("clientOrderId"),
                "status": order.get("status"),
            }
        finally:
            await client.disconnect()
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("现货下单失败: %s", e)
        raise HTTPException(status_code=500, detail=f"现货下单失败: {str(e)}")
def _order_is_sltp(o: dict, type_key: str = "type") -> bool:
"""判断是否为止损/止盈类订单(含普通单与 Algo 条件单)"""
t = str(o.get(type_key) or o.get("orderType") or "").upper()
return t in ("STOP_MARKET", "TAKE_PROFIT_MARKET", "STOP", "TAKE_PROFIT")
@router.post("/positions/sync")
async def sync_positions(
    account_id: int = Depends(get_account_id),
    only_recover_when_has_sltp: bool = Query(True, description="仅当该持仓存在止损/止盈单时才补建记录(用于区分系统单,减少手动单误建)"),
):
    """Reconcile the exchange's actual positions with the open trades in the database.

    Two directions are handled:

    * Trades that are ``open`` in the database but whose symbol has no
      position on the exchange are closed via ``Trade.update_exit``. The exit
      price/time/order id are taken from the most recent FILLED reduce-only
      order of the last 7 days when one is found, otherwise the ticker price
      (or the entry price) is used.
    * Symbols that have a position on the exchange but no open trade in the
      database may get a recovered ``Trade`` record
      (``entry_reason='sync_recovered'``), subject to the system-order prefix
      check and/or the stop-loss/take-profit presence check.

    Args:
        account_id: Account whose credentials and trades are used.
        only_recover_when_has_sltp: When true, a missing record is recovered
            only if the symbol currently has a stop-loss/take-profit order
            (regular or Algo conditional).

    Returns:
        A dict with ``message``, ``binance_positions``, ``db_open_positions``,
        ``updated_to_closed``, ``recovered_count``, ``missing_in_binance``
        and ``missing_in_db``.

    Raises:
        HTTPException: 400 when credentials are missing for an active
            account, 500 on any unexpected failure.
    """
    try:
        logger.info("=" * 60)
        logger.info("收到持仓状态同步请求")
        logger.info("=" * 60)
        api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
        if (not api_key or not api_secret) and status == "active":
            error_msg = f"API密钥未配置account_id={account_id}"
            logger.warning(f"[account_id={account_id}] {error_msg}")
            raise HTTPException(status_code=400, detail=error_msg)
        # Import the required modules (retry after extending sys.path)
        try:
            from binance_client import BinanceClient
        except ImportError:
            trading_system_path = project_root / 'trading_system'
            sys.path.insert(0, str(trading_system_path))
            from binance_client import BinanceClient
        # Import the database model
        from database.models import Trade
        # Create the client
        client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
        logger.info("连接币安API...")
        await client.connect()
        try:
            # 1. Fetch the actual positions from the exchange
            binance_positions = await client.get_open_positions()
            binance_symbols = {p['symbol'] for p in binance_positions if float(p.get('positionAmt', 0)) != 0}
            logger.info(f"币安实际持仓: {len(binance_symbols)}")
            if binance_symbols:
                logger.info(f" 持仓列表: {', '.join(binance_symbols)}")
            # 2. Fetch trades with status 'open' from the database
            #    (row count is capped to keep memory bounded)
            db_open_trades = Trade.get_all(status='open', account_id=account_id, limit=500)
            db_open_symbols = {t['symbol'] for t in db_open_trades}
            logger.info(f"数据库open状态: {len(db_open_symbols)}")
            if db_open_symbols:
                logger.info(f" 持仓列表: {', '.join(db_open_symbols)}")
            # 3. Symbols open in the database but no longer held on the
            #    exchange: these trades must be updated to closed
            missing_in_binance = db_open_symbols - binance_symbols
            updated_count = 0
            if missing_in_binance:
                logger.info(f"发现 {len(missing_in_binance)} 个持仓在数据库中是open但币安已不存在: {', '.join(missing_in_binance)}")
                for symbol in missing_in_binance:
                    try:
                        # Try to obtain the real close information (price / time /
                        # reason / order id) from the exchange's order history
                        latest_close_order = None
                        try:
                            end_time_ms = int(time.time() * 1000)
                            # Look back 7 days
                            start_time_ms = end_time_ms - (7 * 24 * 60 * 60 * 1000)
                            orders = await client.client.futures_get_all_orders(
                                symbol=symbol,
                                startTime=start_time_ms,
                                endTime=end_time_ms,
                            )
                            if isinstance(orders, list) and orders:
                                # Only FILLED reduce-only orders count as close orders
                                close_orders = [
                                    o for o in orders
                                    if isinstance(o, dict)
                                    and o.get("reduceOnly") is True
                                    and o.get("status") == "FILLED"
                                ]
                                if close_orders:
                                    # Newest first
                                    close_orders.sort(key=lambda x: x.get("updateTime", 0), reverse=True)
                                    latest_close_order = close_orders[0]
                        except Exception:
                            # Best effort: fall back to ticker/entry price below
                            latest_close_order = None
                        # Open records of this symbol (current account only, to
                        # avoid touching other accounts' trades)
                        open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
                        # NOTE(review): the same latest_close_order is applied to
                        # every open trade of the symbol.
                        for trade in open_trades:
                            trade_id = trade['id']
                            entry_price = float(trade['entry_price'])
                            quantity = float(trade['quantity'])
                            # Exit details, filled from the close order when available
                            exit_price = None
                            exit_order_id = None
                            exit_time_ts = None
                            exit_reason = "sync"
                            otype = ""
                            if latest_close_order and isinstance(latest_close_order, dict):
                                try:
                                    exit_price = float(latest_close_order.get("avgPrice", 0) or 0) or None
                                except Exception:
                                    exit_price = None
                                exit_order_id = latest_close_order.get("orderId") or None
                                otype = str(
                                    latest_close_order.get("type")
                                    or latest_close_order.get("origType")
                                    or ""
                                ).upper()
                                try:
                                    # Milliseconds -> seconds
                                    ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
                                    if ms:
                                        exit_time_ts = int(int(ms) / 1000)
                                except Exception:
                                    exit_time_ts = None
                            # reduceOnly flag of the close order: a reduce-only order
                            # is treated as an automatic close, not a manual one.
                            # NOTE(review): close_orders is filtered on
                            # reduceOnly is True above, so this looks always True
                            # whenever an order was found — confirm whether the
                            # "manual" branch below is reachable.
                            is_reduce_only = latest_close_order.get("reduceOnly", False) if latest_close_order else False
                            # Derive the exit reason from the order type
                            if "TRAILING" in otype:
                                exit_reason = "trailing_stop"
                            elif "TAKE_PROFIT" in otype:
                                exit_reason = "take_profit"
                            elif "STOP" in otype:
                                exit_reason = "stop_loss"
                            elif otype in ("MARKET", "LIMIT"):
                                # A reduce-only order is an automatic close (possibly a
                                # triggered protective order): keep "sync" for now and
                                # let the price check below decide
                                if is_reduce_only:
                                    exit_reason = "sync"  # temporary; refined by price below
                                else:
                                    exit_reason = "manual"  # only non-reduce-only MARKET/LIMIT orders are true manual closes
                            # Price fallback: when the exit price clearly hits the
                            # stop-loss / take-profit price, override exit_reason.
                            # This matters for MARKET orders fired by protective orders.
                            if exit_reason == "sync" or exit_reason == "manual":
                                try:
                                    def _close_to(a: float, b: float, max_pct: float = 0.02) -> bool:
                                        # True when a is within max_pct (relative to b) of b;
                                        # non-positive inputs never match
                                        if a <= 0 or b <= 0:
                                            return False
                                        return abs((a - b) / b) <= max_pct
                                    ep = float(exit_price or 0)
                                    if ep > 0:
                                        sl = trade.get("stop_loss_price")
                                        tp = trade.get("take_profit_price")
                                        tp1 = trade.get("take_profit_1")
                                        tp2 = trade.get("take_profit_2")
                                        # Stop-loss is checked first
                                        if sl is not None and _close_to(ep, float(sl), max_pct=0.02):
                                            exit_reason = "stop_loss"
                                        # Then the take-profit levels
                                        elif tp is not None and _close_to(ep, float(tp), max_pct=0.02):
                                            exit_reason = "take_profit"
                                        elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.02):
                                            exit_reason = "take_profit"
                                        elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.02):
                                            exit_reason = "take_profit"
                                        # A price close to the entry price may indicate a
                                        # triggered trailing stop
                                        elif is_reduce_only:
                                            entry_price_val = float(trade.get("entry_price", 0) or 0)
                                            if entry_price_val > 0 and _close_to(ep, entry_price_val, max_pct=0.01):
                                                exit_reason = "trailing_stop"
                                except Exception:
                                    # Best effort: keep the exit_reason derived so far
                                    pass
                            # No usable price from the order: use the current ticker
                            # price, or the entry price when no ticker is returned
                            if not exit_price or exit_price <= 0:
                                ticker = await client.get_ticker_24h(symbol)
                                exit_price = float(ticker['price']) if ticker else entry_price
                            # Compute PnL
                            if trade['side'] == 'BUY':
                                pnl = (exit_price - entry_price) * quantity
                            else:
                                pnl = (entry_price - exit_price) * quantity
                            # PnL percentage relative to margin.
                            # NOTE(review): float() raises if the stored leverage is
                            # None; that would skip this symbol via the outer except.
                            leverage = float(trade.get('leverage', 10))
                            entry_value = entry_price * quantity
                            margin = entry_value / leverage if leverage > 0 else entry_value
                            pnl_percent_margin = (pnl / margin * 100) if margin > 0 else 0
                            # Fetch commission and realized PnL from the exchange's
                            # fills so statistics match the exchange
                            sync_commission = None
                            sync_commission_asset = None
                            sync_realized_pnl = None
                            if exit_order_id:
                                try:
                                    # NOTE(review): fetched once per trade although the
                                    # order id is the same for all trades of the symbol
                                    recent_trades = await client.get_recent_trades(symbol, limit=30)
                                    related = [t for t in recent_trades if str(t.get('orderId')) == str(exit_order_id)]
                                    if related:
                                        sync_commission = sum(float(t.get('commission', 0)) for t in related)
                                        assets = {t.get('commissionAsset') for t in related if t.get('commissionAsset')}
                                        sync_commission_asset = "/".join(assets) if assets else None
                                        sync_realized_pnl = sum(float(t.get('realizedPnl', 0)) for t in related)
                                except Exception as fee_err:
                                    logger.debug(f"同步 {symbol} 平仓手续费失败: {fee_err}")
                            # Holding duration in minutes; only set when both
                            # timestamps are known and the exit is not before the entry.
                            # assumes entry_time is stored in epoch seconds — TODO confirm
                            duration_minutes = None
                            try:
                                et = trade.get("entry_time")
                                if et is not None and exit_time_ts is not None:
                                    et_i = int(et)
                                    if exit_time_ts >= et_i:
                                        duration_minutes = int((exit_time_ts - et_i) / 60)
                            except Exception:
                                duration_minutes = None
                            # Update the database record
                            Trade.update_exit(
                                trade_id=trade_id,
                                exit_price=exit_price,
                                exit_reason=exit_reason,
                                pnl=pnl,
                                pnl_percent=pnl_percent_margin,  # margin-based PnL percentage
                                exit_order_id=exit_order_id,
                                duration_minutes=duration_minutes,
                                exit_time_ts=exit_time_ts,
                                commission=sync_commission,
                                commission_asset=sync_commission_asset or None,
                                realized_pnl=sync_realized_pnl,
                            )
                            updated_count += 1
                            logger.info(
                                f"{symbol} 已更新为closed (ID: {trade_id}, "
                                f"盈亏: {pnl:.2f} USDT, {pnl_percent_margin:.2f}% of margin, "
                                f"原因: {exit_reason}, 类型: {otype or '-'}"
                                f")"
                            )
                    except Exception as e:
                        # One failing symbol must not abort the whole sync
                        logger.error(f"{symbol} 更新失败: {e}")
                        import traceback
                        logger.error(f" 错误详情:\n{traceback.format_exc()}")
            else:
                logger.info("✓ 数据库与币安状态一致,无需更新")
            # 4. Position exists on the exchange but the database has no record:
            #    prefer the entry order's clientOrderId prefix to decide whether it
            #    is a system order, and recover records for system orders only
            missing_in_db = binance_symbols - db_open_symbols
            recovered_count = 0
            system_order_prefix = ""
            try:
                from database.models import TradingConfig, GlobalStrategyConfig
                # Account-level setting first, then the global strategy setting
                system_order_prefix = (
                    TradingConfig.get_value("SYSTEM_ORDER_ID_PREFIX", None, account_id=account_id)
                    or GlobalStrategyConfig.get_value("SYSTEM_ORDER_ID_PREFIX", "")
                    or ""
                )
                system_order_prefix = (system_order_prefix or "").strip()
            except Exception:
                # Best effort: without a prefix only the SL/TP check applies
                pass
            if missing_in_db:
                logger.info(f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: {', '.join(missing_in_db)}")
                if system_order_prefix:
                    logger.info(f" → 仅对开仓订单 clientOrderId 前缀为「{system_order_prefix}」的持仓补建(系统单标识)")
                elif only_recover_when_has_sltp:
                    logger.info(" → 仅对「存在止损/止盈单」的持仓补建记录(视为系统单),避免手动单误建")
                for symbol in missing_in_db:
                    try:
                        # First position entry for this symbol
                        pos = next((p for p in binance_positions if p.get('symbol') == symbol), None)
                        if not pos or float(pos.get('positionAmt', 0)) == 0:
                            continue
                        position_amt = float(pos['positionAmt'])
                        quantity = abs(position_amt)
                        side = 'BUY' if position_amt > 0 else 'SELL'
                        entry_price = float(pos.get('entryPrice', 0))
                        leverage = int(pos.get('leverage', 10)) or 10
                        notional = quantity * entry_price
                        # Skip positions with a notional value below 1.0
                        if notional < 1.0:
                            continue
                        # Try to find entry_order_id for the recovered record:
                        # first get_all_orders + clientOrderId prefix, then fall back
                        # to get_recent_trades(100) with one retry
                        entry_order_id = None
                        client_order_id = None
                        if system_order_prefix:
                            try:
                                end_ms = int(time.time() * 1000)
                                # Look back 24 hours
                                start_ms = end_ms - (24 * 3600 * 1000)
                                orders = await client.client.futures_get_all_orders(
                                    symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000
                                )
                                if isinstance(orders, list):
                                    # FILLED, non-reduce-only orders on the position's side
                                    open_orders = [
                                        o for o in orders
                                        if isinstance(o, dict) and o.get("reduceOnly") is False
                                        and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED"
                                    ]
                                    our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)]
                                    if our_orders:
                                        our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True)
                                        # Prefer the newest order whose price (within 1%)
                                        # and quantity match the position; otherwise take
                                        # the newest system order
                                        best = None
                                        for o in our_orders:
                                            ap = float(o.get("avgPrice") or 0)
                                            eq = float(o.get("executedQty") or o.get("origQty") or 0)
                                            if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6:
                                                best = o
                                                break
                                        if best is None:
                                            best = our_orders[0]
                                        entry_order_id = best.get("orderId")
                                        client_order_id = (best.get("clientOrderId") or "").strip() or None
                            except Exception as e:
                                logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}")
                        if entry_order_id is None:
                            try:
                                trades = await client.get_recent_trades(symbol, limit=100)
                                if not trades:
                                    # Retry once after a short wait
                                    await asyncio.sleep(2)
                                    trades = await client.get_recent_trades(symbol, limit=100)
                                if trades:
                                    same_side = [t for t in trades if str(t.get('side', '')).upper() == side]
                                    same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True)
                                    if system_order_prefix and same_side:
                                        # Check the 5 newest same-side fills for an order
                                        # carrying the system prefix
                                        for t in same_side[:5]:
                                            oid = t.get("orderId")
                                            if not oid:
                                                continue
                                            try:
                                                info = await client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000)
                                                cid = (info or {}).get("clientOrderId") or ""
                                                if cid.startswith(system_order_prefix):
                                                    entry_order_id = oid
                                                    client_order_id = cid.strip() or None
                                                    break
                                            except Exception:
                                                continue
                                    # Fallback: newest same-side fill, prefix unknown
                                    if entry_order_id is None and same_side:
                                        entry_order_id = same_side[0].get("orderId")
                            except Exception as e:
                                logger.debug(f"获取 {symbol} 成交记录失败: {e}")
                        # Resolve the clientOrderId of the chosen entry order
                        if entry_order_id and client_order_id is None:
                            try:
                                order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
                                client_order_id = (order_info or {}).get("clientOrderId") or None
                                if client_order_id:
                                    client_order_id = client_order_id.strip() or None
                            except Exception:
                                pass
                        # An entry order whose clientOrderId lacks the system prefix
                        # marks the position as manually opened
                        is_clearly_manual = False
                        if system_order_prefix and entry_order_id and client_order_id and not client_order_id.startswith(system_order_prefix):
                            is_clearly_manual = True
                            logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id!r} 非系统前缀,视为手动单,跳过补建")
                        elif system_order_prefix and entry_order_id and not client_order_id:
                            try:
                                order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
                                cid = (order_info or {}).get("clientOrderId") or ""
                                if cid and not cid.startswith(system_order_prefix):
                                    is_clearly_manual = True
                            except Exception:
                                pass
                        if is_clearly_manual:
                            continue
                        elif only_recover_when_has_sltp:
                            # Require an existing stop-loss/take-profit order: regular
                            # open orders first, then Algo conditional orders
                            has_sltp = False
                            try:
                                normal = await client.get_open_orders(symbol)
                                for o in (normal or []):
                                    if _order_is_sltp(o, "type"):
                                        has_sltp = True
                                        break
                                if not has_sltp:
                                    algo = await client.futures_get_open_algo_orders(symbol=symbol, algo_type="CONDITIONAL")
                                    for o in (algo or []):
                                        if _order_is_sltp(o, "orderType"):
                                            has_sltp = True
                                            break
                            except Exception as e:
                                logger.debug(f"检查 {symbol} 止盈止损单失败: {e}")
                            if not has_sltp:
                                logger.debug(f" {symbol} 无止损/止盈单,跳过补建(视为非系统单)")
                                continue
                        # Avoid duplicates: skip when a trade with this entry order
                        # id already exists
                        if entry_order_id and hasattr(Trade, 'get_by_entry_order_id'):
                            try:
                                existing = Trade.get_by_entry_order_id(entry_order_id)
                                if existing:
                                    continue
                            except Exception:
                                pass
                        trade_id = Trade.create(
                            symbol=symbol,
                            side=side,
                            quantity=quantity,
                            entry_price=entry_price,
                            leverage=leverage,
                            entry_reason='sync_recovered',
                            entry_order_id=entry_order_id,
                            client_order_id=client_order_id,
                            notional_usdt=notional,
                            margin_usdt=(notional / leverage) if leverage > 0 else None,
                            account_id=account_id,
                        )
                        recovered_count += 1
                        logger.info(f"{symbol} 已补建交易记录 (ID: {trade_id}, orderId: {entry_order_id or '-'})")
                    except Exception as e:
                        # One failing symbol must not abort the remaining recoveries
                        logger.warning(f"{symbol} 补建记录失败: {e}")
                if recovered_count > 0:
                    logger.info(f"共补建 {recovered_count} 条交易记录,将在订单记录与统计中展示")
            result = {
                "message": "持仓状态同步完成",
                "binance_positions": len(binance_symbols),
                "db_open_positions": len(db_open_symbols),
                "updated_to_closed": updated_count,
                "recovered_count": recovered_count,
                "missing_in_binance": list(missing_in_binance),
                "missing_in_db": list(missing_in_db)
            }
            logger.info("=" * 60)
            logger.info("持仓状态同步完成!")
            logger.info(f"结果: {result}")
            logger.info("=" * 60)
            return result
        finally:
            # Always release the exchange connection
            await client.disconnect()
            logger.info("✓ 已断开币安API连接")
    except HTTPException:
        raise
    except Exception as e:
        error_msg = f"同步持仓状态失败: {str(e)}"
        logger.error(error_msg, exc_info=True)
        raise HTTPException(status_code=500, detail=error_msg)