"""Spot recommendation scanner.

Uses Binance's public spot REST API to produce long-only signals that a
scheduled job writes to Redis. Neither the futures API nor an API key is
used; only public endpoints under api.binance.com are called.
"""

import asyncio
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional

import aiohttp

try:
    from .indicators import TechnicalIndicators
except ImportError:
    from indicators import TechnicalIndicators

logger = logging.getLogger(__name__)

BINANCE_SPOT_BASE = "https://api.binance.com"
# Fixed UTC+8 offset used for all human-readable timestamps.
BEIJING_TZ = timezone(timedelta(hours=8))

# By default only the top-N USDT spot pairs by quote volume are scanned.
DEFAULT_TOP_N = 80
# Pause after each per-symbol kline request to stay clear of rate limits.
KLINES_DELAY = 0.15
# Upper bound on the number of recommendations returned by one scan.
MAX_RECOMMENDATIONS = 50


async def _http_get(
    session: aiohttp.ClientSession,
    url: str,
    params: Optional[Dict] = None,
    timeout: int = 15,
) -> Any:
    """GET ``url`` and return the decoded JSON body, or ``None`` on failure.

    A non-200 status and any raised exception are both logged as warnings and
    reported to the caller as ``None``; this function never raises.

    Args:
        session: Shared aiohttp session used to issue the request.
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        timeout: Total request timeout in seconds.
    """
    try:
        async with session.get(
            url,
            params=params or {},
            timeout=aiohttp.ClientTimeout(total=timeout),
        ) as resp:
            if resp.status != 200:
                text = await resp.text()
                # Truncate the body so an error page cannot flood the log.
                logger.warning(f"现货 API 请求失败 {url} status={resp.status} body={text[:200]}")
                return None
            return await resp.json()
    except Exception as e:
        logger.warning(f"现货 API 请求异常 {url}: {e}")
        return None


async def get_spot_tickers_24h(session: aiohttp.ClientSession) -> List[Dict]:
    """Fetch 24h spot tickers, keeping only symbols that end with ``USDT``.

    The result is used for volume ranking and for the 24h change percentage.
    Returns an empty list when the request fails or the payload is not a list.
    """
    url = f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr"
    data = await _http_get(session, url)
    if not isinstance(data, list):
        return []
    return [
        t
        for t in data
        if isinstance(t, dict) and str(t.get("symbol", "")).endswith("USDT")
    ]


async def get_spot_klines(
    session: aiohttp.ClientSession,
    symbol: str,
    interval: str = "15m",
    limit: int = 100,
) -> Optional[List[List]]:
    """Fetch spot klines for ``symbol``.

    Rows are consumed by the caller as ``[open_time, o, h, l, c, volume, ...]``.
    Returns ``None`` when the request fails, the payload is not a list, or
    fewer than 20 rows come back.
    """
    url = f"{BINANCE_SPOT_BASE}/api/v3/klines"
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    data = await _http_get(session, url, params)
    if not isinstance(data, list) or len(data) < 20:
        return None
    return data


def _score_spot_long(
    ticker: Dict,
    klines: Optional[List],
    rsi: Optional[float],
    macd: Optional[Dict],
) -> int:
    """Score a spot long candidate on a 0-10 scale, starting from a neutral 5.

    Adjustments applied:
      * 24h change >= 2%  -> +1; 24h change < -2% -> -1; otherwise neutral.
      * RSI < 30 (oversold) -> +1; RSI > 70 (overbought) -> -2; otherwise
        neutral. A ``None`` RSI is ignored.
      * Positive MACD histogram -> +1; otherwise neutral.

    Args:
        ticker: 24h ticker dict; only ``priceChangePercent`` is read.
        klines: Accepted for interface compatibility; not used in scoring.
        rsi: Latest RSI value, or ``None`` when unavailable.
        macd: Dict whose ``histogram`` entry is read, or ``None``.

    Returns:
        The score clamped to the range 0..10.
    """
    score = 5  # neutral starting point

    try:
        change = float(ticker.get("priceChangePercent", 0) or 0)
    except (TypeError, ValueError):
        change = 0.0

    if change >= 2:
        score += 1
    elif change < -2:
        score -= 1

    if rsi is not None:
        if rsi < 30:
            score += 1  # oversold: possible rebound
        elif rsi > 70:
            score -= 2  # overbought: do not recommend

    if macd and isinstance(macd, dict):
        hist = macd.get("histogram") or 0
        if hist > 0:
            score += 1

    return max(0, min(10, int(score)))


