diff --git a/trading_system/signal_filters.py b/trading_system/signal_filters.py new file mode 100644 index 0000000..344fd80 --- /dev/null +++ b/trading_system/signal_filters.py @@ -0,0 +1,215 @@ +""" +信号过滤插件层 + +目标: +- 将零散的信号过滤规则抽象为可插拔的「过滤器」,便于后续演进(新规则 / 机器学习模型)。 +- 第一阶段:在不大幅改动既有策略的前提下,增加一层统一的信号过滤入口。 + +设计原则: +- 只做「进一步过滤/收紧」,不绕过任何已有风控与止损规则。 +- 过滤失败时只会减少开仓次数,不会放宽任何限制。 +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List, Optional + +from . import config + + +@dataclass +class SignalFilterContext: + """ + 信号过滤上下文 + + 说明: + - symbol_info 与 trade_signal 来自现有 strategy / scanner 逻辑,不做修改。 + - 仅在需要时读取 config.TRADING_CONFIG,避免在过滤层重复维护配置。 + """ + + symbol: str + symbol_info: Dict + trade_signal: Dict + market_regime: Optional[str] = None + + +@dataclass +class SignalFilterResult: + """信号过滤结果""" + + accepted: bool + reasons: List[str] + + +async def apply_signal_filters(ctx: SignalFilterContext) -> SignalFilterResult: + """ + 应用所有已启用的信号过滤规则。 + + 第一阶段: + - 引入「最小波动率」过滤:在波动极低(接近死币)时不自动开仓,仅保留推荐。 + - 其它过滤(资金费率、流动性、RSI 等)仍由现有 strategy 逻辑负责。 + + 后续阶段预留: + - 可在此处串联更多插件式过滤器(如多周期趋势评分、ML 评分过滤等)。 + """ + reasons: List[str] = [] + accepted = True + + # 1) 波动率过滤:避免在接近「死币」的环境下做趋势单 + if not _filter_by_volatility(ctx, reasons): + accepted = False + + # 2) 严格成交量过滤:在自动交易阶段只做流动性更好的标的 + if accepted and not _filter_by_strict_volume(ctx, reasons): + accepted = False + + # 3) 按市场状态提高最低信号强度:ranging/unknown 环境下只做更强信号 + if accepted and not _filter_by_regime_and_signal_strength(ctx, reasons): + accepted = False + + # TODO(后续阶段):在此处串联更多过滤器,例如: + # - _filter_by_multi_timeframe_trend(ctx, reasons) + # - await _filter_by_ml_score(ctx, reasons) + + return SignalFilterResult(accepted=accepted, reasons=reasons) + + +def _filter_by_volatility(ctx: SignalFilterContext, reasons: List[str]) -> bool: + """ + 基于主周期涨跌幅的简单波动率过滤。 + + 实现细节: + - 使用 scanner 产出的 kline_change_percent(主周期涨跌幅)作为代理波动率。 + - MIN_VOLATILITY 采用比例表示:0.03 = 3%。 + - 仅当有明确 should_trade 信号时才应用此过滤,避免影响推荐展示。 + """ + if not ctx.trade_signal.get("should_trade"): + return True + + try: + min_volatility = float(config.TRADING_CONFIG.get("MIN_VOLATILITY", 0.0) or 0.0) + except Exception: + min_volatility = 0.0 + + if min_volatility <= 0: + return True + + change_pct = ctx.symbol_info.get("kline_change_percent") + if change_pct is None: + # 无主周期涨跌数据时不强制过滤,仅记录调试信息可在后续完善 + return True + + try: + change_pct_val = float(change_pct) + except (TypeError, ValueError): + return True + + # kline_change_percent 为百分数(如 3.5 表示 +3.5%) + realized_vol = abs(change_pct_val) / 100.0 + if realized_vol < min_volatility: + reasons.append( + f"❌ 波动率过滤:主周期波动率≈{realized_vol:.3%} < 最小要求 {min_volatility:.3%}" + ) + return False + + return True + + +def _filter_by_strict_volume(ctx: SignalFilterContext, reasons: List[str]) -> bool: + """ + 严格成交量过滤:在自动交易阶段只做 24h 成交额更高的一批标的。 + + 说明: + - 使用 MIN_VOLUME_24H_STRICT(默认 5000 万 USDT)作为更严格的流动性门槛。 + - 仅当 should_trade=True 时生效,不影响推荐与扫描排序。 + """ + if not ctx.trade_signal.get("should_trade"): + return True + + volume_24h = ctx.symbol_info.get("volume24h") + if volume_24h is None: + return True + + try: + volume_val = float(volume_24h) + except (TypeError, ValueError): + return True + + try: + strict_min = float(config.TRADING_CONFIG.get("MIN_VOLUME_24H_STRICT", 0) or 0) + except Exception: + strict_min = 0.0 + + if strict_min <= 0: + return True + + if volume_val < strict_min: + reasons.append( + f"❌ 严格成交量过滤:24h成交额≈{volume_val:,.0f} < 严格下限 {strict_min:,.0f}" + ) + return False + + return True + + +def _filter_by_regime_and_signal_strength(ctx: SignalFilterContext, reasons: List[str]) -> bool: + """ + 按市场状态提高最低信号强度。 + + 设计: + - 在 trending 状态下沿用原有 MIN_SIGNAL_STRENGTH 逻辑。 + - 在 ranging/unknown 状态下,即使通过基础门槛,也要求信号更强才放行。 + 这样可以减少边缘信号在“脏行情”里的出手次数。 + """ + if not ctx.trade_signal.get("should_trade"): + return True + + direction = ctx.trade_signal.get("direction") + if direction not in ("BUY", "SELL"): + return True + + try: + strength = int(ctx.trade_signal.get("strength") or 0) + except (TypeError, ValueError): + strength = 0 + + mr = (ctx.market_regime or "").strip().lower() if ctx.market_regime else "" + + # 默认仅对 ranging/unknown 提升门槛;门槛值可配置 + try: + extra_ranging = int(config.TRADING_CONFIG.get("RANGING_EXTRA_MIN_STRENGTH", 0) or 0) + extra_unknown = int(config.TRADING_CONFIG.get("UNKNOWN_EXTRA_MIN_STRENGTH", 0) or 0) + base_min = int(config.TRADING_CONFIG.get("MIN_SIGNAL_STRENGTH", 7) or 7) + except Exception: + extra_ranging = 0 + extra_unknown = 0 + base_min = 7 + + target_min = base_min + bucket = None + if mr == "ranging" and extra_ranging > 0: + target_min = max(target_min, base_min + extra_ranging) + bucket = "震荡市" + elif mr not in ("trending", "ranging") and extra_unknown > 0: + target_min = max(target_min, base_min + extra_unknown) + bucket = "未知市况" + + if bucket and strength < target_min: + reasons.append( + f"❌ {bucket}信号强度过滤:强度={strength}/10 < 有效门槛 {target_min}/10" + ) + return False + + return True + + +async def apply_ml_score_filter_placeholder(ctx: SignalFilterContext) -> SignalFilterResult: + """ + 预留的 ML 评分过滤入口(占位实现)。 + + 说明: + - 当前实现总是返回接受,且不修改交易行为。 + - 后续可在此处加载离线训练好的模型,对特征向量打分并决定是否放行。 + """ + return SignalFilterResult(accepted=True, reasons=[]) + diff --git a/trading_system/strategy.py b/trading_system/strategy.py index 9b8559c..8bf8ec8 100644 --- a/trading_system/strategy.py +++ b/trading_system/strategy.py @@ -11,6 +11,7 @@ try: from .market_scanner import MarketScanner from .risk_manager import RiskManager from .position_manager import PositionManager + from .signal_filters import SignalFilterContext, apply_signal_filters from . import config from .symbol_policy import resolve_symbol_trading_policy except ImportError: @@ -18,6 +19,7 @@ except ImportError: from market_scanner import MarketScanner from risk_manager import RiskManager from position_manager import PositionManager + from signal_filters import SignalFilterContext, apply_signal_filters import config from symbol_policy import resolve_symbol_trading_policy @@ -213,7 +215,32 @@ class TradingStrategy: f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过自动交易" ) continue - + + # 统一的信号过滤插件层(第一阶段:波动率等「进一步收紧」过滤) + try: + sf_ctx = SignalFilterContext( + symbol=symbol, + symbol_info=symbol_info, + trade_signal=trade_signal, + market_regime=market_regime, + ) + sf_result = await apply_signal_filters(sf_ctx) + if not sf_result.accepted: + # 将过滤原因拼接到 trade_signal 的 reason 里,便于后续分析 + extra_reason = " | ".join(sf_result.reasons) if sf_result.reasons else "信号过滤未通过" + base_reason = trade_signal.get("reason") or "" + combined_reason = f"{base_reason} | {extra_reason}" if base_reason else extra_reason + trade_signal["reason"] = combined_reason + logger.info(f\"{symbol} 信号过滤未通过,跳过自动交易: {combined_reason}\") + continue + elif sf_result.reasons: + # 若过滤层仅追加了评语(accepted=True),也拼接到 reason 便于排查 + extra_reason = " | ".join(sf_result.reasons) + base_reason = trade_signal.get("reason") or "" + trade_signal["reason"] = f"{base_reason} | {extra_reason}" if base_reason else extra_reason + except Exception as e: + logger.warning(f\"{symbol} 应用信号过滤插件时出错(忽略,回退为原逻辑): {e}\") + # 确定交易方向(基于技术指标) trade_direction = trade_signal['direction'] # 4H 下跌禁止开多(熊市/保守方案:避免逆势抄底)