openvpn-endpoint-server/server/app/web/vpn_servers.py

350 lines
11 KiB
Python

"""VPN Server management web routes."""
import logging
from typing import Optional

from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session

from ..database import get_db
from ..models.user import User
from ..models.vpn_server import VPNServer, VPNProtocol, VPNCipher, VPNAuth, VPNCompression
from ..models.certificate_authority import CertificateAuthority, CAStatus
from ..services.vpn_server_service import VPNServerService
from .deps import require_admin_web, flash, get_flashed_messages
# Router on which all VPN server web routes in this module are registered.
router = APIRouter()
@router.get("/vpn-servers", response_class=HTMLResponse)
async def list_vpn_servers(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the VPN server overview page.

    Super admins get every ``VPNServer`` row straight from the database;
    all other admins get whatever ``VPNServerService.get_servers_for_tenant``
    returns for their own tenant.
    """
    vpn_service = VPNServerService(db)
    visible_servers = (
        db.query(VPNServer).all()
        if current_user.is_super_admin
        else vpn_service.get_servers_for_tenant(current_user.tenant_id)
    )

    context = {
        "request": request,
        "current_user": current_user,
        "servers": visible_servers,
        "flash_messages": get_flashed_messages(request),
    }
    templates = request.app.state.templates
    return templates.TemplateResponse("vpn_servers/list.html", context)
@router.get("/vpn-servers/new", response_class=HTMLResponse)
async def new_vpn_server_form(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the empty form for creating a VPN server.

    The CA selection is limited to active certificate authorities. Admins
    that are not super admins additionally only see CAs of their own tenant
    or CAs without a tenant.
    """
    # Collect the filter criteria first, then run a single query.
    ca_criteria = [CertificateAuthority.status == CAStatus.ACTIVE]
    if not current_user.is_super_admin:
        tenant_scope = (
            (CertificateAuthority.tenant_id == current_user.tenant_id) |
            (CertificateAuthority.tenant_id == None)  # noqa: E711 - SQL "IS NULL"
        )
        ca_criteria.insert(0, tenant_scope)
    selectable_cas = db.query(CertificateAuthority).filter(*ca_criteria).all()

    context = {
        "request": request,
        "current_user": current_user,
        "server": None,  # no existing server: the template renders a blank form
        "cas": selectable_cas,
        "protocols": VPNProtocol,
        "ciphers": VPNCipher,
        "auth_methods": VPNAuth,
        "compression_options": VPNCompression,
        "flash_messages": get_flashed_messages(request),
    }
    templates = request.app.state.templates
    return templates.TemplateResponse("vpn_servers/form.html", context)
@router.post("/vpn-servers/new")
async def create_vpn_server(
    request: Request,
    name: str = Form(...),
    hostname: str = Form(...),
    ca_id: int = Form(...),
    port: int = Form(1194),
    protocol: str = Form("udp"),
    vpn_network: str = Form("10.8.0.0"),
    vpn_netmask: str = Form("255.255.255.0"),
    cipher: str = Form("AES-256-GCM"),
    auth: str = Form("SHA256"),
    tls_version_min: str = Form("1.2"),
    compression: str = Form("none"),
    max_clients: int = Form(100),
    keepalive_interval: int = Form(10),
    keepalive_timeout: int = Form(60),
    management_port: int = Form(7505),
    is_primary: bool = Form(False),
    description: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Create a new VPN server from the submitted form.

    Super admins create servers without a tenant (``tenant_id=None``); other
    admins create servers for their own tenant and may only reference an
    active CA that belongs to their tenant or has no tenant, i.e. the same
    set the "new server" form offers.

    Returns:
        A 303 redirect to the new server's detail page on success, or back
        to the form with a flashed error message on failure.
    """
    tenant_id = None if current_user.is_super_admin else current_user.tenant_id
    try:
        if not current_user.is_super_admin:
            # The form only lists CAs visible to the tenant, but ca_id is
            # client-supplied and must be re-validated server-side.
            # assumes CertificateAuthority has an ``id`` primary key - TODO confirm
            permitted_ca = db.query(CertificateAuthority).filter(
                CertificateAuthority.id == ca_id,
                (CertificateAuthority.tenant_id == current_user.tenant_id) |
                (CertificateAuthority.tenant_id.is_(None)),
                CertificateAuthority.status == CAStatus.ACTIVE,
            ).first()
            if permitted_ca is None:
                raise ValueError("Zertifizierungsstelle nicht gefunden oder Zugriff verweigert")

        service = VPNServerService(db)
        server = service.create_server(
            name=name,
            hostname=hostname,
            ca_id=ca_id,
            port=port,
            # Converting the raw form strings raises ValueError for unknown
            # values, which is reported to the user below.
            protocol=VPNProtocol(protocol),
            vpn_network=vpn_network,
            vpn_netmask=vpn_netmask,
            cipher=VPNCipher(cipher),
            auth=VPNAuth(auth),
            tls_version_min=tls_version_min,
            compression=VPNCompression(compression),
            max_clients=max_clients,
            keepalive_interval=keepalive_interval,
            keepalive_timeout=keepalive_timeout,
            management_port=management_port,
            is_primary=is_primary,
            tenant_id=tenant_id,
            description=description
        )
    except ValueError as e:
        # Validation problem: show the message as-is.
        db.rollback()
        flash(request, str(e), "danger")
        return RedirectResponse(url="/vpn-servers/new", status_code=303)
    except Exception as e:
        # Unexpected failure: leave the session usable and keep a traceback
        # in the log instead of only showing the message to the user.
        db.rollback()
        logging.getLogger(__name__).exception(
            "Failed to create VPN server %r", name
        )
        flash(request, f"Fehler beim Erstellen: {str(e)}", "danger")
        return RedirectResponse(url="/vpn-servers/new", status_code=303)
    else:
        flash(request, f"VPN-Server '{name}' erstellt", "success")
        return RedirectResponse(url=f"/vpn-servers/{server.id}", status_code=303)
@router.get("/vpn-servers/{server_id}", response_class=HTMLResponse)
async def vpn_server_detail(
    request: Request,
    server_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the detail page of a single VPN server.

    Redirects back to the server list with a flashed error when the server
    does not exist or the current user may not see it. A server is visible
    to super admins, to admins of the server's tenant, and to everyone when
    the server has no tenant.
    """
    list_url = "/vpn-servers"
    vpn_service = VPNServerService(db)
    server = vpn_service.get_server_by_id(server_id)

    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url=list_url, status_code=303)

    # Access rule expressed positively: any one of these grants access.
    may_view = (
        current_user.is_super_admin
        or server.tenant_id == current_user.tenant_id
        or server.tenant_id is None
    )
    if not may_view:
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url=list_url, status_code=303)

    context = {
        "request": request,
        "current_user": current_user,
        "server": server,
        "status": vpn_service.get_server_status(server),
        "flash_messages": get_flashed_messages(request),
    }
    templates = request.app.state.templates
    return templates.TemplateResponse("vpn_servers/detail.html", context)
