"""VPN Profile management service.""" from datetime import datetime from typing import Optional from sqlalchemy.orm import Session from ..models.vpn_profile import VPNProfile, VPNProfileStatus from ..models.vpn_server import VPNServer from ..models.gateway import Gateway from ..models.certificate_authority import CertificateAuthority from .certificate_service import CertificateService from ..config import get_settings settings = get_settings() class VPNProfileService: """Service for managing VPN profiles for gateways.""" def __init__(self, db: Session): self.db = db self.cert_service = CertificateService(db) def create_profile( self, gateway_id: int, vpn_server_id: int, name: str, priority: int = 1, description: Optional[str] = None ) -> VPNProfile: """Create a new VPN profile for a gateway.""" # Get gateway gateway = self.db.query(Gateway).filter( Gateway.id == gateway_id ).first() if not gateway: raise ValueError(f"Gateway with id {gateway_id} not found") # Get VPN server server = self.db.query(VPNServer).filter( VPNServer.id == vpn_server_id ).first() if not server: raise ValueError(f"VPN Server with id {vpn_server_id} not found") if not server.certificate_authority.is_ready: raise ValueError("CA is not ready (DH parameters may still be generating)") # Generate unique common name base_cn = f"gw-{gateway.name.lower().replace(' ', '-')}" profile_count = self.db.query(VPNProfile).filter( VPNProfile.gateway_id == gateway_id ).count() cert_cn = f"{base_cn}-{profile_count + 1}" # Create profile profile = VPNProfile( gateway_id=gateway_id, vpn_server_id=vpn_server_id, ca_id=server.ca_id, name=name, description=description, cert_cn=cert_cn, priority=priority, status=VPNProfileStatus.PENDING ) self.db.add(profile) self.db.commit() self.db.refresh(profile) # Generate client certificate self._generate_client_cert(profile) return profile def _generate_client_cert(self, profile: VPNProfile): """Generate client certificate for profile.""" ca = profile.certificate_authority cert_data = self.cert_service.generate_client_certificate( ca=ca, common_name=profile.cert_cn ) profile.client_cert = cert_data["cert"] profile.client_key = cert_data["key"] profile.valid_from = cert_data["valid_from"] profile.valid_until = cert_data["valid_until"] profile.status = VPNProfileStatus.ACTIVE self.db.commit() def generate_client_config(self, profile: VPNProfile) -> str: """Generate OpenVPN client configuration (.ovpn) for a profile.""" if not profile.is_ready: raise ValueError("Profile is not ready for provisioning") server = profile.vpn_server ca = profile.certificate_authority config_lines = [ "# OpenVPN Client Configuration", f"# Profile: {profile.name}", f"# Gateway: {profile.gateway.name}", f"# Server: {server.name}", f"# Generated: {datetime.utcnow().isoformat()}", "", "client", "dev tun", f"proto {server.protocol.value}", f"remote {server.hostname} {server.port}", "", "resolv-retry infinite", "nobind", "persist-key", "persist-tun", "", "remote-cert-tls server", f"cipher {server.cipher.value}", f"auth {server.auth.value}", "", "verb 3", "", ] # Add CA certificate config_lines.extend([ "", ca.ca_cert.strip(), "", "", ]) # Add client certificate config_lines.extend([ "", profile.client_cert.strip(), "", "", ]) # Add client key config_lines.extend([ "", profile.client_key.strip(), "", "", ]) # Add TLS-Auth key if enabled if server.tls_auth_enabled and server.ta_key: config_lines.extend([ "key-direction 1", "", server.ta_key.strip(), "", ]) return "\n".join(config_lines) def provision_profile(self, profile: VPNProfile) -> str: """Mark profile as provisioned and return config.""" config = self.generate_client_config(profile) profile.status = VPNProfileStatus.PROVISIONED profile.provisioned_at = datetime.utcnow() profile.gateway.is_provisioned = True self.db.commit() return config def set_priority(self, profile: VPNProfile, new_priority: int): """Set profile priority and reorder others if needed.""" gateway_id = profile.gateway_id # Get all profiles for this gateway ordered by priority profiles = self.db.query(VPNProfile).filter( VPNProfile.gateway_id == gateway_id, VPNProfile.id != profile.id ).order_by(VPNProfile.priority).all() # Update priorities current_priority = 1 for p in profiles: if current_priority == new_priority: current_priority += 1 p.priority = current_priority current_priority += 1 profile.priority = new_priority self.db.commit() def get_profiles_for_gateway(self, gateway_id: int) -> list[VPNProfile]: """Get all VPN profiles for a gateway, ordered by priority.""" return self.db.query(VPNProfile).filter( VPNProfile.gateway_id == gateway_id ).order_by(VPNProfile.priority).all() def get_active_profiles_for_gateway(self, gateway_id: int) -> list[VPNProfile]: """Get active VPN profiles for a gateway.""" return self.db.query(VPNProfile).filter( VPNProfile.gateway_id == gateway_id, VPNProfile.is_active == True, VPNProfile.status.in_([VPNProfileStatus.ACTIVE, VPNProfileStatus.PROVISIONED]) ).order_by(VPNProfile.priority).all() def revoke_profile(self, profile: VPNProfile, reason: str = "unspecified"): """Revoke a VPN profile's certificate.""" if profile.client_cert: self.cert_service.revoke_certificate( ca=profile.certificate_authority, cert_pem=profile.client_cert, reason=reason ) profile.status = VPNProfileStatus.REVOKED profile.is_active = False self.db.commit() def renew_profile(self, profile: VPNProfile): """Renew the certificate for a profile.""" # Revoke old certificate if profile.client_cert: self.cert_service.revoke_certificate( ca=profile.certificate_authority, cert_pem=profile.client_cert, reason="superseded" ) # Generate new certificate self._generate_client_cert(profile) # Reset provisioned status profile.status = VPNProfileStatus.ACTIVE profile.provisioned_at = None self.db.commit() def get_profile_by_id(self, profile_id: int) -> Optional[VPNProfile]: """Get a VPN profile by ID.""" return self.db.query(VPNProfile).filter( VPNProfile.id == profile_id ).first() def delete_profile(self, profile: VPNProfile): """Delete a VPN profile.""" # Revoke certificate first if profile.client_cert and profile.status != VPNProfileStatus.REVOKED: self.cert_service.revoke_certificate( ca=profile.certificate_authority, cert_pem=profile.client_cert, reason="cessationOfOperation" ) self.db.delete(profile) self.db.commit() def generate_all_configs_for_gateway(self, gateway_id: int) -> list[tuple[str, str]]: """Generate configs for all active profiles of a gateway. Returns list of tuples: (filename, config_content) """ profiles = self.get_active_profiles_for_gateway(gateway_id) configs = [] for profile in profiles: try: config = self.generate_client_config(profile) filename = f"{profile.gateway.name}-{profile.name}.ovpn" filename = filename.lower().replace(' ', '-') configs.append((filename, config)) except Exception: continue return configs