auto_trade_sys/backend/spot_scanner.py
薇薇安 3389e0aafc feat(recommendations): 添加现货推荐扫描与API支持
在后端API中新增现货推荐扫描功能,定时将数据写入Redis缓存,并提供相应的API接口以获取现货推荐。前端组件更新以支持现货推荐的展示与切换,提升用户体验与决策支持。此改动为用户提供了实时的现货推荐信息,增强了系统的功能性与灵活性。
2026-02-25 08:40:52 +08:00

281 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
现货推荐扫描:拉取币安现货行情,仅做多信号,写入 Redis 供 /api/recommendations/spot 使用。
使用公开 API无需 API Key。定时任务调用 run_spot_scan_and_cache()。
"""
import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import aiohttp
# 可选的 Redis 写入(与 recommendations 路由共用连接方式)
try:
import redis.asyncio as redis_async
except Exception:
redis_async = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL prepended to every Binance spot REST path requested in this file.
BINANCE_SPOT_BASE = "https://api.binance.com"
# Number of klines requested per symbol by run_spot_scan.
SPOT_KLINES_LIMIT = 60
# Candidate pool size: run_spot_scan keeps the top 2 * SPOT_TOP_N tickers by
# quote volume, then scans the SPOT_TOP_N with the largest absolute 24h change.
SPOT_TOP_N = 80
# Minimum score (on the 0-10 scale) for _compute_spot_signal to emit a signal.
SPOT_MIN_STRENGTH = 4
# Maximum number of recommendations produced by one scan.
SPOT_MAX_RECS = 30
def _beijing_now_iso() -> str:
from datetime import timedelta
return datetime.now(tz=timezone(timedelta(hours=8))).isoformat()
async def _http_get(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Optional[Any]:
    """GET ``url`` and return the decoded JSON body.

    Returns None when the response status is not 200 or when any exception
    is raised while requesting or decoding; exceptions are logged as
    warnings (with the URL truncated to 60 characters) and never propagate.
    The request uses a 15 second total timeout.
    """
    try:
        request_timeout = aiohttp.ClientTimeout(total=15)
        query = params or {}
        async with session.get(url, params=query, timeout=request_timeout) as response:
            succeeded = response.status == 200
            return await response.json() if succeeded else None
    except Exception as exc:
        logger.warning("spot_scanner _http_get %s: %s", url[:60], exc)
    return None
def _technical_indicators():
"""延迟导入 trading_system.indicators避免 backend 强依赖 trading_system 路径。"""
project_root = __import__("pathlib").Path(__file__).resolve().parent.parent
trading_system = project_root / "trading_system"
if str(trading_system) not in sys.path:
sys.path.insert(0, str(trading_system))
try:
from indicators import TechnicalIndicators
return TechnicalIndicators
except ImportError:
from trading_system.indicators import TechnicalIndicators
return TechnicalIndicators
async def _fetch_spot_symbols(session: aiohttp.ClientSession) -> List[str]:
    """Return the symbols of all spot pairs quoted in USDT with status TRADING.

    Returns an empty list when the exchangeInfo request fails or its
    payload has no "symbols" entry.
    """
    exchange_info = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/exchangeInfo")
    if not exchange_info or "symbols" not in exchange_info:
        return []
    return [
        entry.get("symbol")
        for entry in exchange_info["symbols"]
        if entry.get("status") == "TRADING"
        and entry.get("quoteAsset") == "USDT"
        and entry.get("symbol")
    ]
async def _fetch_spot_ticker_24h(session: aiohttp.ClientSession) -> List[Dict]:
    """Fetch the 24h ticker payload.

    Returns the decoded response when it is a list, otherwise an empty
    list. Later code in this file reads the fields symbol, lastPrice,
    priceChangePercent and volume from each entry.
    """
    payload = await _http_get(session, f"{BINANCE_SPOT_BASE}/api/v3/ticker/24hr")
    return payload if isinstance(payload, list) else []
async def _fetch_spot_klines(session: aiohttp.ClientSession, symbol: str, interval: str = "15m", limit: int = 60) -> Optional[List[List]]:
    """Fetch spot klines for ``symbol``.

    Rows are consumed elsewhere in this file as
    [open_time, open, high, low, close, volume, ...]. Returns None when
    the request fails or the payload is not a list.
    """
    endpoint = f"{BINANCE_SPOT_BASE}/api/v3/klines"
    query = {"symbol": symbol, "interval": interval, "limit": limit}
    rows = await _http_get(session, endpoint, query)
    if isinstance(rows, list):
        return rows
    return None
def _compute_spot_signal(klines: List[List], ticker: Dict, TechnicalIndicators) -> Optional[Dict]:
"""
基于 K 线计算只做多信号。返回 None 或 { direction: 'BUY', strength: int, ... }。
"""
if not klines or len(klines) < 50:
return None
closes = [float(k[4]) for k in klines]
highs = [float(k[2]) for k in klines]
lows = [float(k[3]) for k in klines]
current_price = closes[-1]
rsi = TechnicalIndicators.calculate_rsi(closes, period=14)
macd = TechnicalIndicators.calculate_macd(closes)
bollinger = TechnicalIndicators.calculate_bollinger_bands(closes, period=20)
ema20 = TechnicalIndicators.calculate_ema(closes, period=20)
ema50 = TechnicalIndicators.calculate_ema(closes, period=50)
strength = 0
# 只做多RSI 超卖、价格在下轨附近、MACD 金叉、价格在均线上方等
if rsi is not None and rsi < 35:
strength += 3
elif rsi is not None and rsi < 50:
strength += 1
if bollinger and current_price <= bollinger["lower"] * 1.002:
strength += 3
elif bollinger and current_price < bollinger["middle"]:
strength += 1
if macd and macd["histogram"] > 0 and macd["macd"] > macd["signal"]:
strength += 2
if ema20 and ema50 and current_price > ema20 > ema50:
strength += 2
elif ema20 and current_price > ema20:
strength += 1
strength = max(0, min(strength, 10))
if strength < SPOT_MIN_STRENGTH:
return None
return {
"direction": "BUY",
"strength": strength,
"rsi": rsi,
"current_price": current_price,
}
def _build_spot_recommendation(
symbol: str,
ticker: Dict,
signal: Dict,
) -> Dict[str, Any]:
"""构造单条现货推荐(与合约推荐结构兼容,便于前端复用)。"""
current_price = float(ticker.get("lastPrice") or signal.get("current_price") or 0)
change_percent = float(ticker.get("priceChangePercent") or 0)
ts = time.time()
entry = current_price * 0.995
stop_pct = 0.05
tp1_pct = 0.08
tp2_pct = 0.15
if current_price <= 0:
return None
stop_loss = entry * (1 - stop_pct)
tp1 = entry * (1 + tp1_pct)
tp2 = entry * (1 + tp2_pct)
return {
"symbol": symbol,
"direction": "BUY",
"market": "spot",
"current_price": current_price,
"signal_strength": signal.get("strength", 0),
"change_percent": change_percent,
"suggested_limit_price": entry,
"planned_entry_price": entry,
"suggested_stop_loss": stop_loss,
"suggested_take_profit_1": tp1,
"suggested_take_profit_2": tp2,
"suggested_position_percent": 0.05,
"recommendation_time": _beijing_now_iso(),
"timestamp": ts,
"recommendation_reason": "现货做多信号RSI/布林带/MACD/均线)",
"user_guide": f"现货建议在 {entry:.4f} USDT 附近买入,止损 {stop_loss:.4f}目标1 {tp1:.4f}目标2 {tp2:.4f}。仅供参考,请自行判断。",
}
def _ticker_float(ticker: Dict, field: str) -> float:
    """Read a numeric ticker field as float; missing, empty or malformed values count as 0."""
    try:
        return float(ticker.get(field) or 0)
    except (TypeError, ValueError):
        return 0.0


async def _scan_spot_symbol(
    session: aiohttp.ClientSession,
    symbol: str,
    ticker: Dict,
    TechnicalIndicators,
) -> Optional[Dict[str, Any]]:
    """Fetch klines for one symbol and build its recommendation, or None if it does not qualify."""
    if not ticker:
        return None
    klines = await _fetch_spot_klines(session, symbol, "15m", SPOT_KLINES_LIMIT)
    if not klines:
        return None
    signal = _compute_spot_signal(klines, ticker, TechnicalIndicators)
    if not signal:
        return None
    return _build_spot_recommendation(symbol, ticker, signal)


async def run_spot_scan() -> List[Dict[str, Any]]:
    """Run one spot scan and return the recommendations (nothing is written to Redis).

    Steps:
        1. Load tradable USDT symbols and the 24h tickers.
        2. Keep the top ``2 * SPOT_TOP_N`` tickers by quote volume
           (volume * lastPrice), then the ``SPOT_TOP_N`` of those with the
           largest absolute 24h change.
        3. Scan those symbols in that order on 15m klines, stopping as soon
           as ``SPOT_MAX_RECS`` recommendations have been collected.
        4. Return the collected recommendations sorted by signal strength,
           strongest first.

    Returns an empty list when no symbols could be loaded. A failure while
    scanning a single symbol is logged at debug level and skipped. An import
    failure in ``_technical_indicators()`` propagates.
    """
    TechnicalIndicators = _technical_indicators()
    recommendations: List[Dict[str, Any]] = []
    async with aiohttp.ClientSession() as session:
        symbols = await _fetch_spot_symbols(session)
        if not symbols:
            logger.warning("spot_scanner: 未获取到现货交易对")
            return []
        # Set membership keeps the ticker filter linear instead of
        # rescanning the symbol list for every ticker.
        tradable = set(symbols)
        tickers = await _fetch_spot_ticker_24h(session)
        candidates = [
            t
            for t in tickers
            if isinstance(t, dict)
            and isinstance(t.get("symbol"), str)
            and t["symbol"] in tradable
        ]
        by_volume = sorted(
            candidates,
            key=lambda t: _ticker_float(t, "volume") * _ticker_float(t, "lastPrice"),
            reverse=True,
        )[: SPOT_TOP_N * 2]
        # Largest absolute move first; the sort is stable, so ties keep
        # their volume ranking.
        to_scan = sorted(
            by_volume,
            key=lambda t: abs(_ticker_float(t, "priceChangePercent")),
            reverse=True,
        )[:SPOT_TOP_N]
        for ticker in to_scan:
            symbol = ticker["symbol"]
            try:
                rec = await _scan_spot_symbol(session, symbol, ticker, TechnicalIndicators)
            except Exception as e:
                logger.debug("spot_scanner %s: %s", symbol, e)
                rec = None
            if rec:
                recommendations.append(rec)
                if len(recommendations) >= SPOT_MAX_RECS:
                    break
            # Throttle after every symbol, including skipped ones, so kline
            # requests are always spaced out.
            await asyncio.sleep(0.05)
    recommendations.sort(key=lambda x: x.get("signal_strength", 0), reverse=True)
    return recommendations[:SPOT_MAX_RECS]
def _redis_connection_kwargs():
redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
kwargs = {"decode_responses": True}
if os.getenv("REDIS_USERNAME"):
kwargs["username"] = os.getenv("REDIS_USERNAME")
if os.getenv("REDIS_PASSWORD"):
kwargs["password"] = os.getenv("REDIS_PASSWORD")
if redis_url.startswith("rediss://") or os.getenv("REDIS_USE_TLS", "").lower() == "true":
if redis_url.startswith("redis://"):
redis_url = redis_url.replace("redis://", "rediss://", 1)
kwargs.setdefault("ssl_cert_reqs", os.getenv("REDIS_SSL_CERT_REQS", "required"))
if os.getenv("REDIS_SSL_CA_CERTS"):
kwargs["ssl_ca_certs"] = os.getenv("REDIS_SSL_CA_CERTS")
return redis_url, kwargs
async def run_spot_scan_and_cache(ttl_sec: int = 900) -> int:
    """Run a spot scan and store the snapshot in Redis.

    The snapshot is written as JSON under the key
    ``recommendations:spot:snapshot`` with an expiry of ``ttl_sec`` seconds.

    Args:
        ttl_sec: Expiry of the Redis key in seconds; also recorded in the
            payload.

    Returns:
        The number of recommendations produced by the scan. The same count
        is returned when the redis package is unavailable or the write
        fails; those cases are only logged as warnings.
    """
    items = await run_spot_scan()
    now_ms = int(time.time() * 1000)
    payload = {
        "items": items,
        "generated_at": _beijing_now_iso(),
        "generated_at_ms": now_ms,
        "ttl_sec": ttl_sec,
        "count": len(items),
    }
    if redis_async is None:
        logger.warning("spot_scanner: redis 不可用,跳过写入")
        return len(items)
    redis_url, kwargs = _redis_connection_kwargs()
    client = None
    try:
        client = redis_async.from_url(redis_url, **kwargs)
        await client.ping()
        key = "recommendations:spot:snapshot"
        await client.setex(key, ttl_sec, json.dumps(payload, ensure_ascii=False))
        logger.info("spot_scanner: 已写入 %d 条现货推荐到 %s", len(items), key)
    except Exception as e:
        logger.warning("spot_scanner: Redis 写入失败 %s", e)
    finally:
        # Close the client on every path so a failed ping or write does
        # not leak the connection.
        if client is not None:
            try:
                await client.aclose()
            except Exception as e:
                logger.debug("spot_scanner: Redis close failed: %s", e)
    return len(items)