Files
minmal-file-cloud-email-pim…/clients/desktop/src-tauri/src/sync/engine.rs
T
Stefan Hacker 5f905b4925 fix: Sync-Fehler "error decoding response body" + Server-Edits
Drei Probleme in einem:

1. create_folder/get_sync_tree parsten die Response auch bei HTTP-
   Fehlern als JSON. Bei 401/409/etc. kam "error decoding response
   body" statt der eigentlichen Fehlermeldung. Status wird jetzt
   zuerst geprueft, Body-Text wird bei Fehlern zurueckgegeben.

2. Ohne Journal-Eintrag und unterschiedlichen Hashes wurde vorher
   eine Konflikt-Kopie erstellt. Fuer Server-Edits aus dem Web-UI
   (wo der Client die Datei gar nie mit Journal erfasst hatte) war
   das falsch. Nextcloud-Ansatz: beim Erstkontakt Server
   autoritativ - Download statt Konflikt-Kopie.

3. run_sync_now uebernimmt neu konfigurierte sync_paths aus dem
   State, damit manuelle Syncs auch nach add_sync_path greifen.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 10:25:01 +02:00

488 lines
19 KiB
Rust

use crate::sync::api::{FileEntry, MiniCloudApi};
use crate::sync::journal::{Journal, JournalEntry};
use sha2::{Digest, Sha256};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// A configured sync path: maps a server folder to a local folder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPath {
    /// Identifier of this mapping; journal entries are looked up by it.
    pub id: String,
    /// Path of the folder on the server.
    // NOTE(review): not read by the engine code visible here - presumably
    // kept for configuration/display purposes; confirm against callers.
    pub server_path: String,
    /// Id of the server folder to sync. `None` uses the whole server tree.
    pub server_folder_id: Option<i64>,
    /// Local directory that mirrors the server folder.
    pub local_dir: String,
    /// Whether files stay placeholders or are downloaded in full.
    pub mode: SyncMode,
    /// Paths with `enabled == false` are skipped when syncing.
    pub enabled: bool,
}
/// How the files of a sync path are represented locally.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SyncMode {
    /// Server files are represented by `.cloud` placeholder files.
    Virtual,
    /// Server files without a local copy are downloaded during sync.
    Full,
}
/// JSON payload stored inside a `<name>.cloud` placeholder file.
///
/// The field order is the order in which the keys are serialised.
#[derive(Debug, Serialize, Deserialize)]
struct CloudPlaceholder {
    /// Server-side id of the file the placeholder stands for.
    id: i64,
    /// File name on the server (without the `.cloud` suffix).
    name: String,
    /// Size reported by the server (0 when the server reported none).
    size: i64,
    /// Checksum reported by the server (empty when the server reported none).
    checksum: String,
    /// Last-modified value reported by the server, stored verbatim.
    updated_at: String,
    /// Path of the file relative to the sync root, as used for the journal.
    server_path: String,
}
/// Drives synchronisation between the server and the configured local folders.
pub struct SyncEngine {
    /// Client used for every server call (tree, upload, download, delete).
    pub api: MiniCloudApi,
    /// Sync paths processed by `sync_all`; empty right after `new`.
    pub sync_paths: Vec<SyncPath>,
    /// Journal recording what was last synced for each file.
    pub journal: Arc<Journal>,
    /// Name of the local user; used for lock checks and conflict-copy names.
    pub username: String,
}
impl SyncEngine {
/// Creates an engine that has no sync paths configured yet.
pub fn new(api: MiniCloudApi, journal: Arc<Journal>, username: String) -> Self {
    Self {
        api,
        journal,
        username,
        sync_paths: Vec::new(),
    }
}
/// Sync every configured path.
pub async fn sync_all(&mut self) -> Result<Vec<String>, String> {
let mut log = Vec::new();
let tree = self.api.get_sync_tree().await?;
let sync_paths = self.sync_paths.clone();
for sp in &sync_paths {
if !sp.enabled { continue; }
let local_dir = PathBuf::from(&sp.local_dir);
std::fs::create_dir_all(&local_dir).ok();
let subtree = match sp.server_folder_id {
Some(id) => find_subtree(&tree, id).unwrap_or_default(),
None => tree.clone(),
};
// Phase 1: propagate deletions based on journal history.
self.detect_deletions(sp, &subtree, &local_dir, &mut log).await;
// Phase 2: normal sync (downloads, uploads, conflicts).
self.sync_dir(&subtree, &local_dir, "", sp.server_folder_id, sp, &mut log).await;
}
Ok(log)
}
/// Walks the journal for this sync path and reconciles existence:
/// - file was in journal and is gone locally but still on server -> delete on server
/// - file was in journal and is gone on server but still local -> delete locally
/// - file is gone on both sides -> clean journal entry
async fn detect_deletions(
&self,
sp: &SyncPath,
subtree: &[FileEntry],
local_root: &Path,
log: &mut Vec<String>,
) {
use std::collections::HashMap;
let mut server_files: HashMap<String, i64> = HashMap::new();
collect_server_files(subtree, "", &mut server_files);
for je in self.journal.list_for_sync(&sp.id) {
let local_real = local_root.join(&je.relative_path);
let local_cloud = {
let parent = local_real.parent().map(|p| p.to_path_buf());
let fname = local_real.file_name().map(|n| n.to_string_lossy().to_string());
match (parent, fname) {
(Some(p), Some(n)) => p.join(format!("{}.cloud", n)),
_ => PathBuf::new(),
}
};
let local_exists = local_real.exists() || local_cloud.exists();
let server_id = server_files.get(&je.relative_path).copied();
match (local_exists, server_id) {
(true, Some(_)) => { /* present on both sides - normal sync handles it */ }
(false, None) => {
let _ = self.journal.delete(&sp.id, &je.relative_path);
}
(false, Some(id)) => {
match self.api.delete_file(id).await {
Ok(_) => {
log.push(format!("Server-Papierkorb: {}", je.relative_path));
let _ = self.journal.delete(&sp.id, &je.relative_path);
}
Err(e) => log.push(format!("Server-Delete-Fehler {}: {}", je.relative_path, e)),
}
}
(true, None) => {
std::fs::remove_file(&local_real).ok();
std::fs::remove_file(&local_cloud).ok();
let _ = self.journal.delete(&sp.id, &je.relative_path);
log.push(format!("Lokal geloescht: {}", je.relative_path));
}
}
}
}
/// Recursively syncs a single directory level of a sync path.
///
/// * `server_entries` - server-side entries of this directory
/// * `local_dir` - the matching local directory
/// * `rel_prefix` - journal-relative path of this directory: `""` for the
///   sync root, otherwise e.g. `"sub/dir"` (no trailing slash)
/// * `parent_id` - server folder id under which new uploads are created
/// * `sp` - the sync path being processed
/// * `log` - receives one line per action or error
///
/// Runs three passes: reconcile every server entry, upload local entries the
/// server does not know, then remove orphaned `.cloud` placeholders.
async fn sync_dir(
    &mut self,
    server_entries: &[FileEntry],
    local_dir: &Path,
    rel_prefix: &str,
    parent_id: Option<i64>,
    sp: &SyncPath,
    log: &mut Vec<String>,
) {
    use std::collections::HashMap;
    let server_by_name: HashMap<String, &FileEntry> = server_entries
        .iter()
        .map(|e| (e.name.clone(), e))
        .collect();

    // --- Pass 1: iterate server entries, reconcile each against local/journal ---
    for entry in server_entries {
        let rel = if rel_prefix.is_empty() {
            entry.name.clone()
        } else {
            format!("{}/{}", rel_prefix, entry.name)
        };
        if entry.is_folder {
            let sub_local = local_dir.join(&entry.name);
            std::fs::create_dir_all(&sub_local).ok();
            // NOTE(review): a folder whose `children` is `None` is not
            // descended into, so local files inside it are not uploaded -
            // confirm whether the server sends `None` for empty folders.
            if let Some(children) = &entry.children {
                // Boxed because an async fn cannot recurse directly.
                Box::pin(self.sync_dir(children, &sub_local, &rel, Some(entry.id), sp, log)).await;
            }
            continue;
        }
        self.reconcile_file(entry, local_dir, &rel, parent_id, sp, log).await;
    }

    // --- Pass 2: iterate local entries, upload new local files/folders ---
    let dir_iter = match std::fs::read_dir(local_dir) {
        Ok(d) => d,
        Err(_) => return,
    };
    for e in dir_iter.flatten() {
        let name = e.file_name().to_string_lossy().to_string();
        if should_skip_name(&name) {
            continue;
        }
        let path = e.path();
        let is_dir = path.is_dir();
        // `.cloud` placeholders are stored locally under "foo.txt.cloud" but
        // represent the server-side "foo.txt". Exactly one suffix is removed,
        // and only for files: a directory named "x.cloud" keeps its name.
        let placeholder_target = if is_dir { None } else { name.strip_suffix(".cloud") };
        let is_placeholder = placeholder_target.is_some();
        let real_name = placeholder_target.unwrap_or(&name).to_string();
        // Already covered by server pass?
        if server_by_name.contains_key(&real_name) {
            continue;
        }
        if is_placeholder {
            continue; // orphan placeholder - removed in pass 3
        }
        let rel = if rel_prefix.is_empty() {
            real_name.clone()
        } else {
            format!("{}/{}", rel_prefix, real_name)
        };
        if is_dir {
            match self.api.create_folder(&real_name, parent_id).await {
                Ok(folder) => {
                    log.push(format!("Ordner erstellt: {}", rel));
                    self.upload_local_tree(&path, Some(folder.id), &rel, sp, log).await;
                }
                Err(e) => log.push(format!("Ordner-Fehler {}: {}", rel, e)),
            }
        } else {
            match self.api.upload_file(&path, parent_id).await {
                Ok(fe) => {
                    log.push(format!("Hochgeladen: {}", rel));
                    let checksum = fe.checksum.unwrap_or_default();
                    let size = fe.size.unwrap_or(0);
                    let _ = self.journal.upsert(&JournalEntry {
                        sync_path_id: sp.id.clone(),
                        relative_path: rel.clone(),
                        file_id: Some(fe.id),
                        synced_checksum: checksum,
                        synced_size: size,
                        synced_mtime: fe.updated_at.unwrap_or_default(),
                        local_state: "offline".to_string(),
                    });
                }
                Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)),
            }
        }
    }

    // --- Pass 3: clean up orphan .cloud placeholders for files gone from server ---
    if let Ok(dir_iter) = std::fs::read_dir(local_dir) {
        for e in dir_iter.flatten() {
            let name = e.file_name().to_string_lossy().to_string();
            if e.path().is_dir() {
                continue;
            }
            let real_name = match name.strip_suffix(".cloud") {
                // A file named exactly ".cloud" is not a placeholder.
                Some(stripped) if !stripped.is_empty() => stripped,
                _ => continue,
            };
            if server_by_name.contains_key(real_name) {
                continue;
            }
            std::fs::remove_file(e.path()).ok();
            let rel = if rel_prefix.is_empty() {
                real_name.to_string()
            } else {
                format!("{}/{}", rel_prefix, real_name)
            };
            let _ = self.journal.delete(&sp.id, &rel);
            log.push(format!("Entfernt (Server): {}", name));
        }
    }
}
/// Core three-way reconciliation (local file, server entry, journal) for a
/// single server file.
///
/// * `entry` - the server-side file
/// * `local_dir` - local directory that should contain the file
/// * `rel` - journal-relative path of the file
/// * `parent_id` - server folder id used when uploading
/// * `sp` - the sync path being processed
/// * `log` - receives one line per action or error
///
/// Case A (a real local file exists): compares hashes and, based on the
/// journal, uploads, downloads, or creates a conflict copy. Without a journal
/// entry the server version wins. Files locked by another user are held back.
///
/// Case B (placeholder or nothing local): refreshes the `.cloud` placeholder
/// and journal, and in `SyncMode::Full` downloads the file.
async fn reconcile_file(
    &self,
    entry: &FileEntry,
    local_dir: &Path,
    rel: &str,
    parent_id: Option<i64>,
    sp: &SyncPath,
    log: &mut Vec<String>,
) {
    let real_path = local_dir.join(&entry.name);
    let cloud_path = local_dir.join(format!("{}.cloud", entry.name));
    let journal_entry = self.journal.get(&sp.id, rel);
    let server_hash = entry.checksum.clone().unwrap_or_default();
    let server_size = entry.size.unwrap_or(0);
    let server_mtime = entry.updated_at.clone().unwrap_or_default();

    // Case A: real file exists locally = offline state
    if real_path.exists() && !real_path.is_dir() {
        // Avoid race: if placeholder still around, remove it
        if cloud_path.exists() {
            std::fs::remove_file(&cloud_path).ok();
        }
        let local_hash = compute_file_hash(&real_path);
        if local_hash == server_hash {
            // In sync - just (re)record journal
            self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime);
            return;
        }
        // Hashes differ. Locked by someone else? Hold back.
        if entry.locked.unwrap_or(false) {
            let by = entry.locked_by.clone().unwrap_or_default();
            if by != self.username {
                log.push(format!("Zurueckgehalten (gesperrt von {}): {}", by, rel));
                return;
            }
        }
        let (local_changed, server_changed) = match &journal_entry {
            Some(j) => (local_hash != j.synced_checksum, server_hash != j.synced_checksum),
            // No journal history: this is the first time this file is
            // tracked. The server is treated as authoritative so edits made
            // in the web UI or by other clients propagate down.
            None => (false, true),
        };
        if local_changed && !server_changed {
            // Upload
            match self.api.upload_file(&real_path, parent_id).await {
                Ok(fe) => {
                    log.push(format!("Lokal->Server: {}", rel));
                    // Fall back to locally known values for anything the
                    // upload response does not report.
                    let new_hash = fe.checksum.unwrap_or_else(|| local_hash.clone());
                    let new_size = fe.size.unwrap_or(server_size);
                    let new_mtime = fe.updated_at.unwrap_or_else(|| server_mtime.clone());
                    self.journal_offline(sp, rel, entry, &new_hash, new_size, &new_mtime);
                }
                Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)),
            }
        } else if server_changed && !local_changed {
            // Download (also the path taken when there is no journal entry)
            match self.api.download_file(entry.id, &real_path).await {
                Ok(_) => {
                    log.push(format!("Server->Lokal: {}", rel));
                    self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime);
                }
                Err(e) => log.push(format!("Download-Fehler {}: {}", rel, e)),
            }
        } else {
            // Both sides changed since the last sync -> conflict copy
            let conflict_path = make_conflict_path(&real_path, &self.username);
            if let Err(e) = std::fs::rename(&real_path, &conflict_path) {
                // The local edits could not be moved aside. Downloading now
                // would overwrite the only copy, so leave everything as is.
                log.push(format!("Konflikt-Fehler {}: {}", rel, e));
                return;
            }
            match self.api.download_file(entry.id, &real_path).await {
                Ok(_) => {
                    let conflict_name = conflict_path
                        .file_name()
                        .map(|n| n.to_string_lossy().into_owned())
                        .unwrap_or_default();
                    log.push(format!("KONFLIKT: {} (lokal: {})", rel, conflict_name));
                    self.journal_offline(sp, rel, entry, &server_hash, server_size, &server_mtime);
                }
                Err(e) => {
                    // Restore original
                    std::fs::rename(&conflict_path, &real_path).ok();
                    log.push(format!("Download-Fehler {}: {}", rel, e));
                }
            }
        }
        return;
    }

    // Case B: local has a .cloud placeholder (or neither) = virtual state
    // Virtual placeholders never have local edits, just keep them fresh.
    // A missing or unparsable placeholder is always rewritten.
    let needs_write = match std::fs::read_to_string(&cloud_path) {
        Ok(content) => match serde_json::from_str::<CloudPlaceholder>(&content) {
            Ok(old) => old.checksum != server_hash || old.id != entry.id,
            Err(_) => true,
        },
        Err(_) => true,
    };
    if needs_write {
        let placeholder = CloudPlaceholder {
            id: entry.id,
            name: entry.name.clone(),
            size: server_size,
            checksum: server_hash.clone(),
            updated_at: server_mtime.clone(),
            server_path: rel.to_string(),
        };
        if let Ok(json) = serde_json::to_string_pretty(&placeholder) {
            if !cloud_path.exists() {
                log.push(format!("Platzhalter: {}.cloud", entry.name));
            } else {
                log.push(format!("Platzhalter aktualisiert: {}.cloud", entry.name));
            }
            std::fs::write(&cloud_path, json).ok();
        }
    }
    self.journal.upsert(&JournalEntry {
        sync_path_id: sp.id.clone(),
        relative_path: rel.to_string(),
        file_id: Some(entry.id),
        synced_checksum: server_hash,
        synced_size: server_size,
        synced_mtime: server_mtime,
        local_state: "virtual".to_string(),
    }).ok();

    // If Full mode and no real file yet, download now
    if sp.mode == SyncMode::Full && !real_path.exists() {
        if let Err(e) = self.api.download_file(entry.id, &real_path).await {
            log.push(format!("Full-Download-Fehler {}: {}", rel, e));
        } else {
            std::fs::remove_file(&cloud_path).ok();
            log.push(format!("Heruntergeladen: {}", rel));
            // Update journal to offline
            if let Some(mut j) = self.journal.get(&sp.id, rel) {
                j.local_state = "offline".to_string();
                let _ = self.journal.upsert(&j);
            }
        }
    }
}
/// Records `rel` in the journal as synced in the "offline" state.
///
/// The file id is taken from `entry`; `hash`, `size` and `mtime` become the
/// synced values. A failing journal write is ignored.
fn journal_offline(
    &self,
    sp: &SyncPath,
    rel: &str,
    entry: &FileEntry,
    hash: &str,
    size: i64,
    mtime: &str,
) {
    let record = JournalEntry {
        sync_path_id: sp.id.clone(),
        relative_path: rel.to_string(),
        file_id: Some(entry.id),
        synced_checksum: hash.to_string(),
        synced_size: size,
        synced_mtime: mtime.to_string(),
        local_state: "offline".to_string(),
    };
    let _ = self.journal.upsert(&record);
}
/// Uploads everything below `dir` into the server folder `parent_id`.
///
/// Used right after a new folder was created on the server. Sub-directories
/// are created on the server and descended into, files are uploaded and
/// journaled as "offline". Names rejected by `should_skip_name` are ignored.
/// `rel_prefix` is the journal-relative path of `dir`; `log` receives one
/// line per action or error.
async fn upload_local_tree(
    &self,
    dir: &Path,
    parent_id: Option<i64>,
    rel_prefix: &str,
    sp: &SyncPath,
    log: &mut Vec<String>,
) {
    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(_) => return,
    };
    for item in listing.flatten() {
        let name = item.file_name().to_string_lossy().into_owned();
        if should_skip_name(&name) {
            continue;
        }
        let path = item.path();
        let rel = format!("{}/{}", rel_prefix, name);

        if !path.is_dir() {
            // Plain file: upload and remember what was synced.
            match self.api.upload_file(&path, parent_id).await {
                Ok(fe) => {
                    log.push(format!("Hochgeladen: {}", rel));
                    let hash = fe.checksum.as_deref().unwrap_or_default();
                    let size = fe.size.unwrap_or(0);
                    let mtime = fe.updated_at.as_deref().unwrap_or_default();
                    self.journal_offline(sp, &rel, &fe, hash, size, mtime);
                }
                Err(e) => log.push(format!("Upload-Fehler {}: {}", rel, e)),
            }
            continue;
        }

        // Directory: create it on the server, then recurse into it.
        match self.api.create_folder(&name, parent_id).await {
            Ok(folder) => {
                log.push(format!("Ordner erstellt: {}", rel));
                // Boxed because an async fn cannot recurse directly.
                Box::pin(self.upload_local_tree(&path, Some(folder.id), &rel, sp, log)).await;
            }
            Err(e) => log.push(format!("Ordner-Fehler {}: {}", rel, e)),
        }
    }
}
}
/// Returns `true` for names the sync engine ignores: names starting with
/// `.` or `~`, and names ending in `.tmp`.
fn should_skip_name(name: &str) -> bool {
    let hidden_or_tilde = matches!(name.chars().next(), Some('.' | '~'));
    hidden_or_tilde || name.ends_with(".tmp")
}
/// Builds the path of a conflict copy next to `original`.
///
/// The file name becomes `<stem> (Konflikt <username> <timestamp>)`, followed
/// by `.<ext>` when `original` has a non-empty extension. The timestamp is
/// the current local time formatted as `%Y-%m-%d %H%M%S`.
fn make_conflict_path(original: &Path, username: &str) -> PathBuf {
    let timestamp = chrono::Local::now().format("%Y-%m-%d %H%M%S").to_string();
    let base = original
        .file_stem()
        .map(|s| s.to_string_lossy().into_owned())
        .unwrap_or_default();

    let mut conflict_name = format!("{} (Konflikt {} {})", base, username, timestamp);
    if let Some(ext) = original.extension().map(|e| e.to_string_lossy()) {
        if !ext.is_empty() {
            conflict_name.push('.');
            conflict_name.push_str(&ext);
        }
    }

    match original.parent() {
        Some(dir) => dir.join(&conflict_name),
        None => PathBuf::from(&conflict_name),
    }
}
/// Flattens a server tree into `out`, mapping each file's `/`-joined path
/// (relative to `prefix`) to its id.
///
/// Folders are not inserted themselves; they are descended into when they
/// carry children.
fn collect_server_files(
    entries: &[FileEntry],
    prefix: &str,
    out: &mut std::collections::HashMap<String, i64>,
) {
    for node in entries {
        let path = match prefix {
            "" => node.name.clone(),
            parent => format!("{}/{}", parent, node.name),
        };
        if !node.is_folder {
            out.insert(path, node.id);
            continue;
        }
        if let Some(kids) = node.children.as_ref() {
            collect_server_files(kids, &path, out);
        }
    }
}
/// Searches `tree` depth-first for the entry with id `folder_id` and returns
/// a clone of its children.
///
/// Returns `None` when no entry matches. A matching entry whose `children`
/// is `None` also ends the search of its level with `None`.
fn find_subtree(tree: &[FileEntry], folder_id: i64) -> Option<Vec<FileEntry>> {
    for node in tree {
        if node.id == folder_id {
            return node.children.clone();
        }
        let nested = node
            .children
            .as_deref()
            .and_then(|kids| find_subtree(kids, folder_id));
        if nested.is_some() {
            return nested;
        }
    }
    None
}
/// Computes the SHA-256 of the file at `path` as a lowercase hex string.
///
/// The file is hashed in fixed-size chunks, so memory use does not grow with
/// the file size. Returns an empty string when the file cannot be opened or
/// read.
pub fn compute_file_hash(path: &Path) -> String {
    use std::io::Read;

    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return String::new(),
    };
    let mut hasher = Sha256::new();
    // Heap-allocated so the buffer does not enlarge the caller's stack frame.
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        match file.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => hasher.update(&buf[..n]),
            // An interrupted read is not a failure; retry it.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(_) => return String::new(),
        }
    }
    format!("{:x}", hasher.finalize())
}