"""VPN Profile management web routes (nested under gateways).""" from fastapi import APIRouter, Request, Depends, Form from fastapi.responses import HTMLResponse, RedirectResponse, Response from sqlalchemy.orm import Session from ..database import get_db from ..models.user import User from ..models.gateway import Gateway from ..models.vpn_server import VPNServer from ..models.vpn_profile import VPNProfile from ..services.vpn_profile_service import VPNProfileService from .deps import require_admin_web, require_user_web, flash, get_flashed_messages router = APIRouter() @router.get("/gateways/{gateway_id}/profiles", response_class=HTMLResponse) async def list_profiles( request: Request, gateway_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_user_web) ): """List VPN profiles for a gateway.""" gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first() if not gateway: flash(request, "Gateway nicht gefunden", "danger") return RedirectResponse(url="/gateways", status_code=303) # Check access if not current_user.is_admin and not any( access.gateway_id == gateway_id for access in current_user.gateway_access ): flash(request, "Zugriff verweigert", "danger") return RedirectResponse(url="/gateways", status_code=303) service = VPNProfileService(db) profiles = service.get_profiles_for_gateway(gateway_id) return request.app.state.templates.TemplateResponse( "gateways/profiles.html", { "request": request, "current_user": current_user, "gateway": gateway, "profiles": profiles, "flash_messages": get_flashed_messages(request) } ) @router.get("/gateways/{gateway_id}/profiles/new", response_class=HTMLResponse) async def new_profile_form( request: Request, gateway_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """New VPN profile form.""" gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first() if not gateway: flash(request, "Gateway nicht gefunden", "danger") return RedirectResponse(url="/gateways", status_code=303) # Get available VPN servers if current_user.is_super_admin: servers = db.query(VPNServer).filter(VPNServer.is_active == True).all() else: servers = db.query(VPNServer).filter( (VPNServer.tenant_id == current_user.tenant_id) | (VPNServer.tenant_id == None), VPNServer.is_active == True ).all() # Calculate next priority existing_profiles = db.query(VPNProfile).filter( VPNProfile.gateway_id == gateway_id ).count() next_priority = existing_profiles + 1 return request.app.state.templates.TemplateResponse( "gateways/profile_form.html", { "request": request, "current_user": current_user, "gateway": gateway, "profile": None, "vpn_servers": servers, "next_priority": next_priority, "flash_messages": get_flashed_messages(request) } ) @router.post("/gateways/{gateway_id}/profiles/new") async def create_profile( request: Request, gateway_id: int, name: str = Form(...), vpn_server_id: int = Form(...), priority: int = Form(1), description: str = Form(None), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Create new VPN profile.""" try: service = VPNProfileService(db) profile = service.create_profile( gateway_id=gateway_id, vpn_server_id=vpn_server_id, name=name, priority=priority, description=description ) flash(request, f"VPN-Profil '{name}' erstellt", "success") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile.id}", status_code=303) except ValueError as e: flash(request, str(e), "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/new", status_code=303) except Exception as e: flash(request, f"Fehler: {str(e)}", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/new", status_code=303) @router.get("/gateways/{gateway_id}/profiles/{profile_id}", response_class=HTMLResponse) async def profile_detail( request: Request, gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_user_web) ): """VPN profile detail page.""" profile = db.query(VPNProfile).filter( VPNProfile.id == profile_id, VPNProfile.gateway_id == gateway_id ).first() if not profile: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) return request.app.state.templates.TemplateResponse( "gateways/profile_detail.html", { "request": request, "current_user": current_user, "gateway": profile.gateway, "profile": profile, "flash_messages": get_flashed_messages(request) } ) @router.get("/gateways/{gateway_id}/profiles/{profile_id}/edit", response_class=HTMLResponse) async def edit_profile_form( request: Request, gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Edit VPN profile form.""" profile = db.query(VPNProfile).filter( VPNProfile.id == profile_id, VPNProfile.gateway_id == gateway_id ).first() if not profile: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) # Get available VPN servers if current_user.is_super_admin: servers = db.query(VPNServer).filter(VPNServer.is_active == True).all() else: servers = db.query(VPNServer).filter( (VPNServer.tenant_id == current_user.tenant_id) | (VPNServer.tenant_id == None), VPNServer.is_active == True ).all() return request.app.state.templates.TemplateResponse( "gateways/profile_form.html", { "request": request, "current_user": current_user, "gateway": profile.gateway, "profile": profile, "vpn_servers": servers, "next_priority": profile.priority, "flash_messages": get_flashed_messages(request) } ) @router.post("/gateways/{gateway_id}/profiles/{profile_id}/edit") async def update_profile( request: Request, gateway_id: int, profile_id: int, name: str = Form(...), priority: int = Form(1), description: str = Form(None), is_active: str = Form(None), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Update VPN profile.""" profile = db.query(VPNProfile).filter( VPNProfile.id == profile_id, VPNProfile.gateway_id == gateway_id ).first() if not profile: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) profile.name = name profile.description = description profile.priority = priority profile.is_active = is_active is not None # Checkbox sends "on" when checked, None when not db.commit() flash(request, f"Profil '{name}' aktualisiert", "success") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile_id}", status_code=303) @router.get("/gateways/{gateway_id}/profiles/{profile_id}/provision") async def provision_profile( gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_user_web) ): """Download OpenVPN config for a profile.""" service = VPNProfileService(db) profile = service.get_profile_by_id(profile_id) if not profile or profile.gateway_id != gateway_id: return Response(status_code=404, content="Profile not found") if not profile.is_ready: return Response(status_code=400, content="Profile not ready for provisioning") try: config = service.provision_profile(profile) filename = f"{profile.gateway.name}-{profile.name}.ovpn" filename = filename.lower().replace(' ', '-') return Response( content=config, media_type="application/x-openvpn-profile", headers={"Content-Disposition": f"attachment; filename={filename}"} ) except Exception as e: return Response(status_code=500, content=str(e)) @router.post("/gateways/{gateway_id}/profiles/{profile_id}/priority") async def update_priority( request: Request, gateway_id: int, profile_id: int, priority: int = Form(...), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Update profile priority.""" service = VPNProfileService(db) profile = service.get_profile_by_id(profile_id) if not profile or profile.gateway_id != gateway_id: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) service.set_priority(profile, priority) flash(request, "Priorität aktualisiert", "success") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) @router.post("/gateways/{gateway_id}/profiles/{profile_id}/revoke") async def revoke_profile( request: Request, gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Revoke profile certificate.""" service = VPNProfileService(db) profile = service.get_profile_by_id(profile_id) if not profile or profile.gateway_id != gateway_id: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) service.revoke_profile(profile) flash(request, f"Zertifikat für '{profile.name}' widerrufen", "warning") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) @router.post("/gateways/{gateway_id}/profiles/{profile_id}/renew") async def renew_profile( request: Request, gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Renew profile certificate.""" service = VPNProfileService(db) profile = service.get_profile_by_id(profile_id) if not profile or profile.gateway_id != gateway_id: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) service.renew_profile(profile) flash(request, f"Zertifikat für '{profile.name}' erneuert", "success") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile_id}", status_code=303) @router.post("/gateways/{gateway_id}/profiles/{profile_id}/delete") async def delete_profile( request: Request, gateway_id: int, profile_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Delete VPN profile.""" service = VPNProfileService(db) profile = service.get_profile_by_id(profile_id) if not profile or profile.gateway_id != gateway_id: flash(request, "Profil nicht gefunden", "danger") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) name = profile.name service.delete_profile(profile) flash(request, f"Profil '{name}' gelöscht", "warning") return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303) @router.get("/gateways/{gateway_id}/provision-all") async def provision_all_profiles( gateway_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_user_web) ): """Download all active profiles as a ZIP file.""" import io import zipfile service = VPNProfileService(db) gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first() if not gateway: return Response(status_code=404, content="Gateway not found") configs = service.generate_all_configs_for_gateway(gateway_id) if not configs: return Response(status_code=400, content="No active profiles available") # Create ZIP file in memory zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for filename, config in configs: zip_file.writestr(filename, config) zip_buffer.seek(0) zip_filename = f"{gateway.name}-vpn-profiles.zip" zip_filename = zip_filename.lower().replace(' ', '-') return Response( content=zip_buffer.getvalue(), media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={zip_filename}"} )