"""VPN management service for OpenVPN operations. This service provides basic OpenVPN management interface communication. For PKI/certificate management, use CertificateService. For VPN server management, use VPNServerService. """ import socket from ..config import get_settings settings = get_settings() class VPNService: """Service for OpenVPN management interface operations.""" def __init__(self, host: str = None, port: int = None): self.management_host = host or settings.openvpn_management_host self.management_port = port or settings.openvpn_management_port def _send_management_command(self, command: str) -> str: """Send command to OpenVPN management interface.""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((self.management_host, self.management_port)) # Read welcome message sock.recv(1024) # Send command sock.send(f"{command}\n".encode()) # Read response response = b"" while True: data = sock.recv(4096) if not data: break response += data if b"END" in data or b"SUCCESS" in data or b"ERROR" in data: break sock.close() return response.decode() except Exception as e: return f"ERROR: {str(e)}" def get_connected_clients(self) -> list[dict]: """Get list of currently connected VPN clients.""" response = self._send_management_command("status") clients = [] if "ERROR" in response: return clients # Parse status output in_client_list = False for line in response.split('\n'): if line.startswith("ROUTING TABLE"): in_client_list = False elif line.startswith("Common Name"): in_client_list = True continue elif in_client_list and ',' in line: parts = line.split(',') if len(parts) >= 5: clients.append({ "common_name": parts[0], "real_address": parts[1], "bytes_received": int(parts[2]) if parts[2].isdigit() else 0, "bytes_sent": int(parts[3]) if parts[3].isdigit() else 0, "connected_since": parts[4] }) return clients def disconnect_client(self, common_name: str) -> bool: """Disconnect a specific VPN client.""" response = self._send_management_command(f"kill {common_name}") return "SUCCESS" in response def get_server_status(self) -> dict: """Get OpenVPN server status.""" response = self._send_management_command("state") if "ERROR" in response: return {"status": "error", "message": response} # Parse state for line in response.split('\n'): if 'CONNECTED' in line: return {"status": "running", "message": "Server is running"} return {"status": "unknown", "message": response}