@router.get("/vpn-servers/{server_id}/edit", response_class=HTMLResponse)
async def edit_vpn_server_form(
    request: Request,
    server_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Render the edit form for an existing VPN server.

    Redirects to the server list with a flashed error when the server does
    not exist or belongs to another tenant. The CA selection is limited to
    active CAs and, for admins that are not super admins, to CAs of their
    own tenant or without a tenant (same rule as the "new server" form).
    """
    server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Same tenant rule as the detail page: without it any admin could open
    # another tenant's server by guessing its id.
    if (
        not current_user.is_super_admin
        and server.tenant_id is not None
        and server.tenant_id != current_user.tenant_id
    ):
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Get available CAs, scoped to what the current user may use.
    ca_query = db.query(CertificateAuthority).filter(
        CertificateAuthority.status == CAStatus.ACTIVE
    )
    if not current_user.is_super_admin:
        ca_query = ca_query.filter(
            (CertificateAuthority.tenant_id == current_user.tenant_id) |
            (CertificateAuthority.tenant_id.is_(None))
        )
    cas = ca_query.all()

    return request.app.state.templates.TemplateResponse(
        "vpn_servers/form.html",
        {
            "request": request,
            "current_user": current_user,
            "server": server,
            "cas": cas,
            "protocols": VPNProtocol,
            "ciphers": VPNCipher,
            "auth_methods": VPNAuth,
            "compression_options": VPNCompression,
            "flash_messages": get_flashed_messages(request)
        }
    )
@router.post("/vpn-servers/{server_id}/edit")
async def update_vpn_server(
    request: Request,
    server_id: int,
    name: str = Form(...),
    hostname: str = Form(...),
    port: int = Form(1194),
    protocol: str = Form("udp"),
    vpn_network: str = Form("10.8.0.0"),
    vpn_netmask: str = Form("255.255.255.0"),
    cipher: str = Form("AES-256-GCM"),
    auth: str = Form("SHA256"),
    tls_version_min: str = Form("1.2"),
    compression: str = Form("none"),
    max_clients: int = Form(100),
    keepalive_interval: int = Form(10),
    keepalive_timeout: int = Form(60),
    management_port: int = Form(7505),
    is_primary: bool = Form(False),
    is_active: bool = Form(True),
    description: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Apply the submitted form values to an existing VPN server.

    Redirects to the server list when the server is missing or belongs to
    another tenant, back to the edit form when a submitted option is invalid
    or the database update fails, and to the detail page on success.

    When ``is_primary`` is set, every other server with the same
    ``tenant_id`` has its ``is_primary`` flag cleared in the same commit.
    """
    server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Same tenant rule as the detail page: admins must not modify servers
    # that belong to a different tenant.
    # NOTE(review): servers without a tenant stay editable by tenant admins,
    # mirroring the detail page rule - confirm this is intended for writes.
    if (
        not current_user.is_super_admin
        and server.tenant_id is not None
        and server.tenant_id != current_user.tenant_id
    ):
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    edit_url = f"/vpn-servers/{server_id}/edit"

    # Convert the raw option strings before touching the model so that an
    # invalid value is reported to the user instead of raising a 500 and
    # leaving the object half-updated.
    try:
        protocol_value = VPNProtocol(protocol)
        cipher_value = VPNCipher(cipher)
        auth_value = VPNAuth(auth)
        compression_value = VPNCompression(compression)
    except ValueError as e:
        flash(request, str(e), "danger")
        return RedirectResponse(url=edit_url, status_code=303)

    # Update fields
    server.name = name
    server.description = description
    server.hostname = hostname
    server.port = port
    server.protocol = protocol_value
    server.vpn_network = vpn_network
    server.vpn_netmask = vpn_netmask
    server.cipher = cipher_value
    server.auth = auth_value
    server.tls_version_min = tls_version_min
    server.compression = compression_value
    server.max_clients = max_clients
    server.keepalive_interval = keepalive_interval
    server.keepalive_timeout = keepalive_timeout
    server.management_port = management_port
    server.is_primary = is_primary
    # NOTE(review): is_active defaults to True when the field is absent; an
    # unchecked HTML checkbox submits nothing - confirm the form always sends
    # an explicit value, otherwise a server cannot be deactivated here.
    server.is_active = is_active

    try:
        # If setting as primary, unset other primaries of the same tenant.
        if is_primary:
            db.query(VPNServer).filter(
                VPNServer.tenant_id == server.tenant_id,
                VPNServer.id != server.id
            ).update({"is_primary": False})
        db.commit()
    except Exception as e:
        # Leave the session usable and keep a traceback in the log.
        db.rollback()
        logging.getLogger(__name__).exception(
            "Failed to update VPN server %s", server_id
        )
        flash(request, f"Fehler beim Aktualisieren: {str(e)}", "danger")
        return RedirectResponse(url=edit_url, status_code=303)

    flash(request, "VPN-Server aktualisiert", "success")
    return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303)
