"""mGuard REST API client for router configuration.""" import httpx from typing import Optional import urllib3 # Disable SSL warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class MGuardAPIClient: """Client for mGuard REST API (Firmware 10.x+).""" def __init__(self, host: str, username: str, password: str, port: int = 443): """Initialize mGuard API client. Args: host: Router IP address or hostname username: Admin username password: Admin password port: HTTPS port (default 443) """ self.base_url = f"https://{host}:{port}" self.username = username self.password = password self.client = httpx.Client( timeout=30.0, verify=False, # mGuard uses self-signed certs auth=(username, password) ) def test_connection(self) -> bool: """Test connection to mGuard REST API.""" try: response = self.client.get(f"{self.base_url}/api/v1/info") return response.status_code == 200 except httpx.HTTPError: return False def get_system_info(self) -> Optional[dict]: """Get system information.""" try: response = self.client.get(f"{self.base_url}/api/v1/info") response.raise_for_status() return response.json() except httpx.HTTPError: return None def get_configuration(self) -> Optional[dict]: """Get current configuration.""" try: response = self.client.get(f"{self.base_url}/api/v1/configuration") response.raise_for_status() return response.json() except httpx.HTTPError: return None def configure_vpn(self, vpn_config: str) -> bool: """Configure OpenVPN client. Args: vpn_config: OpenVPN configuration content Returns: True if successful """ try: # The actual API endpoint depends on mGuard firmware version # This is an example - actual implementation may vary response = self.client.post( f"{self.base_url}/api/v1/vpn/openvpn/client", json={ "enabled": True, "config": vpn_config } ) return response.status_code in (200, 201, 204) except httpx.HTTPError: return False def upload_config(self, config_content: str) -> bool: """Upload configuration file. Args: config_content: Configuration file content Returns: True if successful """ try: response = self.client.post( f"{self.base_url}/api/v1/configuration", content=config_content, headers={"Content-Type": "application/octet-stream"} ) return response.status_code in (200, 201, 204) except httpx.HTTPError: return False def add_firewall_rule( self, name: str, source: str, destination: str, port: int, protocol: str = "tcp", action: str = "accept" ) -> bool: """Add a firewall rule. Args: name: Rule name source: Source IP/network destination: Destination IP/network port: Destination port protocol: tcp or udp action: accept or drop Returns: True if successful """ try: response = self.client.post( f"{self.base_url}/api/v1/firewall/rules", json={ "name": name, "source": source, "destination": destination, "port": port, "protocol": protocol, "action": action, "enabled": True } ) return response.status_code in (200, 201, 204) except httpx.HTTPError: return False def remove_firewall_rule(self, rule_id: str) -> bool: """Remove a firewall rule. Args: rule_id: Rule ID to remove Returns: True if successful """ try: response = self.client.delete( f"{self.base_url}/api/v1/firewall/rules/{rule_id}" ) return response.status_code in (200, 204) except httpx.HTTPError: return False def reboot(self) -> bool: """Reboot the router.""" try: response = self.client.post(f"{self.base_url}/api/v1/actions/reboot") return response.status_code in (200, 202, 204) except httpx.HTTPError: return False def get_vpn_status(self) -> Optional[dict]: """Get VPN connection status.""" try: response = self.client.get(f"{self.base_url}/api/v1/vpn/status") response.raise_for_status() return response.json() except httpx.HTTPError: return None def close(self): """Close the HTTP client.""" self.client.close()