"""User management API routes.""" from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from ..database import get_db from ..models.user import User, UserRole from ..schemas.user import UserCreate, UserUpdate, UserResponse from ..utils.security import get_password_hash from .deps import get_current_user, require_admin router = APIRouter() @router.get("/", response_model=list[UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: User = Depends(require_admin) ): """List users (admins see their tenant's users, super admins see all).""" query = db.query(User) # Filter by tenant for non-super-admins if current_user.role != UserRole.SUPER_ADMIN: query = query.filter(User.tenant_id == current_user.tenant_id) users = query.offset(skip).limit(limit).all() return users @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user_data: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin) ): """Create a new user.""" # Check permissions if current_user.role != UserRole.SUPER_ADMIN: # Regular admins can only create users in their own tenant if user_data.tenant_id != current_user.tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create users in other tenants" ) # Cannot create super admins if user_data.role == UserRole.SUPER_ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create super admin users" ) # Check if username exists existing = db.query(User).filter(User.username == user_data.username).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Check if email exists existing = db.query(User).filter(User.email == user_data.email).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists" ) user = User( username=user_data.username, email=user_data.email, password_hash=get_password_hash(user_data.password), full_name=user_data.full_name, role=user_data.role, tenant_id=user_data.tenant_id ) db.add(user) db.commit() db.refresh(user) return user @router.get("/{user_id}", response_model=UserResponse) def get_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin) ): """Get user by ID.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Check tenant access if current_user.role != UserRole.SUPER_ADMIN: if user.tenant_id != current_user.tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot access users from other tenants" ) return user @router.put("/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_data: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_admin) ): """Update user.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Check tenant access if current_user.role != UserRole.SUPER_ADMIN: if user.tenant_id != current_user.tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify users from other tenants" ) update_data = user_data.model_dump(exclude_unset=True) # Hash password if provided if "password" in update_data: update_data["password_hash"] = get_password_hash(update_data.pop("password")) for field, value in update_data.items(): setattr(user, field, value) db.commit() db.refresh(user) return user @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin) ): """Delete user.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Cannot delete yourself if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account" ) # Check tenant access if current_user.role != UserRole.SUPER_ADMIN: if user.tenant_id != current_user.tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete users from other tenants" ) db.delete(user) db.commit()