feat(trading_system): 优化交易记录管理与用户数据流集成
在 `position_manager` 和 `risk_manager` 中引入用户数据流缓存,优先使用 WebSocket 更新持仓和余额信息,减少对 REST API 的依赖。同时,增强了交易记录的创建和更新逻辑,支持在订单成交后完善记录,确保与币安数据一致性。新增 `update_open_fields` 和 `update_pending_to_filled` 方法,提升了交易记录的管理能力。
This commit is contained in:
parent
aa073099f2
commit
5154b4933e
|
|
@ -1811,31 +1811,87 @@ async def sync_positions(
|
|||
notional = quantity * entry_price
|
||||
if notional < 1.0:
|
||||
continue
|
||||
# 补建时尽量拿到 entry_order_id:优先 get_all_orders+clientOrderId 前缀,兜底 get_recent_trades(100)+重试
|
||||
entry_order_id = None
|
||||
try:
|
||||
trades = await client.get_recent_trades(symbol, limit=30)
|
||||
if trades:
|
||||
same_side = [t for t in trades if str(t.get('side', '')).upper() == side]
|
||||
if same_side:
|
||||
same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True)
|
||||
entry_order_id = same_side[0].get('orderId')
|
||||
except Exception as e:
|
||||
logger.debug(f"获取 {symbol} 成交记录失败: {e}")
|
||||
client_order_id = None
|
||||
is_clearly_manual = False
|
||||
if system_order_prefix:
|
||||
if entry_order_id:
|
||||
try:
|
||||
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
cid = (order_info or {}).get("clientOrderId") or ""
|
||||
client_order_id = cid or None
|
||||
if cid and not cid.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,视为手动单,跳过补建")
|
||||
except Exception as e:
|
||||
logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建")
|
||||
if is_clearly_manual:
|
||||
continue
|
||||
try:
|
||||
end_ms = int(time.time() * 1000)
|
||||
start_ms = end_ms - (24 * 3600 * 1000)
|
||||
orders = await client.client.futures_get_all_orders(
|
||||
symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000
|
||||
)
|
||||
if isinstance(orders, list):
|
||||
open_orders = [
|
||||
o for o in orders
|
||||
if isinstance(o, dict) and o.get("reduceOnly") is False
|
||||
and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED"
|
||||
]
|
||||
our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)]
|
||||
if our_orders:
|
||||
our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True)
|
||||
best = None
|
||||
for o in our_orders:
|
||||
ap = float(o.get("avgPrice") or 0)
|
||||
eq = float(o.get("executedQty") or o.get("origQty") or 0)
|
||||
if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6:
|
||||
best = o
|
||||
break
|
||||
if best is None:
|
||||
best = our_orders[0]
|
||||
entry_order_id = best.get("orderId")
|
||||
client_order_id = (best.get("clientOrderId") or "").strip() or None
|
||||
except Exception as e:
|
||||
logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}")
|
||||
if entry_order_id is None:
|
||||
try:
|
||||
trades = await client.get_recent_trades(symbol, limit=100)
|
||||
if not trades:
|
||||
await asyncio.sleep(2)
|
||||
trades = await client.get_recent_trades(symbol, limit=100)
|
||||
if trades:
|
||||
same_side = [t for t in trades if str(t.get('side', '')).upper() == side]
|
||||
same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True)
|
||||
if system_order_prefix and same_side:
|
||||
for t in same_side[:5]:
|
||||
oid = t.get("orderId")
|
||||
if not oid:
|
||||
continue
|
||||
try:
|
||||
info = await client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000)
|
||||
cid = (info or {}).get("clientOrderId") or ""
|
||||
if cid.startswith(system_order_prefix):
|
||||
entry_order_id = oid
|
||||
client_order_id = cid.strip() or None
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if entry_order_id is None and same_side:
|
||||
entry_order_id = same_side[0].get("orderId")
|
||||
except Exception as e:
|
||||
logger.debug(f"获取 {symbol} 成交记录失败: {e}")
|
||||
if entry_order_id and client_order_id is None:
|
||||
try:
|
||||
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
client_order_id = (order_info or {}).get("clientOrderId") or None
|
||||
if client_order_id:
|
||||
client_order_id = client_order_id.strip() or None
|
||||
except Exception:
|
||||
pass
|
||||
is_clearly_manual = False
|
||||
if system_order_prefix and entry_order_id and client_order_id and not client_order_id.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id!r} 非系统前缀,视为手动单,跳过补建")
|
||||
elif system_order_prefix and entry_order_id and not client_order_id:
|
||||
try:
|
||||
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
cid = (order_info or {}).get("clientOrderId") or ""
|
||||
if cid and not cid.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
except Exception:
|
||||
pass
|
||||
if is_clearly_manual:
|
||||
continue
|
||||
elif only_recover_when_has_sltp:
|
||||
has_sltp = False
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -431,14 +431,6 @@ class GlobalStrategyConfig:
|
|||
class Trade:
|
||||
"""交易记录模型"""
|
||||
|
||||
@staticmethod
|
||||
def get_by_client_order_id(client_order_id):
|
||||
"""根据自定义订单号(clientOrderId)获取交易记录"""
|
||||
return db.execute_one(
|
||||
"SELECT * FROM trades WHERE client_order_id = %s",
|
||||
(client_order_id,)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def create(
|
||||
symbol,
|
||||
|
|
@ -458,11 +450,13 @@ class Trade:
|
|||
margin_usdt=None,
|
||||
account_id: int = None,
|
||||
entry_context=None,
|
||||
status: str = "open",
|
||||
):
|
||||
"""创建交易记录(使用北京时间)
|
||||
|
||||
Args:
|
||||
symbol: 交易对
|
||||
status: 状态,默认 "open";先落库等 WS 成交时可传 "pending"
|
||||
side: 方向
|
||||
quantity: 数量
|
||||
entry_price: 入场价
|
||||
|
|
@ -503,7 +497,7 @@ class Trade:
|
|||
|
||||
# 动态构建 INSERT(兼容不同schema)
|
||||
columns = ["symbol", "side", "quantity", "entry_price", "leverage", "entry_reason", "status", "entry_time"]
|
||||
values = [symbol, side, quantity, entry_price, leverage, entry_reason, "open", entry_time]
|
||||
values = [symbol, side, quantity, entry_price, leverage, entry_reason, (status or "open"), entry_time]
|
||||
|
||||
# 在真正执行 INSERT 之前,利用 entry_order_id / client_order_id 做一次幂等去重
|
||||
try:
|
||||
|
|
@ -730,7 +724,39 @@ class Trade:
|
|||
else:
|
||||
# 其他错误,重新抛出
|
||||
raise
|
||||
|
||||
|
||||
@staticmethod
|
||||
def update_open_fields(trade_id: int, **kwargs):
|
||||
"""开仓后完善字段:止损/止盈/名义/保证金/entry_context 等(用于先 pending 落库、成交后补全)。"""
|
||||
if not kwargs:
|
||||
return
|
||||
allowed = {
|
||||
"stop_loss_price", "take_profit_price", "take_profit_1", "take_profit_2",
|
||||
"notional_usdt", "margin_usdt", "entry_context", "atr"
|
||||
}
|
||||
updates = []
|
||||
values = []
|
||||
for k, v in kwargs.items():
|
||||
if k not in allowed or v is None:
|
||||
continue
|
||||
if k == "entry_context":
|
||||
try:
|
||||
v = json.dumps(v, ensure_ascii=False) if isinstance(v, dict) else str(v)
|
||||
except Exception:
|
||||
continue
|
||||
updates.append(f"{k} = %s")
|
||||
values.append(v)
|
||||
if not updates:
|
||||
return
|
||||
values.append(trade_id)
|
||||
try:
|
||||
db.execute_update(
|
||||
f"UPDATE trades SET {', '.join(updates)} WHERE id = %s",
|
||||
tuple(values)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"update_open_fields trade_id={trade_id} 失败: {e}")
|
||||
|
||||
@staticmethod
|
||||
def get_by_entry_order_id(entry_order_id):
|
||||
"""根据开仓订单号获取交易记录"""
|
||||
|
|
@ -746,7 +772,47 @@ class Trade:
|
|||
"SELECT * FROM trades WHERE exit_order_id = %s",
|
||||
(exit_order_id,)
|
||||
)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_by_client_order_id(client_order_id, account_id: int = None):
|
||||
"""根据 clientOrderId 获取交易记录(可选按 account_id 隔离)"""
|
||||
if not client_order_id:
|
||||
return None
|
||||
try:
|
||||
if account_id is not None and _table_has_column("trades", "account_id"):
|
||||
return db.execute_one(
|
||||
"SELECT * FROM trades WHERE client_order_id = %s AND account_id = %s",
|
||||
(client_order_id, int(account_id))
|
||||
)
|
||||
return db.execute_one(
|
||||
"SELECT * FROM trades WHERE client_order_id = %s",
|
||||
(client_order_id,)
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def update_pending_to_filled(client_order_id, account_id: int, entry_order_id, entry_price: float, quantity: float):
|
||||
"""WS 或 REST 收到成交后:按 client_order_id 将 pending 记录完善为已成交(幂等)"""
|
||||
if not client_order_id or account_id is None:
|
||||
return False
|
||||
try:
|
||||
row = Trade.get_by_client_order_id(client_order_id, account_id)
|
||||
if not row:
|
||||
return False
|
||||
# 仅当仍为 pending 或 entry_order_id 为空时更新,避免覆盖已完善数据
|
||||
if row.get("status") not in ("pending", "open") and row.get("entry_order_id"):
|
||||
return True # 已完善,视为成功
|
||||
db.execute_update(
|
||||
"""UPDATE trades SET entry_order_id = %s, entry_price = %s, quantity = %s, status = 'open'
|
||||
WHERE client_order_id = %s AND account_id = %s""",
|
||||
(entry_order_id, float(entry_price), float(quantity), client_order_id, int(account_id))
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def get_all(start_timestamp=None, end_timestamp=None, symbol=None, status=None, trade_type=None, exit_reason=None, account_id: int = None):
|
||||
"""获取交易记录(仅包含本系统开仓的记录已通过清理脚本维护,不再在查询里筛选)"""
|
||||
|
|
@ -801,6 +867,27 @@ class Trade:
|
|||
(symbol, status),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id) -> bool:
|
||||
"""ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号(仅更新一条)。"""
|
||||
if not symbol or account_id is None or exit_order_id is None:
|
||||
return False
|
||||
try:
|
||||
if not _table_has_column("trades", "account_id"):
|
||||
return False
|
||||
# 更新一条:open 且 (exit_order_id 为空)
|
||||
n = db.execute_update(
|
||||
"""UPDATE trades SET exit_order_id = %s
|
||||
WHERE account_id = %s AND symbol = %s AND status = 'open'
|
||||
AND (exit_order_id IS NULL OR exit_order_id = '')
|
||||
LIMIT 1""",
|
||||
(str(exit_order_id), int(account_id), symbol.strip())
|
||||
)
|
||||
return n is not None and n > 0
|
||||
except Exception as e:
|
||||
logger.warning(f"set_exit_order_id_for_open_trade 失败 symbol={symbol!r}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class AccountSnapshot:
|
||||
"""账户快照模型"""
|
||||
|
|
|
|||
|
|
@ -165,7 +165,7 @@ Authorization: Bearer <token>
|
|||
| **数据库不可用** | `position_manager` 启动时尝试导入 `backend.database.models`,若 `backend` 目录不存在、或导入失败(缺依赖、Python 路径错误、DB 连不上等),会设 `DB_AVAILABLE = False`,成交后直接跳过保存并打日志「数据库不可用,跳过保存」。 |
|
||||
| **Trade.create 抛异常** | 落库时发生异常(如连接超时、唯一约束冲突、磁盘满、字段错误等),代码会打错并 `return None`,不会重试,币安已有仓但 DB 无记录。 |
|
||||
| **进程在落库前退出** | 订单已在币安成交,但在执行到 `Trade.create` 之前进程被 kill、崩溃或重启,也会出现币安有仓、DB 无记录。 |
|
||||
| **补建时拿不到订单号** | 同步补建时用 `get_recent_trades(symbol, limit=30)` 取最近同向成交并取第一条的 `orderId` 作为 `entry_order_id`。若接口失败、返回空、或该笔开仓已不在最近 30 条成交里,则 `entry_order_id` 为空,补建记录就会「没记订单号」。 |
|
||||
| **补建时拿不到订单号** | 已优化:配置了 **SYSTEM_ORDER_ID_PREFIX** 时,补建**优先**用 `futures_get_all_orders(symbol, 最近24h)` 按开仓订单(reduceOnly=false、FILLED、同向)且 **clientOrderId 以系统前缀开头** 取本系统单,再按成交价/数量匹配取 `orderId` 与 `clientOrderId`,不依赖「最近 30 条成交」。若无前缀或该接口无结果,再用 `get_recent_trades(symbol, limit=100)` 兜底,空时重试一次;若配置了前缀,会从同向成交里逐条查订单的 clientOrderId,只采用前缀匹配的订单号,避免把手动单绑到补建记录。 |
|
||||
|
||||
**建议**:保证交易进程与 backend 同机或能访问同一 DB、且 `backend` 路径正确,避免 `DB_AVAILABLE = False`;若曾出现「开了不落库」,可查交易进程日志中的「保存交易记录到数据库失败」或「数据库不可用」;补建后仍无订单号的记录可用「仅可对账」过滤,不影响基于可对账记录的统计。
|
||||
|
||||
|
|
|
|||
18
listenkey过期推送.txt
Normal file
18
listenkey过期推送.txt
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
listenKey过期推送
|
||||
事件描述
|
||||
当前连接使用的有效listenKey过期时,user data stream 将会推送此事件。
|
||||
|
||||
注意:
|
||||
|
||||
此事件与 websocket 连接中断没有必然联系
|
||||
只有正在连接中的有效listenKey过期时才会收到此消息
|
||||
收到此消息后 user data stream 将不再更新,直到用户使用新的有效的listenKey
|
||||
事件类型
|
||||
listenKeyExpired
|
||||
|
||||
响应示例
|
||||
{
|
||||
"e": "listenKeyExpired", // 事件类型
|
||||
"E": "1736996475556", // 事件时间
|
||||
"listenKey":"WsCMN0a4KHUPTQuX6IUnqEZfB1inxmv1qR4kbf1LuEjur5VdbzqvyxqG9TSjVVxv"
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@
|
|||
币安客户端封装 - 提供异步交易接口
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
|
@ -279,6 +280,58 @@ class BinanceClient:
|
|||
else:
|
||||
logger.warning(f"⚠ API密钥验证时出现错误: {e}")
|
||||
|
||||
def _futures_base_url(self) -> str:
|
||||
"""合约 REST 根地址(用于 listenKey 等)"""
|
||||
return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com"
|
||||
|
||||
async def create_futures_listen_key(self) -> Optional[str]:
|
||||
"""创建 U 本位合约 User Data Stream listenKey(用于 WS 订阅订单/持仓推送)。60 分钟无 keepalive 会失效。"""
|
||||
if not self.api_key:
|
||||
return None
|
||||
try:
|
||||
import aiohttp
|
||||
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
|
||||
headers = {"X-MBX-APIKEY": self.api_key}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
|
||||
if resp.status != 200:
|
||||
text = await resp.text()
|
||||
logger.warning(f"create_futures_listen_key 失败 status={resp.status} body={text}")
|
||||
return None
|
||||
data = await resp.json()
|
||||
key = data.get("listenKey") if isinstance(data, dict) else None
|
||||
if key:
|
||||
logger.info("✓ 合约 User Data Stream listenKey 已创建")
|
||||
return key
|
||||
except Exception as e:
|
||||
logger.warning(f"create_futures_listen_key 失败: {e}")
|
||||
return None
|
||||
|
||||
async def keepalive_futures_listen_key(self, listen_key: str):
|
||||
"""延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。返回 (ok: bool, code_1125: bool),-1125 表示 key 不存在需换新。"""
|
||||
if not self.api_key or not listen_key:
|
||||
return False, False
|
||||
try:
|
||||
import aiohttp
|
||||
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
|
||||
headers = {"X-MBX-APIKEY": self.api_key}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
|
||||
text = await resp.text()
|
||||
ok = resp.status == 200
|
||||
code_1125 = False
|
||||
if not ok and text:
|
||||
try:
|
||||
data = json.loads(text) if text.strip().startswith("{") else {}
|
||||
code_1125 = int(data.get("code", 0)) == -1125
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug(f"keepalive_futures_listen_key 失败 status={resp.status} body={text}")
|
||||
return ok, code_1125
|
||||
except Exception as e:
|
||||
logger.debug(f"keepalive_futures_listen_key 失败: {e}")
|
||||
return False, False
|
||||
|
||||
@staticmethod
|
||||
def _to_bool(value: Any) -> Optional[bool]:
|
||||
if value is None:
|
||||
|
|
@ -1258,6 +1311,7 @@ class BinanceClient:
|
|||
price: Optional[float] = None,
|
||||
reduce_only: bool = False,
|
||||
position_side: Optional[str] = None,
|
||||
new_client_order_id: Optional[str] = None,
|
||||
) -> Optional[Dict]:
|
||||
"""
|
||||
下单
|
||||
|
|
@ -1454,12 +1508,14 @@ class BinanceClient:
|
|||
if position_side:
|
||||
logger.info(f"{symbol} 单向模式下忽略 positionSide={position_side}(避免 -4061)")
|
||||
|
||||
# 开仓单写入自定义订单号前缀,便于同步时根据 clientOrderId 区分系统单(仅本系统开的仓才补建记录)
|
||||
# 开仓单写入自定义订单号:优先使用调用方传入的(便于先落库再下单、WS 按 c 匹配)
|
||||
if not reduce_only:
|
||||
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
|
||||
if prefix:
|
||||
# 币安 newClientOrderId 最长 36 字符,格式: PREFIX_timestamp_hex
|
||||
order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||||
if new_client_order_id:
|
||||
order_params['newClientOrderId'] = str(new_client_order_id)[:36]
|
||||
else:
|
||||
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
|
||||
if prefix:
|
||||
order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||||
|
||||
# 如果是平仓订单,添加 reduceOnly 参数
|
||||
# 根据币安API文档,reduceOnly 应该是字符串 "true" 或 "false"
|
||||
|
|
@ -1502,8 +1558,10 @@ class BinanceClient:
|
|||
# 让 python-binance 重新生成,否则会报 -1022 Signature invalid
|
||||
retry_params.pop('timestamp', None)
|
||||
retry_params.pop('signature', None)
|
||||
# 重试时生成新的 newClientOrderId,避免与首次提交冲突
|
||||
if 'newClientOrderId' in retry_params:
|
||||
# 重试时保留调用方传入的 newClientOrderId,否则才重新生成(便于 WS 按 c 匹配)
|
||||
if new_client_order_id and 'newClientOrderId' in retry_params:
|
||||
retry_params['newClientOrderId'] = str(new_client_order_id)[:36]
|
||||
elif 'newClientOrderId' in retry_params and not new_client_order_id:
|
||||
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
|
||||
if prefix:
|
||||
retry_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ try:
|
|||
from .risk_manager import RiskManager # type: ignore
|
||||
from .position_manager import PositionManager # type: ignore
|
||||
from .strategy import TradingStrategy # type: ignore
|
||||
from .user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
|
||||
from . import config # type: ignore
|
||||
except Exception:
|
||||
_here = Path(__file__).resolve().parent
|
||||
|
|
@ -30,6 +31,7 @@ except Exception:
|
|||
from risk_manager import RiskManager # type: ignore
|
||||
from position_manager import PositionManager # type: ignore
|
||||
from strategy import TradingStrategy # type: ignore
|
||||
from user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
|
||||
import config # type: ignore
|
||||
|
||||
# 配置日志(支持相对路径)
|
||||
|
|
@ -233,6 +235,7 @@ async def main():
|
|||
|
||||
# 初始化组件
|
||||
client = None
|
||||
user_data_stream = None
|
||||
try:
|
||||
# 1. 初始化币安客户端
|
||||
logger.info("初始化币安客户端...")
|
||||
|
|
@ -302,8 +305,25 @@ async def main():
|
|||
except Exception as e:
|
||||
logger.error(f"余额重试异常: {e}", exc_info=True)
|
||||
continue
|
||||
|
||||
|
||||
|
||||
# 3. 启动 User Data Stream(订单/持仓/余额推送,listenKey 保活,减少 REST 请求)
|
||||
import os
|
||||
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
|
||||
user_data_stream = UserDataStream(client, account_id)
|
||||
if await user_data_stream.start():
|
||||
logger.info("✓ User Data Stream 已启动(订单/持仓/余额 WS 推送,30 分钟 keepalive)")
|
||||
# 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存
|
||||
try:
|
||||
seed_balance_cache(balance)
|
||||
positions_seed = await client.get_open_positions()
|
||||
seed_position_cache(positions_seed)
|
||||
logger.info(f"✓ 已播种持仓/余额缓存(持仓 {len(positions_seed)} 个)")
|
||||
except Exception as e:
|
||||
logger.warning(f"播种 WS 缓存失败(将仅用 REST): {e}")
|
||||
else:
|
||||
logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓")
|
||||
user_data_stream = None
|
||||
|
||||
# 4. 初始化各个模块
|
||||
logger.info("初始化交易模块...")
|
||||
scanner = MarketScanner(client)
|
||||
|
|
@ -384,7 +404,13 @@ async def main():
|
|||
logger.error(f"程序运行出错: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
# 清理资源
|
||||
# 清理资源(先停 User Data Stream,再断 client)
|
||||
try:
|
||||
if user_data_stream is not None:
|
||||
await user_data_stream.stop()
|
||||
logger.info("User Data Stream 已停止")
|
||||
except Exception as e:
|
||||
logger.debug(f"停止 User Data Stream 时异常: {e}")
|
||||
if client:
|
||||
await client.disconnect()
|
||||
logger.info("程序已退出")
|
||||
|
|
|
|||
|
|
@ -2,10 +2,11 @@
|
|||
仓位管理模块 - 管理持仓和订单
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
import aiohttp
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import aiohttp
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
|
|
@ -96,6 +97,18 @@ def _log_trade_db_failure(symbol, entry_order_id, side, quantity, entry_price, a
|
|||
logger.warning(f"写入落库失败日志失败: {e}")
|
||||
|
||||
|
||||
# User Data Stream 缓存(持仓/余额优先读 WS,减少 REST)
|
||||
try:
|
||||
from .user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache
|
||||
except Exception:
|
||||
try:
|
||||
from user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache
|
||||
except Exception:
|
||||
get_stream_instance = lambda: None
|
||||
get_positions_from_cache = lambda min_notional=1.0: []
|
||||
get_balance_from_cache = lambda: None
|
||||
|
||||
|
||||
class PositionManager:
|
||||
"""仓位管理类"""
|
||||
|
||||
|
|
@ -131,6 +144,23 @@ class PositionManager:
|
|||
self._last_auto_close_attempt_ms: Dict[str, int] = {}
|
||||
self._last_auto_close_fail_log_ms: Dict[str, int] = {}
|
||||
|
||||
async def _get_open_positions(self) -> List[Dict]:
|
||||
"""优先使用 User Data Stream 持仓缓存,无缓存或未启动时走 REST。"""
|
||||
if get_stream_instance() is not None:
|
||||
min_notional = float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0)
|
||||
cached = get_positions_from_cache(min_notional)
|
||||
if cached is not None:
|
||||
return cached
|
||||
return await self.client.get_open_positions()
|
||||
|
||||
async def _get_account_balance(self) -> Dict:
|
||||
"""优先使用 User Data Stream 余额缓存,无缓存时走 REST。"""
|
||||
if get_stream_instance() is not None:
|
||||
bal = get_balance_from_cache()
|
||||
if bal is not None:
|
||||
return bal
|
||||
return await self.client.get_account_balance()
|
||||
|
||||
@staticmethod
|
||||
def _pct_like_to_ratio(v: float) -> float:
|
||||
"""
|
||||
|
|
@ -458,6 +488,39 @@ class PositionManager:
|
|||
filled_quantity = 0.0
|
||||
entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase")
|
||||
|
||||
# 生成 client_order_id:先落库 pending,WS 按 o.c 匹配完善,减少对 REST 同步依赖
|
||||
prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip()
|
||||
client_order_id = None
|
||||
if prefix:
|
||||
client_order_id = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
|
||||
pending_trade_id = None
|
||||
if DB_AVAILABLE and Trade and client_order_id:
|
||||
try:
|
||||
pending_trade_id = Trade.create(
|
||||
symbol=symbol,
|
||||
side=side,
|
||||
quantity=quantity,
|
||||
entry_price=entry_price,
|
||||
leverage=leverage,
|
||||
entry_reason=entry_reason,
|
||||
entry_order_id=None,
|
||||
client_order_id=client_order_id,
|
||||
stop_loss_price=None,
|
||||
take_profit_price=None,
|
||||
take_profit_1=None,
|
||||
take_profit_2=None,
|
||||
atr=atr,
|
||||
notional_usdt=None,
|
||||
margin_usdt=None,
|
||||
account_id=self.account_id,
|
||||
entry_context=entry_context,
|
||||
status="pending",
|
||||
)
|
||||
logger.debug(f"{symbol} 已落库 pending 记录 (client_order_id={client_order_id!r}, id={pending_trade_id})")
|
||||
except Exception as e:
|
||||
logger.warning(f"{symbol} 创建 pending 记录失败: {e}")
|
||||
pending_trade_id = None
|
||||
|
||||
if not smart_entry_enabled:
|
||||
# 根治方案:关闭智能入场后,回归“纯限价单模式”
|
||||
# - 不追价
|
||||
|
|
@ -469,7 +532,8 @@ class PositionManager:
|
|||
f"确认超时={confirm_timeout}s(未成交将撤单跳过)"
|
||||
)
|
||||
order = await self.client.place_order(
|
||||
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit
|
||||
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit,
|
||||
new_client_order_id=client_order_id
|
||||
)
|
||||
if not order:
|
||||
return None
|
||||
|
|
@ -485,7 +549,10 @@ class PositionManager:
|
|||
f"追价上限={max_drift_ratio*100:.2f}%"
|
||||
)
|
||||
|
||||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit)
|
||||
order = await self.client.place_order(
|
||||
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit,
|
||||
new_client_order_id=client_order_id
|
||||
)
|
||||
if not order:
|
||||
return None
|
||||
entry_order_id = order.get("orderId")
|
||||
|
|
@ -522,7 +589,10 @@ class PositionManager:
|
|||
pass
|
||||
self._pending_entry_orders.pop(symbol, None)
|
||||
logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底")
|
||||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET")
|
||||
order = await self.client.place_order(
|
||||
symbol=symbol, side=side, quantity=quantity, order_type="MARKET",
|
||||
new_client_order_id=client_order_id
|
||||
)
|
||||
# 关键:转市价后必须更新 entry_order_id,否则后续会继续查询“已取消的旧限价单”,导致误判 CANCELED
|
||||
try:
|
||||
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
|
||||
|
|
@ -563,7 +633,10 @@ class PositionManager:
|
|||
f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | "
|
||||
f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%"
|
||||
)
|
||||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired)
|
||||
order = await self.client.place_order(
|
||||
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired,
|
||||
new_client_order_id=client_order_id
|
||||
)
|
||||
if not order:
|
||||
self._pending_entry_orders.pop(symbol, None)
|
||||
return None
|
||||
|
|
@ -728,32 +801,57 @@ class PositionManager:
|
|||
take_profit_1 = closer
|
||||
take_profit_2 = further
|
||||
|
||||
# 记录到数据库(只有在订单真正成交后才保存)
|
||||
# 记录到数据库:有 pending 则完善(WS 也会按 client_order_id 完善),否则新建
|
||||
trade_id = None
|
||||
if DB_AVAILABLE and Trade:
|
||||
try:
|
||||
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
|
||||
client_order_id = (order.get("clientOrderId") if order else None) or None
|
||||
trade_id = Trade.create(
|
||||
symbol=symbol,
|
||||
side=side,
|
||||
quantity=quantity, # 使用实际成交数量
|
||||
entry_price=entry_price, # 使用实际成交价格
|
||||
leverage=leverage,
|
||||
entry_reason=entry_reason,
|
||||
entry_order_id=entry_order_id, # 保存币安订单号
|
||||
client_order_id=client_order_id, # 自定义订单号,便于在订单记录中核对系统单
|
||||
stop_loss_price=stop_loss_price,
|
||||
take_profit_price=take_profit_price,
|
||||
take_profit_1=take_profit_1,
|
||||
take_profit_2=take_profit_2,
|
||||
atr=atr,
|
||||
notional_usdt=notional_usdt,
|
||||
margin_usdt=margin_usdt,
|
||||
entry_context=entry_context, # 入场思路与过程(便于事后分析策略执行效果)
|
||||
account_id=self.account_id
|
||||
)
|
||||
logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
|
||||
if client_order_id:
|
||||
row = Trade.get_by_client_order_id(client_order_id, self.account_id)
|
||||
if row and str(row.get("status")) == "pending":
|
||||
ok = Trade.update_pending_to_filled(
|
||||
client_order_id, self.account_id,
|
||||
entry_order_id, entry_price, quantity
|
||||
)
|
||||
if ok:
|
||||
row = Trade.get_by_client_order_id(client_order_id, self.account_id)
|
||||
trade_id = row.get("id") if row else None
|
||||
if trade_id:
|
||||
Trade.update_open_fields(
|
||||
trade_id,
|
||||
stop_loss_price=stop_loss_price,
|
||||
take_profit_price=take_profit_price,
|
||||
take_profit_1=take_profit_1,
|
||||
take_profit_2=take_profit_2,
|
||||
notional_usdt=notional_usdt,
|
||||
margin_usdt=margin_usdt,
|
||||
entry_context=entry_context,
|
||||
atr=atr,
|
||||
)
|
||||
logger.info(f"✓ {symbol} 已完善 pending 记录 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
|
||||
if trade_id is None:
|
||||
# 无 pending 或未匹配到:走新建(兜底)
|
||||
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
|
||||
fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id
|
||||
trade_id = Trade.create(
|
||||
symbol=symbol,
|
||||
side=side,
|
||||
quantity=quantity,
|
||||
entry_price=entry_price,
|
||||
leverage=leverage,
|
||||
entry_reason=entry_reason,
|
||||
entry_order_id=entry_order_id,
|
||||
client_order_id=fallback_client_order_id,
|
||||
stop_loss_price=stop_loss_price,
|
||||
take_profit_price=take_profit_price,
|
||||
take_profit_1=take_profit_1,
|
||||
take_profit_2=take_profit_2,
|
||||
atr=atr,
|
||||
notional_usdt=notional_usdt,
|
||||
margin_usdt=margin_usdt,
|
||||
entry_context=entry_context,
|
||||
account_id=self.account_id,
|
||||
)
|
||||
logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
|
||||
logger.error(f" 错误类型: {type(e).__name__}")
|
||||
|
|
@ -817,7 +915,7 @@ class PositionManager:
|
|||
# 验证持仓是否真的在币安存在
|
||||
try:
|
||||
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
binance_position = next(
|
||||
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
|
||||
None
|
||||
|
|
@ -910,7 +1008,7 @@ class PositionManager:
|
|||
pass
|
||||
|
||||
# 获取当前持仓
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
position = next(
|
||||
(p for p in positions if p['symbol'] == symbol),
|
||||
None
|
||||
|
|
@ -1474,7 +1572,7 @@ class PositionManager:
|
|||
if current_price is None:
|
||||
try:
|
||||
# 优先获取标记价格(MARK_PRICE),因为止损单使用 MARK_PRICE 作为触发基准
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
position = next((p for p in positions if p['symbol'] == symbol), None)
|
||||
if position:
|
||||
mark_price = position.get('markPrice')
|
||||
|
|
@ -1735,7 +1833,7 @@ class PositionManager:
|
|||
|
||||
try:
|
||||
# 获取当前持仓
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
position_dict = {p['symbol']: p for p in positions}
|
||||
|
||||
for symbol, position_info in list(self.active_positions.items()):
|
||||
|
|
@ -2260,8 +2358,8 @@ class PositionManager:
|
|||
持仓摘要信息
|
||||
"""
|
||||
try:
|
||||
positions = await self.client.get_open_positions()
|
||||
balance = await self.client.get_account_balance()
|
||||
positions = await self._get_open_positions()
|
||||
balance = await self._get_account_balance()
|
||||
|
||||
total_pnl = sum(p['unRealizedProfit'] for p in positions)
|
||||
|
||||
|
|
@ -2329,7 +2427,7 @@ class PositionManager:
|
|||
logger.info("开始同步币安持仓状态与数据库...")
|
||||
|
||||
# 1. 获取币安实际持仓
|
||||
binance_positions = await self.client.get_open_positions()
|
||||
binance_positions = await self._get_open_positions()
|
||||
binance_symbols = {p['symbol'] for p in binance_positions}
|
||||
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})")
|
||||
|
||||
|
|
@ -3045,34 +3143,88 @@ class PositionManager:
|
|||
notional = quantity * entry_price
|
||||
if notional < 1.0:
|
||||
continue
|
||||
# 补建时尽量拿到 entry_order_id:优先按时间范围查开仓订单并用 clientOrderId 前缀锁定本系统单,避免拿错/拿不到
|
||||
entry_order_id = None
|
||||
try:
|
||||
recent = await self.client.get_recent_trades(symbol, limit=30)
|
||||
if recent:
|
||||
same_side = [t for t in recent if str(t.get("side", "")).upper() == side]
|
||||
if same_side:
|
||||
same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True)
|
||||
entry_order_id = same_side[0].get("orderId")
|
||||
except Exception:
|
||||
pass
|
||||
client_order_id_sync = None
|
||||
if system_order_prefix:
|
||||
try:
|
||||
end_ms = int(time.time() * 1000)
|
||||
start_ms = end_ms - (24 * 3600 * 1000)
|
||||
orders = await self.client.client.futures_get_all_orders(
|
||||
symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000
|
||||
)
|
||||
if isinstance(orders, list):
|
||||
open_orders = [
|
||||
o for o in orders
|
||||
if isinstance(o, dict) and o.get("reduceOnly") is False
|
||||
and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED"
|
||||
]
|
||||
our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)]
|
||||
if our_orders:
|
||||
our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True)
|
||||
best = None
|
||||
for o in our_orders:
|
||||
ap = float(o.get("avgPrice") or 0)
|
||||
eq = float(o.get("executedQty") or o.get("origQty") or 0)
|
||||
if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6:
|
||||
best = o
|
||||
break
|
||||
if best is None:
|
||||
best = our_orders[0]
|
||||
entry_order_id = best.get("orderId")
|
||||
client_order_id_sync = (best.get("clientOrderId") or "").strip() or None
|
||||
logger.debug(f" {symbol} 补建从 get_all_orders 取得 orderId={entry_order_id}, clientOrderId={client_order_id_sync!r}")
|
||||
except Exception as e:
|
||||
logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}")
|
||||
if entry_order_id is None:
|
||||
try:
|
||||
recent = await self.client.get_recent_trades(symbol, limit=100)
|
||||
if not recent:
|
||||
await asyncio.sleep(2)
|
||||
recent = await self.client.get_recent_trades(symbol, limit=100)
|
||||
if recent:
|
||||
same_side = [t for t in recent if str(t.get("side", "")).upper() == side]
|
||||
same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True)
|
||||
if system_order_prefix and same_side:
|
||||
for t in same_side[:5]:
|
||||
oid = t.get("orderId")
|
||||
if not oid:
|
||||
continue
|
||||
try:
|
||||
info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000)
|
||||
cid = (info or {}).get("clientOrderId") or ""
|
||||
if cid.startswith(system_order_prefix):
|
||||
entry_order_id = oid
|
||||
client_order_id_sync = cid.strip() or None
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if entry_order_id is None and same_side:
|
||||
entry_order_id = same_side[0].get("orderId")
|
||||
except Exception:
|
||||
pass
|
||||
if entry_order_id and client_order_id_sync is None:
|
||||
try:
|
||||
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
cid = (order_info or {}).get("clientOrderId") or ""
|
||||
client_order_id_sync = cid.strip() or None
|
||||
except Exception:
|
||||
pass
|
||||
# 标记是否「来历不明」:用于 DB entry_reason 与后续统计分析(sync_recovered_unknown_origin)
|
||||
sync_unknown_origin = False
|
||||
# 仅当「明确查到开仓订单且 clientOrderId 非空且不以系统前缀开头」时标记为来历不明(仍会补建+挂 SL/TP+监控)
|
||||
is_clearly_manual = False
|
||||
if system_order_prefix and entry_order_id and client_order_id_sync and not client_order_id_sync.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id_sync!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP")
|
||||
elif system_order_prefix and entry_order_id and not client_order_id_sync:
|
||||
try:
|
||||
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
cid = (order_info or {}).get("clientOrderId") or ""
|
||||
if cid and not cid.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
except Exception:
|
||||
pass
|
||||
if system_order_prefix:
|
||||
if entry_order_id:
|
||||
try:
|
||||
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
|
||||
cid = (order_info or {}).get("clientOrderId") or ""
|
||||
client_order_id_sync = cid or None
|
||||
if cid and not cid.startswith(system_order_prefix):
|
||||
is_clearly_manual = True
|
||||
logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP")
|
||||
except Exception as e:
|
||||
logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建")
|
||||
# 无法获取订单或 cid 为空(历史单/未带前缀)时不视为手动单,继续补建
|
||||
# 例外:若开启「仅币安有仓且存在 SL/TP 也监控」,且该 symbol 有止损/止盈单,可视为系统单(仅影响日志)
|
||||
if is_clearly_manual and config.TRADING_CONFIG.get("SYNC_MONITOR_BINANCE_POSITIONS_WITH_SLTP", True):
|
||||
if await self._symbol_has_sltp_orders(symbol):
|
||||
is_clearly_manual = False
|
||||
|
|
@ -3081,7 +3233,6 @@ class PositionManager:
|
|||
sync_unknown_origin = True
|
||||
logger.info(f" → {symbol} 来历不明(开仓订单非系统前缀),将补建记录、自动挂止损止盈并纳入监控")
|
||||
else:
|
||||
# 未配置前缀时,不再因「无止损/止盈单」跳过:一律补建并自动挂 SL/TP、纳入监控
|
||||
if sync_recover_only_has_sltp and not (await self._symbol_has_sltp_orders(symbol)):
|
||||
sync_unknown_origin = True
|
||||
logger.info(f" → {symbol} 无止损/止盈单,将补建记录、自动挂止损止盈并纳入监控")
|
||||
|
|
@ -3169,10 +3320,13 @@ class PositionManager:
|
|||
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
|
||||
f"({side} {quantity:.4f} @ {entry_price:.4f})"
|
||||
)
|
||||
# 尽量从币安成交取 entry_order_id,便于订单记录/统计对账
|
||||
# 尽量从币安成交取 entry_order_id(limit=100、空时重试一次),便于订单记录/统计对账
|
||||
entry_order_id = None
|
||||
try:
|
||||
recent = await self.client.get_recent_trades(symbol, limit=30)
|
||||
recent = await self.client.get_recent_trades(symbol, limit=100)
|
||||
if not recent:
|
||||
await asyncio.sleep(2)
|
||||
recent = await self.client.get_recent_trades(symbol, limit=100)
|
||||
if recent:
|
||||
same_side = [t for t in recent if str(t.get('side', '')).upper() == side]
|
||||
if same_side:
|
||||
|
|
@ -3315,7 +3469,7 @@ class PositionManager:
|
|||
return
|
||||
|
||||
# 获取当前所有持仓(与 sync 一致:仅本系统关心的持仓会进 active_positions)
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
binance_symbols = {p['symbol'] for p in positions}
|
||||
active_symbols = set(self.active_positions.keys())
|
||||
sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False)
|
||||
|
|
@ -4133,7 +4287,7 @@ class PositionManager:
|
|||
logger.warning(f" 可能原因: 监控未启动或已停止")
|
||||
|
||||
# 3. 获取币安实际持仓
|
||||
positions = await self.client.get_open_positions()
|
||||
positions = await self._get_open_positions()
|
||||
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
|
||||
|
||||
if not binance_position:
|
||||
|
|
|
|||
|
|
@ -17,6 +17,28 @@ except ImportError:
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 可选:优先使用 User Data Stream 缓存(与 position_manager 一致)
|
||||
def _get_stream_instance():
|
||||
try:
|
||||
from .user_data_stream import get_stream_instance
|
||||
return get_stream_instance()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _get_balance_from_cache():
|
||||
try:
|
||||
from .user_data_stream import get_balance_from_cache
|
||||
return get_balance_from_cache()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _get_positions_from_cache():
|
||||
try:
|
||||
from .user_data_stream import get_positions_from_cache
|
||||
return get_positions_from_cache(float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class RiskManager:
|
||||
"""风险管理类"""
|
||||
|
|
@ -49,8 +71,10 @@ class RiskManager:
|
|||
try:
|
||||
logger.info(f"检查 {symbol} 单笔仓位大小...")
|
||||
|
||||
# 获取账户余额
|
||||
balance = await self.client.get_account_balance()
|
||||
# 获取账户余额(优先 WS 缓存)
|
||||
balance = _get_balance_from_cache() if _get_stream_instance() else None
|
||||
if balance is None:
|
||||
balance = await self.client.get_account_balance()
|
||||
available_balance = balance.get('available', 0)
|
||||
|
||||
if available_balance <= 0:
|
||||
|
|
@ -137,8 +161,10 @@ class RiskManager:
|
|||
是否通过检查
|
||||
"""
|
||||
try:
|
||||
# 获取当前持仓
|
||||
positions = await self.client.get_open_positions()
|
||||
# 获取当前持仓(优先 WS 缓存)
|
||||
positions = _get_positions_from_cache() if _get_stream_instance() else None
|
||||
if positions is None:
|
||||
positions = await self.client.get_open_positions()
|
||||
|
||||
# 计算当前总保证金占用
|
||||
current_position_values = []
|
||||
|
|
@ -167,8 +193,10 @@ class RiskManager:
|
|||
# 加上新仓位
|
||||
total_with_new = total_margin_value + new_position_margin
|
||||
|
||||
# 获取账户余额
|
||||
balance = await self.client.get_account_balance()
|
||||
# 获取账户余额(优先 WS 缓存)
|
||||
balance = _get_balance_from_cache() if _get_stream_instance() else None
|
||||
if balance is None:
|
||||
balance = await self.client.get_account_balance()
|
||||
total_balance = balance.get('total', 0)
|
||||
available_balance = balance.get('available', 0)
|
||||
|
||||
|
|
@ -401,8 +429,10 @@ class RiskManager:
|
|||
try:
|
||||
logger.info(f"开始计算 {symbol} 的仓位大小...")
|
||||
|
||||
# 获取账户余额
|
||||
balance = await self.client.get_account_balance()
|
||||
# 获取账户余额(优先 WS 缓存)
|
||||
balance = _get_balance_from_cache() if _get_stream_instance() else None
|
||||
if balance is None:
|
||||
balance = await self.client.get_account_balance()
|
||||
available_balance = balance.get('available', 0)
|
||||
total_balance = balance.get('total', 0)
|
||||
|
||||
|
|
@ -785,8 +815,10 @@ class RiskManager:
|
|||
logger.debug(f"{symbol} 涨跌幅 {change_percent:.2f}% 小于阈值")
|
||||
return False
|
||||
|
||||
# 检查是否已有持仓 / 总持仓数量限制
|
||||
positions = await self.client.get_open_positions()
|
||||
# 检查是否已有持仓 / 总持仓数量限制(优先 WS 缓存)
|
||||
positions = _get_positions_from_cache() if _get_stream_instance() else None
|
||||
if positions is None:
|
||||
positions = await self.client.get_open_positions()
|
||||
try:
|
||||
max_open = int(config.TRADING_CONFIG.get("MAX_OPEN_POSITIONS", 0) or 0)
|
||||
except Exception:
|
||||
|
|
|
|||
364
trading_system/user_data_stream.py
Normal file
364
trading_system/user_data_stream.py
Normal file
|
|
@ -0,0 +1,364 @@
|
|||
"""
|
||||
合约 User Data Stream:订阅 listenKey 推送,用 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓。
|
||||
- 先 REST 下单并落库 pending,WS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量。
|
||||
- ACCOUNT_UPDATE 推送持仓/余额变化,可作持仓与余额数据源以降低 REST 频率。
|
||||
- listenKey 每 30 分钟 keepalive;收到 listenKeyExpired 或 keepalive 返回 -1125 时主动换新 key 重连。
|
||||
- 单连接有效期不超过 24 小时,届时主动重连(文档建议)。
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 持仓推送缓存:ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
|
||||
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
|
||||
# 余额推送缓存:ACCOUNT_UPDATE 的 B 数组,a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
|
||||
_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc }
|
||||
# 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用)
|
||||
_stream_instance: Optional["UserDataStream"] = None
|
||||
# 是否已用 REST 结果播种过(未播种时业务应走 REST,不返回空缓存)
|
||||
_position_cache_seeded = False
|
||||
_balance_cache_seeded = False
|
||||
|
||||
|
||||
def get_position_updates_cache() -> Dict[str, List[Dict]]:
|
||||
"""返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。"""
|
||||
return dict(_position_updates_cache)
|
||||
|
||||
|
||||
def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]:
|
||||
"""返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。"""
|
||||
return dict(_balance_updates_cache)
|
||||
|
||||
|
||||
def get_stream_instance() -> Optional["UserDataStream"]:
|
||||
"""返回当前运行的 UserDataStream 实例(未启动时为 None)。"""
|
||||
return _stream_instance
|
||||
|
||||
|
||||
def seed_position_cache(positions: List[Dict]) -> None:
|
||||
"""用 REST 全量持仓结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。"""
|
||||
global _position_updates_cache, _position_cache_seeded
|
||||
_position_updates_cache.clear()
|
||||
_position_cache_seeded = True
|
||||
for pos in positions or []:
|
||||
symbol = (pos.get("symbol") or "").strip()
|
||||
if not symbol:
|
||||
continue
|
||||
amt = float(pos.get("positionAmt") or 0)
|
||||
_position_updates_cache[symbol] = [{
|
||||
"s": symbol,
|
||||
"pa": amt,
|
||||
"ep": str(pos.get("entryPrice") or "0"),
|
||||
"up": str(pos.get("unRealizedProfit") or "0"),
|
||||
"ps": pos.get("positionSide") or "BOTH",
|
||||
}]
|
||||
logger.debug(f"UserDataStream: 已填充持仓缓存 {len(_position_updates_cache)} 个 symbol")
|
||||
|
||||
|
||||
def seed_balance_cache(balance: Dict[str, Any]) -> None:
|
||||
"""用 REST 余额结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。"""
|
||||
global _balance_updates_cache, _balance_cache_seeded
|
||||
_balance_cache_seeded = True
|
||||
if balance and isinstance(balance, dict):
|
||||
wb = balance.get("walletBalance") or balance.get("total") or 0
|
||||
av = balance.get("availableBalance") or balance.get("available") or wb
|
||||
_balance_updates_cache["USDT"] = {"wb": str(wb), "cw": str(av), "bc": "0"}
|
||||
logger.debug("UserDataStream: 已填充余额缓存 (USDT)")
|
||||
|
||||
|
||||
def get_positions_from_cache(min_notional: float = 1.0) -> Optional[List[Dict]]:
|
||||
"""将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None(业务应走 REST)。"""
|
||||
if not _position_cache_seeded:
|
||||
return None
|
||||
out = []
|
||||
for symbol, plist in _position_updates_cache.items():
|
||||
for p in plist if isinstance(plist, list) else []:
|
||||
try:
|
||||
pa = float(p.get("pa") or 0)
|
||||
except (TypeError, ValueError):
|
||||
pa = 0
|
||||
if pa == 0:
|
||||
continue
|
||||
ep = float(p.get("ep") or 0)
|
||||
if min_notional > 0 and abs(pa) * ep < min_notional:
|
||||
continue
|
||||
out.append({
|
||||
"symbol": symbol,
|
||||
"positionAmt": pa,
|
||||
"entryPrice": ep,
|
||||
"markPrice": float(p.get("markPrice") or 0),
|
||||
"unRealizedProfit": float(p.get("up") or 0),
|
||||
"leverage": int(p.get("leverage") or 0),
|
||||
})
|
||||
return out
|
||||
|
||||
|
||||
def get_balance_from_cache() -> Optional[Dict[str, Any]]:
|
||||
"""从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。"""
|
||||
if not _balance_cache_seeded:
|
||||
return None
|
||||
u = _balance_updates_cache.get("USDT")
|
||||
if not u:
|
||||
return None
|
||||
try:
|
||||
wb = float(u.get("wb") or 0)
|
||||
cw = float(u.get("cw") or wb)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return {"ok": True, "available": cw, "total": wb, "margin": wb}
|
||||
|
||||
|
||||
class UserDataStream:
|
||||
"""合约用户数据流:创建/保活 listenKey,连接 WS,处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。"""
|
||||
|
||||
def __init__(self, client: Any, account_id: int):
|
||||
self.client = client
|
||||
self.account_id = int(account_id)
|
||||
self._listen_key: Optional[str] = None
|
||||
self._ws = None
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._keepalive_task: Optional[asyncio.Task] = None
|
||||
self._running = False
|
||||
self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连)
|
||||
|
||||
def _ws_base_url(self) -> str:
|
||||
if getattr(self.client, "testnet", False):
|
||||
return "wss://stream.binancefuture.com/ws"
|
||||
return "wss://fstream.binance.com/ws"
|
||||
|
||||
async def start(self) -> bool:
|
||||
"""创建 listenKey 并启动 WS 接收循环与 keepalive 任务。"""
|
||||
global _stream_instance
|
||||
if self._running:
|
||||
return True
|
||||
self._listen_key = await self.client.create_futures_listen_key()
|
||||
if not self._listen_key:
|
||||
logger.warning("UserDataStream: 无法创建 listenKey,跳过启动")
|
||||
return False
|
||||
self._running = True
|
||||
_stream_instance = self
|
||||
self._task = asyncio.create_task(self._run_ws())
|
||||
self._keepalive_task = asyncio.create_task(self._run_keepalive())
|
||||
logger.info("UserDataStream: 已启动(ORDER_TRADE_UPDATE / ACCOUNT_UPDATE,含持仓与余额推送)")
|
||||
return True
|
||||
|
||||
async def stop(self):
|
||||
"""停止 WS 与 keepalive。"""
|
||||
global _stream_instance
|
||||
self._running = False
|
||||
_stream_instance = None
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
try:
|
||||
await self._keepalive_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._keepalive_task = None
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
if self._ws:
|
||||
try:
|
||||
await self._ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._ws = None
|
||||
self._listen_key = None
|
||||
logger.info("UserDataStream: 已停止")
|
||||
|
||||
async def _run_keepalive(self):
|
||||
"""每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。"""
|
||||
while self._running:
|
||||
await asyncio.sleep(30 * 60)
|
||||
if not self._running or not self._listen_key:
|
||||
break
|
||||
ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key)
|
||||
if not ok and code_1125 and self._ws:
|
||||
logger.warning("UserDataStream: keepalive 返回 -1125(listenKey 不存在),主动断线以换新 key 重连")
|
||||
try:
|
||||
await self._ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
if not ok:
|
||||
logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey")
|
||||
|
||||
async def _run_ws(self):
|
||||
"""连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。"""
|
||||
import aiohttp
|
||||
_24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连
|
||||
while self._running:
|
||||
url = f"{self._ws_base_url()}/{self._listen_key}"
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
|
||||
self._ws = ws
|
||||
self._conn_start_time = time.monotonic()
|
||||
logger.info("UserDataStream: WS 已连接")
|
||||
async for msg in ws:
|
||||
if not self._running:
|
||||
break
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
should_break = await self._handle_message(msg.data)
|
||||
if should_break:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
logger.warning("UserDataStream: WS 错误")
|
||||
break
|
||||
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
|
||||
break
|
||||
# 单连接 24h 主动重连(文档:每个链接有效期不超过24小时)
|
||||
if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec:
|
||||
logger.info("UserDataStream: 连接已近 24h,主动重连")
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"UserDataStream: WS 异常 {e},60s 后重连")
|
||||
await asyncio.sleep(60)
|
||||
self._ws = None
|
||||
self._conn_start_time = None
|
||||
if not self._running:
|
||||
break
|
||||
# 重连前重新创建 listenKey(旧 key 可能已失效或 listenKeyExpired)
|
||||
self._listen_key = await self.client.create_futures_listen_key()
|
||||
if not self._listen_key:
|
||||
await asyncio.sleep(60)
|
||||
continue
|
||||
|
||||
async def _handle_message(self, raw: str) -> bool:
|
||||
"""处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired)以触发重连。"""
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
return False
|
||||
e = data.get("e")
|
||||
if e == "listenKeyExpired":
|
||||
logger.warning("UserDataStream: 收到 listenKeyExpired,将换新 key 重连")
|
||||
return True
|
||||
if e == "ORDER_TRADE_UPDATE":
|
||||
self._on_order_trade_update(data.get("o") or {})
|
||||
elif e == "ACCOUNT_UPDATE":
|
||||
self._on_account_update(data.get("a") or {})
|
||||
elif e == "ALGO_UPDATE":
|
||||
self._on_algo_update(data.get("o") or {})
|
||||
return False
|
||||
|
||||
def _on_order_trade_update(self, o: Dict):
|
||||
# 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId
|
||||
# ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对
|
||||
event_type = (o.get("x") or "").strip().upper()
|
||||
status = (o.get("X") or "").strip().upper()
|
||||
if status != "FILLED":
|
||||
return
|
||||
client_order_id = (o.get("c") or "").strip() or None
|
||||
order_id = o.get("i")
|
||||
ap = o.get("ap")
|
||||
z = o.get("z")
|
||||
reduce_only = o.get("R") is True
|
||||
symbol = (o.get("s") or "").strip()
|
||||
rp = o.get("rp") # 实现盈亏
|
||||
if order_id is None or ap is None or z is None:
|
||||
return
|
||||
try:
|
||||
ap_f = float(ap)
|
||||
z_f = float(z)
|
||||
except (TypeError, ValueError):
|
||||
return
|
||||
# 特殊 clientOrderId:强平/ADL/结算,仅打日志不参与系统 pending 完善
|
||||
if client_order_id:
|
||||
if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"):
|
||||
logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}")
|
||||
return
|
||||
# 仅当事件类型为 TRADE 且状态 FILLED 时视为成交(文档:x=TRADE 表示有成交)
|
||||
if event_type and event_type != "TRADE":
|
||||
return
|
||||
# 开仓成交:完善 pending 记录
|
||||
if not reduce_only:
|
||||
if not client_order_id:
|
||||
return
|
||||
try:
|
||||
import sys
|
||||
from pathlib import Path
|
||||
project_root = Path(__file__).parent.parent
|
||||
backend_path = project_root / "backend"
|
||||
if backend_path.exists():
|
||||
sys.path.insert(0, str(backend_path))
|
||||
from database.models import Trade
|
||||
updated = Trade.update_pending_to_filled(
|
||||
client_order_id, self.account_id, order_id, ap_f, z_f
|
||||
)
|
||||
if updated:
|
||||
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}")
|
||||
except Exception as ex:
|
||||
logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}")
|
||||
else:
|
||||
# 平仓成交:按 symbol 回写 open 记录的 exit_order_id;若有 rp 可记入日志
|
||||
if rp is not None:
|
||||
logger.debug(f"UserDataStream: 平仓订单 FILLED orderId={order_id} symbol={symbol!r} 实现盈亏 rp={rp}")
|
||||
if symbol:
|
||||
try:
|
||||
import sys
|
||||
from pathlib import Path
|
||||
project_root = Path(__file__).parent.parent
|
||||
backend_path = project_root / "backend"
|
||||
if backend_path.exists():
|
||||
sys.path.insert(0, str(backend_path))
|
||||
from database.models import Trade
|
||||
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id):
|
||||
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
|
||||
except Exception as ex:
|
||||
logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}")
|
||||
|
||||
def _on_account_update(self, a: Dict):
|
||||
# 文档: a.B = 余额数组,每项 a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
|
||||
# 文档: a.P = 持仓信息数组,每项 s=symbol, pa=仓位, ep=入仓价, ps=LONG/SHORT/BOTH 等
|
||||
global _position_updates_cache, _balance_updates_cache
|
||||
B = a.get("B")
|
||||
if isinstance(B, list) and B:
|
||||
for b in B:
|
||||
asset = (b.get("a") or "").strip()
|
||||
if asset:
|
||||
_balance_updates_cache[asset] = {
|
||||
"wb": b.get("wb"),
|
||||
"cw": b.get("cw"),
|
||||
"bc": b.get("bc"),
|
||||
}
|
||||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
|
||||
P = a.get("P")
|
||||
if isinstance(P, list) and P:
|
||||
for p in P:
|
||||
s = (p.get("s") or "").strip()
|
||||
if s:
|
||||
if s not in _position_updates_cache:
|
||||
_position_updates_cache[s] = []
|
||||
_position_updates_cache[s] = [p]
|
||||
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
|
||||
|
||||
def _on_algo_update(self, o: Dict):
|
||||
# 条件单交易更新推送:X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id
|
||||
x = (o.get("X") or "").strip().upper()
|
||||
ai = o.get("ai")
|
||||
symbol = (o.get("s") or "").strip()
|
||||
if x in ("TRIGGERED", "FINISHED") and ai and symbol:
|
||||
try:
|
||||
import sys
|
||||
from pathlib import Path
|
||||
project_root = Path(__file__).parent.parent
|
||||
backend_path = project_root / "backend"
|
||||
if backend_path.exists():
|
||||
sys.path.insert(0, str(backend_path))
|
||||
from database.models import Trade
|
||||
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai):
|
||||
logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
|
||||
except Exception as ex:
|
||||
logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
|
||||
logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")
|
||||
48
条件订单交易更新推送.txt
Normal file
48
条件订单交易更新推送.txt
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
条件订单交易更新推送
|
||||
事件描述
|
||||
当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ALGO_UPDATE
|
||||
|
||||
本次事件的具体执行类型
|
||||
|
||||
NEW:该状态表示条件订单已提交,但尚未触发。
|
||||
CANCELED:该状态表示条件订单已被取消。
|
||||
TRIGGERING:该状态表示条件订单已满足触发条件,且已被转发至撮合引擎。
|
||||
TRIGGERED:该状态表示条件订单已成功触发并成功进入撮合引擎。
|
||||
FINISHED:该状态表示触发的条件订单已在撮合引擎中被成交或取消。
|
||||
REJECTED:该状态表示条件订单被撮合引擎拒绝,例如保证金检查失败等情况。
|
||||
EXPIRED:该状态表示条件订单被系统取消。例如,用户下了一个GTE_GTC时效条件订单,但随后关闭了该标的的所有持仓,系统因此取消了该条件订单。
|
||||
事件类型
|
||||
ALGO_UPDATE
|
||||
|
||||
响应示例
|
||||
{
|
||||
"e":"ALGO_UPDATE", // 事件类型
|
||||
"T":1750515742297, // 撮合时间
|
||||
"E":1750515742303, // 事件时间
|
||||
"o":{
|
||||
"caid":"Q5xaq5EGKgXXa0fD7fs0Ip", // 客户端自定条件订单ID
|
||||
"aid":2148719, // 条件单 Id
|
||||
"at":"CONDITIONAL", // 条件单类型
|
||||
"o":"TAKE_PROFIT", //订单类型
|
||||
"s":"BNBUSDT", //交易对
|
||||
"S":"SELL", //订单方向
|
||||
"ps":"BOTH", //持仓方向
|
||||
"f":"GTC", //有效方式
|
||||
"q":"0.01", //订单数量
|
||||
"X":"CANCELED", //条件单状态
|
||||
"ai":"", // 触发后普通订单 id
|
||||
"ap": "0.00000", // 触发后在撮合引擎中实际订单的平均成交价格,仅在订单被触发并进入撮合引擎时显示
|
||||
"aq": "0.00000", // 触发后在撮合引擎中实际订单已成交数量,仅当订单被触发并进入撮合引擎时显示
|
||||
"act": "0", // 触发后在撮合引擎中实际的订单类型,仅当订单被触发并进入撮合引擎时显示
|
||||
"tp":"750", //条件单触发价格
|
||||
"p":"750", //订单价格
|
||||
"V":"EXPIRE_MAKER", //自成交防止模式
|
||||
"wt":"CONTRACT_PRICE", //触发价类型
|
||||
"pm":"NONE", // 价格匹配模式
|
||||
"cp":false, //是否为触发平仓单; 仅在条件订单情况下会推送此字段
|
||||
"pP":false, //是否开启条件单触发保护
|
||||
"R":false, // 是否是只减仓单
|
||||
"tt":0, //触发时间
|
||||
"gtd":0, // TIF为GTD的订单自动取消时间
|
||||
"rm": "Reduce Only reject" // 条件单失败原因
|
||||
}
|
||||
51
用户余额.txt
Normal file
51
用户余额.txt
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
账户余额V2 (USER_DATA)
|
||||
查询账户余额
|
||||
|
||||
方式
|
||||
v2/account.balance
|
||||
|
||||
请求
|
||||
{
|
||||
"id": "605a6d20-6588-4cb9-afa0-b0ab087507ba",
|
||||
"method": "v2/account.balance",
|
||||
"params": {
|
||||
"apiKey": "xTaDyrmvA9XT2oBHHjy39zyPzKCvMdtH3b9q4xadkAg2dNSJXQGCxzui26L823W2",
|
||||
"timestamp": 1702561978458,
|
||||
"signature": "208bb94a26f99aa122b1319490ca9cb2798fccc81d9b6449521a26268d53217a"
|
||||
}
|
||||
}
|
||||
|
||||
请求权重
|
||||
5
|
||||
|
||||
请求参数
|
||||
名称 类型 是否必需 描述
|
||||
recvWindow LONG NO
|
||||
timestamp LONG YES
|
||||
响应示例
|
||||
{
|
||||
"id": "605a6d20-6588-4cb9-afa0-b0ab087507ba",
|
||||
"status": 200,
|
||||
"result": [
|
||||
{
|
||||
"accountAlias": "SgsR", // 账户唯一识别码
|
||||
"asset": "USDT", // 资产
|
||||
"balance": "122607.35137903", // 总余额
|
||||
"crossWalletBalance": "23.72469206", // 全仓余额
|
||||
"crossUnPnl": "0.00000000" // 全仓持仓未实现盈亏
|
||||
"availableBalance": "23.72469206", // 下单可用余额
|
||||
"maxWithdrawAmount": "23.72469206", // 最大可转出余额
|
||||
"marginAvailable": true, // 是否可用作联合保证金
|
||||
"updateTime": 1617939110373
|
||||
}
|
||||
],
|
||||
"rateLimits": [
|
||||
{
|
||||
"rateLimitType": "REQUEST_WEIGHT",
|
||||
"interval": "MINUTE",
|
||||
"intervalNum": 1,
|
||||
"limit": 2400,
|
||||
"count": 20
|
||||
}
|
||||
]
|
||||
}
|
||||
110
订单交易更新推送.txt
Normal file
110
订单交易更新推送.txt
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
订单交易更新推送
|
||||
事件描述
|
||||
当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ORDER_TRADE_UPDATE
|
||||
|
||||
订单方向
|
||||
|
||||
BUY 买入
|
||||
SELL 卖出
|
||||
订单类型
|
||||
|
||||
LIMIT 限价单
|
||||
MARKET 市价单
|
||||
STOP 止损限价单
|
||||
STOP_MARKET 止损市价单
|
||||
TAKE_PROFIT 止盈限价单
|
||||
TAKE_PROFIT_MARKET 止盈市价单
|
||||
TRAILING_STOP_MARKET 跟踪止损单
|
||||
LIQUIDATION 爆仓
|
||||
本次事件的具体执行类型
|
||||
|
||||
NEW
|
||||
CANCELED 已撤
|
||||
CALCULATED 订单 ADL 或爆仓
|
||||
EXPIRED 订单失效
|
||||
TRADE 交易
|
||||
AMENDMENT 订单修改
|
||||
订单状态
|
||||
|
||||
NEW
|
||||
PARTIALLY_FILLED
|
||||
FILLED
|
||||
CANCELED
|
||||
EXPIRED
|
||||
EXPIRED_IN_MATCH
|
||||
有效方式:
|
||||
|
||||
GTC
|
||||
IOC
|
||||
FOK
|
||||
GTX
|
||||
强平和ADL:
|
||||
|
||||
若用户因保证金不足发生强平:
|
||||
c为"autoclose-XXX",X为"NEW"
|
||||
若用户保证金充足但被 ADL:
|
||||
c为“adl_autoclose”,X为“NEW”
|
||||
过期原因
|
||||
|
||||
0: 无,默认值
|
||||
1: 自成交保护,订单被取消
|
||||
2: IOC订单无法完全成交,订单被取消
|
||||
3: IOC订单因自成交保护无法完全成交,订单被取消
|
||||
4: 只减仓竞争过程中,低优先级的只减仓订单被取消
|
||||
5: 账户强平过程中,订单被取消
|
||||
6: 不满足GTE条件,订单被取消
|
||||
7: Symbol下架,订单被取消
|
||||
8: 止盈止损单触发后,初始订单被取消
|
||||
9: 市价订单无法完全成交,订单被取消
|
||||
事件类型
|
||||
ORDER_TRADE_UPDATE
|
||||
|
||||
响应示例
|
||||
{
|
||||
"e":"ORDER_TRADE_UPDATE", // 事件类型
|
||||
"E":1568879465651, // 事件时间
|
||||
"T":1568879465650, // 撮合时间
|
||||
"o":{
|
||||
"s":"BTCUSDT", // 交易对
|
||||
"c":"TEST", // 客户端自定订单ID
|
||||
// 特殊的自定义订单ID:
|
||||
// "autoclose-"开头的字符串: 系统强平订单
|
||||
// "adl_autoclose": ADL自动减仓订单
|
||||
// "settlement_autoclose-": 下架或交割的结算订单
|
||||
"S":"SELL", // 订单方向
|
||||
"o":"TRAILING_STOP_MARKET", // 订单类型
|
||||
"f":"GTC", // 有效方式
|
||||
"q":"0.001", // 订单原始数量
|
||||
"p":"0", // 订单原始价格
|
||||
"ap":"0", // 订单平均价格
|
||||
"sp":"7103.04", // 条件订单触发价格,对追踪止损单无效
|
||||
"x":"NEW", // 本次事件的具体执行类型
|
||||
"X":"NEW", // 订单的当前状态
|
||||
"i":8886774, // 订单ID
|
||||
"l":"0", // 订单末次成交量
|
||||
"z":"0", // 订单累计已成交量
|
||||
"L":"0", // 订单末次成交价格
|
||||
"N": "USDT", // 手续费资产类型
|
||||
"n": "0", // 手续费数量
|
||||
"T":1568879465650, // 成交时间
|
||||
"t":0, // 成交ID
|
||||
"b":"0", // 买单净值
|
||||
"a":"9.91", // 卖单净值
|
||||
"m": false, // 该成交是作为挂单成交吗?
|
||||
"R":false , // 是否是只减仓单
|
||||
"wt": "CONTRACT_PRICE", // 触发价类型
|
||||
"ot": "TRAILING_STOP_MARKET", // 原始订单类型
|
||||
"ps":"LONG" // 持仓方向
|
||||
"cp":false, // 是否为触发平仓单; 仅在条件订单情况下会推送此字段
|
||||
"AP":"7476.89", // 追踪止损激活价格, 仅在追踪止损单时会推送此字段
|
||||
"cr":"5.0", // 追踪止损回调比例, 仅在追踪止损单时会推送此字段
|
||||
"pP": false, // 是否开启条件单触发保护
|
||||
"si": 0, // 忽略
|
||||
"ss": 0, // 忽略
|
||||
"rp":"0", // 该交易实现盈亏
|
||||
"V":"EXPIRE_TAKER", // 自成交防止模式
|
||||
"pm":"OPPONENT", // 价格匹配模式
|
||||
"gtd":0, // TIF为GTD的订单自动取消时间
|
||||
"er":"0" // 过期原因
|
||||
}
|
||||
}
|
||||
118
账户信息流连接.txt
Normal file
118
账户信息流连接.txt
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
账户信息流连接
|
||||
本篇所列出REST接口的baseurl https://fapi.binance.com
|
||||
|
||||
用于订阅账户数据的 listenKey 从创建时刻起有效期为60分钟
|
||||
|
||||
可以通过PUT一个listenKey延长60分钟有效期,如收到-1125报错提示此listenKey不存在,建议重新使用POST /fapi/v1/listenKey生成listenKey
|
||||
|
||||
可以通过DELETE一个 listenKey 立即关闭当前数据流,并使该listenKey 无效
|
||||
|
||||
在具有有效listenKey的帐户上执行POST将返回当前有效的listenKey并将其有效期延长60分钟
|
||||
|
||||
本篇所列出的websocket接口,连接方式如下:
|
||||
|
||||
Base Url: wss://fstream.binance.com
|
||||
订阅账户数据流的stream名称为 /ws/`<listenKey>`
|
||||
连接样例:
|
||||
wss://fstream.binance.com/ws/XaEAKTsQSRLZAGH9tuIu37plSRsdjmlAVBoNYPUITlTAko1WI22PgmBMpI1rS8Yh
|
||||
每个链接有效期不超过24小时,请妥善处理断线重连。
|
||||
|
||||
单一账户,单一连接的推送数据流消息可以保证时间序; 强烈建议您使用 E 字段进行排序
|
||||
|
||||
考虑到剧烈行情下, RESTful接口可能存在查询延迟,我们强烈建议您优先从Websocket user data stream推送的消息来获取订单,仓位等信息。
|
||||
|
||||
|
||||
|
||||
生成listenKey (USER_STREAM)
|
||||
接口描述
|
||||
创建一个新的user data stream,返回值为一个listenKey,即websocket订阅的stream名称。如果该帐户具有有效的listenKey,则将返回该listenKey并将其有效期延长60分钟。
|
||||
|
||||
HTTP请求
|
||||
POST /fapi/v1/listenKey
|
||||
|
||||
请求权重
|
||||
1
|
||||
|
||||
请求参数
|
||||
None
|
||||
|
||||
响应示例
|
||||
{
|
||||
"listenKey": "pqia91ma19a5s61cv6a81va65sdf19v8a65a1a5s61cv6a81va65sdf19v8a65a1"
|
||||
}
|
||||
|
||||
|
||||
延长listenKey有效期(USER_STREAM)
|
||||
接口描述
|
||||
有效期延长至本次调用后60分钟
|
||||
|
||||
HTTP请求
|
||||
PUT /fapi/v1/listenKey
|
||||
|
||||
请求权重
|
||||
1
|
||||
|
||||
请求参数
|
||||
None
|
||||
|
||||
{
|
||||
"listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg" // 被延长的listenkey
|
||||
}
|
||||
|
||||
|
||||
Websocket API生成listenKey (USER_STREAM)
|
||||
接口描述
|
||||
创建一个新的user data stream,返回值为一个listenKey,即websocket订阅的stream名称。如果该帐户具有有效的listenKey,则将返回该listenKey并将其有效期延长60分钟。
|
||||
|
||||
方式
|
||||
userDataStream.start
|
||||
|
||||
请求
|
||||
{
|
||||
"id": "d3df8a61-98ea-4fe0-8f4e-0fcea5d418b0",
|
||||
"method": "userDataStream.start",
|
||||
"params": {
|
||||
"apiKey": "vmPUZE6mv9SD5VNHk4HlWFsOr6aKE2zvsw0MuIgwCIPy6utIco14y7Ju91duEh8A"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Websocket API延长listenKey有效期(USER_STREAM)
|
||||
接口描述
|
||||
有效期延长至本次调用后60分钟
|
||||
|
||||
方式
|
||||
userDataStream.ping
|
||||
|
||||
请求
|
||||
{
|
||||
"id": "815d5fce-0880-4287-a567-80badf004c74",
|
||||
"method": "userDataStream.ping",
|
||||
"params": {
|
||||
"apiKey": "vmPUZE6mv9SD5VNHk9HlWFsOr9aLE2zvsw0MuIgwCIPy8atIco14y7Ju91duEh8A"
|
||||
}
|
||||
}
|
||||
|
||||
请求权重
|
||||
1
|
||||
|
||||
请求参数
|
||||
None
|
||||
|
||||
响应示例
|
||||
{
|
||||
"id": "815d5fce-0880-4287-a567-80badf004c74",
|
||||
"status": 200,
|
||||
"result": {
|
||||
"listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg"
|
||||
},
|
||||
"rateLimits": [
|
||||
{
|
||||
"rateLimitType": "REQUEST_WEIGHT",
|
||||
"interval": "MINUTE",
|
||||
"intervalNum": 1,
|
||||
"limit": 2400,
|
||||
"count": 2
|
||||
}
|
||||
]
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user