proxmox-cluster-network-cha.../models.py

81 lines
2.5 KiB
Python

"""Data models for the Proxmox Cluster Network Changer."""
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class NetworkInterface:
    """Configuration of a single network interface.

    ``name``, ``address``, ``netmask`` and ``cidr`` are required; the
    remaining fields are optional and default to ``None`` or ``""``.
    """

    # Interface name, e.g. "vmbr0".
    name: str
    # Address of the interface, e.g. "192.168.0.101".
    address: str
    # Netmask in dotted notation, e.g. "255.255.255.0".
    netmask: str
    # Prefix length, e.g. 24.
    cidr: int

    # Optional settings, absent unless explicitly provided.
    gateway: Optional[str] = None
    bridge_ports: Optional[str] = None

    # NOTE(review): presumably the unparsed configuration text this
    # interface was read from -- confirm against the code that fills it.
    raw_config: str = ""
@dataclass
class NodeInfo:
    """State tracked for one Proxmox node.

    Only ``name`` and ``current_ip`` are required. Every other field has
    a default, and the list and dict fields get a fresh container per
    instance.
    """

    # Node name, e.g. "pve1".
    name: str
    # Current corosync/management IP of the node.
    current_ip: str

    # Planned new management IP; None until one is provided.
    new_ip: Optional[str] = None
    # How to reach the node over SSH (an IP or a hostname).
    ssh_host: Optional[str] = None

    # True when this is the node the tool itself is running on.
    is_local: bool = False
    is_reachable: bool = False

    interfaces: list[NetworkInterface] = field(default_factory=list)

    # NOTE(review): presumably the text of the node's hosts file and of
    # its network interfaces file -- confirm against the collecting code.
    hosts_content: str = ""
    network_interfaces_content: str = ""

    # Additional per-node address mappings, e.g. Ceph public/cluster
    # addresses living on separate NICs. Shape: {old_ip: new_ip}.
    extra_ip_mapping: dict[str, str] = field(default_factory=dict)
@dataclass
class CorosyncNode:
    """One node entry as found in corosync.conf.

    ``nodeid``, ``name`` and ``ring0_addr`` are required; ``ring1_addr``
    is optional and defaults to ``None``.
    """

    nodeid: int
    name: str
    ring0_addr: str

    # Second ring address; stays None unless one is supplied.
    ring1_addr: Optional[str] = None
@dataclass
class CorosyncConfig:
    """Parsed representation of a corosync configuration.

    All fields have defaults, so an empty instance can be created and
    filled in step by step.
    """

    # Node entries; each instance starts with its own empty list.
    nodes: list[CorosyncNode] = field(default_factory=list)

    # Defaults used until real values are assigned.
    config_version: int = 1
    cluster_name: str = ""
    transport: str = "knet"

    # NOTE(review): presumably the original corosync.conf text --
    # confirm against the parser that populates it.
    raw_content: str = ""
@dataclass
class CephConfig:
    """Parsed representation of a Ceph configuration.

    Every field has a default; the list and dict fields receive a fresh
    container per instance.
    """

    fsid: str = ""

    # Networks in CIDR notation, e.g. "192.168.0.0/24".
    public_network: str = ""
    cluster_network: str = ""

    mon_hosts: list[str] = field(default_factory=list)

    # NOTE(review): presumably section name -> {option: value} for the
    # monitor sections -- confirm against the parser.
    mon_sections: dict[str, dict[str, str]] = field(default_factory=dict)

    raw_content: str = ""
@dataclass
class MigrationPlan:
    """Full migration plan holding the old -> new mappings.

    Every field has a default, so a plan can start empty and be filled
    in incrementally.
    """

    # Nodes taking part in the migration.
    nodes: list[NodeInfo] = field(default_factory=list)

    # Source network in CIDR notation, e.g. "192.168.0.0/24".
    old_network: str = ""
    # Target network in CIDR notation, e.g. "172.16.0.0/16".
    new_network: str = ""
    new_gateway: Optional[str] = None

    # New Ceph networks; empty string by default.
    ceph_new_public_network: str = ""
    ceph_new_cluster_network: str = ""

    # Parsed configurations; None until they are attached to the plan.
    corosync_config: Optional[CorosyncConfig] = None
    ceph_config: Optional[CephConfig] = None

    dry_run: bool = False
    quorum_available: bool = True

    # Bridges that were detected, as {bridge_name: subnet}.
    detected_bridges: dict[str, str] = field(default_factory=dict)