"""Phase 4: Execute the network migration."""

import re
import shlex
import time

from models import MigrationPlan
from ssh_manager import SSHManager


class Migrator:
    """Executes the actual network migration across all nodes."""

    # Directory on each node that holds the new configs until they are activated.
    STAGING_DIR = "/root/.network-migration-staged"

    def __init__(self, ssh: SSHManager):
        self.ssh = ssh

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def run(self, plan: MigrationPlan, configs: dict, dry_run: bool = False) -> bool:
        """Execute the migration.

        Args:
            plan: The migration plan
            configs: Generated configs from Planner.generate_new_configs()
            dry_run: If True, only show what would be done

        Returns:
            True if all seven steps completed, False if a step aborted the run.
        """
        print("\n=== Phase 4: Migration ===\n")
        if dry_run:
            print(" *** DRY RUN - Es werden keine Änderungen vorgenommen ***\n")

        reachable_nodes = [n for n in plan.nodes if n.is_reachable]
        if not reachable_nodes:
            print(" FEHLER: Keine Nodes erreichbar!")
            return False

        # Step 1: Write new configs to all nodes (but don't activate yet)
        print("[1/7] Neue Konfigurationen verteilen...")
        if not self._distribute_configs(plan, configs, dry_run):
            return False

        # Step 2: Stop Corosync on all nodes
        print("\n[2/7] Corosync stoppen auf allen Nodes...")
        if not self._stop_corosync(reachable_nodes, dry_run):
            return False

        # Step 3: Stop pve-cluster (pmxcfs) to release corosync.conf
        print("\n[3/7] pve-cluster stoppen...")
        if not self._stop_pve_cluster(reachable_nodes, dry_run):
            return False

        # Step 4: Write corosync config directly
        print("\n[4/7] Corosync-Konfiguration aktualisieren...")
        if not self._update_corosync(reachable_nodes, configs, dry_run):
            return False

        # Step 5: Update /etc/hosts on all nodes
        print("\n[5/7] /etc/hosts aktualisieren...")
        if not self._update_hosts(plan, configs, dry_run):
            return False

        # Step 6: Update network interfaces and restart networking
        print("\n[6/7] Netzwerk-Interfaces aktualisieren und Netzwerk neu starten...")
        if not self._update_network(plan, configs, dry_run):
            return False

        # Step 7: Start services back up
        print("\n[7/7] Services starten...")
        if not self._start_services(plan, configs, dry_run):
            return False

        return True

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _target_host(node) -> str:
        """Return the address used to reach *node* after the network switch.

        The local node always uses its ``ssh_host``. Remote nodes are reached on
        their new IP; nodes without an IP change (``new_ip`` unset) fall back to
        ``ssh_host`` instead of yielding ``None``.
        """
        if node.is_local:
            return node.ssh_host
        return node.new_ip or node.ssh_host

    def _is_pve_writable(self, tag: str) -> bool:
        """Probe whether /etc/pve accepts writes (i.e. pmxcfs is up and quorate).

        Creates and immediately removes ``/etc/pve/.<tag>_test`` on the local node.
        """
        probe = f"/etc/pve/.{tag}_test"
        rc, _, _ = self.ssh.execute_local(f"touch {probe} && rm -f {probe}")
        return rc == 0

    @staticmethod
    def _replace_ips(text: str, ip_mapping: dict[str, str]) -> str:
        """Replace whole IPv4 addresses in *text* according to *ip_mapping*.

        Matches only complete addresses: ``10.0.0.1`` is not matched inside
        ``10.0.0.10`` or ``110.0.0.1``. All replacements happen in a single pass,
        so mappings that swap or chain addresses (A->B, B->C) are applied once.
        """
        mapping = {old: new for old, new in ip_mapping.items() if old and new}
        if not mapping:
            return text
        # Longest first, so that a longer address wins over a prefix of it.
        alternatives = "|".join(
            re.escape(ip) for ip in sorted(mapping, key=len, reverse=True)
        )
        pattern = re.compile(rf"(?<![\d.])(?:{alternatives})(?!\d|\.\d)")
        return pattern.sub(lambda m: mapping[m.group(0)], text)

    # ------------------------------------------------------------------
    # Step 1: staging
    # ------------------------------------------------------------------

    def _distribute_configs(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Write prepared configs as staged files (not yet active).

        Per-node files (interfaces, hosts) go to every reachable node that has
        an entry in ``configs['nodes']``. Cluster-wide files (corosync.conf,
        ceph.conf) go to every reachable node, because later steps copy them
        from the staging directory on each of those nodes.

        Returns:
            False as soon as creating the staging directory or writing a file
            fails; True otherwise.
        """
        staging_dir = self.STAGING_DIR
        reachable = [n for n in plan.nodes if n.is_reachable]

        # Create the staging directory on every node that may receive a file.
        # (Previously it was only created on nodes listed in configs['nodes'],
        # so staging corosync/ceph configs on the other nodes failed.)
        if not dry_run:
            for node in reachable:
                rc, _, err = self.ssh.run_on_node(
                    node.ssh_host, f"mkdir -p {staging_dir}", node.is_local
                )
                if rc != 0:
                    print(f" [{node.name}] FEHLER Staging-Verzeichnis: {err}")
                    return False

        # Per-node configs
        for node in reachable:
            if node.name not in configs['nodes']:
                continue
            if dry_run:
                print(f" [{node.name}] Würde Configs nach {staging_dir}/ schreiben")
                continue
            node_configs = configs['nodes'][node.name]
            for filename in ("interfaces", "hosts"):
                if not self._stage_file(node, filename, node_configs[filename]):
                    return False

        # Cluster-wide configs
        cluster_files = (
            ("corosync.conf", configs.get('corosync')),
            ("ceph.conf", configs.get('ceph')),
        )
        for filename, content in cluster_files:
            if not content:
                continue
            for node in reachable:
                if dry_run:
                    print(f" [{node.name}] Würde {filename} stagen")
                    continue
                if not self._stage_file(node, filename, content):
                    return False

        return True

    def _stage_file(self, node, filename: str, content: str) -> bool:
        """Write *content* to ``STAGING_DIR/filename`` on *node*; report the result."""
        ok, msg = self.ssh.write_node_file(
            node.ssh_host,
            f"{self.STAGING_DIR}/{filename}",
            content,
            node.is_local,
        )
        if ok:
            print(f" [{node.name}] {filename} staged")
            return True
        print(f" [{node.name}] FEHLER {filename}: {msg}")
        return False

    # ------------------------------------------------------------------
    # Steps 2-5: stop services, activate corosync.conf and /etc/hosts
    # ------------------------------------------------------------------

    def _stop_corosync(self, nodes: list, dry_run: bool) -> bool:
        """Stop corosync on all given nodes.

        Failures are only reported as warnings; this step never aborts.
        """
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde corosync stoppen")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, "systemctl stop corosync", node.is_local
            )
            if rc == 0:
                print(f" [{node.name}] corosync gestoppt")
            else:
                print(f" [{node.name}] WARNUNG beim Stoppen: {err}")
        return True

    def _stop_pve_cluster(self, nodes: list, dry_run: bool) -> bool:
        """Stop the pve-cluster service to unmount /etc/pve.

        Failures are only reported as warnings; this step never aborts.
        """
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde pve-cluster stoppen")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host, "systemctl stop pve-cluster", node.is_local
            )
            if rc == 0:
                print(f" [{node.name}] pve-cluster gestoppt")
            else:
                print(f" [{node.name}] WARNUNG: {err}")
        return True

    def _update_corosync(self, nodes: list, configs: dict, dry_run: bool) -> bool:
        """Copy the staged corosync.conf to /etc/corosync/ on all given nodes.

        Returns False on the first failed copy.
        """
        if not configs.get('corosync'):
            print(" Keine Corosync-Änderungen")
            return True

        staging = f"{self.STAGING_DIR}/corosync.conf"
        for node in nodes:
            if dry_run:
                print(f" [{node.name}] Würde /etc/corosync/corosync.conf schreiben")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/corosync/corosync.conf",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] corosync.conf aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False
        return True

    def _update_hosts(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Copy the staged hosts file to /etc/hosts on all reachable configured nodes.

        Returns False on the first failed copy.
        """
        staging = f"{self.STAGING_DIR}/hosts"
        for node in plan.nodes:
            if not node.is_reachable or node.name not in configs['nodes']:
                continue
            if dry_run:
                print(f" [{node.name}] Würde /etc/hosts aktualisieren")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/hosts",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] /etc/hosts aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False
        return True

    # ------------------------------------------------------------------
    # Step 6: network switch
    # ------------------------------------------------------------------

    def _reload_command(self, node) -> str:
        """Pick the network reload command available on *node* itself.

        Uses ``ifreload -a`` when ifupdown2 is installed on that node, otherwise
        ``systemctl restart networking``. Detected per node so that mixed
        clusters get the right command on every node.
        """
        rc, _, _ = self.ssh.run_on_node(node.ssh_host, "which ifreload", node.is_local)
        return "ifreload -a" if rc == 0 else "systemctl restart networking"

    def _update_network(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Update /etc/network/interfaces and restart networking.

        Strategy:
        1. Copy staged interfaces to /etc/network/interfaces on ALL nodes first
        2. Reload remote nodes with nohup + delay (fire-and-forget, SSH will die)
        3. Reload local node LAST

        This avoids cutting off our own SSH connectivity before reaching remote nodes.

        Returns:
            False if copying the interfaces file fails on any node; reload
            problems are only reported as warnings.
        """
        active_nodes = [
            n for n in plan.nodes
            if n.is_reachable and n.name in configs['nodes']
        ]
        remote_nodes = [n for n in active_nodes if not n.is_local]
        local_node = next((n for n in active_nodes if n.is_local), None)

        # Phase 1: Copy staged interfaces on ALL nodes (no reload yet)
        staging = f"{self.STAGING_DIR}/interfaces"
        for node in active_nodes:
            if dry_run:
                print(f" [{node.name}] Würde /etc/network/interfaces aktualisieren")
                continue
            rc, _, err = self.ssh.run_on_node(
                node.ssh_host,
                f"cp {staging} /etc/network/interfaces",
                node.is_local,
            )
            if rc == 0:
                print(f" [{node.name}] /etc/network/interfaces aktualisiert")
            else:
                print(f" [{node.name}] FEHLER: {err}")
                return False

        if dry_run:
            for node in active_nodes:
                print(f" [{node.name}] Würde 'ifreload -a' ausführen")
            return True

        # Phase 2: Reload REMOTE nodes first (fire-and-forget with nohup).
        # The SSH connection dies when the remote network changes, so the reload
        # is delayed by 2s to let the SSH command return first. The result of
        # this call is deliberately not checked: the session may be torn down.
        for node in remote_nodes:
            reload_cmd = self._reload_command(node)
            bg_cmd = (
                f"nohup bash -c 'sleep 2 && {reload_cmd}' "
                f">/tmp/ifreload.log 2>&1 &"
            )
            print(f" [{node.name}] Netzwerk-Reload geplant (fire-and-forget)...")
            self.ssh.run_on_node(node.ssh_host, bg_cmd, False, timeout=10)
            print(f" [{node.name}] {reload_cmd} wird in 2s ausgeführt")

        # Phase 3: Reload LOCAL node last
        if local_node:
            reload_cmd = self._reload_command(local_node)
            print(f" [{local_node.name}] Netzwerk wird neu geladen ({reload_cmd})...")
            rc, _, err = self.ssh.run_on_node(
                local_node.ssh_host, reload_cmd, True, timeout=60
            )
            if rc == 0:
                print(f" [{local_node.name}] Netzwerk neu geladen")
            else:
                print(f" [{local_node.name}] WARNUNG beim Netzwerk-Reload: {err}")

        if remote_nodes:
            self._verify_remote_reachability(remote_nodes)

        return True

    def _verify_remote_reachability(self, remote_nodes: list) -> None:
        """Wait for remote reloads, then check each remote node on its new IP.

        Makes up to 3 attempts per node (5s apart); unreachable nodes are only
        reported as warnings.
        """
        wait_secs = 8
        print(f"\n Warte {wait_secs}s bis alle Remote-Nodes ihr Netzwerk neu geladen haben...")
        time.sleep(wait_secs)

        print(" [Verifikation] Prüfe Erreichbarkeit auf neuen IPs...")
        for node in remote_nodes:
            if not node.new_ip:
                continue
            reachable = False
            for attempt in range(3):
                reachable = self.ssh.is_reachable(node.new_ip)
                if reachable:
                    break
                if attempt < 2:
                    print(f" [{node.name}] {node.new_ip} noch nicht erreichbar, warte 5s...")
                    time.sleep(5)
            if reachable:
                print(f" [{node.name}] {node.new_ip} erreichbar")
            else:
                print(f" [{node.name}] {node.new_ip} NICHT erreichbar nach 3 Versuchen!")
                print(f" [{node.name}] WARNUNG: Service-Start auf diesem Node könnte fehlschlagen")

    # ------------------------------------------------------------------
    # Step 7: bring services back
    # ------------------------------------------------------------------

    def _start_services(self, plan: MigrationPlan, configs: dict, dry_run: bool) -> bool:
        """Start pve-cluster and corosync, then handle Ceph.

        Nodes are contacted on their new addresses (see ``_target_host``).
        After quorum (forced with ``pvecm expected 1`` if it does not appear
        within 60s), this method:
        - writes corosync.conf into the cluster FS;
        - updates Ceph, storage.cfg and CephFS mounts;
        - removes the staging directories.
        """
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            host = self._target_host(node)

            if dry_run:
                print(f" [{node.name}] Würde pve-cluster starten")
                print(f" [{node.name}] Würde corosync starten")
                continue

            for service in ("pve-cluster", "corosync"):
                print(f" [{node.name}] Starte {service}...")
                rc, _, err = self.ssh.run_on_node(
                    host, f"systemctl start {service}", node.is_local, timeout=30
                )
                if rc == 0:
                    print(f" [{node.name}] {service} gestartet")
                else:
                    print(f" [{node.name}] WARNUNG {service}: {err}")

        if dry_run:
            print("\n Würde auf Quorum warten...")
            return True

        # Wait for quorum
        print("\n Warte auf Quorum...")
        if not self._wait_for_quorum(timeout=60):
            print(" [!] Quorum nicht erreicht! Versuche 'pvecm expected 1'...")
            rc, _, _ = self.ssh.execute_local("pvecm expected 1")
            if rc == 0:
                print(" Quorum erzwungen mit 'pvecm expected 1'")
                time.sleep(5)
            else:
                print(" [!] Konnte Quorum nicht erzwingen!")

        # Update corosync.conf in cluster FS (now that /etc/pve is writable)
        if configs.get('corosync'):
            self._update_corosync_cluster_fs(configs)

        # Update Ceph config via cluster FS if possible
        if configs.get('ceph'):
            self._update_ceph(plan, configs)

        # Update storage.cfg (CephFS/RBD monhost entries) and remount CephFS
        ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip}
        if ip_mapping:
            self._update_storage_cfg(ip_mapping)
            self._remount_cephfs(plan)

        # Cleanup staging directories
        print("\n Staging-Verzeichnisse aufräumen...")
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            self.ssh.run_on_node(
                self._target_host(node),
                f"rm -rf {self.STAGING_DIR}",
                node.is_local,
            )

        return True

    def _update_corosync_cluster_fs(self, configs: dict):
        """Write corosync.conf to /etc/pve/ now that quorum is available.

        During migration, we only wrote to /etc/corosync/corosync.conf (directly).
        But Proxmox reads from /etc/pve/corosync.conf (cluster FS) on service start,
        so we need to update that too, otherwise the old IPs show up in the GUI.
        """
        print("\n [Corosync] /etc/pve/corosync.conf aktualisieren...")

        if self._is_pve_writable("corosync"):
            ok, msg = self.ssh.write_local_file(
                "/etc/pve/corosync.conf", configs['corosync']
            )
            if ok:
                print(" [Corosync] /etc/pve/corosync.conf aktualisiert (via Cluster-FS)")
            else:
                print(f" [Corosync] FEHLER /etc/pve/corosync.conf: {msg}")
        else:
            print(" [Corosync] WARNUNG: /etc/pve nicht beschreibbar!")
            print(" [Corosync] Manuell ausführen: cp /etc/corosync/corosync.conf /etc/pve/corosync.conf")

    def _update_storage_cfg(self, ip_mapping: dict[str, str]):
        """Update /etc/pve/storage.cfg with new MON IPs.

        CephFS and RBD storage entries may contain a 'monhost' field with
        MON IP addresses; if present, they are rewritten here. Only whole
        addresses are replaced, in a single pass (see ``_replace_ips``).
        Remounting active CephFS mounts is done separately by ``_remount_cephfs``.
        """
        print("\n [Storage] /etc/pve/storage.cfg prüfen...")

        if not self._is_pve_writable("storage"):
            print(" [Storage] WARNUNG: /etc/pve nicht beschreibbar!")
            print(" [Storage] Manuell ausführen: pvesm set <storage-id> --monhost <neue MON-IPs>")
            return

        rc, content, err = self.ssh.execute_local("cat /etc/pve/storage.cfg")
        if rc != 0 or not content:
            print(f" [Storage] WARNUNG: storage.cfg nicht lesbar: {err}")
            return

        new_content = self._replace_ips(content, ip_mapping)
        if new_content == content:
            print(" [Storage] Keine alten IPs in storage.cfg (monhost wird aus ceph.conf gelesen)")
            return

        ok, msg = self.ssh.write_local_file("/etc/pve/storage.cfg", new_content)
        if ok:
            print(" [Storage] /etc/pve/storage.cfg aktualisiert (monhost IPs ersetzt)")
        else:
            print(f" [Storage] FEHLER: {msg}")

    def _remount_cephfs(self, plan: MigrationPlan):
        """Remount CephFS on all nodes after migration.

        The kernel CephFS mount caches the old MON IPs. Even if ceph.conf is
        updated, the existing mount still uses the old addresses. A remount
        picks up the new IPs from the updated config.

        Mount points are detected on the local node only and then remounted
        (or mounted) on every reachable node.
        """
        rc, mounts, _ = self.ssh.execute_local("mount -t ceph 2>/dev/null")
        if rc != 0 or not mounts or not mounts.strip():
            return

        # Format: 1.2.3.4,5.6.7.8:/ on /mnt/pve/cephfs type ceph (...)
        mount_points = []
        for line in mounts.strip().split('\n'):
            if ' type ceph ' not in line:
                continue
            parts = line.split(' on ')
            if len(parts) >= 2:
                mount_points.append(parts[1].split(' type ')[0].strip())

        if not mount_points:
            return

        print(f"\n [CephFS] {len(mount_points)} Mount(s) gefunden, Remount auf allen Nodes...")
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            host = self._target_host(node)
            for mp in mount_points:
                quoted = shlex.quote(mp)
                rc, _, err = self.ssh.run_on_node(
                    host,
                    f"umount {quoted} 2>/dev/null; mount {quoted}",
                    node.is_local,
                    timeout=30,
                )
                if rc == 0:
                    print(f" [{node.name}] CephFS {mp} remounted")
                    continue
                # Mount might not exist on this node yet, try just mount
                rc2, _, _ = self.ssh.run_on_node(
                    host,
                    f"mount {quoted} 2>/dev/null",
                    node.is_local,
                    timeout=30,
                )
                if rc2 == 0:
                    print(f" [{node.name}] CephFS {mp} mounted")
                else:
                    print(f" [{node.name}] CephFS {mp} WARNUNG: Remount fehlgeschlagen: {err}")

    def _wait_for_quorum(self, timeout: int = 60) -> bool:
        """Poll ``pvecm status`` every 5s until it reports quorum or *timeout* elapses."""
        start = time.time()
        while time.time() - start < timeout:
            rc, stdout, _ = self.ssh.execute_local("pvecm status 2>/dev/null")
            if rc == 0 and "Quorate: Yes" in (stdout or ""):
                print(" Quorum erreicht!")
                return True
            print(" ... warte auf Quorum ...")
            time.sleep(5)
        return False

    # ------------------------------------------------------------------
    # Ceph
    # ------------------------------------------------------------------

    def _update_ceph(self, plan: MigrationPlan, configs: dict):
        """Update Ceph configuration after quorum is available.

        Writes ceph.conf (cluster FS first, per-node fallback) and rewrites the
        MON map. Then on every reachable node:
        - MON nodes: ensure ceph-mon runs and restart ceph-mgr;
        - all nodes: restart the OSD target and the MDS.
        """
        print("\n [Ceph] Konfiguration aktualisieren...")

        if self._is_pve_writable("ceph"):
            # /etc/pve is writable - use cluster filesystem
            ok, msg = self.ssh.write_local_file("/etc/pve/ceph.conf", configs['ceph'])
            if ok:
                print(" [Ceph] /etc/pve/ceph.conf aktualisiert (via Cluster-FS)")
            else:
                print(f" [Ceph] FEHLER /etc/pve/ceph.conf: {msg}")
                self._update_ceph_direct(plan, configs)
        else:
            # /etc/pve not writable - write directly on each node
            print(" [Ceph] /etc/pve nicht beschreibbar, schreibe direkt...")
            self._update_ceph_direct(plan, configs)

        # Determine MON nodes (needed for monmap update and service restart)
        mon_node_names = self._get_mon_node_names(plan)

        # Update Ceph MON map with new IPs (MUST happen before restart).
        # Returns the MON it already started, or None if it started none
        # (e.g. it aborted after stopping all MONs) - then every MON is started below.
        primary = self._update_ceph_mon_map(plan, mon_node_names)

        print("\n [Ceph] Services neu starten...")
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            host = self._target_host(node)

            # With no known MON nodes, every node is treated as a MON node.
            if not mon_node_names or node.name in mon_node_names:
                if node.name == primary:
                    print(f" [{node.name}] ceph-mon läuft bereits (Primary)")
                else:
                    rc, _, err = self.ssh.run_on_node(
                        host,
                        f"systemctl start ceph-mon@{node.name} 2>/dev/null",
                        node.is_local,
                        timeout=30,
                    )
                    if rc == 0:
                        print(f" [{node.name}] ceph-mon gestartet")
                    else:
                        print(f" [{node.name}] WARNUNG ceph-mon: {err}")

                # Restart MGR (only on MON nodes)
                self.ssh.run_on_node(
                    host,
                    f"systemctl restart ceph-mgr@{node.name} 2>/dev/null",
                    node.is_local,
                    timeout=30,
                )

            # Restart all OSDs on this node (OSDs can be on any node)
            self.ssh.run_on_node(
                host,
                "systemctl restart ceph-osd.target 2>/dev/null",
                node.is_local,
                timeout=60,
            )

            # Restart MDS if present (CephFS metadata server)
            self.ssh.run_on_node(
                host,
                f"systemctl restart ceph-mds@{node.name} 2>/dev/null",
                node.is_local,
                timeout=30,
            )

            print(f" [{node.name}] Ceph-Services gestartet")

    def _update_ceph_direct(self, plan: MigrationPlan, configs: dict):
        """Write ceph.conf directly on each node (fallback when no quorum)."""
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            ok, msg = self.ssh.write_node_file(
                self._target_host(node),
                "/etc/ceph/ceph.conf",
                configs['ceph'],
                node.is_local,
            )
            if ok:
                print(f" [{node.name}] /etc/ceph/ceph.conf direkt geschrieben")
            else:
                print(f" [{node.name}] FEHLER /etc/ceph/ceph.conf: {msg}")

    def _get_mon_node_names(self, plan: MigrationPlan) -> set[str]:
        """Determine which nodes actually run a Ceph MON daemon.

        Sources, in order, until one yields names:
        1. ``[mon.<name>]`` sections of the parsed ceph.conf;
        2. ``mon_host`` IPs matched against the nodes' current IPs;
        3. the presence of ``/var/lib/ceph/mon/ceph-<name>`` on each reachable node.
        """
        mon_node_names: set[str] = set()

        if plan.ceph_config:
            # section_name is like "mon.pvetest01"
            for section_name in plan.ceph_config.mon_sections:
                mon_node_names.add(section_name.removeprefix("mon."))

            if not mon_node_names and plan.ceph_config.mon_hosts:
                mon_ips = set(plan.ceph_config.mon_hosts)
                mon_node_names.update(
                    node.name for node in plan.nodes if node.current_ip in mon_ips
                )

        if not mon_node_names:
            print(" [Ceph] Prüfe welche Nodes einen MON-Dienst haben...")
            for node in plan.nodes:
                if not node.is_reachable:
                    continue
                rc, _, _ = self.ssh.run_on_node(
                    self._target_host(node),
                    f"test -d /var/lib/ceph/mon/ceph-{node.name}",
                    node.is_local,
                    timeout=10,
                )
                if rc == 0:
                    mon_node_names.add(node.name)

        if mon_node_names:
            print(f" [Ceph] MON-Nodes erkannt: {', '.join(sorted(mon_node_names))}")

        return mon_node_names

    def _update_ceph_mon_map(self, plan: MigrationPlan,
                             mon_node_names: set[str] | None = None) -> str | None:
        """Update Ceph MON map with new addresses.

        When MON IPs change, the internal monmap (stored in MON's RocksDB) must
        be explicitly updated. Just updating ceph.conf is NOT enough.

        Strategy: Update monmap on the FIRST node, start its MON, then get the
        authoritative monmap from the running MON and inject it into all
        remaining nodes. This avoids epoch mismatches between nodes.

        Returns:
            The name of the primary MON node if its ceph-mon was started here;
            None otherwise. None covers three cases: nothing to do, no reachable
            MON node, or the primary's map could not be extracted/injected/started.
        """
        ip_mapping = {n.current_ip: n.new_ip for n in plan.nodes if n.new_ip}
        if not ip_mapping:
            print(" [Ceph] Keine IP-Änderungen für MON-Map")
            return None

        # Reachable MON nodes and their (new) IPs, in plan order.
        mon_nodes = []
        reachable_nodes = []
        for node in plan.nodes:
            if not node.is_reachable:
                continue
            if mon_node_names and node.name not in mon_node_names:
                continue
            mon_nodes.append((node.name, node.new_ip or node.current_ip))
            reachable_nodes.append(node)

        if not reachable_nodes:
            print(" [Ceph] Keine erreichbaren Nodes für MON-Map Update")
            return None

        print("\n [Ceph] MON-Map aktualisieren...")

        # Stop ceph-mon on all nodes first (extract/inject need a stopped MON)
        for node in reachable_nodes:
            self.ssh.run_on_node(
                self._target_host(node),
                f"systemctl stop ceph-mon@{node.name} 2>/dev/null",
                node.is_local,
                timeout=30,
            )
            print(f" [{node.name}] ceph-mon gestoppt")

        # --- Phase 1: Update monmap on the FIRST node ---
        first_node, *remaining_nodes = reachable_nodes
        first_host = self._target_host(first_node)

        print(f"\n [{first_node.name}] Erstelle neue MON-Map (Primary)...")

        rc, _, err = self.ssh.run_on_node(
            first_host,
            f"ceph-mon -i {first_node.name} --extract-monmap /tmp/monmap",
            first_node.is_local,
            timeout=30,
        )
        if rc != 0:
            print(f" [{first_node.name}] FEHLER: monmap extrahieren fehlgeschlagen: {err}")
            return None

        print(f" [{first_node.name}] Aktuelle MON-Map:")
        self._print_monmap(first_host, first_node.is_local)

        self._rebuild_monmap(first_host, first_node.is_local, mon_nodes)

        print(f" [{first_node.name}] Neue MON-Map:")
        self._print_monmap(first_host, first_node.is_local)

        rc, _, err = self.ssh.run_on_node(
            first_host,
            f"ceph-mon -i {first_node.name} --inject-monmap /tmp/monmap",
            first_node.is_local,
            timeout=30,
        )
        if rc == 0:
            print(f" [{first_node.name}] MON-Map injiziert")
        else:
            print(f" [{first_node.name}] FEHLER MON-Map reinject: {err}")
            self.ssh.run_on_node(first_host, "rm -f /tmp/monmap", first_node.is_local)
            return None

        # Start first MON so we can get the authoritative map
        print(f" [{first_node.name}] Starte ceph-mon (Primary)...")
        rc, _, err = self.ssh.run_on_node(
            first_host,
            f"systemctl start ceph-mon@{first_node.name}",
            first_node.is_local,
            timeout=30,
        )
        primary_started = rc == 0
        if not primary_started:
            print(f" [{first_node.name}] WARNUNG ceph-mon Start: {err}")

        # Give it a moment to initialize
        time.sleep(3)

        if remaining_nodes:
            # --- Phase 2: Get authoritative monmap from running MON ---
            print("\n Hole autoritative MON-Map vom laufenden MON...")
            rc, _, err = self.ssh.run_on_node(
                first_host,
                "ceph mon getmap -o /tmp/monmap_auth",
                first_node.is_local,
                timeout=30,
            )
            if rc == 0:
                monmap_path = "/tmp/monmap_auth"
                print(" Autoritative MON-Map erhalten")
            else:
                # Fallback: use the manually built map
                print(f" WARNUNG: Konnte autoritative Map nicht holen ({err})")
                print(" Verwende manuell erstellte Map als Fallback")
                monmap_path = "/tmp/monmap"

            # --- Phase 3: Inject authoritative map into remaining nodes ---
            for node in remaining_nodes:
                host = self._target_host(node)

                rc, err = self._copy_monmap(first_node, first_host, node, host, monmap_path)
                if rc != 0:
                    print(f" [{node.name}] WARNUNG: monmap kopieren fehlgeschlagen: {err}")
                    print(f" [{node.name}] Erstelle Map manuell als Fallback...")
                    self._update_monmap_manual(node, host, mon_nodes)
                    continue

                rc, _, err = self.ssh.run_on_node(
                    host,
                    f"ceph-mon -i {node.name} --inject-monmap /tmp/monmap",
                    node.is_local,
                    timeout=30,
                )
                if rc == 0:
                    print(f" [{node.name}] Autoritative MON-Map injiziert")
                else:
                    print(f" [{node.name}] FEHLER MON-Map reinject: {err}")

                self.ssh.run_on_node(host, "rm -f /tmp/monmap", node.is_local)

        # Cleanup on first node
        self.ssh.run_on_node(
            first_host,
            "rm -f /tmp/monmap /tmp/monmap_auth",
            first_node.is_local,
        )

        return first_node.name if primary_started else None

    def _print_monmap(self, host: str, is_local: bool) -> None:
        """Print the output of ``monmaptool --print /tmp/monmap`` on *host*, if any."""
        rc, stdout, _ = self.ssh.run_on_node(
            host, "monmaptool --print /tmp/monmap", is_local, timeout=10
        )
        if rc == 0 and stdout:
            for line in stdout.strip().splitlines():
                print(f"   {line}")

    def _rebuild_monmap(self, host: str, is_local: bool, mon_nodes: list) -> None:
        """Replace the MON entries in /tmp/monmap on *host* with the new addresses.

        Args:
            mon_nodes: ``(mon_name, new_ip)`` pairs.

        Each MON is re-added with msgr2 (3300) + msgr1 (6789) via ``--addv``;
        if that fails (older Ceph), with the legacy ``--add`` on port 6789.
        """
        for mon_name, _ in mon_nodes:
            self.ssh.run_on_node(
                host,
                f"monmaptool --rm {mon_name} /tmp/monmap 2>/dev/null",
                is_local,
                timeout=10,
            )

        for mon_name, new_ip in mon_nodes:
            # Quoted: an unquoted [...] is a shell glob pattern.
            addrvec = f"[v2:{new_ip}:3300/0,v1:{new_ip}:6789/0]"
            rc, _, _ = self.ssh.run_on_node(
                host,
                f"monmaptool --addv {mon_name} {shlex.quote(addrvec)} /tmp/monmap",
                is_local,
                timeout=10,
            )
            if rc != 0:
                self.ssh.run_on_node(
                    host,
                    f"monmaptool --add {mon_name} {new_ip}:6789 /tmp/monmap",
                    is_local,
                    timeout=10,
                )

    def _scp_command(self, source: str, target: str) -> str:
        """Build a password-authenticated ``scp`` command line.

        The password is shell-quoted and handed to sshpass through the
        ``SSHPASS`` environment variable (``sshpass -e``). It is therefore not
        an sshpass command-line argument and may contain quote characters.
        """
        password = shlex.quote(str(self.ssh.ssh_password))
        return (
            f"SSHPASS={password} sshpass -e "
            f"scp -o StrictHostKeyChecking=no "
            f"-o PubkeyAuthentication=no "
            f"-P {self.ssh.ssh_port} "
            f"{source} {target}"
        )

    def _copy_monmap(self, src_node, src_host: str, dst_node, dst_host: str,
                     monmap_path: str) -> tuple[int, str]:
        """Copy *monmap_path* from the primary node to ``/tmp/monmap`` on *dst_node*.

        Uses scp when one side is the local node; otherwise the file is relayed
        as base64 through two SSH commands.

        Returns:
            ``(rc, err)`` where ``rc == 0`` means success.
        """
        if src_node.is_local:
            # First node is local: SCP the map to remote node
            rc, _, err = self.ssh.execute_local(
                self._scp_command(monmap_path, f"{self.ssh.ssh_user}@{dst_host}:/tmp/monmap"),
                timeout=30,
            )
            return rc, err

        if dst_node.is_local:
            # This node is local: SCP from remote first node
            rc, _, err = self.ssh.execute_local(
                self._scp_command(f"{self.ssh.ssh_user}@{src_host}:{monmap_path}", "/tmp/monmap"),
                timeout=30,
            )
            return rc, err

        # Both remote: read from first, write to second (base64 is quote-safe)
        rc_read, stdout, _ = self.ssh.execute(
            src_host, f"base64 {monmap_path}", timeout=30,
        )
        if rc_read != 0:
            return -1, "Konnte monmap nicht vom Primary lesen"
        rc, _, err = self.ssh.execute(
            dst_host,
            f"echo '{(stdout or '').strip()}' | base64 -d > /tmp/monmap",
            timeout=30,
        )
        return rc, err

    def _update_monmap_manual(self, node, host: str, mon_nodes: list):
        """Fallback: manually build and inject monmap on a single node.

        Args:
            node: The (stopped) MON node to fix.
            host: Address used to reach *node*.
            mon_nodes: ``(mon_name, new_ip)`` pairs for all MONs.
        """
        rc, _, _ = self.ssh.run_on_node(
            host,
            f"ceph-mon -i {node.name} --extract-monmap /tmp/monmap",
            node.is_local,
            timeout=30,
        )
        if rc != 0:
            print(f" [{node.name}] FEHLER: monmap extrahieren fehlgeschlagen")
            return

        self._rebuild_monmap(host, node.is_local, mon_nodes)

        rc, _, err = self.ssh.run_on_node(
            host,
            f"ceph-mon -i {node.name} --inject-monmap /tmp/monmap",
            node.is_local,
            timeout=30,
        )
        if rc == 0:
            print(f" [{node.name}] MON-Map manuell aktualisiert (Fallback)")
        else:
            print(f" [{node.name}] FEHLER MON-Map reinject: {err}")

        self.ssh.run_on_node(host, "rm -f /tmp/monmap", node.is_local)