# proxmox-cluster-network-changer/migrator.py (512 lines, 19 KiB, Python)

"""Phase 4: Execute the network migration."""
import time
from models import MigrationPlan
from ssh_manager import SSHManager
class Migrator:
"""Executes the actual network migration across all nodes."""
def __init__(self, ssh: SSHManager):
    """Store the SSH manager used for all command execution and file writes on cluster nodes."""
    self.ssh = ssh
def run(self, plan: MigrationPlan, configs: dict, dry_run: bool = False) -> bool:
    """Execute the migration.

    Args:
        plan: The migration plan
        configs: Generated configs from Planner.generate_new_configs()
        dry_run: If True, only show what would be done

    Returns:
        True when all eight migration steps succeed, False on the first
        failing step (remaining steps are skipped).
    """
    print("\n=== Phase 4: Migration ===\n")
    if dry_run:
        print(" *** DRY RUN - Es werden keine Änderungen vorgenommen ***\n")
    # NOTE: a previously computed current_ip -> new_ip mapping was unused
    # and has been removed.
    reachable_nodes = [n for n in plan.nodes if n.is_reachable]
    if not reachable_nodes:
        print(" FEHLER: Keine Nodes erreichbar!")
        return False
    # Step 1: Write new configs to all nodes (but don't activate yet)
    print("[1/8] Neue Konfigurationen verteilen...")
    if not self._distribute_configs(plan, configs, dry_run):
        return False
    # Step 2: Preserve SSH keys before stopping pve-cluster.
    # /etc/pve/priv/authorized_keys gets unmounted when pve-cluster stops!
    print("\n[2/8] SSH-Keys sichern (werden nach pve-cluster stop benötigt)...")
    if not self._preserve_ssh_keys(reachable_nodes, dry_run):
        return False
    # Step 3: Stop Corosync on all nodes
    print("\n[3/8] Corosync stoppen auf allen Nodes...")
    if not self._stop_corosync(reachable_nodes, dry_run):
        return False
    # Step 4: Stop pve-cluster (pmxcfs) to release corosync.conf
    print("\n[4/8] pve-cluster stoppen...")
    if not self._stop_pve_cluster(reachable_nodes, dry_run):
        return False
    # Step 5: Write corosync config directly
    print("\n[5/8] Corosync-Konfiguration aktualisieren...")
    if not self._update_corosync(reachable_nodes, configs, dry_run):
        return False
    # Step 6: Update /etc/hosts on all nodes
    print("\n[6/8] /etc/hosts aktualisieren...")
    if not self._update_hosts(plan, configs, dry_run):
        return False
    # Step 7: Update network interfaces and restart networking
    print("\n[7/8] Netzwerk-Interfaces aktualisieren und Netzwerk neu starten...")
    if not self._update_network(plan, configs, dry_run):
        return False
    # Step 8: Start services back up
    print("\n[8/8] Services starten...")
    if not self._start_services(plan, configs, dry_run):
        return False
    return True
def _distribute_configs(self, plan: MigrationPlan, configs: dict,
                        dry_run: bool) -> bool:
    """Write prepared configs as staged files (not yet active).

    Per-node interfaces/hosts files are staged on every reachable node
    that has generated configs; the cluster-wide corosync.conf and
    ceph.conf (when present) are staged on every reachable node.

    Returns True when all files were staged, False on the first write error.
    """
    staging_dir = "/root/.network-migration-staged"

    def stage(node, filename: str, content: str) -> bool:
        # Write one file into the node's staging directory. The directory
        # is created unconditionally: previously it was only created in
        # the per-node config loop, so staging the cluster-wide files
        # could fail on reachable nodes without generated node configs.
        self.ssh.run_on_node(
            node.ssh_host, f"mkdir -p {staging_dir}", node.is_local
        )
        ok, msg = self.ssh.write_node_file(
            node.ssh_host,
            f"{staging_dir}/{filename}",
            content,
            node.is_local,
        )
        if ok:
            print(f" [{node.name}] {filename} staged")
        else:
            print(f" [{node.name}] FEHLER {filename}: {msg}")
        return ok

    # Per-node network configs (interfaces + hosts)
    for node in plan.nodes:
        if not node.is_reachable or node.name not in configs['nodes']:
            continue
        node_configs = configs['nodes'][node.name]
        if dry_run:
            print(f" [{node.name}] Würde Configs nach {staging_dir}/ schreiben")
            continue
        if not stage(node, "interfaces", node_configs['interfaces']):
            return False
        if not stage(node, "hosts", node_configs['hosts']):
            return False

    # Cluster-wide configs go to every reachable node
    for filename, content, dry_msg in (
        ("corosync.conf", configs['corosync'], "Würde corosync.conf stagen"),
        ("ceph.conf", configs['ceph'], "Würde ceph.conf stagen"),
    ):
        if not content:
            continue
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            if dry_run:
                print(f" [{node.name}] {dry_msg}")
                continue
            if not stage(node, filename, content):
                return False
    return True
