Introduced new configuration options to manage trading activities on Sundays and during night hours. This includes limits on the number of trades on Sundays, minimum signal strength requirements for Sunday trades, and restrictions on opening new positions during specified night hours. Updated relevant backend and frontend components to reflect these changes, enhancing risk control and user awareness of trading conditions.
630 lines
32 KiB
Python
630 lines
32 KiB
Python
"""
|
||
主程序 - 币安自动交易系统入口
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import sys
|
||
import traceback
|
||
from pathlib import Path
|
||
|
||
# 启动方式兼容(更鲁棒):
|
||
# - supervisor 推荐:python -m trading_system.main(相对导入)
|
||
# - 手动调试:python trading_system/main.py(同目录导入)
|
||
# - 其它非常规启动方式:尽量通过补齐 sys.path 避免本地模块找不到
|
||
try:
|
||
from .binance_client import BinanceClient # type: ignore
|
||
from .market_scanner import MarketScanner # type: ignore
|
||
from .risk_manager import RiskManager # type: ignore
|
||
from .position_manager import PositionManager # type: ignore
|
||
from .strategy import TradingStrategy # type: ignore
|
||
from .user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
|
||
from . import config # type: ignore
|
||
except Exception:
|
||
_here = Path(__file__).resolve().parent
|
||
_root = _here.parent
|
||
# 某些 supervisor/启动脚本可能会导致 sys.path 没包含 trading_system 目录
|
||
if str(_here) not in sys.path:
|
||
sys.path.insert(0, str(_here))
|
||
if str(_root) not in sys.path:
|
||
sys.path.insert(0, str(_root))
|
||
from binance_client import BinanceClient # type: ignore
|
||
from market_scanner import MarketScanner # type: ignore
|
||
from risk_manager import RiskManager # type: ignore
|
||
from position_manager import PositionManager # type: ignore
|
||
from strategy import TradingStrategy # type: ignore
|
||
from user_data_stream import UserDataStream, seed_position_cache, seed_balance_cache # type: ignore
|
||
import config # type: ignore
|
||
|
||
# 配置日志(支持相对路径)
|
||
log_file = config.LOG_FILE
|
||
if not Path(log_file).is_absolute():
|
||
# 如果是相对路径,相对于项目根目录
|
||
project_root = Path(__file__).parent.parent
|
||
log_file = project_root / log_file
|
||
|
||
# 设置日志时间格式为北京时间(UTC+8)
|
||
import time
|
||
from datetime import timezone, timedelta
|
||
|
||
class BeijingTimeFormatter(logging.Formatter):
|
||
"""使用北京时间的日志格式化器"""
|
||
def formatTime(self, record, datefmt=None):
|
||
# 转换为北京时间(UTC+8)
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
|
||
if datefmt:
|
||
return dt.strftime(datefmt)
|
||
return dt.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
from datetime import datetime
|
||
|
||
# 创建格式化器
|
||
formatter = BeijingTimeFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||
|
||
# 创建处理器
|
||
file_handler = logging.FileHandler(str(log_file), encoding='utf-8')
|
||
file_handler.setFormatter(formatter)
|
||
|
||
console_handler = logging.StreamHandler(sys.stdout)
|
||
console_handler.setFormatter(formatter)
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=getattr(logging, config.LOG_LEVEL),
|
||
handlers=[file_handler, console_handler]
|
||
)
|
||
|
||
# 追加:将 ERROR 日志写入 Redis(不影响现有文件/控制台日志)
|
||
try:
|
||
# 兼容两种启动方式:
|
||
# - 直接运行:python trading_system/main.py
|
||
# - 模块运行:python -m trading_system.main
|
||
try:
|
||
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
|
||
except Exception:
|
||
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
|
||
|
||
redis_cfg = RedisLogConfig(
|
||
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
|
||
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
|
||
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
|
||
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
|
||
username=getattr(config, "REDIS_USERNAME", None),
|
||
password=getattr(config, "REDIS_PASSWORD", None),
|
||
service="trading_system",
|
||
)
|
||
redis_handler = RedisErrorLogHandler(redis_cfg)
|
||
# 让 handler 自己按组筛选(error/warning/info),这里只需要放宽到 INFO
|
||
redis_handler.setLevel(logging.INFO)
|
||
logging.getLogger().addHandler(redis_handler)
|
||
|
||
# 诊断:启动时快速检测一次 Redis 可用性(失败不影响运行)
|
||
try:
|
||
client = redis_handler._get_redis() # noqa: SLF001(仅用于诊断)
|
||
if client is None:
|
||
logger = logging.getLogger(__name__)
|
||
logger.warning(
|
||
f"⚠ Redis 日志写入未启用(无法连接或缺少依赖)。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
|
||
)
|
||
else:
|
||
logger = logging.getLogger(__name__)
|
||
logger.info(
|
||
f"✓ Redis 日志写入已启用。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
|
||
)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
# Redis handler 仅用于增强监控,失败不影响交易系统启动
|
||
pass
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _asyncio_exception_handler(loop, context):
|
||
"""
|
||
自定义 asyncio 异常处理器:将 WebSocket 关闭时的 ping 写入异常静默处理,
|
||
避免刷「Task exception was never retrieved」和 ConnectionResetError 堆栈。
|
||
"""
|
||
exc = context.get("exception")
|
||
msg = str(exc).lower() if exc else ""
|
||
# 1) WebSocket 关闭时 aiohttp 的 ping 往已关闭的 transport 写会抛 ConnectionResetError / OSError
|
||
if exc is not None and msg:
|
||
if "closing transport" in msg or "cannot write to closing transport" in msg:
|
||
logger.debug("WebSocket 连接关闭时 ping 已结束(可忽略): %s", exc)
|
||
return
|
||
if isinstance(exc, (ConnectionResetError, OSError, BrokenPipeError)) and "write" in msg:
|
||
# 其他「连接已关仍写入」类错误也视为可忽略
|
||
logger.debug("连接关闭时写入被拒绝(可忽略): %s", exc)
|
||
return
|
||
# 2) 根据任务 coroutine 判断:aiohttp WebSocketWriter.ping 产生的未检索异常
|
||
future = context.get("future")
|
||
if future is not None and exc is not None:
|
||
try:
|
||
coro = getattr(future, "get_coro", lambda: getattr(future, "_coro", None))()
|
||
if coro is not None:
|
||
qual = (getattr(coro, "__qualname__", "") or getattr(coro, "__name__", "")) or ""
|
||
if "ping" in qual.lower() and "websocket" in qual.lower():
|
||
logger.debug("WebSocket ping 任务异常(连接已关闭,可忽略): %s", exc)
|
||
return
|
||
except Exception:
|
||
pass
|
||
# 其他「Task exception was never retrieved」按 asyncio 默认方式记录
|
||
asyncio_logger = logging.getLogger("asyncio")
|
||
asyncio_logger.error("Task exception was never retrieved")
|
||
if "exception" in context:
|
||
asyncio_logger.error("".join(traceback.format_exception(context["exception"])))
|
||
elif "message" in context:
|
||
asyncio_logger.error(context["message"])
|
||
|
||
|
||
async def main():
|
||
"""主函数"""
|
||
# 设置 asyncio 未检索异常的处理器(避免 aiohttp WebSocket ping 在连接关闭时刷 ERROR)
|
||
try:
|
||
asyncio.get_running_loop().set_exception_handler(_asyncio_exception_handler)
|
||
except RuntimeError:
|
||
pass # 无运行中 loop 时忽略
|
||
|
||
logger.info("=" * 60)
|
||
logger.info("币安自动交易系统启动")
|
||
logger.info("=" * 60)
|
||
|
||
# 检查配置管理器状态
|
||
logger.info("检查配置管理器状态...")
|
||
|
||
# 强制重新初始化配置管理器(确保能读取到数据库配置)
|
||
try:
|
||
logger.info("重新初始化配置管理器...")
|
||
# 重置全局变量,强制重新初始化
|
||
config._config_manager = None
|
||
config.USE_DB_CONFIG = False
|
||
config._init_config_manager()
|
||
|
||
if config._config_manager:
|
||
config._config_manager.reload()
|
||
logger.info(f"✓ 配置管理器初始化成功,已加载 {len(config._config_manager._cache)} 个配置项")
|
||
# 打印一些关键配置项,用于调试
|
||
test_key = config._config_manager.get('BINANCE_API_KEY')
|
||
test_secret = config._config_manager.get('BINANCE_API_SECRET')
|
||
logger.info(f"从数据库读取: BINANCE_API_KEY存在={bool(test_key)}, BINANCE_API_SECRET存在={bool(test_secret)}")
|
||
if test_key:
|
||
logger.info(f"BINANCE_API_KEY前4位: {test_key[:4]}...")
|
||
else:
|
||
logger.warning("⚠ 配置管理器初始化失败,返回None")
|
||
except Exception as e:
|
||
logger.error(f"配置管理器初始化异常: {e}", exc_info=True)
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
if config.USE_DB_CONFIG:
|
||
logger.info("✓ 使用数据库配置")
|
||
else:
|
||
logger.warning("⚠ 未使用数据库配置,将使用环境变量和默认配置")
|
||
logger.warning("如果前端已配置API密钥,请检查:")
|
||
logger.warning("1. backend目录是否存在")
|
||
logger.warning("2. 数据库连接是否正常")
|
||
logger.warning("3. config_manager模块是否可以正常导入")
|
||
logger.warning("4. 数据库配置表中是否有BINANCE_API_KEY和BINANCE_API_SECRET")
|
||
|
||
# 统一在此重新加载配置(只打一次日志,避免重复)
|
||
try:
|
||
config.reload_config()
|
||
logger.info("配置已重新加载")
|
||
except Exception as e:
|
||
logger.warning(f"重新加载配置失败: {e}", exc_info=True)
|
||
|
||
# 检查API密钥(重新获取,确保是最新值)
|
||
# 优先直接从 config_manager 获取(使用正确的 account_id)
|
||
api_key = None
|
||
api_secret = None
|
||
|
||
if config._config_manager:
|
||
try:
|
||
# 直接从 config_manager 获取(会调用 Account.get_credentials,使用正确的 account_id)
|
||
api_key = config._config_manager.get('BINANCE_API_KEY')
|
||
api_secret = config._config_manager.get('BINANCE_API_SECRET')
|
||
logger.info(f"从config_manager获取API密钥: KEY存在={bool(api_key)}, SECRET存在={bool(api_secret)}")
|
||
if api_key:
|
||
logger.info(f"API_KEY前4位: {api_key[:4]}...")
|
||
except Exception as e:
|
||
logger.warning(f"从config_manager获取API密钥失败: {e}")
|
||
|
||
# 如果从 config_manager 获取失败,尝试从 _get_config_value 获取(兜底)
|
||
if not api_key or api_key == 'your_api_key_here':
|
||
api_key = config._get_config_value('BINANCE_API_KEY', '')
|
||
logger.info(f"从_get_config_value获取API_KEY: 存在={bool(api_key)}")
|
||
|
||
if not api_secret or api_secret == 'your_api_secret_here':
|
||
api_secret = config._get_config_value('BINANCE_API_SECRET', '')
|
||
logger.info(f"从_get_config_value获取API_SECRET: 存在={bool(api_secret)}")
|
||
|
||
logger.info(f"API密钥检查: KEY存在={bool(api_key)}, SECRET存在={bool(api_secret)}")
|
||
if api_key and api_key != 'your_api_key_here' and len(api_key) > 4:
|
||
logger.info(f"API密钥前4位: {api_key[:4]}...")
|
||
else:
|
||
logger.warning(f"⚠ API密钥未正确加载,当前值: {api_key}")
|
||
if config._config_manager:
|
||
logger.info(f"配置管理器缓存中的键: {list(config._config_manager._cache.keys())[:10]}")
|
||
|
||
if not api_key or not api_secret or api_key == 'your_api_key_here' or api_secret == 'your_api_secret_here':
|
||
logger.error("=" * 60)
|
||
logger.error("API密钥未配置!")
|
||
logger.error("=" * 60)
|
||
if config.USE_DB_CONFIG:
|
||
logger.error("配置管理器已启用,但未从数据库读取到API密钥")
|
||
logger.error("请检查:")
|
||
logger.error("1. 前端配置界面是否已设置BINANCE_API_KEY和BINANCE_API_SECRET")
|
||
logger.error("2. 数据库trading_config表中是否有这些配置项")
|
||
logger.error("3. 数据库连接是否正常")
|
||
else:
|
||
logger.error("请设置 BINANCE_API_KEY 和 BINANCE_API_SECRET 环境变量")
|
||
logger.error("或在 config.py 中直接配置")
|
||
logger.error("或确保backend目录存在,以便从数据库读取配置")
|
||
logger.error("=" * 60)
|
||
return
|
||
|
||
# 更新config模块的API密钥(确保使用最新值)
|
||
config.BINANCE_API_KEY = api_key
|
||
config.BINANCE_API_SECRET = api_secret
|
||
import os
|
||
config.USE_TESTNET = config._get_config_value('USE_TESTNET', False) if config._get_config_value('USE_TESTNET') is not None else os.getenv('USE_TESTNET', 'False').lower() == 'true'
|
||
|
||
logger.info(f"最终使用的API密钥: KEY前4位={api_key[:4] if api_key and len(api_key) > 4 else 'N/A'}..., SECRET前4位={api_secret[:4] if api_secret and len(api_secret) > 4 else 'N/A'}..., 测试网={config.USE_TESTNET}")
|
||
|
||
# 初始化组件
|
||
client = None
|
||
user_data_stream = None
|
||
ticker_24h_stream = None
|
||
kline_stream = None
|
||
book_ticker_stream = None
|
||
market_ws_refresh_tasks = []
|
||
try:
|
||
# 1. 初始化币安客户端
|
||
logger.info("初始化币安客户端...")
|
||
|
||
# 再次确认API密钥(使用最新值)
|
||
api_key = config._get_config_value('BINANCE_API_KEY', config.BINANCE_API_KEY)
|
||
api_secret = config._get_config_value('BINANCE_API_SECRET', config.BINANCE_API_SECRET)
|
||
use_testnet = config._get_config_value('USE_TESTNET', config.USE_TESTNET)
|
||
if isinstance(use_testnet, str):
|
||
use_testnet = use_testnet.lower() in ('true', '1', 'yes', 'on')
|
||
elif not isinstance(use_testnet, bool):
|
||
use_testnet = bool(use_testnet)
|
||
|
||
logger.info(f"测试网模式: {use_testnet}")
|
||
logger.info(f"连接超时: {config.CONNECTION_TIMEOUT}秒")
|
||
logger.info(f"重试次数: {config.CONNECTION_RETRIES}次")
|
||
|
||
client = BinanceClient(
|
||
api_key=api_key,
|
||
api_secret=api_secret,
|
||
testnet=use_testnet
|
||
)
|
||
await client.connect()
|
||
|
||
# 2. 检查账户余额/权限
|
||
logger.info("检查账户余额...")
|
||
balance = await client.get_account_balance()
|
||
|
||
# 若底层调用失败(例如 -2015 / -1022 / IP白名单),这里给出明确错误并以 code=2 退出(supervisor 不会反复拉起)
|
||
if isinstance(balance, dict) and balance.get("ok") is False:
|
||
code = balance.get("error_code")
|
||
msg = balance.get("error_msg") or ""
|
||
logger.error("=" * 60)
|
||
logger.error(f"获取账户余额失败(可能是权限/白名单/环境不匹配)。error_code={code}, error={msg}")
|
||
logger.error("请检查:")
|
||
logger.error("1) API Key/Secret 是否正确(不要有空格/换行)")
|
||
logger.error("2) API Key 是否启用了【合约交易(USDT-M Futures)】权限")
|
||
logger.error("3) 若设置了 IP 白名单,请把服务器出口 IP 加进去")
|
||
logger.error("4) 测试网/生产网是否匹配(账号的 USE_TESTNET 设置要与 Key 所属环境一致)")
|
||
logger.error("=" * 60)
|
||
raise SystemExit(2)
|
||
|
||
total = float((balance or {}).get("total") or 0.0)
|
||
available = float((balance or {}).get("available") or 0.0)
|
||
logger.info(f"账户余额: 总余额 {total:.2f} USDT, 可用余额 {available:.2f} USDT")
|
||
|
||
# 若余额为 0:不直接退出,保持进程运行并周期性重试,便于充值后自动恢复
|
||
if available <= 0:
|
||
logger.error("=" * 60)
|
||
logger.error("账户可用余额不足(<=0),交易策略不会启动,将每 60 秒重试一次余额读取。")
|
||
logger.error(f"当前余额: total={total:.2f} USDT, available={available:.2f} USDT")
|
||
logger.error("提示:若你确信余额不为 0,但仍显示为 0,请优先检查 API 权限/IP 白名单/测试网配置。")
|
||
logger.error("=" * 60)
|
||
while True:
|
||
await asyncio.sleep(60)
|
||
try:
|
||
b2 = await client.get_account_balance()
|
||
if isinstance(b2, dict) and b2.get("ok") is False:
|
||
logger.error(f"余额重试失败:error_code={b2.get('error_code')}, error={b2.get('error_msg')}")
|
||
continue
|
||
total = float((b2 or {}).get("total") or 0.0)
|
||
available = float((b2 or {}).get("available") or 0.0)
|
||
logger.info(f"余额重试: total={total:.2f}, available={available:.2f}")
|
||
if available > 0:
|
||
logger.info("检测到可用余额 > 0,开始启动交易策略。")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"余额重试异常: {e}", exc_info=True)
|
||
continue
|
||
|
||
# 3. 启动 User Data Stream(订单/持仓/余额推送,listenKey 保活,减少 REST 请求)
|
||
import os
|
||
account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or "1")
|
||
|
||
# ⚠️ 优化:初始化 ListenKey 缓存管理器
|
||
# 注意:每个账号(account_id)有独立的 listenKey 缓存,不同账号之间不会共用
|
||
# 同一个账号的多个进程/实例可以共用 listenKey(因为它们使用相同的 API Key)
|
||
try:
|
||
from .listen_key_cache import get_listen_key_cache, set_listen_key_cache, ListenKeyCache
|
||
if getattr(client, "redis_cache", None):
|
||
cache = ListenKeyCache(client.redis_cache)
|
||
set_listen_key_cache(cache)
|
||
logger.info(f"✓ ListenKey 缓存管理器已初始化(账号 {account_id},同一账号的多进程/实例可共享 listenKey)")
|
||
except Exception as e:
|
||
logger.debug(f"初始化 ListenKey 缓存管理器失败: {e}")
|
||
|
||
user_data_stream = UserDataStream(client, account_id)
|
||
logger.info(f"正在启动 User Data Stream(账号 {account_id})...")
|
||
if await user_data_stream.start():
|
||
logger.info(f"✓ User Data Stream 已启动(账号 {account_id},订单/持仓/余额 WS 推送,30 分钟 keepalive)")
|
||
# 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存(Redis)
|
||
try:
|
||
redis_cache = getattr(client, "redis_cache", None)
|
||
await seed_balance_cache(balance, redis_cache, account_id=account_id)
|
||
positions_seed = await client.get_open_positions()
|
||
await seed_position_cache(positions_seed, redis_cache, account_id=account_id)
|
||
logger.info(f"✓ 已播种持仓/余额缓存(持仓 {len(positions_seed)} 个,已写入 Redis)")
|
||
except Exception as e:
|
||
logger.warning(f"播种 WS 缓存失败(将仅用 REST): {e}")
|
||
else:
|
||
logger.warning(f"⚠ User Data Stream 未启动(账号 {account_id}),将仅依赖 REST 同步订单与持仓")
|
||
logger.warning(" 可能原因:")
|
||
logger.warning(" 1. listenKey 创建失败(检查 API Key 权限:需要 'Enable Reading' 和 'Enable Futures')")
|
||
logger.warning(" 2. 网络连接问题")
|
||
logger.warning(" 3. IP 白名单限制")
|
||
logger.warning(" 提示:可运行 python -m trading_system.check_user_data_stream 进行诊断")
|
||
user_data_stream = None
|
||
|
||
# 3.0 市场 WS 多进程共用:选主 + 仅 Leader 建连接,非 Leader 从 Redis 读
|
||
ticker_24h_stream = None
|
||
kline_stream = None
|
||
book_ticker_stream = None
|
||
market_ws_refresh_tasks = []
|
||
if getattr(client, "redis_cache", None):
|
||
try:
|
||
await client.redis_cache.connect()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
from .market_ws_leader import (
|
||
use_shared_market_ws,
|
||
try_acquire_market_ws_leader,
|
||
is_market_ws_leader,
|
||
run_leader_renew_loop,
|
||
release_market_ws_leader,
|
||
stop_leader_renew_loop,
|
||
)
|
||
from .ticker_24h_stream import Ticker24hStream, refresh_ticker_24h_from_redis_loop
|
||
from .book_ticker_stream import BookTickerStream, refresh_book_ticker_from_redis_loop
|
||
from .kline_stream import KlineStream
|
||
use_testnet = getattr(config, "USE_TESTNET", False)
|
||
redis_cache = getattr(client, "redis_cache", None)
|
||
|
||
if use_shared_market_ws(redis_cache):
|
||
is_leader = await try_acquire_market_ws_leader(redis_cache)
|
||
if is_leader:
|
||
await run_leader_renew_loop(redis_cache)
|
||
try:
|
||
ticker_24h_stream = Ticker24hStream(testnet=use_testnet, redis_cache=redis_cache)
|
||
if await ticker_24h_stream.start():
|
||
logger.info("✓ 24h ticker WS 已启动(Leader,写入 Redis 供多进程共用)")
|
||
else:
|
||
ticker_24h_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 24h ticker WS 失败: {e}")
|
||
try:
|
||
kline_stream = KlineStream(testnet=use_testnet, redis_cache=redis_cache)
|
||
if await kline_stream.start():
|
||
logger.info("✓ K线 WS 已启动(Leader,写入 Redis 供多进程共用)")
|
||
else:
|
||
kline_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 K线 WS 失败: {e}")
|
||
try:
|
||
book_ticker_stream = BookTickerStream(testnet=use_testnet, redis_cache=redis_cache)
|
||
if await book_ticker_stream.start():
|
||
logger.info("✓ 最优挂单 WS 已启动(Leader,写入 Redis 供多进程共用)")
|
||
else:
|
||
book_ticker_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 BookTicker WS 失败: {e}")
|
||
else:
|
||
logger.info("✓ 本进程为非 Leader,将仅从 Redis 读取 Ticker24h/BookTicker/K线 缓存")
|
||
t_refresh = asyncio.create_task(refresh_ticker_24h_from_redis_loop(redis_cache, 2.0))
|
||
b_refresh = asyncio.create_task(refresh_book_ticker_from_redis_loop(redis_cache, 2.0))
|
||
market_ws_refresh_tasks = [t_refresh, b_refresh]
|
||
else:
|
||
is_leader = True
|
||
try:
|
||
ticker_24h_stream = Ticker24hStream(testnet=use_testnet)
|
||
if await ticker_24h_stream.start():
|
||
logger.info("✓ 24h ticker WS 已启动(扫描将优先使用 WS 缓存)")
|
||
else:
|
||
ticker_24h_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 24h ticker WS 失败: {e}")
|
||
try:
|
||
kline_stream = KlineStream(testnet=use_testnet)
|
||
if await kline_stream.start():
|
||
logger.info("✓ K线 WS 已启动(技术指标将优先使用 WS 缓存)")
|
||
else:
|
||
kline_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 K线 WS 失败: {e}")
|
||
try:
|
||
book_ticker_stream = BookTickerStream(testnet=use_testnet)
|
||
if await book_ticker_stream.start():
|
||
logger.info("✓ 最优挂单 WS 已启动(用于滑点估算与价格优化)")
|
||
else:
|
||
book_ticker_stream = None
|
||
except Exception as e:
|
||
logger.debug(f"启动 BookTicker WS 失败: {e}")
|
||
except Exception as e:
|
||
logger.debug(f"市场 WS 启动异常: {e}")
|
||
ticker_24h_stream = kline_stream = book_ticker_stream = None
|
||
|
||
# 4. 初始化各个模块
|
||
logger.info("初始化交易模块...")
|
||
scanner = MarketScanner(client)
|
||
risk_manager = RiskManager(client)
|
||
position_manager = PositionManager(client, risk_manager)
|
||
|
||
# 交易进程不生成推荐(由独立的推荐进程 recommendations_main.py 处理)
|
||
logger.info("交易进程模式:只执行自动交易,不生成推荐")
|
||
logger.info("推荐由独立的推荐进程(auto_recommend)处理")
|
||
|
||
strategy = TradingStrategy(client, scanner, risk_manager, position_manager, recommender=None)
|
||
|
||
# 4. 打印配置信息(完整版,便于验证山寨币策略)
|
||
logger.info("=" * 60)
|
||
logger.info("交易配置(当前策略):")
|
||
logger.info("=" * 60)
|
||
logger.info("【仓位控制】")
|
||
logger.info(f" 单笔最大仓位: {config.TRADING_CONFIG['MAX_POSITION_PERCENT']*100:.2f}%")
|
||
logger.info(f" 总仓位上限: {config.TRADING_CONFIG['MAX_TOTAL_POSITION_PERCENT']*100:.1f}%")
|
||
logger.info(f" 最小仓位: {config.TRADING_CONFIG.get('MIN_POSITION_PERCENT', 0.01)*100:.2f}%")
|
||
logger.info(f" 最大持仓数: {config.TRADING_CONFIG.get('MAX_OPEN_POSITIONS', 10)} 个")
|
||
logger.info(f" 每日最大开仓: {config.TRADING_CONFIG.get('MAX_DAILY_ENTRIES', 20)} 笔")
|
||
sunday_max = config.TRADING_CONFIG.get('SUNDAY_MAX_OPENS', 0)
|
||
if sunday_max and int(sunday_max) > 0:
|
||
logger.info(f" 周日开仓上限: {int(sunday_max)} 笔")
|
||
sunday_min_sig = config.TRADING_CONFIG.get('SUNDAY_MIN_SIGNAL_STRENGTH', 0)
|
||
if sunday_min_sig and int(sunday_min_sig) > 0:
|
||
logger.info(f" 周日信号门槛: >={int(sunday_min_sig)}")
|
||
if config.TRADING_CONFIG.get('NIGHT_HOURS_NO_OPEN_ENABLED', False):
|
||
only_sun = config.TRADING_CONFIG.get('NIGHT_HOURS_ONLY_SUNDAY', True)
|
||
logger.info(
|
||
f" 晚间禁止开仓: {config.TRADING_CONFIG.get('NIGHT_HOURS_START', 21)}:00~次日"
|
||
f"{config.TRADING_CONFIG.get('NIGHT_HOURS_END', 6)}:00(北京)"
|
||
+ (",仅周六晚~周日晨" if only_sun else ",每日")
|
||
)
|
||
logger.info("")
|
||
logger.info("【杠杆配置】")
|
||
logger.info(f" 基础杠杆: {config.TRADING_CONFIG.get('LEVERAGE', 10)}x")
|
||
logger.info(f" 最大杠杆: {config.TRADING_CONFIG.get('MAX_LEVERAGE', 20)}x")
|
||
logger.info(f" 动态杠杆: {'开启' if config.TRADING_CONFIG.get('USE_DYNAMIC_LEVERAGE') else '关闭'}")
|
||
logger.info("")
|
||
logger.info("【风险控制】")
|
||
logger.info(f" 固定止损: {config.TRADING_CONFIG['STOP_LOSS_PERCENT']*100:.1f}%")
|
||
logger.info(f" 固定止盈: {config.TRADING_CONFIG['TAKE_PROFIT_PERCENT']*100:.1f}%")
|
||
logger.info(f" ATR止损: {'开启' if config.TRADING_CONFIG.get('USE_ATR_STOP_LOSS') else '关闭'}")
|
||
logger.info(f" ATR止损倍数: {config.TRADING_CONFIG.get('ATR_STOP_LOSS_MULTIPLIER', 2.0)}")
|
||
logger.info(f" ATR止盈倍数: {config.TRADING_CONFIG.get('ATR_TAKE_PROFIT_MULTIPLIER', 3.0)}")
|
||
logger.info(f" 盈亏比: {config.TRADING_CONFIG.get('RISK_REWARD_RATIO', 3.0)}:1")
|
||
logger.info(f" 固定风险: {'开启' if config.TRADING_CONFIG.get('USE_FIXED_RISK_SIZING') else '关闭'}")
|
||
logger.info(f" 每笔风险: {config.TRADING_CONFIG.get('FIXED_RISK_PERCENT', 0.02)*100:.1f}%")
|
||
profit_protection = config.TRADING_CONFIG.get('PROFIT_PROTECTION_ENABLED', True)
|
||
logger.info(f" 盈利保护(移动止损+保本): {'开启' if profit_protection else '关闭'}")
|
||
if profit_protection and config.TRADING_CONFIG.get('USE_TRAILING_STOP'):
|
||
# 修复:配置值已经是比例形式,直接乘以100显示
|
||
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.1)
|
||
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.05)
|
||
# 如果值>1,说明可能还是百分比形式,需要转换;否则直接使用
|
||
if trailing_activation > 1:
|
||
trailing_activation = trailing_activation / 100.0
|
||
if trailing_protect > 1:
|
||
trailing_protect = trailing_protect / 100.0
|
||
logger.info(f" 激活条件: 盈利{trailing_activation*100:.0f}%")
|
||
logger.info(f" 保护利润: {trailing_protect*100:.0f}%")
|
||
if profit_protection:
|
||
fee_buf = config.TRADING_CONFIG.get('FEE_BUFFER_PCT', 0.0015)
|
||
lock_pct = config.TRADING_CONFIG.get('LOCK_PROFIT_AT_BREAKEVEN_AFTER_PCT', 0.02)
|
||
if lock_pct and lock_pct > 1:
|
||
lock_pct = lock_pct / 100.0
|
||
logger.info(f" 保本含手续费缓冲 {float(fee_buf or 0)*100:.2f}% | 盈利达{float(lock_pct or 0)*100:.0f}%移至保本")
|
||
logger.info(f" 最小持仓时间: {config.TRADING_CONFIG.get('MIN_HOLD_TIME_SEC', 0)} 秒")
|
||
logger.info("")
|
||
logger.info("【市场扫描】")
|
||
logger.info(f" 扫描间隔: {config.TRADING_CONFIG['SCAN_INTERVAL']} 秒")
|
||
logger.info(f" 扫描交易对数量: {config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500)}")
|
||
logger.info(f" 处理交易对数量: {config.TRADING_CONFIG['TOP_N_SYMBOLS']} 个")
|
||
logger.info(f" 最小24H成交额: ${config.TRADING_CONFIG.get('MIN_VOLUME_24H', 0)/1000000:.0f}M")
|
||
# 修复:配置值格式转换
|
||
min_volatility = config.TRADING_CONFIG.get('MIN_VOLATILITY', 0.02)
|
||
if min_volatility > 1:
|
||
min_volatility = min_volatility / 100.0
|
||
logger.info(f" 最小波动率: {min_volatility*100:.1f}%")
|
||
logger.info(f" 最小信号强度: {config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 5)}")
|
||
logger.info(f" 最小涨跌幅: {config.TRADING_CONFIG['MIN_CHANGE_PERCENT']:.1f}%")
|
||
logger.info("")
|
||
logger.info("【其他】")
|
||
logger.info(f" 测试网模式: {config.USE_TESTNET}")
|
||
logger.info(f" 智能入场: {'开启' if config.TRADING_CONFIG.get('SMART_ENTRY_ENABLED') else '关闭'}")
|
||
logger.info(f" 只做趋势: {'是' if config.TRADING_CONFIG.get('AUTO_TRADE_ONLY_TRENDING') else '否'}")
|
||
logger.info("=" * 60)
|
||
|
||
# 5. 启动交易策略
|
||
logger.info("启动交易策略...")
|
||
await strategy.execute_strategy()
|
||
|
||
except KeyboardInterrupt:
|
||
logger.info("收到停止信号,正在关闭...")
|
||
except Exception as e:
|
||
logger.error(f"程序运行出错: {e}", exc_info=True)
|
||
raise
|
||
finally:
|
||
# 清理资源(先停 User Data Stream,再断 client)
|
||
try:
|
||
if user_data_stream is not None:
|
||
await user_data_stream.stop()
|
||
logger.info("User Data Stream 已停止")
|
||
except Exception as e:
|
||
logger.debug(f"停止 User Data Stream 时异常: {e}")
|
||
try:
|
||
if ticker_24h_stream is not None:
|
||
await ticker_24h_stream.stop()
|
||
logger.info("Ticker24h Stream 已停止")
|
||
except Exception as e:
|
||
logger.debug(f"停止 Ticker24h Stream 时异常: {e}")
|
||
try:
|
||
if kline_stream is not None:
|
||
await kline_stream.stop()
|
||
logger.info("Kline Stream 已停止")
|
||
except Exception as e:
|
||
logger.debug(f"停止 Kline Stream 时异常: {e}")
|
||
try:
|
||
if book_ticker_stream is not None:
|
||
await book_ticker_stream.stop()
|
||
logger.info("BookTicker Stream 已停止")
|
||
except Exception as e:
|
||
logger.debug(f"停止 BookTicker Stream 时异常: {e}")
|
||
for t in market_ws_refresh_tasks:
|
||
try:
|
||
t.cancel()
|
||
await t
|
||
except asyncio.CancelledError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
try:
|
||
from .market_ws_leader import is_market_ws_leader, release_market_ws_leader, stop_leader_renew_loop
|
||
if is_market_ws_leader() and client and getattr(client, "redis_cache", None):
|
||
release_market_ws_leader(client.redis_cache)
|
||
stop_leader_renew_loop()
|
||
except Exception:
|
||
pass
|
||
if client:
|
||
await client.disconnect()
|
||
logger.info("程序已退出")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
try:
|
||
asyncio.run(main())
|
||
except KeyboardInterrupt:
|
||
logger.info("程序被用户中断")
|
||
except Exception as e:
|
||
logger.error(f"程序异常退出: {e}", exc_info=True)
|
||
raise
|