在后端API中新增现货推荐扫描功能,定时将数据写入Redis缓存,并提供相应的API接口以获取现货推荐。前端组件更新以支持现货推荐的展示与切换,提升用户体验与决策支持。此改动为用户提供了实时的现货推荐信息,增强了系统的功能性与灵活性。
281 lines
10 KiB
Python
281 lines
10 KiB
Python
"""
|
||
现货推荐扫描:拉取币安现货行情,仅做多信号,写入 Redis 供 /api/recommendations/spot 使用。
|
||
使用公开 API,无需 API Key。定时任务调用 run_spot_scan_and_cache()。
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import aiohttp
|
||
|
||
# 可选的 Redis 写入(与 recommendations 路由共用连接方式)
|
||
try:
|
||
import redis.asyncio as redis_async
|
||
except Exception:
|
||
redis_async = None
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
BINANCE_SPOT_BASE = "https://api.binance.com"
|
||
SPOT_KLINES_LIMIT = 60
|
||
SPOT_TOP_N = 80
|
||
SPOT_MIN_STRENGTH = 4
|
||
SPOT_MAX_RECS = 30
|
||
|
||
|
||
def _beijing_now_iso() -> str:
|
||
from datetime import timedelta
|
||
return datetime.now(tz=timezone(timedelta(hours=8))).isoformat()
|
||
|
||
|
||
async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Optional[Any]:
|
||
try:
|
||
async with session.get(url, params=params or {}, timeout=aiohttp.ClientTimeout(total=15)) as resp:
|
||
if resp.status != 200:
|
||
return None
|
||
return await resp.json()
|
||
except Exception as e:
|
||
logger.warning("spot_scanner _http_get %s: %s", url[:60], e)
|
||
return None
|
||
|
||
|
||
def _technical_indicators():
|
||
"""延迟导入 trading_system.indicators,避免 backend 强依赖 trading_system 路径。"""
|
||
project_root = __import__("pathlib").Path(__file__).resolve().parent.parent
|
||
trading_system = project_root / "trading_system"
|
||
if str(trading_system) not in sys.path:
|
||
sys.path.insert(0, str(trading_system))
|
||
try:
|
||
from indicators import TechnicalIndicators
|
||
return TechnicalIndicators
|
||
except ImportError:
|
||
from trading_system.indicators import TechnicalIndicators
|
||
return TechnicalIndicators
|
||
|
||
|
||
async def _fetch_spot_symbols(session: aiohttp.ClientSession) -> List[str]:
|
||
"""获取所有 USDT 现货交易对(status=TRADING)。"""
|
||
data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/exchangeInfo")
|
||
if not data or "symbols" not in data:
|
||
return []
|
||
symbols = []
|
||
for s in data["symbols"]:
|
||
if s.get("status") != "TRADING":
|
||
continue
|
||
if s.get("quoteAsset") != "USDT":
|
||
continue
|
||
sym = s.get("symbol")
|
||
if sym:
|
||
symbols.append(sym)
|
||
return symbols
|
||
|
||
|
||
async def _fetch_spot_ticker_24h(session: aiohttp.ClientSession) -> List[Dict]:
|
||
"""获取 24h ticker,返回 list of dict (symbol, lastPrice, priceChangePercent, volume, ...)。"""
|
||
data = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr")
|
||
if not isinstance(data, list):
|
||
return []
|
||
return data
|
||
|
||
|
||
async def _fetch_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 60) -> Optional[List[List]]:
|
||
"""现货 K 线,格式与合约一致 [open_time, o, h, l, c, volume, ...]。"""
|
||
data = await _http_get(
|
||
session,
|
||
f"{BINANCE_SPOT_BASE}/api/v3/klines",
|
||
{"symbol": symbol, "interval": interval, "limit": limit},
|
||
)
|
||
return data if isinstance(data, list) else None
|
||
|
||
|
||
def _compute_spot_signal(klines: List[List], ticker: Dict, TechnicalIndicators) -> Optional[Dict]:
|
||
"""
|
||
基于 K 线计算只做多信号。返回 None 或 { direction: 'BUY', strength: int, ... }。
|
||
"""
|
||
if not klines or len(klines) < 50:
|
||
return None
|
||
closes = [float(k[4]) for k in klines]
|
||
highs = [float(k[2]) for k in klines]
|
||
lows = [float(k[3]) for k in klines]
|
||
current_price = closes[-1]
|
||
|
||
rsi = TechnicalIndicators.calculate_rsi(closes, period=14)
|
||
macd = TechnicalIndicators.calculate_macd(closes)
|
||
bollinger = TechnicalIndicators.calculate_bollinger_bands(closes, period=20)
|
||
ema20 = TechnicalIndicators.calculate_ema(closes, period=20)
|
||
ema50 = TechnicalIndicators.calculate_ema(closes, period=50)
|
||
|
||
strength = 0
|
||
# 只做多:RSI 超卖、价格在下轨附近、MACD 金叉、价格在均线上方等
|
||
if rsi is not None and rsi < 35:
|
||
strength += 3
|
||
elif rsi is not None and rsi < 50:
|
||
strength += 1
|
||
if bollinger and current_price <= bollinger["lower"] * 1.002:
|
||
strength += 3
|
||
elif bollinger and current_price < bollinger["middle"]:
|
||
strength += 1
|
||
if macd and macd["histogram"] > 0 and macd["macd"] > macd["signal"]:
|
||
strength += 2
|
||
if ema20 and ema50 and current_price > ema20 > ema50:
|
||
strength += 2
|
||
elif ema20 and current_price > ema20:
|
||
strength += 1
|
||
|
||
strength = max(0, min(strength, 10))
|
||
if strength < SPOT_MIN_STRENGTH:
|
||
return None
|
||
return {
|
||
"direction": "BUY",
|
||
"strength": strength,
|
||
"rsi": rsi,
|
||
"current_price": current_price,
|
||
}
|
||
|
||
|
||
def _build_spot_recommendation(
|
||
symbol: str,
|
||
ticker: Dict,
|
||
signal: Dict,
|
||
) -> Dict[str, Any]:
|
||
"""构造单条现货推荐(与合约推荐结构兼容,便于前端复用)。"""
|
||
current_price = float(ticker.get("lastPrice") or signal.get("current_price") or 0)
|
||
change_percent = float(ticker.get("priceChangePercent") or 0)
|
||
ts = time.time()
|
||
entry = current_price * 0.995
|
||
stop_pct = 0.05
|
||
tp1_pct = 0.08
|
||
tp2_pct = 0.15
|
||
if current_price <= 0:
|
||
return None
|
||
stop_loss = entry * (1 - stop_pct)
|
||
tp1 = entry * (1 + tp1_pct)
|
||
tp2 = entry * (1 + tp2_pct)
|
||
|
||
return {
|
||
"symbol": symbol,
|
||
"direction": "BUY",
|
||
"market": "spot",
|
||
"current_price": current_price,
|
||
"signal_strength": signal.get("strength", 0),
|
||
"change_percent": change_percent,
|
||
"suggested_limit_price": entry,
|
||
"planned_entry_price": entry,
|
||
"suggested_stop_loss": stop_loss,
|
||
"suggested_take_profit_1": tp1,
|
||
"suggested_take_profit_2": tp2,
|
||
"suggested_position_percent": 0.05,
|
||
"recommendation_time": _beijing_now_iso(),
|
||
"timestamp": ts,
|
||
"recommendation_reason": "现货做多信号(RSI/布林带/MACD/均线)",
|
||
"user_guide": f"现货建议在 {entry:.4f} USDT 附近买入,止损 {stop_loss:.4f},目标1 {tp1:.4f},目标2 {tp2:.4f}。仅供参考,请自行判断。",
|
||
}
|
||
|
||
|
||
async def run_spot_scan() -> List[Dict[str, Any]]:
|
||
"""执行一次现货扫描,返回推荐列表(不写 Redis)。"""
|
||
TechnicalIndicators = _technical_indicators()
|
||
recommendations = []
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
symbols = await _fetch_spot_symbols(session)
|
||
if not symbols:
|
||
logger.warning("spot_scanner: 未获取到现货交易对")
|
||
return []
|
||
|
||
tickers = await _fetch_spot_ticker_24h(session)
|
||
ticker_map = {t["symbol"]: t for t in tickers if isinstance(t.get("symbol"), str)}
|
||
|
||
# 按 24h 成交量排序,取前 SPOT_TOP_N 再按涨跌幅取部分
|
||
def volume_key(t):
|
||
try:
|
||
return float(t.get("volume") or 0) * float(t.get("lastPrice") or 0)
|
||
except Exception:
|
||
return 0
|
||
|
||
sorted_tickers = sorted(
|
||
[t for t in tickers if t.get("symbol") in symbols],
|
||
key=volume_key,
|
||
reverse=True,
|
||
)[: SPOT_TOP_N * 2]
|
||
|
||
# 按涨跌幅取前 N 个(偏强势或超跌反弹)
|
||
with_change = [(t, float(t.get("priceChangePercent") or 0)) for t in sorted_tickers]
|
||
with_change.sort(key=lambda x: -abs(x[1]))
|
||
to_scan = [t[0]["symbol"] for t in with_change[: SPOT_TOP_N]]
|
||
|
||
for symbol in to_scan:
|
||
try:
|
||
klines = await _fetch_spot_klines(session, symbol, "15m", SPOT_KLINES_LIMIT)
|
||
ticker = ticker_map.get(symbol, {})
|
||
if not klines or not ticker:
|
||
continue
|
||
signal = _compute_spot_signal(klines, ticker, TechnicalIndicators)
|
||
if not signal:
|
||
continue
|
||
rec = _build_spot_recommendation(symbol, ticker, signal)
|
||
if rec:
|
||
recommendations.append(rec)
|
||
if len(recommendations) >= SPOT_MAX_RECS:
|
||
break
|
||
except Exception as e:
|
||
logger.debug("spot_scanner %s: %s", symbol, e)
|
||
await asyncio.sleep(0.05)
|
||
|
||
recommendations.sort(key=lambda x: x.get("signal_strength", 0), reverse=True)
|
||
return recommendations[: SPOT_MAX_RECS]
|
||
|
||
|
||
def _redis_connection_kwargs():
|
||
redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
|
||
kwargs = {"decode_responses": True}
|
||
if os.getenv("REDIS_USERNAME"):
|
||
kwargs["username"] = os.getenv("REDIS_USERNAME")
|
||
if os.getenv("REDIS_PASSWORD"):
|
||
kwargs["password"] = os.getenv("REDIS_PASSWORD")
|
||
if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true":
|
||
if redis_url.startswith("redis://"):
|
||
redis_url = redis_url.replace("redis://", "rediss://", 1)
|
||
kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required"))
|
||
if os.getenv("REDIS_SSL_CA_CERTS"):
|
||
kwargs["ssl_ca_certs"] = os.getenv("REDIS_SSL_CA_CERTS")
|
||
return redis_url, kwargs
|
||
|
||
|
||
async def run_spot_scan_and_cache(ttl_sec: int = 900) -> int:
|
||
"""
|
||
执行现货扫描并写入 Redis。返回写入的推荐数量。
|
||
Redis key: recommendations:spot:snapshot
|
||
"""
|
||
items = await run_spot_scan()
|
||
now_ms = int(time.time() * 1000)
|
||
payload = {
|
||
"items": items,
|
||
"generated_at": _beijing_now_iso(),
|
||
"generated_at_ms": now_ms,
|
||
"ttl_sec": ttl_sec,
|
||
"count": len(items),
|
||
}
|
||
|
||
if redis_async is None:
|
||
logger.warning("spot_scanner: redis 不可用,跳过写入")
|
||
return len(items)
|
||
|
||
redis_url, kwargs = _redis_connection_kwargs()
|
||
try:
|
||
client = redis_async.from_url(redis_url, **kwargs)
|
||
await client.ping()
|
||
key = "recommendations:spot:snapshot"
|
||
await client.setex(key, ttl_sec, json.dumps(payload, ensure_ascii=False))
|
||
logger.info("spot_scanner: 已写入 %d 条现货推荐到 %s", len(items), key)
|
||
await client.aclose()
|
||
return len(items)
|
||
except Exception as e:
|
||
logger.warning("spot_scanner: Redis 写入失败 %s", e)
|
||
return len(items)
|