openvpn-endpoint-server/server/app/services/vpn_profile_service.py

283 lines
8.8 KiB
Python

"""VPN Profile management service."""
import logging
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy.orm import Session

from ..models.vpn_profile import VPNProfile, VPNProfileStatus
from ..models.vpn_server import VPNServer
from ..models.gateway import Gateway
from ..models.certificate_authority import CertificateAuthority
from .certificate_service import CertificateService
from ..config import get_settings
# Settings object returned by get_settings(), fetched once when this module
# is imported.
# NOTE(review): nothing in the code visible here reads `settings`; confirm
# whether other modules import it from this module before removing it.
settings = get_settings()
class VPNProfileService:
    """Service for managing VPN profiles for gateways.

    Combines a SQLAlchemy session with a ``CertificateService`` to create,
    provision, renew, revoke and delete ``VPNProfile`` rows, and to render
    OpenVPN client configuration files for them.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session.

        Args:
            db: Session used for every query and commit made by this
                service; it is also passed to the ``CertificateService``
                this service delegates certificate work to.
        """
        self.db = db
        self.cert_service = CertificateService(db)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Yields the same kind of value as ``datetime.utcnow()`` (no tzinfo)
        without calling that method, which is deprecated since Python 3.12.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def create_profile(
        self,
        gateway_id: int,
        vpn_server_id: int,
        name: str,
        priority: int = 1,
        description: Optional[str] = None,
    ) -> VPNProfile:
        """Create a new VPN profile for a gateway and issue its certificate.

        The profile row is committed with status ``PENDING`` first; the
        client certificate is generated afterwards, which switches the
        profile to ``ACTIVE``. If certificate generation raises, the
        ``PENDING`` row therefore remains in the database.

        Args:
            gateway_id: Primary key of the gateway that owns the profile.
            vpn_server_id: Primary key of the VPN server to connect to.
            name: Display name of the profile.
            priority: Priority stored on the profile (default ``1``).
            description: Optional free-text description.

        Returns:
            The persisted profile with its client certificate attached.

        Raises:
            ValueError: If the gateway or VPN server does not exist, or if
                the server's certificate authority is not ready.
        """
        gateway = self.db.query(Gateway).filter(
            Gateway.id == gateway_id
        ).first()
        if not gateway:
            raise ValueError(f"Gateway with id {gateway_id} not found")

        server = self.db.query(VPNServer).filter(
            VPNServer.id == vpn_server_id
        ).first()
        if not server:
            raise ValueError(f"VPN Server with id {vpn_server_id} not found")
        if not server.certificate_authority.is_ready:
            raise ValueError("CA is not ready (DH parameters may still be generating)")

        profile = VPNProfile(
            gateway_id=gateway_id,
            vpn_server_id=vpn_server_id,
            ca_id=server.ca_id,
            name=name,
            description=description,
            cert_cn=self._next_cert_cn(gateway),
            priority=priority,
            status=VPNProfileStatus.PENDING,
        )
        self.db.add(profile)
        self.db.commit()
        self.db.refresh(profile)

        self._generate_client_cert(profile)
        return profile

    def _next_cert_cn(self, gateway: Gateway) -> str:
        """Pick a certificate common name no existing profile uses.

        The name has the form ``gw-<gateway name>-<n>`` where the gateway
        name is lower-cased with spaces replaced by dashes. ``n`` starts at
        the gateway's current profile count plus one and is incremented
        while some profile already carries the candidate name. A bare
        ``count + 1`` is not enough: once a profile has been deleted the
        count can point at a suffix that a surviving profile still holds.

        The check is not atomic; two concurrent calls can still choose the
        same name.
        """
        base_cn = f"gw-{gateway.name.lower().replace(' ', '-')}"
        suffix = self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway.id
        ).count() + 1
        while self.db.query(VPNProfile.id).filter(
            VPNProfile.cert_cn == f"{base_cn}-{suffix}"
        ).first() is not None:
            suffix += 1
        return f"{base_cn}-{suffix}"

    def _generate_client_cert(self, profile: VPNProfile):
        """Generate a client certificate for the profile and store it.

        Asks the certificate service for a certificate issued by the
        profile's CA for ``profile.cert_cn``, copies certificate, key and
        validity window onto the profile, marks it ``ACTIVE`` and commits.
        """
        cert_data = self.cert_service.generate_client_certificate(
            ca=profile.certificate_authority,
            common_name=profile.cert_cn,
        )
        profile.client_cert = cert_data["cert"]
        profile.client_key = cert_data["key"]
        profile.valid_from = cert_data["valid_from"]
        profile.valid_until = cert_data["valid_until"]
        profile.status = VPNProfileStatus.ACTIVE
        self.db.commit()

    def _revoke_current_cert(self, profile: VPNProfile, reason: str) -> None:
        """Revoke the profile's stored certificate unless already revoked.

        Does nothing when the profile has no certificate or its status is
        already ``REVOKED``, so the same certificate is not handed to the
        certificate service for revocation a second time. Does not commit.
        """
        if not profile.client_cert:
            return
        if profile.status == VPNProfileStatus.REVOKED:
            return
        self.cert_service.revoke_certificate(
            ca=profile.certificate_authority,
            cert_pem=profile.client_cert,
            reason=reason,
        )

    def generate_client_config(self, profile: VPNProfile) -> str:
        """Generate OpenVPN client configuration (.ovpn) for a profile.

        The CA certificate, client certificate and client key are embedded
        inline; a ``<tls-auth>`` block is appended when the server has
        TLS-Auth enabled and a key stored. The text always ends with a
        newline.

        Args:
            profile: Profile to render; ``profile.is_ready`` must be true.

        Returns:
            The configuration file content.

        Raises:
            ValueError: If the profile is not ready for provisioning.
        """
        if not profile.is_ready:
            raise ValueError("Profile is not ready for provisioning")

        server = profile.vpn_server
        ca = profile.certificate_authority

        config_lines = [
            "# OpenVPN Client Configuration",
            f"# Profile: {profile.name}",
            f"# Gateway: {profile.gateway.name}",
            f"# Server: {server.name}",
            f"# Generated: {self._utcnow().isoformat()}",
            "",
            "client",
            "dev tun",
            f"proto {server.protocol.value}",
            f"remote {server.hostname} {server.port}",
            "",
            "resolv-retry infinite",
            "nobind",
            "persist-key",
            "persist-tun",
            "",
            "remote-cert-tls server",
            f"cipher {server.cipher.value}",
            f"auth {server.auth.value}",
            "",
            "verb 3",
            "",
        ]

        # Inline PEM sections, each followed by an empty separator line.
        inline_sections = (
            ("ca", ca.ca_cert),
            ("cert", profile.client_cert),
            ("key", profile.client_key),
        )
        for tag, pem in inline_sections:
            config_lines.extend([f"<{tag}>", pem.strip(), f"</{tag}>", ""])

        if server.tls_auth_enabled and server.ta_key:
            config_lines.extend([
                "key-direction 1",
                "<tls-auth>",
                server.ta_key.strip(),
                "</tls-auth>",
                # Trailing empty entry so the file ends with a newline here
                # too, as it already does when tls-auth is disabled.
                "",
            ])

        return "\n".join(config_lines)

    def provision_profile(self, profile: VPNProfile) -> str:
        """Mark profile as provisioned and return its client config.

        The config is rendered first, so a profile that is not ready raises
        ``ValueError`` before anything is modified. Afterwards the profile
        becomes ``PROVISIONED`` with ``provisioned_at`` set to the current
        UTC time, its gateway is flagged as provisioned, and the change is
        committed.
        """
        config = self.generate_client_config(profile)
        profile.status = VPNProfileStatus.PROVISIONED
        profile.provisioned_at = self._utcnow()
        profile.gateway.is_provisioned = True
        self.db.commit()
        return config

    def set_priority(self, profile: VPNProfile, new_priority: int):
        """Set profile priority and renumber the gateway's other profiles.

        The other profiles of the same gateway are renumbered 1, 2, 3, ...
        in their current priority order, skipping ``new_priority`` so that
        value stays free for ``profile``. ``new_priority`` itself is stored
        unchanged, even when it lies outside that contiguous range.
        """
        others = self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == profile.gateway_id,
            VPNProfile.id != profile.id,
        ).order_by(VPNProfile.priority).all()

        next_slot = 1
        for other in others:
            if next_slot == new_priority:
                # Leave the requested slot free for `profile`.
                next_slot += 1
            other.priority = next_slot
            next_slot += 1

        profile.priority = new_priority
        self.db.commit()

    def get_profiles_for_gateway(self, gateway_id: int) -> list[VPNProfile]:
        """Get all VPN profiles for a gateway, ordered by priority."""
        return self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway_id
        ).order_by(VPNProfile.priority).all()

    def get_active_profiles_for_gateway(self, gateway_id: int) -> list[VPNProfile]:
        """Get active VPN profiles for a gateway, ordered by priority.

        A profile qualifies when its ``is_active`` flag is set and its
        status is ``ACTIVE`` or ``PROVISIONED``.
        """
        return self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway_id,
            # `== True` builds a SQL expression here; it is not a Python
            # truthiness test.
            VPNProfile.is_active == True,  # noqa: E712
            VPNProfile.status.in_(
                [VPNProfileStatus.ACTIVE, VPNProfileStatus.PROVISIONED]
            ),
        ).order_by(VPNProfile.priority).all()

    def revoke_profile(self, profile: VPNProfile, reason: str = "unspecified"):
        """Revoke a VPN profile's certificate and deactivate the profile.

        Args:
            profile: Profile to revoke.
            reason: Revocation reason passed to the certificate service.
        """
        self._revoke_current_cert(profile, reason)
        profile.status = VPNProfileStatus.REVOKED
        profile.is_active = False
        self.db.commit()

    def renew_profile(self, profile: VPNProfile):
        """Renew the certificate for a profile.

        The current certificate is revoked as "superseded" (skipped when
        the profile is already revoked), a new certificate is generated,
        and the profile returns to ``ACTIVE`` with ``provisioned_at``
        cleared.
        """
        self._revoke_current_cert(profile, "superseded")
        self._generate_client_cert(profile)

        # The new certificate has not been handed out yet.
        profile.status = VPNProfileStatus.ACTIVE
        profile.provisioned_at = None
        self.db.commit()

    def get_profile_by_id(self, profile_id: int) -> Optional[VPNProfile]:
        """Get a VPN profile by ID, or ``None`` when no such row exists."""
        return self.db.query(VPNProfile).filter(
            VPNProfile.id == profile_id
        ).first()

    def delete_profile(self, profile: VPNProfile):
        """Delete a VPN profile, revoking its certificate first.

        The certificate is revoked as "cessationOfOperation" unless the
        profile has none or is already revoked.
        """
        self._revoke_current_cert(profile, "cessationOfOperation")
        self.db.delete(profile)
        self.db.commit()

    def generate_all_configs_for_gateway(self, gateway_id: int) -> list[tuple[str, str]]:
        """Generate configs for all active profiles of a gateway.

        Best-effort: a profile whose config cannot be generated is logged
        and skipped instead of aborting the whole batch.

        Returns:
            List of tuples ``(filename, config_content)``. The filename is
            ``<gateway name>-<profile name>.ovpn``, lower-cased with spaces
            replaced by dashes.
        """
        configs = []
        for profile in self.get_active_profiles_for_gateway(gateway_id):
            try:
                config = self.generate_client_config(profile)
                filename = f"{profile.gateway.name}-{profile.name}.ovpn"
                filename = filename.lower().replace(' ', '-')
            except Exception:
                logging.getLogger(__name__).warning(
                    "Skipping config for VPN profile %s of gateway %s",
                    profile.id,
                    gateway_id,
                    exc_info=True,
                )
                continue
            configs.append((filename, config))
        return configs