50385faa02
Desktop-Client komplett ueberarbeitet nach Nextcloud-Vorbild: - Persistentes SQLite-Journal (journal.rs) speichert letzten bekannten Stand pro Datei - ueberlebt Client-Neustarts (Hauptbug behoben). - Engine.rs neu: 3-Wege-Vergleich Local <-> Journal <-> Server mit sauberer Konflikt-Kopie (inkl. Username + Zeitstempel). - Loesch-Propagation: Lokal geloeschte Dateien landen im Server- Papierkorb des Owners (auch bei Freigaben). Auf dem Server geloeschte Dateien werden lokal entfernt. - Lock-Flow repariert: frischer Token bei jedem Call, Fehler-Feedback. Echtzeit-Sync: - Backend: SSE-Endpoint /api/sync/events mit In-Memory-Broadcaster. Events bei Create/Update/Delete/Lock/Unlock, Zustellung an Owner plus alle User mit Share-Permission. - Client: persistente SSE-Verbindung mit Auto-Reconnect. Events triggern sofortigen Sync (<100ms). 30s-Polling bleibt als Fallback fuer Netzwerk-Aussetzer. Weitere Fixes: - /api/sync/tree filtert is_trashed=False (Papierkorb wird nicht mehr an Clients gesynct). - Web-GUI: Lock/Unlock-Buttons pro Datei, Admin darf fremde Locks zwangsweise loesen. Rename/Delete disabled bei fremdem Lock. - Lock-Check im Backend bei PUT/DELETE (423 Locked Response). - Background-Sync nur noch einmal pro Prozess gestartet, liest sync_paths pro Iteration neu - add/remove wirkt sofort, kein Client-Neustart mehr noetig. - Watcher werden pro Sync-Pfad individuell verwaltet. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
"""In-memory event broadcaster for SSE clients.
|
|
|
|
Each logged-in user can have multiple connected clients (desktop, web,
|
|
mobile). Every client gets its own queue. Mutating file operations push
|
|
an event into the queues of every affected user.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import queue
|
|
import threading
|
|
import time
|
|
from typing import Iterable
|
|
|
|
|
|
class EventBroadcaster:
|
|
def __init__(self) -> None:
|
|
self._lock = threading.Lock()
|
|
# user_id -> list[queue.Queue]
|
|
self._subs: dict[int, list[queue.Queue]] = {}
|
|
|
|
def subscribe(self, user_id: int) -> queue.Queue:
|
|
q: queue.Queue = queue.Queue(maxsize=256)
|
|
with self._lock:
|
|
self._subs.setdefault(user_id, []).append(q)
|
|
return q
|
|
|
|
def unsubscribe(self, user_id: int, q: queue.Queue) -> None:
|
|
with self._lock:
|
|
lst = self._subs.get(user_id)
|
|
if not lst:
|
|
return
|
|
try:
|
|
lst.remove(q)
|
|
except ValueError:
|
|
pass
|
|
if not lst:
|
|
self._subs.pop(user_id, None)
|
|
|
|
def publish(self, user_ids: Iterable[int], event: dict) -> None:
|
|
payload = dict(event)
|
|
payload.setdefault('ts', time.time())
|
|
with self._lock:
|
|
for uid in set(user_ids):
|
|
for q in self._subs.get(uid, []):
|
|
try:
|
|
q.put_nowait(payload)
|
|
except queue.Full:
|
|
pass # slow client - drop event
|
|
|
|
def stream(self, user_id: int):
|
|
"""Generator yielding SSE-formatted strings for one client."""
|
|
q = self.subscribe(user_id)
|
|
try:
|
|
# Initial hello so the client knows it's connected
|
|
yield f"event: hello\ndata: {json.dumps({'user_id': user_id})}\n\n"
|
|
while True:
|
|
try:
|
|
event = q.get(timeout=20.0)
|
|
except queue.Empty:
|
|
# Heartbeat / keepalive comment - also keeps proxies happy
|
|
yield ": keepalive\n\n"
|
|
continue
|
|
kind = event.get('type', 'change')
|
|
yield f"event: {kind}\ndata: {json.dumps(event)}\n\n"
|
|
finally:
|
|
self.unsubscribe(user_id, q)
|
|
|
|
|
|
broadcaster = EventBroadcaster()
|
|
|
|
|
|
def notify_file_change(owner_id: int, file_id: int | None, change: str,
|
|
shared_with: Iterable[int] = ()) -> None:
|
|
"""Emit a file change event to the owner plus any users with share access."""
|
|
recipients = [owner_id, *shared_with]
|
|
broadcaster.publish(recipients, {
|
|
'type': 'file',
|
|
'change': change, # 'created' | 'updated' | 'deleted' | 'locked' | 'unlocked'
|
|
'file_id': file_id,
|
|
})
|