def _preserve_ssh_keys(self, nodes: list, dry_run: bool) -> bool:
    """Copy /etc/pve/priv/authorized_keys to ~/.ssh/ on all nodes.

    When pve-cluster (pmxcfs) is stopped, /etc/pve gets unmounted and the
    cluster SSH keys disappear, which breaks SSH between nodes. The keys
    are therefore merged into ~/.ssh/authorized_keys (de-duplicated, with
    a backup of the original file) so SSH keeps working during migration.
    Always returns True: key preservation is best-effort.
    """
    # The command echoes "ok" on success and "no_pve_keys" when the PVE
    # key file does not exist, so the outcome can be classified below.
    preserve_cmd = (
        "if [ -f /etc/pve/priv/authorized_keys ]; then "
        " mkdir -p /root/.ssh && "
        " cp /root/.ssh/authorized_keys /root/.ssh/authorized_keys.pre_migration 2>/dev/null; "
        " cat /etc/pve/priv/authorized_keys >> /root/.ssh/authorized_keys && "
        " sort -u -o /root/.ssh/authorized_keys /root/.ssh/authorized_keys && "
        " chmod 600 /root/.ssh/authorized_keys && "
        " echo ok; "
        "else "
        " echo no_pve_keys; "
        "fi"
    )
    for node in nodes:
        if dry_run:
            print(f" [{node.name}] Würde SSH-Keys sichern")
            continue
        exit_code, out, stderr = self.ssh.run_on_node(
            node.ssh_host, preserve_cmd, node.is_local
        )
        if exit_code == 0 and "ok" in out:
            print(f" [{node.name}] SSH-Keys gesichert")
        elif "no_pve_keys" in out:
            print(f" [{node.name}] Keine PVE-Keys gefunden (übersprungen)")
        else:
            print(f" [{node.name}] WARNUNG SSH-Keys: {stderr}")
    return True
def _restore_ssh_keys(self, nodes: list):
    """Restore original ~/.ssh/authorized_keys after migration.

    Best-effort: each node is contacted on its new IP when one was
    assigned, otherwise on its original SSH host. Errors are ignored.
    """
    # The restore command is loop-invariant; build it once.
    cmd = (
        "if [ -f /root/.ssh/authorized_keys.pre_migration ]; then "
        " mv /root/.ssh/authorized_keys.pre_migration /root/.ssh/authorized_keys && "
        " echo restored; "
        "else "
        " echo no_backup; "
        "fi"
    )
    for node in nodes:
        # Fall back to the original SSH host for local nodes AND for nodes
        # that received no new IP (new_ip is None for unchanged nodes;
        # previously that produced host=None for such remote nodes).
        host = node.new_ip if (not node.is_local and node.new_ip) else node.ssh_host
        self.ssh.run_on_node(host, cmd, node.is_local)
