""" 现货推荐扫描:拉取币安现货行情,仅做多信号,写入 Redis 供 /api/recommendations/spot 使用。 使用公开 API,无需 API Key。定时任务调用 run_spot_scan_and_cache()。 """ import asyncio import json import logging import os import sys import time from datetime import datetime, timezone from typing import Any, Dict, List, Optional import aiohttp # 可选的 Redis 写入(与 recommendations 路由共用连接方式) try: import redis.asyncio as redis_async except Exception: redis_async = None logger = logging.getLogger(__name__) BINANCE_SPOT_BASE = "https://api.binance.com" SPOT_KLINES_LIMIT = 60 SPOT_TOP_N = 80 SPOT_MIN_STRENGTH = 4 SPOT_MAX_RECS = 30 def _beijing_now_iso() -> str: from datetime import timedelta return datetime.now(tz=timezone(timedelta(hours=8))).isoformat() async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Optional[Any]: try: async with session.get(url, params=params or {}, timeout=aiohttp.ClientTimeout(total=15)) as resp: if resp.status != 200: return None return await resp.json() except Exception as e: logger.warning("spot_scanner _http_get %s: %s", url[:60], e) return None def _technical_indicators(): """延迟导入 trading_system.indicators,避免 backend 强依赖 trading_system 路径。""" project_root = __import__("pathlib").Path(__file__).resolve().parent.parent trading_system = project_root / "trading_system" if str(trading_system) not in sys.path: sys.path.insert(0, str(trading_system)) try: from indicators import TechnicalIndicators return TechnicalIndicators except ImportError: from trading_system.indicators import TechnicalIndicators return TechnicalIndicators async def _fetch_spot_symbols(session: aiohttp.ClientSession) -> List[str]: """获取所有 USDT 现货交易对(status=TRADING)。""" data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/exchangeInfo") if not data or "symbols" not in data: return [] symbols = [] for s in data["symbols"]: if s.get("status") != "TRADING": continue if s.get("quoteAsset") != "USDT": continue sym = s.get("symbol") if sym: symbols.append(sym) return symbols async def _fetch_spot_ticker_24h(session: aiohttp.ClientSession) -> List[Dict]: """获取 24h ticker,返回 list of dict (symbol, lastPrice, priceChangePercent, volume, ...)。""" data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr") if not isinstance(data, list): return [] return data async def _fetch_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 60) -> Optional[List[List]]: """现货 K 线,格式与合约一致 [open_time, o, h, l, c, volume, ...]。""" data = await _http_get( session, f"{BINANCE_SPOT_BASE}/api/v3/klines", {"symbol": symbol, "interval": interval, "limit": limit}, ) return data if isinstance(data, list) else None def _compute_spot_signal(klines: List[List], ticker: Dict, TechnicalIndicators) -> Optional[Dict]: """ 基于 K 线计算只做多信号。返回 None 或 { direction: 'BUY', strength: int, ... }。 """ if not klines or len(klines) < 50: return None closes = [float(k[4]) for k in klines] highs = [float(k[2]) for k in klines] lows = [float(k[3]) for k in klines] current_price = closes[-1] rsi = TechnicalIndicators.calculate_rsi(closes, period=14) macd = TechnicalIndicators.calculate_macd(closes) bollinger = TechnicalIndicators.calculate_bollinger_bands(closes, period=20) ema20 = TechnicalIndicators.calculate_ema(closes, period=20) ema50 = TechnicalIndicators.calculate_ema(closes, period=50) strength = 0 # 只做多:RSI 超卖、价格在下轨附近、MACD 金叉、价格在均线上方等 if rsi is not None and rsi < 35: strength += 3 elif rsi is not None and rsi < 50: strength += 1 if bollinger and current_price <= bollinger["lower"] * 1.002: strength += 3 elif bollinger and current_price < bollinger["middle"]: strength += 1 if macd and macd["histogram"] > 0 and macd["macd"] > macd["signal"]: strength += 2 if ema20 and ema50 and current_price > ema20 > ema50: strength += 2 elif ema20 and current_price > ema20: strength += 1 strength = max(0, min(strength, 10)) if strength < SPOT_MIN_STRENGTH: return None return { "direction": "BUY", "strength": strength, "rsi": rsi, "current_price": current_price, } def _build_spot_recommendation( symbol: str, ticker: Dict, signal: Dict, ) -> Dict[str, Any]: """构造单条现货推荐(与合约推荐结构兼容,便于前端复用)。""" current_price = float(ticker.get("lastPrice") or signal.get("current_price") or 0) change_percent = float(ticker.get("priceChangePercent") or 0) ts = time.time() entry = current_price * 0.995 stop_pct = 0.05 tp1_pct = 0.08 tp2_pct = 0.15 if current_price <= 0: return None stop_loss = entry * (1 - stop_pct) tp1 = entry * (1 + tp1_pct) tp2 = entry * (1 + tp2_pct) return { "symbol": symbol, "direction": "BUY", "market": "spot", "current_price": current_price, "signal_strength": signal.get("strength", 0), "change_percent": change_percent, "suggested_limit_price": entry, "planned_entry_price": entry, "suggested_stop_loss": stop_loss, "suggested_take_profit_1": tp1, "suggested_take_profit_2": tp2, "suggested_position_percent": 0.05, "recommendation_time": _beijing_now_iso(), "timestamp": ts, "recommendation_reason": "现货做多信号(RSI/布林带/MACD/均线)", "user_guide": f"现货建议在 {entry:.4f} USDT 附近买入,止损 {stop_loss:.4f},目标1 {tp1:.4f},目标2 {tp2:.4f}。仅供参考,请自行判断。", } async def run_spot_scan() -> List[Dict[str, Any]]: """执行一次现货扫描,返回推荐列表(不写 Redis)。""" TechnicalIndicators = _technical_indicators() recommendations = [] async with aiohttp.ClientSession() as session: symbols = await _fetch_spot_symbols(session) if not symbols: logger.warning("spot_scanner: 未获取到现货交易对") return [] tickers = await _fetch_spot_ticker_24h(session) ticker_map = {t["symbol"]: t for t in tickers if isinstance(t.get("symbol"), str)} # 按 24h 成交量排序,取前 SPOT_TOP_N 再按涨跌幅取部分 def volume_key(t): try: return float(t.get("volume") or 0) * float(t.get("lastPrice") or 0) except Exception: return 0 sorted_tickers = sorted( [t for t in tickers if t.get("symbol") in symbols], key=volume_key, reverse=True, )[: SPOT_TOP_N * 2] # 按涨跌幅取前 N 个(偏强势或超跌反弹) with_change = [(t, float(t.get("priceChangePercent") or 0)) for t in sorted_tickers] with_change.sort(key=lambda x: -abs(x[1])) to_scan = [t[0]["symbol"] for t in with_change[: SPOT_TOP_N]] for symbol in to_scan: try: klines = await _fetch_spot_klines(session, symbol, "15m", SPOT_KLINES_LIMIT) ticker = ticker_map.get(symbol, {}) if not klines or not ticker: continue signal = _compute_spot_signal(klines, ticker, TechnicalIndicators) if not signal: continue rec = _build_spot_recommendation(symbol, ticker, signal) if rec: recommendations.append(rec) if len(recommendations) >= SPOT_MAX_RECS: break except Exception as e: logger.debug("spot_scanner %s: %s", symbol, e) await asyncio.sleep(0.05) recommendations.sort(key=lambda x: x.get("signal_strength", 0), reverse=True) return recommendations[: SPOT_MAX_RECS] def _redis_connection_kwargs(): redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379" kwargs = {"decode_responses": True} if os.getenv("REDIS_USERNAME"): kwargs["username"] = os.getenv("REDIS_USERNAME") if os.getenv("REDIS_PASSWORD"): kwargs["password"] = os.getenv("REDIS_PASSWORD") if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true": if redis_url.startswith("redis://"): redis_url = redis_url.replace("redis://", "rediss://", 1) kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required")) if os.getenv("REDIS_SSL_CA_CERTS"): kwargs["ssl_ca_certs"] = os.getenv("REDIS_SSL_CA_CERTS") return redis_url, kwargs async def run_spot_scan_and_cache(ttl_sec: int = 900) -> int: """ 执行现货扫描并写入 Redis。返回写入的推荐数量。 Redis key: recommendations:spot:snapshot """ items = await run_spot_scan() now_ms = int(time.time() * 1000) payload = { "items": items, "generated_at": _beijing_now_iso(), "generated_at_ms": now_ms, "ttl_sec": ttl_sec, "count": len(items), } if redis_async is None: logger.warning("spot_scanner: redis 不可用,跳过写入") return len(items) redis_url, kwargs = _redis_connection_kwargs() try: client = redis_async.from_url(redis_url, **kwargs) await client.ping() key = "recommendations:spot:snapshot" await client.setex(key, ttl_sec, json.dumps(payload, ensure_ascii=False)) logger.info("spot_scanner: 已写入 %d 条现货推荐到 %s", len(items), key) await client.aclose() return len(items) except Exception as e: logger.warning("spot_scanner: Redis 写入失败 %s", e) return len(items)