proxmox-cluster-network-cha.../planner.py

264 lines
9.9 KiB
Python

"""Phase 2: Plan the migration - IP mapping and config generation."""
import ipaddress
from models import NodeInfo, CorosyncConfig, CephConfig, MigrationPlan
from config_parser import (
generate_corosync_conf, generate_ceph_conf,
generate_network_interfaces, generate_hosts,
)
class Planner:
"""Plans the network migration with user input."""
def plan(self, nodes: list[NodeInfo], corosync: CorosyncConfig,
ceph: CephConfig | None, has_quorum: bool) -> MigrationPlan | None:
"""Interactive planning with the user."""
plan = MigrationPlan(
nodes=nodes,
corosync_config=corosync,
ceph_config=ceph,
quorum_available=has_quorum,
)
print("\n=== Phase 2: Migration planen ===\n")
# Get new network
plan.new_network = self._ask_new_network()
if not plan.new_network:
return None
new_net = ipaddress.ip_network(plan.new_network, strict=False)
plan.new_gateway = self._ask_gateway(new_net)
# Detect old network from first node
if nodes:
old_ip = ipaddress.ip_address(nodes[0].current_ip)
# Try to find matching interface
for node in nodes:
for iface in node.interfaces:
if iface.address == str(old_ip) or (
iface.address and iface.cidr and
ipaddress.ip_address(iface.address) in
ipaddress.ip_network(f'{iface.address}/{iface.cidr}', strict=False) and
old_ip in ipaddress.ip_network(f'{iface.address}/{iface.cidr}', strict=False)
):
plan.old_network = str(ipaddress.ip_network(
f'{iface.address}/{iface.cidr}', strict=False
))
plan.bridge_name = iface.name
break
if plan.old_network:
break
# Fallback: try to guess from corosync IPs
if not plan.old_network:
# Find common network from all corosync node IPs
for cidr_guess in [24, 16, 8]:
net = ipaddress.ip_network(
f'{nodes[0].current_ip}/{cidr_guess}', strict=False
)
if all(ipaddress.ip_address(n.current_ip) in net for n in nodes):
plan.old_network = str(net)
break
if plan.old_network:
print(f" Erkanntes altes Netzwerk: {plan.old_network}")
else:
print(" [!] Altes Netzwerk konnte nicht erkannt werden")
# Generate IP mapping suggestions
print("\n[IP-Mapping]")
print("Für jeden Node wird eine neue IP benötigt.\n")
for node in nodes:
suggested_ip = self._suggest_new_ip(node.current_ip, plan.new_network)
print(f" {node.name}: {node.current_ip} -> ", end="")
user_input = input(f"[{suggested_ip}]: ").strip()
if user_input:
node.new_ip = user_input
else:
node.new_ip = suggested_ip
print(f" => {node.new_ip}")
# Ceph network planning
if ceph:
print("\n[Ceph Netzwerke]")
print(f" Aktuelles Public Network: {ceph.public_network}")
print(f" Aktuelles Cluster Network: {ceph.cluster_network}")
default_ceph_net = plan.new_network
user_input = input(
f"\n Neues Ceph Public Network [{default_ceph_net}]: "
).strip()
plan.ceph_new_public_network = user_input or default_ceph_net
user_input = input(
f" Neues Ceph Cluster Network [{plan.ceph_new_public_network}]: "
).strip()
plan.ceph_new_cluster_network = user_input or plan.ceph_new_public_network
# Which bridge to modify
print(f"\n[Bridge]")
user_input = input(
f" Welche Bridge soll geändert werden? [{plan.bridge_name}]: "
).strip()
if user_input:
plan.bridge_name = user_input
# Show preview
self._show_preview(plan)
# Confirm
confirm = input("\nMigration durchführen? [j/N]: ").strip().lower()
if confirm not in ('j', 'ja', 'y', 'yes'):
print("Abgebrochen.")
return None
return plan
def _ask_new_network(self) -> str | None:
"""Ask for the new network."""
while True:
network = input("Neues Netzwerk (z.B. 172.0.2.0/16): ").strip()
if not network:
print("Abgebrochen.")
return None
try:
ipaddress.ip_network(network, strict=False)
return network
except ValueError as e:
print(f" Ungültiges Netzwerk: {e}")
def _ask_gateway(self, network: ipaddress.IPv4Network) -> str:
"""Ask for the gateway in the new network."""
# Suggest first usable IP as gateway
suggested = str(list(network.hosts())[0])
user_input = input(f"Neues Gateway [{suggested}]: ").strip()
return user_input or suggested
def _suggest_new_ip(self, old_ip: str, new_network: str) -> str:
"""Suggest a new IP by keeping the host part from the old IP."""
old = ipaddress.ip_address(old_ip)
new_net = ipaddress.ip_network(new_network, strict=False)
# Keep the last octet(s) from the old IP
old_host = int(old) & 0xFF # last octet
if new_net.prefixlen <= 16:
# For /16 or bigger, keep last two octets
old_host = int(old) & 0xFFFF
new_ip = ipaddress.ip_address(int(new_net.network_address) | old_host)
return str(new_ip)
def _show_preview(self, plan: MigrationPlan):
"""Show a preview of all planned changes."""
print("\n" + "=" * 60)
print(" MIGRATION PREVIEW")
print("=" * 60)
ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip}
print(f"\n Netzwerk: {plan.old_network} -> {plan.new_network}")
print(f" Gateway: {plan.new_gateway}")
print(f" Bridge: {plan.bridge_name}")
print(f" Quorum verfügbar: {'Ja' if plan.quorum_available else 'NEIN'}")
print("\n [Node IP-Mapping]")
for node in plan.nodes:
status = "erreichbar" if node.is_reachable else "NICHT ERREICHBAR"
print(f" {node.name}: {node.current_ip} -> {node.new_ip} ({status})")
if plan.ceph_config:
print("\n [Ceph Netzwerke]")
print(f" Public: {plan.ceph_config.public_network} -> {plan.ceph_new_public_network}")
print(f" Cluster: {plan.ceph_config.cluster_network} -> {plan.ceph_new_cluster_network}")
if plan.ceph_config.mon_hosts:
print(f" MON Hosts: {', '.join(plan.ceph_config.mon_hosts)}")
new_mons = [ip_mapping.get(h, h) for h in plan.ceph_config.mon_hosts]
print(f" -> {', '.join(new_mons)}")
print("\n [Dateien die geändert werden]")
print(" - /etc/network/interfaces (auf jedem Node)")
print(" - /etc/hosts (auf jedem Node)")
print(" - /etc/corosync/corosync.conf (auf jedem Node)")
if plan.ceph_config:
if plan.quorum_available:
print(" - /etc/pve/ceph.conf (über Cluster-FS)")
else:
print(" - /etc/ceph/ceph.conf (direkt, da kein Quorum)")
if not plan.quorum_available:
print("\n [!] WARNUNG: Kein Quorum verfügbar!")
print(" Es wird 'pvecm expected 1' verwendet um Quorum zu erzwingen.")
print(" Ceph-Config wird direkt auf jedem Node geschrieben.")
print("\n" + "=" * 60)
def generate_new_configs(self, plan: MigrationPlan) -> dict:
"""Generate all new configuration file contents.
Returns dict with:
'corosync': new corosync.conf content
'ceph': new ceph.conf content (or None)
'nodes': {node_name: {'interfaces': content, 'hosts': content}}
"""
ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip}
configs = {
'corosync': None,
'ceph': None,
'nodes': {},
}
# Generate new corosync.conf
if plan.corosync_config:
configs['corosync'] = generate_corosync_conf(
plan.corosync_config, ip_mapping
)
# Generate new ceph.conf
if plan.ceph_config:
configs['ceph'] = generate_ceph_conf(
plan.ceph_config, ip_mapping,
plan.ceph_new_public_network,
plan.ceph_new_cluster_network,
)
# Generate per-node configs
new_cidr = ipaddress.ip_network(plan.new_network, strict=False).prefixlen
# Detect old gateway from first reachable node
old_gateway = None
for node in plan.nodes:
for iface in node.interfaces:
if iface.name == plan.bridge_name and iface.gateway:
old_gateway = iface.gateway
break
if old_gateway:
break
for node in plan.nodes:
if not node.new_ip or not node.network_interfaces_content:
continue
node_configs = {}
# Network interfaces
node_configs['interfaces'] = generate_network_interfaces(
node.network_interfaces_content,
node.current_ip, node.new_ip,
new_cidr, plan.new_gateway, old_gateway,
)
# /etc/hosts
node_configs['hosts'] = generate_hosts(
node.hosts_content, ip_mapping
)
configs['nodes'][node.name] = node_configs
return configs