auto_trade_sys/trading_system/market_regime.py
薇薇安 48c3f946cc feat(config, market_scanner, position_manager, strategy): 引入市场节奏自动识别与流动性检查功能
在 `config.py` 中新增市场节奏自动识别配置,支持低波动期参数切换。更新 `market_scanner.py` 以根据市场波动情况动态调整策略,并在扫描时计算中位数以判断市场状态。同时,在 `position_manager.py` 中实现时间止损逻辑,确保在低波动期内有效管理持仓。新增流动性检查功能于 `strategy.py`,在开仓前评估市场深度与价差,提升交易决策的准确性与风险控制能力。
2026-02-17 10:41:47 +08:00

123 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场节奏自动识别:根据当前波动率/成交量判断「低波动期」与「正常期」,
并自动切换参数(仓位、信号门槛、资金费率阈值、时间止损等),无需手动切换。
"""
import logging
import time
from typing import Optional, Any
logger = logging.getLogger(__name__)
REDIS_KEY_REGIME = "market_regime"
REDIS_KEY_UPDATED_AT = "market_regime_updated_at"
REGIME_LOW = "low_volatility"
REGIME_NORMAL = "normal"
TTL_SEC = 3600 # 1 小时内无更新则视为过期,策略侧回退为 normal
def _get_redis():
"""获取 Redis 客户端(与 binance_client 一致)。"""
try:
from . import config
from .redis_cache import RedisCache
rc = RedisCache(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=getattr(config, "REDIS_USE_TLS", False),
ssl_cert_reqs=getattr(config, "REDIS_SSL_CERT_REQS", "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
)
return rc
except Exception as e:
logger.debug("market_regime Redis 不可用: %s", e)
return None
async def update_regime_from_scan(
change_percents: list,
redis_cache: Any = None,
low_volatility_threshold: float = 2.5,
) -> str:
"""
根据本次扫描得到的 24h 涨跌幅列表,计算中位数绝对波动,更新市场节奏并写入 Redis。
change_percents: 各 symbol 的 24h changePercent 列表(可含正负)
返回当前写入的 regimelow_volatility 或 normal。
"""
if not change_percents:
return REGIME_NORMAL
abs_changes = [abs(float(x)) for x in change_percents if x is not None]
if not abs_changes:
return REGIME_NORMAL
abs_changes.sort()
n = len(abs_changes)
median_abs = abs_changes[n // 2] if n % 2 else (abs_changes[n // 2 - 1] + abs_changes[n // 2]) / 2
regime = REGIME_LOW if median_abs < low_volatility_threshold else REGIME_NORMAL
cache = redis_cache or _get_redis()
if cache:
try:
await cache.connect()
await cache.set(REDIS_KEY_REGIME, regime, ttl=TTL_SEC)
await cache.set(REDIS_KEY_UPDATED_AT, str(int(time.time())), ttl=TTL_SEC)
logger.info(f"市场节奏: median_24h_change={median_abs:.2f}% -> {regime} (阈值={low_volatility_threshold}%)")
except Exception as e:
logger.debug("写入 market_regime 失败: %s", e)
return regime
async def get_regime(redis_cache: Any = None) -> str:
"""从 Redis 读取当前市场节奏;无或过期则返回 normal。"""
cache = redis_cache or _get_redis()
if not cache:
return REGIME_NORMAL
try:
await cache.connect()
r = await cache.get(REDIS_KEY_REGIME)
if r in (REGIME_LOW, REGIME_NORMAL):
return r
ts = await cache.get(REDIS_KEY_UPDATED_AT)
if ts and (time.time() - int(ts)) > TTL_SEC:
return REGIME_NORMAL
except Exception as e:
logger.debug("读取 market_regime 失败: %s", e)
return REGIME_NORMAL
def get_effective_config(key: str, default: Any = None, account_id: int = None) -> Any:
"""
根据当前市场节奏返回「有效配置」:若开启自动节奏且当前为低波动期,
则优先返回 LOW_VOLATILITY_<key>,否则返回 TRADING_CONFIG[key]。
用于仓位、信号门槛、资金费率阈值、时间止损等。
"""
try:
from . import config
except ImportError:
import config
cfg = getattr(config, "TRADING_CONFIG", None) or {}
auto = cfg.get("MARKET_REGIME_AUTO")
if not auto:
return cfg.get(key, default)
low_key = f"LOW_VOLATILITY_{key}"
if low_key not in cfg:
return cfg.get(key, default)
# 同步读 Redis 会阻塞;这里用缓存避免每次读 Redis。策略层在每轮扫描后已更新 regime本进程内可用内存缓存
regime = _cached_regime
if regime == REGIME_LOW:
val = cfg.get(low_key)
if val is not None:
return val
return cfg.get(key, default)
# 进程内缓存:由扫描器在每次 scan 结束后异步更新get_effective_config 同步读
_cached_regime: str = REGIME_NORMAL
def set_cached_regime(regime: str) -> None:
global _cached_regime
_cached_regime = regime if regime in (REGIME_LOW, REGIME_NORMAL) else REGIME_NORMAL
def get_cached_regime() -> str:
return _cached_regime