369 lines
13 KiB
Python
369 lines
13 KiB
Python
"""CalDAV Task-List Handler (VTODO).
|
|
|
|
TaskLists werden parallel zu Calendars als Calendar-Collection
|
|
ausgeliefert, jedoch mit `<supported-calendar-component-set>` = VTODO
|
|
(statt VEVENT). DAVx5/OpenTasks erkennen sie dadurch automatisch als
|
|
Aufgabenliste.
|
|
|
|
URL-Schema:
|
|
/dav/<user>/tl-<id>/ Collection
|
|
/dav/<user>/tl-<id>/<uid>.ics VTODO-Resource
|
|
|
|
Diese Funktionen werden aus caldav.py heraus aufgerufen, sobald der
|
|
URL-Bestandteil mit `tl-` beginnt - parallel zur ab-/CardDAV-Delegation.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
|
|
from flask import Response, request
|
|
|
|
from app.extensions import db
|
|
from app.models.task import TaskList, Task
|
|
from app.models.user import User
|
|
from app.api.tasks import build_vtodo, parse_vtodo, _list_recipients
|
|
from app.services.events import notify_tasklist_change
|
|
|
|
|
|
# Re-use XML helpers from caldav.py
|
|
def _import_caldav_helpers():
|
|
from . import caldav
|
|
return caldav
|
|
|
|
|
|
def _qn(prefix, name):
|
|
return _import_caldav_helpers()._qn(prefix, name)
|
|
|
|
|
|
def _xml_response(elem):
|
|
return _import_caldav_helpers()._xml_response(elem)
|
|
|
|
|
|
def _make_response(href, populate):
|
|
return _import_caldav_helpers()._make_response(href, populate)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path / URL helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_tl_path(part: str):
|
|
m = re.match(r'tl-(\d+)$', part)
|
|
return int(m.group(1)) if m else None
|
|
|
|
|
|
def href_list(username, lid):
|
|
return f'/dav/{username}/tl-{lid}/'
|
|
|
|
|
|
def href_task(username, lid, uid):
|
|
return f'/dav/{username}/tl-{lid}/{uid}.ics'
|
|
|
|
|
|
def user_lists(user: User):
|
|
return TaskList.query.filter_by(owner_id=user.id).all()
|
|
|
|
|
|
def list_for(user: User, lid: int):
|
|
tl = db.session.get(TaskList, lid)
|
|
if not tl or tl.owner_id != user.id:
|
|
return None
|
|
return tl
|
|
|
|
|
|
def _ctag(tl: TaskList) -> str:
|
|
last = db.session.query(db.func.max(Task.updated_at)).filter_by(task_list_id=tl.id).scalar()
|
|
ts = int((last or tl.updated_at or datetime.now(timezone.utc)).timestamp())
|
|
return f'"tl{tl.id}-{ts}"'
|
|
|
|
|
|
def _etag(t: Task) -> str:
|
|
ts = int((t.updated_at or t.created_at or datetime.now(timezone.utc)).timestamp() * 1000)
|
|
return f'"{t.id}-{ts}"'
|
|
|
|
|
|
def _wrap_vcalendar(t: Task) -> str:
|
|
block = (t.ical_data or '').strip() or build_vtodo(t)
|
|
return '\r\n'.join([
|
|
'BEGIN:VCALENDAR', 'VERSION:2.0', 'PRODID:-//Mini-Cloud//DE',
|
|
'CALSCALE:GREGORIAN', block, 'END:VCALENDAR',
|
|
])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PROPFIND building blocks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def list_response(user: User, tl: TaskList) -> ET.Element:
|
|
href = href_list(user.username, tl.id)
|
|
|
|
def populate(prop):
|
|
rt = ET.SubElement(prop, _qn('d', 'resourcetype'))
|
|
ET.SubElement(rt, _qn('d', 'collection'))
|
|
ET.SubElement(rt, _qn('c', 'calendar'))
|
|
ET.SubElement(prop, _qn('d', 'displayname')).text = tl.name
|
|
ET.SubElement(prop, _qn('c', 'calendar-description')).text = tl.description or ''
|
|
supported = ET.SubElement(prop, _qn('c', 'supported-calendar-component-set'))
|
|
comp = ET.SubElement(supported, _qn('c', 'comp'))
|
|
comp.set('name', 'VTODO')
|
|
srs = ET.SubElement(prop, _qn('d', 'supported-report-set'))
|
|
for r in ('calendar-query', 'calendar-multiget'):
|
|
sup = ET.SubElement(srs, _qn('d', 'supported-report'))
|
|
rep = ET.SubElement(sup, _qn('d', 'report'))
|
|
ET.SubElement(rep, _qn('c', r))
|
|
ET.SubElement(prop, _qn('ic', 'calendar-color')).text = tl.color or '#10b981'
|
|
ET.SubElement(prop, _qn('cs', 'getctag')).text = _ctag(tl)
|
|
cups = ET.SubElement(prop, _qn('d', 'current-user-privilege-set'))
|
|
for priv in ('read', 'write', 'write-properties', 'write-content', 'bind', 'unbind'):
|
|
p = ET.SubElement(cups, _qn('d', 'privilege'))
|
|
ET.SubElement(p, _qn('d', priv))
|
|
return _make_response(href, populate)
|
|
|
|
|
|
def task_response(user: User, tl: TaskList, t: Task, include_data=False) -> ET.Element:
|
|
href = href_task(user.username, tl.id, t.uid)
|
|
|
|
def populate(prop):
|
|
ET.SubElement(prop, _qn('d', 'getetag')).text = _etag(t)
|
|
ET.SubElement(prop, _qn('d', 'getcontenttype')).text = \
|
|
'text/calendar; charset=utf-8; component=VTODO'
|
|
ET.SubElement(prop, _qn('d', 'resourcetype'))
|
|
if include_data:
|
|
ET.SubElement(prop, _qn('c', 'calendar-data')).text = _wrap_vcalendar(t)
|
|
return _make_response(href, populate)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Handlers (entered from caldav.py when path starts with tl-)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def tl_propfind(username, tl_part):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
depth = request.headers.get('Depth', '0')
|
|
multi = ET.Element(_qn('d', 'multistatus'))
|
|
multi.append(list_response(user, tl))
|
|
if depth != '0':
|
|
for t in tl.tasks.all():
|
|
multi.append(task_response(user, tl, t))
|
|
return _xml_response(multi)
|
|
|
|
|
|
def tl_task_propfind(username, tl_part, filename):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
uid = filename.removesuffix('.ics')
|
|
t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first()
|
|
if not t:
|
|
return Response('Not found', 404)
|
|
multi = ET.Element(_qn('d', 'multistatus'))
|
|
multi.append(task_response(user, tl, t, include_data=True))
|
|
return _xml_response(multi)
|
|
|
|
|
|
def tl_report(username, tl_part):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
try:
|
|
root = ET.fromstring(request.data or b'<x/>')
|
|
except ET.ParseError:
|
|
return Response('Malformed XML', 400)
|
|
wants_data = root.find(f".//{_qn('c', 'calendar-data')}") is not None
|
|
multi = ET.Element(_qn('d', 'multistatus'))
|
|
if root.tag == _qn('c', 'calendar-multiget'):
|
|
hrefs = [h.text for h in root.findall(_qn('d', 'href')) if h.text]
|
|
for href in hrefs:
|
|
uid = href.rsplit('/', 1)[-1].removesuffix('.ics')
|
|
t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first()
|
|
if t:
|
|
multi.append(task_response(user, tl, t, include_data=True))
|
|
return _xml_response(multi)
|
|
if root.tag == _qn('c', 'calendar-query'):
|
|
for t in tl.tasks.all():
|
|
multi.append(task_response(user, tl, t, include_data=wants_data))
|
|
return _xml_response(multi)
|
|
return _xml_response(multi)
|
|
|
|
|
|
def tl_get(username, tl_part, filename):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
uid = filename.removesuffix('.ics')
|
|
t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first()
|
|
if not t:
|
|
return Response('Not found', 404)
|
|
return Response(_wrap_vcalendar(t),
|
|
mimetype='text/calendar; charset=utf-8',
|
|
headers={'ETag': _etag(t)})
|
|
|
|
|
|
def tl_put(username, tl_part, filename):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
uid = filename.removesuffix('.ics')
|
|
raw = request.get_data(as_text=True) or ''
|
|
parsed = parse_vtodo(raw)
|
|
if not parsed:
|
|
return Response('Cannot parse VTODO', 400)
|
|
body_uid = parsed.get('uid') or uid
|
|
existing = Task.query.filter_by(task_list_id=tl.id, uid=body_uid).first()
|
|
if_match = request.headers.get('If-Match')
|
|
if_none_match = request.headers.get('If-None-Match')
|
|
if existing and if_none_match == '*':
|
|
return Response('', 412)
|
|
if if_match and existing and if_match.strip() != _etag(existing):
|
|
return Response('', 412)
|
|
is_new = existing is None
|
|
if is_new:
|
|
existing = Task(task_list_id=tl.id, uid=body_uid, ical_data=raw)
|
|
db.session.add(existing)
|
|
existing.summary = parsed.get('summary') or '(ohne Titel)'
|
|
existing.description = parsed.get('description')
|
|
existing.status = parsed.get('status') or 'NEEDS-ACTION'
|
|
existing.priority = parsed.get('priority')
|
|
existing.percent_complete = parsed.get('percent_complete')
|
|
existing.due = parsed.get('due')
|
|
existing.dtstart = parsed.get('dtstart')
|
|
existing.completed_at = parsed.get('completed_at')
|
|
cats = parsed.get('categories')
|
|
if isinstance(cats, str):
|
|
existing.categories = cats or None
|
|
elif isinstance(cats, list):
|
|
existing.categories = ','.join(cats) or None
|
|
# Roh-Block sichern fuer Round-Trip
|
|
block = re.search(r'BEGIN:VTODO.*?END:VTODO', raw, flags=re.DOTALL | re.IGNORECASE)
|
|
existing.ical_data = (block.group(0).strip() if block else raw.strip()) or build_vtodo(existing)
|
|
existing.updated_at = datetime.now(timezone.utc)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return Response('', 201 if is_new else 204, {'ETag': _etag(existing)})
|
|
|
|
|
|
def tl_delete(username, tl_part, filename):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
uid = filename.removesuffix('.ics')
|
|
t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first()
|
|
if not t:
|
|
return Response('', 404)
|
|
db.session.delete(t)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return Response('', 204)
|
|
|
|
|
|
def tl_delete_collection(username, tl_part):
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('', 404)
|
|
recipients = _list_recipients(tl)
|
|
owner_id = tl.owner_id
|
|
list_id = tl.id
|
|
db.session.delete(tl)
|
|
db.session.commit()
|
|
notify_tasklist_change(owner_id, list_id, 'deleted', shared_with=recipients)
|
|
return Response('', 204)
|
|
|
|
|
|
def tl_options(username, tl_part):
|
|
return Response('', 200, {
|
|
'DAV': '1, 2, 3, calendar-access, addressbook',
|
|
'Allow': 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, MKCALENDAR, PROPPATCH',
|
|
})
|
|
|
|
|
|
def tl_proppatch(username, tl_part):
|
|
"""Bestaetige Property-Updates damit Clients zufrieden sind. Wir
|
|
persistieren Displayname + Color, alles andere wird stillschweigend
|
|
akzeptiert."""
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
lid = parse_tl_path(tl_part)
|
|
tl = list_for(user, lid) if lid else None
|
|
if not tl:
|
|
return Response('Not found', 404)
|
|
try:
|
|
root = ET.fromstring(request.data or b'<x/>')
|
|
except ET.ParseError:
|
|
return Response('Malformed XML', 400)
|
|
changed = False
|
|
for el in root.iter():
|
|
tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower()
|
|
if tag == 'displayname' and el.text:
|
|
tl.name = el.text
|
|
changed = True
|
|
elif tag == 'calendar-color' and el.text:
|
|
tl.color = el.text[:7]
|
|
changed = True
|
|
if changed:
|
|
db.session.commit()
|
|
multi = ET.Element(_qn('d', 'multistatus'))
|
|
resp = ET.SubElement(multi, _qn('d', 'response'))
|
|
ET.SubElement(resp, _qn('d', 'href')).text = href_list(user.username, tl.id)
|
|
ps = ET.SubElement(resp, _qn('d', 'propstat'))
|
|
ET.SubElement(ps, _qn('d', 'status')).text = 'HTTP/1.1 200 OK'
|
|
return _xml_response(multi)
|
|
|
|
|
|
def tl_mkcol(username, tl_part):
|
|
"""Erstelle eine neue TaskList per MKCOL/MKCALENDAR. Der Pfadteil
|
|
`tl-N` ist bei MKCOL aber unbekannt - DAVx5 schickt einen frei
|
|
gewaehlten Namen wie `mein-task-uuid`. Daher: wir akzeptieren jeden
|
|
Pfadteil und legen eine TaskList an."""
|
|
user: User = request.dav_user
|
|
if username != user.username:
|
|
return Response('', 403)
|
|
name = 'Neue Aufgabenliste'
|
|
try:
|
|
body = request.get_data()
|
|
if body:
|
|
root = ET.fromstring(body)
|
|
for el in root.iter():
|
|
tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower()
|
|
if tag == 'displayname' and el.text:
|
|
name = el.text
|
|
except ET.ParseError:
|
|
pass
|
|
tl = TaskList(owner_id=user.id, name=name)
|
|
db.session.add(tl)
|
|
db.session.commit()
|
|
notify_tasklist_change(user.id, tl.id, 'created')
|
|
return Response('', 201, {'Location': href_list(user.username, tl.id)})
|