openvpn-endpoint-server/client/services/vpn_manager.py

145 lines
4.8 KiB
Python

"""OpenVPN process management."""
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
from config import OPENVPN_EXE, OPENVPN_CONFIG_DIR
@dataclass
class VPNStatus:
"""VPN connection status."""
connected: bool
vpn_ip: Optional[str] = None
error: Optional[str] = None
class VPNManager:
"""Manages OpenVPN connections."""
def __init__(self):
self.process: Optional[subprocess.Popen] = None
self.config_file: Optional[Path] = None
self.log_file: Optional[Path] = None
def check_openvpn_installed(self) -> bool:
"""Check if OpenVPN is installed."""
if os.name == 'nt':
return Path(OPENVPN_EXE).exists()
else:
try:
subprocess.run(["which", "openvpn"], capture_output=True, check=True)
return True
except subprocess.CalledProcessError:
return False
def connect(self, config_content: str) -> VPNStatus:
"""Connect using provided OpenVPN config."""
if self.process and self.process.poll() is None:
return VPNStatus(connected=False, error="Already connected")
if not self.check_openvpn_installed():
return VPNStatus(
connected=False,
error="OpenVPN is not installed. Please install OpenVPN first."
)
# Write config to temp file
self.config_file = OPENVPN_CONFIG_DIR / "mguard-temp.ovpn"
self.config_file.write_text(config_content)
# Log file
self.log_file = OPENVPN_CONFIG_DIR / "mguard.log"
try:
if os.name == 'nt':
# Windows: Use OpenVPN GUI or direct call
# Note: Requires admin privileges
self.process = subprocess.Popen(
[OPENVPN_EXE, "--config", str(self.config_file), "--log", str(self.log_file)],
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
# Linux: Use sudo openvpn
self.process = subprocess.Popen(
["sudo", "openvpn", "--config", str(self.config_file), "--log", str(self.log_file)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Wait a bit for connection
import time
time.sleep(3)
if self.process.poll() is not None:
# Process ended, check for errors
if self.log_file.exists():
log_content = self.log_file.read_text()
return VPNStatus(connected=False, error=f"Connection failed: {log_content[-500:]}")
return VPNStatus(connected=False, error="Connection failed")
return VPNStatus(connected=True)
except PermissionError:
return VPNStatus(
connected=False,
error="Permission denied. Run as administrator/root."
)
except Exception as e:
return VPNStatus(connected=False, error=str(e))
def disconnect(self) -> VPNStatus:
"""Disconnect VPN."""
if not self.process:
return VPNStatus(connected=False)
try:
self.process.terminate()
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
self.process = None
# Clean up temp config
if self.config_file and self.config_file.exists():
self.config_file.unlink()
return VPNStatus(connected=False)
def get_status(self) -> VPNStatus:
"""Get current VPN status."""
if not self.process:
return VPNStatus(connected=False)
if self.process.poll() is not None:
# Process has ended
self.process = None
return VPNStatus(connected=False)
# Try to get VPN IP from log
vpn_ip = None
if self.log_file and self.log_file.exists():
try:
log_content = self.log_file.read_text()
# Parse for IP assignment
for line in log_content.split('\n'):
if 'ifconfig' in line.lower() and 'netmask' in line.lower():
parts = line.split()
for i, part in enumerate(parts):
if part == 'ifconfig' and i + 1 < len(parts):
vpn_ip = parts[i + 1]
break
except Exception:
pass
return VPNStatus(connected=True, vpn_ip=vpn_ip)
def is_connected(self) -> bool:
"""Check if VPN is connected."""
return self.process is not None and self.process.poll() is None