""" trend_4h 缓存模块 基于已有 WS K 线缓存,将计算得到的 trend_4h(up/down/neutral)写入 Redis, 供 strategy、market_scanner 等复用,减少重复 EMA/MACD 计算。 Redis Key: trend_4h:{symbol},TTL 10 分钟。 """ import logging from typing import Optional, Dict, Any logger = logging.getLogger(__name__) KEY_PREFIX = "trend_4h:" try: from .redis_ttl import TTL_TREND_4H except ImportError: TTL_TREND_4H = 600 # 10 分钟 def _judge_trend_4h( price_4h: float, ema20_4h: Optional[float], ema50_4h: Optional[float], macd_4h: Optional[Dict], ) -> str: """ 多指标投票判断 4H 趋势(与 strategy._judge_trend_4h 一致)。 Returns: 'up' | 'down' | 'neutral' """ if ema20_4h is None: return 'neutral' score = 0 if price_4h > ema20_4h: score += 1 elif price_4h < ema20_4h: score -= 1 if ema50_4h is not None: if ema20_4h > ema50_4h: score += 1 elif ema20_4h < ema50_4h: score -= 1 if macd_4h and isinstance(macd_4h, dict): macd_hist = macd_4h.get('histogram', 0) if macd_hist > 0: score += 1 elif macd_hist < 0: score -= 1 if score >= 2: return 'up' if score <= -2: return 'down' return 'neutral' async def get_trend_4h_cached( redis_cache: Any, symbol: str, price_4h: float, ema20_4h: Optional[float], ema50_4h: Optional[float], macd_4h: Optional[Dict], ttl_sec: int = None, ) -> str: """ 优先从 Redis 读取 trend_4h,未命中则计算并写入缓存。 Args: redis_cache: RedisCache 实例(可为 None,则直接计算不缓存) symbol: 交易对 price_4h, ema20_4h, ema50_4h, macd_4h: 计算 trend_4h 所需输入 ttl_sec: 缓存 TTL,默认使用 redis_ttl.TTL_TREND_4H(10 分钟) Returns: 'up' | 'down' | 'neutral' """ if ttl_sec is None: ttl_sec = TTL_TREND_4H key = f"{KEY_PREFIX}{symbol}" if redis_cache: try: await redis_cache.connect() cached = await redis_cache.get(key) if cached and cached in ('up', 'down', 'neutral'): return cached except Exception as e: logger.debug(f"trend_4h 缓存读取失败 {symbol}: {e}") trend = _judge_trend_4h(price_4h, ema20_4h, ema50_4h, macd_4h) if redis_cache: try: await redis_cache.set(key, trend, ttl=ttl_sec) except Exception as e: logger.debug(f"trend_4h 缓存写入失败 {symbol}: {e}") return trend