auto_trade_sys/trading_system/market_scanner.py
薇薇安 09edc4f57d feat(market_scanner): 优化回退信号逻辑以提升信号处理能力
在市场扫描逻辑中调整了回退信号的计算方式,降低了信号强度为零时的触发条件,并引入24小时涨跌幅作为方向判断依据。这一改动旨在增强策略的灵活性,确保在市场波动时能够提供更有效的交易建议,同时提升用户体验。
2026-02-25 13:45:36 +08:00

1037 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场扫描器 - 发现涨跌幅最大的前N个货币对并分析技术指标
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .indicators import TechnicalIndicators
from . import config
except ImportError:
from binance_client import BinanceClient
from indicators import TechnicalIndicators
import config
logger = logging.getLogger(__name__)
class MarketScanner:
"""市场扫描器类"""
def __init__(self, client: BinanceClient):
"""
初始化市场扫描器
Args:
client: 币安客户端
"""
self.client = client
self.top_symbols: List[Dict] = []
async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]:
"""
扫描市场找出涨跌幅最大的前N个货币对
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
Returns:
前N个货币对列表包含涨跌幅信息
"""
import time
self._scan_start_time = time.time()
# 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰
cfg = dict(config.TRADING_CONFIG or {})
if config_override and isinstance(config_override, dict):
cfg.update(config_override)
ns = (cache_namespace or "trade").strip() or "trade"
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
# 虽然中间数据K线、技术指标已经缓存但最终扫描结果不缓存
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
logger.info("开始扫描市场...")
# 获取所有USDT交易对
raw_symbols = await self.client.get_all_usdt_pairs()
all_symbols = [s for s in (raw_symbols or []) if s]
if raw_symbols and len(all_symbols) < len(raw_symbols):
logger.info(f"已过滤 {len(raw_symbols) - len(all_symbols)} 个非ASCII交易对保留 {len(all_symbols)}")
if not all_symbols:
logger.warning("未获取到交易对")
return []
# 根据配置限制扫描的交易对数量(确保为整数,避免 DB/JSON 返回 float 导致 slice 报错)
max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500)
try:
max_scan_symbols = int(max_scan_symbols) if max_scan_symbols is not None else 500
except (TypeError, ValueError):
max_scan_symbols = 500
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
symbols = all_symbols[:max_scan_symbols]
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols}")
else:
symbols = all_symbols
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
# 过滤主流币(山寨币策略应该排除主流币)
exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True)
if exclude_major_coins:
# 主流币列表市值排名前15的主流币
major_coins = {
'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT',
'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT',
'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT',
'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT'
}
symbols_before = len(symbols)
symbols = [s for s in symbols if s not in major_coins]
excluded_count = symbols_before - len(symbols)
if excluded_count > 0:
logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)")
# 优先从 Redis 读 24h ticker进程内不保留全量减轻内存无/过期再 REST
all_tickers = None
try:
try:
from .ticker_24h_stream import (
get_tickers_24h_cache,
get_tickers_24h_from_redis,
is_ticker_24h_cache_fresh,
)
except ImportError:
from ticker_24h_stream import (
get_tickers_24h_cache,
get_tickers_24h_from_redis,
is_ticker_24h_cache_fresh,
)
if is_ticker_24h_cache_fresh(max_age_sec=120.0):
redis_cache = getattr(self.client, "redis_cache", None)
if redis_cache:
all_tickers = await get_tickers_24h_from_redis(redis_cache)
else:
all_tickers = get_tickers_24h_cache()
if all_tickers:
logger.info(f"使用 24h ticker 缓存({len(all_tickers)} 个交易对,来自 Redis/WS跳过 REST 批量请求")
except Exception as e:
logger.debug(f"读取 24h ticker 缓存失败: {e}")
if not all_tickers:
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
all_tickers = await self.client.get_all_tickers_24h()
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
pre_filtered_symbols = []
for symbol in symbols:
ticker = all_tickers.get(symbol) if all_tickers else None
if ticker:
change_percent = abs(ticker.get('changePercent', 0))
volume = ticker.get('volume', 0)
if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and
volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])):
pre_filtered_symbols.append(symbol)
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}")
# 扫描阶段资金费率过滤:排除 |lastFundingRate| 过大的标的;低波动期使用更严阈值(有效配置)
scan_funding_enabled = cfg.get('SCAN_FUNDING_RATE_FILTER_ENABLED', False)
scan_funding_max_abs = float(config.get_effective_config('SCAN_FUNDING_RATE_MAX_ABS', 0.001))
if scan_funding_enabled and pre_filtered_symbols:
try:
premium_all = await self.client.get_premium_index(None) # 全量,一次请求
rate_map = {}
if isinstance(premium_all, list):
for item in premium_all:
if isinstance(item, dict) and 'symbol' in item:
try:
rate_map[item['symbol']] = float(item.get('lastFundingRate', 0))
except (TypeError, ValueError):
pass
elif isinstance(premium_all, dict) and premium_all.get('symbol'):
rate_map[premium_all['symbol']] = float(premium_all.get('lastFundingRate', 0))
if rate_map:
before = len(pre_filtered_symbols)
pre_filtered_symbols = [
s for s in pre_filtered_symbols
if abs(rate_map.get(s, 0)) <= scan_funding_max_abs
]
excluded = before - len(pre_filtered_symbols)
if excluded > 0:
logger.info(f"扫描阶段资金费率过滤: 排除 {excluded} 个标的 (|lastFundingRate| > {scan_funding_max_abs}),剩余 {len(pre_filtered_symbols)}")
except Exception as e:
logger.debug("扫描阶段资金费率过滤失败,继续使用原列表: %s", e)
# 方案 A扫描前预热 K 线(批量预订阅 WS + REST 预取),提高缓存命中率、减少分析超时
prewarm_enabled = cfg.get('SCAN_PREWARM_KLINE_ENABLED', True)
if prewarm_enabled and pre_filtered_symbols:
await self._prewarm_klines_for_scan(pre_filtered_symbols, cfg)
# 只对符合条件的交易对进行详细分析获取K线和技术指标
# 并发数由 SCAN_CONCURRENT_SYMBOLS 控制2 CPU 4G 多账号建议 2单账号可 35
concurrent = cfg.get('SCAN_CONCURRENT_SYMBOLS', 2)
try:
concurrent = max(1, min(10, int(concurrent)))
except (TypeError, ValueError):
concurrent = 2
semaphore = asyncio.Semaphore(concurrent)
analysis_timeout = float(cfg.get('SCAN_SYMBOL_ANALYSIS_TIMEOUT_SEC', 18) or 18)
if analysis_timeout < 5:
analysis_timeout = 5.0
elif analysis_timeout > 60:
analysis_timeout = 60.0
async def get_symbol_change_with_limit(symbol):
async with semaphore:
try:
# ⚠️ 优化:优先使用共享缓存,减少超时风险
result = await asyncio.wait_for(
self._get_symbol_change(symbol, all_tickers.get(symbol)),
timeout=analysis_timeout
)
return result
except asyncio.TimeoutError:
# ⚠️ 优化:超时时尝试返回降级结果(仅涨跌幅/成交量),而不是完全跳过
logger.warning(f"{symbol} 分析超时({analysis_timeout:.0f}秒),尝试返回降级结果...")
try:
ticker = all_tickers.get(symbol) if all_tickers else None
if ticker:
change_pct = float(ticker.get('changePercent', 0) or 0)
vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0)
price = float(ticker.get('price', 0) or ticker.get('lastPrice', 0) or 0)
if price > 0:
return {
'symbol': symbol,
'price': price,
'changePercent': change_pct,
'volume24h': vol,
'direction': 'UP' if change_pct > 0 else 'DOWN',
'signalScore': 0,
'signal_strength': 0,
}
except Exception as e:
logger.debug(f"{symbol} 降级结果构建失败: {e}")
logger.warning(f"{symbol} 分析超时且无法返回降级结果,跳过")
return None
except Exception as e:
logger.debug(f"{symbol} 分析出错: {e}")
return None
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤有效结果
valid_results = [
r for r in results
if isinstance(r, dict)
and r.get('changePercent') is not None
]
# ⚠️ 优化2成交量验证 - 24H Volume低于1000万美金直接剔除
min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金
min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])
# 使用更严格的成交量要求
min_volume = max(min_volume_strict, min_volume_normal)
# 过滤最小涨跌幅和成交量
filtered_results = [
r for r in valid_results
if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT'])
and r.get('volume24h', 0) >= min_volume
]
if min_volume_strict > min_volume_normal:
logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)")
# ⚠️ 2026-01-27优化按真实的signal_strength排序而不是简单的signalScore
# 优先考虑信号强度高的交易对8-10分提升胜率
sorted_results = sorted(
filtered_results,
key=lambda x: (
x.get('signal_strength', 0) * 100, # 信号强度权重最高乘以100确保优先级
x.get('signalScore', 0) * 10, # 其次考虑信号得分(兼容性)
abs(x['changePercent']) # 最后考虑涨跌幅
),
reverse=True
)
# 智能补单:返回 TOP_N + 额外候选数,当前 TOP_N 中部分因冷却等被跳过时,策略仍会尝试后续交易对,避免无单可下
top_n_val = cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])
extra = cfg.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', config.TRADING_CONFIG.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', 8))
try:
top_n_val = int(top_n_val) if top_n_val is not None else 20
extra = int(extra) if extra is not None else 8
except (TypeError, ValueError):
top_n_val, extra = 20, 8
take_count = min(len(sorted_results), top_n_val + extra)
top_n = sorted_results[:take_count]
self.top_symbols = top_n
# 市场节奏自动识别:用本次扫描的 24h 涨跌幅中位数判断低波动/正常,写入 Redis 并更新进程内缓存
try:
from . import market_regime
except ImportError:
market_regime = None
if market_regime and getattr(self.client, "redis_cache", None):
# 用过滤后的全部结果计算中位数(样本更多),无则用 top_n
source = filtered_results if filtered_results else top_n
change_percents = [r["changePercent"] for r in source if r.get("changePercent") is not None]
threshold = float(cfg.get("LOW_VOLATILITY_THRESHOLD", config.TRADING_CONFIG.get("LOW_VOLATILITY_THRESHOLD", 2.5)))
regime = await market_regime.update_regime_from_scan(
change_percents, self.client.redis_cache, low_volatility_threshold=threshold
)
market_regime.set_cached_regime(regime)
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
# 虽然中间数据K线、技术指标已经缓存但最终扫描结果不缓存
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
# 记录扫描性能(用于监控多用户时的系统压力)
scan_duration = time.time() - self._scan_start_time
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对,耗时 {scan_duration:.2f}")
if scan_duration > 60:
logger.warning(f"⚠️ 扫描耗时较长({scan_duration:.2f}秒),可能影响系统性能,建议检查缓存命中率")
# 记录扫描结果到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import MarketScan
MarketScan.create(
symbols_scanned=len(symbols),
symbols_found=len(top_n),
top_symbols=[s['symbol'] for s in top_n],
scan_duration=scan_duration
)
except Exception as e:
logger.debug(f"记录扫描结果失败: {e}")
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
# 打印结果(包含技术指标)
# 分组打印:优先展示有信号的交易对,然后是高波动/成交量的观察对象
signals_found = [s for s in top_n if s.get('signal_strength', 0) > 0 or s.get('signalScore', 0) > 0]
others = [s for s in top_n if s not in signals_found]
if signals_found:
logger.info(f"===== 发现潜在交易信号 ({len(signals_found)}) =====")
for i, symbol_info in enumerate(signals_found, 1):
self._log_single_symbol(i, symbol_info)
if others:
logger.info(f"===== 其它活跃交易对 (高波动/无明确信号) ({len(others)}) =====")
for i, symbol_info in enumerate(others, 1):
self._log_single_symbol(i, symbol_info)
# 当所有标的趋势信号强度均为 0 时打一行说明,避免误以为异常
strong_count = sum(1 for s in top_n if s.get('signal_strength', 0) > 0)
if top_n and strong_count == 0:
logger.info(
"本轮扫描: 所有 %d 个标的趋势信号强度均为 0可能原因1h 与 4H 方向冲突被清零、或无明确 MACD 金叉/死叉、或多空冲突)。"
"仅当趋势信号≥5 才会生成合约推荐与自动交易。",
len(top_n),
)
return top_n
def _log_single_symbol(self, index: int, symbol_info: Dict):
"""打印单个交易对信息(含信号强度,便于判断为何未触发交易)"""
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
regime_str = symbol_info.get('marketRegime', 'unknown')
strength = symbol_info.get('signal_strength', symbol_info.get('signalScore', 0))
try:
strength = int(strength) if strength is not None else 0
except (TypeError, ValueError):
strength = 0
strength_str = f"信号:{strength}"
log_parts = [
f"{index}. {symbol_info['symbol']}:",
f"{symbol_info['changePercent']:.2f}%",
rsi_str,
regime_str,
strength_str,
f"价格: {symbol_info['price']:.4f}",
]
logger.info(" | ".join(log_parts))
async def _prewarm_klines_for_scan(self, symbols: List[str], cfg: Optional[Dict] = None) -> None:
"""
方案 A扫描前预热 K 线,批量预订阅 WebSocket + REST 预取,提高缓存命中率、减少分析超时。
文档:组合流可订阅多个 symbol@kline_interval币安限制每秒 10 条订阅。
"""
if not symbols:
return
cfg = cfg or dict(config.TRADING_CONFIG or {})
primary_interval = cfg.get('PRIMARY_INTERVAL', config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h'))
confirm_interval = cfg.get('CONFIRM_INTERVAL', config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h'))
kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30)
kline_limit = max(20, min(50, kline_limit))
prewarm_concurrent = int(cfg.get('SCAN_PREWARM_CONCURRENT', 5) or 5)
klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8)
import time
start_ts = time.time()
# 1. 批量预订阅 WebSocket K 线流(币安限制 10 条/秒)
stream = None
try:
from .kline_stream import get_kline_stream_instance
stream = get_kline_stream_instance()
except ImportError:
pass
if stream:
subs_done = 0
for sym in symbols:
try:
await stream.subscribe(sym, primary_interval, limit=kline_limit)
await stream.subscribe(sym, confirm_interval, limit=kline_limit)
subs_done += 2
if subs_done >= 20: # 每 20 次订阅略等待,避免超限
await asyncio.sleep(2.1)
subs_done = 0
except Exception as e:
logger.debug(f"预订阅 {sym} K 线失败: {e}")
logger.debug(f"K 线预订阅完成,{len(symbols)} 个交易对 x2 周期")
# 2. 批量 REST 预取 K 线(写入 Redis供后续分析直接命中
async def fetch_one(symbol: str) -> None:
try:
tasks = [
asyncio.wait_for(
self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit),
timeout=klines_timeout,
),
asyncio.wait_for(
self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit),
timeout=klines_timeout,
),
]
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.debug(f"预热 {symbol} K 线失败: {e}")
sem = asyncio.Semaphore(prewarm_concurrent)
async def fetch_with_limit(s):
async with sem:
await fetch_one(s)
await asyncio.gather(*[fetch_with_limit(s) for s in symbols], return_exceptions=True)
elapsed = time.time() - start_ts
logger.info(f"K 线预热完成,{len(symbols)} 个交易对,耗时 {elapsed:.1f}sWS 预订阅 + REST 预取)")
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
"""
获取单个交易对的涨跌幅和技术指标
Args:
symbol: 交易对
ticker_data: 可选的24小时行情数据如果已批量获取
Returns:
包含涨跌幅和技术指标信息的字典
"""
try:
# 如果已有批量获取的数据,直接使用;否则单独获取
if ticker_data:
ticker = ticker_data
else:
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
# 24h ticker 的“最新价”(更接近用户理解的“当前价格”)
# 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳
ticker_price = ticker.get('price')
try:
ticker_price = float(ticker_price) if ticker_price is not None else None
except Exception:
ticker_price = None
ticker_ts = ticker.get('ts')
try:
ticker_ts = int(ticker_ts) if ticker_ts is not None else None
except Exception:
ticker_ts = None
# 获取更多K线数据用于技术指标计算主周期 + 确认周期并行请求;单独超时避免整分析拖到 18s
# ⚠️ 优化:优先使用 WebSocket K 线缓存,避免 REST API 超时
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
cfg = dict(config.TRADING_CONFIG or {})
klines_timeout = float(cfg.get('SCAN_KLINE_FETCH_TIMEOUT_SEC', 8) or 8)
if klines_timeout < 3:
klines_timeout = 3.0
elif klines_timeout > 20:
klines_timeout = 20.0
kline_limit = int(cfg.get('SCAN_KLINE_LIMIT', 30) or 30)
kline_limit = max(20, min(50, kline_limit))
# ⚠️ 优化:优先使用共享 Redis 缓存(多账号共用),然后才是 WebSocket 本地缓存
# 这样多个账号扫描相同交易对时,可以共用缓存数据,减少 REST API 调用
klines, klines_4h = [], []
use_ws_prefer = cfg.get('SCAN_PREFER_WEBSOCKET', True) # 默认优先 WebSocket
# 0. 优先从共享 Redis 缓存读取(多账号共用,最高优先级)
try:
from .market_ws_leader import use_shared_market_ws
from .kline_stream import get_klines_from_redis
if use_shared_market_ws(self.client.redis_cache):
# 尝试从共享缓存获取
shared_klines = await get_klines_from_redis(self.client.redis_cache, symbol, primary_interval, limit=kline_limit)
shared_klines_4h = await get_klines_from_redis(self.client.redis_cache, symbol, confirm_interval, limit=kline_limit)
if shared_klines and len(shared_klines) >= 2:
klines = shared_klines
logger.debug(f"{symbol} 从共享缓存获取主周期 K 线: {primary_interval}")
if shared_klines_4h and len(shared_klines_4h) >= 2:
klines_4h = shared_klines_4h
logger.debug(f"{symbol} 从共享缓存获取确认周期 K 线: {confirm_interval}")
except Exception as e:
logger.debug(f"{symbol} 读取共享 Redis 缓存失败: {e}")
# 1. 如果共享缓存不完整,尝试 WebSocket 本地缓存
# ⚠️ 优化:减少不必要的订阅,优先使用已有缓存,避免订阅过多导致负载上升
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
try:
from .kline_stream import get_klines_from_cache, get_kline_stream_instance, is_kline_cache_fresh
stream = get_kline_stream_instance()
if stream:
# ⚠️ 优化:先检查缓存是否已有数据,避免不必要的订阅
# 如果缓存已有数据(即使不完整),先尝试使用,减少订阅压力
cache_has_data = False
if is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit)
if cached_klines and len(cached_klines) >= 2:
klines = cached_klines
cache_has_data = True
if is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit)
if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h
cache_has_data = True
# ⚠️ 优化:只有在缓存完全没有数据时才订阅,避免订阅过多导致负载上升
# 如果缓存已有部分数据,优先使用部分数据,减少订阅需求
need_subscribe_primary = (not klines or len(klines) < 2)
need_subscribe_confirm = (not klines_4h or len(klines_4h) < 2)
# 只在真正需要时才订阅(避免扫描时订阅过多流)
if need_subscribe_primary or need_subscribe_confirm:
subscribe_tasks = []
if need_subscribe_primary:
subscribe_tasks.append(stream.subscribe(symbol, primary_interval, limit=kline_limit))
if need_subscribe_confirm:
subscribe_tasks.append(stream.subscribe(symbol, confirm_interval, limit=kline_limit))
if subscribe_tasks:
await asyncio.gather(*subscribe_tasks, return_exceptions=True)
# 如果优先使用 WebSocket等待推送数据但缩短等待时间避免阻塞
if use_ws_prefer:
# ⚠️ 优化缩短等待时间1秒如果缓存已有部分数据直接使用不等待
max_wait_sec = 1.0 if cache_has_data else 1.5 # 如果缓存有数据只等1秒否则等1.5秒
wait_start = time.time()
check_interval = 0.2 # 每 200ms 检查一次
while (time.time() - wait_start) < max_wait_sec:
# 检查缓存是否可用
if need_subscribe_primary and is_kline_cache_fresh(symbol, primary_interval, max_age_sec=300.0):
cached_klines = get_klines_from_cache(symbol, primary_interval, limit=kline_limit)
if cached_klines and len(cached_klines) >= 2:
klines = cached_klines
if need_subscribe_confirm and is_kline_cache_fresh(symbol, confirm_interval, max_age_sec=300.0):
cached_klines_4h = get_klines_from_cache(symbol, confirm_interval, limit=kline_limit)
if cached_klines_4h and len(cached_klines_4h) >= 2:
klines_4h = cached_klines_4h
# 如果两个周期都有足够数据,提前退出
if klines and len(klines) >= 2 and klines_4h and len(klines_4h) >= 2:
break
await asyncio.sleep(check_interval)
else:
# 缓存已有完整数据,无需订阅
logger.debug(f"{symbol} 缓存已有完整 K 线数据,跳过订阅")
except Exception as e:
logger.debug(f"{symbol} WebSocket K 线缓存获取失败: {e}")
# 如果 WebSocket 缓存仍不完整,使用 REST API带超时保护
# 注意:如果 WebSocket 缓存有部分数据优先使用部分数据REST 只补充缺失的
if not klines or len(klines) < 2 or not klines_4h or len(klines_4h) < 2:
# 只请求缺失的数据,减少等待时间
rest_tasks = []
if not klines or len(klines) < 2:
rest_tasks.append(('primary', self.client.get_klines(symbol=symbol, interval=primary_interval, limit=kline_limit)))
else:
rest_tasks.append(('primary', None))
if not klines_4h or len(klines_4h) < 2:
rest_tasks.append(('confirm', self.client.get_klines(symbol=symbol, interval=confirm_interval, limit=kline_limit)))
else:
rest_tasks.append(('confirm', None))
try:
# 只等待实际需要的数据
results = await asyncio.wait_for(
asyncio.gather(
*[task[1] if task[1] else asyncio.sleep(0) for task in rest_tasks],
return_exceptions=True
),
timeout=klines_timeout,
)
# 合并结果:优先使用 WebSocket 缓存REST 作为补充
for i, (task_type, _) in enumerate(rest_tasks):
if task_type == 'primary' and (not klines or len(klines) < 2):
if isinstance(results[i], list) and len(results[i]) >= 2:
klines = results[i]
elif task_type == 'confirm' and (not klines_4h or len(klines_4h) < 2):
if isinstance(results[i], list) and len(results[i]) >= 2:
klines_4h = results[i]
except asyncio.TimeoutError:
logger.debug(f"{symbol} K线拉取超时{klines_timeout:.0f}秒),使用已有缓存或降级结果")
# 如果 WebSocket 缓存有部分数据,继续使用;否则使用降级结果
if not klines or len(klines) < 2:
klines = []
if not klines_4h or len(klines_4h) < 2:
klines_4h = []
except Exception as e:
logger.debug(f"{symbol} K线拉取异常: {e}")
if not klines or len(klines) < 2:
klines = []
if not klines_4h or len(klines_4h) < 2:
klines_4h = []
if not klines or len(klines) < 2:
# 降级:仍有 ticker 时返回仅涨跌幅/成交量的结果,不直接跳过,避免整轮扫描被拖慢
try:
change_pct = float(ticker.get('changePercent', 0) or 0)
vol = float(ticker.get('volume', 0) or ticker.get('quoteVolume', 0) or 0)
price = ticker_price if ticker_price is not None else float(ticker.get('lastPrice', 0) or ticker.get('price', 0) or 0)
return {
'symbol': symbol,
'price': price,
'kline_close_price': price,
'ticker_price': ticker_price,
'ticker_ts': ticker_ts,
'prevPrice': None,
'kline_change_percent': change_pct,
'changePercent': change_pct,
'volume24h': vol,
'direction': 'UP' if change_pct > 0 else 'DOWN',
'rsi': None,
'macd': None,
'bollinger': None,
'atr': None,
'ema20': None,
'ema50': None,
'ema20_4h': None,
'price_4h': None,
'marketRegime': 'unknown',
'signalScore': 0,
'signal_strength': 0,
'trend_4h': None,
'klines': [],
'klines_4h': [],
}
except Exception as e:
logger.debug(f"{symbol} 降级结果构建失败: {e}")
return None
# 提取价格数据(主周期)
close_prices = [float(k[4]) for k in klines] # 收盘价
high_prices = [float(k[2]) for k in klines] # 最高价
low_prices = [float(k[3]) for k in klines] # 最低价
# 提取4H周期价格数据
close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices
# 计算涨跌幅(基于主周期)
current_price = close_prices[-1]
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
if prev_price == 0:
return None
change_percent = ((current_price - prev_price) / prev_price) * 100
# ⚠️ 优化检查技术指标计算结果缓存基于K线数据的最后更新时间
# 如果K线数据没有更新可以直接使用缓存的技术指标
cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}"
last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳
# 尝试从缓存获取技术指标计算结果
cached_indicators = None
try:
cached_indicators = await self.client.redis_cache.get(cache_key_indicators)
except Exception:
pass
ema50_4h = None
macd_4h = None
use_cached_indicators = False
if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time:
# 缓存命中,使用缓存的技术指标
use_cached_indicators = True
logger.debug(f"{symbol} 使用缓存的技术指标计算结果")
rsi = cached_indicators.get('rsi')
macd = cached_indicators.get('macd')
bollinger = cached_indicators.get('bollinger')
atr = cached_indicators.get('atr')
ema20 = cached_indicators.get('ema20')
ema50 = cached_indicators.get('ema50')
ema20_4h = cached_indicators.get('ema20_4h')
ema50_4h = cached_indicators.get('ema50_4h')
macd_4h = cached_indicators.get('macd_4h')
market_regime = cached_indicators.get('marketRegime')
else:
# 缓存未命中,重新计算技术指标
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
macd = TechnicalIndicators.calculate_macd(close_prices)
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14)
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period)
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
# 计算4H周期的EMA、MACD用于多周期共振检查
ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None
ema50_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=50) if len(close_prices_4h) >= 50 else None
macd_4h = TechnicalIndicators.calculate_macd(close_prices_4h) if len(close_prices_4h) >= 35 else None
# 判断市场状态
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
# 保存技术指标计算结果到缓存TTL: 30秒与K线缓存一致
try:
indicators_cache = {
'last_kline_time': last_kline_time,
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'ema20_4h': ema20_4h,
'ema50_4h': ema50_4h,
'macd_4h': macd_4h,
'marketRegime': market_regime,
}
await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30)
logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)")
except Exception as e:
logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}")
# 统一:缓存可能返回 None保证下游始终拿到 'trending'/'ranging'/'unknown'
market_regime = market_regime or 'unknown'
# 计算交易信号得分(用于排序)- 保留用于兼容性
signal_score = 0
# RSI信号均值回归
if rsi is not None:
if rsi < 30: # 超卖,做多信号
signal_score += 3
elif rsi > 70: # 超买,做空信号
signal_score += 3
elif 30 <= rsi <= 70: # 中性区域
signal_score += 1
# MACD信号
if macd and macd['histogram'] is not None:
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
signal_score += 2
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
signal_score += 2
# 布林带信号(均值回归)
if bollinger:
if current_price <= bollinger['lower']: # 触及下轨,做多
signal_score += 3
elif current_price >= bollinger['upper']: # 触及上轨,做空
signal_score += 3
elif bollinger['lower'] < current_price < bollinger['upper']:
signal_score += 1
# 均线信号
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_score += 1
elif current_price < ema20 < ema50: # 下降趋势
signal_score += 1
# ⚠️ 2026-01-27优化计算真实的signal_strength用于排序和筛选
# 使用与strategy.py相同的逻辑确保排序依据与交易判断一致
signal_strength = 0
direction = None
# 获取4H周期当前价格用于判断4H趋势
price_4h = close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price
# 判断4H周期趋势方向优先从 Redis 缓存读取,与 strategy 共用)
try:
from .trend_4h_cache import get_trend_4h_cached
trend_4h = await get_trend_4h_cached(
self.client.redis_cache if self.client else None,
symbol, price_4h, ema20_4h, ema50_4h, macd_4h,
)
except Exception as e:
logger.debug(f"{symbol} trend_4h 缓存获取失败: {e}")
trend_4h = None
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
# 策略权重配置与strategy.py保持一致
TREND_SIGNAL_WEIGHTS = {
'macd_cross': 5, # MACD金叉/死叉
'ema_cross': 4, # EMA20上穿/下穿EMA50
'price_above_ema20': 3, # 价格在EMA20之上/下
'4h_trend_confirmation': 2, # 4H趋势确认
}
# MACD金叉/死叉(权重最高)
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
# MACD金叉做多信号需4H趋势向上或中性
if trend_4h in ('up', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
if direction is None:
direction = 'BUY'
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
# MACD死叉做空信号需4H趋势向下或中性
if trend_4h in ('down', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
if direction is None:
direction = 'SELL'
# EMA均线系统
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
if trend_4h in ('up', 'neutral', None):
# 冲突检查
if direction == 'SELL':
signal_strength = 0
direction = None
else:
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50: # 下降趋势
if trend_4h in ('down', 'neutral', None):
# 冲突检查
if direction == 'BUY':
signal_strength = 0
direction = None
else:
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
if direction is None:
direction = 'SELL'
# 价格与EMA20关系
if ema20:
if current_price > ema20:
if trend_4h in ('up', 'neutral', None) and direction == 'BUY':
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
elif current_price < ema20:
if trend_4h in ('down', 'neutral', None) and direction == 'SELL':
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
# 4H趋势确认加分 / 逆势是否清零
allow_4h_neutral = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ALLOW_4H_NEUTRAL", False))
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation']
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
# 逆势时若开启「允许4H中性」仅不加分、不清零便于出推荐与列表展示策略层仍会禁止逆势自动下单
if not allow_4h_neutral:
signal_strength = 0
direction = None
# 强度上限归一到 0-10
signal_strength = max(0, min(int(signal_strength), 10))
# 回退趋势逻辑过严时MACD/EMA/4H 未同时满足),用 RSI/MACD/布林/涨跌 给弱信号,避免长期全 0
# 强度映射signal_score 4->2, 5->3, 6->4, 7->5, 8->6, 9->7, 10->7高得分可触达 MIN_SIGNAL_STRENGTH 7
if signal_strength == 0 and direction is None and signal_score >= 4:
fallback_strength = min(7, max(2, signal_score - 2))
if rsi is not None:
if rsi < 35:
direction = 'BUY'
signal_strength = fallback_strength
elif rsi > 65:
direction = 'SELL'
signal_strength = fallback_strength
if direction is None and bollinger:
if current_price <= bollinger.get('lower', float('inf')):
direction = 'BUY'
signal_strength = fallback_strength
elif current_price >= bollinger.get('upper', -float('inf')):
direction = 'SELL'
signal_strength = fallback_strength
if direction is None and macd and macd.get('histogram') is not None:
if macd['histogram'] > 0 and macd.get('macd', 0) > macd.get('signal', 0):
direction = 'BUY'
signal_strength = fallback_strength
elif macd['histogram'] < 0 and macd.get('macd', 0) < macd.get('signal', 0):
direction = 'SELL'
signal_strength = fallback_strength
if direction is None and ema20 and ema50:
if current_price > ema20 > ema50:
direction = 'BUY'
signal_strength = fallback_strength
elif current_price < ema20 < ema50:
direction = 'SELL'
signal_strength = fallback_strength
# 仍无方向时用 24h 涨跌给方向(便于出推荐/开单;强度用 fallback_strength高得分可≥7
if direction is None:
change_pct = symbol_info.get('changePercent')
try:
ch = float(change_pct) if change_pct is not None else None
except (TypeError, ValueError):
ch = None
if ch is not None:
direction = 'BUY' if ch > 0.5 else ('SELL' if ch < -0.5 else None)
if direction:
signal_strength = fallback_strength
# ===== 趋势状态缓存(用于后续“入场时机过滤”)=====
try:
# 只有在方向明确且信号强度达到最小门槛时,才记录趋势状态
min_strength = int(config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) or 7)
use_trend_filter = bool(config.TRADING_CONFIG.get('USE_TREND_ENTRY_FILTER', False))
if use_trend_filter and direction and signal_strength >= min_strength:
trend_state_key = f"trend_state:{symbol}"
trend_state_value = {
'symbol': symbol,
'direction': direction, # BUY / SELL
'signal_strength': int(signal_strength),
'marketRegime': market_regime,
'trend_4h': trend_4h,
# 使用技术分析用的收盘价作为“信号价格”,同时附带 24h ticker 价格
'signal_price': float(current_price),
'ticker_price': float(ticker_price) if ticker_price is not None else None,
'ema20': float(ema20) if ema20 is not None else None,
'ema50': float(ema50) if ema50 is not None else None,
'ema20_4h': float(ema20_4h) if ema20_4h is not None else None,
'price_4h': float(price_4h) if price_4h is not None else None,
'last_kline_time': last_kline_time,
'created_at_ts': int(ticker_ts or 0),
}
ttl_sec = int(config.TRADING_CONFIG.get('TREND_STATE_TTL_SEC', 3600) or 3600)
if self.client and getattr(self.client, "redis_cache", None):
try:
await self.client.redis_cache.set(trend_state_key, trend_state_value, ttl=ttl_sec)
logger.debug(f"{symbol} 趋势状态已缓存: dir={direction}, strength={signal_strength}, price={current_price:.6f}")
except Exception as e:
logger.debug(f"{symbol} 缓存趋势状态失败: {e}")
except Exception as e:
logger.debug(f"{symbol} 处理趋势状态缓存时出错: {e}")
return {
'symbol': symbol,
# 技术分析使用的价格K线收盘价
'price': current_price,
'kline_close_price': current_price,
# 展示用“当前价”24h ticker 最新价,通常更贴近用户的直觉)
'ticker_price': ticker_price,
'ticker_ts': ticker_ts,
'prevPrice': prev_price,
# 1) 主周期涨跌幅(用于内部信号)
'kline_change_percent': change_percent,
# 2) 24h涨跌幅用于前端展示更符合“24h涨跌”的文案
'changePercent': float(ticker.get('changePercent', 0) or 0),
'volume24h': ticker.get('volume', 0),
'direction': 'UP' if change_percent > 0 else 'DOWN',
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'ema20_4h': ema20_4h, # 4H周期EMA20用于多周期共振
'ema50_4h': ema50_4h, # 4H周期EMA50
'macd_4h': macd_4h, # 4H周期MACD
'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格
'marketRegime': market_regime,
'signalScore': signal_score, # 保留用于兼容性
'signal_strength': signal_strength, # ⚠️ 2026-01-27优化添加真实的信号强度
'trend_4h': trend_4h, # 4H趋势方向
'klines': klines[-10:], # 保留最近10根K线
'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线
}
except Exception as e:
logger.debug(f"获取 {symbol} 数据失败: {e}")
return None
def get_top_symbols(self) -> List[Dict]:
"""
获取当前扫描到的前N个货币对
Returns:
前N个货币对列表
"""
return self.top_symbols
async def monitor_price(self, symbol: str, callback) -> None:
"""
监控单个交易对的价格变化WebSocket
Args:
symbol: 交易对
callback: 价格变化回调函数
"""
# 使用标准WebSocket
try:
import aiohttp
import json
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
price = float(data['c'])
await callback(symbol, price)
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
price = float(data['data']['c'])
await callback(symbol, price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"监控 {symbol} WebSocket连接关闭")
break
except Exception as e:
logger.error(f"监控 {symbol} 价格失败: {e}")
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格(从缓存)
Args:
symbol: 交易对
Returns:
实时价格
"""
if self.client:
return self.client.get_realtime_price(symbol)
return None