openvpn-endpoint-server/server/app/web/vpn_profiles.py

395 lines
13 KiB
Python

"""VPN Profile management web routes (nested under gateways)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.user import User
from ..models.gateway import Gateway
from ..models.vpn_server import VPNServer
from ..models.vpn_profile import VPNProfile
from ..services.vpn_profile_service import VPNProfileService
from .deps import require_admin_web, require_user_web, flash, get_flashed_messages
router = APIRouter()
@router.get("/gateways/{gateway_id}/profiles", response_class=HTMLResponse)
async def list_profiles(
request: Request,
gateway_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_user_web)
):
"""List VPN profiles for a gateway."""
gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first()
if not gateway:
flash(request, "Gateway nicht gefunden", "danger")
return RedirectResponse(url="/gateways", status_code=303)
# Check access
if not current_user.is_admin and not any(
access.gateway_id == gateway_id for access in current_user.gateway_access
):
flash(request, "Zugriff verweigert", "danger")
return RedirectResponse(url="/gateways", status_code=303)
service = VPNProfileService(db)
profiles = service.get_profiles_for_gateway(gateway_id)
return request.app.state.templates.TemplateResponse(
"gateways/profiles.html",
{
"request": request,
"current_user": current_user,
"gateway": gateway,
"profiles": profiles,
"flash_messages": get_flashed_messages(request)
}
)
@router.get("/gateways/{gateway_id}/profiles/new", response_class=HTMLResponse)
async def new_profile_form(
request: Request,
gateway_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""New VPN profile form."""
gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first()
if not gateway:
flash(request, "Gateway nicht gefunden", "danger")
return RedirectResponse(url="/gateways", status_code=303)
# Get available VPN servers
if current_user.is_super_admin:
servers = db.query(VPNServer).filter(VPNServer.is_active == True).all()
else:
servers = db.query(VPNServer).filter(
(VPNServer.tenant_id == current_user.tenant_id) |
(VPNServer.tenant_id == None),
VPNServer.is_active == True
).all()
# Calculate next priority
existing_profiles = db.query(VPNProfile).filter(
VPNProfile.gateway_id == gateway_id
).count()
next_priority = existing_profiles + 1
return request.app.state.templates.TemplateResponse(
"gateways/profile_form.html",
{
"request": request,
"current_user": current_user,
"gateway": gateway,
"profile": None,
"vpn_servers": servers,
"next_priority": next_priority,
"flash_messages": get_flashed_messages(request)
}
)
@router.post("/gateways/{gateway_id}/profiles/new")
async def create_profile(
request: Request,
gateway_id: int,
name: str = Form(...),
vpn_server_id: int = Form(...),
priority: int = Form(1),
description: str = Form(None),
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Create new VPN profile."""
try:
service = VPNProfileService(db)
profile = service.create_profile(
gateway_id=gateway_id,
vpn_server_id=vpn_server_id,
name=name,
priority=priority,
description=description
)
flash(request, f"VPN-Profil '{name}' erstellt", "success")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile.id}", status_code=303)
except ValueError as e:
flash(request, str(e), "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/new", status_code=303)
except Exception as e:
flash(request, f"Fehler: {str(e)}", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/new", status_code=303)
@router.get("/gateways/{gateway_id}/profiles/{profile_id}", response_class=HTMLResponse)
async def profile_detail(
request: Request,
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_user_web)
):
"""VPN profile detail page."""
profile = db.query(VPNProfile).filter(
VPNProfile.id == profile_id,
VPNProfile.gateway_id == gateway_id
).first()
if not profile:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
return request.app.state.templates.TemplateResponse(
"gateways/profile_detail.html",
{
"request": request,
"current_user": current_user,
"gateway": profile.gateway,
"profile": profile,
"flash_messages": get_flashed_messages(request)
}
)
@router.get("/gateways/{gateway_id}/profiles/{profile_id}/edit", response_class=HTMLResponse)
async def edit_profile_form(
request: Request,
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Edit VPN profile form."""
profile = db.query(VPNProfile).filter(
VPNProfile.id == profile_id,
VPNProfile.gateway_id == gateway_id
).first()
if not profile:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
# Get available VPN servers
if current_user.is_super_admin:
servers = db.query(VPNServer).filter(VPNServer.is_active == True).all()
else:
servers = db.query(VPNServer).filter(
(VPNServer.tenant_id == current_user.tenant_id) |
(VPNServer.tenant_id == None),
VPNServer.is_active == True
).all()
return request.app.state.templates.TemplateResponse(
"gateways/profile_form.html",
{
"request": request,
"current_user": current_user,
"gateway": profile.gateway,
"profile": profile,
"vpn_servers": servers,
"next_priority": profile.priority,
"flash_messages": get_flashed_messages(request)
}
)
@router.post("/gateways/{gateway_id}/profiles/{profile_id}/edit")
async def update_profile(
request: Request,
gateway_id: int,
profile_id: int,
name: str = Form(...),
priority: int = Form(1),
description: str = Form(None),
is_active: str = Form(None),
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Update VPN profile."""
profile = db.query(VPNProfile).filter(
VPNProfile.id == profile_id,
VPNProfile.gateway_id == gateway_id
).first()
if not profile:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
profile.name = name
profile.description = description
profile.priority = priority
profile.is_active = is_active is not None # Checkbox sends "on" when checked, None when not
db.commit()
flash(request, f"Profil '{name}' aktualisiert", "success")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile_id}", status_code=303)
@router.get("/gateways/{gateway_id}/profiles/{profile_id}/provision")
async def provision_profile(
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_user_web)
):
"""Download OpenVPN config for a profile."""
service = VPNProfileService(db)
profile = service.get_profile_by_id(profile_id)
if not profile or profile.gateway_id != gateway_id:
return Response(status_code=404, content="Profile not found")
if not profile.is_ready:
return Response(status_code=400, content="Profile not ready for provisioning")
try:
config = service.provision_profile(profile)
filename = f"{profile.gateway.name}-{profile.name}.ovpn"
filename = filename.lower().replace(' ', '-')
return Response(
content=config,
media_type="application/x-openvpn-profile",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
except Exception as e:
return Response(status_code=500, content=str(e))
@router.post("/gateways/{gateway_id}/profiles/{profile_id}/priority")
async def update_priority(
request: Request,
gateway_id: int,
profile_id: int,
priority: int = Form(...),
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Update profile priority."""
service = VPNProfileService(db)
profile = service.get_profile_by_id(profile_id)
if not profile or profile.gateway_id != gateway_id:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
service.set_priority(profile, priority)
flash(request, "Priorität aktualisiert", "success")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
@router.post("/gateways/{gateway_id}/profiles/{profile_id}/revoke")
async def revoke_profile(
request: Request,
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Revoke profile certificate."""
service = VPNProfileService(db)
profile = service.get_profile_by_id(profile_id)
if not profile or profile.gateway_id != gateway_id:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
service.revoke_profile(profile)
flash(request, f"Zertifikat für '{profile.name}' widerrufen", "warning")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
@router.post("/gateways/{gateway_id}/profiles/{profile_id}/renew")
async def renew_profile(
request: Request,
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Renew profile certificate."""
service = VPNProfileService(db)
profile = service.get_profile_by_id(profile_id)
if not profile or profile.gateway_id != gateway_id:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
service.renew_profile(profile)
flash(request, f"Zertifikat für '{profile.name}' erneuert", "success")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles/{profile_id}", status_code=303)
@router.post("/gateways/{gateway_id}/profiles/{profile_id}/delete")
async def delete_profile(
request: Request,
gateway_id: int,
profile_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin_web)
):
"""Delete VPN profile."""
service = VPNProfileService(db)
profile = service.get_profile_by_id(profile_id)
if not profile or profile.gateway_id != gateway_id:
flash(request, "Profil nicht gefunden", "danger")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
name = profile.name
service.delete_profile(profile)
flash(request, f"Profil '{name}' gelöscht", "warning")
return RedirectResponse(url=f"/gateways/{gateway_id}/profiles", status_code=303)
@router.get("/gateways/{gateway_id}/provision-all")
async def provision_all_profiles(
gateway_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_user_web)
):
"""Download all active profiles as a ZIP file."""
import io
import zipfile
service = VPNProfileService(db)
gateway = db.query(Gateway).filter(Gateway.id == gateway_id).first()
if not gateway:
return Response(status_code=404, content="Gateway not found")
configs = service.generate_all_configs_for_gateway(gateway_id)
if not configs:
return Response(status_code=400, content="No active profiles available")
# Create ZIP file in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for filename, config in configs:
zip_file.writestr(filename, config)
zip_buffer.seek(0)
zip_filename = f"{gateway.name}-vpn-profiles.zip"
zip_filename = zip_filename.lower().replace(' ', '-')
return Response(
content=zip_buffer.getvalue(),
media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
)