minmal-file-cloud-email-pim.../backend/app/dav/taskdav.py

369 lines
13 KiB
Python

"""CalDAV Task-List Handler (VTODO).

Task lists are served alongside calendars as calendar collections, but with
`<supported-calendar-component-set>` = VTODO (instead of VEVENT). DAVx5 and
OpenTasks therefore recognize them automatically as task lists.

URL scheme:
    /dav/<user>/tl-<id>/            Collection
    /dav/<user>/tl-<id>/<uid>.ics   VTODO resource

These functions are called from caldav.py as soon as the URL path component
starts with `tl-` - analogous to the ab-/CardDAV delegation.
"""
from __future__ import annotations
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from flask import Response, request
from app.extensions import db
from app.models.task import TaskList, Task
from app.models.user import User
from app.api.tasks import build_vtodo, parse_vtodo, _list_recipients
from app.services.events import notify_tasklist_change
# Re-use XML helpers from caldav.py
def _import_caldav_helpers():
    """Return the sibling ``caldav`` module, importing it at call time.

    NOTE(review): the import is deferred presumably to avoid a circular
    import, since caldav.py calls into this module - confirm.
    """
    from . import caldav as caldav_module
    return caldav_module
def _qn(prefix, name):
    """Forward to ``caldav._qn``; the result is used as an ElementTree tag."""
    helpers = _import_caldav_helpers()
    return helpers._qn(prefix, name)
def _xml_response(elem):
    """Forward to ``caldav._xml_response`` to turn *elem* into the HTTP response."""
    helpers = _import_caldav_helpers()
    return helpers._xml_response(elem)
def _make_response(href, populate):
    """Forward to ``caldav._make_response``.

    *populate* is a callback that receives the ``prop`` element to fill in.
    """
    helpers = _import_caldav_helpers()
    return helpers._make_response(href, populate)
# ---------------------------------------------------------------------------
# Path / URL helpers
# ---------------------------------------------------------------------------
def parse_tl_path(part: str):
    """Extract the numeric task-list id from a ``tl-<id>`` path segment.

    Args:
        part: A single URL path segment, e.g. ``"tl-42"``.

    Returns:
        The id as ``int``, or ``None`` when *part* is not exactly ``tl-``
        followed by one or more ASCII digits.
    """
    # fullmatch instead of match(...$): `$` also matches right before a
    # trailing newline, so "tl-5\n" used to be accepted. [0-9] keeps the
    # accepted ids to the ASCII form the server itself generates.
    m = re.fullmatch(r'tl-([0-9]+)', part)
    return int(m.group(1)) if m else None
def href_list(username, lid):
    """Return the collection URL ``/dav/<username>/tl-<lid>/`` of a task list."""
    return '/dav/{}/tl-{}/'.format(username, lid)
def href_task(username, lid, uid):
    """Return the resource URL ``/dav/<username>/tl-<lid>/<uid>.ics`` of a task."""
    return '/dav/{}/tl-{}/{}.ics'.format(username, lid, uid)
def user_lists(user: User):
    """Return every TaskList whose ``owner_id`` equals ``user.id``."""
    owned = TaskList.query.filter_by(owner_id=user.id)
    return owned.all()
def list_for(user: User, lid: int):
    """Load TaskList *lid* if it belongs to *user*.

    Returns:
        The TaskList, or ``None`` when it does not exist or its ``owner_id``
        differs from ``user.id``.
    """
    candidate = db.session.get(TaskList, lid)
    if candidate and candidate.owner_id == user.id:
        return candidate
    return None
def _ctag(tl: TaskList) -> str:
    """Return the quoted collection tag (CTag) for task list *tl*.

    The tag combines the list id, the newest ``Task.updated_at`` of the list
    in milliseconds, and the number of tasks. When the list has no tasks the
    list's own ``updated_at`` (or, failing that, the current time) is used.

    The task count is included because deleting a task that is not the most
    recently updated one leaves ``max(updated_at)`` untouched; without the
    count the tag would not change and clients would miss the deletion.
    Millisecond resolution (as in ``_etag``) keeps two writes within the same
    second distinguishable.
    """
    newest, count = (
        db.session.query(db.func.max(Task.updated_at), db.func.count(Task.id))
        .filter(Task.task_list_id == tl.id)
        .one()
    )
    stamp = newest or tl.updated_at or datetime.now(timezone.utc)
    ms = int(stamp.timestamp() * 1000)
    return f'"tl{tl.id}-{ms}-{count}"'
