"""VPN Server management web routes.""" from fastapi import APIRouter, Request, Depends, Form from fastapi.responses import HTMLResponse, RedirectResponse from sqlalchemy.orm import Session from typing import Optional from ..database import get_db from ..models.user import User from ..models.vpn_server import VPNServer, VPNProtocol, VPNCipher, VPNAuth, VPNCompression from ..models.certificate_authority import CertificateAuthority, CAStatus from ..services.vpn_server_service import VPNServerService from .deps import require_admin_web, flash, get_flashed_messages router = APIRouter() @router.get("/vpn-servers", response_class=HTMLResponse) async def list_vpn_servers( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """List all VPN servers.""" service = VPNServerService(db) if current_user.is_super_admin: servers = db.query(VPNServer).all() else: servers = service.get_servers_for_tenant(current_user.tenant_id) return request.app.state.templates.TemplateResponse( "vpn_servers/list.html", { "request": request, "current_user": current_user, "servers": servers, "flash_messages": get_flashed_messages(request) } ) @router.get("/vpn-servers/new", response_class=HTMLResponse) async def new_vpn_server_form( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """New VPN server form.""" # Get available CAs if current_user.is_super_admin: cas = db.query(CertificateAuthority).filter( CertificateAuthority.status == CAStatus.ACTIVE ).all() else: cas = db.query(CertificateAuthority).filter( (CertificateAuthority.tenant_id == current_user.tenant_id) | (CertificateAuthority.tenant_id == None), CertificateAuthority.status == CAStatus.ACTIVE ).all() return request.app.state.templates.TemplateResponse( "vpn_servers/form.html", { "request": request, "current_user": current_user, "server": None, "cas": cas, "protocols": VPNProtocol, "ciphers": VPNCipher, "auth_methods": VPNAuth, "compression_options": VPNCompression, "flash_messages": get_flashed_messages(request) } ) @router.post("/vpn-servers/new") async def create_vpn_server( request: Request, name: str = Form(...), hostname: str = Form(...), ca_id: int = Form(...), port: int = Form(1194), protocol: str = Form("udp"), vpn_network: str = Form("10.8.0.0"), vpn_netmask: str = Form("255.255.255.0"), cipher: str = Form("AES-256-GCM"), auth: str = Form("SHA256"), tls_version_min: str = Form("1.2"), compression: str = Form("none"), max_clients: int = Form(100), keepalive_interval: int = Form(10), keepalive_timeout: int = Form(60), management_port: int = Form(7505), is_primary: bool = Form(False), description: str = Form(None), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Create new VPN server.""" try: service = VPNServerService(db) tenant_id = None if current_user.is_super_admin else current_user.tenant_id server = service.create_server( name=name, hostname=hostname, ca_id=ca_id, port=port, protocol=VPNProtocol(protocol), vpn_network=vpn_network, vpn_netmask=vpn_netmask, cipher=VPNCipher(cipher), auth=VPNAuth(auth), tls_version_min=tls_version_min, compression=VPNCompression(compression), max_clients=max_clients, keepalive_interval=keepalive_interval, keepalive_timeout=keepalive_timeout, management_port=management_port, is_primary=is_primary, tenant_id=tenant_id, description=description ) flash(request, f"VPN-Server '{name}' erstellt", "success") return RedirectResponse(url=f"/vpn-servers/{server.id}", status_code=303) except ValueError as e: flash(request, str(e), "danger") return RedirectResponse(url="/vpn-servers/new", status_code=303) except Exception as e: flash(request, f"Fehler beim Erstellen: {str(e)}", "danger") return RedirectResponse(url="/vpn-servers/new", status_code=303) @router.get("/vpn-servers/{server_id}", response_class=HTMLResponse) async def vpn_server_detail( request: Request, server_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """VPN server detail page.""" service = VPNServerService(db) server = service.get_server_by_id(server_id) if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) # Check access if not current_user.is_super_admin and server.tenant_id != current_user.tenant_id and server.tenant_id is not None: flash(request, "Zugriff verweigert", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) # Get status status = service.get_server_status(server) return request.app.state.templates.TemplateResponse( "vpn_servers/detail.html", { "request": request, "current_user": current_user, "server": server, "status": status, "flash_messages": get_flashed_messages(request) } ) @router.get("/vpn-servers/{server_id}/edit", response_class=HTMLResponse) async def edit_vpn_server_form( request: Request, server_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Edit VPN server form.""" server = db.query(VPNServer).filter(VPNServer.id == server_id).first() if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) # Get available CAs cas = db.query(CertificateAuthority).filter( CertificateAuthority.status == CAStatus.ACTIVE ).all() return request.app.state.templates.TemplateResponse( "vpn_servers/form.html", { "request": request, "current_user": current_user, "server": server, "cas": cas, "protocols": VPNProtocol, "ciphers": VPNCipher, "auth_methods": VPNAuth, "compression_options": VPNCompression, "flash_messages": get_flashed_messages(request) } ) @router.post("/vpn-servers/{server_id}/edit") async def update_vpn_server( request: Request, server_id: int, name: str = Form(...), hostname: str = Form(...), port: int = Form(1194), protocol: str = Form("udp"), vpn_network: str = Form("10.8.0.0"), vpn_netmask: str = Form("255.255.255.0"), cipher: str = Form("AES-256-GCM"), auth: str = Form("SHA256"), tls_version_min: str = Form("1.2"), compression: str = Form("none"), max_clients: int = Form(100), keepalive_interval: int = Form(10), keepalive_timeout: int = Form(60), management_port: int = Form(7505), is_primary: bool = Form(False), is_active: bool = Form(True), description: str = Form(None), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Update VPN server.""" server = db.query(VPNServer).filter(VPNServer.id == server_id).first() if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) # Update fields server.name = name server.description = description server.hostname = hostname server.port = port server.protocol = VPNProtocol(protocol) server.vpn_network = vpn_network server.vpn_netmask = vpn_netmask server.cipher = VPNCipher(cipher) server.auth = VPNAuth(auth) server.tls_version_min = tls_version_min server.compression = VPNCompression(compression) server.max_clients = max_clients server.keepalive_interval = keepalive_interval server.keepalive_timeout = keepalive_timeout server.management_port = management_port server.is_primary = is_primary server.is_active = is_active # If setting as primary, unset other primaries if is_primary: db.query(VPNServer).filter( VPNServer.tenant_id == server.tenant_id, VPNServer.id != server.id ).update({"is_primary": False}) db.commit() flash(request, "VPN-Server aktualisiert", "success") return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303) @router.get("/vpn-servers/{server_id}/clients", response_class=HTMLResponse) async def vpn_server_clients( request: Request, server_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Show connected clients for a VPN server.""" service = VPNServerService(db) server = service.get_server_by_id(server_id) if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) clients = service.get_connected_clients(server) return request.app.state.templates.TemplateResponse( "vpn_servers/clients.html", { "request": request, "current_user": current_user, "server": server, "clients": clients, "flash_messages": get_flashed_messages(request) } ) @router.post("/vpn-servers/{server_id}/disconnect/{common_name}") async def disconnect_client( request: Request, server_id: int, common_name: str, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Disconnect a client from VPN server.""" service = VPNServerService(db) server = service.get_server_by_id(server_id) if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) if service.disconnect_client(server, common_name): flash(request, f"Client '{common_name}' getrennt", "success") else: flash(request, f"Konnte Client '{common_name}' nicht trennen", "danger") return RedirectResponse(url=f"/vpn-servers/{server_id}/clients", status_code=303) @router.post("/vpn-servers/{server_id}/delete") async def delete_vpn_server( request: Request, server_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Delete VPN server.""" server = db.query(VPNServer).filter(VPNServer.id == server_id).first() if not server: flash(request, "VPN-Server nicht gefunden", "danger") return RedirectResponse(url="/vpn-servers", status_code=303) # Check if server has profiles if server.vpn_profiles: flash(request, "Server hat noch VPN-Profile. Bitte zuerst löschen.", "danger") return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303) name = server.name db.delete(server) db.commit() flash(request, f"VPN-Server '{name}' gelöscht", "warning") return RedirectResponse(url="/vpn-servers", status_code=303)