auto_trade_sys/trading_system/kline_stream.py
薇薇安 249aec917a feat(binance_client, market_scanner, position_manager): 增强行情数据获取与处理逻辑
在 `binance_client` 中新增多个公开行情接口,包括深度信息、资金费率和未平仓合约数的获取,优化了 REST API 的调用逻辑。更新 `market_scanner` 以并行请求主周期和确认周期的 K线数据,提升了数据获取效率并引入超时处理。`position_manager` 中增加了从深度信息获取当前价格的逻辑,确保在多种情况下都能准确获取价格,增强了系统的稳定性和可追溯性。
2026-02-16 17:30:05 +08:00

264 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
K线 WebSocket 流:订阅 <symbol>@kline_<interval>维护K线缓存。
供 get_klines 优先使用,替代 REST 拉取,减少超时、实时更新技术指标。
文档:推送间隔 250ms仅推送最新一根K线的更新x=false 表示K线未完结x=true 表示已完结。
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
# Kline cache: {(symbol, interval): [kline1, kline2, ...]}. Each list is cut back to
# its per-key limit once it grows past twice that limit (see KlineStream._handle_message).
_kline_cache: Dict[Tuple[str, str], List[List]] = {}
# time.monotonic() reading taken at the most recent cache write for each key.
_kline_cache_updated_at: Dict[Tuple[str, str], float] = {}
_kline_cache_limit: Dict[Tuple[str, str], int] = {} # row limit for each (symbol, interval)
def get_klines_from_cache(symbol: str, interval: str, limit: int = 50) -> Optional[List[List]]:
    """Return the newest ``limit`` cached klines for a symbol/interval pair.

    Rows use the same list layout as the REST ``get_klines`` response.

    Args:
        symbol: Trading pair; matched case-insensitively (upper-cased for lookup).
        interval: Kline interval; matched case-insensitively (lower-cased for lookup).
        limit: Number of rows wanted. Must be positive.

    Returns:
        A new list with the last ``limit`` rows in cache order, or ``None`` when
        the pair has no cached rows, fewer than ``limit`` rows are cached, or
        ``limit`` is not positive.
    """
    if limit <= 0:
        # cached[-0:] would hand back the entire cache and a negative limit
        # would slice from the front, so treat both as "nothing usable".
        return None
    rows = _kline_cache.get((symbol.upper(), interval.lower()))
    if not rows or len(rows) < limit:
        return None
    # Slicing copies the outer list, so callers cannot reorder the live cache.
    return rows[-limit:]
def is_kline_cache_fresh(symbol: str, interval: str, max_age_sec: float = 300.0) -> bool:
    """Tell whether the cache for a symbol/interval was written within ``max_age_sec`` seconds.

    A pair that has never been written (no recorded timestamp) is never fresh.
    """
    cache_key = (symbol.upper(), interval.lower())
    last_write = _kline_cache_updated_at.get(cache_key, 0)
    if last_write == 0:
        # No timestamp recorded for this pair.
        return False
    age = time.monotonic() - last_write
    return age <= max_age_sec