def _etag(t: Task) -> str:
    """Return the quoted entity tag ``"<id>-<milliseconds>"`` for task *t*.

    The timestamp is ``updated_at``, falling back to ``created_at`` and
    finally to the current UTC time.
    """
    stamp = t.updated_at or t.created_at or datetime.now(timezone.utc)
    millis = int(stamp.timestamp() * 1000)
    return '"{}-{}"'.format(t.id, millis)
def _wrap_vcalendar(t: Task) -> str:
    """Wrap the task's VTODO block in a VCALENDAR envelope (CRLF-joined).

    The stored ``ical_data`` is used when it is non-blank; otherwise the
    block is generated with ``build_vtodo``.
    """
    stored = (t.ical_data or '').strip()
    vtodo = stored if stored else build_vtodo(t)
    lines = (
        'BEGIN:VCALENDAR',
        'VERSION:2.0',
        'PRODID:-//Mini-Cloud//DE',
        'CALSCALE:GREGORIAN',
        vtodo,
        'END:VCALENDAR',
    )
    return '\r\n'.join(lines)
# ---------------------------------------------------------------------------
# PROPFIND building blocks
# ---------------------------------------------------------------------------
def list_response(user: User, tl: TaskList) -> ET.Element:
    """Build the PROPFIND response element describing task list *tl*.

    The collection is advertised as a calendar whose only supported
    component is VTODO, together with display name, description, supported
    reports, color, CTag and the current user's privileges.
    """
    report_names = ('calendar-query', 'calendar-multiget')
    privilege_names = (
        'read', 'write', 'write-properties', 'write-content', 'bind', 'unbind',
    )

    def fill_props(prop):
        # resourcetype: collection + calendar
        resource_type = ET.SubElement(prop, _qn('d', 'resourcetype'))
        ET.SubElement(resource_type, _qn('d', 'collection'))
        ET.SubElement(resource_type, _qn('c', 'calendar'))

        display_name = ET.SubElement(prop, _qn('d', 'displayname'))
        display_name.text = tl.name
        description = ET.SubElement(prop, _qn('c', 'calendar-description'))
        description.text = tl.description or ''

        # Only VTODO is advertised as a supported component.
        component_set = ET.SubElement(
            prop, _qn('c', 'supported-calendar-component-set'))
        ET.SubElement(component_set, _qn('c', 'comp')).set('name', 'VTODO')

        report_set = ET.SubElement(prop, _qn('d', 'supported-report-set'))
        for report_name in report_names:
            supported = ET.SubElement(report_set, _qn('d', 'supported-report'))
            report = ET.SubElement(supported, _qn('d', 'report'))
            ET.SubElement(report, _qn('c', report_name))

        color = ET.SubElement(prop, _qn('ic', 'calendar-color'))
        color.text = tl.color or '#10b981'
        ctag = ET.SubElement(prop, _qn('cs', 'getctag'))
        ctag.text = _ctag(tl)

        privilege_set = ET.SubElement(
            prop, _qn('d', 'current-user-privilege-set'))
        for privilege_name in privilege_names:
            privilege = ET.SubElement(privilege_set, _qn('d', 'privilege'))
            ET.SubElement(privilege, _qn('d', privilege_name))

    return _make_response(href_list(user.username, tl.id), fill_props)
def task_response(user: User, tl: TaskList, t: Task, include_data=False) -> ET.Element:
    """Build the response element for a single task resource.

    Args:
        user: Owner whose username forms part of the href.
        tl: Task list containing the task.
        t: The task to describe.
        include_data: When true, embed the full iCalendar text as
            ``calendar-data``.
    """
    def fill_props(prop):
        etag = ET.SubElement(prop, _qn('d', 'getetag'))
        etag.text = _etag(t)
        content_type = ET.SubElement(prop, _qn('d', 'getcontenttype'))
        content_type.text = 'text/calendar; charset=utf-8; component=VTODO'
        # Empty resourcetype: a plain (non-collection) resource.
        ET.SubElement(prop, _qn('d', 'resourcetype'))
        if not include_data:
            return
        data = ET.SubElement(prop, _qn('c', 'calendar-data'))
        data.text = _wrap_vcalendar(t)

    return _make_response(href_task(user.username, tl.id, t.uid), fill_props)
