From 50385faa02de38bbaffc3e3b46bc70060022930c Mon Sep 17 00:00:00 2001 From: Stefan Hacker Date: Sun, 12 Apr 2026 09:50:44 +0200 Subject: [PATCH] feat: Echtzeit-Sync via SSE + Journal-basierter 3-Wege-Vergleich Desktop-Client komplett ueberarbeitet nach Nextcloud-Vorbild: - Persistentes SQLite-Journal (journal.rs) speichert letzten bekannten Stand pro Datei - ueberlebt Client-Neustarts (Hauptbug behoben). - Engine.rs neu: 3-Wege-Vergleich Local <-> Journal <-> Server mit sauberer Konflikt-Kopie (inkl. Username + Zeitstempel). - Loesch-Propagation: Lokal geloeschte Dateien landen im Server- Papierkorb des Owners (auch bei Freigaben). Auf dem Server geloeschte Dateien werden lokal entfernt. - Lock-Flow repariert: frischer Token bei jedem Call, Fehler-Feedback. Echtzeit-Sync: - Backend: SSE-Endpoint /api/sync/events mit In-Memory-Broadcaster. Events bei Create/Update/Delete/Lock/Unlock, Zustellung an Owner plus alle User mit Share-Permission. - Client: persistente SSE-Verbindung mit Auto-Reconnect. Events triggern sofortigen Sync (<100ms). 30s-Polling bleibt als Fallback fuer Netzwerk-Aussetzer. Weitere Fixes: - /api/sync/tree filtert is_trashed=False (Papierkorb wird nicht mehr an Clients gesynct). - Web-GUI: Lock/Unlock-Buttons pro Datei, Admin darf fremde Locks zwangsweise loesen. Rename/Delete disabled bei fremdem Lock. - Lock-Check im Backend bei PUT/DELETE (423 Locked Response). - Background-Sync nur noch einmal pro Prozess gestartet, liest sync_paths pro Iteration neu - add/remove wirkt sofort, kein Client-Neustart mehr noetig. - Watcher werden pro Sync-Pfad individuell verwaltet. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/api/files.py | 62 +- backend/app/services/events.py | 81 ++ clients/desktop/src-tauri/Cargo.lock | 13 +- clients/desktop/src-tauri/src/lib.rs | 164 +++- clients/desktop/src-tauri/src/sync/api.rs | 14 + clients/desktop/src-tauri/src/sync/engine.rs | 791 +++++++++--------- clients/desktop/src-tauri/src/sync/journal.rs | 120 +++ clients/desktop/src-tauri/src/sync/mod.rs | 1 + clients/desktop/src-tauri/src/sync/watcher.rs | 3 +- clients/desktop/src/App.vue | 9 + frontend/src/views/FilesView.vue | 39 + 11 files changed, 849 insertions(+), 448 deletions(-) create mode 100644 backend/app/services/events.py create mode 100644 clients/desktop/src-tauri/src/sync/journal.rs diff --git a/backend/app/api/files.py b/backend/app/api/files.py index 46884d4..a074615 100644 --- a/backend/app/api/files.py +++ b/backend/app/api/files.py @@ -16,6 +16,21 @@ from app.api.auth import token_required from app.extensions import db, bcrypt from app.models.file import File, FilePermission, ShareLink from app.models.file_lock import FileLock +from app.services.events import broadcaster, notify_file_change + + +def _share_recipients(file_obj): + """Return a list of user ids (besides the owner) that should see changes + to this file because they have a direct share permission on it or on + any of its ancestor folders.""" + ids = set() + cur = file_obj + while cur is not None: + for p in FilePermission.query.filter_by(file_id=cur.id).all(): + ids.add(p.user_id) + cur = cur.parent + ids.discard(file_obj.owner_id) + return list(ids) def _user_upload_dir(user_id): @@ -137,6 +152,8 @@ def create_folder(): ) db.session.add(folder) db.session.commit() + notify_file_change(folder.owner_id, folder.id, 'created', + shared_with=_share_recipients(folder)) return jsonify(folder.to_dict()), 201 @@ -228,6 +245,8 @@ def upload_file(): existing.checksum = checksum existing.updated_at = datetime.now(timezone.utc) db.session.commit() + notify_file_change(existing.owner_id, existing.id, 'updated', + shared_with=_share_recipients(existing)) return jsonify(existing.to_dict()), 200 file_obj = File( @@ -242,6 +261,8 @@ def upload_file(): ) db.session.add(file_obj) db.session.commit() + notify_file_change(file_obj.owner_id, file_obj.id, 'created', + shared_with=_share_recipients(file_obj)) return jsonify(file_obj.to_dict()), 201 @@ -306,6 +327,11 @@ def update_file(file_id): if err: return err + # Lock-Check: fremder Lock blockiert Aenderungen (admin kann durch) + lock = FileLock.get_lock(file_id) + if lock and lock.locked_by != user.id and user.role != 'admin': + return jsonify({'error': f'Datei ist von {lock.user.username} ausgecheckt'}), 423 + data = request.get_json() if 'name' in data: name = data['name'].strip() @@ -331,6 +357,8 @@ def update_file(file_id): f.updated_at = datetime.now(timezone.utc) db.session.commit() + notify_file_change(f.owner_id, f.id, 'updated', + shared_with=_share_recipients(f)) return jsonify(f.to_dict()), 200 @@ -346,9 +374,18 @@ def delete_file(file_id): if not f or f.owner_id != user.id: return jsonify({'error': 'Zugriff verweigert'}), 403 + # Lock-Check + lock = FileLock.get_lock(file_id) + if lock and lock.locked_by != user.id and user.role != 'admin': + return jsonify({'error': f'Datei ist von {lock.user.username} ausgecheckt'}), 423 + + # Capture recipients BEFORE we detach the file from its parent tree + recipients = _share_recipients(f) + owner_id = f.owner_id # Soft-delete: move to trash _trash_recursive(f) db.session.commit() + notify_file_change(owner_id, f.id, 'deleted', shared_with=recipients) return jsonify({'message': 'In Papierkorb verschoben'}), 200 @@ -1014,6 +1051,8 @@ def lock_file(file_id): ) db.session.add(lock) db.session.commit() + notify_file_change(f.owner_id, f.id, 'locked', + shared_with=_share_recipients(f)) return jsonify(lock.to_dict()), 200 @@ -1031,6 +1070,10 @@ def unlock_file(file_id): db.session.delete(lock) db.session.commit() + f = db.session.get(File, file_id) + if f: + notify_file_change(f.owner_id, f.id, 'unlocked', + shared_with=_share_recipients(f)) return jsonify({'message': 'Datei entsperrt'}), 200 @@ -1088,7 +1131,7 @@ def sync_tree(): user = request.current_user def _build_tree(parent_id): - files = File.query.filter_by(owner_id=user.id, parent_id=parent_id)\ + files = File.query.filter_by(owner_id=user.id, parent_id=parent_id, is_trashed=False)\ .order_by(File.is_folder.desc(), File.name).all() result = [] for f in files: @@ -1112,6 +1155,23 @@ def sync_tree(): return jsonify({'tree': _build_tree(None)}), 200 +@api_bp.route('/sync/events', methods=['GET']) +@token_required +def sync_events(): + """Server-Sent Events stream: real-time file change notifications.""" + user = request.current_user + user_id = user.id + + def event_stream(): + yield from broadcaster.stream(user_id) + + resp = Response(event_stream(), mimetype='text/event-stream') + resp.headers['Cache-Control'] = 'no-cache' + resp.headers['X-Accel-Buffering'] = 'no' # disable nginx buffering + resp.headers['Connection'] = 'keep-alive' + return resp + + @api_bp.route('/sync/changes', methods=['GET']) @token_required def sync_changes(): diff --git a/backend/app/services/events.py b/backend/app/services/events.py new file mode 100644 index 0000000..92f95d5 --- /dev/null +++ b/backend/app/services/events.py @@ -0,0 +1,81 @@ +"""In-memory event broadcaster for SSE clients. + +Each logged-in user can have multiple connected clients (desktop, web, +mobile). Every client gets its own queue. Mutating file operations push +an event into the queues of every affected user. +""" +from __future__ import annotations + +import json +import queue +import threading +import time +from typing import Iterable + + +class EventBroadcaster: + def __init__(self) -> None: + self._lock = threading.Lock() + # user_id -> list[queue.Queue] + self._subs: dict[int, list[queue.Queue]] = {} + + def subscribe(self, user_id: int) -> queue.Queue: + q: queue.Queue = queue.Queue(maxsize=256) + with self._lock: + self._subs.setdefault(user_id, []).append(q) + return q + + def unsubscribe(self, user_id: int, q: queue.Queue) -> None: + with self._lock: + lst = self._subs.get(user_id) + if not lst: + return + try: + lst.remove(q) + except ValueError: + pass + if not lst: + self._subs.pop(user_id, None) + + def publish(self, user_ids: Iterable[int], event: dict) -> None: + payload = dict(event) + payload.setdefault('ts', time.time()) + with self._lock: + for uid in set(user_ids): + for q in self._subs.get(uid, []): + try: + q.put_nowait(payload) + except queue.Full: + pass # slow client - drop event + + def stream(self, user_id: int): + """Generator yielding SSE-formatted strings for one client.""" + q = self.subscribe(user_id) + try: + # Initial hello so the client knows it's connected + yield f"event: hello\ndata: {json.dumps({'user_id': user_id})}\n\n" + while True: + try: + event = q.get(timeout=20.0) + except queue.Empty: + # Heartbeat / keepalive comment - also keeps proxies happy + yield ": keepalive\n\n" + continue + kind = event.get('type', 'change') + yield f"event: {kind}\ndata: {json.dumps(event)}\n\n" + finally: + self.unsubscribe(user_id, q) + + +broadcaster = EventBroadcaster() + + +def notify_file_change(owner_id: int, file_id: int | None, change: str, + shared_with: Iterable[int] = ()) -> None: + """Emit a file change event to the owner plus any users with share access.""" + recipients = [owner_id, *shared_with] + broadcaster.publish(recipients, { + 'type': 'file', + 'change': change, # 'created' | 'updated' | 'deleted' | 'locked' | 'unlocked' + 'file_id': file_id, + }) diff --git a/clients/desktop/src-tauri/Cargo.lock b/clients/desktop/src-tauri/Cargo.lock index b2b8daf..90d0cd9 100644 --- a/clients/desktop/src-tauri/Cargo.lock +++ b/clients/desktop/src-tauri/Cargo.lock @@ -2009,16 +2009,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "keyring" -version = "3.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eebcc3aff044e5944a8fbaf69eb277d11986064cba30c468730e8b9909fb551c" -dependencies = [ - "log", - "zeroize", -] - [[package]] name = "kqueue" version = "1.1.1" @@ -2248,10 +2238,11 @@ dependencies = [ name = "minicloud-sync" version = "0.1.0" dependencies = [ + "base64 0.22.1", "chrono", "dirs", - "keyring", "notify", + "open", "reqwest 0.12.28", "rusqlite", "serde", diff --git a/clients/desktop/src-tauri/src/lib.rs b/clients/desktop/src-tauri/src/lib.rs index e37eea8..00bdbdc 100644 --- a/clients/desktop/src-tauri/src/lib.rs +++ b/clients/desktop/src-tauri/src/lib.rs @@ -1,6 +1,7 @@ mod sync; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tauri::{ @@ -12,6 +13,7 @@ use tauri::{ use sync::api::MiniCloudApi; use sync::config::AppConfig; use sync::engine::{SyncEngine, SyncMode, SyncPath}; +use sync::journal::Journal; use sync::watcher::{FileWatcher, ChangeKind}; struct AppState { @@ -22,6 +24,8 @@ struct AppState { locked_files: Mutex>, // file IDs we have locked on server sync_running: Arc>, sync_paths: Mutex>, + journal: Arc, + background_started: AtomicBool, } // --- Auth --- @@ -133,6 +137,11 @@ fn add_sync_path( state.sync_paths.lock().unwrap().push(sp.clone()); + // Also attach a filesystem watcher for this path so background sync picks it up + if let Ok(w) = FileWatcher::new(&local) { + state.watchers.lock().unwrap().push(w); + } + // Save to config let mut config = AppConfig::load(); config.sync_paths = state.sync_paths.lock().unwrap().clone(); @@ -143,8 +152,18 @@ fn add_sync_path( #[tauri::command] fn remove_sync_path(state: State<'_, AppState>, id: String) -> Result { + // Capture the local_dir of the removed path so we can drop its watcher too + let removed_dir = { + let paths = state.sync_paths.lock().unwrap(); + paths.iter().find(|p| p.id == id).map(|p| p.local_dir.clone()) + }; state.sync_paths.lock().unwrap().retain(|p| p.id != id); + if let Some(dir) = removed_dir { + let target = PathBuf::from(&dir); + state.watchers.lock().unwrap().retain(|w| w.path != target); + } + let mut config = AppConfig::load(); config.sync_paths = state.sync_paths.lock().unwrap().clone(); let _ = config.save(); @@ -183,23 +202,34 @@ async fn start_sync(app: AppHandle, state: State<'_, AppState>) -> Result) -> Result, String> let mut guard = state.sync_engine.lock().unwrap(); guard.take().ok_or("Sync nicht gestartet")? }; + // Sync engine's API token with current state (refresh_token may have updated it) + if let Some(ref api) = *state.api.lock().unwrap() { + engine.api.access_token = api.access_token.clone(); + } let result = engine.sync_all().await; *state.sync_engine.lock().unwrap() = Some(engine); result @@ -262,13 +296,17 @@ async fn open_cloud_file(state: State<'_, AppState>, cloud_path: String) -> Resu // User can "unmark offline" or "unlock" via right-click std::fs::remove_file(&path).ok(); - // Lock on server (prevents others from editing) - match api.lock_file(file_id, "Desktop Sync Client").await { + // Lock on server (fresh token) - prevents others from editing + let fresh_api = state.api.lock().unwrap().clone().ok_or("Nicht eingeloggt")?; + match fresh_api.lock_file(file_id, "Desktop Sync Client").await { Ok(_) => { eprintln!("[OpenCloud] Locked on server"); state.locked_files.lock().unwrap().push(file_id); } - Err(e) => eprintln!("[OpenCloud] Lock failed (file may be locked by someone else): {}", e), + Err(e) => { + eprintln!("[OpenCloud] Lock failed: {}", e); + return Err(format!("Datei heruntergeladen, aber Sperre fehlgeschlagen: {}", e)); + } } // Open with default application for this file type @@ -446,7 +484,8 @@ fn start_background_sync( app: AppHandle, sync_running: Arc>, api: MiniCloudApi, - paths: Vec, + journal: Arc, + username: String, ) { // Shared flag: watcher sets true when changes detected, sync thread checks it let watcher_triggered = Arc::new(Mutex::new(false)); @@ -454,13 +493,13 @@ fn start_background_sync( // Main sync thread: syncs on watcher trigger OR every 60s as fallback let app_sync = app.clone(); let api_sync = api.clone(); - let paths_sync = paths.clone(); let trigger_sync = watcher_triggered.clone(); + let journal_sync = journal.clone(); + let username_sync = username.clone(); std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); - let mut engine = SyncEngine::new(api_sync); - engine.sync_paths = paths_sync; + let mut engine = SyncEngine::new(api_sync, journal_sync, username_sync); let mut idle_counter = 0u32; loop { @@ -474,18 +513,38 @@ fn start_background_sync( *triggered = false; true } else { - // Fallback: sync every 60 seconds even without changes - idle_counter >= 60 + // Fallback: sync every 30 seconds even without changes + idle_counter >= 30 } }; if !should_sync { continue; } idle_counter = 0; + // Re-read sync_paths from state every iteration so add/remove + // takes effect without restarting the thread. + let paths_now = { + let state = app_sync.state::(); + state.sync_paths.lock().unwrap().clone() + }; + if paths_now.is_empty() { + // Nothing to sync - idle quietly. + continue; + } + engine.sync_paths = paths_now; + // Run sync *sync_running.lock().unwrap() = true; let _ = app_sync.emit("sync-status", "syncing"); + // Refresh engine's API token from state (token may have been refreshed) + { + let state = app_sync.state::(); + if let Some(ref api) = *state.api.lock().unwrap() { + engine.api.access_token = api.access_token.clone(); + } + } + match rt.block_on(engine.sync_all()) { Ok(log) => { if !log.is_empty() { @@ -534,6 +593,73 @@ fn start_background_sync( } }); + // Server-Sent Events: real-time change notifications from server + let app_sse = app.clone(); + let trigger_sse = watcher_triggered.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + loop { + let (server_url, token) = { + let state = app_sse.state::(); + let guard = state.api.lock().unwrap(); + match guard.as_ref() { + Some(a) => (a.server_url.clone(), a.access_token.clone()), + None => { drop(guard); std::thread::sleep(Duration::from_secs(3)); continue; } + } + }; + if token.is_empty() { + std::thread::sleep(Duration::from_secs(3)); + continue; + } + + let url = format!("{}/api/sync/events?token={}", server_url, token); + let trigger = trigger_sse.clone(); + let app_cb = app_sse.clone(); + + let result: Result<(), String> = rt.block_on(async move { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(10)) + .build() + .map_err(|e| e.to_string())?; + let mut resp = client.get(&url).send().await.map_err(|e| e.to_string())?; + if !resp.status().is_success() { + return Err(format!("SSE status {}", resp.status())); + } + eprintln!("[SSE] Connected"); + let _ = app_cb.emit("sse-status", "connected"); + + let mut buffer = String::new(); + while let Some(chunk) = resp.chunk().await.map_err(|e| e.to_string())? { + buffer.push_str(&String::from_utf8_lossy(&chunk)); + while let Some(pos) = buffer.find("\n\n") { + let raw = buffer[..pos].to_string(); + buffer.drain(..pos + 2); + let lines: Vec<&str> = raw.lines().collect(); + // Skip keepalive/comment lines (start with ':') + if lines.iter().all(|l| l.starts_with(':') || l.is_empty()) { + continue; + } + let mut event_name = String::from("message"); + for l in &lines { + if let Some(v) = l.strip_prefix("event: ") { event_name = v.to_string(); } + } + if event_name == "hello" { continue; } + // Any real event -> trigger sync + *trigger.lock().unwrap() = true; + let _ = app_cb.emit("sse-event", event_name); + } + } + Ok(()) + }); + + if let Err(e) = result { + eprintln!("[SSE] Disconnected: {}", e); + let _ = app_sse.emit("sse-status", format!("reconnecting: {}", e)); + } + std::thread::sleep(Duration::from_secs(3)); + } + }); + // File watcher: detects changes and triggers immediate sync let app_w = app.clone(); let trigger_w = watcher_triggered.clone(); @@ -651,6 +777,8 @@ pub fn run() { sync_running: Arc::new(Mutex::new(false)), locked_files: Mutex::new(Vec::new()), sync_paths: Mutex::new(Vec::new()), + journal: Arc::new(Journal::open().expect("Journal konnte nicht geoeffnet werden")), + background_started: AtomicBool::new(false), }) .on_window_event(|window, event| { // Close button = minimize to tray instead of quit diff --git a/clients/desktop/src-tauri/src/sync/api.rs b/clients/desktop/src-tauri/src/sync/api.rs index 242311d..5f0ff08 100644 --- a/clients/desktop/src-tauri/src/sync/api.rs +++ b/clients/desktop/src-tauri/src/sync/api.rs @@ -241,6 +241,20 @@ impl MiniCloudApi { Ok(()) } + pub async fn delete_file(&self, file_id: i64) -> Result<(), String> { + let url = format!("{}/api/files/{}", self.server_url, file_id); + let resp = self.client.delete(&url) + .header("Authorization", self.auth_header()) + .send() + .await + .map_err(|e| format!("Delete Fehler: {}", e))?; + if !resp.status().is_success() { + let text = resp.text().await.unwrap_or_default(); + return Err(format!("Delete fehlgeschlagen: {}", text)); + } + Ok(()) + } + pub async fn heartbeat(&self, file_id: i64) -> Result<(), String> { let url = format!("{}/api/files/{}/heartbeat", self.server_url, file_id); self.client.post(&url) diff --git a/clients/desktop/src-tauri/src/sync/engine.rs b/clients/desktop/src-tauri/src/sync/engine.rs index 8a79ad3..b682aea 100644 --- a/clients/desktop/src-tauri/src/sync/engine.rs +++ b/clients/desktop/src-tauri/src/sync/engine.rs @@ -1,27 +1,28 @@ use crate::sync::api::{FileEntry, MiniCloudApi}; +use crate::sync::journal::{Journal, JournalEntry}; use sha2::{Digest, Sha256}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::path::{Path, PathBuf}; +use std::sync::Arc; -/// A configured sync path: maps a server folder to a local folder +/// A configured sync path: maps a server folder to a local folder. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncPath { - pub id: String, // unique ID - pub server_path: String, // e.g. "/" (root) or "/Projekte/2026" - pub server_folder_id: Option, // server folder ID (None = root) - pub local_dir: String, // local directory path - pub mode: SyncMode, // virtual or full + pub id: String, + pub server_path: String, + pub server_folder_id: Option, + pub local_dir: String, + pub mode: SyncMode, pub enabled: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum SyncMode { - Virtual, // .cloud placeholder files, download on demand - Full, // full sync, all files downloaded + Virtual, + Full, } -/// Cloud placeholder file content (small JSON inside .cloud files) +/// `.cloud` placeholder content (JSON payload of the 0-byte-ish placeholder). #[derive(Debug, Serialize, Deserialize)] struct CloudPlaceholder { id: i64, @@ -35,484 +36,440 @@ struct CloudPlaceholder { pub struct SyncEngine { pub api: MiniCloudApi, pub sync_paths: Vec, - last_sync: Option, - /// Checksums from last sync - used to detect who changed a file - /// Key: file path (relative), Value: server checksum at last sync - known_checksums: HashMap, + pub journal: Arc, + pub username: String, } impl SyncEngine { - pub fn new(api: MiniCloudApi) -> Self { - Self { api, sync_paths: Vec::new(), last_sync: None, known_checksums: HashMap::new() } + pub fn new(api: MiniCloudApi, journal: Arc, username: String) -> Self { + Self { api, sync_paths: Vec::new(), journal, username } } - /// Sync all configured paths + /// Sync every configured path. pub async fn sync_all(&mut self) -> Result, String> { - let mut all_logs = Vec::new(); - + let mut log = Vec::new(); let tree = self.api.get_sync_tree().await?; - let sync_paths = self.sync_paths.clone(); + for sp in &sync_paths { if !sp.enabled { continue; } - let local_dir = PathBuf::from(&sp.local_dir); std::fs::create_dir_all(&local_dir).ok(); - // Find the server subtree for this sync path - let subtree = if sp.server_folder_id.is_some() { - find_subtree(&tree, sp.server_folder_id.unwrap()) - } else { - Some(tree.clone()) + let subtree = match sp.server_folder_id { + Some(id) => find_subtree(&tree, id).unwrap_or_default(), + None => tree.clone(), }; - if let Some(entries) = subtree { - let mut log = Vec::new(); - match sp.mode { - SyncMode::Virtual => { - self.sync_virtual(&entries, &local_dir, &sp.server_path, &mut log).await; - // Also upload new local files (not on server yet) - self.sync_upload_new(&entries, &local_dir, sp.server_folder_id, &mut log).await; - } - SyncMode::Full => { - self.sync_full_download(&entries, &local_dir, &mut log).await; - self.sync_full_upload(&entries, &local_dir, sp.server_folder_id, &mut log).await; - } - } - all_logs.extend(log); - } - } + // Phase 1: propagate deletions based on journal history. + self.detect_deletions(sp, &subtree, &local_dir, &mut log).await; - self.last_sync = Some(chrono::Utc::now().to_rfc3339()); - Ok(all_logs) + // Phase 2: normal sync (downloads, uploads, conflicts). + self.sync_dir(&subtree, &local_dir, "", sp.server_folder_id, sp, &mut log).await; + } + Ok(log) } - /// Virtual sync: create .cloud placeholder files - async fn sync_virtual(&mut self, entries: &[FileEntry], local_dir: &Path, - server_path: &str, log: &mut Vec) { - for entry in entries { - let local_path = local_dir.join(&entry.name); + /// Walks the journal for this sync path and reconciles existence: + /// - file was in journal and is gone locally but still on server -> delete on server + /// - file was in journal and is gone on server but still local -> delete locally + /// - file is gone on both sides -> clean journal entry + async fn detect_deletions( + &self, + sp: &SyncPath, + subtree: &[FileEntry], + local_root: &Path, + log: &mut Vec, + ) { + use std::collections::HashMap; + let mut server_files: HashMap = HashMap::new(); + collect_server_files(subtree, "", &mut server_files); + + for je in self.journal.list_for_sync(&sp.id) { + let local_real = local_root.join(&je.relative_path); + let local_cloud = { + let parent = local_real.parent().map(|p| p.to_path_buf()); + let fname = local_real.file_name().map(|n| n.to_string_lossy().to_string()); + match (parent, fname) { + (Some(p), Some(n)) => p.join(format!("{}.cloud", n)), + _ => PathBuf::new(), + } + }; + let local_exists = local_real.exists() || local_cloud.exists(); + let server_id = server_files.get(&je.relative_path).copied(); + + match (local_exists, server_id) { + (true, Some(_)) => { /* present on both sides - normal sync handles it */ } + (false, None) => { + let _ = self.journal.delete(&sp.id, &je.relative_path); + } + (false, Some(id)) => { + match self.api.delete_file(id).await { + Ok(_) => { + log.push(format!("Server-Papierkorb: {}", je.relative_path)); + let _ = self.journal.delete(&sp.id, &je.relative_path); + } + Err(e) => log.push(format!("Server-Delete-Fehler {}: {}", je.relative_path, e)), + } + } + (true, None) => { + std::fs::remove_file(&local_real).ok(); + std::fs::remove_file(&local_cloud).ok(); + let _ = self.journal.delete(&sp.id, &je.relative_path); + log.push(format!("Lokal geloescht: {}", je.relative_path)); + } + } + } + } + + /// Recursively sync a single directory level. + /// `rel_prefix` is the journal-relative path prefix (e.g. "", or "sub/dir/"). + async fn sync_dir( + &mut self, + server_entries: &[FileEntry], + local_dir: &Path, + rel_prefix: &str, + parent_id: Option, + sp: &SyncPath, + log: &mut Vec, + ) { + use std::collections::HashMap; + + let server_by_name: HashMap = server_entries + .iter().map(|e| (e.name.clone(), e)).collect(); + + // --- Pass 1: iterate server entries, reconcile each against local/journal --- + for entry in server_entries { + let rel = if rel_prefix.is_empty() { + entry.name.clone() + } else { + format!("{}/{}", rel_prefix, entry.name) + }; if entry.is_folder { - std::fs::create_dir_all(&local_path).ok(); + let sub_local = local_dir.join(&entry.name); + std::fs::create_dir_all(&sub_local).ok(); if let Some(children) = &entry.children { - let sub_path = format!("{}/{}", server_path.trim_end_matches('/'), entry.name); - Box::pin(self.sync_virtual(children, &local_path, &sub_path, log)).await; + Box::pin(self.sync_dir(children, &sub_local, &rel, Some(entry.id), sp, log)).await; } - } else { - // Check if real file exists (manually downloaded or offline-marked) - if local_path.exists() { - let local_hash = compute_file_hash(&local_path); - let server_hash = entry.checksum.as_deref().unwrap_or(""); - let file_key = format!("{}/{}", server_path, entry.name); - - if local_hash != server_hash { - if entry.locked.unwrap_or(false) { - log.push(format!("Zurueckgehalten (gesperrt): {}", entry.name)); - continue; - } - - // Check if WE changed the file locally - let last_known = self.known_checksums.get(&file_key); - let local_changed = match last_known { - Some(known) => local_hash != *known, // local differs from last sync - None => false, // first sync, don't assume local changed - }; - let server_changed = match last_known { - Some(known) => server_hash != known, // server differs from last sync - None => true, // first sync, trust server - }; - - if server_changed && !local_changed { - // Only server changed -> download - match self.api.download_file(entry.id, &local_path).await { - Ok(_) => log.push(format!("Server->Lokal: {}", entry.name)), - Err(e) => log.push(format!("Download-Fehler {}: {}", entry.name, e)), - } - } else if local_changed && !server_changed { - // Only local changed -> upload - match self.api.upload_file(&local_path, None).await { - Ok(_) => log.push(format!("Lokal->Server: {}", entry.name)), - Err(e) => log.push(format!("Upload-Fehler {}: {}", entry.name, e)), - } - } else { - // Both changed -> conflict! Download server, keep local as conflict copy - let conflict_name = format!("{} (Konflikt).{}", - local_path.file_stem().unwrap().to_string_lossy(), - local_path.extension().map(|e| e.to_string_lossy().to_string()).unwrap_or_default()); - let conflict_path = local_path.parent().unwrap().join(&conflict_name); - std::fs::rename(&local_path, &conflict_path).ok(); - match self.api.download_file(entry.id, &local_path).await { - Ok(_) => log.push(format!("KONFLIKT: {} (lokale Kopie: {})", entry.name, conflict_name)), - Err(e) => log.push(format!("Download-Fehler {}: {}", entry.name, e)), - } - } - } - - // Track current server checksum - self.known_checksums.insert(file_key, server_hash.to_string()); - continue; - } - - // Create or update .cloud placeholder - let cloud_path = local_dir.join(format!("{}.cloud", entry.name)); - let needs_update = if cloud_path.exists() { - // Check if server version changed - if let Ok(content) = std::fs::read_to_string(&cloud_path) { - if let Ok(old) = serde_json::from_str::(&content) { - old.checksum != entry.checksum.as_deref().unwrap_or("") - } else { true } - } else { true } - } else { true }; - - if needs_update { - let placeholder = CloudPlaceholder { - id: entry.id, - name: entry.name.clone(), - size: entry.size.unwrap_or(0), - checksum: entry.checksum.clone().unwrap_or_default(), - updated_at: entry.updated_at.clone().unwrap_or_default(), - server_path: format!("{}/{}", server_path.trim_end_matches('/'), entry.name), - }; - if let Ok(json) = serde_json::to_string_pretty(&placeholder) { - std::fs::write(&cloud_path, json).ok(); - if !cloud_path.exists() { - log.push(format!("Platzhalter: {}.cloud", entry.name)); - } - } - } - } - } - - // Remove .cloud files for deleted server files - if let Ok(dir_entries) = std::fs::read_dir(local_dir) { - for entry in dir_entries.flatten() { - let name = entry.file_name().to_string_lossy().to_string(); - if name.ends_with(".cloud") { - let real_name = name.trim_end_matches(".cloud"); - let exists_on_server = entries.iter().any(|e| e.name == real_name); - if !exists_on_server { - std::fs::remove_file(entry.path()).ok(); - log.push(format!("Entfernt: {}", name)); - } - } - } - } - } - - /// Upload new local files that don't exist on server yet (for both Virtual + Full mode) - async fn sync_upload_new(&mut self, server_entries: &[FileEntry], local_dir: &Path, - parent_id: Option, log: &mut Vec) { - let server_names: std::collections::HashSet = server_entries.iter() - .map(|e| e.name.clone()).collect(); - - let entries = match std::fs::read_dir(local_dir) { - Ok(e) => e, - Err(_) => return, - }; - - for entry in entries.flatten() { - let name = entry.file_name().to_string_lossy().to_string(); - let path = entry.path(); - - // Skip hidden, temp, .cloud files - if name.starts_with('.') || name.starts_with('~') - || name.ends_with(".tmp") || name.ends_with(".cloud") { continue; } - if path.is_dir() { - // New folder: create on server + recurse - if !server_names.contains(&name) { - match self.api.create_folder(&name, parent_id).await { - Ok(folder) => { - log.push(format!("Ordner erstellt: {}", name)); - Box::pin(self.sync_upload_new(&[], &path, Some(folder.id), log)).await; - } - Err(e) => log.push(format!("Ordner-Fehler {}: {}", name, e)), + self.reconcile_file(entry, local_dir, &rel, parent_id, sp, log).await; + } + + // --- Pass 2: iterate local entries, upload new local files/folders --- + let dir_iter = match std::fs::read_dir(local_dir) { + Ok(d) => d, + Err(_) => return, + }; + + for e in dir_iter.flatten() { + let name = e.file_name().to_string_lossy().to_string(); + if should_skip_name(&name) { continue; } + + let path = e.path(); + let is_dir = path.is_dir(); + // `.cloud` placeholders are stored locally under "foo.txt.cloud" + // but represent the server-side "foo.txt". + let real_name = name.trim_end_matches(".cloud").to_string(); + let is_placeholder = name.ends_with(".cloud") && !is_dir; + + // Already covered by server pass? + if server_by_name.contains_key(&real_name) { continue; } + if is_placeholder { continue; } // orphan placeholder - handled below + + let rel = if rel_prefix.is_empty() { + real_name.clone() + } else { + format!("{}/{}", rel_prefix, real_name) + }; + + if is_dir { + match self.api.create_folder(&real_name, parent_id).await { + Ok(folder) => { + log.push(format!("Ordner erstellt: {}", rel)); + self.upload_local_tree(&path, Some(folder.id), &rel, sp, log).await; } - } else { - // Existing folder: recurse into it - let sub = server_entries.iter().find(|e| e.name == name); - let children = sub.and_then(|e| e.children.as_ref()) - .map(|c| c.as_slice()).unwrap_or(&[]); - let sub_id = sub.map(|e| e.id); - Box::pin(self.sync_upload_new(children, &path, sub_id, log)).await; + Err(e) => log.push(format!("Ordner-Fehler {}: {}", rel, e)), } } else { - // New file: upload - if !server_names.contains(&name) { - match self.api.upload_file(&path, parent_id).await { - Ok(_) => log.push(format!("Hochgeladen: {}", name)), - Err(e) => log.push(format!("Upload-Fehler {}: {}", name, e)), - } - } else { - if let Some(se) = server_entries.iter().find(|e| e.name == name) { - if se.locked.unwrap_or(false) { - log.push(format!("Zurueckgehalten (gesperrt): {}", name)); - continue; - } - - let local_hash = compute_file_hash(&path); - let server_hash = se.checksum.as_deref().unwrap_or(""); - - if local_hash != server_hash { - let file_key = name.clone(); - let last_known = self.known_checksums.get(&file_key); - let local_changed = match last_known { - Some(known) => local_hash != *known, - None => false, - }; - let server_changed = match last_known { - Some(known) => server_hash != known, - None => true, - }; - - if server_changed && !local_changed { - match self.api.download_file(se.id, &path).await { - Ok(_) => log.push(format!("Server->Lokal: {}", name)), - Err(e) => log.push(format!("Download-Fehler {}: {}", name, e)), - } - } else if local_changed && !server_changed { - match self.api.upload_file(&path, parent_id).await { - Ok(_) => log.push(format!("Lokal->Server: {}", name)), - Err(e) => log.push(format!("Upload-Fehler {}: {}", name, e)), - } - } else { - // Both changed -> server wins, local becomes conflict copy - let ext = path.extension().map(|e| e.to_string_lossy().to_string()).unwrap_or_default(); - let stem = path.file_stem().unwrap().to_string_lossy(); - let conflict_path = path.parent().unwrap().join(format!("{} (Konflikt).{}", stem, ext)); - std::fs::rename(&path, &conflict_path).ok(); - match self.api.download_file(se.id, &path).await { - Ok(_) => log.push(format!("KONFLIKT: {} -> {}", name, conflict_path.file_name().unwrap().to_string_lossy())), - Err(e) => log.push(format!("Download-Fehler {}: {}", name, e)), - } - } - } - self.known_checksums.insert(name, server_hash.to_string()); + match self.api.upload_file(&path, parent_id).await { + Ok(fe) => { + log.push(format!("Hochgeladen: {}", rel)); + let checksum = fe.checksum.unwrap_or_default(); + let size = fe.size.unwrap_or(0); + let _ = self.journal.upsert(&JournalEntry { + sync_path_id: sp.id.clone(), + relative_path: rel.clone(), + file_id: Some(fe.id), + synced_checksum: checksum, + synced_size: size, + synced_mtime: fe.updated_at.unwrap_or_default(), + local_state: "offline".to_string(), + }); } + Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)), } } } - } - /// Full sync: download all files from server - async fn sync_full_download(&self, entries: &[FileEntry], local_dir: &Path, - log: &mut Vec) { - for entry in entries { - let local_path = local_dir.join(&entry.name); - - if entry.is_folder { - std::fs::create_dir_all(&local_path).ok(); - if let Some(children) = &entry.children { - Box::pin(self.sync_full_download(children, &local_path, log)).await; - } - } else { - if entry.locked.unwrap_or(false) { continue; } - - let needs_download = if local_path.exists() { - let local_hash = compute_file_hash(&local_path); - local_hash != entry.checksum.as_deref().unwrap_or("") + // --- Pass 3: clean up orphan .cloud placeholders for files gone from server --- + if let Ok(dir_iter) = std::fs::read_dir(local_dir) { + for e in dir_iter.flatten() { + let name = e.file_name().to_string_lossy().to_string(); + if !name.ends_with(".cloud") || e.path().is_dir() { continue; } + let real_name = name.trim_end_matches(".cloud"); + if server_by_name.contains_key(real_name) { continue; } + std::fs::remove_file(e.path()).ok(); + let rel = if rel_prefix.is_empty() { + real_name.to_string() } else { - true + format!("{}/{}", rel_prefix, real_name) }; - - // Remove stale .cloud placeholder - let cloud_path = local_dir.join(format!("{}.cloud", entry.name)); - if cloud_path.exists() { - std::fs::remove_file(&cloud_path).ok(); - } - - if needs_download { - match self.api.download_file(entry.id, &local_path).await { - Ok(_) => log.push(format!("Heruntergeladen: {}", entry.name)), - Err(e) => log.push(format!("Fehler {}: {}", entry.name, e)), - } - } + let _ = self.journal.delete(&sp.id, &rel); + log.push(format!("Entfernt (Server): {}", name)); } } } - /// Full sync: upload new/changed local files - async fn sync_full_upload(&mut self, server_entries: &[FileEntry], local_dir: &Path, - parent_id: Option, log: &mut Vec) { - let server_names: HashMap = server_entries.iter() - .map(|e| (e.name.clone(), e)) - .collect(); + /// Core 3-way reconciliation for a single server file. + async fn reconcile_file( + &self, + entry: &FileEntry, + local_dir: &Path, + rel: &str, + parent_id: Option, + sp: &SyncPath, + log: &mut Vec, + ) { + let real_path = local_dir.join(&entry.name); + let cloud_path = local_dir.join(format!("{}.cloud", entry.name)); + let journal_entry = self.journal.get(&sp.id, rel); + let server_hash = entry.checksum.clone().unwrap_or_default(); + let server_size = entry.size.unwrap_or(0); + let server_mtime = entry.updated_at.clone().unwrap_or_default(); - let entries = match std::fs::read_dir(local_dir) { - Ok(e) => e, - Err(_) => return, - }; + // Case A: real file exists locally = offline state + if real_path.exists() && !real_path.is_dir() { + // Avoid race: if placeholder still around, remove it + if cloud_path.exists() { std::fs::remove_file(&cloud_path).ok(); } - for entry in entries.flatten() { - let name = entry.file_name().to_string_lossy().to_string(); - let path = entry.path(); + let local_hash = compute_file_hash(&real_path); - // Skip hidden, temp, .cloud files - if name.starts_with('.') || name.starts_with('~') || name.ends_with(".tmp") - || name.ends_with(".cloud") { - continue; + if local_hash == server_hash { + // In sync - just (re)record journal + self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime); + return; } - if path.is_dir() { - if let Some(se) = server_names.get(&name) { - if let Some(children) = &se.children { - Box::pin(self.sync_full_upload(children, &path, Some(se.id), log)).await; + // Hashes differ. Locked by someone else? Hold back. + if entry.locked.unwrap_or(false) { + let by = entry.locked_by.clone().unwrap_or_default(); + if by != self.username { + log.push(format!("Zurueckgehalten (gesperrt von {}): {}", by, rel)); + return; + } + } + + let (local_changed, server_changed) = match &journal_entry { + Some(j) => (local_hash != j.synced_checksum, server_hash != j.synced_checksum), + None => (true, true), // unknown history - treat as conflict to be safe + }; + + if local_changed && !server_changed { + // Upload + match self.api.upload_file(&real_path, parent_id).await { + Ok(fe) => { + log.push(format!("Lokal->Server: {}", rel)); + let new_hash = fe.checksum.unwrap_or(local_hash.clone()); + self.journal_offline(sp, rel, entry, &new_hash, + fe.size.unwrap_or(server_size), + &fe.updated_at.unwrap_or(server_mtime.clone())); } - } else { - match self.api.create_folder(&name, parent_id).await { - Ok(folder) => { - log.push(format!("Ordner erstellt: {}", name)); - Box::pin(self.sync_full_upload(&[], &path, Some(folder.id), log)).await; - } - Err(e) => log.push(format!("Ordner-Fehler {}: {}", name, e)), + Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)), + } + } else if server_changed && !local_changed { + // Download + match self.api.download_file(entry.id, &real_path).await { + Ok(_) => { + log.push(format!("Server->Lokal: {}", rel)); + self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime); } + Err(e) => log.push(format!("Download-Fehler {}: {}", rel, e)), } } else { - if let Some(se) = server_names.get(&name) { - if se.locked.unwrap_or(false) { - log.push(format!("Zurueckgehalten (gesperrt): {}", name)); - continue; + // Both changed OR no journal -> conflict copy + let conflict_path = make_conflict_path(&real_path, &self.username); + std::fs::rename(&real_path, &conflict_path).ok(); + match self.api.download_file(entry.id, &real_path).await { + Ok(_) => { + log.push(format!("KONFLIKT: {} (lokal: {})", rel, + conflict_path.file_name().unwrap().to_string_lossy())); + self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime); } - let local_hash = compute_file_hash(&path); - let server_hash = se.checksum.as_deref().unwrap_or(""); - if local_hash != server_hash { - let last_known = self.known_checksums.get(&name); - let local_changed = match last_known { - Some(known) => local_hash != *known, - None => false, - }; - let server_changed = match last_known { - Some(known) => server_hash != known, - None => true, - }; + Err(e) => { + // Restore original + std::fs::rename(&conflict_path, &real_path).ok(); + log.push(format!("Download-Fehler {}: {}", rel, e)); + } + } + } + return; + } - if server_changed && !local_changed { - match self.api.download_file(se.id, &path).await { - Ok(_) => log.push(format!("Server->Lokal: {}", name)), - Err(e) => log.push(format!("Download-Fehler {}: {}", name, e)), - } - } else if local_changed && !server_changed { - match self.api.upload_file(&path, parent_id).await { - Ok(_) => log.push(format!("Lokal->Server: {}", name)), - Err(e) => log.push(format!("Upload-Fehler {}: {}", name, e)), - } - } else { - let ext = path.extension().map(|e| e.to_string_lossy().to_string()).unwrap_or_default(); - let stem = path.file_stem().unwrap().to_string_lossy(); - let conflict_path = path.parent().unwrap().join(format!("{} (Konflikt).{}", stem, ext)); - std::fs::rename(&path, &conflict_path).ok(); - match self.api.download_file(se.id, &path).await { - Ok(_) => log.push(format!("KONFLIKT: {} -> {}", name, conflict_path.file_name().unwrap().to_string_lossy())), - Err(e) => log.push(format!("Download-Fehler {}: {}", name, e)), - } - } - } - self.known_checksums.insert(name, server_hash.to_string()); + // Case B: local has a .cloud placeholder (or neither) = virtual state + // Virtual placeholders never have local edits, just keep them fresh. + let needs_write = match std::fs::read_to_string(&cloud_path) { + Ok(content) => match serde_json::from_str::(&content) { + Ok(old) => old.checksum != server_hash || old.id != entry.id, + Err(_) => true, + }, + Err(_) => true, + }; + + if needs_write { + let placeholder = CloudPlaceholder { + id: entry.id, + name: entry.name.clone(), + size: server_size, + checksum: server_hash.clone(), + updated_at: server_mtime.clone(), + server_path: rel.to_string(), + }; + if let Ok(json) = serde_json::to_string_pretty(&placeholder) { + if !cloud_path.exists() { + log.push(format!("Platzhalter: {}.cloud", entry.name)); } else { - // New file, not on server - match self.api.upload_file(&path, parent_id).await { - Ok(_) => log.push(format!("Hochgeladen: {}", name)), - Err(e) => log.push(format!("Upload-Fehler {}: {}", name, e)), - } + log.push(format!("Platzhalter aktualisiert: {}.cloud", entry.name)); + } + std::fs::write(&cloud_path, json).ok(); + } + } + + self.journal.upsert(&JournalEntry { + sync_path_id: sp.id.clone(), + relative_path: rel.to_string(), + file_id: Some(entry.id), + synced_checksum: server_hash, + synced_size: server_size, + synced_mtime: server_mtime, + local_state: "virtual".to_string(), + }).ok(); + + // If Full mode and no real file yet, download now + if sp.mode == SyncMode::Full && !real_path.exists() { + if let Err(e) = self.api.download_file(entry.id, &real_path).await { + log.push(format!("Full-Download-Fehler {}: {}", rel, e)); + } else { + std::fs::remove_file(&cloud_path).ok(); + log.push(format!("Heruntergeladen: {}", rel)); + // Update journal to offline + if let Some(mut j) = self.journal.get(&sp.id, rel) { + j.local_state = "offline".to_string(); + let _ = self.journal.upsert(&j); } } } } - /// Open a .cloud placeholder file: download the real file, rename, return path - #[allow(dead_code)] - pub async fn open_cloud_file(&self, cloud_path: &Path) -> Result { - let content = std::fs::read_to_string(cloud_path) - .map_err(|e| format!("Platzhalter lesen: {}", e))?; - let placeholder: CloudPlaceholder = serde_json::from_str(&content) - .map_err(|e| format!("Platzhalter ungueltig: {}", e))?; - - let _real_path = cloud_path.with_extension(""); - // Remove .cloud extension to get real filename - let real_path = cloud_path.parent().unwrap().join(&placeholder.name); - - // Download - self.api.download_file(placeholder.id, &real_path).await?; - - // Remove placeholder - std::fs::remove_file(cloud_path).ok(); - - // Lock on server - let _ = self.api.lock_file(placeholder.id, "Desktop Sync Client").await; - - Ok(real_path) + fn journal_offline( + &self, sp: &SyncPath, rel: &str, entry: &FileEntry, + hash: &str, size: i64, mtime: &str, + ) { + let _ = self.journal.upsert(&JournalEntry { + sync_path_id: sp.id.clone(), + relative_path: rel.to_string(), + file_id: Some(entry.id), + synced_checksum: hash.to_string(), + synced_size: size, + synced_mtime: mtime.to_string(), + local_state: "offline".to_string(), + }); } - /// Close a previously opened file: sync back, recreate .cloud, unlock - #[allow(dead_code)] - pub async fn close_cloud_file(&self, real_path: &Path, file_id: i64) -> Result<(), String> { - // Upload changes - // We need the parent_id - for now upload to the same location - // The server handles overwrite by filename - let _ = self.api.upload_file(real_path, None).await; - - // Unlock - let _ = self.api.unlock_file(file_id).await; - - // Delete local copy and recreate placeholder - let cloud_path = real_path.parent().unwrap() - .join(format!("{}.cloud", real_path.file_name().unwrap().to_string_lossy())); - - let size = std::fs::metadata(real_path).map(|m| m.len() as i64).unwrap_or(0); - let checksum = compute_file_hash(real_path); - - let placeholder = CloudPlaceholder { - id: file_id, - name: real_path.file_name().unwrap().to_string_lossy().to_string(), - size, - checksum, - updated_at: chrono::Utc::now().to_rfc3339(), - server_path: String::new(), - }; - if let Ok(json) = serde_json::to_string_pretty(&placeholder) { - std::fs::write(&cloud_path, json).ok(); + /// Walk a freshly-created local tree and upload every file (used after + /// creating a new folder on the server). + async fn upload_local_tree( + &self, dir: &Path, parent_id: Option, rel_prefix: &str, + sp: &SyncPath, log: &mut Vec, + ) { + let iter = match std::fs::read_dir(dir) { Ok(d) => d, Err(_) => return }; + for e in iter.flatten() { + let name = e.file_name().to_string_lossy().to_string(); + if should_skip_name(&name) { continue; } + let path = e.path(); + let rel = format!("{}/{}", rel_prefix, name); + if path.is_dir() { + match self.api.create_folder(&name, parent_id).await { + Ok(folder) => { + log.push(format!("Ordner erstellt: {}", rel)); + Box::pin(self.upload_local_tree(&path, Some(folder.id), &rel, sp, log)).await; + } + Err(e) => log.push(format!("Ordner-Fehler {}: {}", rel, e)), + } + } else { + match self.api.upload_file(&path, parent_id).await { + Ok(fe) => { + log.push(format!("Hochgeladen: {}", rel)); + self.journal_offline(sp, &rel, &fe, + &fe.checksum.clone().unwrap_or_default(), + fe.size.unwrap_or(0), + &fe.updated_at.clone().unwrap_or_default()); + } + Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)), + } + } } + } +} - std::fs::remove_file(real_path).ok(); - Ok(()) +fn should_skip_name(name: &str) -> bool { + name.starts_with('.') || name.starts_with('~') || name.ends_with(".tmp") +} + +fn make_conflict_path(original: &Path, username: &str) -> PathBuf { + let stem = original.file_stem().map(|s| s.to_string_lossy().to_string()).unwrap_or_default(); + let ext = original.extension().map(|e| e.to_string_lossy().to_string()); + let ts = chrono::Local::now().format("%Y-%m-%d %H%M%S").to_string(); + let name = match ext { + Some(e) if !e.is_empty() => format!("{} (Konflikt {} {}).{}", stem, username, ts, e), + _ => format!("{} (Konflikt {} {})", stem, username, ts), + }; + original.parent().map(|p| p.join(&name)).unwrap_or_else(|| PathBuf::from(&name)) +} + +fn collect_server_files( + entries: &[FileEntry], + prefix: &str, + out: &mut std::collections::HashMap, +) { + for e in entries { + let rel = if prefix.is_empty() { + e.name.clone() + } else { + format!("{}/{}", prefix, e.name) + }; + if e.is_folder { + if let Some(children) = &e.children { + collect_server_files(children, &rel, out); + } + } else { + out.insert(rel, e.id); + } } } fn find_subtree(tree: &[FileEntry], folder_id: i64) -> Option> { for entry in tree { - if entry.id == folder_id { - return entry.children.clone(); - } + if entry.id == folder_id { return entry.children.clone(); } if let Some(children) = &entry.children { - if let Some(result) = find_subtree(children, folder_id) { - return Some(result); - } + if let Some(r) = find_subtree(children, folder_id) { return Some(r); } } } None } -/// Parse a server timestamp (may or may not have timezone) -fn parse_server_time(s: &str) -> Option { - // Try with timezone first (RFC3339) - if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) { - return Some(std::time::SystemTime::from(dt)); - } - // Try without timezone (naive, assume UTC) - if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") { - let utc = dt.and_utc(); - return Some(std::time::SystemTime::from(utc)); - } - if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { - let utc = dt.and_utc(); - return Some(std::time::SystemTime::from(utc)); - } - None -} - pub fn compute_file_hash(path: &Path) -> String { let data = match std::fs::read(path) { Ok(d) => d, diff --git a/clients/desktop/src-tauri/src/sync/journal.rs b/clients/desktop/src-tauri/src/sync/journal.rs new file mode 100644 index 0000000..56cd523 --- /dev/null +++ b/clients/desktop/src-tauri/src/sync/journal.rs @@ -0,0 +1,120 @@ +use rusqlite::{params, Connection}; +use std::path::PathBuf; +use std::sync::Mutex; + +/// One row of the sync journal. Represents the "last known synced state" +/// for a single file within a sync path. The server and local checksum +/// matched this value at the last successful sync. +#[derive(Debug, Clone)] +pub struct JournalEntry { + pub sync_path_id: String, + pub relative_path: String, + pub file_id: Option, + pub synced_checksum: String, + pub synced_size: i64, + pub synced_mtime: String, + pub local_state: String, // "virtual" or "offline" +} + +pub struct Journal { + conn: Mutex, +} + +impl Journal { + pub fn open() -> Result { + let dir = dirs::config_dir() + .or_else(|| dirs::home_dir().map(|h| h.join(".config"))) + .unwrap_or_else(|| PathBuf::from(".")) + .join("MiniCloud Sync"); + std::fs::create_dir_all(&dir).ok(); + let path = dir.join("journal.db"); + let conn = Connection::open(&path).map_err(|e| format!("Journal open: {}", e))?; + + conn.execute_batch( + r#" + CREATE TABLE IF NOT EXISTS sync_journal ( + sync_path_id TEXT NOT NULL, + relative_path TEXT NOT NULL, + file_id INTEGER, + synced_checksum TEXT NOT NULL DEFAULT '', + synced_size INTEGER NOT NULL DEFAULT 0, + synced_mtime TEXT NOT NULL DEFAULT '', + local_state TEXT NOT NULL DEFAULT 'virtual', + PRIMARY KEY (sync_path_id, relative_path) + ); + "#, + ).map_err(|e| format!("Journal schema: {}", e))?; + + Ok(Self { conn: Mutex::new(conn) }) + } + + pub fn get(&self, sync_path_id: &str, rel: &str) -> Option { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT file_id, synced_checksum, synced_size, synced_mtime, local_state + FROM sync_journal WHERE sync_path_id = ?1 AND relative_path = ?2", + params![sync_path_id, rel], + |row| Ok(JournalEntry { + sync_path_id: sync_path_id.to_string(), + relative_path: rel.to_string(), + file_id: row.get(0)?, + synced_checksum: row.get(1)?, + synced_size: row.get(2)?, + synced_mtime: row.get(3)?, + local_state: row.get(4)?, + }), + ).ok() + } + + pub fn upsert(&self, e: &JournalEntry) -> Result<(), String> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO sync_journal + (sync_path_id, relative_path, file_id, synced_checksum, synced_size, synced_mtime, local_state) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) + ON CONFLICT(sync_path_id, relative_path) DO UPDATE SET + file_id = excluded.file_id, + synced_checksum = excluded.synced_checksum, + synced_size = excluded.synced_size, + synced_mtime = excluded.synced_mtime, + local_state = excluded.local_state", + params![e.sync_path_id, e.relative_path, e.file_id, e.synced_checksum, + e.synced_size, e.synced_mtime, e.local_state], + ).map_err(|e| format!("Journal upsert: {}", e))?; + Ok(()) + } + + pub fn delete(&self, sync_path_id: &str, rel: &str) -> Result<(), String> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "DELETE FROM sync_journal WHERE sync_path_id = ?1 AND relative_path = ?2", + params![sync_path_id, rel], + ).map_err(|e| format!("Journal delete: {}", e))?; + Ok(()) + } + + pub fn list_for_sync(&self, sync_path_id: &str) -> Vec { + let conn = self.conn.lock().unwrap(); + let mut stmt = match conn.prepare( + "SELECT relative_path, file_id, synced_checksum, synced_size, synced_mtime, local_state + FROM sync_journal WHERE sync_path_id = ?1") { + Ok(s) => s, + Err(_) => return Vec::new(), + }; + let rows = stmt.query_map(params![sync_path_id], |row| { + Ok(JournalEntry { + sync_path_id: sync_path_id.to_string(), + relative_path: row.get(0)?, + file_id: row.get(1)?, + synced_checksum: row.get(2)?, + synced_size: row.get(3)?, + synced_mtime: row.get(4)?, + local_state: row.get(5)?, + }) + }); + match rows { + Ok(it) => it.filter_map(|r| r.ok()).collect(), + Err(_) => Vec::new(), + } + } +} diff --git a/clients/desktop/src-tauri/src/sync/mod.rs b/clients/desktop/src-tauri/src/sync/mod.rs index 9bfc28f..67d2d78 100644 --- a/clients/desktop/src-tauri/src/sync/mod.rs +++ b/clients/desktop/src-tauri/src/sync/mod.rs @@ -1,4 +1,5 @@ pub mod api; pub mod config; pub mod engine; +pub mod journal; pub mod watcher; diff --git a/clients/desktop/src-tauri/src/sync/watcher.rs b/clients/desktop/src-tauri/src/sync/watcher.rs index 25aab6e..a3bc7c0 100644 --- a/clients/desktop/src-tauri/src/sync/watcher.rs +++ b/clients/desktop/src-tauri/src/sync/watcher.rs @@ -5,6 +5,7 @@ use std::sync::mpsc; pub struct FileWatcher { _watcher: RecommendedWatcher, pub receiver: mpsc::Receiver, + pub path: PathBuf, } #[derive(Debug, Clone)] @@ -53,6 +54,6 @@ impl FileWatcher { watcher.watch(watch_dir.as_ref(), RecursiveMode::Recursive) .map_err(|e| format!("Watch-Fehler: {}", e))?; - Ok(Self { _watcher: watcher, receiver: rx }) + Ok(Self { _watcher: watcher, receiver: rx, path: watch_dir.clone() }) } } diff --git a/clients/desktop/src/App.vue b/clients/desktop/src/App.vue index 3ff126b..f035cae 100644 --- a/clients/desktop/src/App.vue +++ b/clients/desktop/src/App.vue @@ -202,12 +202,21 @@ async function addSyncPath() { newPathServerId.value = null; newPathMode.value = "virtual"; await loadSyncPaths(); + // Auto-start sync now that we have a path (if not already running) + if (!autoSyncActive.value && syncPaths.value.length > 0) { + await startSync(); + } } catch (err) { alert(err); } } async function removeSyncPath(id) { await invoke("remove_sync_path", { id }); await loadSyncPaths(); + // If no paths remain, stop auto-sync + if (syncPaths.value.length === 0) { + autoSyncActive.value = false; + syncStatus.value = "Keine Sync-Pfade konfiguriert"; + } } async function toggleMode(id) { diff --git a/frontend/src/views/FilesView.vue b/frontend/src/views/FilesView.vue index 730d50a..8d2e853 100644 --- a/frontend/src/views/FilesView.vue +++ b/frontend/src/views/FilesView.vue @@ -100,15 +100,32 @@ :title="(data.has_shares || data.has_permissions) ? 'Freigaben verwalten' : 'Teilen'" @click.stop="openShare(data)" /> +