diff --git a/backend/api/main.py b/backend/api/main.py index 83ced05..6fd1668 100644 --- a/backend/api/main.py +++ b/backend/api/main.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from api.routes import config, trades, stats, dashboard, account, recommendations, system, accounts, auth, admin, public, data_management import os +import sys import logging from pathlib import Path from logging.handlers import RotatingFileHandler @@ -165,6 +166,32 @@ app = FastAPI( redirect_slashes=False # 禁用自动重定向,避免307重定向问题 ) +# 现货推荐定时扫描间隔(秒),默认 15 分钟;设为 0 关闭定时扫描 +SPOT_SCAN_INTERVAL_SEC = int(os.getenv("SPOT_SCAN_INTERVAL_SEC", "900")) + + +async def _spot_scan_loop(): + """后台循环:每隔 SPOT_SCAN_INTERVAL_SEC 执行一次现货扫描并写入 Redis。""" + if SPOT_SCAN_INTERVAL_SEC <= 0: + logger.info("现货推荐定时扫描已关闭(SPOT_SCAN_INTERVAL_SEC=0)") + return + import asyncio + backend_dir = Path(__file__).resolve().parent.parent + sys.path.insert(0, str(backend_dir)) + try: + from spot_scanner import run_spot_scan_and_cache + except Exception as e: + logger.warning("现货扫描模块加载失败,跳过定时任务: %s", e) + return + logger.info("现货推荐定时扫描已启动,间隔 %d 秒", SPOT_SCAN_INTERVAL_SEC) + while True: + try: + await run_spot_scan_and_cache(ttl_sec=900) + except Exception as e: + logger.warning("现货扫描执行失败: %s", e) + await asyncio.sleep(SPOT_SCAN_INTERVAL_SEC) + + # 启动时:确保存在一个初始管理员(通过环境变量配置) @app.on_event("startup") async def _ensure_initial_admin(): @@ -201,6 +228,14 @@ async def _ensure_initial_admin(): except Exception as e: logger.warning(f"初始化管理员失败(可忽略): {e}") + # 启动现货推荐定时扫描(后台任务) + try: + import asyncio + asyncio.create_task(_spot_scan_loop()) + except Exception as e: + logger.warning("启动现货扫描定时任务失败(可忽略): %s", e) + + # CORS配置(允许React前端访问) # 默认包含:本地开发端口、主前端域名、推荐查看器域名 cors_origins_str = os.getenv('CORS_ORIGINS', 'http://localhost:3000,http://localhost:3001,http://localhost:5173,http://as.deepx1.com,http://asapi.deepx1.com,http://r.deepx1.com,https://r.deepx1.com,http://asapi-new.deepx1.com') diff --git a/backend/api/routes/recommendations.py b/backend/api/routes/recommendations.py index 698a0c5..16aadc9 100644 --- a/backend/api/routes/recommendations.py +++ b/backend/api/routes/recommendations.py @@ -495,6 +495,72 @@ async def get_recommendations( raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}") +REDIS_KEY_SPOT_SNAPSHOT = "recommendations:spot:snapshot" + + +@router.get("/spot") +async def get_spot_recommendations( + limit: int = Query(50, ge=1, le=200, description="返回数量限制"), +): + """ + 获取现货推荐(只做多)。数据来自定时任务扫描并写入的 Redis 缓存。 + """ + try: + rds = await _get_redis() + if rds is None: + raise HTTPException(status_code=503, detail="Redis 不可用,无法读取现货推荐缓存") + snapshot = await _get_cached_json(rds, REDIS_KEY_SPOT_SNAPSHOT) + if not isinstance(snapshot, dict): + return { + "success": True, + "count": 0, + "type": "spot", + "from_cache": False, + "meta": {"generated_at": None, "message": "暂无现货推荐数据,请等待定时扫描更新"}, + "data": [], + } + items = snapshot.get("items") or [] + if not isinstance(items, list): + items = [] + items = items[:limit] + return { + "success": True, + "count": len(items), + "type": "spot", + "from_cache": True, + "meta": { + "generated_at": snapshot.get("generated_at"), + "generated_at_ms": snapshot.get("generated_at_ms"), + "ttl_sec": snapshot.get("ttl_sec"), + }, + "data": items, + } + except HTTPException: + raise + except Exception as e: + logger.error(f"获取现货推荐失败: {e}") + raise HTTPException(status_code=500, detail=f"获取现货推荐失败: {str(e)}") + + +@router.post("/spot/scan") +async def trigger_spot_scan(): + """ + 手动触发一次现货扫描并更新 Redis 缓存(供定时任务或管理员调用)。 + """ + try: + import sys + from pathlib import Path + backend_dir = Path(__file__).resolve().parent.parent.parent + if str(backend_dir) not in sys.path: + sys.path.insert(0, str(backend_dir)) + from spot_scanner import run_spot_scan_and_cache + count = await run_spot_scan_and_cache(ttl_sec=900) + return {"success": True, "message": f"已扫描并缓存 {count} 条现货推荐", "count": count} + except Exception as e: + logger.error(f"现货扫描失败: {e}") + raise HTTPException(status_code=500, detail=f"现货扫描失败: {str(e)}") + + @router.get("/active") async def get_active_recommendations(): """ diff --git a/backend/spot_scanner.py b/backend/spot_scanner.py new file mode 100644 index 0000000..3359899 --- /dev/null +++ b/backend/spot_scanner.py @@ -0,0 +1,280 @@ +""" +现货推荐扫描:拉取币安现货行情,仅做多信号,写入 Redis 供 /api/recommendations/spot 使用。 +使用公开 API,无需 API Key。定时任务调用 run_spot_scan_and_cache()。 +""" +import asyncio +import json +import logging +import os +import sys +import time +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +import aiohttp + +# 可选的 Redis 写入(与 recommendations 路由共用连接方式) +try: + import redis.asyncio as redis_async +except Exception: + redis_async = None + +logger = logging.getLogger(__name__) + +BINANCE_SPOT_BASE = "https://api.binance.com" +SPOT_KLINES_LIMIT = 60 +SPOT_TOP_N = 80 +SPOT_MIN_STRENGTH = 4 +SPOT_MAX_RECS = 30 + + +def _beijing_now_iso() -> str: + from datetime import timedelta + return datetime.now(tz=timezone(timedelta(hours=8))).isoformat() + + +async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Optional[Any]: + try: + async with session.get(url, params=params or {}, timeout=aiohttp.ClientTimeout(total=15)) as resp: + if resp.status != 200: + return None + return await resp.json() + except Exception as e: + logger.warning("spot_scanner _http_get %s: %s", url[:60], e) + return None + + +def _technical_indicators(): + """延迟导入 trading_system.indicators,避免 backend 强依赖 trading_system 路径。""" + project_root = __import__("pathlib").Path(__file__).resolve().parent.parent + trading_system = project_root / "trading_system" + if str(trading_system) not in sys.path: + sys.path.insert(0, str(trading_system)) + try: + from indicators import TechnicalIndicators + return TechnicalIndicators + except ImportError: + from trading_system.indicators import TechnicalIndicators + return TechnicalIndicators + + +async def _fetch_spot_symbols(session: aiohttp.ClientSession) -> List[str]: + """获取所有 USDT 现货交易对(status=TRADING)。""" + data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/exchangeInfo") + if not data or "symbols" not in data: + return [] + symbols = [] + for s in data["symbols"]: + if s.get("status") != "TRADING": + continue + if s.get("quoteAsset") != "USDT": + continue + sym = s.get("symbol") + if sym: + symbols.append(sym) + return symbols + + +async def _fetch_spot_ticker_24h(session: aiohttp.ClientSession) -> List[Dict]: + """获取 24h ticker,返回 list of dict (symbol, lastPrice, priceChangePercent, volume, ...)。""" + data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr") + if not isinstance(data, list): + return [] + return data + + +async def _fetch_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 60) -> Optional[List[List]]: + """现货 K 线,格式与合约一致 [open_time, o, h, l, c, volume, ...]。""" + data = await _http_get( + session, + f"{BINANCE_SPOT_BASE}/api/v3/klines", + {"symbol": symbol, "interval": interval, "limit": limit}, + ) + return data if isinstance(data, list) else None + + +def _compute_spot_signal(klines: List[List], ticker: Dict, TechnicalIndicators) -> Optional[Dict]: + """ + 基于 K 线计算只做多信号。返回 None 或 { direction: 'BUY', strength: int, ... }。 + """ + if not klines or len(klines) < 50: + return None + closes = [float(k[4]) for k in klines] + highs = [float(k[2]) for k in klines] + lows = [float(k[3]) for k in klines] + current_price = closes[-1] + + rsi = TechnicalIndicators.calculate_rsi(closes, period=14) + macd = TechnicalIndicators.calculate_macd(closes) + bollinger = TechnicalIndicators.calculate_bollinger_bands(closes, period=20) + ema20 = TechnicalIndicators.calculate_ema(closes, period=20) + ema50 = TechnicalIndicators.calculate_ema(closes, period=50) + + strength = 0 + # 只做多:RSI 超卖、价格在下轨附近、MACD 金叉、价格在均线上方等 + if rsi is not None and rsi < 35: + strength += 3 + elif rsi is not None and rsi < 50: + strength += 1 + if bollinger and current_price <= bollinger["lower"] * 1.002: + strength += 3 + elif bollinger and current_price < bollinger["middle"]: + strength += 1 + if macd and macd["histogram"] > 0 and macd["macd"] > macd["signal"]: + strength += 2 + if ema20 and ema50 and current_price > ema20 > ema50: + strength += 2 + elif ema20 and current_price > ema20: + strength += 1 + + strength = max(0, min(strength, 10)) + if strength < SPOT_MIN_STRENGTH: + return None + return { + "direction": "BUY", + "strength": strength, + "rsi": rsi, + "current_price": current_price, + } + + +def _build_spot_recommendation( + symbol: str, + ticker: Dict, + signal: Dict, +) -> Dict[str, Any]: + """构造单条现货推荐(与合约推荐结构兼容,便于前端复用)。""" + current_price = float(ticker.get("lastPrice") or signal.get("current_price") or 0) + change_percent = float(ticker.get("priceChangePercent") or 0) + ts = time.time() + entry = current_price * 0.995 + stop_pct = 0.05 + tp1_pct = 0.08 + tp2_pct = 0.15 + if current_price <= 0: + return None + stop_loss = entry * (1 - stop_pct) + tp1 = entry * (1 + tp1_pct) + tp2 = entry * (1 + tp2_pct) + + return { + "symbol": symbol, + "direction": "BUY", + "market": "spot", + "current_price": current_price, + "signal_strength": signal.get("strength", 0), + "change_percent": change_percent, + "suggested_limit_price": entry, + "planned_entry_price": entry, + "suggested_stop_loss": stop_loss, + "suggested_take_profit_1": tp1, + "suggested_take_profit_2": tp2, + "suggested_position_percent": 0.05, + "recommendation_time": _beijing_now_iso(), + "timestamp": ts, + "recommendation_reason": "现货做多信号(RSI/布林带/MACD/均线)", + "user_guide": f"现货建议在 {entry:.4f} USDT 附近买入,止损 {stop_loss:.4f},目标1 {tp1:.4f},目标2 {tp2:.4f}。仅供参考,请自行判断。", + } + + +async def run_spot_scan() -> List[Dict[str, Any]]: + """执行一次现货扫描,返回推荐列表(不写 Redis)。""" + TechnicalIndicators = _technical_indicators() + recommendations = [] + + async with aiohttp.ClientSession() as session: + symbols = await _fetch_spot_symbols(session) + if not symbols: + logger.warning("spot_scanner: 未获取到现货交易对") + return [] + + tickers = await _fetch_spot_ticker_24h(session) + ticker_map = {t["symbol"]: t for t in tickers if isinstance(t.get("symbol"), str)} + + # 按 24h 成交量排序,取前 SPOT_TOP_N 再按涨跌幅取部分 + def volume_key(t): + try: + return float(t.get("volume") or 0) * float(t.get("lastPrice") or 0) + except Exception: + return 0 + + sorted_tickers = sorted( + [t for t in tickers if t.get("symbol") in symbols], + key=volume_key, + reverse=True, + )[: SPOT_TOP_N * 2] + + # 按涨跌幅取前 N 个(偏强势或超跌反弹) + with_change = [(t, float(t.get("priceChangePercent") or 0)) for t in sorted_tickers] + with_change.sort(key=lambda x: -abs(x[1])) + to_scan = [t[0]["symbol"] for t in with_change[: SPOT_TOP_N]] + + for symbol in to_scan: + try: + klines = await _fetch_spot_klines(session, symbol, "15m", SPOT_KLINES_LIMIT) + ticker = ticker_map.get(symbol, {}) + if not klines or not ticker: + continue + signal = _compute_spot_signal(klines, ticker, TechnicalIndicators) + if not signal: + continue + rec = _build_spot_recommendation(symbol, ticker, signal) + if rec: + recommendations.append(rec) + if len(recommendations) >= SPOT_MAX_RECS: + break + except Exception as e: + logger.debug("spot_scanner %s: %s", symbol, e) + await asyncio.sleep(0.05) + + recommendations.sort(key=lambda x: x.get("signal_strength", 0), reverse=True) + return recommendations[: SPOT_MAX_RECS] + + +def _redis_connection_kwargs(): + redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379" + kwargs = {"decode_responses": True} + if os.getenv("REDIS_USERNAME"): + kwargs["username"] = os.getenv("REDIS_USERNAME") + if os.getenv("REDIS_PASSWORD"): + kwargs["password"] = os.getenv("REDIS_PASSWORD") + if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true": + if redis_url.startswith("redis://"): + redis_url = redis_url.replace("redis://", "rediss://", 1) + kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required")) + if os.getenv("REDIS_SSL_CA_CERTS"): + kwargs["ssl_ca_certs"] = os.getenv("REDIS_SSL_CA_CERTS") + return redis_url, kwargs + + +async def run_spot_scan_and_cache(ttl_sec: int = 900) -> int: + """ + 执行现货扫描并写入 Redis。返回写入的推荐数量。 + Redis key: recommendations:spot:snapshot + """ + items = await run_spot_scan() + now_ms = int(time.time() * 1000) + payload = { + "items": items, + "generated_at": _beijing_now_iso(), + "generated_at_ms": now_ms, + "ttl_sec": ttl_sec, + "count": len(items), + } + + if redis_async is None: + logger.warning("spot_scanner: redis 不可用,跳过写入") + return len(items) + + redis_url, kwargs = _redis_connection_kwargs() + try: + client = redis_async.from_url(redis_url, **kwargs) + await client.ping() + key = "recommendations:spot:snapshot" + await client.setex(key, ttl_sec, json.dumps(payload, ensure_ascii=False)) + logger.info("spot_scanner: 已写入 %d 条现货推荐到 %s", len(items), key) + await client.aclose() + return len(items) + except Exception as e: + logger.warning("spot_scanner: Redis 写入失败 %s", e) + return len(items) diff --git a/recommendations-viewer/src/components/RecommendationsViewer.css b/recommendations-viewer/src/components/RecommendationsViewer.css index b9c3b87..6ec1050 100644 --- a/recommendations-viewer/src/components/RecommendationsViewer.css +++ b/recommendations-viewer/src/components/RecommendationsViewer.css @@ -73,6 +73,41 @@ cursor: pointer; } +.market-type-tabs { + display: flex; + gap: 0; + margin-bottom: 12px; + border-bottom: 1px solid #e0e0e0; +} + +.market-type-tabs .tab { + padding: 10px 20px; + border: none; + background: transparent; + font-size: 15px; + color: #666; + cursor: pointer; + border-bottom: 2px solid transparent; + margin-bottom: -1px; + transition: color 0.2s, border-color 0.2s; +} + +.market-type-tabs .tab:hover { + color: #333; +} + +.market-type-tabs .tab.active { + color: #2196F3; + font-weight: 500; + border-bottom-color: #2196F3; +} + +.spot-hint { + margin: 0 0 16px 0; + font-size: 13px; + color: #666; +} + .error-message { padding: 12px; background-color: #ffebee; diff --git a/recommendations-viewer/src/components/RecommendationsViewer.jsx b/recommendations-viewer/src/components/RecommendationsViewer.jsx index 5108750..8dc7a74 100644 --- a/recommendations-viewer/src/components/RecommendationsViewer.jsx +++ b/recommendations-viewer/src/components/RecommendationsViewer.jsx @@ -10,86 +10,23 @@ function RecommendationsViewer() { const [error, setError] = useState(null); const [directionFilter, setDirectionFilter] = useState(''); const [showDetails, setShowDetails] = useState({}); - - useEffect(() => { - loadRecommendations(); - - // 每10秒静默更新价格(不触发loading状态) - const interval = setInterval(async () => { - try { - const result = await api.getRecommendations({ - type: 'realtime', - direction: directionFilter, - limit: 50, - min_signal_strength: 5 - }); - const newData = result.data || []; - - // 使用setState直接更新,不触发loading状态 - setRecommendations(prevRecommendations => { - if (newData.length === 0) { - return prevRecommendations; - } - - // 实时推荐没有id,使用symbol作为key - const newDataMap = new Map(newData.map(rec => [rec.symbol, rec])); - const prevMap = new Map(prevRecommendations.map(rec => [rec.symbol || rec.id, rec])); - - // 合并数据:优先使用新数据(包含实时价格更新) - const updated = prevRecommendations.map(prevRec => { - const key = prevRec.symbol || prevRec.id; - const newRec = newDataMap.get(key); - if (newRec) { - return newRec; - } - return prevRec; - }); - - // 添加新出现的推荐 - const newItems = newData.filter(newRec => !prevMap.has(newRec.symbol)); - - // 合并并去重(按symbol) - const merged = [...updated, ...newItems]; - const uniqueMap = new Map(); - merged.forEach(rec => { - const key = rec.symbol || rec.id; - if (!uniqueMap.has(key)) { - uniqueMap.set(key, rec); - } - }); - - return Array.from(uniqueMap.values()); - }); - } catch (err) { - // 静默失败,不显示错误 - console.debug('静默更新价格失败:', err); - } - }, 10000); // 每10秒刷新 - - return () => { - clearInterval(interval); - }; - }, [directionFilter]); + const [marketType, setMarketType] = useState('futures'); // 'futures' | 'spot' const loadRecommendations = async () => { try { setLoading(true); setError(null); - - const params = { - type: 'realtime', - limit: 50, - min_signal_strength: 5 - }; - - if (directionFilter) { - params.direction = directionFilter; + if (marketType === 'spot') { + const result = await api.getSpotRecommendations({ limit: 50 }); + const data = result.data || []; + setRecommendations(data); + } else { + const params = { type: 'realtime', limit: 50, min_signal_strength: 5 }; + if (directionFilter) params.direction = directionFilter; + const result = await api.getRecommendations(params); + const data = result.data || []; + setRecommendations(data); } - - const result = await api.getRecommendations(params); - const data = result.data || []; - - setRecommendations(data); } catch (err) { setError(err.message); console.error('加载推荐失败:', err); @@ -98,6 +35,58 @@ function RecommendationsViewer() { } }; + useEffect(() => { + loadRecommendations(); + }, [marketType]); + + useEffect(() => { + if (marketType !== 'futures') return; + // 合约:每10秒静默更新价格(不触发loading状态) + const interval = setInterval(async () => { + try { + const result = await api.getRecommendations({ + type: 'realtime', + direction: directionFilter, + limit: 50, + min_signal_strength: 5 + }); + const newData = result.data || []; + if (newData.length === 0) return; + const newDataMap = new Map(newData.map(rec => [rec.symbol, rec])); + setRecommendations(prev => { + const prevKeys = new Set(prev.map(rec => rec.symbol || rec.id)); + const updated = prev.map(prevRec => { + const key = prevRec.symbol || prevRec.id; + return newDataMap.get(key) || prevRec; + }); + const newItems = newData.filter(newRec => !prevKeys.has(newRec.symbol)); + const merged = [...updated, ...newItems]; + const uniqueMap = new Map(); + merged.forEach(rec => { uniqueMap.set(rec.symbol || rec.id, rec); }); + return Array.from(uniqueMap.values()); + }); + } catch (err) { + console.debug('静默更新价格失败:', err); + } + }, 10000); + return () => clearInterval(interval); + }, [marketType, directionFilter]); + + // 现货:每 30 秒静默刷新列表 + useEffect(() => { + if (marketType !== 'spot') return; + const interval = setInterval(async () => { + try { + const result = await api.getSpotRecommendations({ limit: 50 }); + const data = result.data || []; + if (data.length) setRecommendations(data); + } catch (err) { + console.debug('静默更新现货推荐失败:', err); + } + }, 30000); + return () => clearInterval(interval); + }, [marketType]); + const toggleDetails = (key) => { setShowDetails(prev => ({ ...prev, @@ -204,7 +193,9 @@ function RecommendationsViewer() {

交易推荐

- 每10秒自动更新 + + {marketType === 'spot' ? '现货推荐每30秒更新' : '合约每10秒自动更新'} +
-
- + 合约 + +
+ {marketType === 'spot' && ( +

现货推荐(只做多),数据来自定时扫描,仅供参考。

+ )} + + {marketType === 'futures' && ( +
+ +
+ )} {error && (
diff --git a/recommendations-viewer/src/services/api.js b/recommendations-viewer/src/services/api.js index 985e73e..398d466 100644 --- a/recommendations-viewer/src/services/api.js +++ b/recommendations-viewer/src/services/api.js @@ -13,7 +13,7 @@ const buildUrl = (path) => { }; export const api = { - // 获取实时推荐 + // 获取实时推荐(合约) getRecommendations: async (params = {}) => { // 默认使用实时推荐 if (!params.type) { @@ -27,5 +27,17 @@ export const api = { throw new Error(error.detail || '获取推荐失败'); } return response.json(); + }, + + // 获取现货推荐 + getSpotRecommendations: async (params = {}) => { + const query = new URLSearchParams(params).toString(); + const url = query ? `${buildUrl('/api/recommendations/spot')}?${query}` : buildUrl('/api/recommendations/spot'); + const response = await fetch(url); + if (!response.ok) { + const error = await response.json().catch(() => ({ detail: '获取现货推荐失败' })); + throw new Error(error.detail || '获取现货推荐失败'); + } + return response.json(); } }; diff --git a/trading_system/spot_scanner.py b/trading_system/spot_scanner.py new file mode 100644 index 0000000..601cb31 --- /dev/null +++ b/trading_system/spot_scanner.py @@ -0,0 +1,241 @@ +""" +现货推荐扫描:使用币安现货公开 API,只做多信号,供定时任务写入 Redis。 +不依赖合约 API 和 API Key,仅使用 api.binance.com 公开接口。 +""" +import asyncio +import logging +import time +from datetime import datetime, timezone +from typing import List, Dict, Any, Optional + +import aiohttp + +try: + from .indicators import TechnicalIndicators +except ImportError: + from indicators import TechnicalIndicators + +logger = logging.getLogger(__name__) + +BINANCE_SPOT_BASE = "https://api.binance.com" +BEIJING_TZ = timezone(offset=__import__("datetime").timedelta(hours=8)) + +# 默认只扫描成交量靠前的现货 USDT 对数量 +DEFAULT_TOP_N = 80 +# 每个 symbol 的 K 线请求间隔,避免触发限频 +KLINES_DELAY = 0.15 + + +async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None, timeout: int = 15) -> Any: + try: + async with session.get(url, params=params or {}, timeout=aiohttp.ClientTimeout(total=timeout)) as resp: + if resp.status != 200: + text = await resp.text() + logger.warning(f"现货 API 请求失败 {url} status={resp.status} body={text[:200]}") + return None + return await resp.json() + except Exception as e: + logger.warning(f"现货 API 请求异常 {url}: {e}") + return None + + +async def get_spot_tickers_24h(session: aiohttp.ClientSession) -> List[Dict]: + """获取现货 24h 行情,用于按成交量排序、取涨跌幅。""" + url = f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr" + data = await _http_get(session, url) + if not isinstance(data, list): + return [] + # 只保留 USDT 交易对 + out = [t for t in data if isinstance(t, dict) and str(t.get("symbol", "")).endswith("USDT")] + return out + + +async def get_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 100) -> Optional[List[List]]: + """获取现货 K 线,格式与合约一致 [open_time, o, h, l, c, volume, ...]。""" + url = f"{BINANCE_SPOT_BASE}/api/v3/klines" + params = {"symbol": symbol, "interval": interval, "limit": limit} + data = await _http_get(session, url, params) + if not isinstance(data, list) or len(data) < 20: + return None + return data + + +def _score_spot_long(ticker: Dict, klines: Optional[List], rsi: Optional[float], macd: Optional[Dict]) -> int: + """ + 现货做多评分 0-10。 + 考虑:24h 涨跌幅不过度下跌、RSI 非超买、MACD 偏多或金叉。 + """ + score = 5 # 中性起点 + change = 0.0 + try: + change = float(ticker.get("priceChangePercent", 0) or 0) + except (TypeError, ValueError): + pass + # 24h 涨跌幅:正加分,负减分 + if change >= 2: + score += 1 + elif change >= 0: + score += 0 + elif change >= -2: + score -= 0 + else: + score -= 1 + + if rsi is not None: + if rsi < 30: + score += 1 # 超卖反弹机会 + elif rsi < 45: + score += 0 + elif rsi > 70: + score -= 2 # 超买不推荐 + elif rsi > 55: + score -= 0 + + if macd and isinstance(macd, dict): + hist = macd.get("histogram") or 0 + if hist > 0: + score += 1 + elif hist < 0: + score -= 0 + + return max(0, min(10, int(score))) + + +async def run_spot_scan( + top_n: int = DEFAULT_TOP_N, + min_signal_strength: int = 5, + kline_interval: str = "15m", + kline_limit: int = 100, +) -> List[Dict[str, Any]]: + """ + 执行现货扫描,返回推荐列表(仅做多)。 + 按 24h 成交量取前 top_n 个 USDT 交易对,拉 K 线算 RSI/MACD,评分后过滤。 + """ + recommendations = [] + async with aiohttp.ClientSession() as session: + tickers = await get_spot_tickers_24h(session) + if not tickers: + logger.warning("现货 24h ticker 未获取到数据") + return [] + + # 按 quoteVolume 排序,取前 top_n + def vol_key(t): + try: + return float(t.get("quoteVolume") or 0) + except (TypeError, ValueError): + return 0.0 + + tickers.sort(key=vol_key, reverse=True) + symbols_to_scan = [t["symbol"] for t in tickers[: top_n * 2]] # 多取一些,后面按分数再筛 + + for i, ticker in enumerate(tickers[:top_n]): + symbol = ticker.get("symbol") + if not symbol or not str(symbol).endswith("USDT"): + continue + try: + klines = await get_spot_klines(session, symbol, kline_interval, kline_limit) + await asyncio.sleep(KLINES_DELAY) + except Exception as e: + logger.debug(f"获取 {symbol} K线失败: {e}") + klines = None + + if not klines or len(klines) < 30: + continue + + closes = [float(k[4]) for k in klines] + highs = [float(k[2]) for k in klines] + lows = [float(k[3]) for k in klines] + rsi = TechnicalIndicators.calculate_rsi(closes, period=14) + macd = TechnicalIndicators.calculate_macd(closes, 12, 26, 9) + score = _score_spot_long(ticker, klines, rsi, macd) + if score < min_signal_strength: + continue + + try: + price = float(ticker.get("lastPrice") or ticker.get("weightedAvgPrice") or closes[-1]) + except (TypeError, ValueError): + price = closes[-1] + try: + change_percent = float(ticker.get("priceChangePercent") or 0) + except (TypeError, ValueError): + change_percent = 0.0 + + recommendation_time = datetime.now(BEIJING_TZ).isoformat() + rec = { + "symbol": symbol, + "direction": "BUY", + "side": "BUY", + "signal_strength": score, + "current_price": price, + "change_percent": change_percent, + "recommendation_time": recommendation_time, + "market": "spot", + "recommendation_reason": f"现货做多信号 RSI={rsi:.1f if rsi else '-'} 24h涨跌{change_percent:+.2f}%", + "user_guide": f"现货建议在 {price:.4f} 附近分批买入,并自行设置止损止盈。", + "timestamp": time.time(), + } + recommendations.append(rec) + + # 按信号强度排序 + recommendations.sort(key=lambda x: (x.get("signal_strength", 0), -(x.get("change_percent") or 0)), reverse=True) + return recommendations[:50] # 最多返回 50 条 + + +async def run_spot_scan_and_cache_redis(ttl_sec: int = 3600) -> int: + """ + 执行现货扫描并将结果写入 Redis(key: recommendations:spot:snapshot)。 + 需要环境变量 REDIS_URL 等与后端一致。 + 返回写入的推荐条数。 + """ + import os + import json + + try: + import redis.asyncio as redis_async + except Exception: + logger.warning("redis.asyncio 不可用,无法写入现货推荐缓存") + return 0 + + redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379" + kwargs = {"decode_responses": True} + if os.getenv("REDIS_USERNAME"): + kwargs["username"] = os.getenv("REDIS_USERNAME") + if os.getenv("REDIS_PASSWORD"): + kwargs["password"] = os.getenv("REDIS_PASSWORD") + if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true": + if not redis_url.startswith("rediss://"): + redis_url = redis_url.replace("redis://", "rediss://", 1) + kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required")) + + try: + rds = redis_async.from_url(redis_url, **kwargs) + await rds.ping() + except Exception as e: + logger.warning(f"Redis 连接失败: {e}") + return 0 + + try: + items = await run_spot_scan(top_n=DEFAULT_TOP_N, min_signal_strength=5) + now_ms = int(time.time() * 1000) + payload = { + "items": items, + "generated_at_ms": now_ms, + "generated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"), + "ttl_sec": ttl_sec, + "market": "spot", + } + await rds.setex( + "recommendations:spot:snapshot", + ttl_sec, + json.dumps(payload, ensure_ascii=False), + ) + logger.info(f"现货推荐已写入 Redis,共 {len(items)} 条,TTL={ttl_sec}s") + await rds.aclose() + return len(items) + except Exception as e: + logger.exception(f"现货扫描或写入 Redis 失败: {e}") + try: + await rds.aclose() + except Exception: + pass + return 0