"""API dependencies for authentication and authorization.""" from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from ..database import get_db from ..models.user import User, UserRole from ..utils.security import decode_token # Bearer token security scheme security = HTTPBearer() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """Get current authenticated user from JWT token.""" token = credentials.credentials payload = decode_token(token) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"} ) if payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type", headers={"WWW-Authenticate": "Bearer"} ) user_id = int(payload.get("sub")) # sub is stored as string per JWT spec user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is deactivated" ) return user def get_current_active_user( current_user: User = Depends(get_current_user) ) -> User: """Ensure user is active.""" if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return current_user def require_admin( current_user: User = Depends(get_current_user) ) -> User: """Require admin role.""" if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user def require_super_admin( current_user: User = Depends(get_current_user) ) -> User: """Require super admin role.""" if current_user.role != UserRole.SUPER_ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Super admin privileges required" ) return current_user