"""OpenVPN process management.""" import os import sys import shutil import subprocess import tempfile from pathlib import Path from typing import Optional from dataclasses import dataclass from config import OPENVPN_SEARCH_PATHS, OPENVPN_CONFIG_DIR @dataclass class VPNStatus: """VPN connection status.""" connected: bool vpn_ip: Optional[str] = None error: Optional[str] = None class VPNManager: """Manages OpenVPN connections.""" def __init__(self): self.process: Optional[subprocess.Popen] = None self.config_file: Optional[Path] = None self.log_file: Optional[Path] = None self._openvpn_path: Optional[str] = None def find_openvpn_binary(self) -> Optional[str]: """Find the OpenVPN binary on the system. Searches platform-specific paths and returns the first found binary. Returns None if not found. """ if self._openvpn_path: return self._openvpn_path # First, check configured search paths for path in OPENVPN_SEARCH_PATHS: if Path(path).exists() and Path(path).is_file(): self._openvpn_path = path return path # Try to find via PATH environment (works on all platforms) openvpn_in_path = shutil.which("openvpn") if openvpn_in_path: self._openvpn_path = openvpn_in_path return openvpn_in_path # macOS: Check for Tunnelblick's bundled OpenVPN if sys.platform == 'darwin': tunnelblick_base = Path("/Applications/Tunnelblick.app/Contents/Resources/openvpn") if tunnelblick_base.exists(): # Find latest openvpn version directory for version_dir in sorted(tunnelblick_base.glob("openvpn-*"), reverse=True): openvpn_binary = version_dir / "openvpn" if openvpn_binary.exists(): self._openvpn_path = str(openvpn_binary) return str(openvpn_binary) return None def check_openvpn_installed(self) -> bool: """Check if OpenVPN is installed.""" return self.find_openvpn_binary() is not None def connect(self, config_content: str) -> VPNStatus: """Connect using provided OpenVPN config.""" if self.process and self.process.poll() is None: return VPNStatus(connected=False, error="Already connected") openvpn_binary = self.find_openvpn_binary() if not openvpn_binary: return VPNStatus( connected=False, error="OpenVPN is not installed. Please install OpenVPN first.\n\n" "Installation:\n" " Linux (Debian/Ubuntu): sudo apt install openvpn\n" " Linux (Fedora/RHEL): sudo dnf install openvpn\n" " macOS: brew install openvpn\n" " Windows: Download from https://openvpn.net/community-downloads/" ) # Write config to temp file self.config_file = OPENVPN_CONFIG_DIR / "mguard-temp.ovpn" self.config_file.write_text(config_content) # Log file self.log_file = OPENVPN_CONFIG_DIR / "mguard.log" try: if os.name == 'nt': # Windows: Direct call (requires admin privileges) self.process = subprocess.Popen( [openvpn_binary, "--config", str(self.config_file), "--log", str(self.log_file)], creationflags=subprocess.CREATE_NO_WINDOW ) else: # Linux/macOS: Use sudo openvpn self.process = subprocess.Popen( ["sudo", openvpn_binary, "--config", str(self.config_file), "--log", str(self.log_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) # Wait a bit for connection import time time.sleep(3) if self.process.poll() is not None: # Process ended, check for errors if self.log_file.exists(): log_content = self.log_file.read_text() return VPNStatus(connected=False, error=f"Connection failed: {log_content[-500:]}") return VPNStatus(connected=False, error="Connection failed") return VPNStatus(connected=True) except PermissionError: return VPNStatus( connected=False, error="Permission denied. Run as administrator/root." ) except Exception as e: return VPNStatus(connected=False, error=str(e)) def disconnect(self) -> VPNStatus: """Disconnect VPN.""" if not self.process: return VPNStatus(connected=False) try: self.process.terminate() self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() self.process = None # Clean up temp config if self.config_file and self.config_file.exists(): self.config_file.unlink() return VPNStatus(connected=False) def get_status(self) -> VPNStatus: """Get current VPN status.""" if not self.process: return VPNStatus(connected=False) if self.process.poll() is not None: # Process has ended self.process = None return VPNStatus(connected=False) # Try to get VPN IP from log vpn_ip = None if self.log_file and self.log_file.exists(): try: log_content = self.log_file.read_text() # Parse for IP assignment for line in log_content.split('\n'): if 'ifconfig' in line.lower() and 'netmask' in line.lower(): parts = line.split() for i, part in enumerate(parts): if part == 'ifconfig' and i + 1 < len(parts): vpn_ip = parts[i + 1] break except Exception: pass return VPNStatus(connected=True, vpn_ip=vpn_ip) def is_connected(self) -> bool: """Check if VPN is connected.""" return self.process is not None and self.process.poll() is None def get_openvpn_info(self) -> dict: """Get information about the OpenVPN installation.""" binary = self.find_openvpn_binary() return { "installed": binary is not None, "path": binary, "platform": sys.platform, "search_paths": OPENVPN_SEARCH_PATHS }