def _stop_corosync(self, nodes: list, dry_run: bool) -> bool:
    """Stop corosync on all nodes.

    Failures are reported as warnings only; the migration continues,
    so this always returns True.
    """
    for node in nodes:
        if dry_run:
            print(f" [{node.name}] Würde corosync stoppen")
            continue
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host, "systemctl stop corosync", node.is_local
        )
        if exit_code != 0:
            print(f" [{node.name}] WARNUNG beim Stoppen: {stderr}")
        else:
            print(f" [{node.name}] corosync gestoppt")
    return True
def _stop_pve_cluster(self, nodes: list, dry_run: bool) -> bool:
    """Stop pve-cluster service to unmount /etc/pve.

    Failures are reported as warnings only; the migration continues,
    so this always returns True.
    """
    for node in nodes:
        if dry_run:
            print(f" [{node.name}] Würde pve-cluster stoppen")
            continue
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host, "systemctl stop pve-cluster", node.is_local
        )
        if exit_code != 0:
            print(f" [{node.name}] WARNUNG: {stderr}")
        else:
            print(f" [{node.name}] pve-cluster gestoppt")
    return True
def _update_corosync(self, nodes: list, configs: dict,
                     dry_run: bool) -> bool:
    """Write new corosync.conf directly to /etc/corosync/.

    Copies the previously staged file into place on every node. Returns
    False as soon as one node fails; True otherwise (also when there is
    no corosync change at all).
    """
    if not configs['corosync']:
        print(" Keine Corosync-Änderungen")
        return True
    staged_path = "/root/.network-migration-staged/corosync.conf"
    for node in nodes:
        if dry_run:
            print(f" [{node.name}] Würde /etc/corosync/corosync.conf schreiben")
            continue
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host,
            f"cp {staged_path} /etc/corosync/corosync.conf",
            node.is_local,
        )
        if exit_code != 0:
            print(f" [{node.name}] FEHLER: {stderr}")
            return False
        print(f" [{node.name}] corosync.conf aktualisiert")
    return True
def _update_hosts(self, plan: MigrationPlan, configs: dict,
                  dry_run: bool) -> bool:
    """Update /etc/hosts on all nodes.

    Copies the staged hosts file into place on every reachable node that
    has generated configs. Returns False on the first failure.
    """
    staged_path = "/root/.network-migration-staged/hosts"
    for node in plan.nodes:
        if not node.is_reachable or node.name not in configs['nodes']:
            continue
        if dry_run:
            print(f" [{node.name}] Würde /etc/hosts aktualisieren")
            continue
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host,
            f"cp {staged_path} /etc/hosts",
            node.is_local,
        )
        if exit_code != 0:
            print(f" [{node.name}] FEHLER: {stderr}")
            return False
        print(f" [{node.name}] /etc/hosts aktualisiert")
    return True
def _update_network(self, plan: MigrationPlan, configs: dict,
                    dry_run: bool) -> bool:
    """Update /etc/network/interfaces and restart networking.

    Copies the staged interfaces file into place, then reloads the
    network via 'ifreload -a' (ifupdown2) or, when that tool is not
    available, by restarting the networking service. A failed reload is
    only a warning, because the node may simply no longer answer on its
    old IP after switching.
    """
    staged_path = "/root/.network-migration-staged/interfaces"
    for node in plan.nodes:
        if not node.is_reachable or node.name not in configs['nodes']:
            continue
        if dry_run:
            print(f" [{node.name}] Würde /etc/network/interfaces aktualisieren")
            print(f" [{node.name}] Würde 'ifreload -a' ausführen")
            continue
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host,
            f"cp {staged_path} /etc/network/interfaces",
            node.is_local,
        )
        if exit_code != 0:
            print(f" [{node.name}] FEHLER: {stderr}")
            return False
        print(f" [{node.name}] /etc/network/interfaces aktualisiert")
        # Prefer ifreload -a (reloads ALL interfaces); fall back to a
        # full restart of the networking service otherwise.
        probe_rc, _, _ = self.ssh.run_on_node(
            node.ssh_host, "which ifreload", node.is_local
        )
        reload_cmd = "ifreload -a" if probe_rc == 0 else "systemctl restart networking"
        print(f" [{node.name}] Netzwerk wird neu geladen ({reload_cmd})...")
        exit_code, _, stderr = self.ssh.run_on_node(
            node.ssh_host, reload_cmd, node.is_local, timeout=60
        )
        if exit_code == 0:
            print(f" [{node.name}] Netzwerk neu geladen")
        else:
            # Don't fail here - the node might just be unreachable on its
            # old IP now.
            print(f" [{node.name}] WARNUNG beim Netzwerk-Reload: {stderr}")
    return True
