feat(trading_system): 优化交易记录管理与用户数据流集成

在 `position_manager` 和 `risk_manager` 中引入用户数据流缓存,优先使用 WebSocket 更新持仓和余额信息,减少对 REST API 的依赖。同时,增强了交易记录的创建和更新逻辑,支持在订单成交后完善记录,确保与币安数据一致性。新增 `update_open_fields` 和 `update_pending_to_filled` 方法,提升了交易记录的管理能力。
This commit is contained in:
薇薇安 2026-02-16 15:16:49 +08:00
parent aa073099f2
commit 5154b4933e
13 changed files with 1239 additions and 117 deletions

View File

@ -1811,31 +1811,87 @@ async def sync_positions(
notional = quantity * entry_price
if notional < 1.0:
continue
# 补建时尽量拿到 entry_order_id优先 get_all_orders+clientOrderId 前缀,兜底 get_recent_trades(100)+重试
entry_order_id = None
try:
trades = await client.get_recent_trades(symbol, limit=30)
if trades:
same_side = [t for t in trades if str(t.get('side', '')).upper() == side]
if same_side:
same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True)
entry_order_id = same_side[0].get('orderId')
except Exception as e:
logger.debug(f"获取 {symbol} 成交记录失败: {e}")
client_order_id = None
is_clearly_manual = False
if system_order_prefix:
if entry_order_id:
try:
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
cid = (order_info or {}).get("clientOrderId") or ""
client_order_id = cid or None
if cid and not cid.startswith(system_order_prefix):
is_clearly_manual = True
logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,视为手动单,跳过补建")
except Exception as e:
logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建")
if is_clearly_manual:
continue
try:
end_ms = int(time.time() * 1000)
start_ms = end_ms - (24 * 3600 * 1000)
orders = await client.client.futures_get_all_orders(
symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000
)
if isinstance(orders, list):
open_orders = [
o for o in orders
if isinstance(o, dict) and o.get("reduceOnly") is False
and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED"
]
our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)]
if our_orders:
our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True)
best = None
for o in our_orders:
ap = float(o.get("avgPrice") or 0)
eq = float(o.get("executedQty") or o.get("origQty") or 0)
if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6:
best = o
break
if best is None:
best = our_orders[0]
entry_order_id = best.get("orderId")
client_order_id = (best.get("clientOrderId") or "").strip() or None
except Exception as e:
logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}")
if entry_order_id is None:
try:
trades = await client.get_recent_trades(symbol, limit=100)
if not trades:
await asyncio.sleep(2)
trades = await client.get_recent_trades(symbol, limit=100)
if trades:
same_side = [t for t in trades if str(t.get('side', '')).upper() == side]
same_side.sort(key=lambda x: int(x.get('time', 0)), reverse=True)
if system_order_prefix and same_side:
for t in same_side[:5]:
oid = t.get("orderId")
if not oid:
continue
try:
info = await client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000)
cid = (info or {}).get("clientOrderId") or ""
if cid.startswith(system_order_prefix):
entry_order_id = oid
client_order_id = cid.strip() or None
break
except Exception:
continue
if entry_order_id is None and same_side:
entry_order_id = same_side[0].get("orderId")
except Exception as e:
logger.debug(f"获取 {symbol} 成交记录失败: {e}")
if entry_order_id and client_order_id is None:
try:
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
client_order_id = (order_info or {}).get("clientOrderId") or None
if client_order_id:
client_order_id = client_order_id.strip() or None
except Exception:
pass
is_clearly_manual = False
if system_order_prefix and entry_order_id and client_order_id and not client_order_id.startswith(system_order_prefix):
is_clearly_manual = True
logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id!r} 非系统前缀,视为手动单,跳过补建")
elif system_order_prefix and entry_order_id and not client_order_id:
try:
order_info = await client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
cid = (order_info or {}).get("clientOrderId") or ""
if cid and not cid.startswith(system_order_prefix):
is_clearly_manual = True
except Exception:
pass
if is_clearly_manual:
continue
elif only_recover_when_has_sltp:
has_sltp = False
try:

View File