def _quote_volume(ticker: Dict) -> float:
    """Return the ticker's ``quoteVolume`` as a float, or 0.0 if unparsable."""
    try:
        return float(ticker.get("quoteVolume") or 0)
    except (TypeError, ValueError):
        return 0.0


def _build_recommendation(
    ticker: Dict,
    symbol: str,
    closes: List[float],
    rsi: Optional[float],
    score: int,
) -> Dict[str, Any]:
    """Assemble the recommendation dict for one symbol that passed scoring."""
    # Prefer the ticker's last price, then its weighted average, and fall
    # back to the most recent close when neither is usable.
    try:
        price = float(
            ticker.get("lastPrice") or ticker.get("weightedAvgPrice") or closes[-1]
        )
    except (TypeError, ValueError):
        price = closes[-1]

    try:
        change_percent = float(ticker.get("priceChangePercent") or 0)
    except (TypeError, ValueError):
        change_percent = 0.0

    # Format RSI separately: a conditional cannot live inside a format spec,
    # and RSI may legitimately be None here.
    rsi_text = f"{rsi:.1f}" if rsi is not None else "-"

    return {
        "symbol": symbol,
        "direction": "BUY",
        "side": "BUY",
        "signal_strength": score,
        "current_price": price,
        "change_percent": change_percent,
        "recommendation_time": datetime.now(BEIJING_TZ).isoformat(),
        "market": "spot",
        "recommendation_reason": f"现货做多信号 RSI={rsi_text} 24h涨跌{change_percent:+.2f}%",
        "user_guide": f"现货建议在 {price:.4f} 附近分批买入,并自行设置止损止盈。",
        "timestamp": time.time(),
    }


async def run_spot_scan(
    top_n: int = DEFAULT_TOP_N,
    min_signal_strength: int = 5,
    kline_interval: str = "15m",
    kline_limit: int = 100,
) -> List[Dict[str, Any]]:
    """Run the spot scan and return long-only recommendations.

    Takes the ``top_n`` USDT pairs by 24h quote volume, fetches klines for
    each, computes RSI(14) and MACD(12, 26, 9) on the closes, scores the
    symbol, and keeps those scoring at least ``min_signal_strength``.

    Args:
        top_n: Number of highest-volume USDT pairs to scan.
        min_signal_strength: Minimum score (0-10) a symbol needs to be kept.
        kline_interval: Kline interval passed to the klines endpoint.
        kline_limit: Number of klines requested per symbol.

    Returns:
        At most ``MAX_RECOMMENDATIONS`` dicts ordered by signal strength
        (highest first); among equal strengths the lower 24h change comes
        first. Empty when no ticker data could be fetched.
    """
    recommendations: List[Dict[str, Any]] = []

    async with aiohttp.ClientSession() as session:
        tickers = await get_spot_tickers_24h(session)
        if not tickers:
            logger.warning("现货 24h ticker 未获取到数据")
            return []

        tickers.sort(key=_quote_volume, reverse=True)

        for ticker in tickers[:top_n]:
            symbol = ticker.get("symbol")
            if not symbol or not str(symbol).endswith("USDT"):
                continue

            try:
                klines = await get_spot_klines(session, symbol, kline_interval, kline_limit)
                await asyncio.sleep(KLINES_DELAY)
            except Exception as e:
                logger.debug(f"获取 {symbol} K线失败: {e}")
                klines = None

            if not klines or len(klines) < 30:
                continue

            # A malformed row should only skip this symbol, not abort the scan.
            try:
                closes = [float(k[4]) for k in klines]
            except (TypeError, ValueError, IndexError, KeyError) as e:
                logger.debug(f"解析 {symbol} K线失败: {e}")
                continue

            rsi = TechnicalIndicators.calculate_rsi(closes, period=14)
            macd = TechnicalIndicators.calculate_macd(closes, 12, 26, 9)
            score = _score_spot_long(ticker, klines, rsi, macd)
            if score < min_signal_strength:
                continue

            recommendations.append(
                _build_recommendation(ticker, symbol, closes, rsi, score)
            )

    # Sort by signal strength, strongest first. With reverse=True the negated
    # change means that, among ties, the lower 24h change sorts first.
    recommendations.sort(
        key=lambda x: (x.get("signal_strength", 0), -(x.get("change_percent") or 0)),
        reverse=True,
    )
    return recommendations[:MAX_RECOMMENDATIONS]


