"""Phase 4: Execute the network migration."""

import time

from models import MigrationPlan
from ssh_manager import SSHManager


class Migrator:
    """Executes the actual network migration across all nodes.

    Flow (see ``run``): stage new configs on every reachable node, optionally
    preserve SSH keys, stop corosync and pve-cluster, copy the staged
    corosync.conf / hosts / interfaces into place, reload networking, then
    start the services again and reach remote nodes on their new addresses.
    """

    # Directory on each node where new configs are staged before activation.
    STAGING_DIR = "/root/.network-migration-staged"

    def __init__(self, ssh: SSHManager):
        self.ssh = ssh

    @staticmethod
    def _target_host(node) -> str:
        """Return the address used to reach *node* after the network change.

        The local node is always addressed via ``ssh_host``. Remote nodes are
        addressed via ``new_ip``; if a node has no new IP (``run`` treats
        ``new_ip`` as optional), fall back to ``ssh_host`` rather than handing
        ``None`` to the SSH layer.
        """
        if node.is_local or not node.new_ip:
            return node.ssh_host
        return node.new_ip

    def run(self, plan: MigrationPlan, configs: dict, dry_run: bool = False) -> bool:
        """Execute the migration.

        Args:
            plan: The migration plan
            configs: Generated configs from Planner.generate_new_configs()
            dry_run: If True, only show what would be done

        Returns:
            True when all phases completed; False as soon as a phase reports
            failure or when no node is reachable.
        """
        print("\n=== Phase 4: Migration ===\n")
        if dry_run:
            print(" *** DRY RUN - Es werden keine Änderungen vorgenommen ***\n")

        reachable_nodes = [n for n in plan.nodes if n.is_reachable]
        if not reachable_nodes:
            print(" FEHLER: Keine Nodes erreichbar!")
            return False

        uses_password = self.ssh.uses_password
        step = 1
        # The SSH-key preservation step only exists for key-based auth.
        total = 7 if uses_password else 8

        # Step 1: Write new configs to all nodes (but don't activate yet)
        print(f"[{step}/{total}] Neue Konfigurationen verteilen...")
        if not self._distribute_configs(plan, configs, dry_run):
            return False
        step += 1

        # Step 2: Preserve SSH keys (only needed for key-based auth)
        if not uses_password:
            print(f"\n[{step}/{total}] SSH-Keys sichern (werden nach pve-cluster stop benötigt)...")
            if not self._preserve_ssh_keys(reachable_nodes, dry_run):
                return False
            step += 1
        elif not dry_run:
            print("\n SSH-Key-Sicherung übersprungen (Passwort-Auth aktiv)")

        # Stop Corosync on all nodes
        print(f"\n[{step}/{total}] Corosync stoppen auf allen Nodes...")
        if not self._stop_corosync(reachable_nodes, dry_run):
            return False
        step += 1

        # Stop pve-cluster (pmxcfs) to release corosync.conf
        print(f"\n[{step}/{total}] pve-cluster stoppen...")
        if not self._stop_pve_cluster(reachable_nodes, dry_run):
            return False
        step += 1

        # Write corosync config directly
        print(f"\n[{step}/{total}] Corosync-Konfiguration aktualisieren...")
        if not self._update_corosync(reachable_nodes, configs, dry_run):
            return False
        step += 1

        # Update /etc/hosts on all nodes
        print(f"\n[{step}/{total}] /etc/hosts aktualisieren...")
        if not self._update_hosts(plan, configs, dry_run):
            return False
        step += 1

        # Update network interfaces and restart networking
        print(f"\n[{step}/{total}] Netzwerk-Interfaces aktualisieren und Netzwerk neu starten...")
        if not self._update_network(plan, configs, dry_run):
            return False
        step += 1

        # Start services back up
        print(f"\n[{step}/{total}] Services starten...")
        if not self._start_services(plan, configs, dry_run):
            return False

        return True

    def _stage_file(self, node, filename: str, content: str) -> bool:
        """Write *content* to ``STAGING_DIR/filename`` on *node* and report it.

        Returns:
            True if the write succeeded, False otherwise (an error is printed).
        """
        ok, msg = self.ssh.write_node_file(
            node.ssh_host,
            f"{self.STAGING_DIR}/{filename}",
            content,
            node.is_local,
        )
        if ok:
            print(f" [{node.name}] {filename} staged")
        else:
            print(f" [{node.name}] FEHLER {filename}: {msg}")
        return bool(ok)

    def _distribute_configs(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Write prepared configs as staged files (not yet active).

        Per-node ``interfaces`` and ``hosts`` are staged on every reachable
        node that has an entry in ``configs['nodes']``. The shared
        ``corosync.conf`` and ``ceph.conf`` (when present in ``configs``) are
        staged on every reachable node.

        Returns:
            False on the first failed mkdir/write, True otherwise.
        """
        staging_dir = self.STAGING_DIR

        for node in plan.nodes:
            if not node.is_reachable or node.name not in configs['nodes']:
                continue
            node_configs = configs['nodes'][node.name]

            if dry_run:
                print(f" [{node.name}] Würde Configs nach {staging_dir}/ schreiben")
                continue

            # Create staging directory; without it every later write fails.
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, f"mkdir -p {staging_dir}", node.is_local
            )
            if rc != 0:
                print(f" [{node.name}] FEHLER mkdir {staging_dir}: {err}")
                return False

            # Stage network interfaces, then hosts
            if not self._stage_file(node, "interfaces", node_configs['interfaces']):
                return False
            if not self._stage_file(node, "hosts", node_configs['hosts']):
                return False

        # Stage cluster-wide configs (corosync first, then ceph) when present.
        for filename, key in (("corosync.conf", 'corosync'), ("ceph.conf", 'ceph')):
            content = configs[key]
            if not content:
                continue
            for node in plan.nodes:
                if not node.is_reachable:
                    continue
                if dry_run:
                    print(f" [{node.name}] Würde {filename} stagen")
                    continue
                if not self._stage_file(node, filename, content):
                    return False

        return True

    def _preserve_ssh_keys(self, nodes: list, dry_run: bool) -> bool:
        """Ensure SSH keeps working after pve-cluster stop.

        When pve-cluster (pmxcfs) is stopped, /etc/pve gets unmounted and the
        cluster SSH keys in /etc/pve/priv/authorized_keys disappear. This
        breaks SSH between nodes.

        Fix: Copy PVE keys to ~/.ssh/authorized_keys AND ensure sshd is
        configured to actually check that file (Proxmox may only check
        /etc/pve/).

        Existing ``*.pre_migration`` backups are kept as-is, so re-running
        after an aborted migration does not replace the original files with
        already-modified copies.

        Returns:
            Always True; problems are reported as warnings.
        """
        if dry_run:
            for node in nodes:
                print(f" [{node.name}] Würde SSH-Keys sichern")
            return True

        for node in nodes:
            # Step 1: Copy PVE keys to ~/.ssh/authorized_keys
            copy_cmd = (
                "mkdir -p /root/.ssh && "
                # Back up only once; a leftover backup is the pristine original.
                "{ [ -f /root/.ssh/authorized_keys.pre_migration ] || "
                "cp /root/.ssh/authorized_keys /root/.ssh/authorized_keys.pre_migration 2>/dev/null; }; "
                "if [ -f /etc/pve/priv/authorized_keys ]; then "
                "  cat /etc/pve/priv/authorized_keys >> /root/.ssh/authorized_keys && "
                "  sort -u /root/.ssh/authorized_keys > /root/.ssh/authorized_keys.tmp && "
                "  mv /root/.ssh/authorized_keys.tmp /root/.ssh/authorized_keys && "
                "  chmod 600 /root/.ssh/authorized_keys && "
                "  echo keys_copied; "
                "else "
                "  echo no_pve_keys; "
                "fi"
            )
            rc, stdout, err = self.ssh.run_on_node(
                node.ssh_host, copy_cmd, node.is_local
            )
            if "keys_copied" in stdout:
                print(f" [{node.name}] PVE-Keys nach ~/.ssh/authorized_keys kopiert")
            elif "no_pve_keys" in stdout:
                print(f" [{node.name}] Keine PVE-Keys gefunden (übersprungen)")
            else:
                print(f" [{node.name}] WARNUNG Key-Kopie: rc={rc} {err}")

            # Step 2: Ensure sshd checks ~/.ssh/authorized_keys
            # Proxmox sshd_config may only list /etc/pve/priv/authorized_keys,
            # or use AuthorizedKeysCommand pointing to /etc/pve/priv/.
            # We need to ensure .ssh/authorized_keys is checked as fallback.
            sshd_cmd = (
                # Back up only once; a leftover backup is the pristine original.
                "{ [ -f /etc/ssh/sshd_config.pre_migration ] || "
                "cp /etc/ssh/sshd_config /etc/ssh/sshd_config.pre_migration; } && "
                "NEED_RELOAD=0 && "
                # Handle AuthorizedKeysFile
                "if grep -q '^AuthorizedKeysFile' /etc/ssh/sshd_config; then "
                "  if ! grep '^AuthorizedKeysFile' /etc/ssh/sshd_config | grep -q '.ssh/authorized_keys'; then "
                "    sed -i '/^AuthorizedKeysFile/s|$| .ssh/authorized_keys|' /etc/ssh/sshd_config && "
                "    NEED_RELOAD=1; "
                "  fi; "
                "else "
                # No AuthorizedKeysFile line = uses default (.ssh/authorized_keys), which is fine.
                # But if AuthorizedKeysCommand is active, it might override. Add explicit line.
                "  if grep -q '^AuthorizedKeysCommand ' /etc/ssh/sshd_config; then "
                "    echo 'AuthorizedKeysFile .ssh/authorized_keys' >> /etc/ssh/sshd_config && "
                "    NEED_RELOAD=1; "
                "  fi; "
                "fi && "
                # Temporarily disable AuthorizedKeysCommand if it points to /etc/pve
                "if grep '^AuthorizedKeysCommand ' /etc/ssh/sshd_config | grep -q '/etc/pve'; then "
                "  sed -i 's|^AuthorizedKeysCommand |#AuthorizedKeysCommand_DISABLED |' /etc/ssh/sshd_config && "
                "  NEED_RELOAD=1; "
                "fi && "
                "if [ $NEED_RELOAD -eq 1 ]; then "
                "  systemctl reload sshd && echo sshd_modified; "
                "else "
                "  echo sshd_already_ok; "
                "fi"
            )
            rc2, stdout2, err2 = self.ssh.run_on_node(
                node.ssh_host, sshd_cmd, node.is_local
            )
            if "sshd_modified" in stdout2:
                print(f" [{node.name}] sshd_config angepasst (.ssh/authorized_keys hinzugefügt)")
            elif "sshd_already_ok" in stdout2:
                print(f" [{node.name}] sshd_config OK")
            else:
                print(f" [{node.name}] WARNUNG sshd: {err2}")

        # Step 3: Report the authorized_keys line count on every remote node.
        # This is informational only; it does not fail the migration.
        print(" [Verifikation] Prüfe ob SSH-Keys korrekt gesichert sind...")
        for node in nodes:
            if node.is_local:
                continue
            rc, stdout, _ = self.ssh.run_on_node(
                node.ssh_host,
                "wc -l /root/.ssh/authorized_keys 2>/dev/null || echo 0",
                False,
            )
            key_count = stdout.strip().split()[0] if stdout.strip() else "0"
            print(f" [{node.name}] authorized_keys: {key_count} Zeilen")

        return True

    def _restore_ssh_keys(self, nodes: list):
        """Restore original ~/.ssh/authorized_keys and sshd_config after migration.

        Only nodes that were reachable (and therefore modified by
        ``_preserve_ssh_keys``) are contacted, using their post-migration
        address.
        """
        cmd = (
            "if [ -f /root/.ssh/authorized_keys.pre_migration ]; then "
            "  mv /root/.ssh/authorized_keys.pre_migration /root/.ssh/authorized_keys; "
            "fi; "
            "if [ -f /etc/ssh/sshd_config.pre_migration ]; then "
            "  mv /etc/ssh/sshd_config.pre_migration /etc/ssh/sshd_config && "
            "  systemctl reload sshd 2>/dev/null; "
            "fi"
        )
        for node in nodes:
            if not node.is_reachable:
                continue
            self.ssh.run_on_node(self._target_host(node), cmd, node.is_local)

    def _stop_corosync(self, nodes: list, dry_run: bool) -> bool:
        """Stop corosync on all nodes (best effort; always returns True)."""
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde corosync stoppen")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, "systemctl stop corosync", node.is_local
            )
            if rc == 0:
                print(f" [{node.name}] corosync gestoppt")
            else:
                print(f" [{node.name}] WARNUNG beim Stoppen: {err}")
        return True

    def _stop_pve_cluster(self, nodes: list, dry_run: bool) -> bool:
        """Stop pve-cluster service to unmount /etc/pve (best effort; always True)."""
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde pve-cluster stoppen")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, "systemctl stop pve-cluster", node.is_local
            )
            if rc == 0:
                print(f" [{node.name}] pve-cluster gestoppt")
            else:
                print(f" [{node.name}] WARNUNG: {err}")
        return True

    def _update_corosync(self, nodes: list, configs: dict, dry_run: bool) -> bool:
        """Copy the staged corosync.conf to /etc/corosync/ on every node.

        Returns:
            True if there is nothing to do or all copies succeeded, False on
            the first failed copy.
        """
        if not configs['corosync']:
            print(" Keine Corosync-Änderungen")
            return True

        staging = f"{self.STAGING_DIR}/corosync.conf"
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde /etc/corosync/corosync.conf schreiben")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/corosync/corosync.conf",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] corosync.conf aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False
        return True

    def _update_hosts(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Copy the staged hosts file to /etc/hosts on all configured reachable nodes."""
        staging = f"{self.STAGING_DIR}/hosts"
        for node in plan.nodes:
            if not node.is_reachable or node.name not in configs['nodes']:
                continue
            if dry_run:
                print(f" [{node.name}] Würde /etc/hosts aktualisieren")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/hosts",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] /etc/hosts aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False
        return True

    def _update_network(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Update /etc/network/interfaces and restart networking.

        Remote nodes are processed before the local node: once the local node
        switches to its new address it may lose its old-subnet address and
        route, and could then no longer reach remote nodes that are still on
        their old IPs. ``sorted`` is stable, so plan order is otherwise kept.

        Returns:
            False if copying the staged interfaces file fails; a failed
            network reload is only a warning.
        """
        staging = f"{self.STAGING_DIR}/interfaces"
        for node in sorted(plan.nodes, key=lambda n: bool(n.is_local)):
            if not node.is_reachable or node.name not in configs['nodes']:
                continue

            if dry_run:
                print(f" [{node.name}] Würde /etc/network/interfaces aktualisieren")
                print(f" [{node.name}] Würde 'ifreload -a' ausführen")
                continue

            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/network/interfaces",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] /etc/network/interfaces aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False

            # Reload network - ifreload -a reloads ALL interfaces
            rc, _, _ = self.ssh.run_on_node(
                node.ssh_host, "which ifreload", node.is_local
            )
            if rc == 0:
                reload_cmd = "ifreload -a"
            else:
                # Fallback: restart networking service
                reload_cmd = "systemctl restart networking"

            print(f" [{node.name}] Netzwerk wird neu geladen ({reload_cmd})...")
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, reload_cmd, node.is_local, timeout=60
            )
            if rc == 0:
                print(f" [{node.name}] Netzwerk neu geladen")
            else:
                print(f" [{node.name}] WARNUNG beim Netzwerk-Reload: {err}")
                # Don't fail here - the node might just be unreachable on old IP now

        return True

    def _start_services(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Start pve-cluster and corosync, then handle Ceph.

        Remote nodes are now reached on their new IPs. After the services
        start, this method waits for quorum (and forces it with
        ``pvecm expected 1`` if needed). It then updates Ceph if configured,
        restores the SSH config when key-based auth is used, and removes the
        staging directories.
        """
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            new_host = self._target_host(node)
            is_local = node.is_local

            # Start pve-cluster
            if dry_run:
                print(f" [{node.name}] Würde pve-cluster starten")
                print(f" [{node.name}] Würde corosync starten")
                continue

            print(f" [{node.name}] Starte pve-cluster...")
            rc, _, err = self.ssh.run_on_node(
                new_host, "systemctl start pve-cluster", is_local, timeout=30
            )
            if rc == 0:
                print(f" [{node.name}] pve-cluster gestartet")
            else:
                print(f" [{node.name}] WARNUNG pve-cluster: {err}")

            print(f" [{node.name}] Starte corosync...")
            rc, _, err = self.ssh.run_on_node(
                new_host, "systemctl start corosync", is_local, timeout=30
            )
            if rc == 0:
                print(f" [{node.name}] corosync gestartet")
            else:
                print(f" [{node.name}] WARNUNG corosync: {err}")

        if dry_run:
            print("\n Würde auf Quorum warten...")
            return True

        # Wait for quorum
        print("\n Warte auf Quorum...")
        if not self._wait_for_quorum(timeout=60):
            print(" [!] Quorum nicht erreicht! Versuche 'pvecm expected 1'...")
            rc, _, _ = self.ssh.execute_local("pvecm expected 1")
            if rc == 0:
                print(" Quorum erzwungen mit 'pvecm expected 1'")
                time.sleep(5)
            else:
                print(" [!] Konnte Quorum nicht erzwingen!")

        # Update Ceph config via cluster FS if possible
        if configs.get('ceph'):
            self._update_ceph(plan, configs)

        # Restore original SSH keys (only needed for key-based auth)
        if not self.ssh.uses_password:
            print("\n SSH-Keys wiederherstellen...")
            self._restore_ssh_keys(plan.nodes)

        # Cleanup staging directories
        print("\n Staging-Verzeichnisse aufräumen...")
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            self.ssh.run_on_node(
                self._target_host(node),
                f"rm -rf {self.STAGING_DIR}",
                node.is_local,
            )

        return True

    def _wait_for_quorum(self, timeout: int = 60) -> bool:
        """Poll ``pvecm status`` on the local node until the cluster is quorate.

        ``pvecm status`` pads its value column (e.g. ``Quorate:   Yes``), so
        the line is compared token-wise instead of as a fixed substring.

        Returns:
            True once quorum is reported, False after *timeout* seconds.
        """
        start = time.time()
        while time.time() - start < timeout:
            rc, stdout, _ = self.ssh.execute_local("pvecm status 2>/dev/null")
            if rc == 0 and any(
                line.split()[:2] == ["Quorate:", "Yes"]
                for line in stdout.splitlines()
            ):
                print(" Quorum erreicht!")
                return True
            print(" ... warte auf Quorum ...")
            time.sleep(5)
        return False

    def _update_ceph(self, plan: MigrationPlan, configs: dict):
        """Update Ceph configuration after quorum is available.

        Prefers writing /etc/pve/ceph.conf through the cluster filesystem. If
        /etc/pve is not writable or that write fails, it writes
        /etc/ceph/ceph.conf on each node directly. Afterwards it restarts the
        MON, MGR and OSD units on every reachable node (best effort).
        """
        print("\n [Ceph] Konfiguration aktualisieren...")

        # Try to write via /etc/pve/ceph.conf first
        rc, _, _ = self.ssh.execute_local(
            "touch /etc/pve/.ceph_test && rm -f /etc/pve/.ceph_test"
        )

        if rc == 0:
            # /etc/pve is writable - use cluster filesystem
            ok, msg = self.ssh.write_local_file("/etc/pve/ceph.conf", configs['ceph'])
            if ok:
                print(" [Ceph] /etc/pve/ceph.conf aktualisiert (via Cluster-FS)")
            else:
                print(f" [Ceph] FEHLER /etc/pve/ceph.conf: {msg}")
                self._update_ceph_direct(plan, configs)
        else:
            # /etc/pve not writable - write directly on each node
            print(" [Ceph] /etc/pve nicht beschreibbar, schreibe direkt...")
            self._update_ceph_direct(plan, configs)

        # Restart Ceph services (missing units fail silently via 2>/dev/null)
        print(" [Ceph] Services neu starten...")
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            new_host = self._target_host(node)

            # Restart MON
            self.ssh.run_on_node(
                new_host,
                f"systemctl restart ceph-mon@{node.name} 2>/dev/null",
                node.is_local,
                timeout=30,
            )
            # Restart MGR
            self.ssh.run_on_node(
                new_host,
                f"systemctl restart ceph-mgr@{node.name} 2>/dev/null",
                node.is_local,
                timeout=30,
            )
            # Restart all OSDs on this node
            self.ssh.run_on_node(
                new_host,
                "systemctl restart ceph-osd.target 2>/dev/null",
                node.is_local,
                timeout=60,
            )
            print(f" [{node.name}] Ceph-Services neu gestartet")

    def _update_ceph_direct(self, plan: MigrationPlan, configs: dict):
        """Write ceph.conf directly on each node (fallback when no quorum)."""
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            ok, msg = self.ssh.write_node_file(
                self._target_host(node),
                "/etc/ceph/ceph.conf",
                configs['ceph'],
                node.is_local,
            )
            if ok:
                print(f" [{node.name}] /etc/ceph/ceph.conf direkt geschrieben")
            else:
                print(f" [{node.name}] FEHLER /etc/ceph/ceph.conf: {msg}")

    def _update_ceph_mon_map(self, plan: MigrationPlan):
        """Update Ceph MON map with new addresses.

        This is needed when MON IPs change. NOTE: currently a placeholder that
        only prints a notice per reachable node. No monmap is extracted or
        rewritten; it relies on the ceph.conf update and the MON restart in
        ``_update_ceph``.
        """
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            # TODO: extract the monmap (ceph-mon -i <name> --extract-monmap),
            # replace the old MON addresses and reinject it.
            print(f" [{node.name}] MON-Map wird beim Neustart aktualisiert")