"""Firewall management service for dynamic iptables rules.""" import subprocess from typing import Literal class FirewallService: """Service for managing iptables firewall rules.""" # Chain name for our VPN rules VPN_CHAIN = "MGUARD_VPN" def __init__(self): self._ensure_chain_exists() def _run_iptables(self, args: list[str], check: bool = True) -> subprocess.CompletedProcess: """Run iptables command.""" cmd = ["iptables"] + args return subprocess.run(cmd, capture_output=True, text=True, check=check) def _ensure_chain_exists(self): """Ensure our custom chain exists.""" # Check if chain exists result = self._run_iptables(["-L", self.VPN_CHAIN], check=False) if result.returncode != 0: # Create chain self._run_iptables(["-N", self.VPN_CHAIN], check=False) # Add jump to our chain from FORWARD self._run_iptables(["-I", "FORWARD", "-j", self.VPN_CHAIN], check=False) def allow_connection( self, client_vpn_ip: str, gateway_vpn_ip: str, target_ip: str, target_port: int, protocol: Literal["tcp", "udp"] = "tcp" ) -> bool: """Allow connection from client through gateway to target endpoint.""" try: # Rule 1: Allow client to reach target through gateway self._run_iptables([ "-A", self.VPN_CHAIN, "-s", client_vpn_ip, "-d", target_ip, "-p", protocol, "--dport", str(target_port), "-j", "ACCEPT" ]) # Rule 2: Allow return traffic self._run_iptables([ "-A", self.VPN_CHAIN, "-s", target_ip, "-d", client_vpn_ip, "-p", protocol, "--sport", str(target_port), "-j", "ACCEPT" ]) # Add NAT/masquerade if needed for routing through gateway self._run_iptables([ "-t", "nat", "-A", "POSTROUTING", "-s", client_vpn_ip, "-d", target_ip, "-j", "MASQUERADE" ], check=False) return True except subprocess.CalledProcessError: return False def revoke_connection( self, client_vpn_ip: str, gateway_vpn_ip: str, target_ip: str, target_port: int, protocol: Literal["tcp", "udp"] = "tcp" ) -> bool: """Remove firewall rules for a connection.""" try: # Remove forward rules self._run_iptables([ "-D", self.VPN_CHAIN, "-s", client_vpn_ip, "-d", target_ip, "-p", protocol, "--dport", str(target_port), "-j", "ACCEPT" ], check=False) self._run_iptables([ "-D", self.VPN_CHAIN, "-s", target_ip, "-d", client_vpn_ip, "-p", protocol, "--sport", str(target_port), "-j", "ACCEPT" ], check=False) # Remove NAT rule self._run_iptables([ "-t", "nat", "-D", "POSTROUTING", "-s", client_vpn_ip, "-d", target_ip, "-j", "MASQUERADE" ], check=False) return True except subprocess.CalledProcessError: return False def list_rules(self) -> list[str]: """List all rules in our VPN chain.""" result = self._run_iptables(["-L", self.VPN_CHAIN, "-n", "-v"], check=False) if result.returncode == 0: return result.stdout.strip().split('\n') return [] def flush_rules(self) -> bool: """Remove all rules from our VPN chain.""" try: self._run_iptables(["-F", self.VPN_CHAIN]) return True except subprocess.CalledProcessError: return False