"""Parsers for Proxmox configuration files (Corosync, Ceph, /etc/network/interfaces).""" import re from models import ( CorosyncConfig, CorosyncNode, CephConfig, NetworkInterface, ) def parse_corosync_conf(content: str) -> CorosyncConfig: """Parse corosync.conf and extract node information.""" config = CorosyncConfig(raw_content=content) # Extract config_version m = re.search(r'config_version:\s*(\d+)', content) if m: config.config_version = int(m.group(1)) # Extract cluster_name m = re.search(r'cluster_name:\s*(\S+)', content) if m: config.cluster_name = m.group(1) # Extract transport m = re.search(r'transport:\s*(\S+)', content) if m: config.transport = m.group(1) # Extract nodes from nodelist section nodelist_match = re.search(r'nodelist\s*\{(.*?)\n\}', content, re.DOTALL) if nodelist_match: nodelist_content = nodelist_match.group(1) # Find all node blocks node_blocks = re.findall(r'node\s*\{(.*?)\}', nodelist_content, re.DOTALL) for block in node_blocks: node = CorosyncNode(nodeid=0, name="", ring0_addr="") m = re.search(r'nodeid:\s*(\d+)', block) if m: node.nodeid = int(m.group(1)) m = re.search(r'name:\s*(\S+)', block) if m: node.name = m.group(1) m = re.search(r'ring0_addr:\s*(\S+)', block) if m: node.ring0_addr = m.group(1) m = re.search(r'ring1_addr:\s*(\S+)', block) if m: node.ring1_addr = m.group(1) config.nodes.append(node) return config def generate_corosync_conf(config: CorosyncConfig, ip_mapping: dict[str, str]) -> str: """Generate new corosync.conf with updated IP addresses. ip_mapping: old_ip -> new_ip """ new_content = config.raw_content for old_ip, new_ip in ip_mapping.items(): new_content = new_content.replace(old_ip, new_ip) # Increment config_version m = re.search(r'config_version:\s*(\d+)', new_content) if m: old_version = int(m.group(1)) new_content = new_content.replace( f'config_version: {old_version}', f'config_version: {old_version + 1}' ) return new_content def parse_ceph_conf(content: str) -> CephConfig: """Parse ceph.conf (INI-like format).""" config = CephConfig(raw_content=content) # Extract fsid m = re.search(r'fsid\s*=\s*(\S+)', content) if m: config.fsid = m.group(1) # Extract public_network m = re.search(r'public.network\s*=\s*(\S+)', content) if m: config.public_network = m.group(1) # Extract cluster_network m = re.search(r'cluster.network\s*=\s*(\S+)', content) if m: config.cluster_network = m.group(1) # Extract mon_host (can be comma-separated, space-separated, or both) m = re.search(r'mon.host\s*=\s*(.+)', content) if m: hosts_str = m.group(1).strip() # Split by comma or whitespace config.mon_hosts = [ h.strip() for h in re.split(r'[,\s]+', hosts_str) if h.strip() ] # Extract [mon.X] sections mon_sections = re.findall( r'\[(mon\.[\w.-]+)\]\s*\n((?:\s+\w.*\n)*)', content ) for section_name, section_body in mon_sections: props = {} for line in section_body.strip().split('\n'): line = line.strip() if '=' in line: key, val = line.split('=', 1) props[key.strip()] = val.strip() config.mon_sections[section_name] = props return config def generate_ceph_conf(config: CephConfig, ip_mapping: dict[str, str], new_public_network: str, new_cluster_network: str) -> str: """Generate new ceph.conf with updated IPs and networks.""" new_content = config.raw_content # Replace network definitions if config.public_network: new_content = new_content.replace( config.public_network, new_public_network, 1 ) if config.cluster_network: new_content = new_content.replace( config.cluster_network, new_cluster_network, 1 ) # Replace all IPs in the config for old_ip, new_ip in ip_mapping.items(): new_content = new_content.replace(old_ip, new_ip) return new_content def parse_network_interfaces(content: str) -> list[NetworkInterface]: """Parse /etc/network/interfaces and extract interface configs.""" interfaces = [] current_iface = None current_lines = [] for line in content.split('\n'): stripped = line.strip() # New iface block m = re.match(r'iface\s+(\S+)\s+inet\s+(\S+)', stripped) if m: # Save previous if current_iface: interfaces.append(_build_interface(current_iface, current_lines)) current_iface = m.group(1) current_lines = [line] continue # Auto line or source line starts a new context if stripped.startswith('auto ') or stripped.startswith('source '): if current_iface: interfaces.append(_build_interface(current_iface, current_lines)) current_iface = None current_lines = [] continue if current_iface and stripped: current_lines.append(line) # Don't forget the last one if current_iface: interfaces.append(_build_interface(current_iface, current_lines)) return interfaces def _build_interface(name: str, lines: list[str]) -> NetworkInterface: """Build a NetworkInterface from parsed lines.""" raw = '\n'.join(lines) address = "" netmask = "" cidr = 0 gateway = None bridge_ports = None for line in lines: stripped = line.strip() # address with CIDR notation: address 192.168.0.1/24 m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', stripped) if m: address = m.group(1) cidr = int(m.group(2)) netmask = cidr_to_netmask(cidr) continue # address without CIDR m = re.match(r'address\s+(\d+\.\d+\.\d+\.\d+)', stripped) if m: address = m.group(1) continue m = re.match(r'netmask\s+(\S+)', stripped) if m: netmask = m.group(1) cidr = netmask_to_cidr(netmask) continue m = re.match(r'gateway\s+(\S+)', stripped) if m: gateway = m.group(1) continue m = re.match(r'bridge[_-]ports\s+(\S+)', stripped) if m: bridge_ports = m.group(1) continue return NetworkInterface( name=name, address=address, netmask=netmask, cidr=cidr, gateway=gateway, bridge_ports=bridge_ports, raw_config=raw, ) def generate_network_interfaces(content: str, old_ip: str, new_ip: str, new_cidr: int, new_gateway: str | None = None, old_gateway: str | None = None) -> str: """Update /etc/network/interfaces with new IP, keeping everything else.""" new_content = content # Replace IP in address lines (with and without CIDR) # address 192.168.0.101/24 -> address 172.0.2.101/16 new_content = re.sub( rf'(address\s+){re.escape(old_ip)}/\d+', rf'\g<1>{new_ip}/{new_cidr}', new_content ) # address 192.168.0.101 (without CIDR) new_content = re.sub( rf'(address\s+){re.escape(old_ip)}(\s)', rf'\g<1>{new_ip}\2', new_content ) # Replace gateway if provided if new_gateway and old_gateway: new_content = new_content.replace( f'gateway {old_gateway}', f'gateway {new_gateway}' ) return new_content def generate_network_interfaces_multi( content: str, replacements: list[tuple[str, str, int, str | None, str | None]] ) -> str: """Update /etc/network/interfaces with multiple IP replacements. Each replacement is: (old_ip, new_ip, new_cidr, old_gateway, new_gateway) This handles multiple bridges/NICs (management, ceph public, ceph cluster). """ new_content = content for old_ip, new_ip, new_cidr, old_gw, new_gw in replacements: new_content = generate_network_interfaces( new_content, old_ip, new_ip, new_cidr, new_gw, old_gw ) return new_content def generate_hosts(content: str, ip_mapping: dict[str, str]) -> str: """Update /etc/hosts with new IPs.""" new_content = content for old_ip, new_ip in ip_mapping.items(): new_content = new_content.replace(old_ip, new_ip) return new_content def cidr_to_netmask(cidr: int) -> str: """Convert CIDR prefix length to netmask string.""" bits = (0xFFFFFFFF << (32 - cidr)) & 0xFFFFFFFF return f"{(bits >> 24) & 0xFF}.{(bits >> 16) & 0xFF}.{(bits >> 8) & 0xFF}.{bits & 0xFF}" def netmask_to_cidr(netmask: str) -> int: """Convert netmask string to CIDR prefix length.""" parts = netmask.split('.') binary = ''.join(f'{int(p):08b}' for p in parts) return binary.count('1')