diff --git a/trading_system/main.py b/trading_system/main.py index 727df7e..88784e3 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -236,6 +236,7 @@ async def main(): # 初始化组件 client = None user_data_stream = None + ticker_24h_stream = None try: # 1. 初始化币安客户端 logger.info("初始化币安客户端...") @@ -324,6 +325,19 @@ async def main(): logger.warning("⚠ User Data Stream 未启动,将仅依赖 REST 同步订单与持仓") user_data_stream = None + # 3.1 启动 24h ticker WS 流(扫描时优先用缓存,避免批量 REST 与超时) + try: + from .ticker_24h_stream import Ticker24hStream + use_testnet = getattr(config, "USE_TESTNET", False) + ticker_24h_stream = Ticker24hStream(testnet=use_testnet) + if await ticker_24h_stream.start(): + logger.info("✓ 24h ticker WS 已启动(扫描将优先使用 WS 缓存)") + else: + ticker_24h_stream = None + except Exception as e: + logger.debug(f"启动 24h ticker WS 失败(将使用 REST): {e}") + ticker_24h_stream = None + # 4. 初始化各个模块 logger.info("初始化交易模块...") scanner = MarketScanner(client) @@ -411,6 +425,12 @@ async def main(): logger.info("User Data Stream 已停止") except Exception as e: logger.debug(f"停止 User Data Stream 时异常: {e}") + try: + if ticker_24h_stream is not None: + await ticker_24h_stream.stop() + logger.info("Ticker24h Stream 已停止") + except Exception as e: + logger.debug(f"停止 Ticker24h Stream 时异常: {e}") if client: await client.disconnect() logger.info("程序已退出") diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index d19edce..68a5da7 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -90,14 +90,27 @@ class MarketScanner: if excluded_count > 0: logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)") - # 先批量获取所有交易对的24小时行情数据(减少API请求) - logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") - all_tickers = await self.client.get_all_tickers_24h() + # 优先从 24h ticker WebSocket 缓存读取,避免批量 REST 请求与超时;无/过期再走 REST + all_tickers = None + try: + try: + from .ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + except ImportError: + from ticker_24h_stream import get_tickers_24h_cache, is_ticker_24h_cache_fresh + if is_ticker_24h_cache_fresh(max_age_sec=120.0): + all_tickers = get_tickers_24h_cache() + if all_tickers: + logger.info(f"使用 24h ticker WS 缓存({len(all_tickers)} 个交易对),跳过 REST 批量请求") + except Exception as e: + logger.debug(f"读取 24h ticker WS 缓存失败: {e}") + if not all_tickers: + logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") + all_tickers = await self.client.get_all_tickers_24h() # 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量 pre_filtered_symbols = [] for symbol in symbols: - ticker = all_tickers.get(symbol) + ticker = all_tickers.get(symbol) if all_tickers else None if ticker: change_percent = abs(ticker.get('changePercent', 0)) volume = ticker.get('volume', 0) diff --git a/trading_system/ticker_24h_stream.py b/trading_system/ticker_24h_stream.py new file mode 100644 index 0000000..611d7f4 --- /dev/null +++ b/trading_system/ticker_24h_stream.py @@ -0,0 +1,138 @@ +""" +24 小时行情 WebSocket 流:订阅 !ticker@arr,维护全市场 ticker 缓存。 +供 market_scanner 优先使用,避免批量 REST get_all_tickers_24h,减少请求与超时。 +文档:仅发生变化的交易对会出现在推送数组中,约 1 秒一次。 +""" +import asyncio +import json +import logging +import time +from typing import Dict, Optional, Any + +logger = logging.getLogger(__name__) + +# 全市场 24h ticker 缓存:symbol -> { symbol, price, volume, changePercent, ts } +_ticker_24h_cache: Dict[str, Dict[str, Any]] = {} +_ticker_24h_updated_at: float = 0.0 + + +def get_tickers_24h_cache() -> Dict[str, Dict[str, Any]]: + """返回当前 24h ticker 缓存(与 get_all_tickers_24h 结构兼容)。""" + return dict(_ticker_24h_cache) + + +def get_tickers_24h_cache_updated_at() -> float: + """返回缓存最后更新时间(monotonic)。未更新过为 0。""" + return _ticker_24h_updated_at + + +def is_ticker_24h_cache_fresh(max_age_sec: float = 120.0) -> bool: + """缓存是否在 max_age_sec 秒内更新过且非空。""" + if not _ticker_24h_cache: + return False + return (time.monotonic() - _ticker_24h_updated_at) <= max_age_sec + + +class Ticker24hStream: + """订阅合约 !ticker@arr,持续更新 _ticker_24h_cache。无需 listenKey,公开行情。""" + + def __init__(self, testnet: bool = False): + self.testnet = testnet + self._ws = None + self._task: Optional[asyncio.Task] = None + self._running = False + + def _ws_url(self) -> str: + if self.testnet: + return "wss://stream.binancefuture.com/ws/!ticker@arr" + return "wss://fstream.binance.com/ws/!ticker@arr" + + async def start(self) -> bool: + if self._running: + return True + self._running = True + self._task = asyncio.create_task(self._run_ws()) + logger.info("Ticker24hStream: 已启动(!ticker@arr),扫描将优先使用 WS 缓存") + return True + + async def stop(self): + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + if self._ws: + try: + await self._ws.close() + except Exception: + pass + self._ws = None + logger.info("Ticker24hStream: 已停止") + + async def _run_ws(self): + import aiohttp + while self._running: + url = self._ws_url() + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + url, heartbeat=50, timeout=aiohttp.ClientTimeout(total=15) + ) as ws: + self._ws = ws + logger.info("Ticker24hStream: WS 已连接") + async for msg in ws: + if not self._running: + break + if msg.type == aiohttp.WSMsgType.TEXT: + self._handle_message(msg.data) + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE): + break + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Ticker24hStream: WS 异常 {e},10s 后重连") + await asyncio.sleep(10) + self._ws = None + if not self._running: + break + + def _handle_message(self, raw: str): + global _ticker_24h_cache, _ticker_24h_updated_at + try: + data = json.loads(raw) + except Exception: + return + # 可能是单条对象(stream 名)或数组;文档说是数组 + if isinstance(data, list): + arr = data + elif isinstance(data, dict): + # 组合流格式 { "stream": "!ticker@arr", "data": [ ... ] } + arr = data.get("data") if isinstance(data.get("data"), list) else [data] + else: + return + now_ms = int(time.time() * 1000) + for t in arr: + if not isinstance(t, dict): + continue + s = (t.get("s") or t.get("symbol") or "").strip() + if not s or not s.endswith("USDT"): + continue + try: + price = float(t.get("c") or t.get("lastPrice") or 0) + change_pct = float(t.get("P") or t.get("priceChangePercent") or 0) + # 成交量:优先 quoteVolume(USDT),文档可能为 q 或 quoteVolume + vol = float(t.get("quoteVolume") or t.get("q") or t.get("v") or 0) + except (TypeError, ValueError): + continue + _ticker_24h_cache[s] = { + "symbol": s, + "price": price, + "volume": vol, + "changePercent": change_pct, + "ts": now_ms, + } + _ticker_24h_updated_at = time.monotonic() + logger.debug(f"Ticker24hStream: 已更新 {len(arr)} 条,缓存共 {len(_ticker_24h_cache)} 个交易对")