在多个模块中实现 Redis 缓存机制,优先从 Redis 读取数据,减少进程内存占用。更新 `binance_client.py`、`market_scanner.py`、`position_manager.py`、`ticker_24h_stream.py` 和 `book_ticker_stream.py`,确保在有 Redis 时优先使用其进行数据存储,降级到内存缓存。调整缓存管理逻辑,限制进程内缓存的最大条数为 500,避免内存无限增长。此改动提升了数据访问效率,优化了内存使用,增强了系统的整体性能与稳定性。
892 lines
47 KiB
Python
892 lines
47 KiB
Python
"""
|
||
市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import List, Dict, Optional
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .indicators import TechnicalIndicators
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from indicators import TechnicalIndicators
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MarketScanner:
|
||
"""市场扫描器类"""
|
||
|
||
def __init__(self, client: BinanceClient):
|
||
"""
|
||
初始化市场扫描器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
"""
|
||
self.client = client
|
||
self.top_symbols: List[Dict] = []
|
||
|
||
async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]:
|
||
"""
|
||
扫描市场,找出涨跌幅最大的前N个货币对
|
||
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
|
||
|
||
Returns:
|
||
前N个货币对列表,包含涨跌幅信息
|
||
"""
|
||
import time
|
||
self._scan_start_time = time.time()
|
||
|
||
# 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰
|
||
cfg = dict(config.TRADING_CONFIG or {})
|
||
if config_override and isinstance(config_override, dict):
|
||
cfg.update(config_override)
|
||
ns = (cache_namespace or "trade").strip() or "trade"
|
||
|
||
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
|
||
# 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存
|
||
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
|
||
|
||
logger.info("开始扫描市场...")
|
||
|
||
# 获取所有USDT交易对
|
||
raw_symbols = await self.client.get_all_usdt_pairs()
|
||
all_symbols = [s for s in (raw_symbols or []) if s]
|
||
if raw_symbols and len(all_symbols) < len(raw_symbols):
|
||
logger.info(f"已过滤 {len(raw_symbols) - len(all_symbols)} 个非ASCII交易对,保留 {len(all_symbols)} 个")
|
||
if not all_symbols:
|
||
logger.warning("未获取到交易对")
|
||
return []
|
||
|
||
# 根据配置限制扫描的交易对数量(确保为整数,避免 DB/JSON 返回 float 导致 slice 报错)
|
||
max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500)
|
||
try:
|
||
max_scan_symbols = int(max_scan_symbols) if max_scan_symbols is not None else 500
|
||
except (TypeError, ValueError):
|
||
max_scan_symbols = 500
|
||
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
|
||
symbols = all_symbols[:max_scan_symbols]
|
||
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})")
|
||
else:
|
||
symbols = all_symbols
|
||
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
|
||
|
||
# 过滤主流币(山寨币策略应该排除主流币)
|
||
exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True)
|
||
if exclude_major_coins:
|
||
# 主流币列表(市值排名前15的主流币)
|
||
major_coins = {
|
||
'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT',
|
||
'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT',
|
||
'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT',
|
||
'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT'
|
||
}
|
||
symbols_before = len(symbols)
|
||
symbols = [s for s in symbols if s not in major_coins]
|
||
excluded_count = symbols_before - len(symbols)
|
||
if excluded_count > 0:
|
||
logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)")
|
||
|
||
# 优先从 Redis 读 24h ticker(进程内不保留全量,减轻内存);无/过期再 REST
|
||
all_tickers = None
|
||
try:
|
||
try:
|
||
from .ticker_24h_stream import (
|
||
get_tickers_24h_cache,
|
||
get_tickers_24h_from_redis,
|
||
is_ticker_24h_cache_fresh,
|
||
)
|
||
except ImportError:
|
||
from ticker_24h_stream import (
|
||
get_tickers_24h_cache,
|
||
get_tickers_24h_from_redis,
|
||
is_ticker_24h_cache_fresh,
|
||
)
|
||
if is_ticker_24h_cache_fresh(max_age_sec=120.0):
|
||
redis_cache = getattr(self.client, "redis_cache", None)
|
||
if redis_cache:
|
||
all_tickers = await get_tickers_24h_from_redis(redis_cache)
|
||
else:
|
||
all_tickers = get_tickers_24h_cache()
|
||
if all_tickers:
|
||
logger.info(f"使用 24h ticker 缓存({len(all_tickers)} 个交易对,来自 Redis/WS),跳过 REST 批量请求")
|
||
except Exception as e:
|
||
logger.debug(f"读取 24h ticker 缓存失败: {e}")
|
||
if not all_tickers:
|
||
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
|
||
all_tickers = await self.client.get_all_tickers_24h()
|
||
|
||
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
|
||
pre_filtered_symbols = []
|
||
for symbol in symbols:
|
||
ticker = all_tickers.get(symbol) if all_tickers else None
|
||
if ticker:
|
||
change_percent = abs(ticker.get('changePercent', 0))
|
||
volume = ticker.get('volume', 0)
|
||
if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and
|
||
volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])):
|
||
pre_filtered_symbols.append(symbol)
|
||
|
||
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个")
|
||
|
||
# 扫描阶段资金费率过滤:排除 |lastFundingRate| 过大的标的;低波动期使用更严阈值(有效配置)
|
||
scan_funding_enabled = cfg.get('SCAN_FUNDING_RATE_FILTER_ENABLED', False)
|
||
scan_funding_max_abs = float(config.get_effective_config('SCAN_FUNDING_RATE_MAX_ABS', 0.001))
|
||
if scan_funding_enabled and pre_filtered_symbols:
|
||
try:
|
||
premium_all = await self.client.get_premium_index(None) # 全量,一次请求
|
||
rate_map = {}
|
||
if isinstance(premium_all, list):
|
||
for item in premium_all:
|
||
if isinstance(item, dict) and 'symbol' in item:
|
||
try:
|
||
rate_map[item['symbol']] = float(item.get('lastFundingRate', 0))
|
||
except (TypeError, ValueError):
|
||
pass
|
||
elif isinstance(premium_all, dict) and premium_all.get('symbol'):
|
||
rate_map[premium_all['symbol']] = float(premium_all.get('lastFundingRate', 0))
|
||
if rate_map:
|
||
before = len(pre_filtered_symbols)
|
||
pre_filtered_symbols = [
|
||
s for s in pre_filtered_symbols
|
||
if abs(rate_map.get(s, 0)) <= scan_funding_max_abs
|
||
]
|
||
excluded = before - len(pre_filtered_symbols)
|
||
if excluded > 0:
|
||
logger.info(f"扫描阶段资金费率过滤: 排除 {excluded} 个标的 (|lastFundingRate| > {scan_funding_max_abs}),剩余 {len(pre_filtered_symbols)} 个")
|
||
except Exception as e:
|
||
logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e)
|
||
|
||
# 只对符合条件的交易对进行详细分析(获取K线和技术指标)
|
||
# 并发数由 SCAN_CONCURRENT_SYMBOLS 控制:2 CPU 4G 多账号建议 2,单账号可 3~5
|
||
concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2)
|
||
try:
|
||
concurrent = max(1, min(10, int(concurrent)))
|
||
except (TypeError, ValueError):
|
||
concurrent = 2
|
||
semaphore = asyncio.Semaphore(concurrent)
|
||
|
||
analysis_timeout = float(cfg.get('SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC', 18) or 18)
|
||
if analysis_timeout < 5:
|
||
analysis_timeout = 5.0
|
||
elif analysis_timeout > 60:
|
||
analysis_timeout = 60.0
|
||
|
||
async def get_symbol_change_with_limit(symbol):
|
||
async with semaphore:
|
||
try:
|
||
# ⚠️ 优化:优先使用共享缓存,减少超时风险
|
||
result = await asyncio.wait_for(
|
||
self._get_symbol_change(symbol, all_tickers.get(symbol)),
|
||
timeout=analysis_timeout
|
||
)
|
||
return result
|
||
except asyncio.TimeoutError:
|
||
# ⚠️ 优化:超时时尝试返回降级结果(仅涨跌幅/成交量),而不是完全跳过
|
||
logger.warning(f"{symbol} 分析超时({analysis_timeout:.0f}秒),尝试返回降级结果...")
|
||
try:
|
||
ticker = all_tickers.get(symbol) if all_tickers else None
|
||
if ticker:
|
||
change_pct = float(ticker.get('changePercent', 0) or 0)
|
||
vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0)
|
||
price = float(ticker.get('price', 0) or ticker.get('lastPrice', 0) or 0)
|
||
if price > 0:
|
||
return {
|
||
'symbol': symbol,
|
||
'price': price,
|
||
'changePercent': change_pct,
|
||
'volume24h': vol,
|
||
'direction': 'UP' if change_pct > 0 else 'DOWN',
|
||
'signalScore': 0,
|
||
'signal_strength': 0,
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 降级结果构建失败: {e}")
|
||
logger.warning(f"{symbol} 分析超时且无法返回降级结果,跳过")
|
||
return None
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 分析出错: {e}")
|
||
return None
|
||
|
||
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 过滤有效结果
|
||
valid_results = [
|
||
r for r in results
|
||
if isinstance(r, dict)
|
||
and r.get('changePercent') is not None
|
||
]
|
||
|
||
# ⚠️ 优化2:成交量验证 - 24H Volume低于1000万美金,直接剔除
|
||
min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金
|
||
min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])
|
||
# 使用更严格的成交量要求
|
||
min_volume = max(min_volume_strict, min_volume_normal)
|
||
|
||
# 过滤最小涨跌幅和成交量
|
||
filtered_results = [
|
||
r for r in valid_results
|
||
if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT'])
|
||
and r.get('volume24h', 0) >= min_volume
|
||
]
|
||
|
||
if min_volume_strict > min_volume_normal:
|
||
logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)")
|
||
|
||
# ⚠️ 2026-01-27优化:按真实的signal_strength排序,而不是简单的signalScore
|
||
# 优先考虑信号强度高的交易对(8-10分),提升胜率
|
||
sorted_results = sorted(
|
||
filtered_results,
|
||
key=lambda x: (
|
||
x.get('signal_strength', 0) * 100, # 信号强度权重最高(乘以100确保优先级)
|
||
x.get('signalScore', 0) * 10, # 其次考虑信号得分(兼容性)
|
||
abs(x['changePercent']) # 最后考虑涨跌幅
|
||
),
|
||
reverse=True
|
||
)
|
||
|
||
# 智能补单:返回 TOP_N + 额外候选数,当前 TOP_N 中部分因冷却等被跳过时,策略仍会尝试后续交易对,避免无单可下
|
||
top_n_val = cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])
|
||
extra = cfg.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', config.TRADING_CONFIG.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', 8))
|
||
try:
|
||
top_n_val = int(top_n_val) if top_n_val is not None else 20
|
||
extra = int(extra) if extra is not None else 8
|
||
except (TypeError, ValueError):
|
||
top_n_val, extra = 20, 8
|
||
take_count = min(len(sorted_results), top_n_val + extra)
|
||
top_n = sorted_results[:take_count]
|
||
|
||
self.top_symbols = top_n
|
||
|
||
# 市场节奏自动识别:用本次扫描的 24h 涨跌幅中位数判断低波动/正常,写入 Redis 并更新进程内缓存
|
||
try:
|
||
from . import market_regime
|
||
except ImportError:
|
||
market_regime = None
|
||
if market_regime and getattr(self.client, "redis_cache", None):
|
||
# 用过滤后的全部结果计算中位数(样本更多),无则用 top_n
|
||
source = filtered_results if filtered_results else top_n
|
||
change_percents = [r["changePercent"] for r in source if r.get("changePercent") is not None]
|
||
threshold = float(cfg.get("LOW_VOLATILITY_THRESHOLD", config.TRADING_CONFIG.get("LOW_VOLATILITY_THRESHOLD", 2.5)))
|
||
regime = await market_regime.update_regime_from_scan(
|
||
change_percents, self.client.redis_cache, low_volatility_threshold=threshold
|
||
)
|
||
market_regime.set_cached_regime(regime)
|
||
|
||
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
|
||
# 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存
|
||
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
|
||
|
||
# 记录扫描性能(用于监控多用户时的系统压力)
|
||
scan_duration = time.time() - self._scan_start_time
|
||
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对,耗时 {scan_duration:.2f}秒")
|
||
if scan_duration > 60:
|
||
logger.warning(f"⚠️ 扫描耗时较长({scan_duration:.2f}秒),可能影响系统性能,建议检查缓存命中率")
|
||
|
||
# 记录扫描结果到数据库
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import MarketScan
|
||
MarketScan.create(
|
||
symbols_scanned=len(symbols),
|
||
symbols_found=len(top_n),
|
||
top_symbols=[s['symbol'] for s in top_n],
|
||
scan_duration=scan_duration
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"记录扫描结果失败: {e}")
|
||
|
||
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
|
||
|
||
# 打印结果(包含技术指标)
|
||
# 分组打印:优先展示有信号的交易对,然后是高波动/成交量的观察对象
|
||
signals_found = [s for s in top_n if s.get('signal_strength', 0) > 0 or s.get('signalScore', 0) > 0]
|
||
others = [s for s in top_n if s not in signals_found]
|
||
|
||
if signals_found:
|
||
logger.info(f"===== 发现潜在交易信号 ({len(signals_found)}) =====")
|
||
for i, symbol_info in enumerate(signals_found, 1):
|
||
self._log_single_symbol(i, symbol_info)
|
||
|
||
if others:
|
||
logger.info(f"===== 其它活跃交易对 (高波动/无明确信号) ({len(others)}) =====")
|
||
for i, symbol_info in enumerate(others, 1):
|
||
self._log_single_symbol(i, symbol_info)
|
||
|
||
return top_n
|
||
|
||
def _log_single_symbol(self, index: int, symbol_info: Dict):
|
||
"""打印单个交易对信息"""
|
||
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
|
||
regime_str = symbol_info.get('marketRegime', 'unknown')
|
||
|
||
strength = symbol_info.get('signal_strength', symbol_info.get('signalScore', 0))
|
||
strength_str = f"信号:{strength}" if strength > 0 else ""
|
||
|
||
log_parts = [
|
||
f"{index}. {symbol_info['symbol']}:",
|
||
f"{symbol_info['changePercent']:.2f}%",
|
||
rsi_str,
|
||
regime_str
|
||
]
|
||
|
||
if strength_str:
|
||
log_parts.append(strength_str)
|
||
|
||
log_parts.append(f"价格: {symbol_info['price']:.4f}")
|
||
|
||
logger.info(" | ".join(log_parts))
|
||
|
||
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
|
||
"""
|
||
获取单个交易对的涨跌幅和技术指标
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
ticker_data: 可选的24小时行情数据(如果已批量获取)
|
||
|
||
Returns:
|
||
包含涨跌幅和技术指标信息的字典
|
||
"""
|
||
try:
|
||
# 如果已有批量获取的数据,直接使用;否则单独获取
|
||
if ticker_data:
|
||
ticker = ticker_data
|
||
else:
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
|
||
if not ticker:
|
||
return None
|
||
|
||
# 24h ticker 的“最新价”(更接近用户理解的“当前价格”)
|
||
# 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳
|
||
ticker_price = ticker.get('price')
|
||
try:
|
||
ticker_price = float(ticker_price) if ticker_price is not None else None
|
||
except Exception:
|
||
ticker_price = None
|
||
ticker_ts = ticker.get('ts')
|
||
try:
|
||
ticker_ts = int(ticker_ts) if ticker_ts is not None else None
|
||
except Exception:
|
||
ticker_ts = None
|
||
|
||
# 获取更多K线数据用于技术指标计算(主周期 + 确认周期并行请求;单独超时避免整分析拖到 18s)
|
||
# ⚠️ 优化:优先使用 WebSocket K 线缓存,避免 REST API 超时
|
||
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
|
||
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
|
||
cfg = dict(config.TRADING_CONFIG or {})
|
||
klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8)
|
||
if klines_timeout < 3:
|
||
klines_timeout = 3.0
|
||
elif klines_timeout > 20:
|
||
klines_timeout = 20.0
|
||
|
||
# ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存
|
||
# 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用
|
||
klines, klines_4h = [], []
|
||
use_ws_prefer = cfg.get('SCAN_PREFER_WEBSOCKET', True) # 默认优先 WebSocket
|
||
|
||
# 0. 优先从共享 Redis 缓存读取(多账号共用,最高优先级)
|
||
try:
|
||
from .market_ws_leader import use_shared_market_ws
|
||
from .kline_stream import get_klines_from_redis
|
||
if use_shared_market_ws(self.client.redis_cache):
|
||
# 尝试从共享缓存获取
|
||
shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50)
|
||
shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50)
|
||
if shared_klines and len(shared_klines) >= 2:
|
||
klines = shared_klines
|
||
logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}")
|
||
if shared_klines_4h and len(shared_klines_4h) >= 2:
|
||
klines_4h = shared_klines_4h
|
||
logger.debug(f"{symbol} 从共享缓存获取确认周期 K 线: {confirm_interval}")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}")
|
||
|
||
# 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存
|
||
# ⚠️ 优化:减少不必要的订阅,优先使用已有缓存,避免订阅过多导致负载上升
|
||
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
|
||
try:
|
||
from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh
|
||
stream = get_kline_stream_instance()
|
||
if stream:
|
||
# ⚠️ 优化:先检查缓存是否已有数据,避免不必要的订阅
|
||
# 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力
|
||
cache_has_data = False
|
||
if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
|
||
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50)
|
||
if cached_klines and len(cached_klines) >= 2:
|
||
klines = cached_klines
|
||
cache_has_data = True
|
||
if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
|
||
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50)
|
||
if cached_klines_4h and len(cached_klines_4h) >= 2:
|
||
klines_4h = cached_klines_4h
|
||
cache_has_data = True
|
||
|
||
# ⚠️ 优化:只有在缓存完全没有数据时才订阅,避免订阅过多导致负载上升
|
||
# 如果缓存已有部分数据,优先使用部分数据,减少订阅需求
|
||
need_subscribe_primary = (not klines or len(klines) < 2)
|
||
need_subscribe_confirm = (not klines_4h or len(klines_4h) < 2)
|
||
|
||
# 只在真正需要时才订阅(避免扫描时订阅过多流)
|
||
if need_subscribe_primary or need_subscribe_confirm:
|
||
subscribe_tasks = []
|
||
if need_subscribe_primary:
|
||
subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=50))
|
||
if need_subscribe_confirm:
|
||
subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=50))
|
||
|
||
if subscribe_tasks:
|
||
await asyncio.gather(*subscribe_tasks, return_exceptions=True)
|
||
|
||
# 如果优先使用 WebSocket,等待推送数据(但缩短等待时间,避免阻塞)
|
||
if use_ws_prefer:
|
||
# ⚠️ 优化:缩短等待时间(1秒),如果缓存已有部分数据,直接使用,不等待
|
||
max_wait_sec = 1.0 if cache_has_data else 1.5 # 如果缓存有数据,只等1秒;否则等1.5秒
|
||
wait_start = time.time()
|
||
check_interval = 0.2 # 每 200ms 检查一次
|
||
|
||
while (time.time() - wait_start) < max_wait_sec:
|
||
# 检查缓存是否可用
|
||
if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
|
||
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50)
|
||
if cached_klines and len(cached_klines) >= 2:
|
||
klines = cached_klines
|
||
if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
|
||
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50)
|
||
if cached_klines_4h and len(cached_klines_4h) >= 2:
|
||
klines_4h = cached_klines_4h
|
||
|
||
# 如果两个周期都有足够数据,提前退出
|
||
if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2:
|
||
break
|
||
|
||
await asyncio.sleep(check_interval)
|
||
else:
|
||
# 缓存已有完整数据,无需订阅
|
||
logger.debug(f"{symbol} 缓存已有完整 K 线数据,跳过订阅")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}")
|
||
|
||
# 如果 WebSocket 缓存仍不完整,使用 REST API(带超时保护)
|
||
# 注意:如果 WebSocket 缓存有部分数据,优先使用部分数据,REST 只补充缺失的
|
||
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
|
||
# 只请求缺失的数据,减少等待时间
|
||
rest_tasks = []
|
||
if not klines or len(klines) < 2:
|
||
rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50)))
|
||
else:
|
||
rest_tasks.append(('primary', None))
|
||
if not klines_4h or len(klines_4h) < 2:
|
||
rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50)))
|
||
else:
|
||
rest_tasks.append(('confirm', None))
|
||
|
||
try:
|
||
# 只等待实际需要的数据
|
||
results = await asyncio.wait_for(
|
||
asyncio.gather(
|
||
*[task[1] if task[1] else asyncio.sleep(0) for task in rest_tasks],
|
||
return_exceptions=True
|
||
),
|
||
timeout=klines_timeout,
|
||
)
|
||
# 合并结果:优先使用 WebSocket 缓存,REST 作为补充
|
||
for i, (task_type, _) in enumerate(rest_tasks):
|
||
if task_type == 'primary' and (not klines or len(klines) < 2):
|
||
if isinstance(results[i], list) and len(results[i]) >= 2:
|
||
klines = results[i]
|
||
elif task_type == 'confirm' and (not klines_4h or len(klines_4h) < 2):
|
||
if isinstance(results[i], list) and len(results[i]) >= 2:
|
||
klines_4h = results[i]
|
||
except asyncio.TimeoutError:
|
||
logger.debug(f"{symbol} K线拉取超时({klines_timeout:.0f}秒),使用已有缓存或降级结果")
|
||
# 如果 WebSocket 缓存有部分数据,继续使用;否则使用降级结果
|
||
if not klines or len(klines) < 2:
|
||
klines = []
|
||
if not klines_4h or len(klines_4h) < 2:
|
||
klines_4h = []
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} K线拉取异常: {e}")
|
||
if not klines or len(klines) < 2:
|
||
klines = []
|
||
if not klines_4h or len(klines_4h) < 2:
|
||
klines_4h = []
|
||
|
||
if not klines or len(klines) < 2:
|
||
# 降级:仍有 ticker 时返回仅涨跌幅/成交量的结果,不直接跳过,避免整轮扫描被拖慢
|
||
try:
|
||
change_pct = float(ticker.get('changePercent', 0) or 0)
|
||
vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0)
|
||
price = ticker_price if ticker_price is not None else float(ticker.get('lastPrice', 0) or ticker.get('price', 0) or 0)
|
||
return {
|
||
'symbol': symbol,
|
||
'price': price,
|
||
'kline_close_price': price,
|
||
'ticker_price': ticker_price,
|
||
'ticker_ts': ticker_ts,
|
||
'prevPrice': None,
|
||
'kline_change_percent': change_pct,
|
||
'changePercent': change_pct,
|
||
'volume24h': vol,
|
||
'direction': 'UP' if change_pct > 0 else 'DOWN',
|
||
'rsi': None,
|
||
'macd': None,
|
||
'bollinger': None,
|
||
'atr': None,
|
||
'ema20': None,
|
||
'ema50': None,
|
||
'ema20_4h': None,
|
||
'price_4h': None,
|
||
'marketRegime': 'unknown',
|
||
'signalScore': 0,
|
||
'signal_strength': 0,
|
||
'trend_4h': None,
|
||
'klines': [],
|
||
'klines_4h': [],
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 降级结果构建失败: {e}")
|
||
return None
|
||
|
||
# 提取价格数据(主周期)
|
||
close_prices = [float(k[4]) for k in klines] # 收盘价
|
||
high_prices = [float(k[2]) for k in klines] # 最高价
|
||
low_prices = [float(k[3]) for k in klines] # 最低价
|
||
|
||
# 提取4H周期价格数据
|
||
close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices
|
||
|
||
# 计算涨跌幅(基于主周期)
|
||
current_price = close_prices[-1]
|
||
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
|
||
|
||
if prev_price == 0:
|
||
return None
|
||
|
||
change_percent = ((current_price - prev_price) / prev_price) * 100
|
||
|
||
# ⚠️ 优化:检查技术指标计算结果缓存(基于K线数据的最后更新时间)
|
||
# 如果K线数据没有更新,可以直接使用缓存的技术指标
|
||
cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}"
|
||
last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳
|
||
|
||
# 尝试从缓存获取技术指标计算结果
|
||
cached_indicators = None
|
||
try:
|
||
cached_indicators = await self.client.redis_cache.get(cache_key_indicators)
|
||
except Exception:
|
||
pass
|
||
|
||
use_cached_indicators = False
|
||
if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time:
|
||
# 缓存命中,使用缓存的技术指标
|
||
use_cached_indicators = True
|
||
logger.debug(f"{symbol} 使用缓存的技术指标计算结果")
|
||
rsi = cached_indicators.get('rsi')
|
||
macd = cached_indicators.get('macd')
|
||
bollinger = cached_indicators.get('bollinger')
|
||
atr = cached_indicators.get('atr')
|
||
ema20 = cached_indicators.get('ema20')
|
||
ema50 = cached_indicators.get('ema50')
|
||
ema20_4h = cached_indicators.get('ema20_4h')
|
||
market_regime = cached_indicators.get('marketRegime')
|
||
else:
|
||
# 缓存未命中,重新计算技术指标
|
||
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
|
||
macd = TechnicalIndicators.calculate_macd(close_prices)
|
||
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
|
||
atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14)
|
||
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period)
|
||
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
|
||
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
|
||
|
||
# 计算4H周期的EMA20用于多周期共振检查
|
||
ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None
|
||
|
||
# 判断市场状态
|
||
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
|
||
|
||
# 保存技术指标计算结果到缓存(TTL: 30秒,与K线缓存一致)
|
||
try:
|
||
indicators_cache = {
|
||
'last_kline_time': last_kline_time,
|
||
'rsi': rsi,
|
||
'macd': macd,
|
||
'bollinger': bollinger,
|
||
'atr': atr,
|
||
'ema20': ema20,
|
||
'ema50': ema50,
|
||
'ema20_4h': ema20_4h,
|
||
'marketRegime': market_regime,
|
||
}
|
||
await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30)
|
||
logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}")
|
||
|
||
# 计算交易信号得分(用于排序)- 保留用于兼容性
|
||
signal_score = 0
|
||
|
||
# RSI信号(均值回归)
|
||
if rsi is not None:
|
||
if rsi < 30: # 超卖,做多信号
|
||
signal_score += 3
|
||
elif rsi > 70: # 超买,做空信号
|
||
signal_score += 3
|
||
elif 30 <= rsi <= 70: # 中性区域
|
||
signal_score += 1
|
||
|
||
# MACD信号
|
||
if macd and macd['histogram'] is not None:
|
||
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
|
||
signal_score += 2
|
||
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
|
||
signal_score += 2
|
||
|
||
# 布林带信号(均值回归)
|
||
if bollinger:
|
||
if current_price <= bollinger['lower']: # 触及下轨,做多
|
||
signal_score += 3
|
||
elif current_price >= bollinger['upper']: # 触及上轨,做空
|
||
signal_score += 3
|
||
elif bollinger['lower'] < current_price < bollinger['upper']:
|
||
signal_score += 1
|
||
|
||
# 均线信号
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50: # 上升趋势
|
||
signal_score += 1
|
||
elif current_price < ema20 < ema50: # 下降趋势
|
||
signal_score += 1
|
||
|
||
# ⚠️ 2026-01-27优化:计算真实的signal_strength(用于排序和筛选)
|
||
# 使用与strategy.py相同的逻辑,确保排序依据与交易判断一致
|
||
signal_strength = 0
|
||
direction = None
|
||
|
||
# 获取4H周期当前价格(用于判断4H趋势)
|
||
price_4h = close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price
|
||
|
||
# 判断4H周期趋势方向
|
||
trend_4h = None
|
||
if ema20_4h is not None:
|
||
if price_4h > ema20_4h:
|
||
trend_4h = 'up'
|
||
elif price_4h < ema20_4h:
|
||
trend_4h = 'down'
|
||
else:
|
||
trend_4h = 'neutral'
|
||
|
||
# 策略权重配置(与strategy.py保持一致)
|
||
TREND_SIGNAL_WEIGHTS = {
|
||
'macd_cross': 5, # MACD金叉/死叉
|
||
'ema_cross': 4, # EMA20上穿/下穿EMA50
|
||
'price_above_ema20': 3, # 价格在EMA20之上/下
|
||
'4h_trend_confirmation': 2, # 4H趋势确认
|
||
}
|
||
|
||
# MACD金叉/死叉(权重最高)
|
||
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
|
||
# MACD金叉,做多信号(需4H趋势向上或中性)
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
|
||
# MACD死叉,做空信号(需4H趋势向下或中性)
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
# EMA均线系统
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50: # 上升趋势
|
||
if trend_4h in ('up', 'neutral', None):
|
||
# 冲突检查
|
||
if direction == 'SELL':
|
||
signal_strength = 0
|
||
direction = None
|
||
else:
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif current_price < ema20 < ema50: # 下降趋势
|
||
if trend_4h in ('down', 'neutral', None):
|
||
# 冲突检查
|
||
if direction == 'BUY':
|
||
signal_strength = 0
|
||
direction = None
|
||
else:
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
# 价格与EMA20关系
|
||
if ema20:
|
||
if current_price > ema20:
|
||
if trend_4h in ('up', 'neutral', None) and direction == 'BUY':
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
|
||
elif current_price < ema20:
|
||
if trend_4h in ('down', 'neutral', None) and direction == 'SELL':
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
|
||
|
||
# 4H趋势确认加分
|
||
if direction and trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation']
|
||
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
# 逆势信号,直接拒绝
|
||
signal_strength = 0
|
||
direction = None
|
||
|
||
# 强度上限归一到 0-10
|
||
signal_strength = max(0, min(int(signal_strength), 10))
|
||
|
||
# ===== 趋势状态缓存(用于后续“入场时机过滤”)=====
|
||
try:
|
||
# 只有在方向明确且信号强度达到最小门槛时,才记录趋势状态
|
||
min_strength = int(config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) or 7)
|
||
use_trend_filter = bool(config.TRADING_CONFIG.get('USE_TREND_ENTRY_FILTER', False))
|
||
if use_trend_filter and direction and signal_strength >= min_strength:
|
||
trend_state_key = f"trend_state:{symbol}"
|
||
trend_state_value = {
|
||
'symbol': symbol,
|
||
'direction': direction, # BUY / SELL
|
||
'signal_strength': int(signal_strength),
|
||
'marketRegime': market_regime,
|
||
'trend_4h': trend_4h,
|
||
# 使用技术分析用的收盘价作为“信号价格”,同时附带 24h ticker 价格
|
||
'signal_price': float(current_price),
|
||
'ticker_price': float(ticker_price) if ticker_price is not None else None,
|
||
'ema20': float(ema20) if ema20 is not None else None,
|
||
'ema50': float(ema50) if ema50 is not None else None,
|
||
'ema20_4h': float(ema20_4h) if ema20_4h is not None else None,
|
||
'price_4h': float(price_4h) if price_4h is not None else None,
|
||
'last_kline_time': last_kline_time,
|
||
'created_at_ts': int(ticker_ts or 0),
|
||
}
|
||
ttl_sec = int(config.TRADING_CONFIG.get('TREND_STATE_TTL_SEC', 3600) or 3600)
|
||
if self.client and getattr(self.client, "redis_cache", None):
|
||
try:
|
||
await self.client.redis_cache.set(trend_state_key, trend_state_value, ttl=ttl_sec)
|
||
logger.debug(f"{symbol} 趋势状态已缓存: dir={direction}, strength={signal_strength}, price={current_price:.6f}")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 缓存趋势状态失败: {e}")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 处理趋势状态缓存时出错: {e}")
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
# 技术分析使用的价格(K线收盘价)
|
||
'price': current_price,
|
||
'kline_close_price': current_price,
|
||
# 展示用“当前价”(24h ticker 最新价,通常更贴近用户的直觉)
|
||
'ticker_price': ticker_price,
|
||
'ticker_ts': ticker_ts,
|
||
'prevPrice': prev_price,
|
||
# 1) 主周期涨跌幅(用于内部信号)
|
||
'kline_change_percent': change_percent,
|
||
# 2) 24h涨跌幅(用于前端展示更符合“24h涨跌”的文案)
|
||
'changePercent': float(ticker.get('changePercent', 0) or 0),
|
||
'volume24h': ticker.get('volume', 0),
|
||
'direction': 'UP' if change_percent > 0 else 'DOWN',
|
||
'rsi': rsi,
|
||
'macd': macd,
|
||
'bollinger': bollinger,
|
||
'atr': atr,
|
||
'ema20': ema20,
|
||
'ema50': ema50,
|
||
'ema20_4h': ema20_4h, # 4H周期EMA20,用于多周期共振
|
||
'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格
|
||
'marketRegime': market_regime,
|
||
'signalScore': signal_score, # 保留用于兼容性
|
||
'signal_strength': signal_strength, # ⚠️ 2026-01-27优化:添加真实的信号强度
|
||
'trend_4h': trend_4h, # 4H趋势方向
|
||
'klines': klines[-10:], # 保留最近10根K线
|
||
'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"获取 {symbol} 数据失败: {e}")
|
||
return None
|
||
|
||
def get_top_symbols(self) -> List[Dict]:
|
||
"""
|
||
获取当前扫描到的前N个货币对
|
||
|
||
Returns:
|
||
前N个货币对列表
|
||
"""
|
||
return self.top_symbols
|
||
|
||
async def monitor_price(self, symbol: str, callback) -> None:
|
||
"""
|
||
监控单个交易对的价格变化(WebSocket)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
callback: 价格变化回调函数
|
||
"""
|
||
# 使用标准WebSocket
|
||
try:
|
||
import aiohttp
|
||
import json
|
||
|
||
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
|
||
# 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
|
||
# 端点:wss://fstream.binance.com/ws/<symbol>@ticker
|
||
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.ws_connect(ws_url) as ws:
|
||
async for msg in ws:
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
try:
|
||
# 解析 JSON 消息
|
||
data = json.loads(msg.data)
|
||
|
||
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
|
||
if isinstance(data, dict):
|
||
if 'c' in data: # 'c' 是当前价格
|
||
price = float(data['c'])
|
||
await callback(symbol, price)
|
||
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
|
||
price = float(data['data']['c'])
|
||
await callback(symbol, price)
|
||
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
|
||
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
|
||
continue
|
||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||
logger.info(f"监控 {symbol} WebSocket连接关闭")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"监控 {symbol} 价格失败: {e}")
|
||
|
||
def get_realtime_price(self, symbol: str) -> Optional[float]:
|
||
"""
|
||
获取实时价格(从缓存)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
实时价格
|
||
"""
|
||
if self.client:
|
||
return self.client.get_realtime_price(symbol)
|
||
return None |