在后端API中新增现货推荐扫描功能,定时将数据写入Redis缓存,并提供相应的API接口以获取现货推荐。前端组件更新以支持现货推荐的展示与切换,提升用户体验与决策支持。此改动为用户提供了实时的现货推荐信息,增强了系统的功能性与灵活性。
242 lines
8.6 KiB
Python
242 lines
8.6 KiB
Python
"""
|
||
现货推荐扫描:使用币安现货公开 API,只做多信号,供定时任务写入 Redis。
|
||
不依赖合约 API 和 API Key,仅使用 api.binance.com 公开接口。
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import List, Dict, Any, Optional
|
||
|
||
import aiohttp
|
||
|
||
try:
|
||
from .indicators import TechnicalIndicators
|
||
except ImportError:
|
||
from indicators import TechnicalIndicators
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
BINANCE_SPOT_BASE = "https://api.binance.com"
|
||
BEIJING_TZ = timezone(offset=__import__("datetime").timedelta(hours=8))
|
||
|
||
# 默认只扫描成交量靠前的现货 USDT 对数量
|
||
DEFAULT_TOP_N = 80
|
||
# 每个 symbol 的 K 线请求间隔,避免触发限频
|
||
KLINES_DELAY = 0.15
|
||
|
||
|
||
async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None, timeout: int = 15) -> Any:
|
||
try:
|
||
async with session.get(url, params=params or {}, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
|
||
if resp.status != 200:
|
||
text = await resp.text()
|
||
logger.warning(f"现货 API 请求失败 {url} status={resp.status} body={text[:200]}")
|
||
return None
|
||
return await resp.json()
|
||
except Exception as e:
|
||
logger.warning(f"现货 API 请求异常 {url}: {e}")
|
||
return None
|
||
|
||
|
||
async def get_spot_tickers_24h(session: aiohttp.ClientSession) -> List[Dict]:
|
||
"""获取现货 24h 行情,用于按成交量排序、取涨跌幅。"""
|
||
url = f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr"
|
||
data = await _http_get(session, url)
|
||
if not isinstance(data, list):
|
||
return []
|
||
# 只保留 USDT 交易对
|
||
out = [t for t in data if isinstance(t, dict) and str(t.get("symbol", "")).endswith("USDT")]
|
||
return out
|
||
|
||
|
||
async def get_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 100) -> Optional[List[List]]:
|
||
"""获取现货 K 线,格式与合约一致 [open_time, o, h, l, c, volume, ...]。"""
|
||
url = f"{BINANCE_SPOT_BASE}/api/v3/klines"
|
||
params = {"symbol": symbol, "interval": interval, "limit": limit}
|
||
data = await _http_get(session, url, params)
|
||
if not isinstance(data, list) or len(data) < 20:
|
||
return None
|
||
return data
|
||
|
||
|
||
def _score_spot_long(ticker: Dict, klines: Optional[List], rsi: Optional[float], macd: Optional[Dict]) -> int:
|
||
"""
|
||
现货做多评分 0-10。
|
||
考虑:24h 涨跌幅不过度下跌、RSI 非超买、MACD 偏多或金叉。
|
||
"""
|
||
score = 5 # 中性起点
|
||
change = 0.0
|
||
try:
|
||
change = float(ticker.get("priceChangePercent", 0) or 0)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
# 24h 涨跌幅:正加分,负减分
|
||
if change >= 2:
|
||
score += 1
|
||
elif change >= 0:
|
||
score += 0
|
||
elif change >= -2:
|
||
score -= 0
|
||
else:
|
||
score -= 1
|
||
|
||
if rsi is not None:
|
||
if rsi < 30:
|
||
score += 1 # 超卖反弹机会
|
||
elif rsi < 45:
|
||
score += 0
|
||
elif rsi > 70:
|
||
score -= 2 # 超买不推荐
|
||
elif rsi > 55:
|
||
score -= 0
|
||
|
||
if macd and isinstance(macd, dict):
|
||
hist = macd.get("histogram") or 0
|
||
if hist > 0:
|
||
score += 1
|
||
elif hist < 0:
|
||
score -= 0
|
||
|
||
return max(0, min(10, int(score)))
|
||
|
||
|
||
async def run_spot_scan(
|
||
top_n: int = DEFAULT_TOP_N,
|
||
min_signal_strength: int = 5,
|
||
kline_interval: str = "15m",
|
||
kline_limit: int = 100,
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
执行现货扫描,返回推荐列表(仅做多)。
|
||
按 24h 成交量取前 top_n 个 USDT 交易对,拉 K 线算 RSI/MACD,评分后过滤。
|
||
"""
|
||
recommendations = []
|
||
async with aiohttp.ClientSession() as session:
|
||
tickers = await get_spot_tickers_24h(session)
|
||
if not tickers:
|
||
logger.warning("现货 24h ticker 未获取到数据")
|
||
return []
|
||
|
||
# 按 quoteVolume 排序,取前 top_n
|
||
def vol_key(t):
|
||
try:
|
||
return float(t.get("quoteVolume") or 0)
|
||
except (TypeError, ValueError):
|
||
return 0.0
|
||
|
||
tickers.sort(key=vol_key, reverse=True)
|
||
symbols_to_scan = [t["symbol"] for t in tickers[: top_n * 2]] # 多取一些,后面按分数再筛
|
||
|
||
for i, ticker in enumerate(tickers[:top_n]):
|
||
symbol = ticker.get("symbol")
|
||
if not symbol or not str(symbol).endswith("USDT"):
|
||
continue
|
||
try:
|
||
klines = await get_spot_klines(session, symbol, kline_interval, kline_limit)
|
||
await asyncio.sleep(KLINES_DELAY)
|
||
except Exception as e:
|
||
logger.debug(f"获取 {symbol} K线失败: {e}")
|
||
klines = None
|
||
|
||
if not klines or len(klines) < 30:
|
||
continue
|
||
|
||
closes = [float(k[4]) for k in klines]
|
||
highs = [float(k[2]) for k in klines]
|
||
lows = [float(k[3]) for k in klines]
|
||
rsi = TechnicalIndicators.calculate_rsi(closes, period=14)
|
||
macd = TechnicalIndicators.calculate_macd(closes, 12, 26, 9)
|
||
score = _score_spot_long(ticker, klines, rsi, macd)
|
||
if score < min_signal_strength:
|
||
continue
|
||
|
||
try:
|
||
price = float(ticker.get("lastPrice") or ticker.get("weightedAvgPrice") or closes[-1])
|
||
except (TypeError, ValueError):
|
||
price = closes[-1]
|
||
try:
|
||
change_percent = float(ticker.get("priceChangePercent") or 0)
|
||
except (TypeError, ValueError):
|
||
change_percent = 0.0
|
||
|
||
recommendation_time = datetime.now(BEIJING_TZ).isoformat()
|
||
rec = {
|
||
"symbol": symbol,
|
||
"direction": "BUY",
|
||
"side": "BUY",
|
||
"signal_strength": score,
|
||
"current_price": price,
|
||
"change_percent": change_percent,
|
||
"recommendation_time": recommendation_time,
|
||
"market": "spot",
|
||
"recommendation_reason": f"现货做多信号 RSI={rsi:.1f if rsi else '-'} 24h涨跌{change_percent:+.2f}%",
|
||
"user_guide": f"现货建议在 {price:.4f} 附近分批买入,并自行设置止损止盈。",
|
||
"timestamp": time.time(),
|
||
}
|
||
recommendations.append(rec)
|
||
|
||
# 按信号强度排序
|
||
recommendations.sort(key=lambda x: (x.get("signal_strength", 0), -(x.get("change_percent") or 0)), reverse=True)
|
||
return recommendations[:50] # 最多返回 50 条
|
||
|
||
|
||
async def run_spot_scan_and_cache_redis(ttl_sec: int = 3600) -> int:
|
||
"""
|
||
执行现货扫描并将结果写入 Redis(key: recommendations:spot:snapshot)。
|
||
需要环境变量 REDIS_URL 等与后端一致。
|
||
返回写入的推荐条数。
|
||
"""
|
||
import os
|
||
import json
|
||
|
||
try:
|
||
import redis.asyncio as redis_async
|
||
except Exception:
|
||
logger.warning("redis.asyncio 不可用,无法写入现货推荐缓存")
|
||
return 0
|
||
|
||
redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
|
||
kwargs = {"decode_responses": True}
|
||
if os.getenv("REDIS_USERNAME"):
|
||
kwargs["username"] = os.getenv("REDIS_USERNAME")
|
||
if os.getenv("REDIS_PASSWORD"):
|
||
kwargs["password"] = os.getenv("REDIS_PASSWORD")
|
||
if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true":
|
||
if not redis_url.startswith("rediss://"):
|
||
redis_url = redis_url.replace("redis://", "rediss://", 1)
|
||
kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required"))
|
||
|
||
try:
|
||
rds = redis_async.from_url(redis_url, **kwargs)
|
||
await rds.ping()
|
||
except Exception as e:
|
||
logger.warning(f"Redis 连接失败: {e}")
|
||
return 0
|
||
|
||
try:
|
||
items = await run_spot_scan(top_n=DEFAULT_TOP_N, min_signal_strength=5)
|
||
now_ms = int(time.time() * 1000)
|
||
payload = {
|
||
"items": items,
|
||
"generated_at_ms": now_ms,
|
||
"generated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"),
|
||
"ttl_sec": ttl_sec,
|
||
"market": "spot",
|
||
}
|
||
await rds.setex(
|
||
"recommendations:spot:snapshot",
|
||
ttl_sec,
|
||
json.dumps(payload, ensure_ascii=False),
|
||
)
|
||
logger.info(f"现货推荐已写入 Redis,共 {len(items)} 条,TTL={ttl_sec}s")
|
||
await rds.aclose()
|
||
return len(items)
|
||
except Exception as e:
|
||
logger.exception(f"现货扫描或写入 Redis 失败: {e}")
|
||
try:
|
||
await rds.aclose()
|
||
except Exception:
|
||
pass
|
||
return 0
|