"""Phase 4: Execute the network migration.""" import time from models import MigrationPlan from ssh_manager import SSHManager class Migrator: """Executes the actual network migration across all nodes.""" def __init__(self, ssh: SSHManager): self.ssh = ssh def run(self, plan: MigrationPlan, configs: dict, dry_run: bool = False) -> bool: """Execute the migration. Args: plan: The migration plan configs: Generated configs from Planner.generate_new_configs() dry_run: If True, only show what would be done """ print("\n=== Phase 4: Migration ===\n") if dry_run: print(" *** DRY RUN - Es werden keine Änderungen vorgenommen ***\n") ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip} reachable_nodes = [n for n in plan.nodes if n.is_reachable] if not reachable_nodes: print(" FEHLER: Keine Nodes erreichbar!") return False # Step 1: Write new configs to all nodes (but don't activate yet) print("[1/7] Neue Konfigurationen verteilen...") if not self._distribute_configs(plan, configs, dry_run): return False # Step 2: Stop Corosync on all nodes print("\n[2/7] Corosync stoppen auf allen Nodes...") if not self._stop_corosync(reachable_nodes, dry_run): return False # Step 3: Stop pve-cluster (pmxcfs) to release corosync.conf print("\n[3/7] pve-cluster stoppen...") if not self._stop_pve_cluster(reachable_nodes, dry_run): return False # Step 4: Write corosync config directly print("\n[4/7] Corosync-Konfiguration aktualisieren...") if not self._update_corosync(reachable_nodes, configs, dry_run): return False # Step 5: Update /etc/hosts on all nodes print("\n[5/7] /etc/hosts aktualisieren...") if not self._update_hosts(plan, configs, dry_run): return False # Step 6: Update network interfaces and restart networking print("\n[6/7] Netzwerk-Interfaces aktualisieren und Netzwerk neu starten...") if not self._update_network(plan, configs, dry_run): return False # Step 7: Start services back up print("\n[7/7] Services starten...") if not self._start_services(plan, configs, dry_run): return False return True def _distribute_configs(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool: """Write prepared configs as staged files (not yet active).""" for node in plan.nodes: if not node.is_reachable or node.name not in configs['nodes']: continue node_configs = configs['nodes'][node.name] staging_dir = "/root/.network-migration-staged" if dry_run: print(f" [{node.name}] Würde Configs nach {staging_dir}/ schreiben") continue # Create staging directory self.ssh.run_on_node( node.ssh_host, f"mkdir -p {staging_dir}", node.is_local ) # Stage network interfaces ok, msg = self.ssh.write_node_file( node.ssh_host, f"{staging_dir}/interfaces", node_configs['interfaces'], node.is_local, ) if ok: print(f" [{node.name}] interfaces staged") else: print(f" [{node.name}] FEHLER interfaces: {msg}") return False # Stage hosts ok, msg = self.ssh.write_node_file( node.ssh_host, f"{staging_dir}/hosts", node_configs['hosts'], node.is_local, ) if ok: print(f" [{node.name}] hosts staged") else: print(f" [{node.name}] FEHLER hosts: {msg}") return False # Stage corosync config if configs['corosync']: for node in plan.nodes: if not node.is_reachable: continue staging_dir = "/root/.network-migration-staged" if dry_run: print(f" [{node.name}] Würde corosync.conf stagen") continue ok, msg = self.ssh.write_node_file( node.ssh_host, f"{staging_dir}/corosync.conf", configs['corosync'], node.is_local, ) if ok: print(f" [{node.name}] corosync.conf staged") else: print(f" [{node.name}] FEHLER corosync.conf: {msg}") return False # Stage ceph config if configs['ceph']: for node in plan.nodes: if not node.is_reachable: continue staging_dir = "/root/.network-migration-staged" if dry_run: print(f" [{node.name}] Würde ceph.conf stagen") continue ok, msg = self.ssh.write_node_file( node.ssh_host, f"{staging_dir}/ceph.conf", configs['ceph'], node.is_local, ) if ok: print(f" [{node.name}] ceph.conf staged") else: print(f" [{node.name}] FEHLER ceph.conf: {msg}") return False return True def _stop_corosync(self, nodes: list, dry_run: bool) -> bool: """Stop corosync on all nodes.""" for node in nodes: if dry_run: print(f" [{node.name}] Würde corosync stoppen") continue rc, _, err = self.ssh.run_on_node( node.ssh_host, "systemctl stop corosync", node.is_local ) if rc == 0: print(f" [{node.name}] corosync gestoppt") else: print(f" [{node.name}] WARNUNG beim Stoppen: {err}") return True def _stop_pve_cluster(self, nodes: list, dry_run: bool) -> bool: """Stop pve-cluster service to unmount /etc/pve.""" for node in nodes: if dry_run: print(f" [{node.name}] Würde pve-cluster stoppen") continue rc, _, err = self.ssh.run_on_node( node.ssh_host, "systemctl stop pve-cluster", node.is_local ) if rc == 0: print(f" [{node.name}] pve-cluster gestoppt") else: print(f" [{node.name}] WARNUNG: {err}") return True def _update_corosync(self, nodes: list, configs: dict, dry_run: bool) -> bool: """Write new corosync.conf directly to /etc/corosync/.""" if not configs['corosync']: print(" Keine Corosync-Änderungen") return True for node in nodes: if dry_run: print(f" [{node.name}] Würde /etc/corosync/corosync.conf schreiben") continue staging = "/root/.network-migration-staged/corosync.conf" rc, _, err = self.ssh.run_on_node( node.ssh_host, f"cp {staging} /etc/corosync/corosync.conf", node.is_local, ) if rc == 0: print(f" [{node.name}] corosync.conf aktualisiert") else: print(f" [{node.name}] FEHLER: {err}") return False return True def _update_hosts(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool: """Update /etc/hosts on all nodes.""" for node in plan.nodes: if not node.is_reachable or node.name not in configs['nodes']: continue if dry_run: print(f" [{node.name}] Würde /etc/hosts aktualisieren") continue staging = "/root/.network-migration-staged/hosts" rc, _, err = self.ssh.run_on_node( node.ssh_host, f"cp {staging} /etc/hosts", node.is_local, ) if rc == 0: print(f" [{node.name}] /etc/hosts aktualisiert") else: print(f" [{node.name}] FEHLER: {err}") return False return True def _update_network(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool: """Update /etc/network/interfaces and restart networking. Strategy: 1. Copy staged interfaces to /etc/network/interfaces on ALL nodes first 2. Reload remote nodes with nohup + delay (fire-and-forget, SSH will die) 3. Reload local node LAST This avoids cutting off our own SSH connectivity before reaching remote nodes. """ active_nodes = [ n for n in plan.nodes if n.is_reachable and n.name in configs['nodes'] ] remote_nodes = [n for n in active_nodes if not n.is_local] local_node = next((n for n in active_nodes if n.is_local), None) # Phase 1: Copy staged interfaces on ALL nodes (no reload yet) for node in active_nodes: if dry_run: print(f" [{node.name}] Würde /etc/network/interfaces aktualisieren") continue staging = "/root/.network-migration-staged/interfaces" rc, _, err = self.ssh.run_on_node( node.ssh_host, f"cp {staging} /etc/network/interfaces", node.is_local, ) if rc == 0: print(f" [{node.name}] /etc/network/interfaces aktualisiert") else: print(f" [{node.name}] FEHLER: {err}") return False if dry_run: for node in active_nodes: print(f" [{node.name}] Würde 'ifreload -a' ausführen") return True # Determine reload command rc, _, _ = self.ssh.execute_local("which ifreload") reload_cmd = "ifreload -a" if rc == 0 else "systemctl restart networking" # Phase 2: Reload REMOTE nodes first (fire-and-forget with nohup) # The SSH connection will die when the remote network changes, # so we use nohup + delay to let the SSH command return first. for node in remote_nodes: # nohup with 2s delay: SSH returns immediately, then network reloads bg_cmd = ( f"nohup bash -c 'sleep 2 && {reload_cmd}' " f">/tmp/ifreload.log 2>&1 &" ) print(f" [{node.name}] Netzwerk-Reload geplant (fire-and-forget)...") self.ssh.run_on_node(node.ssh_host, bg_cmd, False, timeout=10) print(f" [{node.name}] {reload_cmd} wird in 2s ausgeführt") # Phase 3: Reload LOCAL node last if local_node: print(f" [{local_node.name}] Netzwerk wird neu geladen ({reload_cmd})...") rc, _, err = self.ssh.run_on_node( local_node.ssh_host, reload_cmd, True, timeout=60 ) if rc == 0: print(f" [{local_node.name}] Netzwerk neu geladen") else: print(f" [{local_node.name}] WARNUNG beim Netzwerk-Reload: {err}") # Wait for remote nodes to finish their reload if remote_nodes: wait_secs = 8 print(f"\n Warte {wait_secs}s bis alle Remote-Nodes ihr Netzwerk neu geladen haben...") time.sleep(wait_secs) # Verify: try to reach remote nodes on NEW IPs (with retries) print(" [Verifikation] Prüfe Erreichbarkeit auf neuen IPs...") for node in remote_nodes: if not node.new_ip: continue reachable = False for attempt in range(3): reachable = self.ssh.is_reachable(node.new_ip) if reachable: break if attempt < 2: print(f" [{node.name}] {node.new_ip} noch nicht erreichbar, warte 5s...") time.sleep(5) if reachable: print(f" [{node.name}] {node.new_ip} erreichbar") else: print(f" [{node.name}] {node.new_ip} NICHT erreichbar nach 3 Versuchen!") print(f" [{node.name}] WARNUNG: Service-Start auf diesem Node könnte fehlschlagen") return True def _start_services(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool: """Start pve-cluster and corosync, then handle Ceph.""" # Now we need to reach nodes on their NEW IPs for node in plan.nodes: if not node.is_reachable: continue new_host = node.new_ip if not node.is_local else node.ssh_host is_local = node.is_local # Start pve-cluster if dry_run: print(f" [{node.name}] Würde pve-cluster starten") print(f" [{node.name}] Würde corosync starten") continue print(f" [{node.name}] Starte pve-cluster...") rc, _, err = self.ssh.run_on_node( new_host, "systemctl start pve-cluster", is_local, timeout=30 ) if rc == 0: print(f" [{node.name}] pve-cluster gestartet") else: print(f" [{node.name}] WARNUNG pve-cluster: {err}") print(f" [{node.name}] Starte corosync...") rc, _, err = self.ssh.run_on_node( new_host, "systemctl start corosync", is_local, timeout=30 ) if rc == 0: print(f" [{node.name}] corosync gestartet") else: print(f" [{node.name}] WARNUNG corosync: {err}") if dry_run: print("\n Würde auf Quorum warten...") return True # Wait for quorum print("\n Warte auf Quorum...") if not self._wait_for_quorum(timeout=60): print(" [!] Quorum nicht erreicht! Versuche 'pvecm expected 1'...") rc, _, _ = self.ssh.execute_local("pvecm expected 1") if rc == 0: print(" Quorum erzwungen mit 'pvecm expected 1'") time.sleep(5) else: print(" [!] Konnte Quorum nicht erzwingen!") # Update corosync.conf in cluster FS (now that /etc/pve is writable) if configs.get('corosync'): self._update_corosync_cluster_fs(configs) # Update Ceph config via cluster FS if possible if configs.get('ceph'): self._update_ceph(plan, configs) # Cleanup staging directories print("\n Staging-Verzeichnisse aufräumen...") for node in plan.nodes: if not node.is_reachable: continue new_host = node.new_ip if not node.is_local else node.ssh_host self.ssh.run_on_node( new_host, "rm -rf /root/.network-migration-staged", node.is_local, ) return True def _update_corosync_cluster_fs(self, configs: dict): """Write corosync.conf to /etc/pve/ now that quorum is available. During migration, we only wrote to /etc/corosync/corosync.conf (directly). But Proxmox reads from /etc/pve/corosync.conf (cluster FS) on service start, so we need to update that too, otherwise the old IPs show up in the GUI. """ print("\n [Corosync] /etc/pve/corosync.conf aktualisieren...") # Check if /etc/pve is writable rc, _, _ = self.ssh.execute_local( "touch /etc/pve/.corosync_test && rm -f /etc/pve/.corosync_test" ) if rc == 0: ok, msg = self.ssh.write_local_file( "/etc/pve/corosync.conf", configs['corosync'] ) if ok: print(" [Corosync] /etc/pve/corosync.conf aktualisiert (via Cluster-FS)") else: print(f" [Corosync] FEHLER /etc/pve/corosync.conf: {msg}") else: print(" [Corosync] WARNUNG: /etc/pve nicht beschreibbar!") print(" [Corosync] Manuell ausführen: cp /etc/corosync/corosync.conf /etc/pve/corosync.conf") def _wait_for_quorum(self, timeout: int = 60) -> bool: """Wait for cluster quorum to be established.""" start = time.time() while time.time() - start < timeout: rc, stdout, _ = self.ssh.execute_local("pvecm status 2>/dev/null") if rc == 0 and "Quorate: Yes" in stdout: print(" Quorum erreicht!") return True print(" ... warte auf Quorum ...") time.sleep(5) return False def _update_ceph(self, plan: MigrationPlan, configs: dict): """Update Ceph configuration after quorum is available.""" print("\n [Ceph] Konfiguration aktualisieren...") # Try to write via /etc/pve/ceph.conf first rc, _, _ = self.ssh.execute_local( "touch /etc/pve/.ceph_test && rm -f /etc/pve/.ceph_test" ) if rc == 0: # /etc/pve is writable - use cluster filesystem ok, msg = self.ssh.write_local_file("/etc/pve/ceph.conf", configs['ceph']) if ok: print(" [Ceph] /etc/pve/ceph.conf aktualisiert (via Cluster-FS)") else: print(f" [Ceph] FEHLER /etc/pve/ceph.conf: {msg}") self._update_ceph_direct(plan, configs) else: # /etc/pve not writable - write directly on each node print(" [Ceph] /etc/pve nicht beschreibbar, schreibe direkt...") self._update_ceph_direct(plan, configs) # Update Ceph MON map with new IPs (MUST happen before restart) self._update_ceph_mon_map(plan) # Restart Ceph services # Note: first MON is already running (started during monmap update) print("\n [Ceph] Services neu starten...") first_started = False for node in plan.nodes: if not node.is_reachable: continue new_host = node.new_ip if not node.is_local else node.ssh_host if not first_started: # First node's MON was already started during monmap update first_started = True print(f" [{node.name}] ceph-mon läuft bereits (Primary)") else: # Start MON on remaining nodes rc, _, err = self.ssh.run_on_node( new_host, f"systemctl start ceph-mon@{node.name} 2>/dev/null", node.is_local, timeout=30, ) if rc == 0: print(f" [{node.name}] ceph-mon gestartet") else: print(f" [{node.name}] WARNUNG ceph-mon: {err}") # Restart MGR self.ssh.run_on_node( new_host, f"systemctl restart ceph-mgr@{node.name} 2>/dev/null", node.is_local, timeout=30, ) # Restart all OSDs on this node self.ssh.run_on_node( new_host, "systemctl restart ceph-osd.target 2>/dev/null", node.is_local, timeout=60, ) # Restart MDS if present (CephFS metadata server) self.ssh.run_on_node( new_host, f"systemctl restart ceph-mds@{node.name} 2>/dev/null", node.is_local, timeout=30, ) print(f" [{node.name}] Ceph-Services gestartet") def _update_ceph_direct(self, plan: MigrationPlan, configs: dict): """Write ceph.conf directly on each node (fallback when no quorum).""" for node in plan.nodes: if not node.is_reachable: continue new_host = node.new_ip if not node.is_local else node.ssh_host ok, msg = self.ssh.write_node_file( new_host, "/etc/ceph/ceph.conf", configs['ceph'], node.is_local, ) if ok: print(f" [{node.name}] /etc/ceph/ceph.conf direkt geschrieben") else: print(f" [{node.name}] FEHLER /etc/ceph/ceph.conf: {msg}") def _update_ceph_mon_map(self, plan: MigrationPlan): """Update Ceph MON map with new addresses. When MON IPs change, the internal monmap (stored in MON's RocksDB) must be explicitly updated. Just updating ceph.conf is NOT enough. Strategy: Update monmap on the FIRST node, start its MON, then get the authoritative monmap from the running MON and inject it into all remaining nodes. This avoids epoch mismatches between nodes. """ ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip} if not ip_mapping: print(" [Ceph] Keine IP-Änderungen für MON-Map") return # Build the list of MON nodes with their new IPs mon_nodes = [] reachable_nodes = [] for node in plan.nodes: if not node.is_reachable: continue new_ip = node.new_ip or node.current_ip mon_nodes.append((node.name, new_ip)) reachable_nodes.append(node) if not reachable_nodes: print(" [Ceph] Keine erreichbaren Nodes für MON-Map Update") return print("\n [Ceph] MON-Map aktualisieren...") # Stop ceph-mon on all nodes first for node in reachable_nodes: new_host = node.new_ip if not node.is_local else node.ssh_host self.ssh.run_on_node( new_host, f"systemctl stop ceph-mon@{node.name} 2>/dev/null", node.is_local, timeout=30, ) print(f" [{node.name}] ceph-mon gestoppt") # --- Phase 1: Update monmap on the FIRST node --- first_node = reachable_nodes[0] first_host = first_node.new_ip if not first_node.is_local else first_node.ssh_host remaining_nodes = reachable_nodes[1:] print(f"\n [{first_node.name}] Erstelle neue MON-Map (Primary)...") # Extract current monmap from first node rc, _, err = self.ssh.run_on_node( first_host, f"ceph-mon -i {first_node.name} --extract-monmap /tmp/monmap", first_node.is_local, timeout=30, ) if rc != 0: print(f" [{first_node.name}] FEHLER: monmap extrahieren fehlgeschlagen: {err}") return # Show current monmap self.ssh.run_on_node( first_host, "monmaptool --print /tmp/monmap", first_node.is_local, timeout=10, ) # Remove all existing MON entries for mon_name, _ in mon_nodes: self.ssh.run_on_node( first_host, f"monmaptool --rm {mon_name} /tmp/monmap 2>/dev/null", first_node.is_local, timeout=10, ) # Re-add all MON entries with new IPs (msgr2 on 3300 + msgr1 on 6789) for mon_name, new_ip in mon_nodes: rc, _, err = self.ssh.run_on_node( first_host, f"monmaptool --addv {mon_name} " f"[v2:{new_ip}:3300/0,v1:{new_ip}:6789/0] /tmp/monmap", first_node.is_local, timeout=10, ) if rc != 0: # Fallback: try legacy --add (older Ceph versions) self.ssh.run_on_node( first_host, f"monmaptool --add {mon_name} {new_ip}:6789 /tmp/monmap", first_node.is_local, timeout=10, ) # Show updated monmap print(f" [{first_node.name}] Neue MON-Map:") self.ssh.run_on_node( first_host, "monmaptool --print /tmp/monmap", first_node.is_local, timeout=10, ) # Inject into first node rc, _, err = self.ssh.run_on_node( first_host, f"ceph-mon -i {first_node.name} --inject-monmap /tmp/monmap", first_node.is_local, timeout=30, ) if rc == 0: print(f" [{first_node.name}] MON-Map injiziert") else: print(f" [{first_node.name}] FEHLER MON-Map reinject: {err}") self.ssh.run_on_node(first_host, "rm -f /tmp/monmap", first_node.is_local) return # Start first MON so we can get the authoritative map print(f" [{first_node.name}] Starte ceph-mon (Primary)...") self.ssh.run_on_node( first_host, f"systemctl start ceph-mon@{first_node.name}", first_node.is_local, timeout=30, ) # Give it a moment to initialize time.sleep(3) # --- Phase 2: Get authoritative monmap from running MON --- if remaining_nodes: print(f"\n Hole autoritative MON-Map vom laufenden MON...") rc, _, err = self.ssh.run_on_node( first_host, "ceph mon getmap -o /tmp/monmap_auth", first_node.is_local, timeout=30, ) if rc == 0: # Use authoritative map from running MON monmap_path = "/tmp/monmap_auth" print(f" Autoritative MON-Map erhalten") else: # Fallback: use the manually built map print(f" WARNUNG: Konnte autoritative Map nicht holen ({err})") print(f" Verwende manuell erstellte Map als Fallback") monmap_path = "/tmp/monmap" # --- Phase 3: Inject authoritative map into remaining nodes --- for node in remaining_nodes: new_host = node.new_ip if not node.is_local else node.ssh_host # Copy monmap from first node to this node via SSH if first_node.is_local: # First node is local: SCP the map to remote node rc, _, err = self.ssh.execute_local( f"sshpass -p '{self.ssh.ssh_password}' " f"scp -o StrictHostKeyChecking=no " f"-o PubkeyAuthentication=no " f"-P {self.ssh.ssh_port} " f"{monmap_path} " f"{self.ssh.ssh_user}@{new_host}:/tmp/monmap", timeout=30, ) elif node.is_local: # This node is local: SCP from remote first node rc, _, err = self.ssh.execute_local( f"sshpass -p '{self.ssh.ssh_password}' " f"scp -o StrictHostKeyChecking=no " f"-o PubkeyAuthentication=no " f"-P {self.ssh.ssh_port} " f"{self.ssh.ssh_user}@{first_host}:{monmap_path} " f"/tmp/monmap", timeout=30, ) else: # Both remote: read from first, write to second rc_read, stdout, _ = self.ssh.execute( first_host, f"base64 {monmap_path}", timeout=30, ) if rc_read == 0: rc, _, err = self.ssh.execute( new_host, f"echo '{stdout.strip()}' | base64 -d > /tmp/monmap", timeout=30, ) else: rc = -1 err = "Konnte monmap nicht vom Primary lesen" if rc != 0: print(f" [{node.name}] WARNUNG: monmap kopieren fehlgeschlagen: {err}") print(f" [{node.name}] Erstelle Map manuell als Fallback...") # Fallback: build map manually on this node self._update_monmap_manual(node, new_host, mon_nodes) continue # Inject monmap rc, _, err = self.ssh.run_on_node( new_host, f"ceph-mon -i {node.name} --inject-monmap /tmp/monmap", node.is_local, timeout=30, ) if rc == 0: print(f" [{node.name}] Autoritative MON-Map injiziert") else: print(f" [{node.name}] FEHLER MON-Map reinject: {err}") # Cleanup self.ssh.run_on_node(new_host, "rm -f /tmp/monmap", node.is_local) # Cleanup on first node self.ssh.run_on_node( first_host, "rm -f /tmp/monmap /tmp/monmap_auth", first_node.is_local, ) def _update_monmap_manual(self, node, host: str, mon_nodes: list): """Fallback: manually build and inject monmap on a single node.""" rc, _, err = self.ssh.run_on_node( host, f"ceph-mon -i {node.name} --extract-monmap /tmp/monmap", node.is_local, timeout=30, ) if rc != 0: print(f" [{node.name}] FEHLER: monmap extrahieren fehlgeschlagen") return for mon_name, _ in mon_nodes: self.ssh.run_on_node( host, f"monmaptool --rm {mon_name} /tmp/monmap 2>/dev/null", node.is_local, timeout=10, ) for mon_name, new_ip in mon_nodes: rc, _, _ = self.ssh.run_on_node( host, f"monmaptool --addv {mon_name} " f"[v2:{new_ip}:3300/0,v1:{new_ip}:6789/0] /tmp/monmap", node.is_local, timeout=10, ) if rc != 0: self.ssh.run_on_node( host, f"monmaptool --add {mon_name} {new_ip}:6789 /tmp/monmap", node.is_local, timeout=10, ) rc, _, err = self.ssh.run_on_node( host, f"ceph-mon -i {node.name} --inject-monmap /tmp/monmap", node.is_local, timeout=30, ) if rc == 0: print(f" [{node.name}] MON-Map manuell aktualisiert (Fallback)") else: print(f" [{node.name}] FEHLER MON-Map reinject: {err}") self.ssh.run_on_node(host, "rm -f /tmp/monmap", node.is_local)