""" 市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .indicators import TechnicalIndicators from . import config except ImportError: from binance_client import BinanceClient from indicators import TechnicalIndicators import config logger = logging.getLogger(__name__) class MarketScanner: """市场扫描器类""" def __init__(self, client: BinanceClient): """ 初始化市场扫描器 Args: client: 币安客户端 """ self.client = client self.top_symbols: List[Dict] = [] async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]: """ 扫描市场,找出涨跌幅最大的前N个货币对 优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描 Returns: 前N个货币对列表,包含涨跌幅信息 """ import time self._scan_start_time = time.time() # 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰 cfg = dict(config.TRADING_CONFIG or {}) if config_override and isinstance(config_override, dict): cfg.update(config_override) ns = (cache_namespace or "trade").strip() or "trade" # ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据 # 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存 # 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描 logger.info("开始扫描市场...") # 获取所有USDT交易对 raw_symbols = await self.client.get_all_usdt_pairs() all_symbols = [s for s in (raw_symbols or []) if s] if raw_symbols and len(all_symbols) < len(raw_symbols): logger.info(f"已过滤 {len(raw_symbols) - len(all_symbols)} 个非ASCII交易对,保留 {len(all_symbols)} 个") if not all_symbols: logger.warning("未获取到交易对") return [] # 根据配置限制扫描的交易对数量(确保为整数,避免 DB/JSON 返回 float 导致 slice 报错) max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500) try: max_scan_symbols = int(max_scan_symbols) if max_scan_symbols is not None else 500 except (TypeError, ValueError): max_scan_symbols = 500 if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols): symbols = all_symbols[:max_scan_symbols] logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})") else: symbols = all_symbols logger.info(f"扫描所有 {len(symbols)} 个USDT交易对") # 过滤主流币(山寨币策略应该排除主流币) exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True) if exclude_major_coins: # 主流币列表(市值排名前15的主流币) major_coins = { 'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT', 'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT', 'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT', 'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT' } symbols_before = len(symbols) symbols = [s for s in symbols if s not in major_coins] excluded_count = symbols_before - len(symbols) if excluded_count > 0: logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)") # 优先从 Redis 读 24h ticker(进程内不保留全量,减轻内存);无/过期再 REST all_tickers = None try: try: from .ticker_24h_stream import ( get_tickers_24h_cache, get_tickers_24h_from_redis, is_ticker_24h_cache_fresh, ) except ImportError: from ticker_24h_stream import ( get_tickers_24h_cache, get_tickers_24h_from_redis, is_ticker_24h_cache_fresh, ) if is_ticker_24h_cache_fresh(max_age_sec=120.0): redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: all_tickers = await get_tickers_24h_from_redis(redis_cache) else: all_tickers = get_tickers_24h_cache() if all_tickers: logger.info(f"使用 24h ticker 缓存({len(all_tickers)} 个交易对,来自 Redis/WS),跳过 REST 批量请求") except Exception as e: logger.debug(f"读取 24h ticker 缓存失败: {e}") if not all_tickers: logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") all_tickers = await self.client.get_all_tickers_24h() # 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量 pre_filtered_symbols = [] for symbol in symbols: ticker = all_tickers.get(symbol) if all_tickers else None if ticker: change_percent = abs(ticker.get('changePercent', 0)) volume = ticker.get('volume', 0) if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])): pre_filtered_symbols.append(symbol) logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个") # 扫描阶段资金费率过滤:排除 |lastFundingRate| 过大的标的;低波动期使用更严阈值(有效配置) scan_funding_enabled = cfg.get('SCAN_FUNDING_RATE_FILTER_ENABLED', False) scan_funding_max_abs = float(config.get_effective_config('SCAN_FUNDING_RATE_MAX_ABS', 0.001)) if scan_funding_enabled and pre_filtered_symbols: try: premium_all = await self.client.get_premium_index(None) # 全量,一次请求 rate_map = {} if isinstance(premium_all, list): for item in premium_all: if isinstance(item, dict) and 'symbol' in item: try: rate_map[item['symbol']] = float(item.get('lastFundingRate', 0)) except (TypeError, ValueError): pass elif isinstance(premium_all, dict) and premium_all.get('symbol'): rate_map[premium_all['symbol']] = float(premium_all.get('lastFundingRate', 0)) if rate_map: before = len(pre_filtered_symbols) pre_filtered_symbols = [ s for s in pre_filtered_symbols if abs(rate_map.get(s, 0)) <= scan_funding_max_abs ] excluded = before - len(pre_filtered_symbols) if excluded > 0: logger.info(f"扫描阶段资金费率过滤: 排除 {excluded} 个标的 (|lastFundingRate| > {scan_funding_max_abs}),剩余 {len(pre_filtered_symbols)} 个") except Exception as e: logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e) # 只对符合条件的交易对进行详细分析(获取K线和技术指标) # 并发数由 SCAN_CONCURRENT_SYMBOLS 控制:2 CPU 4G 多账号建议 2,单账号可 3~5 concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2) try: concurrent = max(1, min(10, int(concurrent))) except (TypeError, ValueError): concurrent = 2 semaphore = asyncio.Semaphore(concurrent) analysis_timeout = float(cfg.get('SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC', 18) or 18) if analysis_timeout < 5: analysis_timeout = 5.0 elif analysis_timeout > 60: analysis_timeout = 60.0 async def get_symbol_change_with_limit(symbol): async with semaphore: try: # ⚠️ 优化:优先使用共享缓存,减少超时风险 result = await asyncio.wait_for( self._get_symbol_change(symbol, all_tickers.get(symbol)), timeout=analysis_timeout ) return result except asyncio.TimeoutError: # ⚠️ 优化:超时时尝试返回降级结果(仅涨跌幅/成交量),而不是完全跳过 logger.warning(f"{symbol} 分析超时({analysis_timeout:.0f}秒),尝试返回降级结果...") try: ticker = all_tickers.get(symbol) if all_tickers else None if ticker: change_pct = float(ticker.get('changePercent', 0) or 0) vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0) price = float(ticker.get('price', 0) or ticker.get('lastPrice', 0) or 0) if price > 0: return { 'symbol': symbol, 'price': price, 'changePercent': change_pct, 'volume24h': vol, 'direction': 'UP' if change_pct > 0 else 'DOWN', 'signalScore': 0, 'signal_strength': 0, } except Exception as e: logger.debug(f"{symbol} 降级结果构建失败: {e}") logger.warning(f"{symbol} 分析超时且无法返回降级结果,跳过") return None except Exception as e: logger.debug(f"{symbol} 分析出错: {e}") return None tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤有效结果 valid_results = [ r for r in results if isinstance(r, dict) and r.get('changePercent') is not None ] # ⚠️ 优化2:成交量验证 - 24H Volume低于1000万美金,直接剔除 min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金 min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H']) # 使用更严格的成交量要求 min_volume = max(min_volume_strict, min_volume_normal) # 过滤最小涨跌幅和成交量 filtered_results = [ r for r in valid_results if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and r.get('volume24h', 0) >= min_volume ] if min_volume_strict > min_volume_normal: logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)") # ⚠️ 2026-01-27优化:按真实的signal_strength排序,而不是简单的signalScore # 优先考虑信号强度高的交易对(8-10分),提升胜率 sorted_results = sorted( filtered_results, key=lambda x: ( x.get('signal_strength', 0) * 100, # 信号强度权重最高(乘以100确保优先级) x.get('signalScore', 0) * 10, # 其次考虑信号得分(兼容性) abs(x['changePercent']) # 最后考虑涨跌幅 ), reverse=True ) # 智能补单:返回 TOP_N + 额外候选数,当前 TOP_N 中部分因冷却等被跳过时,策略仍会尝试后续交易对,避免无单可下 top_n_val = cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS']) extra = cfg.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', config.TRADING_CONFIG.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', 8)) try: top_n_val = int(top_n_val) if top_n_val is not None else 20 extra = int(extra) if extra is not None else 8 except (TypeError, ValueError): top_n_val, extra = 20, 8 take_count = min(len(sorted_results), top_n_val + extra) top_n = sorted_results[:take_count] self.top_symbols = top_n # 市场节奏自动识别:用本次扫描的 24h 涨跌幅中位数判断低波动/正常,写入 Redis 并更新进程内缓存 try: from . import market_regime except ImportError: market_regime = None if market_regime and getattr(self.client, "redis_cache", None): # 用过滤后的全部结果计算中位数(样本更多),无则用 top_n source = filtered_results if filtered_results else top_n change_percents = [r["changePercent"] for r in source if r.get("changePercent") is not None] threshold = float(cfg.get("LOW_VOLATILITY_THRESHOLD", config.TRADING_CONFIG.get("LOW_VOLATILITY_THRESHOLD", 2.5))) regime = await market_regime.update_regime_from_scan( change_percents, self.client.redis_cache, low_volatility_threshold=threshold ) market_regime.set_cached_regime(regime) # ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据 # 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存 # 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描 # 记录扫描性能(用于监控多用户时的系统压力) scan_duration = time.time() - self._scan_start_time logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对,耗时 {scan_duration:.2f}秒") if scan_duration > 60: logger.warning(f"⚠️ 扫描耗时较长({scan_duration:.2f}秒),可能影响系统性能,建议检查缓存命中率") # 记录扫描结果到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import MarketScan MarketScan.create( symbols_scanned=len(symbols), symbols_found=len(top_n), top_symbols=[s['symbol'] for s in top_n], scan_duration=scan_duration ) except Exception as e: logger.debug(f"记录扫描结果失败: {e}") logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对") # 打印结果(包含技术指标) # 分组打印:优先展示有信号的交易对,然后是高波动/成交量的观察对象 signals_found = [s for s in top_n if s.get('signal_strength', 0) > 0 or s.get('signalScore', 0) > 0] others = [s for s in top_n if s not in signals_found] if signals_found: logger.info(f"===== 发现潜在交易信号 ({len(signals_found)}) =====") for i, symbol_info in enumerate(signals_found, 1): self._log_single_symbol(i, symbol_info) if others: logger.info(f"===== 其它活跃交易对 (高波动/无明确信号) ({len(others)}) =====") for i, symbol_info in enumerate(others, 1): self._log_single_symbol(i, symbol_info) return top_n def _log_single_symbol(self, index: int, symbol_info: Dict): """打印单个交易对信息""" rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A" regime_str = symbol_info.get('marketRegime', 'unknown') strength = symbol_info.get('signal_strength', symbol_info.get('signalScore', 0)) strength_str = f"信号:{strength}" if strength > 0 else "" log_parts = [ f"{index}. {symbol_info['symbol']}:", f"{symbol_info['changePercent']:.2f}%", rsi_str, regime_str ] if strength_str: log_parts.append(strength_str) log_parts.append(f"价格: {symbol_info['price']:.4f}") logger.info(" | ".join(log_parts)) async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: """ 获取单个交易对的涨跌幅和技术指标 Args: symbol: 交易对 ticker_data: 可选的24小时行情数据(如果已批量获取) Returns: 包含涨跌幅和技术指标信息的字典 """ try: # 如果已有批量获取的数据,直接使用;否则单独获取 if ticker_data: ticker = ticker_data else: ticker = await self.client.get_ticker_24h(symbol) if not ticker: return None # 24h ticker 的“最新价”(更接近用户理解的“当前价格”) # 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳 ticker_price = ticker.get('price') try: ticker_price = float(ticker_price) if ticker_price is not None else None except Exception: ticker_price = None ticker_ts = ticker.get('ts') try: ticker_ts = int(ticker_ts) if ticker_ts is not None else None except Exception: ticker_ts = None # 获取更多K线数据用于技术指标计算(主周期 + 确认周期并行请求;单独超时避免整分析拖到 18s) # ⚠️ 优化:优先使用 WebSocket K 线缓存,避免 REST API 超时 primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h') cfg = dict(config.TRADING_CONFIG or {}) klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8) if klines_timeout < 3: klines_timeout = 3.0 elif klines_timeout > 20: klines_timeout = 20.0 # ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存 # 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用 klines, klines_4h = [], [] use_ws_prefer = cfg.get('SCAN_PREFER_WEBSOCKET', True) # 默认优先 WebSocket # 0. 优先从共享 Redis 缓存读取(多账号共用,最高优先级) try: from .market_ws_leader import use_shared_market_ws from .kline_stream import get_klines_from_redis if use_shared_market_ws(self.client.redis_cache): # 尝试从共享缓存获取 shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=50) shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=50) if shared_klines and len(shared_klines) >= 2: klines = shared_klines logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}") if shared_klines_4h and len(shared_klines_4h) >= 2: klines_4h = shared_klines_4h logger.debug(f"{symbol} 从共享缓存获取确认周期 K 线: {confirm_interval}") except Exception as e: logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}") # 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存 # ⚠️ 优化:减少不必要的订阅,优先使用已有缓存,避免订阅过多导致负载上升 if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2: try: from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh stream = get_kline_stream_instance() if stream: # ⚠️ 优化:先检查缓存是否已有数据,避免不必要的订阅 # 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力 cache_has_data = False if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) if cached_klines and len(cached_klines) >= 2: klines = cached_klines cache_has_data = True if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) if cached_klines_4h and len(cached_klines_4h) >= 2: klines_4h = cached_klines_4h cache_has_data = True # ⚠️ 优化:只有在缓存完全没有数据时才订阅,避免订阅过多导致负载上升 # 如果缓存已有部分数据,优先使用部分数据,减少订阅需求 need_subscribe_primary = (not klines or len(klines) < 2) need_subscribe_confirm = (not klines_4h or len(klines_4h) < 2) # 只在真正需要时才订阅(避免扫描时订阅过多流) if need_subscribe_primary or need_subscribe_confirm: subscribe_tasks = [] if need_subscribe_primary: subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=50)) if need_subscribe_confirm: subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=50)) if subscribe_tasks: await asyncio.gather(*subscribe_tasks, return_exceptions=True) # 如果优先使用 WebSocket,等待推送数据(但缩短等待时间,避免阻塞) if use_ws_prefer: # ⚠️ 优化:缩短等待时间(1秒),如果缓存已有部分数据,直接使用,不等待 max_wait_sec = 1.0 if cache_has_data else 1.5 # 如果缓存有数据,只等1秒;否则等1.5秒 wait_start = time.time() check_interval = 0.2 # 每 200ms 检查一次 while (time.time() - wait_start) < max_wait_sec: # 检查缓存是否可用 if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0): cached_klines = get_klines_from_cache(symbol, primary_interval, limit=50) if cached_klines and len(cached_klines) >= 2: klines = cached_klines if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0): cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=50) if cached_klines_4h and len(cached_klines_4h) >= 2: klines_4h = cached_klines_4h # 如果两个周期都有足够数据,提前退出 if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2: break await asyncio.sleep(check_interval) else: # 缓存已有完整数据,无需订阅 logger.debug(f"{symbol} 缓存已有完整 K 线数据,跳过订阅") except Exception as e: logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}") # 如果 WebSocket 缓存仍不完整,使用 REST API(带超时保护) # 注意:如果 WebSocket 缓存有部分数据,优先使用部分数据,REST 只补充缺失的 if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2: # 只请求缺失的数据,减少等待时间 rest_tasks = [] if not klines or len(klines) < 2: rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=50))) else: rest_tasks.append(('primary', None)) if not klines_4h or len(klines_4h) < 2: rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=50))) else: rest_tasks.append(('confirm', None)) try: # 只等待实际需要的数据 results = await asyncio.wait_for( asyncio.gather( *[task[1] if task[1] else asyncio.sleep(0) for task in rest_tasks], return_exceptions=True ), timeout=klines_timeout, ) # 合并结果:优先使用 WebSocket 缓存,REST 作为补充 for i, (task_type, _) in enumerate(rest_tasks): if task_type == 'primary' and (not klines or len(klines) < 2): if isinstance(results[i], list) and len(results[i]) >= 2: klines = results[i] elif task_type == 'confirm' and (not klines_4h or len(klines_4h) < 2): if isinstance(results[i], list) and len(results[i]) >= 2: klines_4h = results[i] except asyncio.TimeoutError: logger.debug(f"{symbol} K线拉取超时({klines_timeout:.0f}秒),使用已有缓存或降级结果") # 如果 WebSocket 缓存有部分数据,继续使用;否则使用降级结果 if not klines or len(klines) < 2: klines = [] if not klines_4h or len(klines_4h) < 2: klines_4h = [] except Exception as e: logger.debug(f"{symbol} K线拉取异常: {e}") if not klines or len(klines) < 2: klines = [] if not klines_4h or len(klines_4h) < 2: klines_4h = [] if not klines or len(klines) < 2: # 降级:仍有 ticker 时返回仅涨跌幅/成交量的结果,不直接跳过,避免整轮扫描被拖慢 try: change_pct = float(ticker.get('changePercent', 0) or 0) vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0) price = ticker_price if ticker_price is not None else float(ticker.get('lastPrice', 0) or ticker.get('price', 0) or 0) return { 'symbol': symbol, 'price': price, 'kline_close_price': price, 'ticker_price': ticker_price, 'ticker_ts': ticker_ts, 'prevPrice': None, 'kline_change_percent': change_pct, 'changePercent': change_pct, 'volume24h': vol, 'direction': 'UP' if change_pct > 0 else 'DOWN', 'rsi': None, 'macd': None, 'bollinger': None, 'atr': None, 'ema20': None, 'ema50': None, 'ema20_4h': None, 'price_4h': None, 'marketRegime': 'unknown', 'signalScore': 0, 'signal_strength': 0, 'trend_4h': None, 'klines': [], 'klines_4h': [], } except Exception as e: logger.debug(f"{symbol} 降级结果构建失败: {e}") return None # 提取价格数据(主周期) close_prices = [float(k[4]) for k in klines] # 收盘价 high_prices = [float(k[2]) for k in klines] # 最高价 low_prices = [float(k[3]) for k in klines] # 最低价 # 提取4H周期价格数据 close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices # 计算涨跌幅(基于主周期) current_price = close_prices[-1] prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0] if prev_price == 0: return None change_percent = ((current_price - prev_price) / prev_price) * 100 # ⚠️ 优化:检查技术指标计算结果缓存(基于K线数据的最后更新时间) # 如果K线数据没有更新,可以直接使用缓存的技术指标 cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}" last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳 # 尝试从缓存获取技术指标计算结果 cached_indicators = None try: cached_indicators = await self.client.redis_cache.get(cache_key_indicators) except Exception: pass use_cached_indicators = False if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time: # 缓存命中,使用缓存的技术指标 use_cached_indicators = True logger.debug(f"{symbol} 使用缓存的技术指标计算结果") rsi = cached_indicators.get('rsi') macd = cached_indicators.get('macd') bollinger = cached_indicators.get('bollinger') atr = cached_indicators.get('atr') ema20 = cached_indicators.get('ema20') ema50 = cached_indicators.get('ema50') ema20_4h = cached_indicators.get('ema20_4h') market_regime = cached_indicators.get('marketRegime') else: # 缓存未命中,重新计算技术指标 rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14) macd = TechnicalIndicators.calculate_macd(close_prices) bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20) atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14) atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period) ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20) ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50) # 计算4H周期的EMA20用于多周期共振检查 ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None # 判断市场状态 market_regime = TechnicalIndicators.detect_market_regime(close_prices) # 保存技术指标计算结果到缓存(TTL: 30秒,与K线缓存一致) try: indicators_cache = { 'last_kline_time': last_kline_time, 'rsi': rsi, 'macd': macd, 'bollinger': bollinger, 'atr': atr, 'ema20': ema20, 'ema50': ema50, 'ema20_4h': ema20_4h, 'marketRegime': market_regime, } await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30) logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)") except Exception as e: logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}") # 计算交易信号得分(用于排序)- 保留用于兼容性 signal_score = 0 # RSI信号(均值回归) if rsi is not None: if rsi < 30: # 超卖,做多信号 signal_score += 3 elif rsi > 70: # 超买,做空信号 signal_score += 3 elif 30 <= rsi <= 70: # 中性区域 signal_score += 1 # MACD信号 if macd and macd['histogram'] is not None: if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨 signal_score += 2 elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌 signal_score += 2 # 布林带信号(均值回归) if bollinger: if current_price <= bollinger['lower']: # 触及下轨,做多 signal_score += 3 elif current_price >= bollinger['upper']: # 触及上轨,做空 signal_score += 3 elif bollinger['lower'] < current_price < bollinger['upper']: signal_score += 1 # 均线信号 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 signal_score += 1 elif current_price < ema20 < ema50: # 下降趋势 signal_score += 1 # ⚠️ 2026-01-27优化:计算真实的signal_strength(用于排序和筛选) # 使用与strategy.py相同的逻辑,确保排序依据与交易判断一致 signal_strength = 0 direction = None # 获取4H周期当前价格(用于判断4H趋势) price_4h = close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price # 判断4H周期趋势方向 trend_4h = None if ema20_4h is not None: if price_4h > ema20_4h: trend_4h = 'up' elif price_4h < ema20_4h: trend_4h = 'down' else: trend_4h = 'neutral' # 策略权重配置(与strategy.py保持一致) TREND_SIGNAL_WEIGHTS = { 'macd_cross': 5, # MACD金叉/死叉 'ema_cross': 4, # EMA20上穿/下穿EMA50 'price_above_ema20': 3, # 价格在EMA20之上/下 '4h_trend_confirmation': 2, # 4H趋势确认 } # MACD金叉/死叉(权重最高) if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0: # MACD金叉,做多信号(需4H趋势向上或中性) if trend_4h in ('up', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] if direction is None: direction = 'BUY' elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0: # MACD死叉,做空信号(需4H趋势向下或中性) if trend_4h in ('down', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] if direction is None: direction = 'SELL' # EMA均线系统 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 if trend_4h in ('up', 'neutral', None): # 冲突检查 if direction == 'SELL': signal_strength = 0 direction = None else: signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] if direction is None: direction = 'BUY' elif current_price < ema20 < ema50: # 下降趋势 if trend_4h in ('down', 'neutral', None): # 冲突检查 if direction == 'BUY': signal_strength = 0 direction = None else: signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] if direction is None: direction = 'SELL' # 价格与EMA20关系 if ema20: if current_price > ema20: if trend_4h in ('up', 'neutral', None) and direction == 'BUY': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] elif current_price < ema20: if trend_4h in ('down', 'neutral', None) and direction == 'SELL': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] # 4H趋势确认加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation'] elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): # 逆势信号,直接拒绝 signal_strength = 0 direction = None # 强度上限归一到 0-10 signal_strength = max(0, min(int(signal_strength), 10)) # ===== 趋势状态缓存(用于后续“入场时机过滤”)===== try: # 只有在方向明确且信号强度达到最小门槛时,才记录趋势状态 min_strength = int(config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) or 7) use_trend_filter = bool(config.TRADING_CONFIG.get('USE_TREND_ENTRY_FILTER', False)) if use_trend_filter and direction and signal_strength >= min_strength: trend_state_key = f"trend_state:{symbol}" trend_state_value = { 'symbol': symbol, 'direction': direction, # BUY / SELL 'signal_strength': int(signal_strength), 'marketRegime': market_regime, 'trend_4h': trend_4h, # 使用技术分析用的收盘价作为“信号价格”,同时附带 24h ticker 价格 'signal_price': float(current_price), 'ticker_price': float(ticker_price) if ticker_price is not None else None, 'ema20': float(ema20) if ema20 is not None else None, 'ema50': float(ema50) if ema50 is not None else None, 'ema20_4h': float(ema20_4h) if ema20_4h is not None else None, 'price_4h': float(price_4h) if price_4h is not None else None, 'last_kline_time': last_kline_time, 'created_at_ts': int(ticker_ts or 0), } ttl_sec = int(config.TRADING_CONFIG.get('TREND_STATE_TTL_SEC', 3600) or 3600) if self.client and getattr(self.client, "redis_cache", None): try: await self.client.redis_cache.set(trend_state_key, trend_state_value, ttl=ttl_sec) logger.debug(f"{symbol} 趋势状态已缓存: dir={direction}, strength={signal_strength}, price={current_price:.6f}") except Exception as e: logger.debug(f"{symbol} 缓存趋势状态失败: {e}") except Exception as e: logger.debug(f"{symbol} 处理趋势状态缓存时出错: {e}") return { 'symbol': symbol, # 技术分析使用的价格(K线收盘价) 'price': current_price, 'kline_close_price': current_price, # 展示用“当前价”(24h ticker 最新价,通常更贴近用户的直觉) 'ticker_price': ticker_price, 'ticker_ts': ticker_ts, 'prevPrice': prev_price, # 1) 主周期涨跌幅(用于内部信号) 'kline_change_percent': change_percent, # 2) 24h涨跌幅(用于前端展示更符合“24h涨跌”的文案) 'changePercent': float(ticker.get('changePercent', 0) or 0), 'volume24h': ticker.get('volume', 0), 'direction': 'UP' if change_percent > 0 else 'DOWN', 'rsi': rsi, 'macd': macd, 'bollinger': bollinger, 'atr': atr, 'ema20': ema20, 'ema50': ema50, 'ema20_4h': ema20_4h, # 4H周期EMA20,用于多周期共振 'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格 'marketRegime': market_regime, 'signalScore': signal_score, # 保留用于兼容性 'signal_strength': signal_strength, # ⚠️ 2026-01-27优化:添加真实的信号强度 'trend_4h': trend_4h, # 4H趋势方向 'klines': klines[-10:], # 保留最近10根K线 'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线 } except Exception as e: logger.debug(f"获取 {symbol} 数据失败: {e}") return None def get_top_symbols(self) -> List[Dict]: """ 获取当前扫描到的前N个货币对 Returns: 前N个货币对列表 """ return self.top_symbols async def monitor_price(self, symbol: str, callback) -> None: """ 监控单个交易对的价格变化(WebSocket) Args: symbol: 交易对 callback: 价格变化回调函数 """ # 使用标准WebSocket try: import aiohttp import json # 直接使用 aiohttp 连接 Binance 期货 WebSocket API # 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams # 端点:wss://fstream.binance.com/ws/@ticker ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker" async with aiohttp.ClientSession() as session: async with session.ws_connect(ws_url) as ws: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: try: # 解析 JSON 消息 data = json.loads(msg.data) # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} if isinstance(data, dict): if 'c' in data: # 'c' 是当前价格 price = float(data['c']) await callback(symbol, price) elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']: price = float(data['data']['c']) await callback(symbol, price) except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e: logger.debug(f"解析 {symbol} 价格数据失败: {e}") continue elif msg.type == aiohttp.WSMsgType.ERROR: logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}") break elif msg.type == aiohttp.WSMsgType.CLOSE: logger.info(f"监控 {symbol} WebSocket连接关闭") break except Exception as e: logger.error(f"监控 {symbol} 价格失败: {e}") def get_realtime_price(self, symbol: str) -> Optional[float]: """ 获取实时价格(从缓存) Args: symbol: 交易对 Returns: 实时价格 """ if self.client: return self.client.get_realtime_price(symbol) return None