auto_trade_sys/trading_system/shadow_mode.py
2026-03-21 09:23:53 +08:00

227 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
影子模式半自动化:读取 config/current_suggestions.json在开仓前应用黑名单/差时段/杠杆倍数建议。
与 aggregate_trade_stats 的 STATS_* 互补:本模块面向 daily_report / 影子分析产出的 JSON。
"""
from __future__ import annotations
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Project root directory: two levels above this file (the parent of trading_system/).
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
def _project_root() -> Path:
    """Return the directory stored in the module-level ``_PROJECT_ROOT`` constant."""
    return _PROJECT_ROOT
def _safe_read_json(path: Path) -> Optional[dict]:
    """Best-effort load of a JSON file whose top-level value is an object.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed mapping, or ``None`` when the file does not exist, cannot
        be read or parsed, or holds a top-level value that is not a JSON
        object (for example a list or a bare number).
    """
    try:
        if not path.is_file():
            # A missing file is an expected situation, so it is not logged.
            return None
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        # Deliberately broad: this helper must never propagate a failure.
        logger.debug(f"读取 JSON 失败 {path}: {e}")
        return None
    if not isinstance(data, dict):
        # The return type promises a dict; handing back a list or scalar
        # would only move the failure to the first ``.get`` call on it.
        logger.debug(f"读取 JSON 失败 {path}: 顶层不是对象 ({type(data).__name__})")
        return None
    return data
# Number of seconds a loaded payload is reused before its file is read again.
_CACHE_TTL_SEC = 45.0
# Each cache is a (load timestamp, payload) pair; a ``None`` payload means "nothing cached".
_suggestions_cache: Tuple[float, Optional[dict]] = (0.0, None)
_tracking_cache: Tuple[float, Optional[dict]] = (0.0, None)
def _get_suggestions(trading_config: dict) -> dict:
    """Return the current shadow-mode suggestions, cached for ``_CACHE_TTL_SEC`` seconds.

    Args:
        trading_config: Trading settings. ``SHADOW_MODE_SUGGESTIONS_PATH`` may
            override the default ``config/current_suggestions.json``; a
            relative path is resolved against the project root.

    Returns:
        The parsed suggestions mapping. When the file is missing, unreadable
        or not a JSON object, a mapping with empty ``blacklist``,
        ``increase_position``, ``decrease_position`` and ``worst_hours``
        lists is returned (and cached) instead.

    Note:
        The cache is not keyed by path: a changed path only takes effect once
        the cached entry expires or ``invalidate_cache()`` is called.
    """
    global _suggestions_cache
    # A monotonic clock keeps the TTL correct when the wall clock is stepped
    # backwards (NTP, manual change); with time.time() such a step would keep
    # stale suggestions alive well past the TTL.
    now = time.monotonic()
    loaded_at, cached = _suggestions_cache
    if cached is not None and now - loaded_at < _CACHE_TTL_SEC:
        return cached

    path = Path(
        trading_config.get("SHADOW_MODE_SUGGESTIONS_PATH")
        or "config/current_suggestions.json"
    )
    if not path.is_absolute():
        path = _project_root() / path

    data = _safe_read_json(path)
    if not isinstance(data, dict):
        # Covers both "nothing loaded" and a payload that is not an object,
        # so the lookups that call ``.get`` on the result cannot fail.
        data = {
            "blacklist": [],
            "increase_position": [],
            "decrease_position": [],
            "worst_hours": [],
        }
    _suggestions_cache = (now, data)
    return data
def _get_tracking_confidence(trading_config: dict) -> float:
    """Return the tracked accuracy/confidence of the shadow-mode suggestions.

    Args:
        trading_config: Trading settings. ``SHADOW_MODE_TRACKING_PATH`` may
            override the default ``config/shadow_mode_tracking.json``; a
            relative path is resolved against the project root.

    Returns:
        The first of the fields ``accuracy``, ``rolling_accuracy``,
        ``confidence`` or ``min_confidence`` that is present and converts to
        ``float``. ``1.0`` when the tracking file is missing, unusable, or
        carries none of those fields.
    """
    global _tracking_cache
    # Monotonic clock: the TTL must not be affected by wall-clock adjustments.
    now = time.monotonic()
    loaded_at, cached = _tracking_cache
    if cached is not None and now - loaded_at < _CACHE_TTL_SEC:
        data = cached
    else:
        path = Path(
            trading_config.get("SHADOW_MODE_TRACKING_PATH")
            or "config/shadow_mode_tracking.json"
        )
        if not path.is_absolute():
            path = _project_root() / path
        loaded = _safe_read_json(path)
        # Anything other than a JSON object (None, list, scalar) is treated as
        # "no tracking data" rather than failing on ``.get`` below.
        data = loaded if isinstance(loaded, dict) else {}
        _tracking_cache = (now, data)

    # Several field names are accepted for compatibility.
    for key in ("accuracy", "rolling_accuracy", "confidence", "min_confidence"):
        value = data.get(key)
        if value is None:
            continue
        try:
            return float(value)
        except (TypeError, ValueError):
            # Not numeric: fall through to the next candidate field.
            continue
    return 1.0
def _norm_symbol(s: str) -> str:
    """Return ``s`` without surrounding whitespace and in upper case; falsy input yields ``""``."""
    if not s:
        return ""
    return s.strip().upper()
def _find_blacklist_entry(suggestions: dict, symbol: str) -> Optional[dict]:
    """Return the first ``blacklist`` entry whose ``symbol`` matches ``symbol``.

    Both sides are compared after ``_norm_symbol`` normalisation. Entries that
    are not dicts are ignored. Returns ``None`` when nothing matches.
    """
    wanted = _norm_symbol(symbol)
    entries = suggestions.get("blacklist") or []
    return next(
        (
            entry
            for entry in entries
            if isinstance(entry, dict)
            and _norm_symbol(str(entry.get("symbol", ""))) == wanted
        ),
        None,
    )
def _in_worst_hours(suggestions: dict, hour_bj: int) -> Optional[dict]:
    """Return the first ``worst_hours`` entry whose ``hour`` equals ``hour_bj``.

    Args:
        suggestions: Parsed suggestions mapping; ``worst_hours`` is read as a
            list of objects carrying an ``hour`` field.
        hour_bj: Hour to look up; it is converted with ``int`` before the
            comparison. NOTE(review): the name suggests a Beijing-time hour
            in 0-23 - confirm against the caller.

    Returns:
        The matching entry, or ``None``. Entries that are not dicts, or whose
        ``hour`` cannot be converted to an integer, are skipped.
    """
    for item in suggestions.get("worst_hours") or []:
        if not isinstance(item, dict):
            continue
        try:
            entry_hour = int(item.get("hour"))
        except (TypeError, ValueError, OverflowError):
            # OverflowError comes from int(float("inf")); the json module
            # parses ``Infinity``, so one malformed entry must not abort
            # the whole lookup.
            continue
        if entry_hour == int(hour_bj):
            return item
    return None
def _position_list_has(suggestions: dict, key: str, symbol: str) -> Optional[dict]:
    """Return the first entry of ``suggestions[key]`` whose ``symbol`` matches ``symbol``.

    Both sides are compared after ``_norm_symbol`` normalisation. Entries that
    are not dicts are ignored. Returns ``None`` when nothing matches or the
    list is absent or empty.
    """
    target = _norm_symbol(symbol)
    candidates = suggestions.get(key) or []
    for candidate in candidates:
        if isinstance(candidate, dict):
            candidate_symbol = _norm_symbol(str(candidate.get("symbol", "")))
            if candidate_symbol == target:
                return candidate
    return None
def _shadow_flag(value: Any) -> bool:
    """Interpret a config value as a boolean; textual "false"/"0"/"no"/"off" count as off."""
    if isinstance(value, str):
        # bool("false") is True, which would silently switch the feature on
        # when the setting arrives as text.
        return value.strip().lower() not in ("", "0", "false", "no", "off")
    return bool(value)


def _shadow_float(trading_config: dict, key: str, default: float, if_falsy: float) -> float:
    """Read a float setting: missing -> ``default``, falsy -> ``if_falsy``, malformed -> ``default``."""
    raw = trading_config.get(key, default) or if_falsy
    try:
        return float(raw)
    except (TypeError, ValueError):
        logger.warning(f"影子模式:配置 {key}={raw!r} 无法解析为数字,使用默认值 {default}")
        return default


def evaluate_shadow_mode(
    symbol: str,
    hour_bj: int,
    trading_config: dict,
) -> Dict[str, Any]:
    """Evaluate shadow mode for a prospective entry.

    Decides whether an automatic open is allowed, which leverage multiplier
    to use, and builds the ``shadow_mode_applied`` record meant to be written
    into the entry context. When ``SHADOW_MODE_AUTO_APPLY`` is off, or the
    tracked confidence is below ``SHADOW_MODE_MIN_CONFIDENCE``, nothing is
    blocked or adjusted and both checks are reported as ``"skipped"``.

    Args:
        symbol: Symbol looked up in the blacklist and position lists.
        hour_bj: Hour matched against the ``worst_hours`` entries.
        trading_config: Settings. Keys read here: ``SHADOW_MODE_AUTO_APPLY``,
            ``SHADOW_MODE_MIN_CONFIDENCE`` (default 0.7, clamped to 0..1),
            ``SHADOW_MODE_INCREASE_LEVERAGE_MULT`` (default 1.5, clamped to
            0.1..3.0) and ``SHADOW_MODE_DECREASE_LEVERAGE_MULT`` (default
            0.5, clamped to 0.1..2.0). A numeric setting that cannot be
            parsed falls back to its default and is logged.

    Returns:
        A dict with ``allowed``, ``skip_reason``, ``leverage_multiplier`` and
        ``shadow_mode_applied``. A worst-hour block additionally carries
        ``log_avg_pnl``.
    """
    applied: Dict[str, Any] = {
        "blacklist_check": "skipped",
        "hour_check": "skipped",
        "position_adjustment": "none",
        "suggestion_source": "current_suggestions.json",
    }

    # Checked first so that a malformed numeric setting cannot break entries
    # while the feature is switched off.
    if not _shadow_flag(trading_config.get("SHADOW_MODE_AUTO_APPLY", False)):
        return {
            "allowed": True,
            "skip_reason": None,
            "leverage_multiplier": 1.0,
            "shadow_mode_applied": {**applied, "reason": "SHADOW_MODE_AUTO_APPLY=false"},
        }

    min_conf = _shadow_float(trading_config, "SHADOW_MODE_MIN_CONFIDENCE", 0.7, 0.0)
    min_conf = max(0.0, min(1.0, min_conf))
    conf = _get_tracking_confidence(trading_config)
    if conf < min_conf:
        logger.info(
            f"影子模式:跟踪置信度 {conf:.2f} < 最小要求 {min_conf:.2f},本笔不自动应用建议"
        )
        return {
            "allowed": True,
            "skip_reason": None,
            "leverage_multiplier": 1.0,
            "shadow_mode_applied": {
                **applied,
                "reason": f"confidence {conf:.2f} < {min_conf:.2f}",
            },
        }

    suggestions = _get_suggestions(trading_config)

    # Blacklist
    bl = _find_blacklist_entry(suggestions, symbol)
    if bl is not None:
        applied["blacklist_check"] = "blocked"
        reason = bl.get("reason") or "shadow blacklist"
        return {
            "allowed": False,
            "skip_reason": reason,
            "leverage_multiplier": 1.0,
            "shadow_mode_applied": {**applied, "blacklist_detail": bl},
        }
    applied["blacklist_check"] = "passed"

    # Worst hours
    wh = _in_worst_hours(suggestions, hour_bj)
    if wh is not None:
        applied["hour_check"] = "blocked"
        avg_pnl = wh.get("avg_pnl", wh.get("avgPnl", ""))
        return {
            "allowed": False,
            "skip_reason": f"worst_hour_{hour_bj}",
            "leverage_multiplier": 1.0,
            "shadow_mode_applied": {**applied, "worst_hour_detail": wh},
            "log_avg_pnl": avg_pnl,
        }
    applied["hour_check"] = "passed"

    # NOTE(review): the bounds let the "decrease" multiplier exceed 1 and the
    # "increase" multiplier drop below 1 - confirm that this is intended.
    inc_mul = _shadow_float(trading_config, "SHADOW_MODE_INCREASE_LEVERAGE_MULT", 1.5, 1.5)
    dec_mul = _shadow_float(trading_config, "SHADOW_MODE_DECREASE_LEVERAGE_MULT", 0.5, 0.5)
    inc_mul = max(0.1, min(inc_mul, 3.0))
    dec_mul = max(0.1, min(dec_mul, 2.0))

    # Leverage: a decrease wins over an increase (the more conservative
    # choice when a symbol appears in both lists).
    mult = 1.0
    dec_item = _position_list_has(suggestions, "decrease_position", symbol)
    inc_item = _position_list_has(suggestions, "increase_position", symbol)
    if dec_item is not None:
        mult *= dec_mul
        applied["position_adjustment"] = "decrease"
        applied["decrease_detail"] = dec_item
    elif inc_item is not None:
        mult *= inc_mul
        applied["position_adjustment"] = "increase"
        applied["increase_detail"] = inc_item

    return {
        "allowed": True,
        "skip_reason": None,
        "leverage_multiplier": mult,
        "shadow_mode_applied": applied,
    }
def invalidate_cache() -> None:
    """Reset both module-level caches to their empty ``(0.0, None)`` state.

    Optional hook that can be called when the configuration is hot-reloaded.
    """
    global _suggestions_cache, _tracking_cache
    empty = (0.0, None)
    _suggestions_cache = empty
    _tracking_cache = empty