@router.get("/vpn-servers/{server_id}/clients", response_class=HTMLResponse)
async def vpn_server_clients(
    request: Request,
    server_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Show the clients currently connected to a VPN server.

    Redirects to the server list with a flashed error when the server does
    not exist or belongs to another tenant.
    """
    service = VPNServerService(db)
    server = service.get_server_by_id(server_id)
    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Same tenant rule as the detail page: the client list must not be
    # readable across tenants.
    if (
        not current_user.is_super_admin
        and server.tenant_id is not None
        and server.tenant_id != current_user.tenant_id
    ):
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    clients = service.get_connected_clients(server)
    return request.app.state.templates.TemplateResponse(
        "vpn_servers/clients.html",
        {
            "request": request,
            "current_user": current_user,
            "server": server,
            "clients": clients,
            "flash_messages": get_flashed_messages(request)
        }
    )
@router.post("/vpn-servers/{server_id}/disconnect/{common_name}")
async def disconnect_client(
    request: Request,
    server_id: int,
    common_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Disconnect the client with the given common name from a VPN server.

    Redirects to the server list when the server does not exist or belongs
    to another tenant; otherwise flashes the outcome reported by
    ``VPNServerService.disconnect_client`` and returns to the client list.
    """
    service = VPNServerService(db)
    server = service.get_server_by_id(server_id)
    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Same tenant rule as the detail page: admins must not kick clients off
    # servers that belong to a different tenant.
    # NOTE(review): servers without a tenant stay manageable by tenant
    # admins, mirroring the detail page rule - confirm this is intended.
    if (
        not current_user.is_super_admin
        and server.tenant_id is not None
        and server.tenant_id != current_user.tenant_id
    ):
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    if service.disconnect_client(server, common_name):
        flash(request, f"Client '{common_name}' getrennt", "success")
    else:
        flash(request, f"Konnte Client '{common_name}' nicht trennen", "danger")
    return RedirectResponse(url=f"/vpn-servers/{server_id}/clients", status_code=303)
@router.post("/vpn-servers/{server_id}/delete")
async def delete_vpn_server(
    request: Request,
    server_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_web)
):
    """Delete a VPN server that no longer has VPN profiles attached.

    Redirects to the server list when the server does not exist or belongs
    to another tenant, and to the detail page when ``server.vpn_profiles``
    is non-empty. On success the row is deleted and committed.
    """
    server = db.query(VPNServer).filter(VPNServer.id == server_id).first()
    if not server:
        flash(request, "VPN-Server nicht gefunden", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Same tenant rule as the detail page: without it any admin could delete
    # another tenant's server by posting its id.
    # NOTE(review): servers without a tenant stay deletable by tenant admins,
    # mirroring the detail page rule - confirm this is intended.
    if (
        not current_user.is_super_admin
        and server.tenant_id is not None
        and server.tenant_id != current_user.tenant_id
    ):
        flash(request, "Zugriff verweigert", "danger")
        return RedirectResponse(url="/vpn-servers", status_code=303)

    # Check if server has profiles
    if server.vpn_profiles:
        flash(request, "Server hat noch VPN-Profile. Bitte zuerst löschen.", "danger")
        return RedirectResponse(url=f"/vpn-servers/{server_id}", status_code=303)

    # Remember the name: it is needed for the message after the delete.
    name = server.name
    db.delete(server)
    db.commit()
    flash(request, f"VPN-Server '{name}' gelöscht", "warning")
    return RedirectResponse(url="/vpn-servers", status_code=303)