feat: Einzeldatei-Restore aus Backups (lokal + SFTP)

Backup-Dateien koennen jetzt durchsucht werden ohne einen
Komplett-Restore durchfuehren zu muessen:

- "Einzelne Dateien durchsuchen" Button bei lokalen ZIP-Backups
- "Durchsuchen" Button bei jeder SFTP-Backup-Version
- Datei-Browser-Dialog mit:
  - Filterfeld zum Suchen nach Dateinamen
  - Dateianzahl-Anzeige (gefiltert/gesamt)
  - Icons nach Typ (DB, Metadaten, User-Dateien)
  - Download-Button: Einzelne Datei herunterladen
  - Restore-Button: Einzelne Datei direkt ins Live-System
    wiederherstellen (nur fuer files/-Eintraege)
- Browse-Session wird serverseitig verwaltet und beim Schliessen
  des Dialogs automatisch aufgeraeumt

Backend: /admin/restore/browse, /browse/<id>/download/<path>,
         /browse/<id>/restore-file, /browse/<id>/close
         + SFTP: /targets/<id>/versions/<name>/browse

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Stefan Hacker
2026-04-11 18:13:16 +02:00
parent d42d6d5d96
commit 10fde2396d
2 changed files with 323 additions and 2 deletions
+183
View File
@@ -355,6 +355,189 @@ def _perform_restore(zip_path):
return stats
# ========== Single File Browse & Restore ==========
def _list_zip_contents(zip_path):
"""List files inside a backup ZIP with metadata."""
entries = []
with zipfile.ZipFile(zip_path, 'r') as zf:
for info in zf.infolist():
if info.is_dir():
continue
entries.append({
'path': info.filename,
'size': info.file_size,
'compressed_size': info.compress_size,
'modified': datetime(*info.date_time).isoformat() if info.date_time else None,
})
return entries
def _extract_single_file(zip_path, file_path):
"""Extract a single file from backup ZIP. Returns bytes + filename."""
with zipfile.ZipFile(zip_path, 'r') as zf:
if file_path not in zf.namelist():
return None, None
data = zf.read(file_path)
filename = os.path.basename(file_path)
return data, filename
def _restore_single_file_to_cloud(zip_path, file_path_in_zip):
"""Restore a single file from backup into the live data/files directory."""
upload_path = Path(current_app.config['UPLOAD_PATH'])
if not file_path_in_zip.startswith('files/'):
return False, 'Nur Dateien aus dem files/-Verzeichnis koennen wiederhergestellt werden'
rel_path = file_path_in_zip[len('files/'):]
with zipfile.ZipFile(zip_path, 'r') as zf:
if file_path_in_zip not in zf.namelist():
return False, 'Datei nicht im Backup gefunden'
dest = upload_path / rel_path
dest.parent.mkdir(parents=True, exist_ok=True)
with zf.open(file_path_in_zip) as src, open(str(dest), 'wb') as dst:
shutil.copyfileobj(src, dst)
return True, f'Datei {rel_path} wiederhergestellt'
# --- Local ZIP: browse & single restore ---
@api_bp.route('/admin/restore/browse', methods=['POST'])
@admin_required
def browse_local_backup():
"""Upload a ZIP and list its contents (without restoring)."""
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei gesendet'}), 400
backup_file = request.files['file']
with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp:
backup_file.save(tmp.name)
tmp_path = tmp.name
try:
if not zipfile.is_zipfile(tmp_path):
os.unlink(tmp_path)
return jsonify({'error': 'Ungueltige ZIP-Datei'}), 400
entries = _list_zip_contents(tmp_path)
# Store path temporarily for subsequent file requests
browse_id = str(uuid.uuid4())
_active_uploads[browse_id] = {
'zip_path': tmp_path,
'created_at': datetime.now(timezone.utc).isoformat(),
'type': 'browse',
}
return jsonify({
'browse_id': browse_id,
'files': entries,
'total': len(entries),
}), 200
except Exception as e:
os.unlink(tmp_path)
return jsonify({'error': str(e)}), 500
@api_bp.route('/admin/restore/browse/<browse_id>/download/<path:file_path>', methods=['GET'])
@admin_required
def download_file_from_backup(browse_id, file_path):
"""Download a single file from a browsed backup ZIP."""
if browse_id not in _active_uploads:
return jsonify({'error': 'Browse-Session abgelaufen'}), 404
info = _active_uploads[browse_id]
zip_path = info.get('zip_path', '')
if not os.path.exists(zip_path):
return jsonify({'error': 'ZIP nicht mehr verfuegbar'}), 404
data, filename = _extract_single_file(zip_path, file_path)
if data is None:
return jsonify({'error': 'Datei nicht gefunden'}), 404
import mimetypes
mime = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
return Response(data, mimetype=mime, headers={
'Content-Disposition': f'attachment; filename="{filename}"',
})
@api_bp.route('/admin/restore/browse/<browse_id>/restore-file', methods=['POST'])
@admin_required
def restore_single_from_local(browse_id):
"""Restore a single file from browsed backup into the live system."""
if browse_id not in _active_uploads:
return jsonify({'error': 'Browse-Session abgelaufen'}), 404
data = request.get_json()
file_path = data.get('file_path', '')
if not file_path:
return jsonify({'error': 'file_path erforderlich'}), 400
info = _active_uploads[browse_id]
zip_path = info.get('zip_path', '')
if not os.path.exists(zip_path):
return jsonify({'error': 'ZIP nicht mehr verfuegbar'}), 404
success, message = _restore_single_file_to_cloud(zip_path, file_path)
if success:
return jsonify({'message': message}), 200
return jsonify({'error': message}), 400
@api_bp.route('/admin/restore/browse/<browse_id>/close', methods=['POST'])
@admin_required
def close_browse_session(browse_id):
"""Clean up a browse session."""
if browse_id in _active_uploads:
info = _active_uploads.pop(browse_id)
zip_path = info.get('zip_path', '')
if zip_path and os.path.exists(zip_path):
os.unlink(zip_path)
return jsonify({'message': 'Session geschlossen'}), 200
# --- SFTP: browse & single restore ---
@api_bp.route('/admin/backup/targets/<int:target_id>/versions/<version_name>/browse', methods=['POST'])
@admin_required
def browse_sftp_version(target_id, version_name):
"""Download a version from SFTP and list its contents."""
from app.models.backup_target import BackupTarget
from app.services.sftp_backup import download_version_from_sftp
target = db.session.get(BackupTarget, target_id)
if not target:
return jsonify({'error': 'Nicht gefunden'}), 404
try:
zip_path = download_version_from_sftp(target, version_name)
entries = _list_zip_contents(zip_path)
browse_id = str(uuid.uuid4())
_active_uploads[browse_id] = {
'zip_path': zip_path,
'created_at': datetime.now(timezone.utc).isoformat(),
'type': 'sftp_browse',
}
return jsonify({
'browse_id': browse_id,
'files': entries,
'total': len(entries),
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# ========== SFTP Backup Targets ==========
@api_bp.route('/admin/backup/targets', methods=['GET'])