proxmox-cluster-network-cha.../config_parser.py

293 lines
9.1 KiB
Python

"""Parsers for Proxmox configuration files (Corosync, Ceph, /etc/network/interfaces)."""
import re
from models import (
CorosyncConfig, CorosyncNode, CephConfig, NetworkInterface,
)
def parse_corosync_conf(content: str) -> CorosyncConfig:
"""Parse corosync.conf and extract node information."""
config = CorosyncConfig(raw_content=content)
# Extract config_version
m = re.search(r'config_version:\s*(\d+)', content)
if m:
config.config_version = int(m.group(1))
# Extract cluster_name
m = re.search(r'cluster_name:\s*(\S+)', content)
if m:
config.cluster_name = m.group(1)
# Extract transport
m = re.search(r'transport:\s*(\S+)', content)
if m:
config.transport = m.group(1)
# Extract nodes from nodelist section
nodelist_match = re.search(r'nodelist\s*\{(.*?)\n\}', content, re.DOTALL)
if nodelist_match:
nodelist_content = nodelist_match.group(1)
# Find all node blocks
node_blocks = re.findall(r'node\s*\{(.*?)\}', nodelist_content, re.DOTALL)
for block in node_blocks:
node = CorosyncNode(nodeid=0, name="", ring0_addr="")
m = re.search(r'nodeid:\s*(\d+)', block)
if m:
node.nodeid = int(m.group(1))
m = re.search(r'name:\s*(\S+)', block)
if m:
node.name = m.group(1)
m = re.search(r'ring0_addr:\s*(\S+)', block)
if m:
node.ring0_addr = m.group(1)
m = re.search(r'ring1_addr:\s*(\S+)', block)
if m:
node.ring1_addr = m.group(1)
config.nodes.append(node)
return config
def generate_corosync_conf(config: CorosyncConfig, ip_mapping: dict[str, str]) -> str:
"""Generate new corosync.conf with updated IP addresses.
ip_mapping: old_ip -> new_ip
"""
new_content = config.raw_content
for old_ip, new_ip in ip_mapping.items():
new_content = new_content.replace(old_ip, new_ip)
# Increment config_version
m = re.search(r'config_version:\s*(\d+)', new_content)
if m:
old_version = int(m.group(1))
new_content = new_content.replace(
f'config_version: {old_version}',
f'config_version: {old_version + 1}'
)
return new_content
def parse_ceph_conf(content: str) -> CephConfig:
"""Parse ceph.conf (INI-like format)."""
config = CephConfig(raw_content=content)
# Extract fsid
m = re.search(r'fsid\s*=\s*(\S+)', content)
if m:
config.fsid = m.group(1)
# Extract public_network
m = re.search(r'public.network\s*=\s*(\S+)', content)
if m:
config.public_network = m.group(1)
# Extract cluster_network
m = re.search(r'cluster.network\s*=\s*(\S+)', content)
if m:
config.cluster_network = m.group(1)
# Extract mon_host (can be comma-separated, space-separated, or both)
m = re.search(r'mon.host\s*=\s*(.+)', content)
if m:
hosts_str = m.group(1).strip()
# Split by comma or whitespace
config.mon_hosts = [
h.strip() for h in re.split(r'[,\s]+', hosts_str)
if h.strip()
]
# Extract [mon.X] sections
mon_sections = re.findall(
r'\[(mon\.[\w.-]+)\]\s*\n((?:\s+\w.*\n)*)', content
)
for section_name, section_body in mon_sections:
props = {}
for line in section_body.strip().split('\n'):
line = line.strip()
if '=' in line:
key, val = line.split('=', 1)
props[key.strip()] = val.strip()
config.mon_sections[section_name] = props
return config
def generate_ceph_conf(config: CephConfig, ip_mapping: dict[str, str],
new_public_network: str, new_cluster_network: str) -> str:
"""Generate new ceph.conf with updated IPs and networks."""
new_content = config.raw_content
# Replace network definitions
if config.public_network:
new_content = new_content.replace(
config.public_network, new_public_network, 1
)
if config.cluster_network:
new_content = new_content.replace(
config.cluster_network, new_cluster_network, 1
)
# Replace all IPs in the config
for old_ip, new_ip in ip_mapping.items():
new_content = new_content.replace(old_ip, new_ip)
return new_content
def parse_network_interfaces(content: str) -> list[NetworkInterface]:
"""Parse /etc/network/interfaces and extract interface configs."""
interfaces = []
current_iface = None
current_lines = []
for line in content.split('\n'):
stripped = line.strip()
# New iface block
m = re.match(r'iface\s+(\S+)\s+inet\s+(\S+)', stripped)
if m:
# Save previous
if current_iface:
interfaces.append(_build_interface(current_iface, current_lines))
current_iface = m.group(1)
current_lines = [line]
continue
# Auto line or source line starts a new context
if stripped.startswith('auto ') or stripped.startswith('source '):
if current_iface:
interfaces.append(_build_interface(current_iface, current_lines))
current_iface = None
current_lines = []
continue
if current_iface and stripped:
current_lines.append(line)
# Don't forget the last one
if current_iface:
interfaces.append(_build_interface(current_iface, current_lines))
return interfaces
def _build_interface(name: str, lines: list[str]) -> NetworkInterface:
"""Build a NetworkInterface from parsed lines."""
raw = '\n'.join(lines)
address = ""
netmask = ""
cidr = 0
gateway = None
bridge_ports = None
for line in lines:
stripped = line.strip()
# address with CIDR notation: address 192.168.0.1/24
m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', stripped)
if m:
address = m.group(1)
cidr = int(m.group(2))
netmask = cidr_to_netmask(cidr)
continue
# address without CIDR
m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)', stripped)
if m:
address = m.group(1)
continue
m = re.match(r'netmask\s+(\S+)', stripped)
if m:
netmask = m.group(1)
cidr = netmask_to_cidr(netmask)
continue
m = re.match(r'gateway\s+(\S+)', stripped)
if m:
gateway = m.group(1)
continue
m = re.match(r'bridge[_-]ports\s+(\S+)', stripped)
if m:
bridge_ports = m.group(1)
continue
return NetworkInterface(
name=name,
address=address,
netmask=netmask,
cidr=cidr,
gateway=gateway,
bridge_ports=bridge_ports,
raw_config=raw,
)
def generate_network_interfaces(content: str, old_ip: str, new_ip: str,
new_cidr: int, new_gateway: str | None = None,
old_gateway: str | None = None) -> str:
"""Update /etc/network/interfaces with new IP, keeping everything else."""
new_content = content
# Replace IP in address lines (with and without CIDR)
# address 192.168.0.101/24 -> address 172.0.2.101/16
new_content = re.sub(
rf'(address\s+){re.escape(old_ip)}/\d+',
rf'\g<1>{new_ip}/{new_cidr}',
new_content
)
# address 192.168.0.101 (without CIDR)
new_content = re.sub(
rf'(address\s+){re.escape(old_ip)}(\s)',
rf'\g<1>{new_ip}\2',
new_content
)
# Replace gateway if provided
if new_gateway and old_gateway:
new_content = new_content.replace(
f'gateway {old_gateway}',
f'gateway {new_gateway}'
)
return new_content
def generate_network_interfaces_multi(
content: str,
replacements: list[tuple[str, str, int, str | None, str | None]]
) -> str:
"""Update /etc/network/interfaces with multiple IP replacements.
Each replacement is: (old_ip, new_ip, new_cidr, old_gateway, new_gateway)
This handles multiple bridges/NICs (management, ceph public, ceph cluster).
"""
new_content = content
for old_ip, new_ip, new_cidr, old_gw, new_gw in replacements:
new_content = generate_network_interfaces(
new_content, old_ip, new_ip, new_cidr, new_gw, old_gw
)
return new_content
def generate_hosts(content: str, ip_mapping: dict[str, str]) -> str:
"""Update /etc/hosts with new IPs."""
new_content = content
for old_ip, new_ip in ip_mapping.items():
new_content = new_content.replace(old_ip, new_ip)
return new_content
def cidr_to_netmask(cidr: int) -> str:
"""Convert CIDR prefix length to netmask string."""
bits = (0xFFFFFFFF << (32 - cidr)) & 0xFFFFFFFF
return f"{(bits >> 24) & 0xFF}.{(bits >> 16) & 0xFF}.{(bits >> 8) & 0xFF}.{bits & 0xFF}"
def netmask_to_cidr(netmask: str) -> int:
"""Convert netmask string to CIDR prefix length."""
parts = netmask.split('.')
binary = ''.join(f'{int(p):08b}' for p in parts)
return binary.count('1')