feat(strategy): Implement signal filtering mechanism for trade signals

Added a unified signal filtering layer to the trading strategy, allowing for enhanced analysis of trade signals based on market conditions. The filtering process includes logging reasons for acceptance or rejection, improving transparency and decision-making in automated trading. Exception handling is also implemented to ensure robustness during signal filtering application.
This commit is contained in:
薇薇安 2026-03-12 19:11:06 +08:00
parent 199c4a95dd
commit d68d3ad66c
2 changed files with 243 additions and 1 deletions

View File

@ -0,0 +1,215 @@
"""
信号过滤插件层
目标
- 将零散的信号过滤规则抽象为可插拔的过滤器便于后续演进新规则 / 机器学习模型
- 第一阶段在不大幅改动既有策略的前提下增加一层统一的信号过滤入口
设计原则
- 只做进一步过滤/收紧不绕过任何已有风控与止损规则
- 过滤失败时只会减少开仓次数不会放宽任何限制
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional
from . import config
@dataclass
class SignalFilterContext:
"""
信号过滤上下文
说明
- symbol_info trade_signal 来自现有 strategy / scanner 逻辑不做修改
- 仅在需要时读取 config.TRADING_CONFIG避免在过滤层重复维护配置
"""
symbol: str
symbol_info: Dict
trade_signal: Dict
market_regime: Optional[str] = None
@dataclass
class SignalFilterResult:
"""信号过滤结果"""
accepted: bool
reasons: List[str]
async def apply_signal_filters(ctx: SignalFilterContext) -> SignalFilterResult:
"""
应用所有已启用的信号过滤规则
第一阶段
- 引入最小波动率过滤在波动极低接近死币时不自动开仓仅保留推荐
- 其它过滤资金费率流动性RSI 仍由现有 strategy 逻辑负责
后续阶段预留
- 可在此处串联更多插件式过滤器如多周期趋势评分ML 评分过滤等
"""
reasons: List[str] = []
accepted = True
# 1) 波动率过滤:避免在接近「死币」的环境下做趋势单
if not _filter_by_volatility(ctx, reasons):
accepted = False
# 2) 严格成交量过滤:在自动交易阶段只做流动性更好的标的
if accepted and not _filter_by_strict_volume(ctx, reasons):
accepted = False
# 3) 按市场状态提高最低信号强度ranging/unknown 环境下只做更强信号
if accepted and not _filter_by_regime_and_signal_strength(ctx, reasons):
accepted = False
# TODO(后续阶段):在此处串联更多过滤器,例如:
# - _filter_by_multi_timeframe_trend(ctx, reasons)
# - await _filter_by_ml_score(ctx, reasons)
return SignalFilterResult(accepted=accepted, reasons=reasons)
def _filter_by_volatility(ctx: SignalFilterContext, reasons: List[str]) -> bool:
"""
基于主周期涨跌幅的简单波动率过滤
实现细节
- 使用 scanner 产出的 kline_change_percent主周期涨跌幅作为代理波动率
- MIN_VOLATILITY 采用比例表示0.03 = 3%
- 仅当有明确 should_trade 信号时才应用此过滤避免影响推荐展示
"""
if not ctx.trade_signal.get("should_trade"):
return True
try:
min_volatility = float(config.TRADING_CONFIG.get("MIN_VOLATILITY", 0.0) or 0.0)
except Exception:
min_volatility = 0.0
if min_volatility <= 0:
return True
change_pct = ctx.symbol_info.get("kline_change_percent")
if change_pct is None:
# 无主周期涨跌数据时不强制过滤,仅记录调试信息可在后续完善
return True
try:
change_pct_val = float(change_pct)
except (TypeError, ValueError):
return True
# kline_change_percent 为百分数(如 3.5 表示 +3.5%
realized_vol = abs(change_pct_val) / 100.0
if realized_vol < min_volatility:
reasons.append(
f"❌ 波动率过滤:主周期波动率≈{realized_vol:.3%} < 最小要求 {min_volatility:.3%}"
)
return False
return True
def _filter_by_strict_volume(ctx: SignalFilterContext, reasons: List[str]) -> bool:
"""
严格成交量过滤在自动交易阶段只做 24h 成交额更高的一批标的
说明
- 使用 MIN_VOLUME_24H_STRICT默认 5000 USDT作为更严格的流动性门槛
- 仅当 should_trade=True 时生效不影响推荐与扫描排序
"""
if not ctx.trade_signal.get("should_trade"):
return True
volume_24h = ctx.symbol_info.get("volume24h")
if volume_24h is None:
return True
try:
volume_val = float(volume_24h)
except (TypeError, ValueError):
return True
try:
strict_min = float(config.TRADING_CONFIG.get("MIN_VOLUME_24H_STRICT", 0) or 0)
except Exception:
strict_min = 0.0
if strict_min <= 0:
return True
if volume_val < strict_min:
reasons.append(
f"❌ 严格成交量过滤24h成交额≈{volume_val:,.0f} < 严格下限 {strict_min:,.0f}"
)
return False
return True
def _filter_by_regime_and_signal_strength(ctx: SignalFilterContext, reasons: List[str]) -> bool:
"""
按市场状态提高最低信号强度
设计
- trending 状态下沿用原有 MIN_SIGNAL_STRENGTH 逻辑
- ranging/unknown 状态下即使通过基础门槛也要求信号更强才放行
这样可以减少边缘信号在脏行情里的出手次数
"""
if not ctx.trade_signal.get("should_trade"):
return True
direction = ctx.trade_signal.get("direction")
if direction not in ("BUY", "SELL"):
return True
try:
strength = int(ctx.trade_signal.get("strength") or 0)
except (TypeError, ValueError):
strength = 0
mr = (ctx.market_regime or "").strip().lower() if ctx.market_regime else ""
# 默认仅对 ranging/unknown 提升门槛;门槛值可配置
try:
extra_ranging = int(config.TRADING_CONFIG.get("RANGING_EXTRA_MIN_STRENGTH", 0) or 0)
extra_unknown = int(config.TRADING_CONFIG.get("UNKNOWN_EXTRA_MIN_STRENGTH", 0) or 0)
base_min = int(config.TRADING_CONFIG.get("MIN_SIGNAL_STRENGTH", 7) or 7)
except Exception:
extra_ranging = 0
extra_unknown = 0
base_min = 7
target_min = base_min
bucket = None
if mr == "ranging" and extra_ranging > 0:
target_min = max(target_min, base_min + extra_ranging)
bucket = "震荡市"
elif mr not in ("trending", "ranging") and extra_unknown > 0:
target_min = max(target_min, base_min + extra_unknown)
bucket = "未知市况"
if bucket and strength < target_min:
reasons.append(
f"{bucket}信号强度过滤:强度={strength}/10 < 有效门槛 {target_min}/10"
)
return False
return True
async def apply_ml_score_filter_placeholder(ctx: SignalFilterContext) -> SignalFilterResult:
"""
预留的 ML 评分过滤入口占位实现
说明
- 当前实现总是返回接受且不修改交易行为
- 后续可在此处加载离线训练好的模型对特征向量打分并决定是否放行
"""
return SignalFilterResult(accepted=True, reasons=[])

View File

@ -11,6 +11,7 @@ try:
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .position_manager import PositionManager
from .signal_filters import SignalFilterContext, apply_signal_filters
from . import config
from .symbol_policy import resolve_symbol_trading_policy
except ImportError:
@ -18,6 +19,7 @@ except ImportError:
from market_scanner import MarketScanner
from risk_manager import RiskManager
from position_manager import PositionManager
from signal_filters import SignalFilterContext, apply_signal_filters
import config
from symbol_policy import resolve_symbol_trading_policy
@ -214,6 +216,31 @@ class TradingStrategy:
)
continue
# 统一的信号过滤插件层(第一阶段:波动率等「进一步收紧」过滤)
try:
sf_ctx = SignalFilterContext(
symbol=symbol,
symbol_info=symbol_info,
trade_signal=trade_signal,
market_regime=market_regime,
)
sf_result = await apply_signal_filters(sf_ctx)
if not sf_result.accepted:
# 将过滤原因拼接到 trade_signal 的 reason 里,便于后续分析
extra_reason = " | ".join(sf_result.reasons) if sf_result.reasons else "信号过滤未通过"
base_reason = trade_signal.get("reason") or ""
combined_reason = f"{base_reason} | {extra_reason}" if base_reason else extra_reason
trade_signal["reason"] = combined_reason
logger.info(f\"{symbol} 信号过滤未通过,跳过自动交易: {combined_reason}\")
continue
elif sf_result.reasons:
# 若过滤层仅追加了评语accepted=True也拼接到 reason 便于排查
extra_reason = " | ".join(sf_result.reasons)
base_reason = trade_signal.get("reason") or ""
trade_signal["reason"] = f"{base_reason} | {extra_reason}" if base_reason else extra_reason
except Exception as e:
logger.warning(f\"{symbol} 应用信号过滤插件时出错(忽略,回退为原逻辑): {e}\")
# 确定交易方向(基于技术指标)
trade_direction = trade_signal['direction']
# 4H 下跌禁止开多(熊市/保守方案:避免逆势抄底)