class KlineStream:
    """Keep the module-level kline cache up to date from a futures kline WebSocket.

    A background task holds one combined-stream connection open and reconnects
    when it drops. Individual ``<symbol>@kline_<interval>`` streams are added at
    runtime through :meth:`subscribe`; incoming klines are merged into
    ``_kline_cache`` keyed by their open time.
    """

    def __init__(self, testnet: bool = False):
        """Create an idle stream; nothing connects until :meth:`start` is awaited."""
        self.testnet = testnet
        self._ws = None
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # (symbol, interval) -> True once SUBSCRIBE was sent on the current connection.
        self._subscribed: Dict[Tuple[str, str], bool] = {}
        self._subscription_lock = asyncio.Lock()

    def _ws_base_url(self) -> str:
        """Return the WebSocket base URL for the testnet or the production endpoint."""
        if self.testnet:
            return "wss://stream.binancefuture.com"
        return "wss://fstream.binance.com"

    async def start(self) -> bool:
        """Launch the background connection task and register this object globally.

        Returns:
            Always ``True``; calling it on a stream that is already running is a no-op.
        """
        global _kline_stream_instance
        if self._running:
            return True
        self._running = True
        _kline_stream_instance = self
        self._task = asyncio.create_task(self._run_ws())
        logger.info("KlineStream: 已启动(支持动态订阅)")
        return True

    async def stop(self):
        """Cancel the background task, close the socket and clear the global instance."""
        global _kline_stream_instance
        self._running = False
        _kline_stream_instance = None
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._ws:
            try:
                await self._ws.close()
            except Exception as e:
                # Best-effort close: the connection is being discarded anyway.
                logger.debug("KlineStream: error while closing WS: %s", e)
            self._ws = None
        # The cancelled task exits before its own cleanup runs, so drop the
        # subscription flags here; otherwise a later start() would make
        # subscribe() report success without sending anything.
        self._subscribed.clear()
        logger.info("KlineStream: 已停止")

    async def subscribe(self, symbol: str, interval: str, limit: int = 50) -> bool:
        """Subscribe to the kline stream for ``symbol``/``interval``.

        If the socket is not connected yet, waits up to about 5 seconds for it.
        The subscription lock is held for the whole call, including that wait.

        Args:
            symbol: Trading pair (case-insensitive).
            interval: Kline interval (case-insensitive).
            limit: Number of rows callers want retained for this pair. The
                largest value requested so far is the one that is kept.

        Returns:
            ``True`` if the pair is subscribed on the current connection,
            ``False`` if the stream is stopped, the socket did not come up in
            time, or sending the request failed.
        """
        symbol = symbol.upper()
        interval = interval.lower()
        key = (symbol, interval)
        async with self._subscription_lock:
            if self._subscribed.get(key):
                # A later caller may need more history than the first one asked for.
                if limit > _kline_cache_limit.get(key, 0):
                    _kline_cache_limit[key] = limit
                return True
            if not self._running:
                return False
            # Wait for the WS connection (50 x 0.1 s).
            for _ in range(50):
                if self._ws:
                    break
                await asyncio.sleep(0.1)
            if not self._ws:
                return False
            stream_name = f"{symbol.lower()}@kline_{interval}"
            try:
                await self._ws.send_json({
                    "method": "SUBSCRIBE",
                    "params": [stream_name],
                    "id": int(time.time() * 1000) % 1000000,
                })
            except (ConnectionResetError, OSError) as e:
                msg = str(e).lower()
                if "closing transport" in msg or "cannot write" in msg:
                    logger.debug("KlineStream: 连接关闭中,订阅 %s %s 跳过(重连后将自动重试)", symbol, interval)
                else:
                    logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False
            except Exception as e:
                logger.warning(f"KlineStream: 订阅 {symbol} {interval} 失败: {e}")
                return False
            self._subscribed[key] = True
            # Never shrink a limit an earlier caller relied on.
            _kline_cache_limit[key] = max(limit, _kline_cache_limit.get(key, 0))
            logger.debug(f"KlineStream: 已订阅 {symbol} {interval}")
            return True

    async def _run_ws(self):
        """Connection loop: connect, dispatch text frames, reconnect until stopped.

        After every disconnect the subscription flags are cleared, because a new
        connection starts with no streams; callers must subscribe again.
        """
        import aiohttp

        # Combined-stream endpoint, which accepts SUBSCRIBE requests at runtime.
        url = f"{self._ws_base_url()}/stream"
        closing_types = (
            aiohttp.WSMsgType.ERROR,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.CLOSE,
        )
        while self._running:
            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE(review): the accepted type of ws_connect's `timeout`
                    # differs between aiohttp releases - confirm ClientTimeout
                    # is valid for the pinned version.
                    async with session.ws_connect(
                        url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15)
                    ) as ws:
                        self._ws = ws
                        logger.info("KlineStream: WS 已连接(组合流,支持动态订阅)")
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                # Acks and non-kline payloads are filtered
                                # inside the handler, so decode only once.
                                self._handle_message(msg.data)
                            elif msg.type in closing_types:
                                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                err_msg = getattr(e, "message", str(e)) or repr(e)
                err_type = type(e).__name__
                logger.warning(
                    "KlineStream: WS 异常 %s: %s,10s 后重连",
                    err_type, err_msg,
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
                await asyncio.sleep(10)
            self._ws = None
            # Subscriptions do not survive a reconnect; force re-subscription.
            async with self._subscription_lock:
                self._subscribed.clear()
            if not self._running:
                break

    @staticmethod
    def _merge_kline(cache_list: List[List], kline: List, limit: int) -> None:
        """Merge one REST-format kline row into ``cache_list`` in place, by open time.

        A row newer than the last cached row is appended; a row whose open time
        matches a cached row replaces it, so repeated updates and the final
        "closed" update of a candle never create duplicates. A row older than
        everything comparable in the cache is dropped. When the list grows past
        ``2 * limit`` it is cut back to the newest ``limit`` rows.
        """
        open_time = int(kline[0])
        if not cache_list or open_time > int(cache_list[-1][0]):
            cache_list.append(kline)
            keep = max(int(limit), 1)  # a limit <= 0 must not disable trimming
            if len(cache_list) > keep * 2:
                cache_list[:] = cache_list[-keep:]
            return
        # Same or older candle: search from the newest row backwards.
        for idx in range(len(cache_list) - 1, -1, -1):
            cached_open = int(cache_list[idx][0])
            if cached_open == open_time:
                cache_list[idx] = kline
                return
            if cached_open < open_time:
                # Falls in a gap between cached candles; nothing to update.
                return

    def _handle_message(self, raw: str):
        """Decode one text frame and fold its kline into the module-level cache.

        Frames that are not valid JSON, are command acknowledgements
        (``{"result": ..., "id": ...}``), are not kline events, have
        unparseable numeric fields, or belong to a pair that is not currently
        subscribed are ignored.
        """
        try:
            payload = json.loads(raw)
        except Exception:
            # Undecodable frames are dropped on purpose; the stream continues.
            return
        if isinstance(payload, dict) and "result" in payload:
            # Response to SUBSCRIBE, carries no market data.
            return
        # Combined-stream envelope: {"stream": "btcusdt@kline_1h", "data": {...}}
        if isinstance(payload, dict) and "stream" in payload:
            event = payload.get("data", {})
        else:
            event = payload
        if not isinstance(event, dict) or event.get("e") != "kline":
            return
        k = event.get("k")
        if not isinstance(k, dict):
            return
        s = (k.get("s") or "").strip().upper()
        i = (k.get("i") or "").strip().lower()
        if not s or not i:
            return
        key = (s, i)
        if key not in self._subscribed:
            return
        try:
            open_time = int(k.get("t", 0))
            open_px = float(k.get("o", 0))
            high_px = float(k.get("h", 0))
            low_px = float(k.get("l", 0))
            close_px = float(k.get("c", 0))
            volume = float(k.get("v", 0))
            close_time = int(k.get("T", 0))
            quote_volume = float(k.get("q", 0))
            trade_count = int(k.get("n", 0))
        except (TypeError, ValueError):
            return
        is_closed = k.get("x", False)
        # REST layout: [open_time, open, high, low, close, volume, close_time,
        #               quote_volume, trades, taker_buy_base, taker_buy_quote, ignore]
        row = [
            open_time,
            str(open_px),
            str(high_px),
            str(low_px),
            str(close_px),
            str(volume),
            close_time,
            str(quote_volume),
            trade_count,
            "0",  # taker_buy_base_volume (not taken from the stream)
            "0",  # taker_buy_quote_volume (not taken from the stream)
            "0",  # ignore
        ]
        cache_list = _kline_cache.setdefault(key, [])
        # Merge by open time rather than by the closed flag: the closing update
        # of a candle has the same open time as its in-progress updates.
        self._merge_kline(cache_list, row, _kline_cache_limit.get(key, 50))
        _kline_cache_updated_at[key] = time.monotonic()
        logger.debug(
            "KlineStream: 已更新 %s %s (完结=%s),缓存 %s 根",
            s, i, is_closed, len(cache_list),
        )
# Module-wide KlineStream instance; assigned by KlineStream.start()/stop()
# and by set_kline_stream_instance(). None while no stream is registered.
_kline_stream_instance: Optional["KlineStream"] = None
def get_kline_stream_instance() -> Optional["KlineStream"]:
    """Return the registered KlineStream instance, or None when none is registered."""
    current = _kline_stream_instance
    return current
def set_kline_stream_instance(instance: Optional["KlineStream"]):
    """Register ``instance`` as the module-wide KlineStream; pass None to clear it.

    Per the original note, this is meant to be called from main.
    """
    global _kline_stream_instance

    _kline_stream_instance = instance