# openvpn-endpoint-server/client/services/vpn_manager.py
#
# 190 lines
# 6.6 KiB
# Python

"""OpenVPN process management."""
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from config import OPENVPN_SEARCH_PATHS, OPENVPN_CONFIG_DIR
@dataclass
class VPNStatus:
    """Result object describing the state of the VPN connection."""

    # Whether the VPN is reported as connected.
    connected: bool
    # IP address assigned on the VPN, when one is known.
    vpn_ip: Optional[str] = None
    # Human-readable error description, when something went wrong.
    error: Optional[str] = None
class VPNManager:
    """Manages OpenVPN connections.

    At most one OpenVPN client process is tracked at a time through
    ``self.process``. The configuration handed to :meth:`connect` is written
    to ``OPENVPN_CONFIG_DIR`` and OpenVPN is told to log into the same
    directory.
    """

    # "ifconfig [<device>] <ipv4>". Covers the pushed option form
    # ("ifconfig 10.8.0.6 10.8.0.5") as well as a logged command line
    # ("/sbin/ifconfig tun0 10.8.0.2 netmask 255.255.255.0"). Related option
    # names such as "ifconfig-pool" do not match because whitespace must
    # follow the keyword directly.
    _IFCONFIG_IP_RE = re.compile(
        r"\bifconfig\s+(?:[A-Za-z]\S*\s+)?(\d{1,3}(?:\.\d{1,3}){3})\b"
    )

    def __init__(self):
        self.process: Optional[subprocess.Popen] = None
        self.config_file: Optional[Path] = None
        self.log_file: Optional[Path] = None
        # Cached result of find_openvpn_binary().
        self._openvpn_path: Optional[str] = None

    @staticmethod
    def _version_key(name: str) -> list:
        """Return the numeric components of *name* for version-aware sorting.

        Plain string sorting ranks "openvpn-2.6.9" above "openvpn-2.6.10";
        comparing the extracted integers does not.
        """
        return [int(number) for number in re.findall(r"\d+", name)]

    @classmethod
    def _parse_vpn_ip(cls, log_content: str) -> Optional[str]:
        """Extract the local VPN address from OpenVPN log text.

        Returns the address from the last matching line, so a reconnect that
        is assigned a new address wins over earlier entries. Returns None
        when no line matches.
        """
        vpn_ip = None
        for line in log_content.splitlines():
            match = cls._IFCONFIG_IP_RE.search(line)
            if match:
                vpn_ip = match.group(1)
        return vpn_ip

    @staticmethod
    def _remove_file(path: Optional[Path]) -> None:
        """Best-effort removal of *path*; a missing or locked file is ignored."""
        if path is None:
            return
        try:
            path.unlink()
        except OSError:
            # Cleanup must never mask the status the caller is waiting for.
            pass

    def _write_private_file(self, path: Path, content: str) -> None:
        """Write *content* to *path*, creating the file owner-readable only.

        The configuration may embed keys or credentials, so the file is
        recreated with mode 0o600 instead of inheriting a wider mode from a
        previous run.
        """
        self._remove_file(path)
        path.touch(mode=0o600)
        path.write_text(content)

    def _read_log(self) -> Optional[str]:
        """Return the OpenVPN log text, or None when it cannot be read."""
        if not self.log_file:
            return None
        try:
            # errors="replace": stray bytes in the log must not abort parsing.
            return self.log_file.read_text(errors="replace")
        except OSError:
            # Missing or unreadable (for example owned by another user).
            return None

    def _cleanup_failed_start(self) -> None:
        """Drop the dead process handle and the temp config after a failure."""
        if self.is_connected():
            return
        self.process = None
        self._remove_file(self.config_file)

    def find_openvpn_binary(self) -> Optional[str]:
        """Find the OpenVPN binary on the system.

        Checks, in order: the configured ``OPENVPN_SEARCH_PATHS``, the
        ``PATH`` environment, and on macOS the OpenVPN builds bundled with
        Tunnelblick (newest version first). The first hit is cached.

        Returns:
            The path of the binary, or None if not found.
        """
        if self._openvpn_path:
            return self._openvpn_path

        # First, check configured search paths.
        for path in OPENVPN_SEARCH_PATHS:
            if Path(path).is_file():
                self._openvpn_path = path
                return path

        # Try to find via PATH environment (works on all platforms).
        openvpn_in_path = shutil.which("openvpn")
        if openvpn_in_path:
            self._openvpn_path = openvpn_in_path
            return openvpn_in_path

        # macOS: check for Tunnelblick's bundled OpenVPN.
        if sys.platform == 'darwin':
            tunnelblick_base = Path("/Applications/Tunnelblick.app/Contents/Resources/openvpn")
            if tunnelblick_base.exists():
                version_dirs = sorted(
                    tunnelblick_base.glob("openvpn-*"),
                    key=lambda directory: self._version_key(directory.name),
                    reverse=True,
                )
                for version_dir in version_dirs:
                    openvpn_binary = version_dir / "openvpn"
                    if openvpn_binary.exists():
                        self._openvpn_path = str(openvpn_binary)
                        return self._openvpn_path

        return None

    def check_openvpn_installed(self) -> bool:
        """Check if OpenVPN is installed."""
        return self.find_openvpn_binary() is not None

    def connect(self, config_content: str) -> VPNStatus:
        """Connect using provided OpenVPN config.

        Writes *config_content* to a temporary config file, starts OpenVPN
        and waits three seconds to see whether the process survives.

        Args:
            config_content: Full text of the OpenVPN client configuration.

        Returns:
            ``VPNStatus(connected=True)`` when the process is still running
            after the wait; otherwise a disconnected status whose ``error``
            describes the failure. Failures are reported through the return
            value rather than raised.
        """
        if self.is_connected():
            return VPNStatus(connected=False, error="Already connected")

        openvpn_binary = self.find_openvpn_binary()
        if not openvpn_binary:
            return VPNStatus(
                connected=False,
                error="OpenVPN is not installed. Please install OpenVPN first.\n\n"
                "Installation:\n"
                " Linux (Debian/Ubuntu): sudo apt install openvpn\n"
                " Linux (Fedora/RHEL): sudo dnf install openvpn\n"
                " macOS: brew install openvpn\n"
                " Windows: Download from https://openvpn.net/community-downloads/"
            )

        config_dir = Path(OPENVPN_CONFIG_DIR)
        self.config_file = config_dir / "mguard-temp.ovpn"
        self.log_file = config_dir / "mguard.log"

        try:
            config_dir.mkdir(parents=True, exist_ok=True)
            self._write_private_file(self.config_file, config_content)
            # Remove a log left by an earlier run so a start-up failure is
            # never explained with stale messages.
            self._remove_file(self.log_file)

            command = [
                openvpn_binary,
                "--config", str(self.config_file),
                "--log", str(self.log_file),
            ]
            if os.name == 'nt':
                # Windows: direct call (requires admin privileges).
                self.process = subprocess.Popen(
                    command,
                    creationflags=subprocess.CREATE_NO_WINDOW
                )
            else:
                # Linux/macOS: elevate through sudo unless already root, in
                # which case sudo is unnecessary and may not be installed.
                geteuid = getattr(os, "geteuid", None)
                if geteuid is None or geteuid() != 0:
                    command.insert(0, "sudo")
                self.process = subprocess.Popen(
                    command,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )

            # Wait a bit for connection.
            time.sleep(3)

            if self.process.poll() is not None:
                # Process ended: report the tail of the log when readable.
                self._cleanup_failed_start()
                log_content = self._read_log()
                if log_content:
                    return VPNStatus(connected=False, error=f"Connection failed: {log_content[-500:]}")
                return VPNStatus(connected=False, error="Connection failed")

            return VPNStatus(connected=True)
        except PermissionError:
            self._cleanup_failed_start()
            return VPNStatus(
                connected=False,
                error="Permission denied. Run as administrator/root."
            )
        except Exception as e:
            # Boundary of the connect operation: surface the message to the
            # caller instead of raising.
            self._cleanup_failed_start()
            return VPNStatus(connected=False, error=str(e))

    def disconnect(self) -> VPNStatus:
        """Disconnect VPN.

        Terminates the tracked process (escalating to kill after five
        seconds) and removes the temporary config file.

        Returns:
            A disconnected status. If the process cannot be signalled because
            of missing permissions, the status reflects whether it is still
            running and carries an error message.
        """
        if not self.process:
            # The process may have exited on its own; still drop the config.
            self._remove_file(self.config_file)
            return VPNStatus(connected=False)

        try:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
                # Reap the killed process so it does not linger as a zombie.
                self.process.wait(timeout=5)
        except PermissionError:
            # Signalling can be refused when the process runs as another
            # user (for example when it was started through sudo).
            return VPNStatus(
                connected=self.is_connected(),
                error="Permission denied. Run as administrator/root."
            )
        except subprocess.TimeoutExpired:
            # Still not reaped after kill(); stop waiting.
            pass

        self.process = None
        # Clean up temp config.
        self._remove_file(self.config_file)
        return VPNStatus(connected=False)

    def get_status(self) -> VPNStatus:
        """Get current VPN status.

        Returns:
            A disconnected status when no process is tracked or it has
            exited; otherwise a connected status with ``vpn_ip`` filled in
            when the address can be found in the log.
        """
        if not self.process:
            return VPNStatus(connected=False)

        if self.process.poll() is not None:
            # Process has ended.
            self.process = None
            return VPNStatus(connected=False)

        # Try to get VPN IP from log; an unreadable log just means "unknown".
        log_content = self._read_log()
        vpn_ip = self._parse_vpn_ip(log_content) if log_content else None
        return VPNStatus(connected=True, vpn_ip=vpn_ip)

    def is_connected(self) -> bool:
        """Check if VPN is connected (a tracked process is still running)."""
        return self.process is not None and self.process.poll() is None

    def get_openvpn_info(self) -> dict:
        """Get information about the OpenVPN installation.

        Returns:
            A dict with the keys ``installed``, ``path``, ``platform`` and
            ``search_paths``.
        """
        binary = self.find_openvpn_binary()
        return {
            "installed": binary is not None,
            "path": binary,
            "platform": sys.platform,
            "search_paths": OPENVPN_SEARCH_PATHS
        }