"""Authentication service.""" from datetime import datetime from sqlalchemy.orm import Session from ..models.user import User from ..schemas.user import UserCreate, Token from ..utils.security import ( verify_password, get_password_hash, create_access_token, create_refresh_token, decode_token ) class AuthService: """Service for authentication operations.""" def __init__(self, db: Session): self.db = db def authenticate_user(self, username: str, password: str) -> User | None: """Authenticate user with username and password.""" user = self.db.query(User).filter(User.username == username).first() if not user: return None if not verify_password(password, user.password_hash): return None if not user.is_active: return None # Update last login user.last_login = datetime.utcnow() self.db.commit() return user def create_tokens(self, user: User) -> Token: """Create access and refresh tokens for user.""" access_token = create_access_token( user_id=user.id, username=user.username, role=user.role.value, tenant_id=user.tenant_id ) refresh_token = create_refresh_token(user_id=user.id) return Token( access_token=access_token, refresh_token=refresh_token ) def refresh_tokens(self, refresh_token: str) -> Token | None: """Refresh access token using refresh token.""" payload = decode_token(refresh_token) if not payload: return None if payload.get("type") != "refresh": return None user_id = payload.get("sub") user = self.db.query(User).filter(User.id == user_id).first() if not user or not user.is_active: return None return self.create_tokens(user) def create_user(self, user_data: UserCreate) -> User: """Create a new user.""" user = User( username=user_data.username, email=user_data.email, password_hash=get_password_hash(user_data.password), full_name=user_data.full_name, role=user_data.role, tenant_id=user_data.tenant_id ) self.db.add(user) self.db.commit() self.db.refresh(user) return user def get_user_by_id(self, user_id: int) -> User | None: """Get user by ID.""" return self.db.query(User).filter(User.id == user_id).first() def get_user_by_username(self, username: str) -> User | None: """Get user by username.""" return self.db.query(User).filter(User.username == username).first()