feat: Echtzeit-Sync via SSE + Journal-basierter 3-Wege-Vergleich

Desktop-Client komplett ueberarbeitet nach Nextcloud-Vorbild:
- Persistentes SQLite-Journal (journal.rs) speichert letzten bekannten
  Stand pro Datei - ueberlebt Client-Neustarts (Hauptbug behoben).
- Engine.rs neu: 3-Wege-Vergleich Local <-> Journal <-> Server mit
  sauberer Konflikt-Kopie (inkl. Username + Zeitstempel).
- Loesch-Propagation: Lokal geloeschte Dateien landen im Server-
  Papierkorb des Owners (auch bei Freigaben). Auf dem Server
  geloeschte Dateien werden lokal entfernt.
- Lock-Flow repariert: frischer Token bei jedem Call, Fehler-Feedback.

Echtzeit-Sync:
- Backend: SSE-Endpoint /api/sync/events mit In-Memory-Broadcaster.
  Events bei Create/Update/Delete/Lock/Unlock, Zustellung an Owner
  plus alle User mit Share-Permission.
- Client: persistente SSE-Verbindung mit Auto-Reconnect. Events
  triggern sofortigen Sync (<100ms). 30s-Polling bleibt als
  Fallback fuer Netzwerk-Aussetzer.

Weitere Fixes:
- /api/sync/tree filtert is_trashed=False (Papierkorb wird nicht
  mehr an Clients gesynct).
- Web-GUI: Lock/Unlock-Buttons pro Datei, Admin darf fremde Locks
  zwangsweise loesen. Rename/Delete disabled bei fremdem Lock.
- Lock-Check im Backend bei PUT/DELETE (423 Locked Response).
- Background-Sync nur noch einmal pro Prozess gestartet, liest
  sync_paths pro Iteration neu - add/remove wirkt sofort, kein
  Client-Neustart mehr noetig.
