openvpn-endpoint-server/server/app/services/firewall_service.py

130 lines
4.0 KiB
Python

"""Firewall management service for dynamic iptables rules."""
import subprocess
from typing import Literal
class FirewallService:
"""Service for managing iptables firewall rules."""
# Chain name for our VPN rules
VPN_CHAIN = "MGUARD_VPN"
def __init__(self):
self._ensure_chain_exists()
def _run_iptables(self, args: list[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run iptables command."""
cmd = ["iptables"] + args
return subprocess.run(cmd, capture_output=True, text=True, check=check)
def _ensure_chain_exists(self):
"""Ensure our custom chain exists."""
# Check if chain exists
result = self._run_iptables(["-L", self.VPN_CHAIN], check=False)
if result.returncode != 0:
# Create chain
self._run_iptables(["-N", self.VPN_CHAIN], check=False)
# Add jump to our chain from FORWARD
self._run_iptables(["-I", "FORWARD", "-j", self.VPN_CHAIN], check=False)
def allow_connection(
self,
client_vpn_ip: str,
gateway_vpn_ip: str,
target_ip: str,
target_port: int,
protocol: Literal["tcp", "udp"] = "tcp"
) -> bool:
"""Allow connection from client through gateway to target endpoint."""
try:
# Rule 1: Allow client to reach target through gateway
self._run_iptables([
"-A", self.VPN_CHAIN,
"-s", client_vpn_ip,
"-d", target_ip,
"-p", protocol,
"--dport", str(target_port),
"-j", "ACCEPT"
])
# Rule 2: Allow return traffic
self._run_iptables([
"-A", self.VPN_CHAIN,
"-s", target_ip,
"-d", client_vpn_ip,
"-p", protocol,
"--sport", str(target_port),
"-j", "ACCEPT"
])
# Add NAT/masquerade if needed for routing through gateway
self._run_iptables([
"-t", "nat",
"-A", "POSTROUTING",
"-s", client_vpn_ip,
"-d", target_ip,
"-j", "MASQUERADE"
], check=False)
return True
except subprocess.CalledProcessError:
return False
def revoke_connection(
self,
client_vpn_ip: str,
gateway_vpn_ip: str,
target_ip: str,
target_port: int,
protocol: Literal["tcp", "udp"] = "tcp"
) -> bool:
"""Remove firewall rules for a connection."""
try:
# Remove forward rules
self._run_iptables([
"-D", self.VPN_CHAIN,
"-s", client_vpn_ip,
"-d", target_ip,
"-p", protocol,
"--dport", str(target_port),
"-j", "ACCEPT"
], check=False)
self._run_iptables([
"-D", self.VPN_CHAIN,
"-s", target_ip,
"-d", client_vpn_ip,
"-p", protocol,
"--sport", str(target_port),
"-j", "ACCEPT"
], check=False)
# Remove NAT rule
self._run_iptables([
"-t", "nat",
"-D", "POSTROUTING",
"-s", client_vpn_ip,
"-d", target_ip,
"-j", "MASQUERADE"
], check=False)
return True
except subprocess.CalledProcessError:
return False
def list_rules(self) -> list[str]:
"""List all rules in our VPN chain."""
result = self._run_iptables(["-L", self.VPN_CHAIN, "-n", "-v"], check=False)
if result.returncode == 0:
return result.stdout.strip().split('\n')
return []
def flush_rules(self) -> bool:
"""Remove all rules from our VPN chain."""
try:
self._run_iptables(["-F", self.VPN_CHAIN])
return True
except subprocess.CalledProcessError:
return False