350 lines
11 KiB
Python
350 lines
11 KiB
Python
"""VPN Server management web routes."""
|
|
|
|
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional
|
|
|
|
from ..database import get_db
|
|
from ..models.user import User
|
|
from ..models.vpn_server import VPNServer, VPNProtocol, VPNCipher, VPNAuth, VPNCompression
|
|
from ..models.certificate_authority import CertificateAuthority, CAStatus
|
|
from ..services.vpn_server_service import VPNServerService
|
|
from .deps import require_admin_web, flash, get_flashed_messages
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/vpn-servers", response_class=HTMLResponse)
|
|
async def list_vpn_servers(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""List all VPN servers."""
|
|
service = VPNServerService(db)
|
|
|
|
if current_user.is_super_admin:
|
|
servers = db.query(VPNServer).all()
|
|
else:
|
|
servers = service.get_servers_for_tenant(current_user.tenant_id)
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"vpn_servers/list.html",
|
|
{
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"servers": servers,
|
|
"flash_messages": get_flashed_messages(request)
|
|
}
|
|
)
|
|
|
|
|
|
@router.get("/vpn-servers/new", response_class=HTMLResponse)
|
|
async def new_vpn_server_form(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""New VPN server form."""
|
|
# Get available CAs
|
|
if current_user.is_super_admin:
|
|
cas = db.query(CertificateAuthority).filter(
|
|
CertificateAuthority.status == CAStatus.ACTIVE
|
|
).all()
|
|
else:
|
|
cas = db.query(CertificateAuthority).filter(
|
|
(CertificateAuthority.tenant_id == current_user.tenant_id) |
|
|
(CertificateAuthority.tenant_id == None),
|
|
CertificateAuthority.status == CAStatus.ACTIVE
|
|
).all()
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"vpn_servers/form.html",
|
|
{
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"server": None,
|
|
"cas": cas,
|
|
"protocols": VPNProtocol,
|
|
"ciphers": VPNCipher,
|
|
"auth_methods": VPNAuth,
|
|
"compression_options": VPNCompression,
|
|
"flash_messages": get_flashed_messages(request)
|
|
}
|
|
)
|
|
|
|
|
|
@router.post("/vpn-servers/new")
|
|
async def create_vpn_server(
|
|
request: Request,
|
|
name: str = Form(...),
|
|
hostname: str = Form(...),
|
|
ca_id: int = Form(...),
|
|
port: int = Form(1194),
|
|
protocol: str = Form("udp"),
|
|
vpn_network: str = Form("10.8.0.0"),
|
|
vpn_netmask: str = Form("255.255.255.0"),
|
|
cipher: str = Form("AES-256-GCM"),
|
|
auth: str = Form("SHA256"),
|
|
tls_version_min: str = Form("1.2"),
|
|
compression: str = Form("none"),
|
|
max_clients: int = Form(100),
|
|
keepalive_interval: int = Form(10),
|
|
keepalive_timeout: int = Form(60),
|
|
management_port: int = Form(7505),
|
|
is_primary: bool = Form(False),
|
|
description: str = Form(None),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Create new VPN server."""
|
|
try:
|
|
service = VPNServerService(db)
|
|
|
|
tenant_id = None if current_user.is_super_admin else current_user.tenant_id
|
|
|
|
server = service.create_server(
|
|
name=name,
|
|
hostname=hostname,
|
|
ca_id=ca_id,
|
|
port=port,
|
|
protocol=VPNProtocol(protocol),
|
|
vpn_network=vpn_network,
|
|
vpn_netmask=vpn_netmask,
|
|
cipher=VPNCipher(cipher),
|
|
auth=VPNAuth(auth),
|
|
tls_version_min=tls_version_min,
|
|
compression=VPNCompression(compression),
|
|
max_clients=max_clients,
|
|
keepalive_interval=keepalive_interval,
|
|
keepalive_timeout=keepalive_timeout,
|
|
management_port=management_port,
|
|
is_primary=is_primary,
|
|
tenant_id=tenant_id,
|
|
description=description
|
|
)
|
|
|
|
flash(request, f"VPN-Server '{name}' erstellt", "success")
|
|
return RedirectResponse(url=f"/vpn-servers/{server.id}", status_code=303)
|
|
|
|
except ValueError as e:
|
|
flash(request, str(e), "danger")
|
|
return RedirectResponse(url="/vpn-servers/new", status_code=303)
|
|
except Exception as e:
|
|
flash(request, f"Fehler beim Erstellen: {str(e)}", "danger")
|
|
return RedirectResponse(url="/vpn-servers/new", status_code=303)
|
|
|
|
|
|
@router.get("/vpn-servers/{server_id}", response_class=HTMLResponse)
|
|
async def vpn_server_detail(
|
|
request: Request,
|
|
server_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""VPN server detail page."""
|
|
service = VPNServerService(db)
|
|
server = service.get_server_by_id(server_id)
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
# Check access
|
|
if not current_user.is_super_admin and server.tenant_id != current_user.tenant_id and server.tenant_id is not None:
|
|
flash(request, "Zugriff verweigert", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
# Get status
|
|
status = service.get_server_status(server)
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"vpn_servers/detail.html",
|
|
{
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"server": server,
|
|
"status": status,
|
|
"flash_messages": get_flashed_messages(request)
|
|
}
|
|
)
|
|
|
|
|
|
@router.get("/vpn-servers/{server_id}/edit", response_class=HTMLResponse)
|
|
async def edit_vpn_server_form(
|
|
request: Request,
|
|
server_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Edit VPN server form."""
|
|
server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
# Get available CAs
|
|
cas = db.query(CertificateAuthority).filter(
|
|
CertificateAuthority.status == CAStatus.ACTIVE
|
|
).all()
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"vpn_servers/form.html",
|
|
{
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"server": server,
|
|
"cas": cas,
|
|
"protocols": VPNProtocol,
|
|
"ciphers": VPNCipher,
|
|
"auth_methods": VPNAuth,
|
|
"compression_options": VPNCompression,
|
|
"flash_messages": get_flashed_messages(request)
|
|
}
|
|
)
|
|
|
|
|
|
@router.post("/vpn-servers/{server_id}/edit")
|
|
async def update_vpn_server(
|
|
request: Request,
|
|
server_id: int,
|
|
name: str = Form(...),
|
|
hostname: str = Form(...),
|
|
port: int = Form(1194),
|
|
protocol: str = Form("udp"),
|
|
vpn_network: str = Form("10.8.0.0"),
|
|
vpn_netmask: str = Form("255.255.255.0"),
|
|
cipher: str = Form("AES-256-GCM"),
|
|
auth: str = Form("SHA256"),
|
|
tls_version_min: str = Form("1.2"),
|
|
compression: str = Form("none"),
|
|
max_clients: int = Form(100),
|
|
keepalive_interval: int = Form(10),
|
|
keepalive_timeout: int = Form(60),
|
|
management_port: int = Form(7505),
|
|
is_primary: bool = Form(False),
|
|
is_active: bool = Form(True),
|
|
description: str = Form(None),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Update VPN server."""
|
|
server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
# Update fields
|
|
server.name = name
|
|
server.description = description
|
|
server.hostname = hostname
|
|
server.port = port
|
|
server.protocol = VPNProtocol(protocol)
|
|
server.vpn_network = vpn_network
|
|
server.vpn_netmask = vpn_netmask
|
|
server.cipher = VPNCipher(cipher)
|
|
server.auth = VPNAuth(auth)
|
|
server.tls_version_min = tls_version_min
|
|
server.compression = VPNCompression(compression)
|
|
server.max_clients = max_clients
|
|
server.keepalive_interval = keepalive_interval
|
|
server.keepalive_timeout = keepalive_timeout
|
|
server.management_port = management_port
|
|
server.is_primary = is_primary
|
|
server.is_active = is_active
|
|
|
|
# If setting as primary, unset other primaries
|
|
if is_primary:
|
|
db.query(VPNServer).filter(
|
|
VPNServer.tenant_id == server.tenant_id,
|
|
VPNServer.id != server.id
|
|
).update({"is_primary": False})
|
|
|
|
db.commit()
|
|
|
|
flash(request, "VPN-Server aktualisiert", "success")
|
|
return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303)
|
|
|
|
|
|
@router.get("/vpn-servers/{server_id}/clients", response_class=HTMLResponse)
|
|
async def vpn_server_clients(
|
|
request: Request,
|
|
server_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Show connected clients for a VPN server."""
|
|
service = VPNServerService(db)
|
|
server = service.get_server_by_id(server_id)
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
clients = service.get_connected_clients(server)
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"vpn_servers/clients.html",
|
|
{
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"server": server,
|
|
"clients": clients,
|
|
"flash_messages": get_flashed_messages(request)
|
|
}
|
|
)
|
|
|
|
|
|
@router.post("/vpn-servers/{server_id}/disconnect/{common_name}")
|
|
async def disconnect_client(
|
|
request: Request,
|
|
server_id: int,
|
|
common_name: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Disconnect a client from VPN server."""
|
|
service = VPNServerService(db)
|
|
server = service.get_server_by_id(server_id)
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
if service.disconnect_client(server, common_name):
|
|
flash(request, f"Client '{common_name}' getrennt", "success")
|
|
else:
|
|
flash(request, f"Konnte Client '{common_name}' nicht trennen", "danger")
|
|
|
|
return RedirectResponse(url=f"/vpn-servers/{server_id}/clients", status_code=303)
|
|
|
|
|
|
@router.post("/vpn-servers/{server_id}/delete")
|
|
async def delete_vpn_server(
|
|
request: Request,
|
|
server_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin_web)
|
|
):
|
|
"""Delete VPN server."""
|
|
server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
|
|
|
|
if not server:
|
|
flash(request, "VPN-Server nicht gefunden", "danger")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|
|
|
|
# Check if server has profiles
|
|
if server.vpn_profiles:
|
|
flash(request, "Server hat noch VPN-Profile. Bitte zuerst löschen.", "danger")
|
|
return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303)
|
|
|
|
name = server.name
|
|
db.delete(server)
|
|
db.commit()
|
|
|
|
flash(request, f"VPN-Server '{name}' gelöscht", "warning")
|
|
return RedirectResponse(url="/vpn-servers", status_code=303)
|