auto_trade_sys/backend/api/routes/admin.py
薇薇安 e816524972 1
2026-02-14 14:47:30 +08:00

169 lines
5.7 KiB
Python

"""
管理员接口:用户管理 / 授权管理
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from api.auth_deps import get_admin_user
from api.auth_utils import hash_password
from database.models import User, UserAccountMembership, Account
router = APIRouter(prefix="/api/admin", tags=["admin"])
class UserCreateReq(BaseModel):
username: str = Field(..., min_length=1, max_length=64)
password: str = Field(..., min_length=1, max_length=200)
role: str = Field("user", pattern="^(admin|user)$")
status: str = Field("active", pattern="^(active|disabled)$")
@router.get("/users")
async def list_users(_admin: Dict[str, Any] = Depends(get_admin_user)):
return User.list_all()
@router.get("/users/detailed")
async def list_users_with_accounts(_admin: Dict[str, Any] = Depends(get_admin_user)):
"""获取所有用户及其关联账号列表"""
users = User.list_all()
out = []
# 获取所有授权关系
# 优化:一次性查询所有 memberships 并在内存中分组,避免 N+1 查询
# 但由于 UserAccountMembership 没有 list_all 方法,暂时循环查询或添加 list_all
# 考虑到用户量不大,循环查询尚可接受。
for u in users:
uid = u['id']
memberships = UserAccountMembership.get_user_accounts(uid)
user_accounts = []
for m in memberships or []:
user_accounts.append({
"id": m.get("id"),
"name": m.get("name"),
"status": m.get("status"),
"role": m.get("role"),
"has_api_key": bool(m.get("api_key_enc")),
"has_api_secret": bool(m.get("api_secret_enc"))
})
out.append({
"id": uid,
"username": u['username'],
"role": u['role'],
"status": u['status'],
"accounts": user_accounts
})
return out
@router.post("/users")
async def create_user(payload: UserCreateReq, _admin: Dict[str, Any] = Depends(get_admin_user)):
exists = User.get_by_username(payload.username)
if exists:
raise HTTPException(status_code=400, detail="用户名已存在")
uid = User.create(
username=payload.username,
password_hash=hash_password(payload.password),
role=payload.role,
status=payload.status,
)
return {"success": True, "id": int(uid)}
class UserPasswordReq(BaseModel):
password: str = Field(..., min_length=1, max_length=200)
@router.put("/users/{user_id}/password")
async def set_user_password(user_id: int, payload: UserPasswordReq, _admin: Dict[str, Any] = Depends(get_admin_user)):
u = User.get_by_id(int(user_id))
if not u:
raise HTTPException(status_code=404, detail="用户不存在")
User.set_password(int(user_id), hash_password(payload.password))
return {"success": True}
class UserRoleReq(BaseModel):
role: str = Field(..., pattern="^(admin|user)$")
@router.put("/users/{user_id}/role")
async def set_user_role(user_id: int, payload: UserRoleReq, _admin: Dict[str, Any] = Depends(get_admin_user)):
u = User.get_by_id(int(user_id))
if not u:
raise HTTPException(status_code=404, detail="用户不存在")
User.set_role(int(user_id), payload.role)
return {"success": True}
class UserStatusReq(BaseModel):
status: str = Field(..., pattern="^(active|disabled)$")
@router.put("/users/{user_id}/status")
async def set_user_status(user_id: int, payload: UserStatusReq, _admin: Dict[str, Any] = Depends(get_admin_user)):
u = User.get_by_id(int(user_id))
if not u:
raise HTTPException(status_code=404, detail="用户不存在")
User.set_status(int(user_id), payload.status)
return {"success": True}
@router.get("/users/{user_id}/accounts")
async def list_user_accounts(user_id: int, _admin: Dict[str, Any] = Depends(get_admin_user)):
u = User.get_by_id(int(user_id))
if not u:
raise HTTPException(status_code=404, detail="用户不存在")
memberships = UserAccountMembership.list_for_user(int(user_id))
# 追加账号名称(便于前端展示)
out = []
for m in memberships or []:
aid = int(m.get("account_id"))
a = Account.get(aid) or {}
out.append(
{
"user_id": int(m.get("user_id")),
"account_id": aid,
"role": m.get("role") or "viewer",
"account_name": a.get("name") or "",
"account_status": a.get("status") or "",
}
)
return out
class GrantReq(BaseModel):
role: str = Field("viewer", pattern="^(owner|viewer)$")
@router.put("/users/{user_id}/accounts/{account_id}")
async def grant_user_account(user_id: int, account_id: int, payload: GrantReq, _admin: Dict[str, Any] = Depends(get_admin_user)):
u = User.get_by_id(int(user_id))
if not u:
raise HTTPException(status_code=404, detail="用户不存在")
a = Account.get(int(account_id))
if not a:
raise HTTPException(status_code=404, detail="账号不存在")
try:
if payload.role == "owner":
UserAccountMembership.clear_other_owners_for_account(int(account_id), int(user_id))
UserAccountMembership.add(int(user_id), int(account_id), role=payload.role)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"关联账号失败: {str(e)}",
)
return {"success": True}
@router.delete("/users/{user_id}/accounts/{account_id}")
async def revoke_user_account(user_id: int, account_id: int, _admin: Dict[str, Any] = Depends(get_admin_user)):
UserAccountMembership.remove(int(user_id), int(account_id))
return {"success": True}