"""User management web routes."""
from typing import Optional

from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session

from ..database import get_db
from ..models.user import User, UserRole
from ..utils.security import get_password_hash
from .deps import get_current_user_web, require_admin_web, flash, get_flashed_messages

router = APIRouter()


def _redirect(url: str) -> RedirectResponse:
    """Return a 303 redirect so the browser follows up with a GET."""
    return RedirectResponse(url=url, status_code=303)


def _is_super_admin(user: User) -> bool:
    """Return True when *user* holds the SUPER_ADMIN role."""
    return user.role == UserRole.SUPER_ADMIN


def _assignable_roles(current_user: User) -> list:
    """Return the role values *current_user* may offer in the user form.

    SUPER_ADMIN is only listed when the acting user is a super admin.
    """
    allow_super_admin = _is_super_admin(current_user)
    return [
        role.value
        for role in UserRole
        if role != UserRole.SUPER_ADMIN or allow_super_admin
    ]


def _load_manageable_user(
    request: Request, db: Session, user_id: int, current_user: User
) -> Optional[User]:
    """Fetch the user with *user_id* if *current_user* may manage it.

    Flashes an error message and returns None when the user does not exist
    or belongs to a different tenant than a non-super-admin caller. Callers
    are expected to redirect to the user list in that case.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        flash(request, "Benutzer nicht gefunden", "danger")
        return None
    if not _is_super_admin(current_user) and user.tenant_id != current_user.tenant_id:
        flash(request, "Zugriff verweigert", "danger")
        return None
    return user


def _parse_assignable_role(
    request: Request, role: str, current_user: User
) -> Optional[UserRole]:
    """Convert the submitted *role* string into a UserRole.

    Flashes an error message and returns None when the value is not a valid
    role, or when a non-super-admin tries to assign SUPER_ADMIN. The form
    only hides that option; the check must be enforced on submit as well.
    """
    try:
        parsed = UserRole(role)
    except ValueError:
        flash(request, "Ungültige Rolle", "danger")
        return None
    if parsed == UserRole.SUPER_ADMIN and not _is_super_admin(current_user):
        flash(request, "Zugriff verweigert", "danger")
        return None
    return parsed


def _available_gateways(db: Session, current_user: User) -> list:
    """Return the gateways *current_user* may grant access to.

    Super admins get every gateway; everyone else only the gateways of
    their own tenant.
    """
    from ..models.gateway import Gateway

    query = db.query(Gateway)
    if not _is_super_admin(current_user):
        query = query.filter(Gateway.tenant_id == current_user.tenant_id)
    return query.all()


@router.get("/users", response_class=HTMLResponse)
async def list_users(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the user list.

    Super admins see all users; other admins only the users of their own
    tenant.
    """
    query = db.query(User)
    if not _is_super_admin(current_user):
        query = query.filter(User.tenant_id == current_user.tenant_id)
    users = query.all()

    return request.app.state.templates.TemplateResponse(
        "users/list.html",
        {
            "request": request,
            "current_user": current_user,
            "users": users,
            "flash_messages": get_flashed_messages(request)
        }
    )