@ -431,14 +431,6 @@ class GlobalStrategyConfig:
class Trade:
"""交易记录模型"""
@staticmethod
def get_by_client_order_id(client_order_id):
"""根据自定义订单号(clientOrderId)获取交易记录"""
return db.execute_one(
"SELECT * FROM trades WHERE client_order_id = %s",
(client_order_id,)
)
@staticmethod
def create(
symbol,
@ -458,11 +450,13 @@ class Trade:
margin_usdt=None,
account_id: int = None,
entry_context=None,
status: str = "open",
):
"""创建交易记录(使用北京时间)
Args:
symbol: 交易对
status: 状态默认 "open"先落库等 WS 成交时可传 "pending"
side: 方向
quantity: 数量
entry_price: 入场价
@ -503,7 +497,7 @@ class Trade:
# 动态构建 INSERT兼容不同schema
columns = ["symbol", "side", "quantity", "entry_price", "leverage", "entry_reason", "status", "entry_time"]
values = [symbol, side, quantity, entry_price, leverage, entry_reason, "open", entry_time]
values = [symbol, side, quantity, entry_price, leverage, entry_reason, (status or "open"), entry_time]
# 在真正执行 INSERT 之前,利用 entry_order_id / client_order_id 做一次幂等去重
try:
@ -730,7 +724,39 @@ class Trade:
else:
# 其他错误,重新抛出
raise
@staticmethod
def update_open_fields(trade_id: int, **kwargs):
"""开仓后完善字段:止损/止盈/名义/保证金/entry_context 等(用于先 pending 落库、成交后补全)。"""
if not kwargs:
return
allowed = {
"stop_loss_price", "take_profit_price", "take_profit_1", "take_profit_2",
"notional_usdt", "margin_usdt", "entry_context", "atr"
}
updates = []
values = []
for k, v in kwargs.items():
if k not in allowed or v is None:
continue
if k == "entry_context":
try:
v = json.dumps(v, ensure_ascii=False) if isinstance(v, dict) else str(v)
except Exception:
continue
updates.append(f"{k} = %s")
values.append(v)
if not updates:
return
values.append(trade_id)
try:
db.execute_update(
f"UPDATE trades SET {', '.join(updates)} WHERE id = %s",
tuple(values)
)
except Exception as e:
logger.warning(f"update_open_fields trade_id={trade_id} 失败: {e}")
@staticmethod
def get_by_entry_order_id(entry_order_id):
"""根据开仓订单号获取交易记录"""
@ -746,7 +772,47 @@ class Trade:
"SELECT * FROM trades WHERE exit_order_id = %s",
(exit_order_id,)
)
@staticmethod
def get_by_client_order_id(client_order_id, account_id: int = None):
"""根据 clientOrderId 获取交易记录(可选按 account_id 隔离)"""
if not client_order_id:
return None
try:
if account_id is not None and _table_has_column("trades", "account_id"):
return db.execute_one(
"SELECT * FROM trades WHERE client_order_id = %s AND account_id = %s",
(client_order_id, int(account_id))
)
return db.execute_one(
"SELECT * FROM trades WHERE client_order_id = %s",
(client_order_id,)
)
except Exception:
return None
@staticmethod
def update_pending_to_filled(client_order_id, account_id: int, entry_order_id, entry_price: float, quantity: float):
"""WS 或 REST 收到成交后:按 client_order_id 将 pending 记录完善为已成交(幂等)"""
if not client_order_id or account_id is None:
return False
try:
row = Trade.get_by_client_order_id(client_order_id, account_id)
if not row:
return False
# 仅当仍为 pending 或 entry_order_id 为空时更新,避免覆盖已完善数据
if row.get("status") not in ("pending", "open") and row.get("entry_order_id"):
return True # 已完善,视为成功
db.execute_update(
"""UPDATE trades SET entry_order_id = %s, entry_price = %s, quantity = %s, status = 'open'
WHERE client_order_id = %s AND account_id = %s""",
(entry_order_id, float(entry_price), float(quantity), client_order_id, int(account_id))
)
return True
except Exception as e:
logger.warning(f"update_pending_to_filled 失败 client_order_id={client_order_id!r}: {e}")
return False
@staticmethod
def get_all(start_timestamp=None, end_timestamp=None, symbol=None, status=None, trade_type=None, exit_reason=None, account_id: int = None):
"""获取交易记录(仅包含本系统开仓的记录已通过清理脚本维护,不再在查询里筛选)"""
@ -801,6 +867,27 @@ class Trade:
(symbol, status),
)
@staticmethod
def set_exit_order_id_for_open_trade(symbol: str, account_id: int, exit_order_id) -> bool:
"""ALGO_UPDATE/条件单触发后:为指定 symbol 下未填 exit_order_id 的 open 记录补全平仓订单号(仅更新一条)。"""
if not symbol or account_id is None or exit_order_id is None:
return False
try:
if not _table_has_column("trades", "account_id"):
return False
# 更新一条open 且 (exit_order_id 为空)
n = db.execute_update(
"""UPDATE trades SET exit_order_id = %s
WHERE account_id = %s AND symbol = %s AND status = 'open'
AND (exit_order_id IS NULL OR exit_order_id = '')
LIMIT 1""",
(str(exit_order_id), int(account_id), symbol.strip())
)
return n is not None and n > 0
except Exception as e:
logger.warning(f"set_exit_order_id_for_open_trade 失败 symbol={symbol!r}: {e}")
return False
class AccountSnapshot:
"""账户快照模型"""

View File

@ -165,7 +165,7 @@ Authorization: Bearer <token>
| **数据库不可用** | `position_manager` 启动时尝试导入 `backend.database.models`,若 `backend` 目录不存在、或导入失败缺依赖、Python 路径错误、DB 连不上等),会设 `DB_AVAILABLE = False`,成交后直接跳过保存并打日志「数据库不可用,跳过保存」。 |
| **Trade.create 抛异常** | 落库时发生异常(如连接超时、唯一约束冲突、磁盘满、字段错误等),代码会打错并 `return None`,不会重试,币安已有仓但 DB 无记录。 |
| **进程在落库前退出** | 订单已在币安成交,但在执行到 `Trade.create` 之前进程被 kill、崩溃或重启也会出现币安有仓、DB 无记录。 |
| **补建时拿不到订单号** | 同步补建时用 `get_recent_trades(symbol, limit=30)` 取最近同向成交并取第一条的 `orderId` 作为 `entry_order_id`。若接口失败、返回空、或该笔开仓已不在最近 30 条成交里,则 `entry_order_id` 为空,补建记录就会「没记订单号」。 |
| **补建时拿不到订单号** | 已优化:配置了 **SYSTEM_ORDER_ID_PREFIX** 时,补建**优先**用 `futures_get_all_orders(symbol, 最近24h)` 按开仓订单reduceOnly=false、FILLED、同向**clientOrderId 以系统前缀开头** 取本系统单,再按成交价/数量匹配取 `orderId``clientOrderId`,不依赖「最近 30 条成交」。若无前缀或该接口无结果,再用 `get_recent_trades(symbol, limit=100)` 兜底,空时重试一次;若配置了前缀,会从同向成交里逐条查订单的 clientOrderId只采用前缀匹配的订单号避免把手动单绑到补建记录。 |
**建议**:保证交易进程与 backend 同机或能访问同一 DB、且 `backend` 路径正确,避免 `DB_AVAILABLE = False`;若曾出现「开了不落库」,可查交易进程日志中的「保存交易记录到数据库失败」或「数据库不可用」;补建后仍无订单号的记录可用「仅可对账」过滤,不影响基于可对账记录的统计。

18
listenkey过期推送.txt Normal file
View File

@ -0,0 +1,18 @@
listenKey过期推送
事件描述
当前连接使用的有效listenKey过期时user data stream 将会推送此事件。
注意:
此事件与 websocket 连接中断没有必然联系
只有正在连接中的有效listenKey过期时才会收到此消息
收到此消息后 user data stream 将不再更新直到用户使用新的有效的listenKey
事件类型
listenKeyExpired
响应示例
{
"e": "listenKeyExpired", // 事件类型
"E": "1736996475556", // 事件时间
"listenKey":"WsCMN0a4KHUPTQuX6IUnqEZfB1inxmv1qR4kbf1LuEjur5VdbzqvyxqG9TSjVVxv"
}

View File

@ -2,6 +2,7 @@
币安客户端封装 - 提供异步交易接口
"""
import asyncio
import json
import logging
import random
import time
@ -279,6 +280,58 @@ class BinanceClient:
else:
logger.warning(f"⚠ API密钥验证时出现错误: {e}")
def _futures_base_url(self) -> str:
"""合约 REST 根地址(用于 listenKey 等)"""
return "https://testnet.binancefuture.com" if self.testnet else "https://fapi.binance.com"
async def create_futures_listen_key(self) -> Optional[str]:
"""创建 U 本位合约 User Data Stream listenKey用于 WS 订阅订单/持仓推送。60 分钟无 keepalive 会失效。"""
if not self.api_key:
return None
try:
import aiohttp
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
headers = {"X-MBX-APIKEY": self.api_key}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status != 200:
text = await resp.text()
logger.warning(f"create_futures_listen_key 失败 status={resp.status} body={text}")
return None
data = await resp.json()
key = data.get("listenKey") if isinstance(data, dict) else None
if key:
logger.info("✓ 合约 User Data Stream listenKey 已创建")
return key
except Exception as e:
logger.warning(f"create_futures_listen_key 失败: {e}")
return None
async def keepalive_futures_listen_key(self, listen_key: str):
"""延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。返回 (ok: bool, code_1125: bool)-1125 表示 key 不存在需换新。"""
if not self.api_key or not listen_key:
return False, False
try:
import aiohttp
url = f"{self._futures_base_url()}/fapi/v1/listenKey"
headers = {"X-MBX-APIKEY": self.api_key}
async with aiohttp.ClientSession() as session:
async with session.put(f"{url}?listenKey={listen_key}", headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
text = await resp.text()
ok = resp.status == 200
code_1125 = False
if not ok and text:
try:
data = json.loads(text) if text.strip().startswith("{") else {}
code_1125 = int(data.get("code", 0)) == -1125
except Exception:
pass
logger.debug(f"keepalive_futures_listen_key 失败 status={resp.status} body={text}")
return ok, code_1125
except Exception as e:
logger.debug(f"keepalive_futures_listen_key 失败: {e}")
return False, False
@staticmethod
def _to_bool(value: Any) -> Optional[bool]:
if value is None:
@ -1258,6 +1311,7 @@ class BinanceClient:
price: Optional[float] = None,
reduce_only: bool = False,
position_side: Optional[str] = None,
new_client_order_id: Optional[str] = None,
) -> Optional[Dict]:
"""
下单
@ -1454,12 +1508,14 @@ class BinanceClient:
if position_side:
logger.info(f"{symbol} 单向模式下忽略 positionSide={position_side}(避免 -4061")
# 开仓单写入自定义订单号前缀,便于同步时根据 clientOrderId 区分系统单(仅本系统开的仓才补建记录
# 开仓单写入自定义订单号优先使用调用方传入的便于先落库再下单、WS 按 c 匹配
if not reduce_only:
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
if prefix:
# 币安 newClientOrderId 最长 36 字符,格式: PREFIX_timestamp_hex
order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
if new_client_order_id:
order_params['newClientOrderId'] = str(new_client_order_id)[:36]
else:
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
if prefix:
order_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
# 如果是平仓订单,添加 reduceOnly 参数
# 根据币安API文档reduceOnly 应该是字符串 "true" 或 "false"
@ -1502,8 +1558,10 @@ class BinanceClient:
# 让 python-binance 重新生成,否则会报 -1022 Signature invalid
retry_params.pop('timestamp', None)
retry_params.pop('signature', None)
# 重试时生成新的 newClientOrderId避免与首次提交冲突
if 'newClientOrderId' in retry_params:
# 重试时保留调用方传入的 newClientOrderId否则才重新生成便于 WS 按 c 匹配)
if new_client_order_id and 'newClientOrderId' in retry_params:
retry_params['newClientOrderId'] = str(new_client_order_id)[:36]
elif 'newClientOrderId' in retry_params and not new_client_order_id:
prefix = (config.TRADING_CONFIG.get('SYSTEM_ORDER_ID_PREFIX') or '').strip()
if prefix:
retry_params['newClientOrderId'] = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]

View File

@ -16,6 +16,7 @@ try:
from .risk_manager import RiskManager # type: ignore
from .position_manager import PositionManager # type: ignore
from .strategy import TradingStrategy # type: ignore
from .user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
from . import config # type: ignore
except Exception:
_here = Path(__file__).resolve().parent
@ -30,6 +31,7 @@ except Exception:
from risk_manager import RiskManager # type: ignore
from position_manager import PositionManager # type: ignore
from strategy import TradingStrategy # type: ignore
from user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
import config # type: ignore
# 配置日志(支持相对路径)
@ -233,6 +235,7 @@ async def main():
# 初始化组件
client = None
user_data_stream = None
try:
# 1. 初始化币安客户端
logger.info("初始化币安客户端...")
@ -302,8 +305,25 @@ async def main():
except Exception as e:
logger.error(f"余额重试异常: {e}", exc_info=True)
continue
# 3. 启动 User Data Stream订单/持仓/余额推送listenKey 保活,减少 REST 请求)
import os
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
user_data_stream = UserDataStream(client, account_id)
if await user_data_stream.start():
logger.info("✓ User Data Stream 已启动(订单/持仓/余额 WS 推送30 分钟 keepalive")
# 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存
try:
seed_balance_cache(balance)
positions_seed = await client.get_open_positions()
seed_position_cache(positions_seed)
logger.info(f"✓ 已播种持仓/余额缓存(持仓 {len(positions_seed)} 个)")
except Exception as e:
logger.warning(f"播种 WS 缓存失败(将仅用 REST: {e}")
else:
logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓")
user_data_stream = None
# 4. 初始化各个模块
logger.info("初始化交易模块...")
scanner = MarketScanner(client)
@ -384,7 +404,13 @@ async def main():
logger.error(f"程序运行出错: {e}", exc_info=True)
raise
finally:
# 清理资源
# 清理资源(先停 User Data Stream再断 client
try:
if user_data_stream is not None:
await user_data_stream.stop()
logger.info("User Data Stream 已停止")
except Exception as e:
logger.debug(f"停止 User Data Stream 时异常: {e}")
if client:
await client.disconnect()
logger.info("程序已退出")

View File

@ -2,10 +2,11 @@
仓位管理模块 - 管理持仓和订单
"""
import asyncio
import logging
import json
import aiohttp
import logging
import random
import time
import aiohttp
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
@ -96,6 +97,18 @@ def _log_trade_db_failure(symbol, entry_order_id, side, quantity, entry_price, a
logger.warning(f"写入落库失败日志失败: {e}")
# User Data Stream 缓存(持仓/余额优先读 WS减少 REST
try:
from .user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache
except Exception:
try:
from user_data_stream import get_stream_instance, get_positions_from_cache, get_balance_from_cache
except Exception:
get_stream_instance = lambda: None
get_positions_from_cache = lambda min_notional=1.0: []
get_balance_from_cache = lambda: None
class PositionManager:
"""仓位管理类"""
@ -131,6 +144,23 @@ class PositionManager:
self._last_auto_close_attempt_ms: Dict[str, int] = {}
self._last_auto_close_fail_log_ms: Dict[str, int] = {}
async def _get_open_positions(self) -> List[Dict]:
"""优先使用 User Data Stream 持仓缓存,无缓存或未启动时走 REST。"""
if get_stream_instance() is not None:
min_notional = float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0)
cached = get_positions_from_cache(min_notional)
if cached is not None:
return cached
return await self.client.get_open_positions()
async def _get_account_balance(self) -> Dict:
"""优先使用 User Data Stream 余额缓存,无缓存时走 REST。"""
if get_stream_instance() is not None:
bal = get_balance_from_cache()
if bal is not None:
return bal
return await self.client.get_account_balance()
@staticmethod
def _pct_like_to_ratio(v: float) -> float:
"""
@ -458,6 +488,39 @@ class PositionManager:
filled_quantity = 0.0
entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase")
# 生成 client_order_id先落库 pendingWS 按 o.c 匹配完善,减少对 REST 同步依赖
prefix = (config.TRADING_CONFIG.get("SYSTEM_ORDER_ID_PREFIX") or "").strip()
client_order_id = None
if prefix:
client_order_id = f"{prefix}_{int(time.time() * 1000)}_{random.randint(0, 0xFFFF):04x}"[:36]
pending_trade_id = None
if DB_AVAILABLE and Trade and client_order_id:
try:
pending_trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=None,
client_order_id=client_order_id,
stop_loss_price=None,
take_profit_price=None,
take_profit_1=None,
take_profit_2=None,
atr=atr,
notional_usdt=None,
margin_usdt=None,
account_id=self.account_id,
entry_context=entry_context,
status="pending",
)
logger.debug(f"{symbol} 已落库 pending 记录 (client_order_id={client_order_id!r}, id={pending_trade_id})")
except Exception as e:
logger.warning(f"{symbol} 创建 pending 记录失败: {e}")
pending_trade_id = None
if not smart_entry_enabled:
# 根治方案:关闭智能入场后,回归“纯限价单模式”
# - 不追价
@ -469,7 +532,8 @@ class PositionManager:
f"确认超时={confirm_timeout}s未成交将撤单跳过"
)
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit,
new_client_order_id=client_order_id
)
if not order:
return None
@ -485,7 +549,10 @@ class PositionManager:
f"追价上限={max_drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit)
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit,
new_client_order_id=client_order_id
)
if not order:
return None
entry_order_id = order.get("orderId")
@ -522,7 +589,10 @@ class PositionManager:
pass
self._pending_entry_orders.pop(symbol, None)
logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底")
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET")
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="MARKET",
new_client_order_id=client_order_id
)
# 关键:转市价后必须更新 entry_order_id否则后续会继续查询“已取消的旧限价单”导致误判 CANCELED
try:
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
@ -563,7 +633,10 @@ class PositionManager:
f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | "
f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired)
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired,
new_client_order_id=client_order_id
)
if not order:
self._pending_entry_orders.pop(symbol, None)
return None
@ -728,32 +801,57 @@ class PositionManager:
take_profit_1 = closer
take_profit_2 = further
# 记录到数据库(只有在订单真正成交后才保存)
# 记录到数据库:有 pending 则完善WS 也会按 client_order_id 完善),否则新建
trade_id = None
if DB_AVAILABLE and Trade:
try:
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
client_order_id = (order.get("clientOrderId") if order else None) or None
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity, # 使用实际成交数量
entry_price=entry_price, # 使用实际成交价格
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id, # 保存币安订单号
client_order_id=client_order_id, # 自定义订单号,便于在订单记录中核对系统单
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
entry_context=entry_context, # 入场思路与过程(便于事后分析策略执行效果)
account_id=self.account_id
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
if client_order_id:
row = Trade.get_by_client_order_id(client_order_id, self.account_id)
if row and str(row.get("status")) == "pending":
ok = Trade.update_pending_to_filled(
client_order_id, self.account_id,
entry_order_id, entry_price, quantity
)
if ok:
row = Trade.get_by_client_order_id(client_order_id, self.account_id)
trade_id = row.get("id") if row else None
if trade_id:
Trade.update_open_fields(
trade_id,
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
entry_context=entry_context,
atr=atr,
)
logger.info(f"{symbol} 已完善 pending 记录 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
if trade_id is None:
# 无 pending 或未匹配到:走新建(兜底)
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
fallback_client_order_id = (order.get("clientOrderId") if order else None) or client_order_id
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id,
client_order_id=fallback_client_order_id,
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
entry_context=entry_context,
account_id=self.account_id,
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
@ -817,7 +915,7 @@ class PositionManager:
# 验证持仓是否真的在币安存在
try:
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
binance_position = next(
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
None
@ -910,7 +1008,7 @@ class PositionManager:
pass
# 获取当前持仓
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
@ -1474,7 +1572,7 @@ class PositionManager:
if current_price is None:
try:
# 优先获取标记价格MARK_PRICE因为止损单使用 MARK_PRICE 作为触发基准
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
position = next((p for p in positions if p['symbol'] == symbol), None)
if position:
mark_price = position.get('markPrice')
@ -1735,7 +1833,7 @@ class PositionManager:
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
@ -2260,8 +2358,8 @@ class PositionManager:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
positions = await self._get_open_positions()
balance = await self._get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
@ -2329,7 +2427,7 @@ class PositionManager:
logger.info("开始同步币安持仓状态与数据库...")
# 1. 获取币安实际持仓
binance_positions = await self.client.get_open_positions()
binance_positions = await self._get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions}
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
@ -3045,34 +3143,88 @@ class PositionManager:
notional = quantity * entry_price
if notional < 1.0:
continue
# 补建时尽量拿到 entry_order_id优先按时间范围查开仓订单并用 clientOrderId 前缀锁定本系统单,避免拿错/拿不到
entry_order_id = None
try:
recent = await self.client.get_recent_trades(symbol, limit=30)
if recent:
same_side = [t for t in recent if str(t.get("side", "")).upper() == side]
if same_side:
same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True)
entry_order_id = same_side[0].get("orderId")
except Exception:
pass
client_order_id_sync = None
if system_order_prefix:
try:
end_ms = int(time.time() * 1000)
start_ms = end_ms - (24 * 3600 * 1000)
orders = await self.client.client.futures_get_all_orders(
symbol=symbol, startTime=start_ms, endTime=end_ms, recvWindow=20000
)
if isinstance(orders, list):
open_orders = [
o for o in orders
if isinstance(o, dict) and o.get("reduceOnly") is False
and str(o.get("side", "")).upper() == side and o.get("status") == "FILLED"
]
our_orders = [o for o in open_orders if (o.get("clientOrderId") or "").startswith(system_order_prefix)]
if our_orders:
our_orders.sort(key=lambda x: int(x.get("updateTime", 0)), reverse=True)
best = None
for o in our_orders:
ap = float(o.get("avgPrice") or 0)
eq = float(o.get("executedQty") or o.get("origQty") or 0)
if ap > 0 and abs(ap - entry_price) / max(entry_price, 1e-9) < 0.01 and abs(eq - quantity) < 1e-6:
best = o
break
if best is None:
best = our_orders[0]
entry_order_id = best.get("orderId")
client_order_id_sync = (best.get("clientOrderId") or "").strip() or None
logger.debug(f" {symbol} 补建从 get_all_orders 取得 orderId={entry_order_id}, clientOrderId={client_order_id_sync!r}")
except Exception as e:
logger.debug(f" {symbol} 补建 get_all_orders 取开仓订单号失败: {e}")
if entry_order_id is None:
try:
recent = await self.client.get_recent_trades(symbol, limit=100)
if not recent:
await asyncio.sleep(2)
recent = await self.client.get_recent_trades(symbol, limit=100)
if recent:
same_side = [t for t in recent if str(t.get("side", "")).upper() == side]
same_side.sort(key=lambda x: int(x.get("time", 0)), reverse=True)
if system_order_prefix and same_side:
for t in same_side[:5]:
oid = t.get("orderId")
if not oid:
continue
try:
info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(oid), recvWindow=20000)
cid = (info or {}).get("clientOrderId") or ""
if cid.startswith(system_order_prefix):
entry_order_id = oid
client_order_id_sync = cid.strip() or None
break
except Exception:
continue
if entry_order_id is None and same_side:
entry_order_id = same_side[0].get("orderId")
except Exception:
pass
if entry_order_id and client_order_id_sync is None:
try:
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
cid = (order_info or {}).get("clientOrderId") or ""
client_order_id_sync = cid.strip() or None
except Exception:
pass
# 标记是否「来历不明」:用于 DB entry_reason 与后续统计分析sync_recovered_unknown_origin
sync_unknown_origin = False
# 仅当「明确查到开仓订单且 clientOrderId 非空且不以系统前缀开头」时标记为来历不明(仍会补建+挂 SL/TP+监控)
is_clearly_manual = False
if system_order_prefix and entry_order_id and client_order_id_sync and not client_order_id_sync.startswith(system_order_prefix):
is_clearly_manual = True
logger.debug(f" {symbol} 开仓订单 clientOrderId={client_order_id_sync!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP")
elif system_order_prefix and entry_order_id and not client_order_id_sync:
try:
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
cid = (order_info or {}).get("clientOrderId") or ""
if cid and not cid.startswith(system_order_prefix):
is_clearly_manual = True
except Exception:
pass
if system_order_prefix:
if entry_order_id:
try:
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=int(entry_order_id), recvWindow=20000)
cid = (order_info or {}).get("clientOrderId") or ""
client_order_id_sync = cid or None
if cid and not cid.startswith(system_order_prefix):
is_clearly_manual = True
logger.debug(f" {symbol} 开仓订单 clientOrderId={cid!r} 非系统前缀,将按来历不明仓补建并挂 SL/TP")
except Exception as e:
logger.debug(f" {symbol} 查询开仓订单失败: {e},按系统单补建")
# 无法获取订单或 cid 为空(历史单/未带前缀)时不视为手动单,继续补建
# 例外:若开启「仅币安有仓且存在 SL/TP 也监控」,且该 symbol 有止损/止盈单,可视为系统单(仅影响日志)
if is_clearly_manual and config.TRADING_CONFIG.get("SYNC_MONITOR_BINANCE_POSITIONS_WITH_SLTP", True):
if await self._symbol_has_sltp_orders(symbol):
is_clearly_manual = False
@ -3081,7 +3233,6 @@ class PositionManager:
sync_unknown_origin = True
logger.info(f"{symbol} 来历不明(开仓订单非系统前缀),将补建记录、自动挂止损止盈并纳入监控")
else:
# 未配置前缀时,不再因「无止损/止盈单」跳过:一律补建并自动挂 SL/TP、纳入监控
if sync_recover_only_has_sltp and not (await self._symbol_has_sltp_orders(symbol)):
sync_unknown_origin = True
logger.info(f"{symbol} 无止损/止盈单,将补建记录、自动挂止损止盈并纳入监控")
@ -3169,10 +3320,13 @@ class PositionManager:
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
# 尽量从币安成交取 entry_order_id,便于订单记录/统计对账
# 尽量从币安成交取 entry_order_idlimit=100、空时重试一次,便于订单记录/统计对账
entry_order_id = None
try:
recent = await self.client.get_recent_trades(symbol, limit=30)
recent = await self.client.get_recent_trades(symbol, limit=100)
if not recent:
await asyncio.sleep(2)
recent = await self.client.get_recent_trades(symbol, limit=100)
if recent:
same_side = [t for t in recent if str(t.get('side', '')).upper() == side]
if same_side:
@ -3315,7 +3469,7 @@ class PositionManager:
return
# 获取当前所有持仓(与 sync 一致:仅本系统关心的持仓会进 active_positions
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
binance_symbols = {p['symbol'] for p in positions}
active_symbols = set(self.active_positions.keys())
sync_create_manual = config.TRADING_CONFIG.get("SYNC_CREATE_MANUAL_ENTRY_RECORD", False)
@ -4133,7 +4287,7 @@ class PositionManager:
logger.warning(f" 可能原因: 监控未启动或已停止")
# 3. 获取币安实际持仓
positions = await self.client.get_open_positions()
positions = await self._get_open_positions()
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
if not binance_position:

View File

@ -17,6 +17,28 @@ except ImportError:
logger = logging.getLogger(__name__)
# 可选:优先使用 User Data Stream 缓存(与 position_manager 一致)
def _get_stream_instance():
try:
from .user_data_stream import get_stream_instance
return get_stream_instance()
except Exception:
return None
def _get_balance_from_cache():
try:
from .user_data_stream import get_balance_from_cache
return get_balance_from_cache()
except Exception:
return None
def _get_positions_from_cache():
try:
from .user_data_stream import get_positions_from_cache
return get_positions_from_cache(float(getattr(config, "POSITION_MIN_NOTIONAL_USDT", 1.0) or 1.0))
except Exception:
return None
class RiskManager:
"""风险管理类"""
@ -49,8 +71,10 @@ class RiskManager:
try:
logger.info(f"检查 {symbol} 单笔仓位大小...")
# 获取账户余额
balance = await self.client.get_account_balance()
# 获取账户余额(优先 WS 缓存)
balance = _get_balance_from_cache() if _get_stream_instance() else None
if balance is None:
balance = await self.client.get_account_balance()
available_balance = balance.get('available', 0)
if available_balance <= 0:
@ -137,8 +161,10 @@ class RiskManager:
是否通过检查
"""
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
# 获取当前持仓(优先 WS 缓存)
positions = _get_positions_from_cache() if _get_stream_instance() else None
if positions is None:
positions = await self.client.get_open_positions()
# 计算当前总保证金占用
current_position_values = []
@ -167,8 +193,10 @@ class RiskManager:
# 加上新仓位
total_with_new = total_margin_value + new_position_margin
# 获取账户余额
balance = await self.client.get_account_balance()
# 获取账户余额(优先 WS 缓存)
balance = _get_balance_from_cache() if _get_stream_instance() else None
if balance is None:
balance = await self.client.get_account_balance()
total_balance = balance.get('total', 0)
available_balance = balance.get('available', 0)
@ -401,8 +429,10 @@ class RiskManager:
try:
logger.info(f"开始计算 {symbol} 的仓位大小...")
# 获取账户余额
balance = await self.client.get_account_balance()
# 获取账户余额(优先 WS 缓存)
balance = _get_balance_from_cache() if _get_stream_instance() else None
if balance is None:
balance = await self.client.get_account_balance()
available_balance = balance.get('available', 0)
total_balance = balance.get('total', 0)
@ -785,8 +815,10 @@ class RiskManager:
logger.debug(f"{symbol} 涨跌幅 {change_percent:.2f}% 小于阈值")
return False
# 检查是否已有持仓 / 总持仓数量限制
positions = await self.client.get_open_positions()
# 检查是否已有持仓 / 总持仓数量限制(优先 WS 缓存)
positions = _get_positions_from_cache() if _get_stream_instance() else None
if positions is None:
positions = await self.client.get_open_positions()
try:
max_open = int(config.TRADING_CONFIG.get("MAX_OPEN_POSITIONS", 0) or 0)
except Exception:

