272 lines
8.3 KiB
Python
272 lines
8.3 KiB
Python
"""Parsers for Proxmox configuration files (Corosync, Ceph, /etc/network/interfaces)."""
|
|
|
|
import re
|
|
from models import (
|
|
CorosyncConfig, CorosyncNode, CephConfig, NetworkInterface,
|
|
)
|
|
|
|
|
|
def parse_corosync_conf(content: str) -> CorosyncConfig:
|
|
"""Parse corosync.conf and extract node information."""
|
|
config = CorosyncConfig(raw_content=content)
|
|
|
|
# Extract config_version
|
|
m = re.search(r'config_version:\s*(\d+)', content)
|
|
if m:
|
|
config.config_version = int(m.group(1))
|
|
|
|
# Extract cluster_name
|
|
m = re.search(r'cluster_name:\s*(\S+)', content)
|
|
if m:
|
|
config.cluster_name = m.group(1)
|
|
|
|
# Extract transport
|
|
m = re.search(r'transport:\s*(\S+)', content)
|
|
if m:
|
|
config.transport = m.group(1)
|
|
|
|
# Extract nodes from nodelist section
|
|
nodelist_match = re.search(r'nodelist\s*\{(.*?)\n\}', content, re.DOTALL)
|
|
if nodelist_match:
|
|
nodelist_content = nodelist_match.group(1)
|
|
# Find all node blocks
|
|
node_blocks = re.findall(r'node\s*\{(.*?)\}', nodelist_content, re.DOTALL)
|
|
for block in node_blocks:
|
|
node = CorosyncNode(nodeid=0, name="", ring0_addr="")
|
|
m = re.search(r'nodeid:\s*(\d+)', block)
|
|
if m:
|
|
node.nodeid = int(m.group(1))
|
|
m = re.search(r'name:\s*(\S+)', block)
|
|
if m:
|
|
node.name = m.group(1)
|
|
m = re.search(r'ring0_addr:\s*(\S+)', block)
|
|
if m:
|
|
node.ring0_addr = m.group(1)
|
|
m = re.search(r'ring1_addr:\s*(\S+)', block)
|
|
if m:
|
|
node.ring1_addr = m.group(1)
|
|
config.nodes.append(node)
|
|
|
|
return config
|
|
|
|
|
|
def generate_corosync_conf(config: CorosyncConfig, ip_mapping: dict[str, str]) -> str:
|
|
"""Generate new corosync.conf with updated IP addresses.
|
|
|
|
ip_mapping: old_ip -> new_ip
|
|
"""
|
|
new_content = config.raw_content
|
|
|
|
for old_ip, new_ip in ip_mapping.items():
|
|
new_content = new_content.replace(old_ip, new_ip)
|
|
|
|
# Increment config_version
|
|
m = re.search(r'config_version:\s*(\d+)', new_content)
|
|
if m:
|
|
old_version = int(m.group(1))
|
|
new_content = new_content.replace(
|
|
f'config_version: {old_version}',
|
|
f'config_version: {old_version + 1}'
|
|
)
|
|
|
|
return new_content
|
|
|
|
|
|
def parse_ceph_conf(content: str) -> CephConfig:
|
|
"""Parse ceph.conf (INI-like format)."""
|
|
config = CephConfig(raw_content=content)
|
|
|
|
# Extract fsid
|
|
m = re.search(r'fsid\s*=\s*(\S+)', content)
|
|
if m:
|
|
config.fsid = m.group(1)
|
|
|
|
# Extract public_network
|
|
m = re.search(r'public.network\s*=\s*(\S+)', content)
|
|
if m:
|
|
config.public_network = m.group(1)
|
|
|
|
# Extract cluster_network
|
|
m = re.search(r'cluster.network\s*=\s*(\S+)', content)
|
|
if m:
|
|
config.cluster_network = m.group(1)
|
|
|
|
# Extract mon_host
|
|
m = re.search(r'mon.host\s*=\s*(.+)', content)
|
|
if m:
|
|
hosts_str = m.group(1).strip()
|
|
config.mon_hosts = [h.strip() for h in hosts_str.split(',') if h.strip()]
|
|
|
|
# Extract [mon.X] sections
|
|
mon_sections = re.findall(
|
|
r'\[(mon\.[\w.-]+)\]\s*\n((?:\s+\w.*\n)*)', content
|
|
)
|
|
for section_name, section_body in mon_sections:
|
|
props = {}
|
|
for line in section_body.strip().split('\n'):
|
|
line = line.strip()
|
|
if '=' in line:
|
|
key, val = line.split('=', 1)
|
|
props[key.strip()] = val.strip()
|
|
config.mon_sections[section_name] = props
|
|
|
|
return config
|
|
|
|
|
|
def generate_ceph_conf(config: CephConfig, ip_mapping: dict[str, str],
|
|
new_public_network: str, new_cluster_network: str) -> str:
|
|
"""Generate new ceph.conf with updated IPs and networks."""
|
|
new_content = config.raw_content
|
|
|
|
# Replace network definitions
|
|
if config.public_network:
|
|
new_content = new_content.replace(
|
|
config.public_network, new_public_network, 1
|
|
)
|
|
if config.cluster_network:
|
|
new_content = new_content.replace(
|
|
config.cluster_network, new_cluster_network, 1
|
|
)
|
|
|
|
# Replace all IPs in the config
|
|
for old_ip, new_ip in ip_mapping.items():
|
|
new_content = new_content.replace(old_ip, new_ip)
|
|
|
|
return new_content
|
|
|
|
|
|
def parse_network_interfaces(content: str) -> list[NetworkInterface]:
|
|
"""Parse /etc/network/interfaces and extract interface configs."""
|
|
interfaces = []
|
|
current_iface = None
|
|
current_lines = []
|
|
|
|
for line in content.split('\n'):
|
|
stripped = line.strip()
|
|
|
|
# New iface block
|
|
m = re.match(r'iface\s+(\S+)\s+inet\s+(\S+)', stripped)
|
|
if m:
|
|
# Save previous
|
|
if current_iface:
|
|
interfaces.append(_build_interface(current_iface, current_lines))
|
|
current_iface = m.group(1)
|
|
current_lines = [line]
|
|
continue
|
|
|
|
# Auto line or source line starts a new context
|
|
if stripped.startswith('auto ') or stripped.startswith('source '):
|
|
if current_iface:
|
|
interfaces.append(_build_interface(current_iface, current_lines))
|
|
current_iface = None
|
|
current_lines = []
|
|
continue
|
|
|
|
if current_iface and stripped:
|
|
current_lines.append(line)
|
|
|
|
# Don't forget the last one
|
|
if current_iface:
|
|
interfaces.append(_build_interface(current_iface, current_lines))
|
|
|
|
return interfaces
|
|
|
|
|
|
def _build_interface(name: str, lines: list[str]) -> NetworkInterface:
|
|
"""Build a NetworkInterface from parsed lines."""
|
|
raw = '\n'.join(lines)
|
|
address = ""
|
|
netmask = ""
|
|
cidr = 0
|
|
gateway = None
|
|
bridge_ports = None
|
|
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
# address with CIDR notation: address 192.168.0.1/24
|
|
m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', stripped)
|
|
if m:
|
|
address = m.group(1)
|
|
cidr = int(m.group(2))
|
|
netmask = cidr_to_netmask(cidr)
|
|
continue
|
|
# address without CIDR
|
|
m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)', stripped)
|
|
if m:
|
|
address = m.group(1)
|
|
continue
|
|
m = re.match(r'netmask\s+(\S+)', stripped)
|
|
if m:
|
|
netmask = m.group(1)
|
|
cidr = netmask_to_cidr(netmask)
|
|
continue
|
|
m = re.match(r'gateway\s+(\S+)', stripped)
|
|
if m:
|
|
gateway = m.group(1)
|
|
continue
|
|
m = re.match(r'bridge[_-]ports\s+(\S+)', stripped)
|
|
if m:
|
|
bridge_ports = m.group(1)
|
|
continue
|
|
|
|
return NetworkInterface(
|
|
name=name,
|
|
address=address,
|
|
netmask=netmask,
|
|
cidr=cidr,
|
|
gateway=gateway,
|
|
bridge_ports=bridge_ports,
|
|
raw_config=raw,
|
|
)
|
|
|
|
|
|
def generate_network_interfaces(content: str, old_ip: str, new_ip: str,
|
|
new_cidr: int, new_gateway: str | None = None,
|
|
old_gateway: str | None = None) -> str:
|
|
"""Update /etc/network/interfaces with new IP, keeping everything else."""
|
|
new_content = content
|
|
|
|
# Replace IP in address lines (with and without CIDR)
|
|
# address 192.168.0.101/24 -> address 172.0.2.101/16
|
|
new_content = re.sub(
|
|
rf'(address\s+){re.escape(old_ip)}/\d+',
|
|
rf'\g<1>{new_ip}/{new_cidr}',
|
|
new_content
|
|
)
|
|
# address 192.168.0.101 (without CIDR)
|
|
new_content = re.sub(
|
|
rf'(address\s+){re.escape(old_ip)}(\s)',
|
|
rf'\g<1>{new_ip}\2',
|
|
new_content
|
|
)
|
|
|
|
# Replace gateway if provided
|
|
if new_gateway and old_gateway:
|
|
new_content = new_content.replace(
|
|
f'gateway {old_gateway}',
|
|
f'gateway {new_gateway}'
|
|
)
|
|
|
|
return new_content
|
|
|
|
|
|
def generate_hosts(content: str, ip_mapping: dict[str, str]) -> str:
|
|
"""Update /etc/hosts with new IPs."""
|
|
new_content = content
|
|
for old_ip, new_ip in ip_mapping.items():
|
|
new_content = new_content.replace(old_ip, new_ip)
|
|
return new_content
|
|
|
|
|
|
def cidr_to_netmask(cidr: int) -> str:
|
|
"""Convert CIDR prefix length to netmask string."""
|
|
bits = (0xFFFFFFFF << (32 - cidr)) & 0xFFFFFFFF
|
|
return f"{(bits >> 24) & 0xFF}.{(bits >> 16) & 0xFF}.{(bits >> 8) & 0xFF}.{bits & 0xFF}"
|
|
|
|
|
|
def netmask_to_cidr(netmask: str) -> int:
|
|
"""Convert netmask string to CIDR prefix length."""
|
|
parts = netmask.split('.')
|
|
binary = ''.join(f'{int(p):08b}' for p in parts)
|
|
return binary.count('1')
|