"""VPN Profile management service."""

import logging
from datetime import datetime
from typing import Optional

from sqlalchemy.orm import Session

from ..models.vpn_profile import VPNProfile, VPNProfileStatus
from ..models.vpn_server import VPNServer
from ..models.gateway import Gateway
from ..models.certificate_authority import CertificateAuthority
from .certificate_service import CertificateService
from ..config import get_settings

# Application settings, fetched once when this module is imported.
settings = get_settings()


class VPNProfileService:
    """Service for managing VPN profiles for gateways.

    A profile ties one gateway to one VPN server and owns the client
    certificate issued for that pairing. The service covers the profile
    life cycle: creation, certificate issuance, ``.ovpn`` generation,
    provisioning, priority ordering, revocation, renewal and deletion.

    Every mutating method commits on the session passed to ``__init__``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, db: "Session"):
        """Bind the service to *db* and build the certificate helper on it."""
        self.db = db
        self.cert_service = CertificateService(db)

    # ------------------------------------------------------------------
    # Creation / certificate issuance
    # ------------------------------------------------------------------

    def create_profile(
        self,
        gateway_id: int,
        vpn_server_id: int,
        name: str,
        priority: int = 1,
        description: Optional[str] = None
    ) -> "VPNProfile":
        """Create a new VPN profile for a gateway and issue its certificate.

        The profile row is committed in ``PENDING`` state *before* the
        certificate is generated, so if certificate generation raises, the
        ``PENDING`` row stays in the database.

        Args:
            gateway_id: Primary key of the gateway the profile belongs to.
            vpn_server_id: Primary key of the VPN server to connect to.
            name: Display name of the profile.
            priority: Priority stored on the profile (defaults to 1).
            description: Optional free-text description.

        Returns:
            The persisted profile, with certificate data filled in.

        Raises:
            ValueError: If the gateway or server does not exist, the server
                has no certificate authority, or the CA is not ready.
        """
        gateway = self.db.query(Gateway).filter(
            Gateway.id == gateway_id
        ).first()
        if not gateway:
            raise ValueError(f"Gateway with id {gateway_id} not found")

        server = self.db.query(VPNServer).filter(
            VPNServer.id == vpn_server_id
        ).first()
        if not server:
            raise ValueError(f"VPN Server with id {vpn_server_id} not found")

        # Guard against a server without a CA: previously this surfaced as an
        # AttributeError instead of the ValueError callers already handle.
        ca = server.certificate_authority
        if ca is None:
            raise ValueError(
                f"VPN Server with id {vpn_server_id} has no certificate authority"
            )
        if not ca.is_ready:
            raise ValueError("CA is not ready (DH parameters may still be generating)")

        profile = VPNProfile(
            gateway_id=gateway_id,
            vpn_server_id=vpn_server_id,
            ca_id=server.ca_id,
            name=name,
            description=description,
            cert_cn=self._next_free_cert_cn(gateway, gateway_id),
            priority=priority,
            status=VPNProfileStatus.PENDING
        )

        self.db.add(profile)
        self.db.commit()
        self.db.refresh(profile)

        self._generate_client_cert(profile)

        return profile

    def _next_free_cert_cn(self, gateway: "Gateway", gateway_id: int) -> str:
        """Return a certificate common name that no existing profile uses.

        The name is ``gw-<gateway name, lower-cased, spaces -> dashes>-<n>``.
        ``n`` starts at "number of profiles on this gateway + 1" (the old
        behaviour) and is bumped while the name is already taken. A bare
        count is not enough: after a profile is deleted, or when two gateway
        names normalise to the same base, it would reissue an existing CN.

        NOTE(review): this is check-then-insert; concurrent creators could
        still race unless ``cert_cn`` carries a unique constraint.
        """
        base_cn = f"gw-{gateway.name.lower().replace(' ', '-')}"
        suffix = self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway_id
        ).count() + 1

        while self.db.query(VPNProfile).filter(
            VPNProfile.cert_cn == f"{base_cn}-{suffix}"
        ).first() is not None:
            suffix += 1

        return f"{base_cn}-{suffix}"

    def _generate_client_cert(self, profile: "VPNProfile"):
        """Generate client certificate for profile.

        Asks the certificate service for a certificate under the profile's
        CA, using ``profile.cert_cn`` as common name. It copies the returned
        ``cert``, ``key``, ``valid_from`` and ``valid_until`` values onto the
        profile, marks it ``ACTIVE`` and commits.
        """
        cert_data = self.cert_service.generate_client_certificate(
            ca=profile.certificate_authority,
            common_name=profile.cert_cn
        )

        profile.client_cert = cert_data["cert"]
        profile.client_key = cert_data["key"]
        profile.valid_from = cert_data["valid_from"]
        profile.valid_until = cert_data["valid_until"]
        profile.status = VPNProfileStatus.ACTIVE

        self.db.commit()

    # ------------------------------------------------------------------
    # Client configuration
    # ------------------------------------------------------------------

    @staticmethod
    def _single_line(value: object) -> str:
        """Collapse *value* onto one line for use inside a ``#`` comment.

        All whitespace runs (including newlines) become a single space, so a
        name containing a line break cannot start a new configuration line.
        """
        return " ".join(str(value).split())

    def generate_client_config(self, profile: "VPNProfile") -> str:
        """Generate OpenVPN client configuration (.ovpn) for a profile.

        The result holds a comment header, the connection directives taken
        from the profile's server, and the CA certificate, client certificate
        and client key as inline blocks. A ``<tls-auth>`` block is appended
        when the server has TLS-Auth enabled and a key is stored.

        Args:
            profile: Profile to render; ``profile.is_ready`` must be truthy.

        Returns:
            The configuration text, lines joined with ``"\\n"``.

        Raises:
            ValueError: If the profile is not ready for provisioning.
        """
        if not profile.is_ready:
            raise ValueError("Profile is not ready for provisioning")

        server = profile.vpn_server
        ca = profile.certificate_authority
        one_line = self._single_line

        config_lines = [
            "# OpenVPN Client Configuration",
            # Names are free text; flatten them so an embedded newline cannot
            # smuggle extra directives into the generated file.
            f"# Profile: {one_line(profile.name)}",
            f"# Gateway: {one_line(profile.gateway.name)}",
            f"# Server: {one_line(server.name)}",
            f"# Generated: {datetime.utcnow().isoformat()}",
            "",
            "client",
            "dev tun",
            f"proto {server.protocol.value}",
            f"remote {server.hostname} {server.port}",
            "",
            "resolv-retry infinite",
            "nobind",
            "persist-key",
            "persist-tun",
            "",
            "remote-cert-tls server",
            f"cipher {server.cipher.value}",
            f"auth {server.auth.value}",
            "",
            "verb 3",
            "",
        ]

        # Inline PEM material: CA certificate, client certificate, client key.
        inline_blocks = (
            ("ca", ca.ca_cert),
            ("cert", profile.client_cert),
            ("key", profile.client_key),
        )
        for tag, pem in inline_blocks:
            config_lines.extend([f"<{tag}>", pem.strip(), f"</{tag}>", ""])

        # Add TLS-Auth key if enabled
        if server.tls_auth_enabled and server.ta_key:
            config_lines.extend([
                "key-direction 1",
                "<tls-auth>",
                server.ta_key.strip(),
                "</tls-auth>",
            ])

        return "\n".join(config_lines)

    def provision_profile(self, profile: "VPNProfile") -> str:
        """Mark profile as provisioned and return config.

        The configuration is generated first, so a profile that is not ready
        raises ``ValueError`` before any state is changed. On success the
        profile becomes ``PROVISIONED``, ``provisioned_at`` is set to the
        current naive UTC time, the gateway is flagged as provisioned and the
        session is committed.
        """
        config = self.generate_client_config(profile)

        profile.status = VPNProfileStatus.PROVISIONED
        profile.provisioned_at = datetime.utcnow()
        profile.gateway.is_provisioned = True

        self.db.commit()

        return config

    # ------------------------------------------------------------------
    # Ordering / lookup
    # ------------------------------------------------------------------

    def set_priority(self, profile: "VPNProfile", new_priority: int):
        """Set profile priority and reorder others if needed.

        The gateway's other profiles are renumbered 1, 2, 3, ... in their
        current priority order, skipping ``new_priority`` so that slot stays
        free for *profile*. The change is committed.
        """
        others = self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == profile.gateway_id,
            VPNProfile.id != profile.id
        ).order_by(VPNProfile.priority).all()

        next_priority = 1
        for other in others:
            if next_priority == new_priority:
                # Leave the requested slot free for the moved profile.
                next_priority += 1
            other.priority = next_priority
            next_priority += 1

        profile.priority = new_priority
        self.db.commit()

    def get_profiles_for_gateway(self, gateway_id: int) -> "list[VPNProfile]":
        """Get all VPN profiles for a gateway, ordered by priority."""
        return self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway_id
        ).order_by(VPNProfile.priority).all()

    def get_active_profiles_for_gateway(self, gateway_id: int) -> "list[VPNProfile]":
        """Get active VPN profiles for a gateway, ordered by priority.

        "Active" means ``is_active`` is true and the status is ``ACTIVE`` or
        ``PROVISIONED``.
        """
        return self.db.query(VPNProfile).filter(
            VPNProfile.gateway_id == gateway_id,
            # SQLAlchemy needs a column expression here, not ``is True``.
            VPNProfile.is_active == True,  # noqa: E712
            VPNProfile.status.in_([VPNProfileStatus.ACTIVE, VPNProfileStatus.PROVISIONED])
        ).order_by(VPNProfile.priority).all()

    def get_profile_by_id(self, profile_id: int) -> "Optional[VPNProfile]":
        """Get a VPN profile by ID, or ``None`` if there is no such profile."""
        return self.db.query(VPNProfile).filter(
            VPNProfile.id == profile_id
        ).first()

    # ------------------------------------------------------------------
    # Revocation / renewal / deletion
    # ------------------------------------------------------------------

    def _revoke_current_cert(self, profile: "VPNProfile", reason: str) -> None:
        """Revoke the profile's current certificate unless already revoked.

        Nothing happens when the profile has no certificate or its status is
        already ``REVOKED``. This is the guard ``delete_profile`` always had;
        sharing it keeps revoke, renew and delete from revoking the same
        certificate twice.
        """
        if not profile.client_cert:
            return
        if profile.status == VPNProfileStatus.REVOKED:
            return

        self.cert_service.revoke_certificate(
            ca=profile.certificate_authority,
            cert_pem=profile.client_cert,
            reason=reason
        )

    def revoke_profile(self, profile: "VPNProfile", reason: str = "unspecified"):
        """Revoke a VPN profile's certificate.

        The profile is then marked ``REVOKED`` and inactive, and the change
        is committed. Calling this on an already revoked profile does not
        revoke the certificate a second time.
        """
        self._revoke_current_cert(profile, reason)

        profile.status = VPNProfileStatus.REVOKED
        profile.is_active = False
        self.db.commit()

    def renew_profile(self, profile: "VPNProfile"):
        """Renew the certificate for a profile.

        The old certificate is revoked with reason ``superseded`` (unless the
        profile is already revoked), a new one is issued under the same
        common name, and the profile returns to ``ACTIVE`` with
        ``provisioned_at`` cleared so it has to be provisioned again.
        """
        self._revoke_current_cert(profile, "superseded")

        self._generate_client_cert(profile)

        # Reset provisioned status
        profile.status = VPNProfileStatus.ACTIVE
        profile.provisioned_at = None
        self.db.commit()

    def delete_profile(self, profile: "VPNProfile"):
        """Delete a VPN profile.

        A certificate that has not been revoked yet is revoked first with
        reason ``cessationOfOperation``; then the row is deleted and the
        session committed.
        """
        self._revoke_current_cert(profile, "cessationOfOperation")

        self.db.delete(profile)
        self.db.commit()

    # ------------------------------------------------------------------
    # Bulk export
    # ------------------------------------------------------------------

    def generate_all_configs_for_gateway(self, gateway_id: int) -> list[tuple[str, str]]:
        """Generate configs for all active profiles of a gateway.

        This is best-effort: a profile whose configuration cannot be built is
        skipped so the others are still exported. The failure is logged
        rather than dropped silently.

        Returns list of tuples: (filename, config_content)
        """
        configs = []

        for profile in self.get_active_profiles_for_gateway(gateway_id):
            try:
                config = self.generate_client_config(profile)
                filename = f"{profile.gateway.name}-{profile.name}.ovpn"
                filename = filename.lower().replace(' ', '-')
            except Exception:
                self._log.exception(
                    "Skipping VPN profile %s of gateway %s: config generation failed",
                    getattr(profile, "id", None),
                    gateway_id,
                )
                continue
            configs.append((filename, config))

        return configs