""" 市场节奏自动识别:根据当前波动率/成交量判断「低波动期」与「正常期」, 并自动切换参数(仓位、信号门槛、资金费率阈值、时间止损等),无需手动切换。 """ import logging import time from typing import Optional, Any logger = logging.getLogger(__name__) REDIS_KEY_REGIME = "market_regime" REDIS_KEY_UPDATED_AT = "market_regime_updated_at" REGIME_LOW = "low_volatility" REGIME_NORMAL = "normal" TTL_SEC = 3600 # 1 小时内无更新则视为过期,策略侧回退为 normal def _get_redis(): """获取 Redis 客户端(与 binance_client 一致)。""" try: from . import config from .redis_cache import RedisCache rc = RedisCache( redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"), use_tls=getattr(config, "REDIS_USE_TLS", False), ssl_cert_reqs=getattr(config, "REDIS_SSL_CERT_REQS", "required"), ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None), username=getattr(config, "REDIS_USERNAME", None), password=getattr(config, "REDIS_PASSWORD", None), ) return rc except Exception as e: logger.debug("market_regime Redis 不可用: %s", e) return None async def update_regime_from_scan( change_percents: list, redis_cache: Any = None, low_volatility_threshold: float = 2.5, ) -> str: """ 根据本次扫描得到的 24h 涨跌幅列表,计算中位数绝对波动,更新市场节奏并写入 Redis。 change_percents: 各 symbol 的 24h changePercent 列表(可含正负) 返回当前写入的 regime:low_volatility 或 normal。 """ if not change_percents: return REGIME_NORMAL abs_changes = [abs(float(x)) for x in change_percents if x is not None] if not abs_changes: return REGIME_NORMAL abs_changes.sort() n = len(abs_changes) median_abs = abs_changes[n // 2] if n % 2 else (abs_changes[n // 2 - 1] + abs_changes[n // 2]) / 2 regime = REGIME_LOW if median_abs < low_volatility_threshold else REGIME_NORMAL cache = redis_cache or _get_redis() if cache: try: await cache.connect() await cache.set(REDIS_KEY_REGIME, regime, ttl=TTL_SEC) await cache.set(REDIS_KEY_UPDATED_AT, str(int(time.time())), ttl=TTL_SEC) logger.info(f"市场节奏: median_24h_change={median_abs:.2f}% -> {regime} (阈值={low_volatility_threshold}%)") except Exception as e: logger.debug("写入 market_regime 失败: %s", e) return regime async def get_regime(redis_cache: Any = None) -> str: """从 Redis 读取当前市场节奏;无或过期则返回 normal。""" cache = redis_cache or _get_redis() if not cache: return REGIME_NORMAL try: await cache.connect() r = await cache.get(REDIS_KEY_REGIME) if r in (REGIME_LOW, REGIME_NORMAL): return r ts = await cache.get(REDIS_KEY_UPDATED_AT) if ts and (time.time() - int(ts)) > TTL_SEC: return REGIME_NORMAL except Exception as e: logger.debug("读取 market_regime 失败: %s", e) return REGIME_NORMAL def get_effective_config(key: str, default: Any = None, account_id: int = None) -> Any: """ 根据当前市场节奏返回「有效配置」:若开启自动节奏且当前为低波动期, 则优先返回 LOW_VOLATILITY_,否则返回 TRADING_CONFIG[key]。 用于仓位、信号门槛、资金费率阈值、时间止损等。 """ try: from . import config except ImportError: import config cfg = getattr(config, "TRADING_CONFIG", None) or {} auto = cfg.get("MARKET_REGIME_AUTO") if not auto: return cfg.get(key, default) low_key = f"LOW_VOLATILITY_{key}" if low_key not in cfg: return cfg.get(key, default) # 同步读 Redis 会阻塞;这里用缓存避免每次读 Redis。策略层在每轮扫描后已更新 regime,本进程内可用内存缓存 regime = _cached_regime if regime == REGIME_LOW: val = cfg.get(low_key) if val is not None: return val return cfg.get(key, default) # 进程内缓存:由扫描器在每次 scan 结束后异步更新,get_effective_config 同步读 _cached_regime: str = REGIME_NORMAL def set_cached_regime(regime: str) -> None: global _cached_regime _cached_regime = regime if regime in (REGIME_LOW, REGIME_NORMAL) else REGIME_NORMAL def get_cached_regime() -> str: return _cached_regime