190 lines
6.6 KiB
Python
190 lines
6.6 KiB
Python
"""OpenVPN process management."""
|
|
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from dataclasses import dataclass
|
|
|
|
from config import OPENVPN_SEARCH_PATHS, OPENVPN_CONFIG_DIR
|
|
|
|
|
|
@dataclass
|
|
class VPNStatus:
|
|
"""VPN connection status."""
|
|
connected: bool
|
|
vpn_ip: Optional[str] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
class VPNManager:
|
|
"""Manages OpenVPN connections."""
|
|
|
|
def __init__(self):
|
|
self.process: Optional[subprocess.Popen] = None
|
|
self.config_file: Optional[Path] = None
|
|
self.log_file: Optional[Path] = None
|
|
self._openvpn_path: Optional[str] = None
|
|
|
|
def find_openvpn_binary(self) -> Optional[str]:
|
|
"""Find the OpenVPN binary on the system.
|
|
|
|
Searches platform-specific paths and returns the first found binary.
|
|
Returns None if not found.
|
|
"""
|
|
if self._openvpn_path:
|
|
return self._openvpn_path
|
|
|
|
# First, check configured search paths
|
|
for path in OPENVPN_SEARCH_PATHS:
|
|
if Path(path).exists() and Path(path).is_file():
|
|
self._openvpn_path = path
|
|
return path
|
|
|
|
# Try to find via PATH environment (works on all platforms)
|
|
openvpn_in_path = shutil.which("openvpn")
|
|
if openvpn_in_path:
|
|
self._openvpn_path = openvpn_in_path
|
|
return openvpn_in_path
|
|
|
|
# macOS: Check for Tunnelblick's bundled OpenVPN
|
|
if sys.platform == 'darwin':
|
|
tunnelblick_base = Path("/Applications/Tunnelblick.app/Contents/Resources/openvpn")
|
|
if tunnelblick_base.exists():
|
|
# Find latest openvpn version directory
|
|
for version_dir in sorted(tunnelblick_base.glob("openvpn-*"), reverse=True):
|
|
openvpn_binary = version_dir / "openvpn"
|
|
if openvpn_binary.exists():
|
|
self._openvpn_path = str(openvpn_binary)
|
|
return str(openvpn_binary)
|
|
|
|
return None
|
|
|
|
def check_openvpn_installed(self) -> bool:
|
|
"""Check if OpenVPN is installed."""
|
|
return self.find_openvpn_binary() is not None
|
|
|
|
def connect(self, config_content: str) -> VPNStatus:
|
|
"""Connect using provided OpenVPN config."""
|
|
if self.process and self.process.poll() is None:
|
|
return VPNStatus(connected=False, error="Already connected")
|
|
|
|
openvpn_binary = self.find_openvpn_binary()
|
|
if not openvpn_binary:
|
|
return VPNStatus(
|
|
connected=False,
|
|
error="OpenVPN is not installed. Please install OpenVPN first.\n\n"
|
|
"Installation:\n"
|
|
" Linux (Debian/Ubuntu): sudo apt install openvpn\n"
|
|
" Linux (Fedora/RHEL): sudo dnf install openvpn\n"
|
|
" macOS: brew install openvpn\n"
|
|
" Windows: Download from https://openvpn.net/community-downloads/"
|
|
)
|
|
|
|
# Write config to temp file
|
|
self.config_file = OPENVPN_CONFIG_DIR / "mguard-temp.ovpn"
|
|
self.config_file.write_text(config_content)
|
|
|
|
# Log file
|
|
self.log_file = OPENVPN_CONFIG_DIR / "mguard.log"
|
|
|
|
try:
|
|
if os.name == 'nt':
|
|
# Windows: Direct call (requires admin privileges)
|
|
self.process = subprocess.Popen(
|
|
[openvpn_binary, "--config", str(self.config_file), "--log", str(self.log_file)],
|
|
creationflags=subprocess.CREATE_NO_WINDOW
|
|
)
|
|
else:
|
|
# Linux/macOS: Use sudo openvpn
|
|
self.process = subprocess.Popen(
|
|
["sudo", openvpn_binary, "--config", str(self.config_file), "--log", str(self.log_file)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL
|
|
)
|
|
|
|
# Wait a bit for connection
|
|
import time
|
|
time.sleep(3)
|
|
|
|
if self.process.poll() is not None:
|
|
# Process ended, check for errors
|
|
if self.log_file.exists():
|
|
log_content = self.log_file.read_text()
|
|
return VPNStatus(connected=False, error=f"Connection failed: {log_content[-500:]}")
|
|
return VPNStatus(connected=False, error="Connection failed")
|
|
|
|
return VPNStatus(connected=True)
|
|
|
|
except PermissionError:
|
|
return VPNStatus(
|
|
connected=False,
|
|
error="Permission denied. Run as administrator/root."
|
|
)
|
|
except Exception as e:
|
|
return VPNStatus(connected=False, error=str(e))
|
|
|
|
def disconnect(self) -> VPNStatus:
|
|
"""Disconnect VPN."""
|
|
if not self.process:
|
|
return VPNStatus(connected=False)
|
|
|
|
try:
|
|
self.process.terminate()
|
|
self.process.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
self.process.kill()
|
|
|
|
self.process = None
|
|
|
|
# Clean up temp config
|
|
if self.config_file and self.config_file.exists():
|
|
self.config_file.unlink()
|
|
|
|
return VPNStatus(connected=False)
|
|
|
|
def get_status(self) -> VPNStatus:
|
|
"""Get current VPN status."""
|
|
if not self.process:
|
|
return VPNStatus(connected=False)
|
|
|
|
if self.process.poll() is not None:
|
|
# Process has ended
|
|
self.process = None
|
|
return VPNStatus(connected=False)
|
|
|
|
# Try to get VPN IP from log
|
|
vpn_ip = None
|
|
if self.log_file and self.log_file.exists():
|
|
try:
|
|
log_content = self.log_file.read_text()
|
|
# Parse for IP assignment
|
|
for line in log_content.split('\n'):
|
|
if 'ifconfig' in line.lower() and 'netmask' in line.lower():
|
|
parts = line.split()
|
|
for i, part in enumerate(parts):
|
|
if part == 'ifconfig' and i + 1 < len(parts):
|
|
vpn_ip = parts[i + 1]
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
return VPNStatus(connected=True, vpn_ip=vpn_ip)
|
|
|
|
def is_connected(self) -> bool:
|
|
"""Check if VPN is connected."""
|
|
return self.process is not None and self.process.poll() is None
|
|
|
|
def get_openvpn_info(self) -> dict:
|
|
"""Get information about the OpenVPN installation."""
|
|
binary = self.find_openvpn_binary()
|
|
return {
|
|
"installed": binary is not None,
|
|
"path": binary,
|
|
"platform": sys.platform,
|
|
"search_paths": OPENVPN_SEARCH_PATHS
|
|
}
|