# ---------------------------------------------------------------------------
# Handlers (entered from caldav.py when path starts with tl-)
# ---------------------------------------------------------------------------
def tl_propfind(username, tl_part):
    """Handle PROPFIND on a task-list collection.

    Responds 403 when *username* is not the authenticated DAV user and 404
    when the list does not exist for that user. Otherwise returns a
    multistatus with the collection itself and, unless the ``Depth`` header
    is ``0`` (the default), one entry per task.
    """
    current = request.dav_user
    if current.username != username:
        return Response('', 403)

    list_id = parse_tl_path(tl_part)
    task_list = None
    if list_id:
        task_list = list_for(current, list_id)
    if not task_list:
        return Response('Not found', 404)

    with_members = request.headers.get('Depth', '0') != '0'
    multistatus = ET.Element(_qn('d', 'multistatus'))
    multistatus.append(list_response(current, task_list))
    if with_members:
        for task in task_list.tasks.all():
            multistatus.append(task_response(current, task_list, task))
    return _xml_response(multistatus)
def tl_task_propfind(username, tl_part, filename):
    """Handle PROPFIND on a single task resource.

    Responds 403 for a foreign *username*, 404 when the list or the task
    (looked up by *filename* without ``.ics``) is missing, otherwise a
    multistatus with the task including its calendar data.
    """
    current = request.dav_user
    if current.username != username:
        return Response('', 403)

    list_id = parse_tl_path(tl_part)
    task_list = None
    if list_id:
        task_list = list_for(current, list_id)
    if not task_list:
        return Response('Not found', 404)

    task_uid = filename.removesuffix('.ics')
    task = Task.query.filter_by(task_list_id=task_list.id, uid=task_uid).first()
    if not task:
        return Response('Not found', 404)

    multistatus = ET.Element(_qn('d', 'multistatus'))
    multistatus.append(
        task_response(current, task_list, task, include_data=True))
    return _xml_response(multistatus)
def tl_report(username, tl_part):
    """Handle a CalDAV REPORT on a task-list collection.

    Supported report types:

    * ``calendar-multiget``: returns the tasks named by the request's
      ``href`` children, always including calendar data. Unknown hrefs are
      skipped.
    * ``calendar-query``: returns every task of the list; calendar data is
      included only when the request mentions ``calendar-data``. Filters in
      the request are not evaluated.

    Any other (or empty) request body yields an empty multistatus.

    Returns:
        403 for a foreign *username*, 404 for an unknown list, 400 for
        malformed XML, otherwise the multistatus response.
    """
    # Function-scope import: the only place in this module that needs it.
    from urllib.parse import unquote

    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    lid = parse_tl_path(tl_part)
    tl = list_for(user, lid) if lid else None
    if not tl:
        return Response('Not found', 404)
    try:
        root = ET.fromstring(request.data or b'<x/>')
    except ET.ParseError:
        return Response('Malformed XML', 400)

    multi = ET.Element(_qn('d', 'multistatus'))
    if root.tag == _qn('c', 'calendar-multiget'):
        for href_el in root.findall(_qn('d', 'href')):
            # Pretty-printed XML may pad the href with whitespace.
            href = (href_el.text or '').strip()
            if not href:
                continue
            # hrefs arrive as URL text and may be percent-encoded, whereas
            # the GET/PUT handlers compare their `filename` argument against
            # Task.uid directly. Decode before looking the UID up so both
            # paths agree.
            uid = unquote(href.rsplit('/', 1)[-1]).removesuffix('.ics')
            t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first()
            if t:
                multi.append(task_response(user, tl, t, include_data=True))
    elif root.tag == _qn('c', 'calendar-query'):
        wants_data = root.find(f".//{_qn('c', 'calendar-data')}") is not None
        for t in tl.tasks.all():
            multi.append(task_response(user, tl, t, include_data=wants_data))
    return _xml_response(multi)
def tl_get(username, tl_part, filename):
    """Handle GET on a task resource and return it as ``text/calendar``.

    Responds 403 for a foreign *username* and 404 when the list or task is
    missing. On success the body is the VCALENDAR-wrapped task and the
    ``ETag`` header carries the task's entity tag.
    """
    current = request.dav_user
    if current.username != username:
        return Response('', 403)

    list_id = parse_tl_path(tl_part)
    task_list = None
    if list_id:
        task_list = list_for(current, list_id)
    if not task_list:
        return Response('Not found', 404)

    task_uid = filename.removesuffix('.ics')
    task = Task.query.filter_by(task_list_id=task_list.id, uid=task_uid).first()
    if not task:
        return Response('Not found', 404)

    payload = _wrap_vcalendar(task)
    etag_header = {'ETag': _etag(task)}
    return Response(
        payload,
        mimetype='text/calendar; charset=utf-8',
        headers=etag_header,
    )