@router.get("/users/new", response_class=HTMLResponse)
async def new_user_form(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the empty form for creating a user."""
    from ..models.tenant import Tenant

    # "== True" builds a SQL expression here; it is not a Python comparison.
    tenants = db.query(Tenant).filter(Tenant.is_active == True).all()  # noqa: E712

    return request.app.state.templates.TemplateResponse(
        "users/form.html",
        {
            "request": request,
            "current_user": current_user,
            "user": None,
            "tenants": tenants,
            "roles": _assignable_roles(current_user),
            "flash_messages": get_flashed_messages(request)
        }
    )


@router.post("/users/new")
async def create_user(
    request: Request,
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    role: str = Form(...),
    full_name: str = Form(None),
    tenant_id: int = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Create a new user from the submitted form.

    Redirects back to the form with a flash message when the username or
    e-mail is already taken, the role is invalid, or the caller is not
    allowed to assign the requested role. On success redirects to the user
    list.
    """
    # Check if username exists
    if db.query(User).filter(User.username == username).first():
        flash(request, "Benutzername bereits vergeben", "danger")
        return _redirect("/users/new")

    # Check if email exists
    if db.query(User).filter(User.email == email).first():
        flash(request, "E-Mail bereits vergeben", "danger")
        return _redirect("/users/new")

    # Validate the role server-side; the form's role list is not a safeguard.
    new_role = _parse_assignable_role(request, role, current_user)
    if new_role is None:
        return _redirect("/users/new")

    # Determine tenant: super admin accounts are not bound to a tenant, and
    # only a super admin may place a user into a tenant other than their own.
    if new_role == UserRole.SUPER_ADMIN:
        user_tenant_id = None
    elif _is_super_admin(current_user) and tenant_id:
        user_tenant_id = tenant_id
    else:
        user_tenant_id = current_user.tenant_id

    user = User(
        username=username,
        email=email,
        password_hash=get_password_hash(password),
        role=new_role,
        full_name=full_name or None,
        tenant_id=user_tenant_id
    )
    db.add(user)
    db.commit()

    flash(request, f"Benutzer '{username}' erstellt", "success")
    return _redirect("/users")


@router.get("/users/{user_id}", response_class=HTMLResponse)
async def user_detail(
    request: Request,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the detail page of a single user.

    Redirects to the user list when the user does not exist or belongs to
    another tenant.
    """
    user = _load_manageable_user(request, db, user_id, current_user)
    if user is None:
        return _redirect("/users")

    return request.app.state.templates.TemplateResponse(
        "users/detail.html",
        {
            "request": request,
            "current_user": current_user,
            "user": user,
            "flash_messages": get_flashed_messages(request)
        }
    )


@router.get("/users/{user_id}/access", response_class=HTMLResponse)
async def user_access(
    request: Request,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the page for managing a user's gateway access.

    Redirects to the user list when the user does not exist or belongs to
    another tenant.
    """
    from ..models.access import UserGatewayAccess

    # Includes the tenant check, so admins cannot inspect foreign users.
    user = _load_manageable_user(request, db, user_id, current_user)
    if user is None:
        return _redirect("/users")

    # Get all gateways and current access
    gateways = _available_gateways(db, current_user)
    granted = db.query(UserGatewayAccess).filter(
        UserGatewayAccess.user_id == user_id
    ).all()
    access_gateway_ids = [entry.gateway_id for entry in granted]

    return request.app.state.templates.TemplateResponse(
        "users/access.html",
        {
            "request": request,
            "current_user": current_user,
            "user": user,
            "gateways": gateways,
            "access_gateway_ids": access_gateway_ids,
            "flash_messages": get_flashed_messages(request)
        }
    )


@router.post("/users/{user_id}/access")
async def save_user_access(
    request: Request,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Replace a user's gateway access with the submitted selection.

    Only gateways the caller may manage are touched: existing grants for
    those gateways are removed and the selected ones are re-created.
    Redirects to the user list when the user does not exist or belongs to
    another tenant, and back to the access page otherwise.
    """
    from ..models.access import UserGatewayAccess

    # Includes the tenant check, so admins cannot modify foreign users.
    user = _load_manageable_user(request, db, user_id, current_user)
    if user is None:
        return _redirect("/users")

    access_url = f"/users/{user_id}/access"

    # Get form data; a set drops duplicate ids so no duplicate rows are added.
    form_data = await request.form()
    try:
        selected_gateway_ids = {
            int(raw_id) for raw_id in form_data.getlist("gateway_ids")
        }
    except (TypeError, ValueError):
        flash(request, "Ungültige Gateway-Auswahl", "danger")
        return _redirect(access_url)

    # Get available gateways
    available_gateway_ids = {
        gateway.id for gateway in _available_gateways(db, current_user)
    }

    if available_gateway_ids:
        # Remove existing access for available gateways
        db.query(UserGatewayAccess).filter(
            UserGatewayAccess.user_id == user_id,
            UserGatewayAccess.gateway_id.in_(available_gateway_ids)
        ).delete(synchronize_session=False)

    # Add new access, ignoring ids outside the caller's manageable gateways
    for gateway_id in sorted(selected_gateway_ids & available_gateway_ids):
        db.add(
            UserGatewayAccess(
                user_id=user_id,
                gateway_id=gateway_id,
                granted_by_id=current_user.id
            )
        )

    db.commit()

    flash(request, f"Zugriffe für '{user.username}' aktualisiert", "success")
    return _redirect(access_url)


@router.get("/users/{user_id}/edit", response_class=HTMLResponse)
async def edit_user_form(
    request: Request,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the pre-filled form for editing a user.

    Redirects to the user list when the user does not exist or belongs to
    another tenant.
    """
    from ..models.tenant import Tenant

    user = _load_manageable_user(request, db, user_id, current_user)
    if user is None:
        return _redirect("/users")

    # "== True" builds a SQL expression here; it is not a Python comparison.
    tenants = db.query(Tenant).filter(Tenant.is_active == True).all()  # noqa: E712

    return request.app.state.templates.TemplateResponse(
        "users/form.html",
        {
            "request": request,
            "current_user": current_user,
            "user": user,
            "tenants": tenants,
            "roles": _assignable_roles(current_user),
            "flash_messages": get_flashed_messages(request)
        }
    )


@router.post("/users/{user_id}/edit")
async def update_user(
    request: Request,
    user_id: int,
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(None),
    role: str = Form(...),
    full_name: str = Form(None),
    tenant_id: int = Form(None),
    is_active: str = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Update an existing user from the submitted form.

    Redirects to the user list when the user does not exist or belongs to
    another tenant, and back to the edit form when the username or e-mail
    is taken by another user, the role is invalid, or the caller may not
    assign the requested role. On success redirects to the detail page.
    """
    user = _load_manageable_user(request, db, user_id, current_user)
    if user is None:
        return _redirect("/users")

    edit_url = f"/users/{user_id}/edit"

    # Check if username is taken by another user
    if db.query(User).filter(User.username == username, User.id != user_id).first():
        flash(request, "Benutzername bereits vergeben", "danger")
        return _redirect(edit_url)

    # Check if email is taken by another user
    if db.query(User).filter(User.email == email, User.id != user_id).first():
        flash(request, "E-Mail bereits vergeben", "danger")
        return _redirect(edit_url)

    # Validate the role server-side; the form's role list is not a safeguard.
    new_role = _parse_assignable_role(request, role, current_user)
    if new_role is None:
        return _redirect(edit_url)

    user.username = username
    user.email = email
    user.role = new_role
    user.full_name = full_name or None
    # The is_active field is only present in the form data when it was set.
    user.is_active = is_active is not None

    # Only update password if provided
    if password:
        user.password_hash = get_password_hash(password)

    # Update tenant: only super admins may move users between tenants, and
    # super admin accounts are not bound to a tenant.
    if _is_super_admin(current_user):
        user.tenant_id = tenant_id if new_role != UserRole.SUPER_ADMIN else None

    db.commit()

    flash(request, f"Benutzer '{username}' aktualisiert", "success")
    return _redirect(f"/users/{user_id}")