"""Minimal CalDAV server (RFC 4791 subset). Implements the endpoints that Thunderbird, DAVx5 and Apple Calendar actually use in practice: OPTIONS - capability advertisement (DAV: 1, 2, calendar-access) PROPFIND Depth 0/1 - discovery chain + listings REPORT calendar-query + calendar-multiget GET single VCALENDAR resource PUT create/update VCALENDAR resource DELETE remove a resource or calendar collection Non-goals for this revision: ACL reports, free-busy, sync-token based incremental sync, scheduling (iTIP/iMIP). Clients fall back to full PROPFIND refresh when sync-token isn't advertised, which is fine for small personal calendars. """ from __future__ import annotations import re import uuid import xml.etree.ElementTree as ET from datetime import datetime, timezone from functools import wraps from flask import Response, request from app.extensions import db from app.models.calendar import Calendar, CalendarEvent from app.models.user import User from . import dav_bp # --------------------------------------------------------------------------- # XML namespace plumbing # --------------------------------------------------------------------------- NS = { 'd': 'DAV:', 'c': 'urn:ietf:params:xml:ns:caldav', 'cs': 'http://calendarserver.org/ns/', 'ic': 'http://apple.com/ns/ical/', } for prefix, uri in NS.items(): ET.register_namespace('' if prefix == 'd' else prefix, uri) def _qn(prefix: str, local: str) -> str: return f'{{{NS[prefix]}}}{local}' def _xml_response(root: ET.Element, status: int = 207) -> Response: body = b'\n' + ET.tostring(root, encoding='utf-8') return Response(body, status=status, mimetype='application/xml; charset=utf-8') # --------------------------------------------------------------------------- # Authentication (HTTP Basic over the existing user table) # --------------------------------------------------------------------------- def _challenge() -> Response: return Response( 'Authentication required', 401, {'WWW-Authenticate': 'Basic realm="Mini-Cloud DAV"'} ) def basic_auth(f): @wraps(f) def wrapper(*args, **kwargs): auth = request.authorization if not auth or not auth.username or not auth.password: return _challenge() user = User.query.filter_by(username=auth.username).first() if not user or not user.is_active or not user.check_password(auth.password): return _challenge() request.dav_user = user return f(*args, **kwargs) return wrapper # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- DAV_HEADERS = { 'DAV': '1, 2, 3, calendar-access', } ALLOW_COLLECTION = 'OPTIONS, PROPFIND, REPORT, DELETE, MKCALENDAR' ALLOW_RESOURCE = 'OPTIONS, PROPFIND, GET, PUT, DELETE' def _etag_for_event(event: CalendarEvent) -> str: ts = int((event.updated_at or event.created_at or datetime.now(timezone.utc)).timestamp() * 1000) return f'"{event.id}-{ts}"' def _href_calendar(username: str, cal_id: int) -> str: return f'/dav/{username}/cal-{cal_id}/' def _href_event(username: str, cal_id: int, uid: str) -> str: return f'/dav/{username}/cal-{cal_id}/{uid}.ics' def _user_calendars(user: User): return Calendar.query.filter_by(owner_id=user.id).all() def _parse_calendar_path(path_part: str): """Input: "cal-42" -> 42, otherwise None.""" m = re.match(r'cal-(\d+)$', path_part) return int(m.group(1)) if m else None def _calendar_for(user: User, cal_id: int): cal = db.session.get(Calendar, cal_id) if not cal or cal.owner_id != user.id: return None return cal # --------------------------------------------------------------------------- # OPTIONS (advertise DAV capabilities on any path) # --------------------------------------------------------------------------- @dav_bp.route('/', methods=['OPTIONS']) @dav_bp.route('/', methods=['OPTIONS']) def options(subpath=''): headers = { **DAV_HEADERS, 'Allow': 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, MKCALENDAR', } return Response('', status=200, headers=headers) # --------------------------------------------------------------------------- # PROPFIND # --------------------------------------------------------------------------- def _make_response(href: str, populate_prop) -> ET.Element: """Build a ...200 element. `populate_prop` is a callable that gets the element and appends the actual property sub-elements to it.""" resp = ET.Element(_qn('d', 'response')) ET.SubElement(resp, _qn('d', 'href')).text = href propstat = ET.SubElement(resp, _qn('d', 'propstat')) prop = ET.SubElement(propstat, _qn('d', 'prop')) populate_prop(prop) ET.SubElement(propstat, _qn('d', 'status')).text = 'HTTP/1.1 200 OK' return resp def _root_response(href: str, user: User) -> ET.Element: def populate(prop): rt = ET.SubElement(prop, _qn('d', 'resourcetype')) ET.SubElement(rt, _qn('d', 'collection')) ET.SubElement(prop, _qn('d', 'displayname')).text = 'Mini-Cloud DAV' cup = ET.SubElement(prop, _qn('d', 'current-user-principal')) ET.SubElement(cup, _qn('d', 'href')).text = f'/dav/{user.username}/' return _make_response(href, populate) def _principal_response(user: User) -> ET.Element: href = f'/dav/{user.username}/' def populate(prop): rt = ET.SubElement(prop, _qn('d', 'resourcetype')) ET.SubElement(rt, _qn('d', 'collection')) ET.SubElement(rt, _qn('d', 'principal')) ET.SubElement(prop, _qn('d', 'displayname')).text = user.username cup = ET.SubElement(prop, _qn('d', 'current-user-principal')) ET.SubElement(cup, _qn('d', 'href')).text = href pu = ET.SubElement(prop, _qn('d', 'principal-URL')) ET.SubElement(pu, _qn('d', 'href')).text = href home = ET.SubElement(prop, _qn('c', 'calendar-home-set')) ET.SubElement(home, _qn('d', 'href')).text = href return _make_response(href, populate) def _calendar_response(user: User, cal: Calendar) -> ET.Element: href = _href_calendar(user.username, cal.id) def populate(prop): rt = ET.SubElement(prop, _qn('d', 'resourcetype')) ET.SubElement(rt, _qn('d', 'collection')) ET.SubElement(rt, _qn('c', 'calendar')) ET.SubElement(prop, _qn('d', 'displayname')).text = cal.name ET.SubElement(prop, _qn('c', 'calendar-description')).text = cal.description or '' supported = ET.SubElement(prop, _qn('c', 'supported-calendar-component-set')) comp = ET.SubElement(supported, _qn('c', 'comp')) comp.set('name', 'VEVENT') ET.SubElement(prop, _qn('ic', 'calendar-color')).text = cal.color or '#3788d8' ET.SubElement(prop, _qn('cs', 'getctag')).text = _calendar_ctag(cal) ET.SubElement(prop, _qn('d', 'current-user-privilege-set')) return _make_response(href, populate) def _calendar_ctag(cal: Calendar) -> str: """Collection tag: changes when any event in the calendar changes.""" last = db.session.query(db.func.max(CalendarEvent.updated_at)).filter_by(calendar_id=cal.id).scalar() ts = int((last or cal.updated_at or datetime.now(timezone.utc)).timestamp()) return f'"{cal.id}-{ts}"' def _event_response(user: User, cal: Calendar, event: CalendarEvent, include_data: bool = False) -> ET.Element: href = _href_event(user.username, cal.id, event.uid) def populate(prop): ET.SubElement(prop, _qn('d', 'getetag')).text = _etag_for_event(event) ET.SubElement(prop, _qn('d', 'getcontenttype')).text = \ 'text/calendar; charset=utf-8; component=VEVENT' ET.SubElement(prop, _qn('d', 'resourcetype')) # empty -> regular resource if include_data: ET.SubElement(prop, _qn('c', 'calendar-data')).text = _wrap_vcalendar(cal, event) return _make_response(href, populate) def _wrap_vcalendar(cal: Calendar, event: CalendarEvent) -> str: """Return a full VCALENDAR envelope around the event's ical_data.""" lines = [ 'BEGIN:VCALENDAR', 'VERSION:2.0', 'PRODID:-//Mini-Cloud//DE', 'CALSCALE:GREGORIAN', event.ical_data.strip() if event.ical_data else '', 'END:VCALENDAR', ] return '\r\n'.join(lines) @dav_bp.route('/', methods=['PROPFIND']) @dav_bp.route('/', methods=['PROPFIND']) @basic_auth def propfind(subpath=''): user: User = request.dav_user depth = request.headers.get('Depth', '0') multistatus = ET.Element(_qn('d', 'multistatus')) parts = [p for p in subpath.split('/') if p] # /dav/ (root): return principal pointer if not parts: multistatus.append(_root_response('/dav/', user)) if depth != '0': multistatus.append(_principal_response(user)) return _xml_response(multistatus) # /dav// : principal + list calendars if len(parts) == 1: if parts[0] != user.username: return Response('', 403) multistatus.append(_principal_response(user)) if depth != '0': for cal in _user_calendars(user): multistatus.append(_calendar_response(user, cal)) return _xml_response(multistatus) # /dav//cal-/ : calendar + events if len(parts) == 2: if parts[0] != user.username: return Response('', 403) cal_id = _parse_calendar_path(parts[1]) if cal_id is None: return Response('Not found', 404) cal = _calendar_for(user, cal_id) if not cal: return Response('Not found', 404) multistatus.append(_calendar_response(user, cal)) if depth != '0': for ev in CalendarEvent.query.filter_by(calendar_id=cal.id).all(): multistatus.append(_event_response(user, cal, ev)) return _xml_response(multistatus) # /dav//cal-/.ics : single event if len(parts) == 3: if parts[0] != user.username: return Response('', 403) cal_id = _parse_calendar_path(parts[1]) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('Not found', 404) uid = parts[2].removesuffix('.ics') ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first() if not ev: return Response('Not found', 404) multistatus.append(_event_response(user, cal, ev, include_data=True)) return _xml_response(multistatus) return Response('Not found', 404) # --------------------------------------------------------------------------- # REPORT (calendar-query, calendar-multiget) # --------------------------------------------------------------------------- @dav_bp.route('/', methods=['REPORT']) @basic_auth def report(subpath): user: User = request.dav_user parts = [p for p in subpath.split('/') if p] if len(parts) < 2 or parts[0] != user.username: return Response('', 403) cal_id = _parse_calendar_path(parts[1]) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('Not found', 404) try: root = ET.fromstring(request.data or b'') except ET.ParseError: return Response('Malformed XML', 400) multistatus = ET.Element(_qn('d', 'multistatus')) tag = root.tag if tag == _qn('c', 'calendar-multiget'): hrefs = [h.text for h in root.findall(_qn('d', 'href')) if h.text] for href in hrefs: uid = href.rsplit('/', 1)[-1].removesuffix('.ics') ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first() if ev: multistatus.append(_event_response(user, cal, ev, include_data=True)) return _xml_response(multistatus) if tag == _qn('c', 'calendar-query'): # Parse optional time-range start, end = _extract_time_range(root) q = CalendarEvent.query.filter_by(calendar_id=cal.id) if start is not None: q = q.filter(CalendarEvent.dtstart < end) q = q.filter( (CalendarEvent.dtend >= start) | (CalendarEvent.dtstart >= start) | (CalendarEvent.recurrence_rule.isnot(None)) ) for ev in q.all(): multistatus.append(_event_response(user, cal, ev, include_data=True)) return _xml_response(multistatus) # Unknown report - return empty multistatus so clients don't break return _xml_response(multistatus) def _extract_time_range(root: ET.Element): tr = root.find(f".//{_qn('c', 'time-range')}") if tr is None: return None, None def parse(s): if not s: return None s = s.replace('Z', '+00:00') try: return datetime.fromisoformat(s) except ValueError: # Compact ICS form: 20260412T120000Z try: return datetime.strptime(s, '%Y%m%dT%H%M%S%z') except ValueError: try: return datetime.strptime(s[:15], '%Y%m%dT%H%M%S').replace(tzinfo=timezone.utc) except ValueError: return None return parse(tr.get('start')), parse(tr.get('end')) # --------------------------------------------------------------------------- # GET single event # --------------------------------------------------------------------------- @dav_bp.route('///', methods=['GET']) @basic_auth def get_event(username, cal_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) cal_id = _parse_calendar_path(cal_part) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('Not found', 404) uid = filename.removesuffix('.ics') ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first() if not ev: return Response('Not found', 404) return Response( _wrap_vcalendar(cal, ev), mimetype='text/calendar; charset=utf-8', headers={'ETag': _etag_for_event(ev)}, ) # --------------------------------------------------------------------------- # PUT event (create or update) # --------------------------------------------------------------------------- @dav_bp.route('///', methods=['PUT']) @basic_auth def put_event(username, cal_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) cal_id = _parse_calendar_path(cal_part) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('Not found', 404) uid = filename.removesuffix('.ics') raw = request.get_data(as_text=True) or '' parsed = _parse_vevent(raw) if not parsed: return Response('Cannot parse VEVENT', 400) # UID inside the body wins over the filename if present body_uid = parsed.get('uid') or uid existing = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=body_uid).first() if_match = request.headers.get('If-Match') if_none_match = request.headers.get('If-None-Match') if existing and if_none_match == '*': return Response('', 412) if if_match and existing and if_match.strip() != _etag_for_event(existing): return Response('', 412) if not existing: existing = CalendarEvent(calendar_id=cal.id, uid=body_uid, ical_data=raw) db.session.add(existing) existing.summary = parsed.get('summary') or '(ohne Titel)' existing.description = parsed.get('description') existing.location = parsed.get('location') existing.dtstart = parsed.get('dtstart') existing.dtend = parsed.get('dtend') existing.all_day = parsed.get('all_day', False) existing.recurrence_rule = parsed.get('rrule') existing.exdates = ','.join(parsed.get('exdates', [])) or None # Keep the raw VEVENT as-is so CalDAV clients round-trip faithfully. existing.ical_data = _extract_vevent_block(raw) existing.updated_at = datetime.now(timezone.utc) db.session.commit() status = 201 if request.method == 'PUT' and not if_match else 204 return Response('', status, {'ETag': _etag_for_event(existing)}) # --------------------------------------------------------------------------- # DELETE # --------------------------------------------------------------------------- @dav_bp.route('///', methods=['DELETE']) @basic_auth def delete_event(username, cal_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) cal_id = _parse_calendar_path(cal_part) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('Not found', 404) uid = filename.removesuffix('.ics') ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first() if not ev: return Response('', 404) db.session.delete(ev) db.session.commit() return Response('', 204) @dav_bp.route('///', methods=['DELETE']) @dav_bp.route('//', methods=['DELETE']) @basic_auth def delete_calendar(username, cal_part): user: User = request.dav_user if username != user.username: return Response('', 403) cal_id = _parse_calendar_path(cal_part) cal = _calendar_for(user, cal_id) if cal_id else None if not cal: return Response('', 404) db.session.delete(cal) db.session.commit() return Response('', 204) # --------------------------------------------------------------------------- # MKCALENDAR (create a new calendar collection via the DAV URL) # --------------------------------------------------------------------------- @dav_bp.route('///', methods=['MKCALENDAR']) @dav_bp.route('//', methods=['MKCALENDAR']) @basic_auth def mkcalendar(username, cal_part): user: User = request.dav_user if username != user.username: return Response('', 403) # Extract display name from body if present name = 'Neuer Kalender' color = '#3788d8' try: body = request.get_data() if body: root = ET.fromstring(body) dn = root.find(f".//{_qn('d', 'displayname')}") if dn is not None and dn.text: name = dn.text col = root.find(f".//{_qn('ic', 'calendar-color')}") if col is not None and col.text: color = col.text[:7] except ET.ParseError: pass cal = Calendar(owner_id=user.id, name=name, color=color) db.session.add(cal) db.session.commit() return Response('', 201, {'Location': _href_calendar(user.username, cal.id)}) # --------------------------------------------------------------------------- # VEVENT parser (quick & pragmatic - covers what the major CalDAV clients send) # --------------------------------------------------------------------------- def _extract_vevent_block(raw: str) -> str: """Return only the VEVENT block from a full VCALENDAR body. If none is found the input is returned as-is.""" m = re.search(r'BEGIN:VEVENT[\s\S]*?END:VEVENT', raw, flags=re.IGNORECASE) return m.group(0) if m else raw def _unfold(raw: str) -> list[str]: """Undo RFC 5545 line folding (continuation lines start with space/tab).""" lines = [] for line in raw.replace('\r\n', '\n').split('\n'): if line.startswith((' ', '\t')) and lines: lines[-1] += line[1:] else: lines.append(line) return lines def _parse_dt(value: str, params: dict) -> tuple[datetime | None, bool]: """Parse an iCalendar DATE or DATE-TIME. Returns (datetime, all_day).""" if not value: return None, False is_date = params.get('VALUE', '').upper() == 'DATE' or len(value) == 8 if is_date: try: return datetime.strptime(value, '%Y%m%d'), True except ValueError: return None, True # Try Z (UTC), TZID-tagged, or naive floating time val = value.replace('Z', '') for fmt in ('%Y%m%dT%H%M%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'): try: dt = datetime.strptime(val, fmt) if value.endswith('Z'): dt = dt.replace(tzinfo=timezone.utc) return dt, False except ValueError: continue return None, False def _parse_vevent(raw: str) -> dict | None: block = _extract_vevent_block(raw) if 'BEGIN:VEVENT' not in block.upper(): return None result: dict = {'exdates': []} for line in _unfold(block): if ':' not in line: continue key, _, value = line.partition(':') # Separate parameters: "DTSTART;TZID=Europe/Berlin" parts = key.split(';') name = parts[0].upper() params = {} for p in parts[1:]: if '=' in p: k, v = p.split('=', 1) params[k.upper()] = v if name == 'UID': result['uid'] = value.strip() elif name == 'SUMMARY': result['summary'] = _unescape(value) elif name == 'DESCRIPTION': result['description'] = _unescape(value) elif name == 'LOCATION': result['location'] = _unescape(value) elif name == 'DTSTART': dt, all_day = _parse_dt(value, params) result['dtstart'] = dt result['all_day'] = all_day elif name == 'DTEND': dt, _ = _parse_dt(value, params) result['dtend'] = dt elif name == 'RRULE': result['rrule'] = value.strip() elif name == 'EXDATE': dt, all_day = _parse_dt(value, params) if dt: result['exdates'].append( dt.strftime('%Y-%m-%d' if all_day else '%Y-%m-%dT%H:%M:%S') ) if 'uid' not in result: result['uid'] = str(uuid.uuid4()) return result def _unescape(s: str) -> str: return s.replace('\\n', '\n').replace('\\,', ',').replace('\\;', ';').replace('\\\\', '\\')