def tl_put(username, tl_part, filename):
    """Create or replace a VTODO resource (HTTP PUT).

    The task is identified by the UID found in the request body; the file
    name (without ``.ics``) is only used when the body carries no UID.

    Preconditions:
        * ``If-None-Match: *`` fails with 412 when the task already exists.
        * ``If-Match`` fails with 412 when the task does not exist or none
          of the listed entity tags (or ``*``) matches the current one.

    Returns:
        403 for a foreign *username*, 404 for an unknown list, 400 when the
        body cannot be parsed, 412 on a failed precondition, otherwise 201
        (created) or 204 (updated) with the new ``ETag`` header.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    lid = parse_tl_path(tl_part)
    tl = list_for(user, lid) if lid else None
    if not tl:
        return Response('Not found', 404)

    uid = filename.removesuffix('.ics')
    raw = request.get_data(as_text=True) or ''
    parsed = parse_vtodo(raw)
    if not parsed:
        return Response('Cannot parse VTODO', 400)
    body_uid = parsed.get('uid') or uid
    existing = Task.query.filter_by(task_list_id=tl.id, uid=body_uid).first()

    if_match = request.headers.get('If-Match')
    if_none_match = request.headers.get('If-None-Match')
    if existing and if_none_match is not None and if_none_match.strip() == '*':
        return Response('', 412)
    # If-Match must also fail when there is nothing to match against;
    # otherwise a client updating a task that was deleted elsewhere would
    # silently resurrect it.
    if if_match and (
        not existing or not _if_match_satisfied(if_match, _etag(existing))
    ):
        return Response('', 412)

    is_new = existing is None
    if is_new:
        existing = Task(task_list_id=tl.id, uid=body_uid, ical_data=raw)
        db.session.add(existing)

    existing.summary = parsed.get('summary') or '(ohne Titel)'
    existing.description = parsed.get('description')
    existing.status = parsed.get('status') or 'NEEDS-ACTION'
    existing.priority = parsed.get('priority')
    existing.percent_complete = parsed.get('percent_complete')
    existing.due = parsed.get('due')
    existing.dtstart = parsed.get('dtstart')
    existing.completed_at = parsed.get('completed_at')

    cats = parsed.get('categories')
    if isinstance(cats, str):
        existing.categories = cats or None
    elif isinstance(cats, list):
        existing.categories = ','.join(cats) or None
    else:
        # PUT replaces the whole resource: like every other field above,
        # categories absent from the new body must not survive the update.
        existing.categories = None

    # Keep the raw VTODO block so the resource round-trips unchanged.
    block = re.search(r'BEGIN:VTODO.*?END:VTODO', raw, flags=re.DOTALL | re.IGNORECASE)
    existing.ical_data = (block.group(0).strip() if block else raw.strip()) or build_vtodo(existing)
    existing.updated_at = datetime.now(timezone.utc)
    db.session.commit()
    notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
    return Response('', 201 if is_new else 204, {'ETag': _etag(existing)})


def _if_match_satisfied(header, current_etag):
    """Return True if *current_etag* satisfies the ``If-Match`` value *header*.

    *header* may be ``*`` or a comma-separated list of entity tags; tags are
    compared verbatim.
    """
    candidates = [part.strip() for part in header.split(',')]
    return '*' in candidates or current_etag in candidates
def tl_delete(username, tl_part, filename):
    """Handle DELETE on a single task resource.

    Responds 403 for a foreign *username* and 404 when the list or the task
    is missing. On success the task is removed, a ``'task'`` change
    notification is sent for the list, and 204 is returned.
    """
    current = request.dav_user
    if current.username != username:
        return Response('', 403)

    list_id = parse_tl_path(tl_part)
    task_list = None
    if list_id:
        task_list = list_for(current, list_id)
    if not task_list:
        return Response('Not found', 404)

    task_uid = filename.removesuffix('.ics')
    task = Task.query.filter_by(task_list_id=task_list.id, uid=task_uid).first()
    if not task:
        return Response('', 404)

    db.session.delete(task)
    db.session.commit()
    notify_tasklist_change(
        task_list.owner_id,
        task_list.id,
        'task',
        shared_with=_list_recipients(task_list),
    )
    return Response('', 204)
def tl_delete_collection(username, tl_part):
    """Handle DELETE on a whole task-list collection.

    Responds 403 for a foreign *username* and 404 for an unknown list. On
    success the list is deleted, a ``'deleted'`` notification is sent, and
    204 is returned.
    """
    current = request.dav_user
    if current.username != username:
        return Response('', 403)

    list_id = parse_tl_path(tl_part)
    task_list = None
    if list_id:
        task_list = list_for(current, list_id)
    if not task_list:
        return Response('', 404)

    # Capture everything the notification needs before the row is deleted.
    shared_with = _list_recipients(task_list)
    owner, deleted_id = task_list.owner_id, task_list.id

    db.session.delete(task_list)
    db.session.commit()
    notify_tasklist_change(owner, deleted_id, 'deleted', shared_with=shared_with)
    return Response('', 204)
def tl_options(username, tl_part):
    """Answer OPTIONS with the static ``DAV`` and ``Allow`` headers.

    Both arguments are unused; the response is the same for every path.
    """
    capability_headers = {
        'DAV': '1, 2, 3, calendar-access, addressbook',
        'Allow': 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, MKCALENDAR, PROPPATCH',
    }
    return Response('', 200, capability_headers)
def tl_proppatch(username, tl_part):
    """Acknowledge property updates so that clients are satisfied.

    Only the display name and the color (truncated to 7 characters) are
    persisted; every other property is silently accepted.

    Returns:
        403 for a foreign *username*, 404 for an unknown list, 400 for
        malformed XML, otherwise a multistatus reporting ``200 OK`` for all
        properties named in the request.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    lid = parse_tl_path(tl_part)
    tl = list_for(user, lid) if lid else None
    if not tl:
        return Response('Not found', 404)
    try:
        root = ET.fromstring(request.data or b'<x/>')
    except ET.ParseError:
        return Response('Malformed XML', 400)

    changed = False
    requested = []  # qualified tags of all properties named in the request
    for el in root.iter():
        tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower()
        if tag == 'prop':
            for child in el:
                if child.tag not in requested:
                    requested.append(child.tag)
        elif tag == 'displayname' and el.text:
            tl.name = el.text
            changed = True
        elif tag == 'calendar-color' and el.text:
            tl.color = el.text[:7]
            changed = True
    if changed:
        db.session.commit()

    multi = ET.Element(_qn('d', 'multistatus'))
    resp = ET.SubElement(multi, _qn('d', 'response'))
    ET.SubElement(resp, _qn('d', 'href')).text = href_list(user.username, tl.id)
    ps = ET.SubElement(resp, _qn('d', 'propstat'))
    # WebDAV requires <propstat> to contain a <prop> element naming the
    # properties the status applies to; echo the requested names as empty
    # elements.
    prop = ET.SubElement(ps, _qn('d', 'prop'))
    for name in requested:
        ET.SubElement(prop, name)
    ET.SubElement(ps, _qn('d', 'status')).text = 'HTTP/1.1 200 OK'
    return _xml_response(multi)