async def run_spot_scan_and_cache_redis(ttl_sec: int = 3600) -> int:
    """Run the spot scan and store the result in Redis.

    The snapshot is written as JSON under ``recommendations:spot:snapshot``
    with an expiry of ``ttl_sec`` seconds. Connection settings come from the
    environment: ``REDIS_URL`` (default ``redis://localhost:6379``),
    ``REDIS_USERNAME``, ``REDIS_PASSWORD``, ``REDIS_USE_TLS`` and
    ``REDIS_SSL_CERT_REQS``.

    Args:
        ttl_sec: Expiry of the cached snapshot, in seconds.

    Returns:
        The number of recommendations written, or 0 when ``redis.asyncio`` is
        unavailable, the connection fails, or the scan/write raises.
    """
    import json
    import os

    try:
        import redis.asyncio as redis_async
    except Exception:
        logger.warning("redis.asyncio 不可用,无法写入现货推荐缓存")
        return 0

    redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
    kwargs: Dict[str, Any] = {"decode_responses": True}

    username = os.getenv("REDIS_USERNAME")
    if username:
        kwargs["username"] = username
    password = os.getenv("REDIS_PASSWORD")
    if password:
        kwargs["password"] = password

    use_tls = (
        redis_url.startswith("rediss://")
        or os.getenv("REDIS_USE_TLS", "").lower() == "true"
    )
    if use_tls:
        # Upgrade a plain URL to the TLS scheme when TLS is requested via env.
        if not redis_url.startswith("rediss://"):
            redis_url = redis_url.replace("redis://", "rediss://", 1)
        kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required"))

    rds = None
    try:
        try:
            rds = redis_async.from_url(redis_url, **kwargs)
            await rds.ping()
        except Exception as e:
            logger.warning(f"Redis 连接失败: {e}")
            return 0

        try:
            items = await run_spot_scan(top_n=DEFAULT_TOP_N, min_signal_strength=5)
            payload = {
                "items": items,
                "generated_at_ms": int(time.time() * 1000),
                "generated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"),
                "ttl_sec": ttl_sec,
                "market": "spot",
            }
            await rds.setex(
                "recommendations:spot:snapshot",
                ttl_sec,
                json.dumps(payload, ensure_ascii=False),
            )
        except Exception as e:
            logger.exception(f"现货扫描或写入 Redis 失败: {e}")
            return 0

        logger.info(f"现货推荐已写入 Redis,共 {len(items)} 条,TTL={ttl_sec}s")
        return len(items)
    finally:
        # Close the client on every path, including a failed ping. A close
        # failure must not change the result of an already successful write.
        if rds is not None:
            try:
                await rds.aclose()
            except Exception as e:
                logger.debug(f"Redis 连接关闭失败: {e}")