Introduced new configuration options to manage trading activities on Sundays and during night hours. This includes limits on the number of trades on Sundays, minimum signal strength requirements for Sunday trades, and restrictions on opening new positions during specified night hours. Updated relevant backend and frontend components to reflect these changes, enhancing risk control and user awareness of trading conditions.
2721 lines
134 KiB
Python
2721 lines
134 KiB
Python
"""
|
||
币安客户端封装 - 提供异步交易接口
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import random
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Any
|
||
from binance import AsyncClient, BinanceSocketManager
|
||
from binance.exceptions import BinanceAPIException
|
||
|
||
|
||
class AlgoOrderPositionUnavailableError(Exception):
|
||
"""条件单被拒:持仓未就绪或已平(如 GTE can only be used with open positions),调用方可仅打 WARNING 不刷 ERROR。"""
|
||
def __init__(self, symbol: str, message: str = ""):
|
||
self.symbol = symbol
|
||
self.message = message or "Position not available for algo order"
|
||
super().__init__(f"{symbol}: {self.message}")
|
||
|
||
try:
|
||
from . import config
|
||
from .redis_cache import RedisCache
|
||
except ImportError:
|
||
import config
|
||
from redis_cache import RedisCache
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 交易对/费率等固定内容优先从 DB 读取(market_cache 表),减少 API 调用
|
||
def _get_market_cache():
|
||
"""延迟导入 MarketCache(需 backend 在 sys.path)。失败返回 None。"""
|
||
try:
|
||
backend_path = Path(__file__).resolve().parent.parent / "backend"
|
||
if backend_path.exists() and str(backend_path) not in sys.path:
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import MarketCache
|
||
return MarketCache
|
||
except Exception as e:
|
||
logger.debug("MarketCache 不可用: %s", e)
|
||
return None
|
||
|
||
|
||
def _load_exchange_info_from_db(max_age_seconds: int = 86400):
|
||
"""同步从 DB 读取 exchange_info(供 run_in_executor 调用)。"""
|
||
mc = _get_market_cache()
|
||
if not mc:
|
||
return None
|
||
return mc.get_exchange_info(max_age_seconds=max_age_seconds)
|
||
|
||
|
||
def _save_exchange_info_to_db(data: dict) -> bool:
|
||
"""同步将 exchange_info 写入 DB。"""
|
||
mc = _get_market_cache()
|
||
if not mc:
|
||
return False
|
||
return mc.set_exchange_info(data)
|
||
|
||
|
||
def _parse_usdt_pairs_from_exchange_info(exchange_info: dict) -> tuple:
|
||
"""从 exchange_info 解析 USDT 永续列表和 display_to_api 映射。返回 (usdt_pairs, display_to_api_symbol)."""
|
||
if not exchange_info or not isinstance(exchange_info.get("symbols"), list):
|
||
return [], {}
|
||
usdt_pairs = []
|
||
display_to_api = {}
|
||
for s in exchange_info["symbols"]:
|
||
if not (s.get("symbol", "").endswith("USDT") and s.get("status") == "TRADING" and s.get("contractType") == "PERPETUAL"):
|
||
continue
|
||
sym = s["symbol"]
|
||
if sym.isascii():
|
||
usdt_pairs.append(sym)
|
||
continue
|
||
api_sym = (s.get("baseAsset") or "") + (s.get("quoteAsset") or "")
|
||
if api_sym and api_sym.isascii():
|
||
usdt_pairs.append(api_sym)
|
||
display_to_api[sym] = api_sym
|
||
else:
|
||
usdt_pairs.append(sym)
|
||
return usdt_pairs, display_to_api
|
||
|
||
|
||
def _parse_symbol_info_from_exchange_info(exchange_info: dict, symbol: str) -> Optional[Dict]:
|
||
"""从 exchange_info 中解析单个 symbol 的精度/限制信息,与 get_symbol_info 返回格式一致。"""
|
||
if not exchange_info or not isinstance(exchange_info.get("symbols"), list):
|
||
return None
|
||
for s in exchange_info["symbols"]:
|
||
if s.get("symbol") != symbol:
|
||
continue
|
||
quantity_precision = s.get("quantityPrecision", 8)
|
||
price_precision = s.get("pricePrecision", 8)
|
||
min_qty, step_size, min_notional, tick_size = None, None, None, None
|
||
for f in s.get("filters", []):
|
||
if f.get("filterType") == "LOT_SIZE":
|
||
min_qty = float(f.get("minQty", 0))
|
||
step_size = float(f.get("stepSize", 0))
|
||
elif f.get("filterType") == "PRICE_FILTER":
|
||
tick_size = float(f.get("tickSize", 0) or 0)
|
||
elif f.get("filterType") == "MIN_NOTIONAL":
|
||
min_notional = float(f.get("notional", 0))
|
||
if min_notional is None or min_notional == 0:
|
||
min_notional = 5.0
|
||
max_leverage_supported = 125
|
||
if s.get("leverageBracket") and len(s.get("leverageBracket", [])) > 0:
|
||
max_leverage_supported = s["leverageBracket"][0].get("maxLeverage", 125)
|
||
return {
|
||
"quantityPrecision": quantity_precision,
|
||
"pricePrecision": price_precision,
|
||
"minQty": min_qty or 0,
|
||
"stepSize": step_size or 0,
|
||
"tickSize": tick_size or 0,
|
||
"minNotional": min_notional,
|
||
"maxLeverage": int(max_leverage_supported),
|
||
}
|
||
return None
|
||
|
||
|
||
def _format_exception(e: Exception) -> str:
|
||
"""格式化异常用于日志,避免空 message 导致日志无内容"""
|
||
if isinstance(e, BinanceAPIException):
|
||
code = getattr(e, 'code', getattr(e, 'status_code', ''))
|
||
msg = str(e).strip() or getattr(e, 'message', '')
|
||
return f"BinanceAPIException(code={code}) {msg}"
|
||
name = type(e).__name__
|
||
msg = str(e).strip()
|
||
return f"{name}: {msg}" if msg else name
|
||
|
||
|
||
class BinanceClient:
|
||
"""币安客户端封装类"""
|
||
|
||
def __init__(self, api_key: str = None, api_secret: str = None, testnet: bool = None):
|
||
"""
|
||
初始化币安客户端
|
||
|
||
Args:
|
||
api_key: API密钥(如果为None,从config读取)
|
||
api_secret: API密钥(如果为None,从config读取)
|
||
testnet: 是否使用测试网(如果为None,从config读取)
|
||
"""
|
||
# 记录是否明确传入了 api_key 和 api_secret(用于后续判断是否应该从 config 刷新)
|
||
# 注意:空字符串 "" 也被视为明确传入(用于推荐服务等只需要公开数据的场景)
|
||
self._explicit_api_key = api_key is not None
|
||
self._explicit_api_secret = api_secret is not None
|
||
|
||
# 如果未提供参数(None),从config读取(确保使用最新值)
|
||
# 注意:如果明确传递了 api_key 和 api_secret(包括空字符串),应该使用传递的值,而不是从 config 读取
|
||
if api_key is None:
|
||
# 尝试从配置管理器直接获取
|
||
if config._config_manager:
|
||
try:
|
||
api_key = config._config_manager.get('BINANCE_API_KEY')
|
||
except:
|
||
pass
|
||
# 如果还是None,使用config模块的值
|
||
if not api_key:
|
||
api_key = config.BINANCE_API_KEY
|
||
# 如果传入的是空字符串,保持为空字符串(不覆盖)
|
||
# 这样推荐服务可以使用空字符串来明确表示"只使用公开接口"
|
||
|
||
if api_secret is None:
|
||
# 尝试从配置管理器直接获取
|
||
if config._config_manager:
|
||
try:
|
||
api_secret = config._config_manager.get('BINANCE_API_SECRET')
|
||
except:
|
||
pass
|
||
# 如果还是None,使用config模块的值
|
||
if not api_secret:
|
||
api_secret = config.BINANCE_API_SECRET
|
||
# 如果传入的是空字符串,保持为空字符串(不覆盖)
|
||
|
||
if testnet is None:
|
||
testnet = config.USE_TESTNET
|
||
|
||
self.api_key = api_key
|
||
self.api_secret = api_secret
|
||
self.testnet = testnet
|
||
|
||
# 记录使用的 API Key(用于调试,只显示前后4位)
|
||
if api_key:
|
||
key_display = f"{api_key[:4]}...{api_key[-4:]}" if len(api_key) > 8 else api_key
|
||
logger.info(f"BinanceClient.__init__: 使用 API Key {key_display}, testnet={testnet}, explicit_key={self._explicit_api_key}, explicit_secret={self._explicit_api_secret}")
|
||
else:
|
||
logger.warning("BinanceClient.__init__: API Key 为空!")
|
||
|
||
# 如果传入的是空字符串,确保不会被覆盖(用于推荐服务等场景)
|
||
if self._explicit_api_key and not api_key:
|
||
logger.info("BinanceClient.__init__: 明确传入空 API Key(推荐服务模式:只使用公开接口)")
|
||
|
||
# 初始化 Redis 缓存(必须在 __init__ 中初始化,不能依赖 _refresh_api_credentials)
|
||
try:
|
||
self.redis_cache = RedisCache(
|
||
redis_url=config.REDIS_URL,
|
||
use_tls=config.REDIS_USE_TLS,
|
||
ssl_cert_reqs=config.REDIS_SSL_CERT_REQS,
|
||
ssl_ca_certs=config.REDIS_SSL_CA_CERTS,
|
||
username=config.REDIS_USERNAME,
|
||
password=config.REDIS_PASSWORD
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"初始化 Redis 缓存失败: {e},某些功能可能不可用")
|
||
self.redis_cache = None
|
||
|
||
self.client: Optional[AsyncClient] = None
|
||
self.socket_manager: Optional[BinanceSocketManager] = None
|
||
self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息(有上限,防止内存无限增长)
|
||
self._symbol_info_cache_max_size = 200
|
||
self._last_request_time = {} # 记录每个API端点的最后请求时间
|
||
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
|
||
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
|
||
# ⚠️ 内存优化:进程内存只做临时缓存,主要数据在 Redis
|
||
self._price_cache: Dict[str, Dict] = {} # WebSocket价格缓存 {symbol: {price, volume, changePercent, timestamp}}
|
||
self._price_cache_ttl = 30 # 价格缓存有效期(秒,减少进程内存占用)
|
||
self._price_cache_max_size = 50 # 最多保留 50 个 symbol 的价格缓存(减少进程内存占用)
|
||
|
||
# 显示名 -> API symbol 映射(当交易所返回中文/非 ASCII 的 symbol 时,用 baseAsset+quoteAsset 作为下单用 symbol)
|
||
self._display_to_api_symbol: Dict[str, str] = {}
|
||
|
||
# 持仓模式缓存(币安 U本位合约):dualSidePosition=True => 对冲模式;False => 单向模式
|
||
self._dual_side_position: Optional[bool] = None
|
||
self._position_mode_checked_at: float = 0.0
|
||
# 你可能会在币安端切换“对冲/单向”模式;TTL 过长会导致短时间内仍按旧模式下单(携带 positionSide)
|
||
# 这里缩短到 10s,兼顾及时性与接口频率。
|
||
self._position_mode_ttl: float = 10.0 # 秒,避免频繁调用接口
|
||
|
||
# WebSocket 交易客户端(延迟初始化,仅在需要时创建)
|
||
self._ws_trade_client = None
|
||
|
||
# 隐藏敏感信息,只显示前4位和后4位
|
||
api_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key
|
||
api_secret_display = f"{self.api_secret[:4]}...{self.api_secret[-4:]}" if self.api_secret and len(self.api_secret) > 8 else self.api_secret
|
||
logger.info(f"初始化币安客户端: {api_key_display}, {api_secret_display}, {self.testnet}")
|
||
|
||
def _refresh_api_credentials(self):
|
||
"""刷新API密钥(从配置管理器重新读取,确保使用最新值)"""
|
||
# 如果明确传入了 api_key 和 api_secret,不应该从 config 刷新(避免覆盖多账号场景下的正确密钥)
|
||
if self._explicit_api_key and self._explicit_api_secret:
|
||
logger.debug("BinanceClient: 使用了明确的 API 密钥,跳过从 config 刷新(避免覆盖多账号场景)")
|
||
return
|
||
|
||
# 优先从配置管理器读取(会从Redis获取最新值)
|
||
if config._config_manager:
|
||
try:
|
||
# 强制从Redis重新加载配置
|
||
config._config_manager.reload_from_redis()
|
||
new_api_key = config._config_manager.get('BINANCE_API_KEY')
|
||
new_api_secret = config._config_manager.get('BINANCE_API_SECRET')
|
||
|
||
# 如果获取到新值且与当前值不同,更新
|
||
# 注意:即使传入了明确的密钥,如果只传入了其中一个,另一个仍可以从 config 刷新
|
||
if not self._explicit_api_key and new_api_key and new_api_key != self.api_key:
|
||
old_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key
|
||
new_key_display = f"{new_api_key[:4]}...{new_api_key[-4:]}" if new_api_key and len(new_api_key) > 8 else new_api_key
|
||
logger.info(f"检测到API密钥已更新: {old_key_display} -> {new_key_display}")
|
||
self.api_key = new_api_key
|
||
# 如果客户端已连接,需要重新连接以使用新密钥
|
||
if self.client:
|
||
logger.warning("API密钥已更新,但客户端已连接,需要重新连接才能使用新密钥")
|
||
|
||
if not self._explicit_api_secret and new_api_secret and new_api_secret != self.api_secret:
|
||
logger.info("检测到API密钥Secret已更新")
|
||
self.api_secret = new_api_secret
|
||
# 如果客户端已连接,需要重新连接以使用新密钥
|
||
if self.client:
|
||
logger.warning("API密钥Secret已更新,但客户端已连接,需要重新连接才能使用新密钥")
|
||
except Exception as e:
|
||
logger.debug(f"从配置管理器刷新API密钥失败: {e},使用现有值")
|
||
|
||
# 注意:redis_cache 已在 __init__ 中初始化,这里不需要再次初始化
|
||
|
||
async def connect(self, timeout: int = None, retries: int = None, requests_params: Dict = None):
|
||
"""
|
||
连接币安API
|
||
|
||
Args:
|
||
timeout: 连接超时时间(秒),默认从config读取
|
||
retries: 重试次数,默认从config读取
|
||
requests_params: 请求参数(如代理配置),例如 {'proxy': 'http://127.0.0.1:7890'}
|
||
"""
|
||
# 连接前刷新API密钥(确保使用最新值,支持热更新)
|
||
# 但如果 API 密钥为空(只用于获取公开行情),则跳过
|
||
if self.api_key and self.api_secret:
|
||
self._refresh_api_credentials()
|
||
else:
|
||
logger.info("BinanceClient: 使用公开 API(无需认证),只能获取行情数据")
|
||
|
||
timeout = timeout or config.CONNECTION_TIMEOUT
|
||
retries = retries or config.CONNECTION_RETRIES
|
||
|
||
last_error = None
|
||
for attempt in range(retries):
|
||
try:
|
||
logger.info(
|
||
f"尝试连接币安API (第 {attempt + 1}/{retries} 次, "
|
||
f"测试网: {self.testnet}, 超时: {timeout}秒)..."
|
||
)
|
||
|
||
# 不在此处设置全局 timeout,避免拖慢下单/止损止盈(需快速失败并重试);只读接口在各自方法内用 asyncio.wait_for 单独加长超时
|
||
req_params = dict(requests_params or {})
|
||
# 创建客户端(使用最新的API密钥,如果为空则只能访问公开接口)
|
||
self.client = await AsyncClient.create(
|
||
api_key=self.api_key or None, # 空字符串转为 None
|
||
api_secret=self.api_secret or None,
|
||
testnet=self.testnet,
|
||
requests_params=req_params
|
||
)
|
||
|
||
# 测试连接(带超时)
|
||
try:
|
||
await asyncio.wait_for(self.client.ping(), timeout=timeout)
|
||
except asyncio.TimeoutError:
|
||
await self.client.close_connection()
|
||
raise asyncio.TimeoutError(f"ping超时 ({timeout}秒)")
|
||
|
||
self.socket_manager = BinanceSocketManager(self.client)
|
||
logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})")
|
||
|
||
# 连接 Redis 缓存
|
||
await self.redis_cache.connect()
|
||
|
||
# 验证API密钥权限(仅当提供了有效的 API key 时)
|
||
if self.api_key and self.api_secret:
|
||
await self._verify_api_permissions()
|
||
else:
|
||
logger.info("✓ 使用公开 API,跳过权限验证(只能获取行情数据)")
|
||
|
||
# 预热读取持仓模式(避免首次下单时才触发 -4061)
|
||
try:
|
||
dual = await self._get_dual_side_position(force=True)
|
||
mode = "HEDGE(对冲)" if dual else "ONE-WAY(单向)"
|
||
logger.info(f"✓ 当前合约持仓模式: {mode}")
|
||
except Exception as e:
|
||
logger.debug(f"读取持仓模式失败(可忽略,后续下单会再尝试): {e}")
|
||
|
||
# 启动 WebSocket 交易客户端(优先用 WS 下单,减少 REST 超时)
|
||
if self.api_key and self.api_secret:
|
||
try:
|
||
from .ws_trade_client import WSTradeClient
|
||
self._ws_trade_client = WSTradeClient(self.api_key, self.api_secret, self.testnet)
|
||
await self._ws_trade_client.start()
|
||
logger.info("✓ WebSocket 交易客户端已启动(条件单优先走 WS,失败回退 REST)")
|
||
except Exception as e:
|
||
logger.warning(f"启动 WebSocket 交易客户端失败(将仅用 REST): {e}")
|
||
self._ws_trade_client = None
|
||
|
||
return
|
||
|
||
except asyncio.TimeoutError as e:
|
||
last_error = f"连接超时: {e}"
|
||
logger.warning(f"连接超时,剩余 {retries - attempt - 1} 次重试机会")
|
||
if attempt < retries - 1:
|
||
await asyncio.sleep(2) # 等待2秒后重试
|
||
except Exception as e:
|
||
last_error = str(e)
|
||
logger.warning(f"连接失败: {e},剩余 {retries - attempt - 1} 次重试机会")
|
||
if self.client:
|
||
try:
|
||
await self.client.close_connection()
|
||
except:
|
||
pass
|
||
if attempt < retries - 1:
|
||
await asyncio.sleep(2)
|
||
|
||
error_msg = f"连接币安API失败 (已重试 {retries} 次): {last_error}"
|
||
logger.error("=" * 60)
|
||
logger.error(error_msg)
|
||
logger.error("=" * 60)
|
||
logger.error("故障排查建议:")
|
||
logger.error("1. 检查网络连接是否正常")
|
||
logger.error("2. 检查API密钥是否正确")
|
||
logger.error("3. 如果在中国大陆,可能需要使用代理或VPN")
|
||
if self.testnet:
|
||
logger.error("4. 测试网地址可能无法访问,尝试设置 USE_TESTNET=False")
|
||
logger.error("5. 检查防火墙设置")
|
||
logger.error("=" * 60)
|
||
raise ConnectionError(error_msg)
|
||
|
||
async def _verify_api_permissions(self):
|
||
"""
|
||
验证API密钥权限
|
||
"""
|
||
try:
|
||
# 尝试获取账户信息来验证权限
|
||
await self.client.futures_account()
|
||
logger.info("✓ API密钥权限验证通过")
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
if error_code == -2015:
|
||
logger.warning("⚠ API密钥权限验证失败,可能无法进行合约交易")
|
||
logger.warning("请检查API密钥是否启用了合约交易权限")
|
||
else:
|
||
logger.warning(f"⚠ API密钥验证时出现错误: {e}")
|
||
|
||
def _futures_base_url(self) -> str:
|
||
"""合约 REST 根地址(用于 listenKey 等)"""
|
||
return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com"
|
||
|
||
async def create_futures_listen_key(self, prefer_ws: bool = True, max_retries: int = 3) -> Optional[str]:
|
||
"""
|
||
创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。
|
||
文档:每个账号 listenKey 独立,60 分钟无 keepalive 会失效。
|
||
|
||
优先使用 WebSocket API(如果 WSTradeClient 已连接),否则使用 REST API。
|
||
文档:如果该帐户具有有效的 listenKey,则返回现有 key 并将其有效期延长 60 分钟。
|
||
|
||
Args:
|
||
prefer_ws: 是否优先使用 WebSocket API(默认 True)
|
||
max_retries: REST API 失败时的最大重试次数(默认 3,共 4 次尝试)
|
||
"""
|
||
if not self.api_key:
|
||
return None
|
||
|
||
# 方法1: WebSocket API(优先,需 WSTradeClient 已连接)
|
||
if prefer_ws:
|
||
ws_result = await self._create_listen_key_via_ws()
|
||
if ws_result:
|
||
return ws_result
|
||
logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...")
|
||
|
||
# 方法2: REST API(文档:POST /fapi/v1/listenKey,带重试)
|
||
last_error = None
|
||
for attempt in range(max_retries + 1):
|
||
try:
|
||
import aiohttp
|
||
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
|
||
headers = {"X-MBX-APIKEY": self.api_key}
|
||
# 超时:按文档无明确限制,启动时可能并发多连接,适当放宽
|
||
timeout_sec = 50 if attempt == 0 else 60
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout_sec)) as resp:
|
||
text = await resp.text()
|
||
if resp.status != 200:
|
||
logger.warning(
|
||
"create_futures_listen_key (REST) 失败 status=%s body=%s (尝试 %d/%d)",
|
||
resp.status, (text[:500] if text else ""), attempt + 1, max_retries + 1,
|
||
)
|
||
if attempt < max_retries:
|
||
await asyncio.sleep(8)
|
||
continue
|
||
return None
|
||
try:
|
||
data = json.loads(text) if (text and text.strip()) else {}
|
||
except Exception:
|
||
data = {}
|
||
key = data.get("listenKey") if isinstance(data, dict) else None
|
||
if key:
|
||
logger.info("✓ 合约 User Data Stream listenKey 已创建 (REST)")
|
||
return key
|
||
except asyncio.TimeoutError:
|
||
last_error = f"请求超时({timeout_sec}秒)"
|
||
logger.warning(
|
||
"create_futures_listen_key (REST) 失败: %s (尝试 %d/%d)",
|
||
last_error, attempt + 1, max_retries + 1,
|
||
)
|
||
if attempt < max_retries:
|
||
await asyncio.sleep(8)
|
||
continue
|
||
except Exception as e:
|
||
last_error = getattr(e, "message", str(e)) or repr(e)
|
||
logger.warning(
|
||
"create_futures_listen_key (REST) 失败: %s - %s (尝试 %d/%d)",
|
||
type(e).__name__, last_error, attempt + 1, max_retries + 1,
|
||
)
|
||
if attempt < max_retries:
|
||
await asyncio.sleep(8)
|
||
continue
|
||
|
||
logger.error(f"create_futures_listen_key (REST) 重试 {max_retries + 1} 次后仍失败: {last_error}")
|
||
return None
|
||
|
||
async def _create_listen_key_via_ws(self) -> Optional[str]:
|
||
"""通过 WebSocket API 创建 listenKey(优先方案)。"""
|
||
if not self.api_key:
|
||
return None
|
||
|
||
# 检查是否有可用的 WSTradeClient
|
||
ws_client = getattr(self, "_ws_trade_client", None)
|
||
if not ws_client:
|
||
return None
|
||
|
||
# 检查连接状态(如果未连接,返回 None 让调用方回退到 REST)
|
||
if not ws_client.is_connected():
|
||
return None
|
||
|
||
try:
|
||
# 使用 WebSocket API: userDataStream.start
|
||
result = await ws_client._send_request(
|
||
"userDataStream.start",
|
||
{"apiKey": self.api_key},
|
||
timeout=30.0
|
||
)
|
||
if result and isinstance(result, dict):
|
||
listen_key = result.get("listenKey")
|
||
if listen_key:
|
||
logger.info("✓ 合约 User Data Stream listenKey 已创建 (WebSocket API)")
|
||
return listen_key
|
||
logger.debug("WebSocket API 创建 listenKey 返回结果格式异常")
|
||
return None
|
||
except Exception as e:
|
||
err_msg = getattr(e, "message", str(e)) or repr(e)
|
||
logger.debug(
|
||
"create_futures_listen_key (WebSocket API) 失败: %s - %s",
|
||
type(e).__name__, err_msg,
|
||
)
|
||
return None
|
||
|
||
async def keepalive_futures_listen_key(self, listen_key: str, prefer_ws: bool = True):
|
||
"""
|
||
延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。返回 (ok: bool, code_1125: bool),-1125 表示 key 不存在需换新。
|
||
|
||
优先使用 WebSocket API(如果 WSTradeClient 已连接),否则使用 REST API。
|
||
|
||
Args:
|
||
prefer_ws: 是否优先使用 WebSocket API(默认 True)。如果 WebSocket 不可用,自动回退到 REST API。
|
||
"""
|
||
if not self.api_key or not listen_key:
|
||
return False, False
|
||
|
||
# 方法1: WebSocket API(优先,如果 WSTradeClient 已连接)
|
||
if prefer_ws:
|
||
ws_result = await self._keepalive_listen_key_via_ws(listen_key)
|
||
if ws_result[0]: # 如果成功
|
||
return ws_result
|
||
# WebSocket 不可用或失败,回退到 REST API
|
||
if ws_result[1]: # code_1125
|
||
# WebSocket 返回 -1125,说明 key 不存在,直接返回
|
||
return False, True
|
||
logger.debug("WSTradeClient 未连接或 WebSocket API 失败,回退到 REST API...")
|
||
|
||
# 方法2: REST API(文档:PUT /fapi/v1/listenKey?listenKey=xxx,延长 60 分钟)
|
||
try:
|
||
import aiohttp
|
||
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
|
||
headers = {"X-MBX-APIKEY": self.api_key}
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp:
|
||
text = await resp.text()
|
||
ok = resp.status == 200
|
||
code_1125 = False
|
||
if not ok and text:
|
||
try:
|
||
data = json.loads(text) if text.strip().startswith("{") else {}
|
||
code_1125 = int(data.get("code", 0)) == -1125
|
||
except Exception:
|
||
pass
|
||
logger.debug(f"keepalive_futures_listen_key (REST) 失败 status={resp.status} body={text}")
|
||
return ok, code_1125
|
||
except asyncio.TimeoutError:
|
||
logger.warning("keepalive_futures_listen_key (REST) 失败: 请求超时(45秒)")
|
||
return False, False
|
||
except Exception as e:
|
||
logger.debug(f"keepalive_futures_listen_key (REST) 失败: {e}")
|
||
return False, False
|
||
|
||
async def _keepalive_listen_key_via_ws(self, listen_key: str) -> tuple[bool, bool]:
|
||
"""通过 WebSocket API 延长 listenKey 有效期(优先方案)。"""
|
||
if not self.api_key or not listen_key:
|
||
return False, False
|
||
|
||
# 检查是否有可用的 WSTradeClient
|
||
ws_client = getattr(self, "_ws_trade_client", None)
|
||
if not ws_client:
|
||
return False, False
|
||
|
||
# 检查连接状态(如果未连接,返回 False 让调用方回退到 REST)
|
||
if not ws_client.is_connected():
|
||
return False, False
|
||
|
||
try:
|
||
# 使用 WebSocket API: userDataStream.ping
|
||
result = await ws_client._send_request(
|
||
"userDataStream.ping",
|
||
{"apiKey": self.api_key},
|
||
timeout=30.0
|
||
)
|
||
if result:
|
||
logger.debug("✓ listenKey keepalive 成功 (WebSocket API)")
|
||
return True, False
|
||
return False, False
|
||
except Exception as e:
|
||
err_msg = getattr(e, "message", str(e)) or repr(e)
|
||
error_code = getattr(e, "code", None)
|
||
code_1125 = (error_code == -1125) if error_code else False
|
||
logger.debug(
|
||
"keepalive_futures_listen_key (WebSocket API) 失败: %s - %s",
|
||
type(e).__name__, err_msg,
|
||
)
|
||
return False, code_1125
|
||
|
||
@staticmethod
|
||
def _to_bool(value: Any) -> Optional[bool]:
|
||
if value is None:
|
||
return None
|
||
if isinstance(value, bool):
|
||
return value
|
||
if isinstance(value, (int, float)):
|
||
return bool(value)
|
||
s = str(value).strip().lower()
|
||
if s in {"true", "1", "yes", "y"}:
|
||
return True
|
||
if s in {"false", "0", "no", "n"}:
|
||
return False
|
||
return None
|
||
|
||
async def _get_dual_side_position(self, force: bool = False) -> Optional[bool]:
|
||
"""
|
||
获取币安合约持仓模式:
|
||
- True: 对冲模式(Hedge Mode,需要下单时传 positionSide=LONG/SHORT)
|
||
- False: 单向模式(One-way Mode,不应传 positionSide=LONG/SHORT)
|
||
若配置 ONE_WAY_POSITION_ONLY=True,直接返回 False,不请求 API。
|
||
"""
|
||
one_way_only = bool(config.TRADING_CONFIG.get("ONE_WAY_POSITION_ONLY", True))
|
||
if one_way_only:
|
||
self._dual_side_position = False
|
||
return False
|
||
|
||
if not self.client:
|
||
return None
|
||
|
||
now = time.time()
|
||
if not force and self._dual_side_position is not None and (now - self._position_mode_checked_at) < self._position_mode_ttl:
|
||
return self._dual_side_position
|
||
|
||
# python-binance: futures_get_position_mode -> {"dualSidePosition": true/false}
|
||
res = await self.client.futures_get_position_mode()
|
||
dual = self._to_bool(res.get("dualSidePosition") if isinstance(res, dict) else None)
|
||
self._dual_side_position = dual
|
||
self._position_mode_checked_at = now
|
||
return dual
|
||
|
||
async def _resolve_position_side_for_order(
|
||
self,
|
||
symbol: str,
|
||
side: str,
|
||
reduce_only: bool,
|
||
provided: Optional[str],
|
||
) -> Optional[str]:
|
||
"""
|
||
在对冲模式下,必须传 positionSide=LONG/SHORT。
|
||
- 开仓:BUY=>LONG, SELL=>SHORT
|
||
- 平仓(reduceOnly):需要知道要减少的是 LONG 还是 SHORT
|
||
"""
|
||
if provided:
|
||
ps = provided.strip().upper()
|
||
if ps in {"LONG", "SHORT"}:
|
||
return ps
|
||
if ps == "BOTH":
|
||
# 对冲模式下 BOTH 无效,这里按开仓方向兜底
|
||
return "LONG" if side.upper() == "BUY" else "SHORT"
|
||
|
||
side_u = side.upper()
|
||
if not reduce_only:
|
||
return "LONG" if side_u == "BUY" else "SHORT"
|
||
|
||
# reduceOnly 平仓:尝试从当前持仓推断(避免调用方漏传)
|
||
try:
|
||
positions = await self.client.futures_position_information(symbol=symbol)
|
||
nonzero = []
|
||
for p in positions or []:
|
||
try:
|
||
amt = float(p.get("positionAmt", 0))
|
||
except Exception:
|
||
continue
|
||
if abs(amt) > 0:
|
||
nonzero.append((amt, p))
|
||
|
||
if len(nonzero) == 1:
|
||
amt, p = nonzero[0]
|
||
ps = (p.get("positionSide") or "").upper()
|
||
if ps in {"LONG", "SHORT"}:
|
||
return ps
|
||
return "LONG" if amt > 0 else "SHORT"
|
||
|
||
# 多持仓:按平仓方向推断(SELL 通常平 LONG;BUY 通常平 SHORT)
|
||
if side_u == "SELL":
|
||
cand = next((p for amt, p in nonzero if amt > 0), None)
|
||
return (cand.get("positionSide") or "LONG").upper() if cand else "LONG"
|
||
if side_u == "BUY":
|
||
cand = next((p for amt, p in nonzero if amt < 0), None)
|
||
return (cand.get("positionSide") or "SHORT").upper() if cand else "SHORT"
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 推断 positionSide 失败: {e}")
|
||
|
||
return None
|
||
|
||
async def disconnect(self):
|
||
"""断开连接"""
|
||
|
||
# 停止 WebSocket 交易客户端
|
||
if self._ws_trade_client:
|
||
try:
|
||
await self._ws_trade_client.stop()
|
||
except Exception as e:
|
||
logger.debug(f"停止 WS 交易客户端失败: {e}")
|
||
self._ws_trade_client = None
|
||
|
||
# 关闭 Redis 连接
|
||
await self.redis_cache.close()
|
||
|
||
if self.client:
|
||
await self.client.close_connection()
|
||
logger.info("币安客户端已断开连接")
|
||
|
||
def _resolve_api_symbol(self, symbol: str) -> str:
|
||
"""
|
||
将显示名(含中文等非 ASCII)解析为交易所 API 使用的英文 symbol。
|
||
若未建立映射或已是 ASCII,则原样返回。
|
||
"""
|
||
if not symbol:
|
||
return symbol
|
||
return self._display_to_api_symbol.get(symbol, symbol)
|
||
|
||
async def get_all_usdt_pairs(self, max_retries: int = 3, timeout: int = 30) -> List[str]:
|
||
"""
|
||
获取所有USDT交易对
|
||
优先从 DB market_cache 读取 exchange_info(24 小时内有效),减少 API 调用。
|
||
添加超时处理和重试机制,避免推荐系统因网络超时中断。
|
||
|
||
Args:
|
||
max_retries: 最大重试次数(默认3次)
|
||
timeout: 单次请求超时时间(秒,默认30秒)
|
||
|
||
Returns:
|
||
USDT交易对列表(失败时返回空列表)
|
||
"""
|
||
loop = asyncio.get_event_loop()
|
||
# 优先从 DB 读取(同步调用放至线程池)
|
||
try:
|
||
exchange_info = await loop.run_in_executor(None, lambda: _load_exchange_info_from_db(86400))
|
||
if exchange_info and isinstance(exchange_info.get("symbols"), list):
|
||
usdt_pairs, display_to_api = _parse_usdt_pairs_from_exchange_info(exchange_info)
|
||
if usdt_pairs:
|
||
self._display_to_api_symbol.clear()
|
||
self._display_to_api_symbol.update(display_to_api)
|
||
if display_to_api:
|
||
logger.info(f"已映射 {len(display_to_api)} 个中文/非ASCII交易对到英文 symbol")
|
||
logger.info(f"从 DB 缓存获取到 {len(usdt_pairs)} 个USDT永续合约交易对")
|
||
return usdt_pairs
|
||
except Exception as e:
|
||
logger.debug("从 DB 读取 exchange_info 失败,将请求 API: %s", e)
|
||
|
||
for attempt in range(1, max_retries + 1):
|
||
try:
|
||
exchange_info = await asyncio.wait_for(
|
||
self._rate_limited_request(
|
||
'futures_exchange_info',
|
||
self.client.futures_exchange_info()
|
||
),
|
||
timeout=timeout
|
||
)
|
||
usdt_pairs, display_to_api = _parse_usdt_pairs_from_exchange_info(exchange_info)
|
||
self._display_to_api_symbol.clear()
|
||
self._display_to_api_symbol.update(display_to_api)
|
||
if display_to_api:
|
||
logger.info(f"已映射 {len(display_to_api)} 个中文/非ASCII交易对到英文 symbol,均可正常下单")
|
||
logger.info(f"获取到 {len(usdt_pairs)} 个USDT永续合约交易对")
|
||
# 回写 DB 供下次使用
|
||
try:
|
||
await loop.run_in_executor(None, lambda: _save_exchange_info_to_db(exchange_info))
|
||
except Exception as e:
|
||
logger.debug("exchange_info 写入 DB 失败: %s", e)
|
||
return usdt_pairs
|
||
|
||
except asyncio.TimeoutError:
|
||
if attempt < max_retries:
|
||
wait_time = attempt * 2 # 递增等待时间:2秒、4秒、6秒
|
||
logger.warning(f"获取交易对超时({timeout}秒),{wait_time}秒后重试 ({attempt}/{max_retries})")
|
||
await asyncio.sleep(wait_time)
|
||
else:
|
||
logger.error(f"获取交易对失败:{max_retries}次重试后仍然超时")
|
||
return []
|
||
|
||
except BinanceAPIException as e:
|
||
logger.error(f"获取交易对失败(API错误): {e}")
|
||
return []
|
||
|
||
except Exception as e:
|
||
if attempt < max_retries:
|
||
wait_time = attempt * 2
|
||
logger.warning(f"获取交易对出错: {e},{wait_time}秒后重试 ({attempt}/{max_retries})")
|
||
await asyncio.sleep(wait_time)
|
||
else:
|
||
logger.error(f"获取交易对失败(未知错误): {e}")
|
||
return []
|
||
|
||
return [] # 所有重试都失败,返回空列表
|
||
|
||
async def _rate_limited_request(self, endpoint: str, coro):
|
||
"""
|
||
带速率限制的API请求
|
||
|
||
Args:
|
||
endpoint: API端点标识(用于记录请求时间)
|
||
coro: 异步协程
|
||
"""
|
||
async with self._semaphore:
|
||
# 检查是否需要等待(避免请求过快)
|
||
if endpoint in self._last_request_time:
|
||
elapsed = asyncio.get_event_loop().time() - self._last_request_time[endpoint]
|
||
if elapsed < self._request_delay:
|
||
await asyncio.sleep(self._request_delay - elapsed)
|
||
|
||
self._last_request_time[endpoint] = asyncio.get_event_loop().time()
|
||
return await coro
|
||
|
||
async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]:
|
||
"""
|
||
获取K线数据(合约市场)
|
||
优先级:共享 Redis 缓存(多账号共用)> WS 本地缓存 > REST API
|
||
|
||
⚠️ 优化:多账号共享缓存机制
|
||
- Leader 进程通过 WebSocket 实时更新共享缓存(market:kline:{symbol}:{interval})
|
||
- 所有账号进程优先从共享缓存读取,减少 REST API 调用
|
||
- REST API 获取的数据也会写入共享缓存,供其他账号使用
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
interval: K线周期
|
||
limit: 获取数量
|
||
|
||
Returns:
|
||
K线数据列表
|
||
"""
|
||
# 0. ⚠️ 优先:多进程/多账号共用 Redis 缓存(最高优先级,减少所有账号的 REST 调用)
|
||
try:
|
||
from .market_ws_leader import use_shared_market_ws, KEY_KLINE_PREFIX
|
||
from .kline_stream import get_klines_from_redis
|
||
if use_shared_market_ws(self.redis_cache):
|
||
shared = await get_klines_from_redis(self.redis_cache, symbol, interval, limit)
|
||
if shared and len(shared) >= limit:
|
||
logger.debug(f"✓ 从共享 Redis 缓存获取 {symbol} K线: {interval} x{limit}(多账号共用)")
|
||
return shared
|
||
except Exception as e:
|
||
logger.debug(f"读取共享 K线 Redis 失败: {e}")
|
||
|
||
# 1. 本地 WS 缓存(当前进程的 WebSocket 数据,实时更新)
|
||
try:
|
||
from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh
|
||
stream = get_kline_stream_instance()
|
||
if stream:
|
||
# 确保订阅该流(首次请求时自动订阅)
|
||
await stream.subscribe(symbol, interval, limit=max(limit, 50))
|
||
if is_kline_cache_fresh(symbol, interval, max_age_sec=300.0):
|
||
ws_cached = get_klines_from_cache(symbol, interval, limit)
|
||
if ws_cached and len(ws_cached) >= limit:
|
||
logger.debug(f"从本地 WS 缓存获取 {symbol} K线数据: {interval} x{limit}")
|
||
return ws_cached
|
||
except Exception as e:
|
||
logger.debug(f"读取 K线 WS 缓存失败: {e}")
|
||
|
||
# 2. 旧格式 Redis 缓存(兼容性,逐步迁移到共享缓存)
|
||
cache_key = f"klines:{symbol}:{interval}:{limit}"
|
||
cached = await self.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.debug(f"从 Redis 缓存获取 {symbol} K线数据: {interval} x{limit}")
|
||
return cached
|
||
|
||
# 3. REST API(兜底,获取后写入共享缓存供其他账号使用)
|
||
try:
|
||
klines = await self._rate_limited_request(
|
||
f'klines_{symbol}_{interval}',
|
||
self.client.futures_klines(symbol=symbol, interval=interval, limit=limit)
|
||
)
|
||
|
||
# ⚠️ 优化:写入共享缓存(market:kline:{symbol}:{interval}),供所有账号使用
|
||
if klines:
|
||
try:
|
||
from .market_ws_leader import KEY_KLINE_PREFIX
|
||
shared_key = f"{KEY_KLINE_PREFIX}{symbol.upper()}:{interval.lower()}"
|
||
# 使用较长的 TTL,因为这是共享缓存,多个账号都会使用
|
||
ttl_map = {
|
||
'1m': 60, '3m': 120, '5m': 180, '15m': 300, '30m': 600,
|
||
'1h': 900, '2h': 1800, '4h': 3600, '6h': 5400, '8h': 7200, '12h': 10800, '1d': 21600
|
||
}
|
||
ttl = ttl_map.get(interval, 1800) # 默认 30 分钟
|
||
await self.redis_cache.set(shared_key, klines, ttl=ttl)
|
||
logger.debug(f"✓ 已写入共享 Redis 缓存 {symbol} K线: {interval} x{limit} (TTL: {ttl}秒,供多账号共用)")
|
||
except Exception as e:
|
||
logger.debug(f"写入共享缓存失败: {e}")
|
||
|
||
# 同时写入旧格式缓存(兼容性)
|
||
ttl_map_old = {
|
||
'1m': 10, '3m': 20, '5m': 30, '15m': 60, '30m': 120,
|
||
'1h': 300, '2h': 600, '4h': 900, '6h': 1200, '8h': 1800, '12h': 2400, '1d': 3600
|
||
}
|
||
ttl_old = ttl_map_old.get(interval, 300)
|
||
await self.redis_cache.set(cache_key, klines, ttl=ttl_old)
|
||
logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl_old}秒)")
|
||
|
||
return klines
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
if error_code == -1003:
|
||
logger.warning(f"获取 {symbol} K线数据失败: API请求频率过高,建议使用WebSocket或增加扫描间隔")
|
||
else:
|
||
logger.error(f"获取 {symbol} K线数据失败: {e}")
|
||
return []
|
||
|
||
async def get_ticker_24h(self, symbol: str) -> Optional[Dict]:
|
||
"""
|
||
获取24小时行情数据(合约市场)
|
||
优先从WebSocket缓存读取,其次从Redis缓存读取,最后使用REST API
|
||
|
||
Args:
|
||
symbol: 交易对(支持中文名,会解析为 API symbol)
|
||
|
||
Returns:
|
||
24小时行情数据
|
||
"""
|
||
import time
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
|
||
# 全用 Redis:优先从 Redis 读;REST 后只写 Redis,不写进程内存(Redis 不可用时才写内存)
|
||
cache_key = f"ticker_24h:{symbol}"
|
||
cached = await self.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.debug(f"从Redis缓存获取 {symbol} 24小时行情数据")
|
||
return cached
|
||
# Redis 未命中时降级到进程内存(仅当 Redis 不可用时会有数据)
|
||
if symbol in self._price_cache:
|
||
cached = self._price_cache[symbol]
|
||
cache_age = time.time() - cached.get('timestamp', 0)
|
||
if cache_age < self._price_cache_ttl:
|
||
logger.debug(f"从进程内存缓存获取 {symbol} 价格 (降级)")
|
||
return {
|
||
'symbol': symbol,
|
||
'price': cached['price'],
|
||
'volume': cached.get('volume', 0),
|
||
'changePercent': cached.get('changePercent', 0)
|
||
}
|
||
self._price_cache.pop(symbol, None)
|
||
logger.debug(f"{symbol} 未在缓存中,使用REST API获取")
|
||
try:
|
||
ticker = await self._rate_limited_request(
|
||
f'ticker_{symbol}',
|
||
self.client.futures_symbol_ticker(symbol=symbol)
|
||
)
|
||
stats = await self._rate_limited_request(
|
||
f'stats_{symbol}',
|
||
self.client.futures_ticker(symbol=symbol)
|
||
)
|
||
result = {
|
||
'symbol': symbol,
|
||
'price': float(ticker['price']),
|
||
'volume': float(stats.get('quoteVolume', 0)),
|
||
'changePercent': float(stats.get('priceChangePercent', 0))
|
||
}
|
||
# 只写 Redis;仅当 Redis 写入失败时才写进程内存(降级)
|
||
wrote_redis = await self.redis_cache.set(cache_key, result, ttl=30)
|
||
if not wrote_redis:
|
||
if len(self._price_cache) >= self._price_cache_max_size:
|
||
oldest_key = min(self._price_cache.keys(), key=lambda k: self._price_cache[k].get('timestamp', 0))
|
||
self._price_cache.pop(oldest_key, None)
|
||
self._price_cache[symbol] = {**result, 'timestamp': time.time()}
|
||
return result
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
if error_code == -1003:
|
||
logger.warning(f"获取 {symbol} 24小时行情失败: API请求频率过高,建议使用WebSocket或增加扫描间隔")
|
||
else:
|
||
logger.error(f"获取 {symbol} 24小时行情失败: {e}")
|
||
return None
|
||
|
||
async def get_all_tickers_24h(self) -> Dict[str, Dict]:
|
||
"""
|
||
批量获取所有交易对的24小时行情数据(更高效)
|
||
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
|
||
|
||
Returns:
|
||
交易对行情数据字典 {symbol: {price, volume, changePercent}}
|
||
"""
|
||
# 先查 Redis 缓存
|
||
cache_key = "ticker_24h:all"
|
||
cached = await self.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.debug(f"从Redis缓存获取所有交易对的24小时行情数据: {len(cached)} 个交易对")
|
||
return cached
|
||
|
||
try:
|
||
# 使用批量API,一次获取所有交易对的数据
|
||
tickers = await self._rate_limited_request(
|
||
'all_tickers',
|
||
self.client.futures_ticker()
|
||
)
|
||
|
||
result = {}
|
||
now_ms = int(__import__("time").time() * 1000)
|
||
for ticker in tickers:
|
||
sym = ticker['symbol']
|
||
if not sym.endswith('USDT'):
|
||
continue
|
||
# 支持中文名:用映射后的 API symbol 作为 key,便于扫描与下单一致
|
||
api_sym = self._display_to_api_symbol.get(sym, sym)
|
||
# 移除 ASCII 检查,允许非 ASCII 交易对
|
||
result[api_sym] = {
|
||
'symbol': api_sym,
|
||
'price': float(ticker.get('lastPrice', 0)),
|
||
'volume': float(ticker.get('quoteVolume', 0)),
|
||
'changePercent': float(ticker.get('priceChangePercent', 0)),
|
||
# 用于前端展示“当前价格更新时间”(以及后端合并时判断新鲜度)
|
||
'ts': now_ms
|
||
}
|
||
|
||
# 写入 Redis 缓存(TTL: 60秒,多个账户可以共用)
|
||
await self.redis_cache.set(cache_key, result, ttl=60)
|
||
logger.debug(f"批量获取到 {len(result)} 个交易对的24小时行情数据,已缓存 (TTL: 60秒)")
|
||
return result
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
if error_code == -1003:
|
||
logger.warning(f"批量获取24小时行情失败: API请求频率过高,建议使用WebSocket或增加扫描间隔")
|
||
else:
|
||
logger.error(f"批量获取24小时行情失败: {e}")
|
||
return {}
|
||
|
||
# ------------------------- 公开行情接口(文档:行情接口REST.txt)-------------------------
|
||
|
||
async def _public_futures_get(self, path: str, params: Optional[Dict[str, Any]] = None, timeout_sec: int = 10) -> Any:
|
||
"""公开 GET 请求(无需签名),path 如 'fapi/v1/depth' 或 'futures/data/openInterestHist'。"""
|
||
import aiohttp
|
||
base = self._futures_base_url()
|
||
url = f"{base}/{path}" if not path.startswith("/") else f"{base}{path}"
|
||
params = {k: v for k, v in (params or {}).items() if v is not None}
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=timeout_sec)) as resp:
|
||
text = await resp.text()
|
||
if resp.status != 200:
|
||
logger.debug(f"公开行情 GET 失败: {path} status={resp.status} body={text[:200]}")
|
||
return None
|
||
if not text or not text.strip():
|
||
return None
|
||
return json.loads(text)
|
||
except asyncio.TimeoutError:
|
||
logger.debug(f"公开行情 GET 超时: {path}")
|
||
return None
|
||
except Exception as e:
|
||
logger.debug(f"公开行情 GET 异常 {path}: {e}")
|
||
return None
|
||
|
||
async def get_depth(self, symbol: str, limit: int = 50) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
深度信息 GET /fapi/v1/depth
|
||
limit 可选 5,10,20,50,100,500,1000;权重 2~20。
|
||
Returns: { lastUpdateId, E, T, bids: [[price, qty], ...], asks: [...] }
|
||
"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
if limit not in (5, 10, 20, 50, 100, 500, 1000):
|
||
limit = 50
|
||
return await self._rate_limited_request(
|
||
f"depth_{symbol}",
|
||
self._public_futures_get("fapi/v1/depth", {"symbol": symbol, "limit": limit}),
|
||
)
|
||
|
||
async def get_premium_index(self, symbol: Optional[str] = None) -> Any:
|
||
"""
|
||
最新标记价格和资金费率 GET /fapi/v1/premiumIndex
|
||
带 symbol 权重 1,不带 10。返回单对象或数组。
|
||
"""
|
||
params = {} if symbol is None else {"symbol": self._resolve_api_symbol(symbol)}
|
||
return await self._rate_limited_request(
|
||
"premiumIndex",
|
||
self._public_futures_get("fapi/v1/premiumIndex", params),
|
||
)
|
||
|
||
async def get_funding_rate(self, symbol: Optional[str] = None, limit: int = 100,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""
|
||
资金费率历史 GET /fapi/v1/fundingRate
|
||
limit 默认 100 最大 1000;与 fundingInfo 共享 500/5min/IP。
|
||
"""
|
||
params = {"limit": limit}
|
||
if symbol:
|
||
params["symbol"] = self._resolve_api_symbol(symbol)
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._public_futures_get("fapi/v1/fundingRate", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_open_interest(self, symbol: str) -> Optional[Dict[str, Any]]:
|
||
"""获取未平仓合约数 GET /fapi/v1/openInterest 权重 1。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
return await self._rate_limited_request(
|
||
f"openInterest_{symbol}",
|
||
self._public_futures_get("fapi/v1/openInterest", {"symbol": symbol}),
|
||
)
|
||
|
||
async def _futures_data_get(self, endpoint: str, params: Dict[str, Any]) -> Any:
|
||
"""GET /futures/data/<endpoint>,权重多为 0,IP 限频 1000/5min。"""
|
||
return await self._public_futures_get(f"futures/data/{endpoint}", params)
|
||
|
||
async def get_open_interest_hist(self, symbol: str, period: str = "1h", limit: int = 30,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""合约持仓量历史 GET /futures/data/openInterestHist。period: 5m,15m,30m,1h,2h,4h,6h,12h,1d。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
params = {"symbol": symbol, "period": period, "limit": limit}
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._futures_data_get("openInterestHist", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_top_long_short_position_ratio(self, symbol: str, period: str = "1h", limit: int = 30,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""大户持仓量多空比 GET /futures/data/topLongShortPositionRatio。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
params = {"symbol": symbol, "period": period, "limit": limit}
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._futures_data_get("topLongShortPositionRatio", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_top_long_short_account_ratio(self, symbol: str, period: str = "1h", limit: int = 30,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""大户账户数多空比 GET /futures/data/topLongShortAccountRatio。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
params = {"symbol": symbol, "period": period, "limit": limit}
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._futures_data_get("topLongShortAccountRatio", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_global_long_short_account_ratio(self, symbol: str, period: str = "1h", limit: int = 30,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""多空持仓人数比 GET /futures/data/globalLongShortAccountRatio。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
params = {"symbol": symbol, "period": period, "limit": limit}
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._futures_data_get("globalLongShortAccountRatio", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_taker_long_short_ratio(self, symbol: str, period: str = "1h", limit: int = 30,
|
||
start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[Dict]:
|
||
"""合约主动买卖量 GET /futures/data/takerlongshortRatio。"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
params = {"symbol": symbol, "period": period, "limit": limit}
|
||
if start_time is not None:
|
||
params["startTime"] = start_time
|
||
if end_time is not None:
|
||
params["endTime"] = end_time
|
||
out = await self._futures_data_get("takerlongshortRatio", params)
|
||
return out if isinstance(out, list) else []
|
||
|
||
async def get_account_balance(self) -> Dict[str, float]:
|
||
"""
|
||
获取U本位合约账户余额
|
||
|
||
Returns:
|
||
账户余额字典 {'available': 可用余额, 'total': 总余额, 'margin': 保证金余额}
|
||
"""
|
||
try:
|
||
account = await self.client.futures_account()
|
||
assets = account.get('assets', [])
|
||
usdt_asset = next((a for a in assets if a['asset'] == 'USDT'), None)
|
||
|
||
if usdt_asset:
|
||
# 币安合约账户字段说明(根据官方文档):
|
||
# - walletBalance: 钱包余额(不包括未实现盈亏,只反映已实现的盈亏、转账、手续费等)
|
||
# - marginBalance: 保证金余额(钱包余额 + 未实现盈亏),这是账户的总权益,用户看到的"总余额"
|
||
# - availableBalance: 可用余额(可用于开仓的余额 = 钱包余额 - 初始保证金 + 未实现盈亏)
|
||
wallet_balance = float(usdt_asset.get('walletBalance', 0))
|
||
available_balance = float(usdt_asset.get('availableBalance', 0))
|
||
margin_balance = float(usdt_asset.get('marginBalance', 0))
|
||
unrealized_profit = float(usdt_asset.get('unrealizedProfit', 0))
|
||
|
||
# 记录所有字段以便调试
|
||
logger.info(f"币安合约账户余额详情 (USDT):")
|
||
logger.info(f" - walletBalance (钱包余额,不包括未实现盈亏): {wallet_balance}")
|
||
logger.info(f" - marginBalance (保证金余额,总权益,包括未实现盈亏): {margin_balance}")
|
||
logger.info(f" - availableBalance (可用余额): {available_balance}")
|
||
logger.info(f" - unrealizedProfit (未实现盈亏): {unrealized_profit}")
|
||
logger.info(f" - 验证: marginBalance ({margin_balance}) = walletBalance ({wallet_balance}) + unrealizedProfit ({unrealized_profit}) = {wallet_balance + unrealized_profit}")
|
||
|
||
return {
|
||
'ok': True,
|
||
'available': available_balance,
|
||
'total': margin_balance, # 使用保证金余额作为总余额(包括未实现盈亏),这是用户看到的"总余额"
|
||
'margin': margin_balance,
|
||
# 添加原始字段以便调试
|
||
'walletBalance': wallet_balance,
|
||
'availableBalance': available_balance,
|
||
'marginBalance': margin_balance,
|
||
'unrealizedProfit': unrealized_profit
|
||
}
|
||
logger.warning("币安账户中没有找到 USDT 资产")
|
||
return {'ok': True, 'available': 0.0, 'total': 0.0, 'margin': 0.0}
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
error_msg = str(e)
|
||
|
||
# 合并成“单条多行日志”,避免日志/Redis 里刷屏
|
||
lines = [
|
||
"=" * 60,
|
||
f"获取账户余额失败: {error_msg}",
|
||
]
|
||
if error_code == -2015:
|
||
lines += [
|
||
"=" * 60,
|
||
"API密钥权限错误 (错误代码: -2015)",
|
||
"可能的原因:",
|
||
"1. API密钥无效或已过期",
|
||
"2. API密钥没有合约交易权限",
|
||
"3. IP地址未添加到API密钥白名单",
|
||
"4. 测试网/生产网环境不匹配",
|
||
"=" * 60,
|
||
"解决方案:",
|
||
"1. 登录币安账户,检查API密钥状态",
|
||
"2. 确保API密钥已启用'合约交易'权限",
|
||
"3. 如果设置了IP白名单,请添加当前服务器IP",
|
||
"4. 检查 USE_TESTNET 配置是否正确",
|
||
f" 当前配置: USE_TESTNET = {self.testnet}",
|
||
"=" * 60,
|
||
]
|
||
elif error_code == -1022:
|
||
lines += [
|
||
f"错误代码: {error_code}",
|
||
"签名错误,请检查 API_KEY / API_SECRET 是否匹配、是否有多余空格/换行",
|
||
]
|
||
elif error_code == -2010:
|
||
lines += [
|
||
f"错误代码: {error_code}",
|
||
"账户余额不足",
|
||
]
|
||
else:
|
||
lines += [f"错误代码: {error_code}"]
|
||
|
||
logger.error("\n".join(lines))
|
||
|
||
return {
|
||
'ok': False,
|
||
'available': 0.0,
|
||
'total': 0.0,
|
||
'margin': 0.0,
|
||
'error_code': error_code,
|
||
'error_msg': error_msg,
|
||
}
|
||
|
||
async def get_open_positions(self) -> List[Dict]:
|
||
"""
|
||
获取当前持仓
|
||
|
||
Returns:
|
||
持仓列表
|
||
"""
|
||
# 保持 7 次重试,不增加对币安的总请求次数;仅拉长重试间隔(退避)以给网络恢复时间
|
||
retries = 7
|
||
last_error = None
|
||
|
||
read_timeout = getattr(config, 'READ_ONLY_REQUEST_TIMEOUT', 60)
|
||
for attempt in range(retries):
|
||
try:
|
||
# 增加 recvWindow 以避免 -1021 错误;仅此只读接口用较长超时,不影响下单类接口
|
||
positions = await asyncio.wait_for(
|
||
self.client.futures_position_information(recvWindow=20000),
|
||
timeout=read_timeout
|
||
)
|
||
# 只保留非零持仓,且名义价值 >= 配置阈值,避免灰尘持仓被当成“有仓”;与仪表板不一致时可调低 POSITION_MIN_NOTIONAL_USDT 或设为 0
|
||
min_notional = getattr(config, 'POSITION_MIN_NOTIONAL_USDT', 1.0)
|
||
open_positions = []
|
||
skipped_low = []
|
||
for pos in positions:
|
||
amt = float(pos['positionAmt'])
|
||
if amt == 0:
|
||
continue
|
||
entry_price = float(pos['entryPrice'])
|
||
notional = abs(amt) * entry_price
|
||
if min_notional > 0 and notional < min_notional:
|
||
skipped_low.append((pos['symbol'], round(notional, 4)))
|
||
continue
|
||
open_positions.append({
|
||
'symbol': pos['symbol'],
|
||
'positionAmt': amt,
|
||
'entryPrice': entry_price,
|
||
'markPrice': float(pos.get('markPrice', 0)),
|
||
'unRealizedProfit': float(pos['unRealizedProfit']),
|
||
'leverage': int(pos['leverage'])
|
||
})
|
||
if skipped_low and logger.isEnabledFor(logging.DEBUG):
|
||
logger.debug(
|
||
f"获取持仓: 过滤掉 {len(skipped_low)} 个名义价值 < {min_notional} USDT 的仓位 {skipped_low},"
|
||
"与仪表板不一致时可设 POSITION_MIN_NOTIONAL_USDT=0 或更小"
|
||
)
|
||
return open_positions
|
||
except (asyncio.TimeoutError, BinanceAPIException) as e:
|
||
last_error = e
|
||
# 如果是API异常,检查是否是网络相关或服务器错误
|
||
is_network_error = False
|
||
if isinstance(e, BinanceAPIException):
|
||
# -1021: Timestamp for this request is outside of the recvWindow
|
||
# 5xx: Server Error
|
||
if e.code == -1021 or str(e.code).startswith('5'):
|
||
is_network_error = True
|
||
else:
|
||
# TimeoutError
|
||
is_network_error = True
|
||
|
||
if is_network_error:
|
||
if attempt < retries - 1:
|
||
# 退避拉长间隔,不增加重试次数:避免对币安请求频率上升,同时给网络恢复时间
|
||
wait = 3 if attempt >= 4 else (2 if attempt >= 2 else 1)
|
||
logger.warning(f"获取持仓信息失败 (第 {attempt + 1}/{retries} 次): {_format_exception(e)},{wait}秒后重试...")
|
||
await asyncio.sleep(wait)
|
||
continue
|
||
# 最后一次仍失败,不在此处打 ERROR,交给循环外统一打「已重试 N 次」
|
||
else:
|
||
logger.error(f"获取持仓信息失败: {_format_exception(e)}")
|
||
return []
|
||
# 网络类错误且最后一次:跳出循环,由下方统一打日志并返回 []
|
||
except Exception as e:
|
||
logger.error(f"获取持仓信息失败: {_format_exception(e)}")
|
||
return []
|
||
|
||
if last_error:
|
||
err_msg = _format_exception(last_error) if isinstance(last_error, Exception) else str(last_error)
|
||
logger.warning(f"获取持仓信息最终失败 (已重试 {retries} 次): {err_msg}")
|
||
return []
|
||
|
||
async def get_recent_trades(self, symbol: str, limit: int = 50) -> List[Dict]:
|
||
"""
|
||
获取最近的成交记录(超时/网络错误时自动重试)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
limit: 获取数量
|
||
|
||
Returns:
|
||
成交记录列表
|
||
"""
|
||
# 保持 5 次重试,不增加对币安的总请求次数;仅拉长重试间隔(退避)
|
||
retries = 5
|
||
last_error = None
|
||
attempts_made = 0
|
||
read_timeout = getattr(config, 'READ_ONLY_REQUEST_TIMEOUT', 60)
|
||
for attempt in range(retries):
|
||
attempts_made = attempt + 1
|
||
try:
|
||
return await asyncio.wait_for(
|
||
self.client.futures_account_trades(symbol=symbol, limit=limit, recvWindow=20000),
|
||
timeout=read_timeout
|
||
)
|
||
except (asyncio.TimeoutError, BinanceAPIException) as e:
|
||
last_error = e
|
||
is_retryable = isinstance(e, asyncio.TimeoutError) or (
|
||
isinstance(e, BinanceAPIException) and (e.code == -1021 or str(e.code).startswith('5'))
|
||
)
|
||
if is_retryable and attempt < retries - 1:
|
||
wait = 3 if attempt >= 3 else (2 if attempt >= 1 else 1)
|
||
logger.warning(f"获取成交记录失败 {symbol} (第 {attempt + 1}/{retries} 次): {_format_exception(e)},{wait}秒后重试...")
|
||
await asyncio.sleep(wait)
|
||
continue
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"获取成交记录失败 {symbol}: {_format_exception(e)}")
|
||
return []
|
||
if last_error:
|
||
logger.warning(f"获取成交记录失败 {symbol} (已重试 {attempts_made} 次): {_format_exception(last_error)}")
|
||
return []
|
||
|
||
async def get_symbol_info(self, symbol: str) -> Optional[Dict]:
|
||
"""
|
||
获取交易对的精度和限制信息
|
||
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
|
||
|
||
Args:
|
||
symbol: 交易对(支持中文名,会解析为 API symbol)
|
||
|
||
Returns:
|
||
交易对信息字典,包含 quantityPrecision, minQty, stepSize 等
|
||
"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
loop = asyncio.get_event_loop()
|
||
cache_key = f"symbol_info:{symbol}"
|
||
# 1. 优先从 Redis 读取(全用 Redis,不占进程内存)
|
||
cached = await self.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.debug(f"从Redis缓存获取 {symbol} 交易对信息")
|
||
if isinstance(cached, dict) and ("tickSize" not in cached or "pricePrecision" not in cached):
|
||
logger.info(f"{symbol} symbol_info 缓存缺少 tickSize/pricePrecision,自动刷新一次")
|
||
else:
|
||
return cached
|
||
# 2. 降级到进程内存(仅当 Redis 不可用时会有数据)
|
||
if symbol in self._symbol_info_cache:
|
||
cached_mem = self._symbol_info_cache[symbol]
|
||
if isinstance(cached_mem, dict) and ("tickSize" in cached_mem and "pricePrecision" in cached_mem):
|
||
return cached_mem
|
||
self._symbol_info_cache.pop(symbol, None)
|
||
|
||
# 3. 尝试从 DB market_cache 读取 exchange_info 并解析本 symbol
|
||
try:
|
||
exchange_info_db = await loop.run_in_executor(None, lambda: _load_exchange_info_from_db(86400))
|
||
info = _parse_symbol_info_from_exchange_info(exchange_info_db, symbol) if exchange_info_db else None
|
||
if info:
|
||
if self.redis_cache:
|
||
await self.redis_cache.set(cache_key, info, ttl=3600)
|
||
# Redis 写入成功则不占进程内存
|
||
else:
|
||
if len(self._symbol_info_cache) >= self._symbol_info_cache_max_size and symbol not in self._symbol_info_cache:
|
||
_oldest = next(iter(self._symbol_info_cache), None)
|
||
if _oldest is not None:
|
||
self._symbol_info_cache.pop(_oldest, None)
|
||
self._symbol_info_cache[symbol] = info
|
||
logger.debug(f"从 DB 缓存解析 {symbol} 交易对信息")
|
||
return info
|
||
except Exception as e:
|
||
logger.debug("从 DB 读取 exchange_info 解析 %s 失败: %s", symbol, e)
|
||
|
||
# 4. 缓存未命中,调用 API(加超时与重试)
|
||
read_timeout = getattr(config, 'READ_ONLY_REQUEST_TIMEOUT', 60)
|
||
last_err = None
|
||
for attempt in range(3):
|
||
try:
|
||
exchange_info = await asyncio.wait_for(
|
||
self.client.futures_exchange_info(),
|
||
timeout=read_timeout
|
||
)
|
||
# 回写 DB 供后续请求使用
|
||
try:
|
||
await loop.run_in_executor(None, lambda: _save_exchange_info_to_db(exchange_info))
|
||
except Exception as e:
|
||
logger.debug("exchange_info 写入 DB 失败: %s", e)
|
||
break
|
||
except (asyncio.TimeoutError, BinanceAPIException, OSError) as e:
|
||
last_err = e
|
||
if attempt < 2:
|
||
logger.warning(f"获取 exchange_info 失败 (第 {attempt + 1}/3 次): {_format_exception(e)},1秒后重试...")
|
||
await asyncio.sleep(1)
|
||
continue
|
||
logger.error(f"获取 {symbol} 交易对信息失败 (已重试 3 次): {_format_exception(e)}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取 {symbol} 交易对信息失败: {_format_exception(e)}")
|
||
return None
|
||
|
||
try:
|
||
for s in exchange_info['symbols']:
|
||
if s['symbol'] == symbol:
|
||
# 提取数量/价格精度信息
|
||
quantity_precision = s.get('quantityPrecision', 8)
|
||
price_precision = s.get('pricePrecision', 8)
|
||
|
||
# 从filters中提取 minQty/stepSize/minNotional/tickSize
|
||
min_qty = None
|
||
step_size = None
|
||
min_notional = None
|
||
tick_size = None
|
||
for f in s.get('filters', []):
|
||
if f['filterType'] == 'LOT_SIZE':
|
||
min_qty = float(f.get('minQty', 0))
|
||
step_size = float(f.get('stepSize', 0))
|
||
elif f.get('filterType') == 'PRICE_FILTER':
|
||
tick_size = float(f.get('tickSize', 0) or 0)
|
||
elif f['filterType'] == 'MIN_NOTIONAL':
|
||
min_notional = float(f.get('notional', 0))
|
||
|
||
# 如果没有从filters获取到minNotional,使用默认值5 USDT
|
||
if min_notional is None or min_notional == 0:
|
||
min_notional = 5.0
|
||
|
||
# 获取交易对支持的最大杠杆倍数
|
||
# 币安API的exchange_info中可能没有直接的leverageBracket信息
|
||
# 我们尝试从leverageBracket获取,如果没有则使用默认值
|
||
max_leverage_supported = 125 # 币安合约默认最大杠杆
|
||
|
||
# 尝试从leverageBracket获取(如果存在)
|
||
if s.get('leverageBracket') and len(s.get('leverageBracket', [])) > 0:
|
||
max_leverage_supported = s['leverageBracket'][0].get('maxLeverage', 125)
|
||
else:
|
||
# 如果leverageBracket不存在,尝试通过futures_leverage_bracket API获取
|
||
# 但为了不增加API调用,这里先使用默认值125
|
||
# 实际使用时会在设置杠杆时检查,如果失败会自动降低
|
||
max_leverage_supported = 125
|
||
|
||
info = {
|
||
'quantityPrecision': quantity_precision,
|
||
'pricePrecision': price_precision,
|
||
'minQty': min_qty or 0,
|
||
'stepSize': step_size or 0,
|
||
'tickSize': tick_size or 0,
|
||
'minNotional': min_notional,
|
||
'maxLeverage': int(max_leverage_supported) # 交易对支持的最大杠杆
|
||
}
|
||
|
||
wrote = await self.redis_cache.set(cache_key, info, ttl=3600)
|
||
if not wrote:
|
||
if len(self._symbol_info_cache) >= self._symbol_info_cache_max_size and symbol not in self._symbol_info_cache:
|
||
_oldest = next(iter(self._symbol_info_cache), None)
|
||
if _oldest is not None:
|
||
self._symbol_info_cache.pop(_oldest, None)
|
||
self._symbol_info_cache[symbol] = info
|
||
logger.debug(f"获取 {symbol} 精度信息: {info}")
|
||
return info
|
||
|
||
logger.warning(f"未找到交易对 {symbol} 的信息")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取 {symbol} 交易对信息失败: {_format_exception(e)}")
|
||
return None
|
||
|
||
def _adjust_quantity_precision(self, quantity: float, symbol_info: Dict) -> float:
|
||
"""
|
||
调整数量精度,使其符合币安要求
|
||
|
||
Args:
|
||
quantity: 原始数量
|
||
symbol_info: 交易对信息
|
||
|
||
Returns:
|
||
调整后的数量
|
||
"""
|
||
if not symbol_info:
|
||
# 如果没有交易对信息,使用默认精度(3位小数)
|
||
return round(quantity, 3)
|
||
|
||
quantity_precision = symbol_info.get('quantityPrecision', 8)
|
||
step_size = symbol_info.get('stepSize', 0)
|
||
min_qty = symbol_info.get('minQty', 0)
|
||
|
||
# 如果有stepSize,按照stepSize调整
|
||
if step_size > 0:
|
||
# 向下取整到stepSize的倍数(使用浮点数除法)
|
||
adjusted = float(int(quantity / step_size)) * step_size
|
||
else:
|
||
# 否则按照精度调整
|
||
adjusted = round(quantity, quantity_precision)
|
||
|
||
# 确保不小于最小数量
|
||
if min_qty > 0 and adjusted < min_qty:
|
||
# 如果小于最小数量,尝试向上取整到最小数量
|
||
if step_size > 0:
|
||
adjusted = min_qty
|
||
else:
|
||
adjusted = round(min_qty, quantity_precision)
|
||
logger.warning(f"数量 {quantity} 小于最小数量 {min_qty},调整为 {adjusted}")
|
||
|
||
# 最终精度调整
|
||
adjusted = round(adjusted, quantity_precision)
|
||
|
||
if adjusted != quantity:
|
||
logger.info(f"数量精度调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
|
||
|
||
return adjusted
|
||
|
||
@staticmethod
|
||
def _format_decimal_str(d) -> str:
|
||
try:
|
||
s = format(d, 'f')
|
||
except Exception:
|
||
s = str(d)
|
||
if '.' in s:
|
||
s = s.rstrip('0').rstrip('.')
|
||
return s
|
||
|
||
@staticmethod
|
||
def _format_quantity_str(quantity: float, symbol_info: Optional[Dict]) -> str:
|
||
"""
|
||
把数量格式化为币安可接受的字符串,避免 quantityPrecision=0 时发送 "197.0" 导致 -1111。
|
||
"""
|
||
try:
|
||
from decimal import Decimal, ROUND_DOWN
|
||
except Exception:
|
||
# fallback:尽量去掉浮点尾巴
|
||
q = float(quantity or 0)
|
||
if not symbol_info:
|
||
return str(q)
|
||
qp = int(symbol_info.get("quantityPrecision", 8) or 8)
|
||
if qp <= 0:
|
||
return str(int(q))
|
||
return str(round(q, qp))
|
||
|
||
qp = 8
|
||
try:
|
||
qp = int(symbol_info.get("quantityPrecision", 8) or 8) if symbol_info else 8
|
||
except Exception:
|
||
qp = 8
|
||
|
||
qd = Decimal(str(quantity))
|
||
if qp <= 0:
|
||
q2 = qd.to_integral_value(rounding=ROUND_DOWN)
|
||
return BinanceClient._format_decimal_str(q2)
|
||
q2 = qd.quantize(Decimal(f"1e-{qp}"), rounding=ROUND_DOWN)
|
||
return BinanceClient._format_decimal_str(q2)
|
||
|
||
@staticmethod
|
||
def _format_limit_price_str(price: float, symbol_info: Optional[Dict], side: str) -> str:
|
||
"""
|
||
把 LIMIT 价格格式化为币安可接受的字符串(tickSize/pricePrecision 对齐),避免:
|
||
-4014 Price not increased by tick size
|
||
-1111 Precision is over the maximum defined for this asset
|
||
"""
|
||
try:
|
||
from decimal import Decimal, ROUND_DOWN, ROUND_UP
|
||
except Exception:
|
||
return str(round(float(price), 8))
|
||
|
||
tick = 0.0
|
||
pp = 8
|
||
try:
|
||
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
|
||
except Exception:
|
||
tick = 0.0
|
||
try:
|
||
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
|
||
except Exception:
|
||
pp = 8
|
||
|
||
p = Decimal(str(price))
|
||
rounding = ROUND_DOWN if (side or "").upper() == "BUY" else ROUND_UP
|
||
|
||
# 1) tickSize 优先(最严格)
|
||
try:
|
||
t = Decimal(str(tick))
|
||
if t > 0:
|
||
q = p / t
|
||
q2 = q.to_integral_value(rounding=rounding)
|
||
p2 = q2 * t
|
||
return BinanceClient._format_decimal_str(p2)
|
||
except Exception:
|
||
pass
|
||
|
||
# 2) 没有 tickSize 时,用 pricePrecision 兜底
|
||
try:
|
||
if pp <= 0:
|
||
return BinanceClient._format_decimal_str(p.to_integral_value(rounding=rounding))
|
||
p2 = p.quantize(Decimal(f"1e-{pp}"), rounding=rounding)
|
||
return BinanceClient._format_decimal_str(p2)
|
||
except Exception:
|
||
return BinanceClient._format_decimal_str(p)
|
||
|
||
@staticmethod
|
||
def _format_price_str_with_rounding(price: float, symbol_info: Optional[Dict], rounding_mode: str) -> str:
|
||
"""
|
||
通用价格格式化(tickSize/pricePrecision 对齐),并允许显式指定 ROUND_UP / ROUND_DOWN。
|
||
rounding_mode: "UP" 或 "DOWN"
|
||
"""
|
||
try:
|
||
from decimal import Decimal, ROUND_DOWN, ROUND_UP
|
||
except Exception:
|
||
return str(round(float(price), 8))
|
||
|
||
tick = 0.0
|
||
pp = 8
|
||
try:
|
||
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
|
||
except Exception:
|
||
tick = 0.0
|
||
try:
|
||
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
|
||
except Exception:
|
||
pp = 8
|
||
|
||
p = Decimal(str(price))
|
||
rounding = ROUND_UP if str(rounding_mode).upper() == "UP" else ROUND_DOWN
|
||
|
||
# tickSize 优先
|
||
try:
|
||
t = Decimal(str(tick))
|
||
if t > 0:
|
||
q = p / t
|
||
q2 = q.to_integral_value(rounding=rounding)
|
||
p2 = q2 * t
|
||
return BinanceClient._format_decimal_str(p2)
|
||
except Exception:
|
||
pass
|
||
|
||
# pricePrecision 兜底
|
||
try:
|
||
if pp <= 0:
|
||
return BinanceClient._format_decimal_str(p.to_integral_value(rounding=rounding))
|
||
p2 = p.quantize(Decimal(f"1e-{pp}"), rounding=rounding)
|
||
return BinanceClient._format_decimal_str(p2)
|
||
except Exception:
|
||
return BinanceClient._format_decimal_str(p)
|
||
|
||
@staticmethod
|
||
def _adjust_price_to_tick(price: float, tick_size: float, side: str) -> float:
|
||
"""
|
||
把 LIMIT 价格调整为 tickSize 的整数倍,避免:
|
||
-4014 Price not increased by tick size
|
||
-1111 Precision is over the maximum defined for this asset(部分情况下也会由 price 引发)
|
||
|
||
规则(入场限价):
|
||
- BUY:向下取整(更便宜,且不会“买贵了”)
|
||
- SELL:向上取整(更贵/更高,且不会“卖便宜了”)
|
||
"""
|
||
try:
|
||
from decimal import Decimal, ROUND_DOWN, ROUND_UP
|
||
except Exception:
|
||
# fallback:不用 Decimal 时,至少 round 到 8 位,尽量减少精度问题
|
||
return float(round(float(price), 8))
|
||
|
||
try:
|
||
p = Decimal(str(price))
|
||
t = Decimal(str(tick_size))
|
||
if t <= 0:
|
||
return float(p)
|
||
q = p / t
|
||
rounding = ROUND_DOWN if (side or "").upper() == "BUY" else ROUND_UP
|
||
q2 = q.to_integral_value(rounding=rounding)
|
||
p2 = q2 * t
|
||
return float(p2)
|
||
except Exception:
|
||
return float(round(float(price), 8))
|
||
|
||
def _adjust_quantity_precision_up(self, quantity: float, symbol_info: Dict) -> float:
|
||
"""
|
||
向上取整调整数量精度,使其符合币安要求
|
||
|
||
Args:
|
||
quantity: 原始数量
|
||
symbol_info: 交易对信息
|
||
|
||
Returns:
|
||
调整后的数量(向上取整)
|
||
"""
|
||
import math
|
||
|
||
if not symbol_info:
|
||
# 如果没有交易对信息,向上取整到3位小数
|
||
return round(math.ceil(quantity * 1000) / 1000, 3)
|
||
|
||
quantity_precision = symbol_info.get('quantityPrecision', 8)
|
||
step_size = symbol_info.get('stepSize', 0)
|
||
min_qty = symbol_info.get('minQty', 0)
|
||
|
||
# 如果有stepSize,按照stepSize向上取整
|
||
if step_size > 0:
|
||
# 向上取整到stepSize的倍数
|
||
adjusted = math.ceil(quantity / step_size) * step_size
|
||
else:
|
||
# 否则按照精度向上取整
|
||
multiplier = 10 ** quantity_precision
|
||
adjusted = math.ceil(quantity * multiplier) / multiplier
|
||
|
||
# 确保不小于最小数量
|
||
if min_qty > 0 and adjusted < min_qty:
|
||
adjusted = min_qty
|
||
|
||
# 最终精度调整
|
||
adjusted = round(adjusted, quantity_precision)
|
||
|
||
if adjusted != quantity:
|
||
logger.info(f"数量向上取整调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
|
||
|
||
return adjusted
|
||
|
||
async def place_order(
|
||
self,
|
||
symbol: str,
|
||
side: str,
|
||
quantity: float,
|
||
order_type: str = 'MARKET',
|
||
price: Optional[float] = None,
|
||
reduce_only: bool = False,
|
||
position_side: Optional[str] = None,
|
||
new_client_order_id: Optional[str] = None,
|
||
) -> Optional[Dict]:
|
||
"""
|
||
下单
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
side: 方向 'BUY' 或 'SELL'
|
||
quantity: 数量
|
||
order_type: 订单类型 'MARKET' 或 'LIMIT'
|
||
price: 限价单价格
|
||
|
||
Returns:
|
||
订单信息
|
||
"""
|
||
# 支持中文名:将显示名解析为 API 使用的英文 symbol
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
if not symbol:
|
||
logger.error(f"❌ 下单请求 symbol 为空")
|
||
import traceback
|
||
logger.error(f" 调用堆栈:\n{traceback.format_exc()}")
|
||
return None
|
||
|
||
try:
|
||
# 获取交易对精度信息
|
||
symbol_info = await self.get_symbol_info(symbol)
|
||
|
||
# 获取当前价格以计算名义价值(优先用 bookTicker 估算执行价;有 Redis 时从 Redis 读,不占进程内存)
|
||
if price is None:
|
||
try:
|
||
from .book_ticker_stream import get_book_ticker, get_book_ticker_from_redis
|
||
if self.redis_cache:
|
||
book = await get_book_ticker_from_redis(self.redis_cache, symbol)
|
||
else:
|
||
book = get_book_ticker(symbol)
|
||
if book:
|
||
if side == "BUY":
|
||
estimated_price = float(book.get("askPrice", 0))
|
||
else:
|
||
estimated_price = float(book.get("bidPrice", 0))
|
||
if estimated_price > 0:
|
||
current_price = estimated_price
|
||
logger.debug(f"{symbol} 使用 bookTicker 估算价格: {current_price:.4f} ({side})")
|
||
else:
|
||
ticker = await self.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
logger.error(f"无法获取 {symbol} 的价格信息")
|
||
return None
|
||
current_price = ticker['price']
|
||
else:
|
||
ticker = await self.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
logger.error(f"无法获取 {symbol} 的价格信息")
|
||
return None
|
||
current_price = ticker['price']
|
||
except Exception as e:
|
||
logger.debug(f"使用 bookTicker 估算价格失败,回退到 ticker: {e}")
|
||
ticker = await self.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
logger.error(f"无法获取 {symbol} 的价格信息")
|
||
return None
|
||
current_price = ticker['price']
|
||
else:
|
||
current_price = price
|
||
|
||
# 先按原始数量计算名义价值,用于保证金检查
|
||
initial_notional_value = quantity * current_price
|
||
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
|
||
|
||
# 调整数量精度(在保证金检查之前)
|
||
adjusted_quantity = self._adjust_quantity_precision(quantity, symbol_info)
|
||
|
||
if adjusted_quantity <= 0:
|
||
logger.error(f"调整后的数量无效: {adjusted_quantity} (原始: {quantity})")
|
||
return None
|
||
|
||
# 使用调整后的数量重新计算名义价值
|
||
notional_value = adjusted_quantity * current_price
|
||
|
||
logger.info(f"下单检查: {symbol} {side} {adjusted_quantity} (原始: {quantity}) @ {order_type}")
|
||
logger.info(f" 当前价格: {current_price:.4f} USDT")
|
||
logger.info(f" 订单名义价值: {notional_value:.2f} USDT")
|
||
logger.info(f" 最小名义价值: {min_notional:.2f} USDT")
|
||
logger.info(f" 平仓模式: {reduce_only}")
|
||
|
||
# 检查名义价值是否满足最小要求
|
||
# 注意:对于平仓操作(reduce_only=True),完全跳过名义价值检查
|
||
# 因为平仓是关闭现有持仓,币安允许任意大小的平仓订单(只要大于0)
|
||
if reduce_only:
|
||
logger.info(f" 平仓操作:跳过名义价值检查(名义价值: {notional_value:.2f} USDT)")
|
||
elif notional_value < min_notional:
|
||
logger.warning(
|
||
f"❌ {symbol} 订单名义价值不足: {notional_value:.2f} USDT < "
|
||
f"最小要求: {min_notional:.2f} USDT"
|
||
)
|
||
logger.warning(f" 需要增加数量或提高仓位大小")
|
||
return None
|
||
|
||
# 检查最小保证金要求(避免手续费侵蚀收益)
|
||
# 获取当前杠杆(如果无法获取,使用默认值)
|
||
current_leverage = config.TRADING_CONFIG.get('LEVERAGE', 10)
|
||
try:
|
||
# 尝试从持仓信息获取实际使用的杠杆
|
||
# 注意:即使没有持仓(positionAmt=0),leverage 字段也是准确的(反映了当前该 symbol 设置的杠杆)
|
||
positions = await self.client.futures_position_information(symbol=symbol)
|
||
if positions and len(positions) > 0:
|
||
position = positions[0]
|
||
# 优先使用 API 返回的 leverage,不再限制必须有持仓
|
||
leverage_bracket = position.get('leverage')
|
||
if leverage_bracket:
|
||
current_leverage = int(leverage_bracket)
|
||
except Exception as e:
|
||
logger.debug(f"无法获取 {symbol} 的杠杆信息,使用默认值: {current_leverage}x ({e})")
|
||
|
||
min_margin_usdt = config.TRADING_CONFIG.get('MIN_MARGIN_USDT', 0.5) # 默认0.5 USDT
|
||
required_margin = notional_value / current_leverage
|
||
|
||
# 对于平仓操作(reduce_only=True),跳过最小保证金检查
|
||
# 因为这是关闭现有持仓,不应该因为保证金不足而拒绝平仓
|
||
if reduce_only:
|
||
logger.info(f" 平仓操作:跳过最小保证金检查(名义价值: {notional_value:.2f} USDT)")
|
||
elif required_margin < min_margin_usdt:
|
||
# 如果保证金不足,自动调整到最小保证金要求
|
||
required_notional_value = min_margin_usdt * current_leverage
|
||
logger.warning(
|
||
f"⚠ {symbol} 订单保证金不足: {required_margin:.4f} USDT < "
|
||
f"最小保证金要求: {min_margin_usdt:.2f} USDT"
|
||
)
|
||
logger.info(
|
||
f" 自动调整订单名义价值: {notional_value:.2f} USDT -> {required_notional_value:.2f} USDT "
|
||
f"(杠杆: {current_leverage}x, 保证金: {min_margin_usdt:.2f} USDT)"
|
||
)
|
||
|
||
# 调整数量以满足最小保证金要求
|
||
if current_price > 0:
|
||
new_quantity = required_notional_value / current_price
|
||
# 先尝试向下取整调整
|
||
adjusted_quantity = self._adjust_quantity_precision(new_quantity, symbol_info)
|
||
# 重新计算名义价值和保证金
|
||
notional_value = adjusted_quantity * current_price
|
||
required_margin = notional_value / current_leverage
|
||
|
||
# 如果调整后保证金仍然不足,使用向上取整
|
||
if required_margin < min_margin_usdt:
|
||
logger.warning(
|
||
f" ⚠ 向下取整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
|
||
)
|
||
adjusted_quantity = self._adjust_quantity_precision_up(new_quantity, symbol_info)
|
||
# 重新计算名义价值和保证金
|
||
notional_value = adjusted_quantity * current_price
|
||
required_margin = notional_value / current_leverage
|
||
|
||
# 再次检查保证金
|
||
if required_margin < min_margin_usdt:
|
||
logger.error(
|
||
f" ❌ 调整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
|
||
)
|
||
logger.error(
|
||
f" 💡 建议: 增加账户余额或降低杠杆倍数,才能满足最小保证金要求"
|
||
)
|
||
return None
|
||
|
||
logger.info(
|
||
f" ✓ 调整数量: {quantity:.4f} -> {adjusted_quantity:.4f}, "
|
||
f"名义价值: {notional_value:.2f} USDT, "
|
||
f"保证金: {required_margin:.4f} USDT"
|
||
)
|
||
else:
|
||
logger.error(f" ❌ 无法获取 {symbol} 的当前价格,无法调整订单大小")
|
||
return None
|
||
|
||
# 最终检查:确保调整后的保证金满足要求
|
||
# 对于平仓操作(reduce_only=True),跳过最终保证金检查
|
||
if reduce_only:
|
||
logger.info(f" 平仓操作:跳过最终保证金检查(保证金: {required_margin:.4f} USDT)")
|
||
elif required_margin < min_margin_usdt:
|
||
logger.error(
|
||
f"❌ {symbol} 订单保证金不足: {required_margin:.4f} USDT < "
|
||
f"最小保证金要求: {min_margin_usdt:.2f} USDT,拒绝下单"
|
||
)
|
||
return None
|
||
else:
|
||
logger.info(
|
||
f" 保证金检查通过: {required_margin:.4f} USDT >= "
|
||
f"最小要求: {min_margin_usdt:.2f} USDT (杠杆: {current_leverage}x)"
|
||
)
|
||
|
||
# 最终检查:确保名义价值不小于0.2 USDT(避免无意义的小单子)
|
||
# 对于平仓操作(reduce_only=True),跳过此检查
|
||
# MIN_NOTIONAL_VALUE = 0.2 # 最小名义价值0.2 USDT
|
||
# if not reduce_only and notional_value < MIN_NOTIONAL_VALUE:
|
||
# logger.error(
|
||
# f"❌ {symbol} 订单名义价值 {notional_value:.4f} USDT < 最小要求 {MIN_NOTIONAL_VALUE:.2f} USDT,拒绝下单"
|
||
# )
|
||
# logger.error(f" 💡 此类小单子意义不大,拒绝开仓")
|
||
# return None
|
||
|
||
# 构建订单参数
|
||
order_params = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'type': order_type,
|
||
# 关键:quantityPrecision=0 时必须是 "197" 而不是 "197.0",否则会触发 -1111
|
||
'quantity': self._format_quantity_str(adjusted_quantity, symbol_info)
|
||
}
|
||
|
||
# 处理持仓模式(解决 -4061:position side 与账户设置不匹配)
|
||
# - 对冲模式:必须传 positionSide=LONG/SHORT
|
||
# - 单向模式:不要传 positionSide=LONG/SHORT(否则会 -4061)
|
||
dual = None
|
||
try:
|
||
dual = await self._get_dual_side_position()
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 获取持仓模式失败(稍后用重试兜底): {e}")
|
||
|
||
if dual is True:
|
||
ps = await self._resolve_position_side_for_order(symbol, side, reduce_only, position_side)
|
||
if not ps:
|
||
logger.error(f"{symbol} 对冲模式下无法确定 positionSide,拒绝下单以避免 -4061")
|
||
return None
|
||
order_params["positionSide"] = ps
|
||
elif dual is False:
|
||
if position_side:
|
||
logger.info(f"{symbol} 单向模式下忽略 positionSide={position_side}(避免 -4061)")
|
||
|
||
# 开仓单写入自定义订单号:优先使用调用方传入的(便于先落库再下单、WS 按 c 匹配)
|
||
if not reduce_only:
|
||
if new_client_order_id:
|
||
order_params['newClientOrderId'] = str(new_client_order_id)[:36]
|
||
else:
|
||
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
|
||
if prefix:
|
||
order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||
|
||
# 如果是平仓订单,添加 reduceOnly 参数
|
||
# 根据币安API文档,reduceOnly 应该是字符串 "true" 或 "false"
|
||
if reduce_only:
|
||
# 实测:某些账户/模式下(尤其是对冲模式 + positionSide)会报:
|
||
# APIError(code=-1106): Parameter 'reduceonly' sent when not required.
|
||
# 因此:当我们已经明确指定 positionSide=LONG/SHORT 时,不再传 reduceOnly;
|
||
# 其余情况仍保留 reduceOnly 以避免反向开仓。
|
||
if order_params.get("positionSide") in {"LONG", "SHORT"}:
|
||
logger.info(f"{symbol} 对冲模式平仓:已指定 positionSide,跳过 reduceOnly(避免 -1106)")
|
||
else:
|
||
# python-binance 可以接受 bool;同时后面也做 -1106 自动兜底重试
|
||
order_params['reduceOnly'] = True
|
||
logger.info(f"{symbol} 使用 reduceOnly=true 平仓订单")
|
||
|
||
async def _submit(params: Dict[str, Any]) -> Dict[str, Any]:
|
||
# 增加 recvWindow 以避免 -1021 错误
|
||
params['recvWindow'] = 20000
|
||
|
||
if order_type == 'MARKET':
|
||
return await self.client.futures_create_order(**params)
|
||
if price is None:
|
||
raise ValueError("限价单必须指定价格")
|
||
params = dict(params)
|
||
params['timeInForce'] = 'GTC'
|
||
# LIMIT 价格按 tickSize/pricePrecision 修正(避免 -4014 / -1111)
|
||
params['price'] = self._format_limit_price_str(float(price), symbol_info, side)
|
||
return await self.client.futures_create_order(**params)
|
||
|
||
# 提交订单;若遇到:
|
||
# -4061: positionSide 与账户模式不匹配 → 在“带/不带 positionSide”之间兜底切换重试
|
||
# -1106: reduceOnly not required → 去掉 reduceOnly 重试(避免自动平仓失败)
|
||
try:
|
||
order = await _submit(order_params)
|
||
except BinanceAPIException as e:
|
||
code = getattr(e, "code", None)
|
||
if code in (-4061, -1106, -1022):
|
||
retry_params = dict(order_params)
|
||
# 关键修复:重试时必须清除之前的 timestamp 和 signature,
|
||
# 让 python-binance 重新生成,否则会报 -1022 Signature invalid
|
||
retry_params.pop('timestamp', None)
|
||
retry_params.pop('signature', None)
|
||
# 重试时保留调用方传入的 newClientOrderId,否则才重新生成(便于 WS 按 c 匹配)
|
||
if new_client_order_id and 'newClientOrderId' in retry_params:
|
||
retry_params['newClientOrderId'] = str(new_client_order_id)[:36]
|
||
elif 'newClientOrderId' in retry_params and not new_client_order_id:
|
||
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
|
||
if prefix:
|
||
retry_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||
|
||
if code == -4061:
|
||
logger.error(f"{symbol} 触发 -4061(持仓模式不匹配),尝试自动兜底重试一次")
|
||
if "positionSide" in retry_params:
|
||
retry_params.pop("positionSide", None)
|
||
else:
|
||
ps = await self._resolve_position_side_for_order(symbol, side, reduce_only, position_side)
|
||
if ps:
|
||
retry_params["positionSide"] = ps
|
||
elif code == -1106:
|
||
# 常见:Parameter 'reduceonly' sent when not required.
|
||
msg = str(e).lower()
|
||
if "reduceonly" in msg or "reduce only" in msg:
|
||
logger.error(f"{symbol} 触发 -1106(reduceOnly 不被接受),去掉 reduceOnly 后重试一次")
|
||
retry_params.pop("reduceOnly", None)
|
||
else:
|
||
raise
|
||
order = await _submit(retry_params)
|
||
else:
|
||
raise
|
||
|
||
logger.info(f"下单成功: {symbol} {side} {adjusted_quantity} @ {order_type} (名义价值: {notional_value:.2f} USDT)")
|
||
return order
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
error_msg = str(e)
|
||
|
||
if error_code == -1111:
|
||
logger.error(f"下单失败 {symbol} {side}: 精度错误 - {e}")
|
||
logger.error(f" 原始数量: {quantity}")
|
||
if order_type == 'LIMIT':
|
||
logger.error(f" 原始价格: {price}")
|
||
if symbol_info:
|
||
logger.error(f" 交易对精度: {symbol_info}")
|
||
elif error_code == -4014:
|
||
# Price not increased by tick size.
|
||
logger.error(f"下单失败 {symbol} {side}: 价格步长错误(-4014) - {e}")
|
||
logger.error(f" 原始数量: {quantity}")
|
||
logger.error(f" 原始价格: {price}")
|
||
if symbol_info:
|
||
logger.error(f" tickSize: {symbol_info.get('tickSize')}, pricePrecision: {symbol_info.get('pricePrecision')}")
|
||
elif error_code == -4164:
|
||
logger.error(f"下单失败 {symbol} {side}: 订单名义价值不足 - {e}")
|
||
logger.error(f" 订单名义价值必须至少为 5 USDT (除非选择 reduce only)")
|
||
if symbol_info:
|
||
logger.error(f" 最小名义价值: {symbol_info.get('minNotional', 5.0)} USDT")
|
||
elif error_code == -2022:
|
||
# ReduceOnly Order is rejected - 可能是没有持仓或持仓方向不对
|
||
# 这类错误在并发/竞态场景很常见:我们以为还有仓位,但实际上已经被其他任务/手动操作平掉了
|
||
# 对于 reduce_only=True:调用方应当把它当作“幂等平仓”的可接受结果(再查一次实时持仓即可)。
|
||
if reduce_only:
|
||
logger.warning(f"下单被拒绝 {symbol} {side}: ReduceOnly(-2022)(可能仓位已为0/方向腿不匹配),将由上层做幂等处理")
|
||
else:
|
||
logger.error(f"下单失败 {symbol} {side}: ReduceOnly 订单被拒绝 - {e}")
|
||
elif "reduceOnly" in error_msg.lower() or "reduce only" in error_msg.lower():
|
||
logger.error(f"下单失败 {symbol} {side}: ReduceOnly 相关错误 - {e}")
|
||
logger.error(f" 错误码: {error_code}")
|
||
else:
|
||
logger.error(f"下单失败 {symbol} {side}: {e}")
|
||
logger.error(f" 错误码: {error_code}")
|
||
logger.error(
|
||
f" 下单参数: symbol={symbol}, side={side}, quantity={adjusted_quantity}, type={order_type}, "
|
||
f"reduceOnly={reduce_only}, positionSide={order_params.get('positionSide') if 'order_params' in locals() else None}"
|
||
)
|
||
|
||
return None
|
||
except Exception as e:
|
||
# 捕获其他异常
|
||
logger.error(f"下单失败 {symbol} {side}: 未知错误 - {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
|
||
return None
|
||
|
||
async def cancel_order(self, symbol: str, order_id: int) -> bool:
|
||
"""
|
||
取消订单
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
order_id: 订单ID
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
if not symbol:
|
||
logger.error(f"❌ 取消订单请求 symbol 为空")
|
||
import traceback
|
||
logger.error(f" 调用堆栈:\n{traceback.format_exc()}")
|
||
return False
|
||
|
||
try:
|
||
# 使用更大的 recvWindow,降低在网络抖动时触发 -1021 的概率
|
||
await self.client.futures_cancel_order(symbol=symbol, orderId=order_id, recvWindow=20000)
|
||
logger.info(f"取消订单成功: {symbol} {order_id}")
|
||
return True
|
||
except BinanceAPIException as e:
|
||
code = getattr(e, "code", None)
|
||
msg = str(e)
|
||
# -2011 Unknown order sent:订单可能已成交/已撤销/已过期,这是典型幂等场景
|
||
if code == -2011 or "code=-2011" in msg or "Unknown order sent" in msg:
|
||
logger.info(f"取消订单幂等成功(订单可能已不存在): {symbol} {order_id} | {e}")
|
||
return True
|
||
# -1021 时间戳不在 recvWindow 内:尝试同步服务器时间并重试一次
|
||
if code == -1021 or "code=-1021" in msg or "Timestamp for this request is outside of the recvWindow" in msg:
|
||
logger.warning(f"取消订单触发 -1021 时间戳错误,尝试同步服务器时间并重试一次: {symbol} {order_id} | {e}")
|
||
try:
|
||
# 触发一次与服务器对时,让 python-binance 更新内部时间偏差
|
||
await self.client.get_server_time()
|
||
except Exception as sync_e:
|
||
logger.debug(f"同步币安服务器时间失败(可忽略,继续重试取消): {sync_e}")
|
||
try:
|
||
await self.client.futures_cancel_order(symbol=symbol, orderId=order_id, recvWindow=20000)
|
||
logger.info(f"取消订单重试成功: {symbol} {order_id}")
|
||
return True
|
||
except BinanceAPIException as e2:
|
||
code2 = getattr(e2, "code", None)
|
||
msg2 = str(e2)
|
||
if code2 == -2011 or "code=-2011" in msg2 or "Unknown order sent" in msg2:
|
||
logger.info(f"取消订单幂等成功(重试后订单可能已不存在): {symbol} {order_id} | {e2}")
|
||
return True
|
||
logger.error(f"取消订单失败(重试后仍错误): {symbol} {order_id} | {e2}")
|
||
return False
|
||
logger.error(f"取消订单失败: {e}")
|
||
return False
|
||
|
||
# =========================
|
||
# Algo Orders(条件单/止盈止损/计划委托)
|
||
# 说明:币安在 2025-12 后将 USDT-M 合约的 STOP/TP/Trailing 等条件单迁移到 Algo 接口:
|
||
# - POST /fapi/v1/algoOrder
|
||
# - GET /fapi/v1/openAlgoOrders
|
||
# - DELETE /fapi/v1/algoOrder
|
||
# 如果仍用 /fapi/v1/order 下 STOP_MARKET/TAKE_PROFIT_MARKET + closePosition 会报 -4120。
|
||
# =========================
|
||
|
||
async def futures_create_algo_order(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
symbol = params.get('symbol', 'UNKNOWN')
|
||
# 优先从 TRADING_CONFIG 读(DB 可配),否则用 config 或默认 45
|
||
algo_timeout = 45
|
||
if getattr(config, 'TRADING_CONFIG', None):
|
||
algo_timeout = int(config.TRADING_CONFIG.get('ALGO_ORDER_TIMEOUT_SEC') or algo_timeout)
|
||
else:
|
||
algo_timeout = getattr(config, 'ALGO_ORDER_TIMEOUT_SEC', algo_timeout)
|
||
algo_timeout = max(15, min(120, int(algo_timeout)))
|
||
|
||
# 优先尝试 WebSocket 下单(减少 REST 超时)
|
||
if self._ws_trade_client and self._ws_trade_client.is_connected():
|
||
try:
|
||
# 准备 WS 参数(与币安 WS 文档一致:apiKey, timestamp, recvWindow, 其余同 REST)
|
||
ws_params = dict(params)
|
||
ws_params["apiKey"] = self.api_key
|
||
ts_ms = int(time.time() * 1000)
|
||
if "timestamp" not in ws_params:
|
||
ws_params["timestamp"] = ts_ms
|
||
else:
|
||
ws_params["timestamp"] = int(ws_params["timestamp"])
|
||
# 扩大接收窗口,避免服务器与本地时钟偏差导致签名被拒
|
||
ws_params["recvWindow"] = 60000
|
||
# 币安 WS 文档中 closePosition 为 STRING,发送与签名均用 "true"/"false"
|
||
if ws_params.get("closePosition") is True:
|
||
ws_params["closePosition"] = "true"
|
||
elif ws_params.get("closePosition") is False:
|
||
ws_params["closePosition"] = "false"
|
||
# 计算签名:参与签名的字符串必须与 REST 一致(键按字母序、值统一为字符串,布尔为 true/false)
|
||
if "signature" not in ws_params:
|
||
import hmac
|
||
import hashlib
|
||
from urllib.parse import urlencode
|
||
def _val_for_sig(v):
|
||
if v is True:
|
||
return "true"
|
||
if v is False:
|
||
return "false"
|
||
if v is None:
|
||
return ""
|
||
if isinstance(v, (int, float)):
|
||
return str(int(v)) if isinstance(v, float) and v == int(v) else str(v)
|
||
return str(v).strip()
|
||
to_sign = [(k, _val_for_sig(v)) for k, v in ws_params.items() if k != "signature"]
|
||
query_string = urlencode(sorted(to_sign))
|
||
signature = hmac.new(
|
||
self.api_secret.encode("utf-8"),
|
||
query_string.encode("utf-8"),
|
||
hashlib.sha256
|
||
).hexdigest()
|
||
ws_params["signature"] = signature
|
||
# WS 请求(超时时间略短于 REST,因为 WS 通常更快)
|
||
ws_timeout = min(algo_timeout - 5, 25) if algo_timeout > 25 else algo_timeout
|
||
result = await self._ws_trade_client.algo_order_place(ws_params, timeout=ws_timeout)
|
||
if result:
|
||
logger.debug(f"{symbol} ✓ WS 条件单创建成功: algoId={result.get('algoId')}")
|
||
# 转换响应格式以兼容 REST 返回格式(algoId 等字段)
|
||
return result
|
||
except (ConnectionError, TimeoutError) as e:
|
||
logger.debug(f"{symbol} WS 条件单失败({e}),回退到 REST")
|
||
except Exception as e:
|
||
code = getattr(e, "code", None)
|
||
err_msg = str(e).strip()
|
||
if code in (-4509, -4061):
|
||
raise BinanceAPIException(None, 400, json.dumps({"code": code, "msg": err_msg}))
|
||
# "Time in Force (TIF) GTE can only be used with open positions":持仓尚未可用或已平
|
||
if "GTE" in err_msg and "open positions" in err_msg:
|
||
logger.warning(
|
||
f"{symbol} 条件单被拒(持仓未就绪或已平): {err_msg[:80]}…,将依赖 WebSocket 监控"
|
||
)
|
||
raise AlgoOrderPositionUnavailableError(symbol, err_msg[:200])
|
||
logger.debug(f"{symbol} WS 条件单异常: {e},回退到 REST")
|
||
|
||
# 回退到 REST(原有逻辑)
|
||
for attempt in range(2): # 超时重试 1 次,避免偶发网络抖动直接失败
|
||
try:
|
||
res = await asyncio.wait_for(
|
||
self.client._request_futures_api("post", "algoOrder", True, data=params),
|
||
timeout=algo_timeout,
|
||
)
|
||
return res if isinstance(res, dict) else None
|
||
except asyncio.TimeoutError:
|
||
if attempt == 0:
|
||
logger.warning(f"{symbol} 创建 Algo 条件单超时({algo_timeout}秒),2 秒后重试一次...")
|
||
await asyncio.sleep(2)
|
||
continue
|
||
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: 请求超时({algo_timeout}秒,已重试)")
|
||
logger.error(f" 参数: {params}")
|
||
logger.error(
|
||
f" 💡 可能原因: (1) 网络/币安 API 慢,可调大 ALGO_ORDER_TIMEOUT_SEC(当前 {algo_timeout}s)"
|
||
f" (2) 条件单已触发平仓,持仓消失后挂单会卡住或 -4509,见 docs/bian 条件订单交易更新推送 EXPIRED"
|
||
)
|
||
return None
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
error_msg = str(e).strip()
|
||
if error_code in (-4509, -4061):
|
||
raise # 让 place_trigger_close_position_order 统一打一条 warning,不在此处刷日志
|
||
if "GTE" in error_msg and "open positions" in error_msg:
|
||
logger.warning(
|
||
f"{symbol} 条件单被拒(持仓未就绪或已平): {error_msg[:80]}…,将依赖 WebSocket 监控"
|
||
)
|
||
raise AlgoOrderPositionUnavailableError(symbol, error_msg[:200])
|
||
trigger_type = params.get('type', 'UNKNOWN')
|
||
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}")
|
||
logger.error(f" 错误代码: {error_code}")
|
||
logger.error(f" 参数: {params}")
|
||
if error_code == -4014:
|
||
logger.error(f" 原因: 触发价步长错误(-4014),triggerPrice 须为该交易对 PRICE_FILTER.tickSize 的整数倍,请检查 exchangeInfo 或重试")
|
||
elif error_code == -4164:
|
||
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT)")
|
||
elif error_code == -2022:
|
||
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)")
|
||
elif error_code == -4120:
|
||
logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)")
|
||
elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower():
|
||
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)")
|
||
raise e
|
||
elif "position" in error_msg.lower():
|
||
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)")
|
||
return None
|
||
except Exception as e:
|
||
err_msg = getattr(e, "message", str(e)) or repr(e)
|
||
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {err_msg}")
|
||
logger.error(f" 参数: {params}")
|
||
import traceback
|
||
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}")
|
||
return None
|
||
return None
|
||
|
||
async def futures_get_open_algo_orders(self, symbol: Optional[str] = None, algo_type: str = "CONDITIONAL") -> List[Dict[str, Any]]:
|
||
try:
|
||
data: Dict[str, Any] = {}
|
||
if symbol:
|
||
data["symbol"] = symbol
|
||
if algo_type:
|
||
data["algoType"] = algo_type
|
||
res = await self.client._request_futures_api("get", "openAlgoOrders", True, data=data)
|
||
return res if isinstance(res, list) else []
|
||
except Exception as e:
|
||
logger.debug(f"{symbol or ''} 获取 openAlgoOrders 失败: {e}")
|
||
return []
|
||
|
||
async def futures_cancel_algo_order(self, algo_id: int) -> bool:
|
||
try:
|
||
_ = await self.client._request_futures_api("delete", "algoOrder", True, data={"algoId": int(algo_id)})
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"取消 Algo 条件单失败 algoId={algo_id}: {e}")
|
||
return False
|
||
|
||
async def get_order_by_client_order_id(self, symbol: str, client_order_id: str) -> Optional[Dict]:
|
||
"""
|
||
按 origClientOrderId 查询订单(用于 pending 对账)。
|
||
返回订单信息或 None(不存在/异常)。
|
||
"""
|
||
if not symbol or not (client_order_id or "").strip():
|
||
return None
|
||
try:
|
||
info = await self.client.futures_get_order(
|
||
symbol=symbol,
|
||
origClientOrderId=(client_order_id or "").strip(),
|
||
recvWindow=20000,
|
||
)
|
||
return info if isinstance(info, dict) else None
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 按 origClientOrderId 查询失败: {e}")
|
||
return None
|
||
|
||
async def get_open_orders(self, symbol: str) -> List[Dict]:
|
||
"""
|
||
获取某交易对的未成交委托(用于防止重复挂保护单)。
|
||
"""
|
||
try:
|
||
orders = await self.client.futures_get_open_orders(symbol=symbol)
|
||
return orders if isinstance(orders, list) else []
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 获取未成交委托失败: {e}")
|
||
return []
|
||
|
||
async def cancel_open_orders_by_types(self, symbol: str, types: set[str]) -> int:
|
||
"""
|
||
取消指定类型的未成交委托(只取消保护单相关类型,避免重复下单)。
|
||
返回取消数量。
|
||
"""
|
||
try:
|
||
want = {str(t).upper() for t in (types or set())}
|
||
if not want:
|
||
return 0
|
||
orders = await self.get_open_orders(symbol)
|
||
cancelled = 0
|
||
for o in orders:
|
||
try:
|
||
if not isinstance(o, dict):
|
||
continue
|
||
otype = str(o.get("type") or "").upper()
|
||
oid = o.get("orderId")
|
||
if otype in want and oid:
|
||
ok = await self.cancel_order(symbol, int(oid))
|
||
if ok:
|
||
cancelled += 1
|
||
except Exception:
|
||
continue
|
||
return cancelled
|
||
except Exception:
|
||
return 0
|
||
|
||
async def cancel_open_algo_orders_by_order_types(self, symbol: str, order_types: set[str]) -> int:
|
||
"""
|
||
取消指定类型的“Algo 条件单”(openAlgoOrders)。
|
||
返回取消数量。
|
||
"""
|
||
try:
|
||
want = {str(t).upper() for t in (order_types or set())}
|
||
if not want:
|
||
return 0
|
||
orders = await self.futures_get_open_algo_orders(symbol=symbol, algo_type="CONDITIONAL")
|
||
cancelled = 0
|
||
for o in orders or []:
|
||
try:
|
||
if not isinstance(o, dict):
|
||
continue
|
||
otype = str(o.get("orderType") or o.get("type") or "").upper()
|
||
algo_id = o.get("algoId")
|
||
if algo_id and otype in want:
|
||
ok = await self.futures_cancel_algo_order(int(algo_id))
|
||
if ok:
|
||
cancelled += 1
|
||
except Exception:
|
||
continue
|
||
return cancelled
|
||
except Exception:
|
||
return 0
|
||
|
||
async def place_trigger_close_position_order(
|
||
self,
|
||
symbol: str,
|
||
position_direction: str,
|
||
trigger_type: str,
|
||
stop_price: float,
|
||
current_price: Optional[float] = None,
|
||
working_type: str = "MARK_PRICE",
|
||
client_algo_id: Optional[str] = None,
|
||
) -> Optional[Dict]:
|
||
"""
|
||
在币安侧挂“保护单”,用于止损/止盈:
|
||
- STOP_MARKET / TAKE_PROFIT_MARKET
|
||
- closePosition=True(自动平掉该 symbol 的当前仓位)
|
||
|
||
注意:这类单子不会在本地生成 exit_reason;触发后我们靠“持仓同步/订单同步”去回写数据库。
|
||
"""
|
||
try:
|
||
symbol_info = await self.get_symbol_info(symbol)
|
||
one_way_only = bool(config.TRADING_CONFIG.get("ONE_WAY_POSITION_ONLY", True))
|
||
dual = None
|
||
if not one_way_only:
|
||
try:
|
||
dual = await self._get_dual_side_position()
|
||
except Exception:
|
||
dual = None
|
||
|
||
pd = (position_direction or "").upper()
|
||
if pd not in {"BUY", "SELL"}:
|
||
return None
|
||
|
||
ttype = str(trigger_type or "").upper()
|
||
if ttype not in {"STOP_MARKET", "TAKE_PROFIT_MARKET"}:
|
||
return None
|
||
|
||
close_side = "SELL" if pd == "BUY" else "BUY"
|
||
|
||
# stopPrice 的“避免立即触发”修正(按 MARK_PRICE)
|
||
cp = None
|
||
try:
|
||
cp = float(current_price) if current_price is not None else None
|
||
except Exception:
|
||
cp = None
|
||
# 止盈单校验需要当前价;若未传入则拉取一次,避免错误 triggerPrice(如 0.001)导致 -4509
|
||
if (cp is None or cp <= 0) and ttype == "TAKE_PROFIT_MARKET":
|
||
try:
|
||
ticker = await self.get_ticker_24h(symbol)
|
||
if ticker and ticker.get("lastPrice"):
|
||
cp = float(ticker["lastPrice"])
|
||
except Exception:
|
||
pass
|
||
|
||
tick = 0.0
|
||
pp = 8
|
||
try:
|
||
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
|
||
except Exception:
|
||
tick = 0.0
|
||
try:
|
||
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
|
||
except Exception:
|
||
pp = 8
|
||
|
||
min_step = tick if tick and tick > 0 else (10 ** (-pp) if pp and pp > 0 else 1e-8)
|
||
|
||
sp = float(stop_price or 0)
|
||
if sp <= 0:
|
||
return None
|
||
|
||
# 触发价合理性:止盈价不能偏离当前价过远(避免错误数据导致挂单被拒或 -4509)
|
||
if ttype == "TAKE_PROFIT_MARKET":
|
||
if cp and cp > 0:
|
||
if pd == "SELL":
|
||
if sp >= cp:
|
||
pass
|
||
elif sp < cp * 0.01:
|
||
logger.warning(
|
||
f"{symbol} [止盈校验] SELL 止盈价 {sp:.8f} 远低于当前价 {cp:.8f},疑似数据错误,跳过挂单"
|
||
)
|
||
return None
|
||
else:
|
||
if sp <= cp:
|
||
pass
|
||
elif sp > cp * 100:
|
||
logger.warning(
|
||
f"{symbol} [止盈校验] BUY 止盈价 {sp:.8f} 远高于当前价 {cp:.8f},疑似数据错误,跳过挂单"
|
||
)
|
||
return None
|
||
elif sp < 0.01:
|
||
# 无当前价时:多数 USDT 合约价格 > 0.01,过小触发价视为错误(如 0.001573)
|
||
logger.warning(
|
||
f"{symbol} [止盈校验] 止盈价 {sp:.8f} 过小且无法获取当前价,疑似数据错误,跳过挂单"
|
||
)
|
||
return None
|
||
|
||
# 触发方向约束(避免立即触发):
|
||
# - long 止损:价格 <= stopPrice(stopPrice 应 < current,至少差一个 min_step)
|
||
# - short 止损:价格 >= stopPrice(stopPrice 应 > current,至少差一个 min_step)
|
||
# - long 止盈:价格 >= stopPrice(stopPrice 应 > current,至少差一个 min_step)
|
||
# - short 止盈:价格 <= stopPrice(stopPrice 应 < current,至少差一个 min_step)
|
||
if cp and cp > 0:
|
||
if ttype == "STOP_MARKET":
|
||
if pd == "BUY":
|
||
# 做多止损:止损价必须 < 当前价,至少差一个 min_step
|
||
if sp >= cp:
|
||
# 如果止损价 >= 当前价,调整为当前价 - min_step(但这样止损太紧,可能不合理)
|
||
# 更好的做法是:如果止损价太接近当前价,增加一个安全距离(例如 0.5%)
|
||
safety_margin = max(min_step, cp * 0.005) # 至少 0.5% 的安全距离
|
||
sp = max(0.0, cp - safety_margin)
|
||
logger.warning(f"{symbol} [止损修正] BUY止损价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
|
||
elif pd == "SELL":
|
||
# 做空止损:止损价必须 > 当前价,至少差一个 min_step
|
||
if sp <= cp:
|
||
# 如果止损价 <= 当前价,调整为当前价 + min_step(但这样止损太紧,可能不合理)
|
||
# 更好的做法是:如果止损价太接近当前价,增加一个安全距离(例如 0.5%)
|
||
safety_margin = max(min_step, cp * 0.005) # 至少 0.5% 的安全距离
|
||
sp = cp + safety_margin
|
||
logger.warning(f"{symbol} [止损修正] SELL止损价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
|
||
if ttype == "TAKE_PROFIT_MARKET":
|
||
if pd == "BUY":
|
||
# 做多止盈:止盈价必须 > 当前价,至少差一个 min_step
|
||
if sp <= cp:
|
||
safety_margin = max(min_step, cp * 0.005)
|
||
sp = cp + safety_margin
|
||
logger.warning(f"{symbol} [止盈修正] BUY止盈价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
|
||
elif pd == "SELL":
|
||
# 做空止盈:止盈价必须 < 当前价,至少差一个 min_step
|
||
if sp >= cp:
|
||
safety_margin = max(min_step, cp * 0.005)
|
||
sp = max(0.0, cp - safety_margin)
|
||
logger.warning(f"{symbol} [止盈修正] SELL止盈价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
|
||
|
||
# rounding 规则(提高命中概率,避免“显示等于入场价”的误差带来立即触发/永不触发):
|
||
# 止损:long 用 UP(更靠近当前价),short 用 DOWN
|
||
# 止盈:long 用 DOWN(更靠近当前价),short 用 UP
|
||
rounding_mode = "DOWN"
|
||
if ttype == "STOP_MARKET":
|
||
rounding_mode = "UP" if pd == "BUY" else "DOWN"
|
||
else: # TAKE_PROFIT_MARKET
|
||
rounding_mode = "DOWN" if pd == "BUY" else "UP"
|
||
|
||
stop_price_str = self._format_price_str_with_rounding(sp, symbol_info, rounding_mode)
|
||
|
||
# Algo 条件单接口使用 triggerPrice(不是 stopPrice)
|
||
# 显式传 timeInForce=GTC,避免交易所对 closePosition 单默认用 GTE 导致 "GTE can only be used with open positions"
|
||
# clientAlgoId:用于 ALGO_UPDATE 时按 entry_order_id 精确匹配 DB 平仓记录
|
||
params: Dict[str, Any] = {
|
||
"algoType": "CONDITIONAL",
|
||
"symbol": symbol,
|
||
"side": close_side,
|
||
"type": ttype,
|
||
"triggerPrice": stop_price_str,
|
||
"workingType": working_type,
|
||
"closePosition": True,
|
||
"timeInForce": "GTC",
|
||
}
|
||
if client_algo_id and len(str(client_algo_id)) <= 36:
|
||
params["clientAlgoId"] = str(client_algo_id)
|
||
|
||
# 单向持仓模式(ONE_WAY_POSITION_ONLY):不传 positionSide;否则按检测结果处理
|
||
if not one_way_only and dual is True:
|
||
params["positionSide"] = "LONG" if pd == "BUY" else "SHORT"
|
||
|
||
# 走 Algo Order 接口(避免 -4120)
|
||
order = await self.futures_create_algo_order(params)
|
||
if order:
|
||
return order
|
||
|
||
# 仅在对冲模式且未配置 ONE_WAY_POSITION_ONLY 时,尝试移除 positionSide 重试一次
|
||
if not one_way_only and dual is True:
|
||
logger.warning(f"{symbol} 首次挂保护单失败,尝试移除 positionSide 重试...")
|
||
retry = dict(params)
|
||
retry.pop("positionSide", None)
|
||
retry.pop("timestamp", None)
|
||
retry.pop("signature", None)
|
||
retry_order = await self.futures_create_algo_order(retry)
|
||
if retry_order:
|
||
logger.info(f"{symbol} ✓ 重试成功(移除 positionSide)")
|
||
return retry_order
|
||
|
||
logger.error(f"{symbol} ❌ 保护单挂单失败(详见上方「创建 Algo 条件单失败」日志)")
|
||
logger.error(f" 类型: {trigger_type} 触发价: {stop_price_str}")
|
||
if not one_way_only:
|
||
logger.error(f" 对冲模式: {dual}(None 表示无法读取持仓模式,请检查网络或 API 权限)")
|
||
return None
|
||
|
||
except BinanceAPIException as e:
|
||
error_code = e.code if hasattr(e, 'code') else None
|
||
error_msg = str(e)
|
||
# -4509/-4061:持仓已平或方向不匹配,仅打一条 warning,不刷详细日志
|
||
if error_code in (-4509, -4061):
|
||
logger.warning(
|
||
f"{symbol} 保护单挂单被拒({error_code}): 持仓可能已平或方向不匹配,已跳过"
|
||
)
|
||
return None
|
||
# 其他错误:详细错误日志
|
||
logger.error(f"{symbol} ❌ 挂保护单失败({trigger_type}): {error_msg}")
|
||
logger.error(f" 错误代码: {error_code}")
|
||
logger.error(f" 触发价格: {stop_price:.8f} (格式化后: {stop_price_str})")
|
||
logger.error(f" 当前价格: {cp if cp else '无(已尝试 WS/REST)'}")
|
||
logger.error(f" 持仓方向: {pd}")
|
||
logger.error(f" 平仓方向: {close_side}")
|
||
logger.error(f" 工作类型: {working_type}")
|
||
if symbol_info:
|
||
logger.error(f" 价格精度: {pp}, 价格步长: {tick}")
|
||
|
||
# 常见错误码说明
|
||
if error_code == -4014:
|
||
logger.error(f" 原因: 价格步长错误,需要调整到 tickSize 的倍数")
|
||
elif error_code == -4164:
|
||
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT)")
|
||
elif error_code == -2022:
|
||
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)")
|
||
elif error_code == -2021 or "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower():
|
||
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)")
|
||
# 关键修复:将此异常抛出,让上层调用者(如 PositionManager)决定是否立即执行市价平仓
|
||
raise e
|
||
elif "position" in error_msg.lower():
|
||
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)")
|
||
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"{symbol} ❌ 挂保护单失败({trigger_type}): {type(e).__name__}: {e}")
|
||
logger.error(f" 触发价格: {stop_price:.8f}")
|
||
logger.error(f" 持仓方向: {pd}")
|
||
import traceback
|
||
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}")
|
||
return None
|
||
|
||
async def set_leverage(self, symbol: str, leverage: int = 10) -> int:
|
||
"""
|
||
设置杠杆倍数
|
||
如果设置失败(比如超过交易对支持的最大杠杆),会自动降低杠杆重试
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
leverage: 杠杆倍数(可为 int 或 float,内部会转为 int)
|
||
|
||
Returns:
|
||
成功设置的杠杆倍数,如果失败返回 0
|
||
"""
|
||
target_leverage = int(leverage) # 币安 API 要求整数
|
||
symbol = self._resolve_api_symbol(symbol)
|
||
if not symbol:
|
||
logger.error(f"❌ 设置杠杆请求 symbol 为空")
|
||
return 0
|
||
|
||
# 定义降级尝试序列(如果目标杠杆失败,依次尝试这些标准倍数)
|
||
# 必须确保序列中的值小于 target_leverage
|
||
fallback_levels = [50, 20, 10, 5, 3, 2, 1]
|
||
|
||
# 第一次尝试:目标杠杆(网络超时重试 2 次)
|
||
for attempt in range(3):
|
||
try:
|
||
await self.client.futures_change_leverage(symbol=symbol, leverage=target_leverage)
|
||
logger.info(f"设置杠杆成功: {symbol} {target_leverage}x")
|
||
return target_leverage
|
||
except (TimeoutError, asyncio.TimeoutError):
|
||
if attempt < 2:
|
||
wait_sec = (attempt + 1) * 3
|
||
logger.warning(f"设置杠杆请求超时 ({symbol} {target_leverage}x),{wait_sec} 秒后重试 ({attempt + 1}/2)")
|
||
await asyncio.sleep(wait_sec)
|
||
else:
|
||
logger.error(f"设置杠杆请求超时 ({symbol} {target_leverage}x),已重试 2 次仍失败")
|
||
return 0
|
||
except BinanceAPIException as e:
|
||
error_msg = str(e).lower()
|
||
logger.warning(f"设置杠杆 {target_leverage}x 失败: {e},尝试降低杠杆...")
|
||
# 如果是 leverage 相关错误,尝试降级
|
||
if 'leverage' in error_msg or 'invalid' in error_msg or 'max' in error_msg:
|
||
for fallback in fallback_levels:
|
||
if fallback >= target_leverage:
|
||
continue
|
||
try:
|
||
await self.client.futures_change_leverage(symbol=symbol, leverage=fallback)
|
||
logger.warning(
|
||
f"{symbol} 杠杆降级成功: {target_leverage}x -> {fallback}x"
|
||
)
|
||
return fallback
|
||
except (TimeoutError, asyncio.TimeoutError, BinanceAPIException):
|
||
continue
|
||
err_code = getattr(e, "code", None)
|
||
logger.error(f"设置杠杆最终失败: {symbol} (目标: {target_leverage}x) 错误码={err_code} 详情: {e}")
|
||
return 0
|
||
|
||
def get_realtime_price(self, symbol: str) -> Optional[float]:
|
||
"""
|
||
获取实时价格(从缓存)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
实时价格,如果缓存中有则返回,否则返回None
|
||
"""
|
||
import time
|
||
if symbol in self._price_cache:
|
||
cached = self._price_cache[symbol]
|
||
cache_age = time.time() - cached.get('timestamp', 0)
|
||
if cache_age < self._price_cache_ttl:
|
||
return cached.get('price')
|
||
return None
|