""" 管理员接口:用户管理 / 授权管理 """ from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel, Field from typing import Optional, List, Dict, Any from api.auth_deps import get_admin_user from api.auth_utils import hash_password from database.models import User, UserAccountMembership, Account router = APIRouter(prefix="/api/admin", tags=["admin"]) class UserCreateReq(BaseModel): username: str = Field(..., min_length=1, max_length=64) password: str = Field(..., min_length=1, max_length=200) role: str = Field("user", pattern="^(admin|user)$") status: str = Field("active", pattern="^(active|disabled)$") @router.get("/users") async def list_users(_admin: Dict[str, Any] = Depends(get_admin_user)): return User.list_all() @router.get("/users/detailed") async def list_users_with_accounts(_admin: Dict[str, Any] = Depends(get_admin_user)): """获取所有用户及其关联账号列表""" users = User.list_all() out = [] # 获取所有授权关系 # 优化:一次性查询所有 memberships 并在内存中分组,避免 N+1 查询 # 但由于 UserAccountMembership 没有 list_all 方法,暂时循环查询或添加 list_all # 考虑到用户量不大,循环查询尚可接受。 for u in users: uid = u['id'] memberships = UserAccountMembership.get_user_accounts(uid) user_accounts = [] for m in memberships or []: user_accounts.append({ "id": m.get("id"), "name": m.get("name"), "status": m.get("status"), "role": m.get("role"), "has_api_key": bool(m.get("api_key_enc")), "has_api_secret": bool(m.get("api_secret_enc")) }) out.append({ "id": uid, "username": u['username'], "role": u['role'], "status": u['status'], "accounts": user_accounts }) return out @router.post("/users") async def create_user(payload: UserCreateReq, _admin: Dict[str, Any] = Depends(get_admin_user)): exists = User.get_by_username(payload.username) if exists: raise HTTPException(status_code=400, detail="用户名已存在") uid = User.create( username=payload.username, password_hash=hash_password(payload.password), role=payload.role, status=payload.status, ) return {"success": True, "id": int(uid)} class UserPasswordReq(BaseModel): password: str = Field(..., min_length=1, max_length=200) @router.put("/users/{user_id}/password") async def set_user_password(user_id: int, payload: UserPasswordReq, _admin: Dict[str, Any] = Depends(get_admin_user)): u = User.get_by_id(int(user_id)) if not u: raise HTTPException(status_code=404, detail="用户不存在") User.set_password(int(user_id), hash_password(payload.password)) return {"success": True} class UserRoleReq(BaseModel): role: str = Field(..., pattern="^(admin|user)$") @router.put("/users/{user_id}/role") async def set_user_role(user_id: int, payload: UserRoleReq, _admin: Dict[str, Any] = Depends(get_admin_user)): u = User.get_by_id(int(user_id)) if not u: raise HTTPException(status_code=404, detail="用户不存在") User.set_role(int(user_id), payload.role) return {"success": True} class UserStatusReq(BaseModel): status: str = Field(..., pattern="^(active|disabled)$") @router.put("/users/{user_id}/status") async def set_user_status(user_id: int, payload: UserStatusReq, _admin: Dict[str, Any] = Depends(get_admin_user)): u = User.get_by_id(int(user_id)) if not u: raise HTTPException(status_code=404, detail="用户不存在") User.set_status(int(user_id), payload.status) return {"success": True} @router.get("/users/{user_id}/accounts") async def list_user_accounts(user_id: int, _admin: Dict[str, Any] = Depends(get_admin_user)): u = User.get_by_id(int(user_id)) if not u: raise HTTPException(status_code=404, detail="用户不存在") memberships = UserAccountMembership.list_for_user(int(user_id)) # 追加账号名称(便于前端展示) out = [] for m in memberships or []: aid = int(m.get("account_id")) a = Account.get(aid) or {} out.append( { "user_id": int(m.get("user_id")), "account_id": aid, "role": m.get("role") or "viewer", "account_name": a.get("name") or "", "account_status": a.get("status") or "", } ) return out class GrantReq(BaseModel): role: str = Field("viewer", pattern="^(owner|viewer)$") @router.put("/users/{user_id}/accounts/{account_id}") async def grant_user_account(user_id: int, account_id: int, payload: GrantReq, _admin: Dict[str, Any] = Depends(get_admin_user)): u = User.get_by_id(int(user_id)) if not u: raise HTTPException(status_code=404, detail="用户不存在") a = Account.get(int(account_id)) if not a: raise HTTPException(status_code=404, detail="账号不存在") try: UserAccountMembership.add(int(user_id), int(account_id), role=payload.role) except Exception as e: raise HTTPException( status_code=500, detail=f"关联账号失败: {str(e)}", ) return {"success": True} @router.delete("/users/{user_id}/accounts/{account_id}") async def revoke_user_account(user_id: int, account_id: int, _admin: Dict[str, Any] = Depends(get_admin_user)): UserAccountMembership.remove(int(user_id), int(account_id)) return {"success": True}