def _start_services(self, plan: MigrationPlan, configs: dict,
                    dry_run: bool) -> bool:
    """Start pve-cluster and corosync, then handle Ceph.

    After the network switch, nodes are contacted on their NEW IPs (or
    on the original SSH host when no new IP was assigned). Once the
    services are up we wait for quorum, update Ceph if configured,
    restore the SSH keys and remove the staging directories.
    """
    for node in plan.nodes:
        if not node.is_reachable:
            continue
        if dry_run:
            print(f" [{node.name}] Würde pve-cluster starten")
            print(f" [{node.name}] Würde corosync starten")
            continue
        # Fall back to the original SSH host for local nodes and for
        # nodes without a new IP (new_ip is None for unchanged nodes;
        # previously that produced new_host=None for such remote nodes).
        new_host = node.new_ip if (not node.is_local and node.new_ip) else node.ssh_host
        is_local = node.is_local
        # Start pve-cluster first so /etc/pve is mounted again.
        print(f" [{node.name}] Starte pve-cluster...")
        rc, _, err = self.ssh.run_on_node(
            new_host, "systemctl start pve-cluster", is_local, timeout=30
        )
        if rc == 0:
            print(f" [{node.name}] pve-cluster gestartet")
        else:
            print(f" [{node.name}] WARNUNG pve-cluster: {err}")
        print(f" [{node.name}] Starte corosync...")
        rc, _, err = self.ssh.run_on_node(
            new_host, "systemctl start corosync", is_local, timeout=30
        )
        if rc == 0:
            print(f" [{node.name}] corosync gestartet")
        else:
            print(f" [{node.name}] WARNUNG corosync: {err}")
    if dry_run:
        print("\n Würde auf Quorum warten...")
        return True
    # Wait for quorum; if it cannot be established, force it so the
    # cluster filesystem becomes writable again.
    print("\n Warte auf Quorum...")
    if not self._wait_for_quorum(timeout=60):
        print(" [!] Quorum nicht erreicht! Versuche 'pvecm expected 1'...")
        rc, _, _ = self.ssh.execute_local("pvecm expected 1")
        if rc == 0:
            print(" Quorum erzwungen mit 'pvecm expected 1'")
            time.sleep(5)
        else:
            print(" [!] Konnte Quorum nicht erzwingen!")
    # Update Ceph config via cluster FS if possible
    if configs.get('ceph'):
        self._update_ceph(plan, configs)
    # Restore original SSH keys (pve-cluster manages them again now)
    print("\n SSH-Keys wiederherstellen...")
    self._restore_ssh_keys(plan.nodes)
    # Cleanup staging directories (same new-IP fallback as above)
    print("\n Staging-Verzeichnisse aufräumen...")
    for node in plan.nodes:
        if not node.is_reachable:
            continue
        host = node.new_ip if (not node.is_local and node.new_ip) else node.ssh_host
        self.ssh.run_on_node(
            host,
            "rm -rf /root/.network-migration-staged",
            node.is_local,
        )
    return True