View File

@ -0,0 +1,364 @@
"""
合约 User Data Stream订阅 listenKey 推送 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE 更新订单与持仓
- REST 下单并落库 pendingWS 收到 FILLED 后按 clientOrderId(c) 完善 entry_order_id/价格/数量
- ACCOUNT_UPDATE 推送持仓/余额变化可作持仓与余额数据源以降低 REST 频率
- listenKey 30 分钟 keepalive收到 listenKeyExpired keepalive 返回 -1125 时主动换新 key 重连
- 单连接有效期不超过 24 小时届时主动重连文档建议
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
# 持仓推送缓存ACCOUNT_UPDATE 的 P 数组,供外部读取以替代频繁 REST get_open_positions
_position_updates_cache: Dict[str, List[Dict]] = {} # symbol -> list of position dicts (pa, ep, ps, ...)
# 余额推送缓存ACCOUNT_UPDATE 的 B 数组a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
_balance_updates_cache: Dict[str, Dict[str, Any]] = {} # asset -> { wb, cw, bc }
# 当前运行的 UserDataStream 实例(供 main 停止、或业务层判断是否可用)
_stream_instance: Optional["UserDataStream"] = None
# 是否已用 REST 结果播种过(未播种时业务应走 REST不返回空缓存
_position_cache_seeded = False
_balance_cache_seeded = False
def get_position_updates_cache() -> Dict[str, List[Dict]]:
"""返回最近一次 ACCOUNT_UPDATE 的持仓数据(仅当有推送时才有值)。"""
return dict(_position_updates_cache)
def get_balance_updates_cache() -> Dict[str, Dict[str, Any]]:
"""返回最近一次 ACCOUNT_UPDATE 的余额数据(仅当有推送时才有值)。键为资产如 USDT。"""
return dict(_balance_updates_cache)
def get_stream_instance() -> Optional["UserDataStream"]:
"""返回当前运行的 UserDataStream 实例(未启动时为 None"""
return _stream_instance
def seed_position_cache(positions: List[Dict]) -> None:
"""用 REST 全量持仓结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。"""
global _position_updates_cache, _position_cache_seeded
_position_updates_cache.clear()
_position_cache_seeded = True
for pos in positions or []:
symbol = (pos.get("symbol") or "").strip()
if not symbol:
continue
amt = float(pos.get("positionAmt") or 0)
_position_updates_cache[symbol] = [{
"s": symbol,
"pa": amt,
"ep": str(pos.get("entryPrice") or "0"),
"up": str(pos.get("unRealizedProfit") or "0"),
"ps": pos.get("positionSide") or "BOTH",
}]
logger.debug(f"UserDataStream: 已填充持仓缓存 {len(_position_updates_cache)} 个 symbol")
def seed_balance_cache(balance: Dict[str, Any]) -> None:
"""用 REST 余额结果填充缓存(启动时调用一次,之后由 ACCOUNT_UPDATE 增量更新)。"""
global _balance_updates_cache, _balance_cache_seeded
_balance_cache_seeded = True
if balance and isinstance(balance, dict):
wb = balance.get("walletBalance") or balance.get("total") or 0
av = balance.get("availableBalance") or balance.get("available") or wb
_balance_updates_cache["USDT"] = {"wb": str(wb), "cw": str(av), "bc": "0"}
logger.debug("UserDataStream: 已填充余额缓存 (USDT)")
def get_positions_from_cache(min_notional: float = 1.0) -> Optional[List[Dict]]:
"""将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None业务应走 REST"""
if not _position_cache_seeded:
return None
out = []
for symbol, plist in _position_updates_cache.items():
for p in plist if isinstance(plist, list) else []:
try:
pa = float(p.get("pa") or 0)
except (TypeError, ValueError):
pa = 0
if pa == 0:
continue
ep = float(p.get("ep") or 0)
if min_notional > 0 and abs(pa) * ep < min_notional:
continue
out.append({
"symbol": symbol,
"positionAmt": pa,
"entryPrice": ep,
"markPrice": float(p.get("markPrice") or 0),
"unRealizedProfit": float(p.get("up") or 0),
"leverage": int(p.get("leverage") or 0),
})
return out
def get_balance_from_cache() -> Optional[Dict[str, Any]]:
"""从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。"""
if not _balance_cache_seeded:
return None
u = _balance_updates_cache.get("USDT")
if not u:
return None
try:
wb = float(u.get("wb") or 0)
cw = float(u.get("cw") or wb)
except (TypeError, ValueError):
return None
return {"ok": True, "available": cw, "total": wb, "margin": wb}
class UserDataStream:
"""合约用户数据流:创建/保活 listenKey连接 WS处理 ORDER_TRADE_UPDATE / ACCOUNT_UPDATE / ALGO_UPDATE。"""
def __init__(self, client: Any, account_id: int):
self.client = client
self.account_id = int(account_id)
self._listen_key: Optional[str] = None
self._ws = None
self._task: Optional[asyncio.Task] = None
self._keepalive_task: Optional[asyncio.Task] = None
self._running = False
self._conn_start_time: Optional[float] = None # 当前连接建立时间(用于 24h 主动重连)
def _ws_base_url(self) -> str:
if getattr(self.client, "testnet", False):
return "wss://stream.binancefuture.com/ws"
return "wss://fstream.binance.com/ws"
async def start(self) -> bool:
"""创建 listenKey 并启动 WS 接收循环与 keepalive 任务。"""
global _stream_instance
if self._running:
return True
self._listen_key = await self.client.create_futures_listen_key()
if not self._listen_key:
logger.warning("UserDataStream: 无法创建 listenKey跳过启动")
return False
self._running = True
_stream_instance = self
self._task = asyncio.create_task(self._run_ws())
self._keepalive_task = asyncio.create_task(self._run_keepalive())
logger.info("UserDataStream: 已启动ORDER_TRADE_UPDATE / ACCOUNT_UPDATE含持仓与余额推送")
return True
async def stop(self):
"""停止 WS 与 keepalive。"""
global _stream_instance
self._running = False
_stream_instance = None
if self._keepalive_task:
self._keepalive_task.cancel()
try:
await self._keepalive_task
except asyncio.CancelledError:
pass
self._keepalive_task = None
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
self._listen_key = None
logger.info("UserDataStream: 已停止")
async def _run_keepalive(self):
"""每 30 分钟延长 listenKey 有效期(文档:延长至本次调用后 60 分钟)。遇 -1125 主动断线促重连。"""
while self._running:
await asyncio.sleep(30 * 60)
if not self._running or not self._listen_key:
break
ok, code_1125 = await self.client.keepalive_futures_listen_key(self._listen_key)
if not ok and code_1125 and self._ws:
logger.warning("UserDataStream: keepalive 返回 -1125listenKey 不存在),主动断线以换新 key 重连")
try:
await self._ws.close()
except Exception:
pass
break
if not ok:
logger.warning("UserDataStream: keepalive 失败,将在下次 WS 断线时重新创建 listenKey")
async def _run_ws(self):
"""连接 WS 并处理消息;断线/listenKeyExpired/24h 后重连并必要时重新创建 listenKey。"""
import aiohttp
_24h_sec = 23 * 3600 # 文档:单连接有效期不超过 24 小时,提前 1h 重连
while self._running:
url = f"{self._ws_base_url()}/{self._listen_key}"
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=10)) as ws:
self._ws = ws
self._conn_start_time = time.monotonic()
logger.info("UserDataStream: WS 已连接")
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
should_break = await self._handle_message(msg.data)
if should_break:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning("UserDataStream: WS 错误")
break
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE):
break
# 单连接 24h 主动重连文档每个链接有效期不超过24小时
if self._conn_start_time and (time.monotonic() - self._conn_start_time) >= _24h_sec:
logger.info("UserDataStream: 连接已近 24h主动重连")
break
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"UserDataStream: WS 异常 {e}60s 后重连")
await asyncio.sleep(60)
self._ws = None
self._conn_start_time = None
if not self._running:
break
# 重连前重新创建 listenKey旧 key 可能已失效或 listenKeyExpired
self._listen_key = await self.client.create_futures_listen_key()
if not self._listen_key:
await asyncio.sleep(60)
continue
async def _handle_message(self, raw: str) -> bool:
"""处理一条推送。返回 True 表示应断开当前连接(如 listenKeyExpired以触发重连。"""
try:
data = json.loads(raw)
except Exception:
return False
e = data.get("e")
if e == "listenKeyExpired":
logger.warning("UserDataStream: 收到 listenKeyExpired将换新 key 重连")
return True
if e == "ORDER_TRADE_UPDATE":
self._on_order_trade_update(data.get("o") or {})
elif e == "ACCOUNT_UPDATE":
self._on_account_update(data.get("a") or {})
elif e == "ALGO_UPDATE":
self._on_algo_update(data.get("o") or {})
return False
def _on_order_trade_update(self, o: Dict):
# 文档: x=本次事件执行类型(NEW/TRADE/CANCELED等), X=订单当前状态, c=clientOrderId, i=orderId
# ap=均价, z=累计成交量, R=只减仓, rp=该交易实现盈亏, s=交易对
event_type = (o.get("x") or "").strip().upper()
status = (o.get("X") or "").strip().upper()
if status != "FILLED":
return
client_order_id = (o.get("c") or "").strip() or None
order_id = o.get("i")
ap = o.get("ap")
z = o.get("z")
reduce_only = o.get("R") is True
symbol = (o.get("s") or "").strip()
rp = o.get("rp") # 实现盈亏
if order_id is None or ap is None or z is None:
return
try:
ap_f = float(ap)
z_f = float(z)
except (TypeError, ValueError):
return
# 特殊 clientOrderId强平/ADL/结算,仅打日志不参与系统 pending 完善
if client_order_id:
if client_order_id.startswith("autoclose-") or client_order_id == "adl_autoclose" or client_order_id.startswith("settlement_autoclose-"):
logger.info(f"UserDataStream: 强平/ADL/结算订单 FILLED orderId={order_id} c={client_order_id!r} rp={rp}")
return
# 仅当事件类型为 TRADE 且状态 FILLED 时视为成交文档x=TRADE 表示有成交)
if event_type and event_type != "TRADE":
return
# 开仓成交:完善 pending 记录
if not reduce_only:
if not client_order_id:
return
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
updated = Trade.update_pending_to_filled(
client_order_id, self.account_id, order_id, ap_f, z_f
)
if updated:
logger.info(f"UserDataStream: 开仓成交已完善 client_order_id={client_order_id!r} orderId={order_id}")
except Exception as ex:
logger.warning(f"UserDataStream: update_pending_to_filled 失败 {ex}")
else:
# 平仓成交:按 symbol 回写 open 记录的 exit_order_id若有 rp 可记入日志
if rp is not None:
logger.debug(f"UserDataStream: 平仓订单 FILLED orderId={order_id} symbol={symbol!r} 实现盈亏 rp={rp}")
if symbol:
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, order_id):
logger.info(f"UserDataStream: 平仓订单已回写 exit_order_id symbol={symbol!r} orderId={order_id}")
except Exception as ex:
logger.warning(f"UserDataStream: set_exit_order_id_for_open_trade 失败 {ex}")
def _on_account_update(self, a: Dict):
# 文档: a.B = 余额数组,每项 a=资产, wb=钱包余额, cw=除逐仓外的钱包余额, bc=余额变化量
# 文档: a.P = 持仓信息数组,每项 s=symbol, pa=仓位, ep=入仓价, ps=LONG/SHORT/BOTH 等
global _position_updates_cache, _balance_updates_cache
B = a.get("B")
if isinstance(B, list) and B:
for b in B:
asset = (b.get("a") or "").strip()
if asset:
_balance_updates_cache[asset] = {
"wb": b.get("wb"),
"cw": b.get("cw"),
"bc": b.get("bc"),
}
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 余额资产数 {len(B)}")
P = a.get("P")
if isinstance(P, list) and P:
for p in P:
s = (p.get("s") or "").strip()
if s:
if s not in _position_updates_cache:
_position_updates_cache[s] = []
_position_updates_cache[s] = [p]
logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}")
def _on_algo_update(self, o: Dict):
# 条件单交易更新推送X=TRIGGERED/FINISHED 且 ai=触发后普通订单 id 时,回写 open 记录的 exit_order_id
x = (o.get("X") or "").strip().upper()
ai = o.get("ai")
symbol = (o.get("s") or "").strip()
if x in ("TRIGGERED", "FINISHED") and ai and symbol:
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / "backend"
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
if Trade.set_exit_order_id_for_open_trade(symbol, self.account_id, ai):
logger.info(f"UserDataStream: 条件单触发已回写 exit_order_id symbol={symbol!r} ai={ai}")
except Exception as ex:
logger.warning(f"UserDataStream: ALGO_UPDATE set_exit_order_id 失败 {ex}")
logger.debug(f"UserDataStream: ALGO_UPDATE X={x} s={symbol!r} ai={ai}")

View File

@ -0,0 +1,48 @@
条件订单交易更新推送
事件描述
当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ALGO_UPDATE
本次事件的具体执行类型
NEW该状态表示条件订单已提交但尚未触发。
CANCELED该状态表示条件订单已被取消。
TRIGGERING该状态表示条件订单已满足触发条件且已被转发至撮合引擎。
TRIGGERED该状态表示条件订单已成功触发并成功进入撮合引擎。
FINISHED该状态表示触发的条件订单已在撮合引擎中被成交或取消。
REJECTED该状态表示条件订单被撮合引擎拒绝例如保证金检查失败等情况。
EXPIRED该状态表示条件订单被系统取消。例如用户下了一个GTE_GTC时效条件订单但随后关闭了该标的的所有持仓系统因此取消了该条件订单。
事件类型
ALGO_UPDATE
响应示例
{
"e":"ALGO_UPDATE", // 事件类型
"T":1750515742297, // 撮合时间
"E":1750515742303, // 事件时间
"o":{
"caid":"Q5xaq5EGKgXXa0fD7fs0Ip", // 客户端自定条件订单ID
"aid":2148719, // 条件单 Id
"at":"CONDITIONAL", // 条件单类型
"o":"TAKE_PROFIT", //订单类型
"s":"BNBUSDT", //交易对
"S":"SELL", //订单方向
"ps":"BOTH", //持仓方向
"f":"GTC", //有效方式
"q":"0.01", //订单数量
"X":"CANCELED", //条件单状态
"ai":"", // 触发后普通订单 id
"ap": "0.00000", // 触发后在撮合引擎中实际订单的平均成交价格,仅在订单被触发并进入撮合引擎时显示
"aq": "0.00000", // 触发后在撮合引擎中实际订单已成交数量,仅当订单被触发并进入撮合引擎时显示
"act": "0", // 触发后在撮合引擎中实际的订单类型,仅当订单被触发并进入撮合引擎时显示
"tp":"750", //条件单触发价格
"p":"750", //订单价格
"V":"EXPIRE_MAKER", //自成交防止模式
"wt":"CONTRACT_PRICE", //触发价类型
"pm":"NONE", // 价格匹配模式
"cp":false, //是否为触发平仓单; 仅在条件订单情况下会推送此字段
"pP":false, //是否开启条件单触发保护
"R":false, // 是否是只减仓单
"tt":0, //触发时间
"gtd":0, // TIF为GTD的订单自动取消时间
"rm": "Reduce Only reject" // 条件单失败原因
}

51
用户余额.txt Normal file
View File

@ -0,0 +1,51 @@
账户余额V2 (USER_DATA)
查询账户余额
方式
v2/account.balance
请求
{
"id": "605a6d20-6588-4cb9-afa0-b0ab087507ba",
"method": "v2/account.balance",
"params": {
"apiKey": "xTaDyrmvA9XT2oBHHjy39zyPzKCvMdtH3b9q4xadkAg2dNSJXQGCxzui26L823W2",
"timestamp": 1702561978458,
"signature": "208bb94a26f99aa122b1319490ca9cb2798fccc81d9b6449521a26268d53217a"
}
}
请求权重
5
请求参数
名称 类型 是否必需 描述
recvWindow LONG NO
timestamp LONG YES
响应示例
{
"id": "605a6d20-6588-4cb9-afa0-b0ab087507ba",
"status": 200,
"result": [
{
"accountAlias": "SgsR", // 账户唯一识别码
"asset": "USDT", // 资产
"balance": "122607.35137903", // 总余额
"crossWalletBalance": "23.72469206", // 全仓余额
"crossUnPnl": "0.00000000" // 全仓持仓未实现盈亏
"availableBalance": "23.72469206", // 下单可用余额
"maxWithdrawAmount": "23.72469206", // 最大可转出余额
"marginAvailable": true, // 是否可用作联合保证金
"updateTime": 1617939110373
}
],
"rateLimits": [
{
"rateLimitType": "REQUEST_WEIGHT",
"interval": "MINUTE",
"intervalNum": 1,
"limit": 2400,
"count": 20
}
]
}

View File

@ -0,0 +1,110 @@
订单交易更新推送
事件描述
当有新订单创建、订单有新成交或者新的状态变化时会推送此类事件 事件类型统一为 ORDER_TRADE_UPDATE
订单方向
BUY 买入
SELL 卖出
订单类型
LIMIT 限价单
MARKET 市价单
STOP 止损限价单
STOP_MARKET 止损市价单
TAKE_PROFIT 止盈限价单
TAKE_PROFIT_MARKET 止盈市价单
TRAILING_STOP_MARKET 跟踪止损单
LIQUIDATION 爆仓
本次事件的具体执行类型
NEW
CANCELED 已撤
CALCULATED 订单 ADL 或爆仓
EXPIRED 订单失效
TRADE 交易
AMENDMENT 订单修改
订单状态
NEW
PARTIALLY_FILLED
FILLED
CANCELED
EXPIRED
EXPIRED_IN_MATCH
有效方式:
GTC
IOC
FOK
GTX
强平和ADL:
若用户因保证金不足发生强平:
c为"autoclose-XXX"X为"NEW"
若用户保证金充足但被 ADL:
c为“adl_autoclose”X为“NEW”
过期原因
0: 无,默认值
1: 自成交保护,订单被取消
2: IOC订单无法完全成交订单被取消
3: IOC订单因自成交保护无法完全成交订单被取消
4: 只减仓竞争过程中,低优先级的只减仓订单被取消
5: 账户强平过程中,订单被取消
6: 不满足GTE条件订单被取消
7: Symbol下架订单被取消
8: 止盈止损单触发后,初始订单被取消
9: 市价订单无法完全成交,订单被取消
事件类型
ORDER_TRADE_UPDATE
响应示例
{
"e":"ORDER_TRADE_UPDATE", // 事件类型
"E":1568879465651, // 事件时间
"T":1568879465650, // 撮合时间
"o":{
"s":"BTCUSDT", // 交易对
"c":"TEST", // 客户端自定订单ID
// 特殊的自定义订单ID:
// "autoclose-"开头的字符串: 系统强平订单
// "adl_autoclose": ADL自动减仓订单
// "settlement_autoclose-": 下架或交割的结算订单
"S":"SELL", // 订单方向
"o":"TRAILING_STOP_MARKET", // 订单类型
"f":"GTC", // 有效方式
"q":"0.001", // 订单原始数量
"p":"0", // 订单原始价格
"ap":"0", // 订单平均价格
"sp":"7103.04", // 条件订单触发价格,对追踪止损单无效
"x":"NEW", // 本次事件的具体执行类型
"X":"NEW", // 订单的当前状态
"i":8886774, // 订单ID
"l":"0", // 订单末次成交量
"z":"0", // 订单累计已成交量
"L":"0", // 订单末次成交价格
"N": "USDT", // 手续费资产类型
"n": "0", // 手续费数量
"T":1568879465650, // 成交时间
"t":0, // 成交ID
"b":"0", // 买单净值
"a":"9.91", // 卖单净值
"m": false, // 该成交是作为挂单成交吗?
"R":false , // 是否是只减仓单
"wt": "CONTRACT_PRICE", // 触发价类型
"ot": "TRAILING_STOP_MARKET", // 原始订单类型
"ps":"LONG" // 持仓方向
"cp":false, // 是否为触发平仓单; 仅在条件订单情况下会推送此字段
"AP":"7476.89", // 追踪止损激活价格, 仅在追踪止损单时会推送此字段
"cr":"5.0", // 追踪止损回调比例, 仅在追踪止损单时会推送此字段
"pP": false, // 是否开启条件单触发保护
"si": 0, // 忽略
"ss": 0, // 忽略
"rp":"0", // 该交易实现盈亏
"V":"EXPIRE_TAKER", // 自成交防止模式
"pm":"OPPONENT", // 价格匹配模式
"gtd":0, // TIF为GTD的订单自动取消时间
"er":"0" // 过期原因
}
}

118
账户信息流连接.txt Normal file
View File

@ -0,0 +1,118 @@
账户信息流连接
本篇所列出REST接口的baseurl https://fapi.binance.com
用于订阅账户数据的 listenKey 从创建时刻起有效期为60分钟
可以通过PUT一个listenKey延长60分钟有效期如收到-1125报错提示此listenKey不存在建议重新使用POST /fapi/v1/listenKey生成listenKey
可以通过DELETE一个 listenKey 立即关闭当前数据流并使该listenKey 无效
在具有有效listenKey的帐户上执行POST将返回当前有效的listenKey并将其有效期延长60分钟
本篇所列出的websocket接口连接方式如下
Base Url: wss://fstream.binance.com
订阅账户数据流的stream名称为 /ws/`<listenKey>`
连接样例:
wss://fstream.binance.com/ws/XaEAKTsQSRLZAGH9tuIu37plSRsdjmlAVBoNYPUITlTAko1WI22PgmBMpI1rS8Yh
每个链接有效期不超过24小时请妥善处理断线重连。
单一账户,单一连接的推送数据流消息可以保证时间序; 强烈建议您使用 E 字段进行排序
考虑到剧烈行情下, RESTful接口可能存在查询延迟我们强烈建议您优先从Websocket user data stream推送的消息来获取订单仓位等信息。
生成listenKey (USER_STREAM)
接口描述
创建一个新的user data stream返回值为一个listenKey即websocket订阅的stream名称。如果该帐户具有有效的listenKey则将返回该listenKey并将其有效期延长60分钟。
HTTP请求
POST /fapi/v1/listenKey
请求权重
1
请求参数
None
响应示例
{
"listenKey": "pqia91ma19a5s61cv6a81va65sdf19v8a65a1a5s61cv6a81va65sdf19v8a65a1"
}
延长listenKey有效期(USER_STREAM)
接口描述
有效期延长至本次调用后60分钟
HTTP请求
PUT /fapi/v1/listenKey
请求权重
1
请求参数
None
{
"listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg" // 被延长的listenkey
}
Websocket API生成listenKey (USER_STREAM)
接口描述
创建一个新的user data stream返回值为一个listenKey即websocket订阅的stream名称。如果该帐户具有有效的listenKey则将返回该listenKey并将其有效期延长60分钟。
方式
userDataStream.start
请求
{
"id": "d3df8a61-98ea-4fe0-8f4e-0fcea5d418b0",
"method": "userDataStream.start",
"params": {
"apiKey": "vmPUZE6mv9SD5VNHk4HlWFsOr6aKE2zvsw0MuIgwCIPy6utIco14y7Ju91duEh8A"
}
}
Websocket API延长listenKey有效期(USER_STREAM)
接口描述
有效期延长至本次调用后60分钟
方式
userDataStream.ping
请求
{
"id": "815d5fce-0880-4287-a567-80badf004c74",
"method": "userDataStream.ping",
"params": {
"apiKey": "vmPUZE6mv9SD5VNHk9HlWFsOr9aLE2zvsw0MuIgwCIPy8atIco14y7Ju91duEh8A"
}
}
请求权重
1
请求参数
None
响应示例
{
"id": "815d5fce-0880-4287-a567-80badf004c74",
"status": 200,
"result": {
"listenKey": "3HBntNTepshgEdjIwSUIBgB9keLyOCg5qv3n6bYAtktG8ejcaW5HXz9Vx1JgIieg"
},
"rateLimits": [
{
"rateLimitType": "REQUEST_WEIGHT",
"interval": "MINUTE",
"intervalNum": 1,
"limit": 2400,
"count": 2
}
]
}