def tl_mkcol(username, tl_part):
    """Create a new TaskList via MKCOL/MKCALENDAR.

    The ``tl-N`` path segment is not known at MKCOL time - DAVx5 sends a
    freely chosen name such as ``mein-task-uuid``. Therefore any path
    segment is accepted and a TaskList is created; its actual URL is
    returned in the ``Location`` header.

    The display name and, if present, the calendar color (truncated to
    7 characters, as in ``tl_proppatch``) are taken from the request body.

    Returns:
        403 for a foreign *username*, otherwise 201 with ``Location``.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    name = 'Neue Aufgabenliste'
    color = None
    try:
        body = request.get_data()
        if body:
            root = ET.fromstring(body)
            for el in root.iter():
                tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower()
                if tag == 'displayname' and el.text:
                    name = el.text
                elif tag == 'calendar-color' and el.text:
                    color = el.text[:7]
    except ET.ParseError:
        # Deliberately best-effort: an unparsable body still creates the
        # list with the default name.
        pass

    tl = TaskList(owner_id=user.id, name=name)
    if color:
        # Previously the color chosen by the client at creation time was
        # dropped and the list showed the default color.
        tl.color = color
    db.session.add(tl)
    db.session.commit()
    notify_tasklist_change(user.id, tl.id, 'created')
    return Response('', 201, {'Location': href_list(user.username, tl.id)})