minmal-file-cloud-email-pim.../backend/app/services/events.py

105 lines
3.6 KiB
Python

"""In-memory event broadcaster for SSE clients.
Each logged-in user can have multiple connected clients (desktop, web,
mobile). Every client gets its own queue. Mutating file, calendar and
task-list operations push an event into the queues of every affected user.
"""
from __future__ import annotations
import json
import queue
import threading
import time
from typing import Iterable
class EventBroadcaster:
    """Fan out event dicts to per-client queues, keyed by user id.

    Each subscription owns one ``queue.Queue``. The subscription table is
    guarded by a lock. ``publish`` places the *same* payload dict on every
    addressed queue, so consumers should treat received events as read-only.

    Args:
        max_queue_size: Capacity of every client queue created by
            ``subscribe``. When a queue is full, further events for that
            client are dropped. Passed straight to ``queue.Queue``.
        keepalive_timeout: Seconds ``stream`` waits for an event before
            emitting an SSE keepalive comment instead.
    """

    def __init__(self, *, max_queue_size: int = 256,
                 keepalive_timeout: float = 20.0) -> None:
        self._lock = threading.Lock()
        # user_id -> list[queue.Queue]
        self._subs: dict[int, list[queue.Queue]] = {}
        self._max_queue_size = max_queue_size
        self._keepalive_timeout = keepalive_timeout

    def subscribe(self, user_id: int) -> queue.Queue:
        """Register a new client queue for *user_id* and return it."""
        q: queue.Queue = queue.Queue(maxsize=self._max_queue_size)
        with self._lock:
            self._subs.setdefault(user_id, []).append(q)
        return q

    def unsubscribe(self, user_id: int, q: queue.Queue) -> None:
        """Remove *q* from *user_id*'s subscriptions.

        Unknown users and unknown queues are ignored. The user's entry is
        removed from the table once its last queue is gone.
        """
        with self._lock:
            lst = self._subs.get(user_id)
            if not lst:
                return
            try:
                lst.remove(q)
            except ValueError:
                pass
            if not lst:
                self._subs.pop(user_id, None)

    def publish(self, user_ids: Iterable[int], event: dict) -> None:
        """Queue *event* for every subscribed client of each user in *user_ids*.

        A shallow copy of *event* is published, so the caller's dict is not
        modified. A ``'ts'`` key holding ``time.time()`` is added to the copy
        when the event does not already carry one. Duplicate user ids are
        delivered to only once.
        """
        payload = dict(event)
        payload.setdefault('ts', time.time())
        # Consume the caller's iterable before taking the lock so arbitrary
        # iteration code never runs while the subscription table is locked.
        recipients = set(user_ids)
        with self._lock:
            for uid in recipients:
                for q in self._subs.get(uid, ()):
                    try:
                        q.put_nowait(payload)
                    except queue.Full:
                        pass  # slow client - drop event

    def stream(self, user_id: int):
        """Generator yielding SSE-formatted strings for one client.

        Subscribes when iteration starts, yields an initial ``hello`` event,
        then one SSE message per received event. The SSE event name is the
        event's ``'type'`` value, or ``'change'`` when that key is absent.
        If no event arrives within the keepalive timeout, a keepalive
        comment is yielded instead. The subscription is removed when the
        generator is closed or otherwise exits.
        """
        q = self.subscribe(user_id)
        try:
            # Initial hello so the client knows it's connected
            yield f"event: hello\ndata: {json.dumps({'user_id': user_id})}\n\n"
            while True:
                try:
                    event = q.get(timeout=self._keepalive_timeout)
                except queue.Empty:
                    # Heartbeat / keepalive comment - also keeps proxies happy
                    yield ": keepalive\n\n"
                    continue
                kind = event.get('type', 'change')
                yield f"event: {kind}\ndata: {json.dumps(event)}\n\n"
        finally:
            self.unsubscribe(user_id, q)
# Module-level broadcaster instance; the notify_* helpers in this module
# publish their events through it.
broadcaster = EventBroadcaster()
def notify_file_change(
    owner_id: int,
    file_id: int | None,
    change: str,
    shared_with: Iterable[int] = (),
) -> None:
    """Publish a ``'file'`` event to the owner and every user in *shared_with*.

    Args:
        owner_id: User id of the owner; always a recipient.
        file_id: Value stored under the event's ``file_id`` key (may be ``None``).
        change: Kind of change:
            'created' | 'updated' | 'deleted' | 'locked' | 'unlocked'.
        shared_with: Additional user ids that should receive the event.
    """
    targets = [owner_id]
    targets.extend(shared_with)
    event = dict(type='file', change=change, file_id=file_id)
    broadcaster.publish(targets, event)
def notify_calendar_change(
    owner_id: int,
    calendar_id: int,
    change: str,
    shared_with: Iterable[int] = (),
) -> None:
    """Publish a ``'calendar'`` event to the owner and every user in *shared_with*.

    Used for calendar-level changes: an event was added/changed/deleted, or
    the share membership changed.

    Args:
        owner_id: User id of the owner; always a recipient.
        calendar_id: Value stored under the event's ``calendar_id`` key.
        change: Kind of change: 'event'|'share'|'deleted'.
        shared_with: User ids the calendar is shared with.
    """
    targets = [owner_id]
    targets.extend(shared_with)
    event = dict(type='calendar', change=change, calendar_id=calendar_id)
    broadcaster.publish(targets, event)
def notify_tasklist_change(
    owner_id: int,
    list_id: int,
    change: str,
    shared_with: Iterable[int] = (),
) -> None:
    """Publish a ``'tasklist'`` event to the owner and every user in *shared_with*.

    Args:
        owner_id: User id of the owner; always a recipient.
        list_id: Value stored under the event's ``task_list_id`` key.
        change: Kind of change: 'task'|'share'|'deleted'|'created'.
        shared_with: Additional user ids that should receive the event.
    """
    targets = [owner_id]
    targets.extend(shared_with)
    event = dict(type='tasklist', change=change, task_list_id=list_id)
    broadcaster.publish(targets, event)