"""CalDAV Task-List Handler (VTODO). TaskLists werden parallel zu Calendars als Calendar-Collection ausgeliefert, jedoch mit `` = VTODO (statt VEVENT). DAVx5/OpenTasks erkennen sie dadurch automatisch als Aufgabenliste. URL-Schema: /dav//tl-/ Collection /dav//tl-/.ics VTODO-Resource Diese Funktionen werden aus caldav.py heraus aufgerufen, sobald der URL-Bestandteil mit `tl-` beginnt - parallel zur ab-/CardDAV-Delegation. """ from __future__ import annotations import re import xml.etree.ElementTree as ET from datetime import datetime, timezone from flask import Response, request from app.extensions import db from app.models.task import TaskList, Task from app.models.user import User from app.api.tasks import build_vtodo, parse_vtodo, _list_recipients from app.services.events import notify_tasklist_change # Re-use XML helpers from caldav.py def _import_caldav_helpers(): from . import caldav return caldav def _qn(prefix, name): return _import_caldav_helpers()._qn(prefix, name) def _xml_response(elem): return _import_caldav_helpers()._xml_response(elem) def _make_response(href, populate): return _import_caldav_helpers()._make_response(href, populate) # --------------------------------------------------------------------------- # Path / URL helpers # --------------------------------------------------------------------------- def parse_tl_path(part: str): m = re.match(r'tl-(\d+)$', part) return int(m.group(1)) if m else None def href_list(username, lid): return f'/dav/{username}/tl-{lid}/' def href_task(username, lid, uid): return f'/dav/{username}/tl-{lid}/{uid}.ics' def user_lists(user: User): return TaskList.query.filter_by(owner_id=user.id).all() def list_for(user: User, lid: int): tl = db.session.get(TaskList, lid) if not tl or tl.owner_id != user.id: return None return tl def _ctag(tl: TaskList) -> str: last = db.session.query(db.func.max(Task.updated_at)).filter_by(task_list_id=tl.id).scalar() ts = int((last or tl.updated_at or datetime.now(timezone.utc)).timestamp()) return f'"tl{tl.id}-{ts}"' def _etag(t: Task) -> str: ts = int((t.updated_at or t.created_at or datetime.now(timezone.utc)).timestamp() * 1000) return f'"{t.id}-{ts}"' def _wrap_vcalendar(t: Task) -> str: block = (t.ical_data or '').strip() or build_vtodo(t) return '\r\n'.join([ 'BEGIN:VCALENDAR', 'VERSION:2.0', 'PRODID:-//Mini-Cloud//DE', 'CALSCALE:GREGORIAN', block, 'END:VCALENDAR', ]) # --------------------------------------------------------------------------- # PROPFIND building blocks # --------------------------------------------------------------------------- def list_response(user: User, tl: TaskList) -> ET.Element: href = href_list(user.username, tl.id) def populate(prop): rt = ET.SubElement(prop, _qn('d', 'resourcetype')) ET.SubElement(rt, _qn('d', 'collection')) ET.SubElement(rt, _qn('c', 'calendar')) ET.SubElement(prop, _qn('d', 'displayname')).text = tl.name ET.SubElement(prop, _qn('c', 'calendar-description')).text = tl.description or '' supported = ET.SubElement(prop, _qn('c', 'supported-calendar-component-set')) comp = ET.SubElement(supported, _qn('c', 'comp')) comp.set('name', 'VTODO') srs = ET.SubElement(prop, _qn('d', 'supported-report-set')) for r in ('calendar-query', 'calendar-multiget'): sup = ET.SubElement(srs, _qn('d', 'supported-report')) rep = ET.SubElement(sup, _qn('d', 'report')) ET.SubElement(rep, _qn('c', r)) ET.SubElement(prop, _qn('ic', 'calendar-color')).text = tl.color or '#10b981' ET.SubElement(prop, _qn('cs', 'getctag')).text = _ctag(tl) cups = ET.SubElement(prop, _qn('d', 'current-user-privilege-set')) for priv in ('read', 'write', 'write-properties', 'write-content', 'bind', 'unbind'): p = ET.SubElement(cups, _qn('d', 'privilege')) ET.SubElement(p, _qn('d', priv)) return _make_response(href, populate) def task_response(user: User, tl: TaskList, t: Task, include_data=False) -> ET.Element: href = href_task(user.username, tl.id, t.uid) def populate(prop): ET.SubElement(prop, _qn('d', 'getetag')).text = _etag(t) ET.SubElement(prop, _qn('d', 'getcontenttype')).text = \ 'text/calendar; charset=utf-8; component=VTODO' ET.SubElement(prop, _qn('d', 'resourcetype')) if include_data: ET.SubElement(prop, _qn('c', 'calendar-data')).text = _wrap_vcalendar(t) return _make_response(href, populate) # --------------------------------------------------------------------------- # Handlers (entered from caldav.py when path starts with tl-) # --------------------------------------------------------------------------- def tl_propfind(username, tl_part): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) depth = request.headers.get('Depth', '0') multi = ET.Element(_qn('d', 'multistatus')) multi.append(list_response(user, tl)) if depth != '0': for t in tl.tasks.all(): multi.append(task_response(user, tl, t)) return _xml_response(multi) def tl_task_propfind(username, tl_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) uid = filename.removesuffix('.ics') t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first() if not t: return Response('Not found', 404) multi = ET.Element(_qn('d', 'multistatus')) multi.append(task_response(user, tl, t, include_data=True)) return _xml_response(multi) def tl_report(username, tl_part): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) try: root = ET.fromstring(request.data or b'') except ET.ParseError: return Response('Malformed XML', 400) wants_data = root.find(f".//{_qn('c', 'calendar-data')}") is not None multi = ET.Element(_qn('d', 'multistatus')) if root.tag == _qn('c', 'calendar-multiget'): hrefs = [h.text for h in root.findall(_qn('d', 'href')) if h.text] for href in hrefs: uid = href.rsplit('/', 1)[-1].removesuffix('.ics') t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first() if t: multi.append(task_response(user, tl, t, include_data=True)) return _xml_response(multi) if root.tag == _qn('c', 'calendar-query'): for t in tl.tasks.all(): multi.append(task_response(user, tl, t, include_data=wants_data)) return _xml_response(multi) return _xml_response(multi) def tl_get(username, tl_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) uid = filename.removesuffix('.ics') t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first() if not t: return Response('Not found', 404) return Response(_wrap_vcalendar(t), mimetype='text/calendar; charset=utf-8', headers={'ETag': _etag(t)}) def tl_put(username, tl_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) uid = filename.removesuffix('.ics') raw = request.get_data(as_text=True) or '' parsed = parse_vtodo(raw) if not parsed: return Response('Cannot parse VTODO', 400) body_uid = parsed.get('uid') or uid existing = Task.query.filter_by(task_list_id=tl.id, uid=body_uid).first() if_match = request.headers.get('If-Match') if_none_match = request.headers.get('If-None-Match') if existing and if_none_match == '*': return Response('', 412) if if_match and existing and if_match.strip() != _etag(existing): return Response('', 412) is_new = existing is None if is_new: existing = Task(task_list_id=tl.id, uid=body_uid, ical_data=raw) db.session.add(existing) existing.summary = parsed.get('summary') or '(ohne Titel)' existing.description = parsed.get('description') existing.status = parsed.get('status') or 'NEEDS-ACTION' existing.priority = parsed.get('priority') existing.percent_complete = parsed.get('percent_complete') existing.due = parsed.get('due') existing.dtstart = parsed.get('dtstart') existing.completed_at = parsed.get('completed_at') cats = parsed.get('categories') if isinstance(cats, str): existing.categories = cats or None elif isinstance(cats, list): existing.categories = ','.join(cats) or None # Roh-Block sichern fuer Round-Trip block = re.search(r'BEGIN:VTODO.*?END:VTODO', raw, flags=re.DOTALL | re.IGNORECASE) existing.ical_data = (block.group(0).strip() if block else raw.strip()) or build_vtodo(existing) existing.updated_at = datetime.now(timezone.utc) db.session.commit() notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl)) return Response('', 201 if is_new else 204, {'ETag': _etag(existing)}) def tl_delete(username, tl_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) uid = filename.removesuffix('.ics') t = Task.query.filter_by(task_list_id=tl.id, uid=uid).first() if not t: return Response('', 404) db.session.delete(t) db.session.commit() notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl)) return Response('', 204) def tl_delete_collection(username, tl_part): user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('', 404) recipients = _list_recipients(tl) owner_id = tl.owner_id list_id = tl.id db.session.delete(tl) db.session.commit() notify_tasklist_change(owner_id, list_id, 'deleted', shared_with=recipients) return Response('', 204) def tl_options(username, tl_part): return Response('', 200, { 'DAV': '1, 2, 3, calendar-access, addressbook', 'Allow': 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, MKCALENDAR, PROPPATCH', }) def tl_proppatch(username, tl_part): """Bestaetige Property-Updates damit Clients zufrieden sind. Wir persistieren Displayname + Color, alles andere wird stillschweigend akzeptiert.""" user: User = request.dav_user if username != user.username: return Response('', 403) lid = parse_tl_path(tl_part) tl = list_for(user, lid) if lid else None if not tl: return Response('Not found', 404) try: root = ET.fromstring(request.data or b'') except ET.ParseError: return Response('Malformed XML', 400) changed = False for el in root.iter(): tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower() if tag == 'displayname' and el.text: tl.name = el.text changed = True elif tag == 'calendar-color' and el.text: tl.color = el.text[:7] changed = True if changed: db.session.commit() multi = ET.Element(_qn('d', 'multistatus')) resp = ET.SubElement(multi, _qn('d', 'response')) ET.SubElement(resp, _qn('d', 'href')).text = href_list(user.username, tl.id) ps = ET.SubElement(resp, _qn('d', 'propstat')) ET.SubElement(ps, _qn('d', 'status')).text = 'HTTP/1.1 200 OK' return _xml_response(multi) def tl_mkcol(username, tl_part): """Erstelle eine neue TaskList per MKCOL/MKCALENDAR. Der Pfadteil `tl-N` ist bei MKCOL aber unbekannt - DAVx5 schickt einen frei gewaehlten Namen wie `mein-task-uuid`. Daher: wir akzeptieren jeden Pfadteil und legen eine TaskList an.""" user: User = request.dav_user if username != user.username: return Response('', 403) name = 'Neue Aufgabenliste' try: body = request.get_data() if body: root = ET.fromstring(body) for el in root.iter(): tag = (el.tag.split('}', 1)[1] if '}' in el.tag else el.tag).lower() if tag == 'displayname' and el.text: name = el.text except ET.ParseError: pass tl = TaskList(owner_id=user.id, name=name) db.session.add(tl) db.session.commit() notify_tasklist_change(user.id, tl.id, 'created') return Response('', 201, {'Location': href_list(user.username, tl.id)})