""" 市场行情概览 - 用于全局配置页展示 拉取 Binance 公开接口(无需 API Key),与策略过滤逻辑对应的数据。 """ import json import logging import ssl import urllib.request from typing import Any, Dict, Optional logger = logging.getLogger(__name__) BINANCE_FUTURES_BASE = "https://fapi.binance.com" BINANCE_FUTURES_DATA = "https://fapi.binance.com/futures/data" REQUEST_TIMEOUT = 10 def _http_get(url: str, params: Optional[dict] = None) -> Optional[Any]: """发起 GET 请求,返回 JSON 或 None。""" if params: qs = "&".join(f"{k}={v}" for k, v in params.items()) url = f"{url}?{qs}" try: req = urllib.request.Request(url, headers={"Accept": "application/json"}) ctx = ssl.create_default_context() with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT, context=ctx) as resp: return json.loads(resp.read().decode("utf-8")) except Exception as e: logger.debug("market_overview HTTP GET 失败 %s: %s", url[:80], e) return None def _fetch_klines(symbol: str, interval: str, limit: int) -> Optional[list]: """获取 K 线数据。""" data = _http_get( f"{BINANCE_FUTURES_BASE}/fapi/v1/klines", {"symbol": symbol, "interval": interval, "limit": limit}, ) return data if isinstance(data, list) else None def _compute_change_from_klines(klines: list, periods: int) -> Optional[float]: """根据 K 线计算最近 N 根的总涨跌幅(比例,如 -0.0167 表示 -1.67%)。""" if not klines or len(klines) < periods + 1: return None first_close = float(klines[0][4]) last_close = float(klines[-1][4]) return (last_close - first_close) / first_close if first_close else None def fetch_symbol_change_period(symbol: str, interval: str, periods: int) -> Optional[float]: """获取指定交易对在指定周期内的涨跌幅(比例)。""" klines = _fetch_klines(symbol, interval, periods + 1) return _compute_change_from_klines(klines, periods) if klines else None def fetch_ticker_24h(symbol: str) -> Optional[Dict]: """获取 24h ticker。""" data = _http_get(f"{BINANCE_FUTURES_BASE}/fapi/v1/ticker/24hr", {"symbol": symbol}) return data if isinstance(data, dict) else None def fetch_premium_index(symbol: str) -> Optional[Dict]: """获取资金费率等。""" data = _http_get(f"{BINANCE_FUTURES_BASE}/fapi/v1/premiumIndex", {"symbol": symbol}) return data if isinstance(data, dict) else None def fetch_long_short_ratio(symbol: str = "BTCUSDT", period: str = "1d", limit: int = 1) -> Optional[float]: """获取大户多空比。""" data = _http_get( f"{BINANCE_FUTURES_DATA}/topLongShortPositionRatio", {"symbol": symbol, "period": period, "limit": limit}, ) if not isinstance(data, list) or len(data) == 0: return None try: return float(data[-1].get("longShortRatio", 1)) except (TypeError, ValueError): return None def get_market_overview() -> Dict[str, Any]: """ 获取市场行情概览,与策略过滤逻辑对应的数据。 供全局配置页展示,帮助用户确认当前策略方案是否匹配市场。 """ result = { "btc_24h_change_pct": None, "eth_24h_change_pct": None, "btc_15m_change_pct": None, "btc_1h_change_pct": None, "eth_15m_change_pct": None, "eth_1h_change_pct": None, "btc_funding_rate": None, "eth_funding_rate": None, "btc_long_short_ratio": None, "btc_trend_4h": None, "market_regime": None, "beta_filter_triggered": None, } # 24h 涨跌幅 btc_ticker = fetch_ticker_24h("BTCUSDT") if btc_ticker is not None: try: result["btc_24h_change_pct"] = round(float(btc_ticker.get("priceChangePercent", 0)), 2) except (TypeError, ValueError): pass eth_ticker = fetch_ticker_24h("ETHUSDT") if eth_ticker is not None: try: result["eth_24h_change_pct"] = round(float(eth_ticker.get("priceChangePercent", 0)), 2) except (TypeError, ValueError): pass # 15m / 1h 涨跌幅(大盘共振过滤用) btc_15m = fetch_symbol_change_period("BTCUSDT", "15m", 5) btc_1h = fetch_symbol_change_period("BTCUSDT", "1h", 3) eth_15m = fetch_symbol_change_period("ETHUSDT", "15m", 5) eth_1h = fetch_symbol_change_period("ETHUSDT", "1h", 3) if btc_15m is not None: result["btc_15m_change_pct"] = round(btc_15m * 100, 2) if btc_1h is not None: result["btc_1h_change_pct"] = round(btc_1h * 100, 2) if eth_15m is not None: result["eth_15m_change_pct"] = round(eth_15m * 100, 2) if eth_1h is not None: result["eth_1h_change_pct"] = round(eth_1h * 100, 2) # 资金费率 btc_prem = fetch_premium_index("BTCUSDT") if btc_prem is not None: try: result["btc_funding_rate"] = round(float(btc_prem.get("lastFundingRate", 0)), 6) except (TypeError, ValueError): pass eth_prem = fetch_premium_index("ETHUSDT") if eth_prem is not None: try: result["eth_funding_rate"] = round(float(eth_prem.get("lastFundingRate", 0)), 6) except (TypeError, ValueError): pass # 大户多空比 lsr = fetch_long_short_ratio("BTCUSDT", "1d", 1) if lsr is not None: result["btc_long_short_ratio"] = round(lsr, 4) # 4H 趋势 klines_4h = _fetch_klines("BTCUSDT", "4h", 60) if klines_4h and len(klines_4h) >= 21: try: from trading_system.market_regime_detector import compute_trend_4h_from_klines result["btc_trend_4h"] = compute_trend_4h_from_klines(klines_4h) except Exception: result["btc_trend_4h"] = _simple_trend_4h(klines_4h) # 市场状态(bull/bear/normal) try: from trading_system.market_regime_detector import detect_market_regime regime, details = detect_market_regime() result["market_regime"] = regime result["market_regime_details"] = details except Exception as e: logger.debug("market_overview 获取市场状态失败: %s", e) return result def _simple_trend_4h(klines: list) -> str: """简化 4H 趋势:价格 vs 最近一根 K 线前 20 根均价。""" if len(klines) < 21: return "neutral" closes = [float(k[4]) for k in klines] price = closes[-1] avg20 = sum(closes[-21:-1]) / 20 if price > avg20 * 1.002: return "up" if price < avg20 * 0.998: return "down" return "neutral"