- Watcher werden pro Sync-Pfad individuell verwaltet.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Stefan Hacker
2026-04-12 09:50:44 +02:00
parent e65d330d1d
commit 50385faa02
11 changed files with 849 additions and 448 deletions
+146 -18
View File
@@ -1,6 +1,7 @@
mod sync;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tauri::{
@@ -12,6 +13,7 @@ use tauri::{
use sync::api::MiniCloudApi;
use sync::config::AppConfig;
use sync::engine::{SyncEngine, SyncMode, SyncPath};
use sync::journal::Journal;
use sync::watcher::{FileWatcher, ChangeKind};
struct AppState {
@@ -22,6 +24,8 @@ struct AppState {
locked_files: Mutex<Vec<i64>>, // file IDs we have locked on server
sync_running: Arc<Mutex<bool>>,
sync_paths: Mutex<Vec<SyncPath>>,
journal: Arc<Journal>,
background_started: AtomicBool,
}
// --- Auth ---
@@ -133,6 +137,11 @@ fn add_sync_path(
state.sync_paths.lock().unwrap().push(sp.clone());
// Also attach a filesystem watcher for this path so background sync picks it up
if let Ok(w) = FileWatcher::new(&local) {
state.watchers.lock().unwrap().push(w);
}
// Save to config
let mut config = AppConfig::load();
config.sync_paths = state.sync_paths.lock().unwrap().clone();
@@ -143,8 +152,18 @@ fn add_sync_path(
#[tauri::command]
fn remove_sync_path(state: State<'_, AppState>, id: String) -> Result<String, String> {
// Capture the local_dir of the removed path so we can drop its watcher too
let removed_dir = {
let paths = state.sync_paths.lock().unwrap();
paths.iter().find(|p| p.id == id).map(|p| p.local_dir.clone())
};
state.sync_paths.lock().unwrap().retain(|p| p.id != id);
if let Some(dir) = removed_dir {
let target = PathBuf::from(&dir);
state.watchers.lock().unwrap().retain(|w| w.path != target);
}
let mut config = AppConfig::load();
config.sync_paths = state.sync_paths.lock().unwrap().clone();
let _ = config.save();
@@ -183,23 +202,34 @@ async fn start_sync(app: AppHandle, state: State<'_, AppState>) -> Result<Vec<St
return Err("Keine Sync-Pfade konfiguriert".to_string());
}
let mut engine = SyncEngine::new(api.clone());
let username = state.username.lock().unwrap().clone().unwrap_or_default();
let journal = state.journal.clone();
let mut engine = SyncEngine::new(api.clone(), journal, username);
engine.sync_paths = paths.clone();
let log = engine.sync_all().await?;
*state.sync_engine.lock().unwrap() = Some(engine);
// Start watchers for each sync path
let mut watchers = Vec::new();
for sp in &paths {
if let Ok(w) = FileWatcher::new(&PathBuf::from(&sp.local_dir)) {
watchers.push(w);
// Ensure a watcher exists for every sync path (skip paths already watched)
{
let mut guard = state.watchers.lock().unwrap();
for sp in &paths {
let target = PathBuf::from(&sp.local_dir);
if guard.iter().any(|w| w.path == target) { continue; }
if let Ok(w) = FileWatcher::new(&target) {
guard.push(w);
}
}
}
*state.watchers.lock().unwrap() = watchers;
// Start background threads
start_background_sync(app, state.sync_running.clone(), api, paths);
// Start background threads only once per process lifetime.
// They re-read sync_paths from state each iteration, so adding/removing
// paths later takes effect without respawning threads.
if !state.background_started.swap(true, Ordering::SeqCst) {
let username = state.username.lock().unwrap().clone().unwrap_or_default();
let journal = state.journal.clone();
start_background_sync(app, state.sync_running.clone(), api, journal, username);
}
Ok(log)
}
@@ -210,6 +240,10 @@ async fn run_sync_now(state: State<'_, AppState>) -> Result<Vec<String>, String>
let mut guard = state.sync_engine.lock().unwrap();
guard.take().ok_or("Sync nicht gestartet")?
};
// Sync engine's API token with current state (refresh_token may have updated it)
if let Some(ref api) = *state.api.lock().unwrap() {
engine.api.access_token = api.access_token.clone();
}
let result = engine.sync_all().await;
*state.sync_engine.lock().unwrap() = Some(engine);
result
@@ -262,13 +296,17 @@ async fn open_cloud_file(state: State<'_, AppState>, cloud_path: String) -> Resu
// User can "unmark offline" or "unlock" via right-click
std::fs::remove_file(&path).ok();
// Lock on server (prevents others from editing)
match api.lock_file(file_id, "Desktop Sync Client").await {
// Lock on server (fresh token) - prevents others from editing
let fresh_api = state.api.lock().unwrap().clone().ok_or("Nicht eingeloggt")?;
match fresh_api.lock_file(file_id, "Desktop Sync Client").await {
Ok(_) => {
eprintln!("[OpenCloud] Locked on server");
state.locked_files.lock().unwrap().push(file_id);
}
Err(e) => eprintln!("[OpenCloud] Lock failed (file may be locked by someone else): {}", e),
Err(e) => {
eprintln!("[OpenCloud] Lock failed: {}", e);
return Err(format!("Datei heruntergeladen, aber Sperre fehlgeschlagen: {}", e));
}
}
// Open with default application for this file type
@@ -446,7 +484,8 @@ fn start_background_sync(
app: AppHandle,
sync_running: Arc<Mutex<bool>>,
api: MiniCloudApi,
paths: Vec<SyncPath>,
journal: Arc<Journal>,
username: String,
) {
// Shared flag: watcher sets true when changes detected, sync thread checks it
let watcher_triggered = Arc::new(Mutex::new(false));
@@ -454,13 +493,13 @@ fn start_background_sync(
// Main sync thread: syncs on watcher trigger OR every 60s as fallback
let app_sync = app.clone();
let api_sync = api.clone();
let paths_sync = paths.clone();
let trigger_sync = watcher_triggered.clone();
let journal_sync = journal.clone();
let username_sync = username.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut engine = SyncEngine::new(api_sync);
engine.sync_paths = paths_sync;
let mut engine = SyncEngine::new(api_sync, journal_sync, username_sync);
let mut idle_counter = 0u32;
loop {
@@ -474,18 +513,38 @@ fn start_background_sync(
*triggered = false;
true
} else {
// Fallback: sync every 60 seconds even without changes
idle_counter >= 60
// Fallback: sync every 30 seconds even without changes
idle_counter >= 30
}
};
if !should_sync { continue; }
idle_counter = 0;
// Re-read sync_paths from state every iteration so add/remove
// takes effect without restarting the thread.
let paths_now = {
let state = app_sync.state::<AppState>();
state.sync_paths.lock().unwrap().clone()
};
if paths_now.is_empty() {
// Nothing to sync - idle quietly.
continue;
}
engine.sync_paths = paths_now;
// Run sync
*sync_running.lock().unwrap() = true;
let _ = app_sync.emit("sync-status", "syncing");
// Refresh engine's API token from state (token may have been refreshed)
{
let state = app_sync.state::<AppState>();
if let Some(ref api) = *state.api.lock().unwrap() {
engine.api.access_token = api.access_token.clone();
}
}
match rt.block_on(engine.sync_all()) {
Ok(log) => {
if !log.is_empty() {
@@ -534,6 +593,73 @@ fn start_background_sync(
}
});
// Server-Sent Events: real-time change notifications from server
let app_sse = app.clone();
let trigger_sse = watcher_triggered.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
loop {
let (server_url, token) = {
let state = app_sse.state::<AppState>();
let guard = state.api.lock().unwrap();
match guard.as_ref() {
Some(a) => (a.server_url.clone(), a.access_token.clone()),
None => { drop(guard); std::thread::sleep(Duration::from_secs(3)); continue; }
}
};
if token.is_empty() {
std::thread::sleep(Duration::from_secs(3));
continue;
}
let url = format!("{}/api/sync/events?token={}", server_url, token);
let trigger = trigger_sse.clone();
let app_cb = app_sse.clone();
let result: Result<(), String> = rt.block_on(async move {
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(10))
.build()
.map_err(|e| e.to_string())?;
let mut resp = client.get(&url).send().await.map_err(|e| e.to_string())?;
if !resp.status().is_success() {
return Err(format!("SSE status {}", resp.status()));
}
eprintln!("[SSE] Connected");
let _ = app_cb.emit("sse-status", "connected");
let mut buffer = String::new();
while let Some(chunk) = resp.chunk().await.map_err(|e| e.to_string())? {
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find("\n\n") {
let raw = buffer[..pos].to_string();
buffer.drain(..pos + 2);
let lines: Vec<&str> = raw.lines().collect();
// Skip keepalive/comment lines (start with ':')
if lines.iter().all(|l| l.starts_with(':') || l.is_empty()) {
continue;
}
let mut event_name = String::from("message");
for l in &lines {
if let Some(v) = l.strip_prefix("event: ") { event_name = v.to_string(); }
}
if event_name == "hello" { continue; }
// Any real event -> trigger sync
*trigger.lock().unwrap() = true;
let _ = app_cb.emit("sse-event", event_name);
}
}
Ok(())
});
if let Err(e) = result {
eprintln!("[SSE] Disconnected: {}", e);
let _ = app_sse.emit("sse-status", format!("reconnecting: {}", e));
}
std::thread::sleep(Duration::from_secs(3));
}
});
// File watcher: detects changes and triggers immediate sync
let app_w = app.clone();
let trigger_w = watcher_triggered.clone();
@@ -651,6 +777,8 @@ pub fn run() {
sync_running: Arc::new(Mutex::new(false)),
locked_files: Mutex::new(Vec::new()),
sync_paths: Mutex::new(Vec::new()),
journal: Arc::new(Journal::open().expect("Journal konnte nicht geoeffnet werden")),
background_started: AtomicBool::new(false),
})
.on_window_event(|window, event| {
// Close button = minimize to tray instead of quit