176 lines
5.2 KiB
Python
176 lines
5.2 KiB
Python
"""mGuard REST API client for router configuration."""
|
|
|
|
import httpx
|
|
from typing import Optional
|
|
import urllib3
|
|
|
|
# Disable SSL warnings for self-signed certificates
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
|
|
class MGuardAPIClient:
|
|
"""Client for mGuard REST API (Firmware 10.x+)."""
|
|
|
|
def __init__(self, host: str, username: str, password: str, port: int = 443):
|
|
"""Initialize mGuard API client.
|
|
|
|
Args:
|
|
host: Router IP address or hostname
|
|
username: Admin username
|
|
password: Admin password
|
|
port: HTTPS port (default 443)
|
|
"""
|
|
self.base_url = f"https://{host}:{port}"
|
|
self.username = username
|
|
self.password = password
|
|
self.client = httpx.Client(
|
|
timeout=30.0,
|
|
verify=False, # mGuard uses self-signed certs
|
|
auth=(username, password)
|
|
)
|
|
|
|
def test_connection(self) -> bool:
|
|
"""Test connection to mGuard REST API."""
|
|
try:
|
|
response = self.client.get(f"{self.base_url}/api/v1/info")
|
|
return response.status_code == 200
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def get_system_info(self) -> Optional[dict]:
|
|
"""Get system information."""
|
|
try:
|
|
response = self.client.get(f"{self.base_url}/api/v1/info")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
def get_configuration(self) -> Optional[dict]:
|
|
"""Get current configuration."""
|
|
try:
|
|
response = self.client.get(f"{self.base_url}/api/v1/configuration")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
def configure_vpn(self, vpn_config: str) -> bool:
|
|
"""Configure OpenVPN client.
|
|
|
|
Args:
|
|
vpn_config: OpenVPN configuration content
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
try:
|
|
# The actual API endpoint depends on mGuard firmware version
|
|
# This is an example - actual implementation may vary
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/v1/vpn/openvpn/client",
|
|
json={
|
|
"enabled": True,
|
|
"config": vpn_config
|
|
}
|
|
)
|
|
return response.status_code in (200, 201, 204)
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def upload_config(self, config_content: str) -> bool:
|
|
"""Upload configuration file.
|
|
|
|
Args:
|
|
config_content: Configuration file content
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
try:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/v1/configuration",
|
|
content=config_content,
|
|
headers={"Content-Type": "application/octet-stream"}
|
|
)
|
|
return response.status_code in (200, 201, 204)
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def add_firewall_rule(
|
|
self,
|
|
name: str,
|
|
source: str,
|
|
destination: str,
|
|
port: int,
|
|
protocol: str = "tcp",
|
|
action: str = "accept"
|
|
) -> bool:
|
|
"""Add a firewall rule.
|
|
|
|
Args:
|
|
name: Rule name
|
|
source: Source IP/network
|
|
destination: Destination IP/network
|
|
port: Destination port
|
|
protocol: tcp or udp
|
|
action: accept or drop
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
try:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/v1/firewall/rules",
|
|
json={
|
|
"name": name,
|
|
"source": source,
|
|
"destination": destination,
|
|
"port": port,
|
|
"protocol": protocol,
|
|
"action": action,
|
|
"enabled": True
|
|
}
|
|
)
|
|
return response.status_code in (200, 201, 204)
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def remove_firewall_rule(self, rule_id: str) -> bool:
|
|
"""Remove a firewall rule.
|
|
|
|
Args:
|
|
rule_id: Rule ID to remove
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
try:
|
|
response = self.client.delete(
|
|
f"{self.base_url}/api/v1/firewall/rules/{rule_id}"
|
|
)
|
|
return response.status_code in (200, 204)
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def reboot(self) -> bool:
|
|
"""Reboot the router."""
|
|
try:
|
|
response = self.client.post(f"{self.base_url}/api/v1/actions/reboot")
|
|
return response.status_code in (200, 202, 204)
|
|
except httpx.HTTPError:
|
|
return False
|
|
|
|
def get_vpn_status(self) -> Optional[dict]:
|
|
"""Get VPN connection status."""
|
|
try:
|
|
response = self.client.get(f"{self.base_url}/api/v1/vpn/status")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
def close(self):
|
|
"""Close the HTTP client."""
|
|
self.client.close()
|