def _wait_for_quorum(self, timeout: int = 60) -> bool:
    """Wait for cluster quorum to be established.

    Polls `pvecm status` every 5 seconds until it reports "Quorate: Yes"
    or the timeout elapses. Returns True on quorum, False on timeout.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        exit_code, output, _ = self.ssh.execute_local("pvecm status 2>/dev/null")
        if exit_code == 0 and "Quorate: Yes" in output:
            print(" Quorum erreicht!")
            return True
        print(" ... warte auf Quorum ...")
        time.sleep(5)
    return False
def _update_ceph(self, plan: MigrationPlan, configs: dict):
    """Update Ceph configuration after quorum is available.

    Prefers writing /etc/pve/ceph.conf through the cluster filesystem;
    falls back to writing /etc/ceph/ceph.conf on each node directly.
    Afterwards the Ceph MON/MGR/OSD services are restarted per node
    (best-effort, errors ignored).
    """
    print("\n [Ceph] Konfiguration aktualisieren...")
    # Probe whether /etc/pve is writable (requires pmxcfs + quorum).
    rc, _, _ = self.ssh.execute_local(
        "touch /etc/pve/.ceph_test && rm -f /etc/pve/.ceph_test"
    )
    if rc == 0:
        # /etc/pve is writable - use cluster filesystem
        ok, msg = self.ssh.write_local_file("/etc/pve/ceph.conf", configs['ceph'])
        if ok:
            print(" [Ceph] /etc/pve/ceph.conf aktualisiert (via Cluster-FS)")
        else:
            print(f" [Ceph] FEHLER /etc/pve/ceph.conf: {msg}")
            self._update_ceph_direct(plan, configs)
    else:
        # /etc/pve not writable - write directly on each node
        print(" [Ceph] /etc/pve nicht beschreibbar, schreibe direkt...")
        self._update_ceph_direct(plan, configs)
    # Restart Ceph services
    print(" [Ceph] Services neu starten...")
    for node in plan.nodes:
        if not node.is_reachable:
            continue
        # Use the new IP when one was assigned; nodes without a new IP
        # keep their address, so fall back to the original SSH host
        # (previously new_host was None for such remote nodes).
        new_host = node.new_ip if (not node.is_local and node.new_ip) else node.ssh_host
        # Restart MON
        self.ssh.run_on_node(
            new_host,
            f"systemctl restart ceph-mon@{node.name} 2>/dev/null",
            node.is_local, timeout=30,
        )
        # Restart MGR
        self.ssh.run_on_node(
            new_host,
            f"systemctl restart ceph-mgr@{node.name} 2>/dev/null",
            node.is_local, timeout=30,
        )
        # Restart all OSDs on this node
        self.ssh.run_on_node(
            new_host,
            "systemctl restart ceph-osd.target 2>/dev/null",
            node.is_local, timeout=60,
        )
        print(f" [{node.name}] Ceph-Services neu gestartet")
def _update_ceph_direct(self, plan: MigrationPlan, configs: dict):
    """Write ceph.conf directly on each node (fallback when no quorum).

    Best-effort: write errors are reported but do not abort.
    """
    for node in plan.nodes:
        if not node.is_reachable:
            continue
        # Contact the node on its new IP when one was assigned; nodes
        # without a new IP (new_ip is None) keep their original address,
        # so fall back to ssh_host instead of passing None as the host.
        host = node.new_ip if (not node.is_local and node.new_ip) else node.ssh_host
        ok, msg = self.ssh.write_node_file(
            host, "/etc/ceph/ceph.conf",
            configs['ceph'], node.is_local,
        )
        if ok:
            print(f" [{node.name}] /etc/ceph/ceph.conf direkt geschrieben")
        else:
            print(f" [{node.name}] FEHLER /etc/ceph/ceph.conf: {msg}")
def _update_ceph_mon_map(self, plan: MigrationPlan):
    """Update Ceph MON map with new addresses (currently a no-op notice).

    Changing MON IPs would normally require extracting the monmap with
    `ceph-mon --extract-monmap`, rewriting its entries and injecting it
    back. That procedure is complex and error-prone, so for now we rely
    on the ceph.conf update and let Ceph rebuild the MON map on restart;
    this method only reports that per node. The previously computed
    ip_mapping / host / command scaffolding was dead code and was removed.
    """
    for node in plan.nodes:
        if not node.is_reachable:
            continue
        print(f" [{node.name}] MON-Map wird beim Neustart aktualisiert")