315 lines
10 KiB
Python
315 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""mGuard Provisioning Tool - CLI for gateway provisioning."""
|
|
|
|
import click
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
from mguard_api import MGuardAPIClient
|
|
from config_generator import ConfigGenerator
|
|
|
|
# Shared rich console; the commands in this module print all user-facing output through it.
console = Console()
|
|
|
|
|
|
@click.group()
@click.option('--server', '-s', default='http://localhost:8000', help='API server URL')
@click.option('--username', '-u', prompt=True, help='Admin username')
@click.option('--password', '-p', prompt=True, hide_input=True, help='Admin password')
@click.pass_context
def cli(ctx, server, username, password):
    """mGuard Gateway Provisioning Tool.

    Use this tool to provision and configure mGuard routers.
    """
    # NOTE: the docstring above is rendered by click as the --help text, so
    # implementation notes live in comments instead.
    #
    # The group callback runs before every subcommand: it logs in to the API
    # server and stores 'server', 'token' and 'client' in ctx.obj for the
    # subcommands to read. Any login failure aborts the CLI via click.Abort.
    import httpx

    ctx.ensure_object(dict)

    # Avoid building URLs such as "http://host//api/..." when the user passes
    # a server URL with a trailing slash.
    server = server.rstrip('/')

    client = httpx.Client(timeout=30.0)
    # Release the connection pool when the group context is torn down, on
    # both the success path (after the subcommand ran) and on abort.
    ctx.call_on_close(client.close)

    try:
        response = client.post(
            f"{server}/api/auth/login",
            json={"username": username, "password": password},
        )
        response.raise_for_status()
        token = response.json()['access_token']
    except httpx.HTTPError as e:
        console.print(f"[red]Authentication failed: {e}[/red]")
        raise click.Abort()
    except (KeyError, TypeError, ValueError):
        # The server answered with a success status, but the body is not JSON
        # or does not carry an 'access_token' field.
        console.print("[red]Authentication failed: unexpected login response[/red]")
        raise click.Abort()

    ctx.obj['server'] = server
    ctx.obj['token'] = token
    ctx.obj['client'] = client

    console.print("[green]Successfully authenticated[/green]")
|
|
|
|
|
|
@cli.command()
@click.pass_context
def list_gateways(ctx):
    """List all gateways."""
    # Session state prepared by the group callback.
    client = ctx.obj['client']
    server = ctx.obj['server']
    token = ctx.obj['token']

    response = client.get(
        f"{server}/api/gateways",
        headers={"Authorization": f"Bearer {token}"},
    )

    # Without this check an error body would be iterated below as if it were
    # the gateway list. Same handling as download_config.
    if response.status_code != 200:
        console.print(f"[red]Error: {response.text}[/red]")
        return

    gateways = response.json()

    table = Table(title="Gateways")
    table.add_column("ID", style="cyan")
    table.add_column("Name", style="green")
    table.add_column("Type")
    table.add_column("Status")
    table.add_column("Provisioned")

    for gw in gateways:
        status = "[green]Online[/green]" if gw['is_online'] else "[red]Offline[/red]"
        prov = "[green]Yes[/green]" if gw['is_provisioned'] else "[yellow]No[/yellow]"
        table.add_row(
            str(gw['id']),
            gw['name'],
            gw['router_type'],
            status,
            prov,
        )

    console.print(table)
|
|
|
|
|
|
@cli.command()
@click.argument('gateway_id', type=int)
@click.option('--output', '-o', default=None, help='Output file path')
@click.pass_context
def download_config(ctx, gateway_id, output):
    """Download provisioning config for a gateway."""
    # Session state prepared by the group callback.
    client = ctx.obj['client']
    server = ctx.obj['server']
    token = ctx.obj['token']

    response = client.get(
        f"{server}/api/gateways/{gateway_id}/provision",
        headers={"Authorization": f"Bearer {token}"},
    )

    if response.status_code != 200:
        console.print(f"[red]Error: {response.text}[/red]")
        return

    # Fall back to a per-gateway default name when --output is missing or empty.
    output_file = output or f"gateway-{gateway_id}.ovpn"

    try:
        with open(output_file, 'w') as f:
            f.write(response.text)
    except OSError as e:
        # Unwritable path, missing directory, permissions, ...: report it the
        # same way as the other errors of this command instead of a traceback.
        console.print(f"[red]Error: could not write {output_file}: {e}[/red]")
        return

    console.print(f"[green]Config saved to {output_file}[/green]")
|
|
|
|
|
|
@cli.command()
@click.argument('gateway_id', type=int)
@click.argument('router_ip')
@click.option('--router-user', '-u', default='admin', help='Router username')
@click.option('--router-pass', '-p', prompt=True, hide_input=True, help='Router password')
@click.pass_context
def provision_online(ctx, gateway_id, router_ip, router_user, router_pass):
    """Provision a gateway via network (REST API or SSH).

    GATEWAY_ID: ID of the gateway in the server database
    ROUTER_IP: IP address of the mGuard router
    """
    # Pull the authenticated session pieces stored by the group callback.
    api, base_url, bearer = (ctx.obj[key] for key in ('client', 'server', 'token'))

    console.print(f"[yellow]Connecting to router at {router_ip}...[/yellow]")

    # Fetch the gateway record; its firmware version picks the transport.
    lookup = api.get(
        f"{base_url}/api/gateways/{gateway_id}",
        headers={"Authorization": f"Bearer {bearer}"},
    )
    if lookup.status_code != 200:
        console.print("[red]Error: Gateway not found[/red]")
        return

    gateway = lookup.json()
    firmware = gateway.get('firmware_version', '')

    # Firmware 10.x goes through the REST API; anything else (including a
    # missing version) falls back to SSH.
    if firmware and firmware.startswith('10.'):
        banner = "[cyan]Using REST API provisioning (Firmware 10.x)[/cyan]"
        provision = _provision_rest_api
    else:
        banner = "[cyan]Using SSH provisioning (Legacy firmware)[/cyan]"
        provision = _provision_ssh

    console.print(banner)
    provision(ctx, gateway, router_ip, router_user, router_pass)
|
|
|
|
|
|
def _provision_rest_api(ctx, gateway, router_ip, router_user, router_pass):
    """Provision via mGuard REST API.

    Checks that the router's REST API is reachable, downloads the gateway's
    provisioning config from the server and hands it to
    ``MGuardAPIClient.configure_vpn``. Progress and failures are printed to
    the console; nothing is returned and no exception is propagated.

    Args:
        ctx: click context whose ``obj`` holds 'client', 'server' and 'token'.
        gateway: Gateway record from the server; only ``gateway['id']`` is read.
        router_ip: Address of the router to provision.
        router_user: Router login name.
        router_pass: Router password.
    """
    mguard = MGuardAPIClient(router_ip, router_user, router_pass)

    try:
        # Test connection
        if not mguard.test_connection():
            console.print("[red]Cannot connect to router REST API[/red]")
            return

        # Download VPN config from server
        client = ctx.obj['client']
        server = ctx.obj['server']
        token = ctx.obj['token']

        response = client.get(
            f"{server}/api/gateways/{gateway['id']}/provision",
            headers={"Authorization": f"Bearer {token}"},
        )
        # Never push an error body to the router as if it were a VPN config.
        if response.status_code != 200:
            console.print(
                f"[red]Error: Failed to download VPN config "
                f"(HTTP {response.status_code})[/red]"
            )
            return
        vpn_config = response.text

        # Apply VPN configuration
        console.print("[yellow]Applying VPN configuration...[/yellow]")
        if mguard.configure_vpn(vpn_config):
            console.print("[green]VPN configuration applied successfully![/green]")
        else:
            console.print("[red]Failed to apply VPN configuration[/red]")

    except Exception as e:
        # Deliberate catch-all at the CLI boundary: the exception types raised
        # by MGuardAPIClient are not visible here, and the user should get a
        # readable message rather than a traceback.
        console.print(f"[red]Error: {e}[/red]")
|
|
|
|
|
|
def _provision_ssh(ctx, gateway, router_ip, router_user, router_pass):
    """Provision via SSH (legacy routers).

    Opens an SSH session to the router, downloads the gateway's provisioning
    config from the server, uploads it to ``/tmp/vpn.ovpn`` over SFTP and runs
    the router's ``vpn/import`` action on it. Progress and failures are
    printed to the console; nothing is returned and no exception is
    propagated.

    Args:
        ctx: click context whose ``obj`` holds 'client', 'server' and 'token'.
        gateway: Gateway record from the server; only ``gateway['id']`` is read.
        router_ip: Address of the router to provision.
        router_user: SSH login name.
        router_pass: SSH password.
    """
    import paramiko

    ssh = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts any unknown host key, so the
    # router's identity is not verified. Kept for compatibility with freshly
    # installed routers; consider pinning known host keys instead.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        ssh.connect(router_ip, username=router_user, password=router_pass, timeout=10)
        console.print("[green]SSH connection established[/green]")

        # Download VPN config from server
        client = ctx.obj['client']
        server = ctx.obj['server']
        token = ctx.obj['token']

        response = client.get(
            f"{server}/api/gateways/{gateway['id']}/provision",
            headers={"Authorization": f"Bearer {token}"},
        )
        # Never upload an error body to the router as if it were a VPN config.
        if response.status_code != 200:
            console.print(
                f"[red]Error: Failed to download VPN config "
                f"(HTTP {response.status_code})[/red]"
            )
            return
        vpn_config = response.text

        # Upload config file; the SFTP session is closed even if the write fails.
        sftp = ssh.open_sftp()
        try:
            with sftp.file('/tmp/vpn.ovpn', 'w') as f:
                f.write(vpn_config)
        finally:
            sftp.close()

        console.print("[yellow]VPN config uploaded[/yellow]")

        # Apply configuration (mGuard-specific commands)
        stdin, stdout, stderr = ssh.exec_command(
            '/Packages/mguard-api_0/mbin/action vpn/import /tmp/vpn.ovpn'
        )
        result = stdout.read().decode(errors='replace')
        error_output = stderr.read().decode(errors='replace')

        # Same textual heuristic as before, but also applied to stderr so a
        # failure reported only there is no longer shown as success.
        if 'error' in result.lower() or 'error' in error_output.lower():
            console.print(f"[red]Error: {result or error_output}[/red]")
        else:
            console.print("[green]VPN configuration applied![/green]")

    except paramiko.AuthenticationException:
        console.print("[red]SSH authentication failed[/red]")
    except paramiko.SSHException as e:
        console.print(f"[red]SSH error: {e}[/red]")
    except Exception as e:
        # Catch-all at the CLI boundary (network errors, HTTP client errors, ...).
        console.print(f"[red]Error: {e}[/red]")
    finally:
        # Always release the SSH transport, including on early return and error.
        ssh.close()
|
|
|
|
|
|
@cli.command()
@click.argument('config_file')
@click.argument('router_ip')
@click.option('--router-user', '-u', default='admin', help='Router username')
@click.option('--router-pass', '-p', prompt=True, hide_input=True, help='Router password')
def provision_offline(config_file, router_ip, router_user, router_pass):
    """Provision a gateway using a downloaded config file.

    CONFIG_FILE: Path to the .ovpn or .atv config file
    ROUTER_IP: IP address of the mGuard router (must be on same network)
    """
    # NOTE: the docstring above is rendered by click as the --help text, so
    # implementation notes live in comments instead.
    import shlex
    from pathlib import Path

    config_path = Path(config_file)
    # is_file() also rejects directories, which exists() would let through
    # only to fail later in read_text().
    if not config_path.is_file():
        console.print(f"[red]Config file not found: {config_file}[/red]")
        return

    console.print(f"[yellow]Loading config from {config_file}...[/yellow]")
    config_content = config_path.read_text()

    # Determine file type
    if config_file.endswith('.ovpn'):
        console.print("[cyan]OpenVPN config detected[/cyan]")
    elif config_file.endswith('.atv'):
        console.print("[cyan]mGuard ATV config detected[/cyan]")
    else:
        console.print("[yellow]Unknown config format, attempting generic upload[/yellow]")

    # Try REST API first, then SSH
    mguard = MGuardAPIClient(router_ip, router_user, router_pass)

    if mguard.test_connection():
        console.print("[cyan]Using REST API...[/cyan]")
        if mguard.upload_config(config_content):
            console.print("[green]Configuration uploaded successfully![/green]")
        else:
            console.print("[red]REST API upload failed[/red]")
        return

    console.print("[cyan]REST API not available, trying SSH...[/cyan]")
    import paramiko

    ssh = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts any unknown host key, so the
    # router's identity is not verified. Consider pinning known host keys.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        ssh.connect(router_ip, username=router_user, password=router_pass, timeout=10)

        remote_path = f'/tmp/{config_path.name}'
        # The SFTP session is closed even if the upload fails.
        sftp = ssh.open_sftp()
        try:
            with sftp.file(remote_path, 'w') as f:
                f.write(config_content)
        finally:
            sftp.close()

        console.print(f"[green]Config uploaded to {remote_path}[/green]")

        # Apply based on file type. The path is derived from a local file name
        # and is run through the remote shell, so quote it: names containing
        # spaces or shell metacharacters would otherwise break the command.
        action = 'config/restore' if config_file.endswith('.atv') else 'vpn/import'
        stdin, stdout, stderr = ssh.exec_command(
            f'/Packages/mguard-api_0/mbin/action {action} {shlex.quote(remote_path)}'
        )

        result = stdout.read().decode()
        console.print(f"Result: {result}")

        console.print("[green]Provisioning complete![/green]")

    except Exception as e:
        console.print(f"[red]SSH provisioning failed: {e}[/red]")
    finally:
        # Always release the SSH transport, including on error.
        ssh.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the click group with an empty dict as the
    # shared context object.
    cli(obj={})
|