diff --git a/trading_system/main.py b/trading_system/main.py index 63e7740..b099ed9 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -377,9 +377,9 @@ async def main(): # 用当前 REST 结果播种缓存,后续由 WS 增量更新,业务可优先读缓存(Redis) try: redis_cache = getattr(client, "redis_cache", None) - await seed_balance_cache(balance, redis_cache) + await seed_balance_cache(balance, redis_cache, account_id=account_id) positions_seed = await client.get_open_positions() - await seed_position_cache(positions_seed, redis_cache) + await seed_position_cache(positions_seed, redis_cache, account_id=account_id) logger.info(f"✓ 已播种持仓/余额缓存(持仓 {len(positions_seed)} 个,已写入 Redis)") except Exception as e: logger.warning(f"播种 WS 缓存失败(将仅用 REST): {e}") diff --git a/trading_system/risk_manager.py b/trading_system/risk_manager.py index 9ec660d..8896d0e 100644 --- a/trading_system/risk_manager.py +++ b/trading_system/risk_manager.py @@ -210,6 +210,21 @@ class RiskManager: logger.warning("账户总余额为0,无法开仓") return False + # 币安 -2019:可用保证金不足时直接拒绝,避免下单被拒 + if available_balance is not None and float(available_balance) <= 0: + logger.warning( + "可用保证金不足或为负 (available=%.2f USDT),无法开仓,跳过", + float(available_balance), + ) + return False + if available_balance is not None and float(new_position_margin) > float(available_balance): + logger.warning( + "新仓位保证金 %.2f USDT > 可用保证金 %.2f USDT,无法开仓", + float(new_position_margin), + float(available_balance), + ) + return False + max_total_margin = total_balance * config.TRADING_CONFIG['MAX_TOTAL_POSITION_PERCENT'] max_total_margin_pct = config.TRADING_CONFIG['MAX_TOTAL_POSITION_PERCENT'] * 100 diff --git a/trading_system/user_data_stream.py b/trading_system/user_data_stream.py index c4d01b1..e71cebc 100644 --- a/trading_system/user_data_stream.py +++ b/trading_system/user_data_stream.py @@ -49,8 +49,17 @@ except ImportError: TTL_BALANCE = 300 -async def seed_position_cache(positions: List[Dict], redis_cache: Any = None) -> None: - """用 REST 全量持仓结果填充缓存。有 Redis 时只写 Redis、不占进程内存;无 Redis 时写进程内存。""" +def _cache_account_id() -> int: + """当前进程的 account_id(多账号隔离时 Redis 缓存键必须按账号区分)""" + import os + try: + return int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1) + except Exception: + return 1 + + +async def seed_position_cache(positions: List[Dict], redis_cache: Any = None, account_id: int = None) -> None: + """用 REST 全量持仓结果填充缓存。有 Redis 时只写 Redis、不占进程内存;无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。""" global _position_updates_cache, _position_cache_seeded _position_cache_seeded = True positions_list = [] @@ -82,14 +91,15 @@ async def seed_position_cache(positions: List[Dict], redis_cache: Any = None) -> _position_updates_cache.clear() if positions_list: try: - await redis_cache.set("ats:positions:cache", positions_list, ttl=TTL_POSITIONS) + aid = int(account_id) if account_id is not None else _cache_account_id() + await redis_cache.set(f"ats:positions:cache:{aid}", positions_list, ttl=TTL_POSITIONS) except Exception as e: logger.debug(f"写入持仓缓存到 Redis 失败: {e}") logger.debug(f"UserDataStream: 已填充持仓缓存(Redis=%s)", bool(redis_cache)) -async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None) -> None: - """用 REST 余额结果填充缓存。有 Redis 时只写 Redis、不占进程内存;无 Redis 时写进程内存。""" +async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None, account_id: int = None) -> None: + """用 REST 余额结果填充缓存。有 Redis 时只写 Redis;无 Redis 时写进程内存。多账号时 Redis 键按 account_id 隔离。""" global _balance_updates_cache, _balance_cache_seeded _balance_cache_seeded = True if balance and isinstance(balance, dict): @@ -98,7 +108,8 @@ async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None) - balance_data = {"wb": str(wb), "cw": str(av), "bc": "0"} if redis_cache: try: - await redis_cache.set("ats:balance:cache:USDT", balance_data, ttl=TTL_BALANCE) + aid = int(account_id) if account_id is not None else _cache_account_id() + await redis_cache.set(f"ats:balance:cache:USDT:{aid}", balance_data, ttl=TTL_BALANCE) except Exception as e: logger.debug(f"写入余额缓存到 Redis 失败: {e}") else: @@ -106,15 +117,15 @@ async def seed_balance_cache(balance: Dict[str, Any], redis_cache: Any = None) - logger.debug("UserDataStream: 已填充余额缓存 (USDT, Redis=%s)", bool(redis_cache)) -async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = None) -> Optional[List[Dict]]: +async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = None, account_id: int = None) -> Optional[List[Dict]]: """ 将持仓缓存转为与 REST get_open_positions 一致的列表格式;未播种时返回 None(业务应走 REST)。 - ⚠️ 内存优化:优先从 Redis 读取,减少进程内存占用。 + ⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。 """ - # 优先从 Redis 读取(多进程共享) + aid = int(account_id) if account_id is not None else _cache_account_id() if redis_cache: try: - redis_key = "ats:positions:cache" + redis_key = f"ats:positions:cache:{aid}" cached = await redis_cache.get(redis_key) if cached and isinstance(cached, list): # 过滤最小名义价值 @@ -161,15 +172,15 @@ async def get_positions_from_cache(min_notional: float = 1.0, redis_cache: Any = return out -async def get_balance_from_cache(redis_cache: Any = None) -> Optional[Dict[str, Any]]: +async def get_balance_from_cache(redis_cache: Any = None, account_id: int = None) -> Optional[Dict[str, Any]]: """ 从缓存返回 USDT 余额(与 REST get_account_balance 结构兼容);未播种或无缓存时返回 None。 - ⚠️ 内存优化:优先从 Redis 读取,减少进程内存占用。 + ⚠️ 多账号时 Redis 键按 account_id 隔离,避免读错账号数据导致风控失效。 """ - # 优先从 Redis 读取(多进程共享) + aid = int(account_id) if account_id is not None else _cache_account_id() if redis_cache: try: - redis_key = "ats:balance:cache:USDT" + redis_key = f"ats:balance:cache:USDT:{aid}" cached = await redis_cache.get(redis_key) if cached and isinstance(cached, dict): try: @@ -625,21 +636,21 @@ class UserDataStream: logger.debug(f"UserDataStream: ACCOUNT_UPDATE 持仓数 {len(P)}") async def _write_balance_to_redis(self, asset: str, balance_data: Dict): - """写入余额缓存到 Redis(带 TTL,避免无限增长)""" + """写入余额缓存到 Redis(带 TTL,按 account_id 隔离)""" try: redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: - redis_key = f"ats:balance:cache:{asset}" + redis_key = f"ats:balance:cache:{asset}:{self.account_id}" await redis_cache.set(redis_key, balance_data, ttl=TTL_BALANCE) except Exception as e: logger.debug(f"写入余额缓存到 Redis 失败: {e}") async def _write_positions_to_redis(self, positions_list: List[Dict]): - """写入持仓缓存到 Redis(带 TTL,避免无限增长)""" + """写入持仓缓存到 Redis(带 TTL,按 account_id 隔离)""" try: redis_cache = getattr(self.client, "redis_cache", None) if redis_cache: - redis_key = "ats:positions:cache" + redis_key = f"ats:positions:cache:{self.account_id}" await redis_cache.set(redis_key, positions_list, ttl=TTL_POSITIONS) except Exception as e: logger.debug(f"写入持仓缓存到 Redis 失败: {e}")