minmal-file-cloud-email-pim.../backend/app/dav/caldav.py

728 lines
28 KiB
Python

"""Minimal CalDAV server (RFC 4791 subset).

Implements the endpoints that Thunderbird, DAVx5 and Apple Calendar
actually use in practice:

    OPTIONS             - capability advertisement (DAV: 1, 2, 3, calendar-access)
    PROPFIND Depth 0/1  - discovery chain + listings
    REPORT              - calendar-query + calendar-multiget
    GET                 - single VCALENDAR resource
    PUT                 - create/update VCALENDAR resource
    DELETE              - remove a resource or calendar collection
    PROPPATCH           - set a calendar's display name / colour
    MKCALENDAR          - create a new calendar collection

Non-goals for this revision: ACL reports, free-busy, sync-token based
incremental sync, scheduling (iTIP/iMIP). Clients fall back to a full
PROPFIND refresh when sync-token isn't advertised, which is fine for
small personal calendars.
"""
from __future__ import annotations
import re
import uuid
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from functools import wraps
from flask import Response, request
from app.extensions import db
from app.models.calendar import Calendar, CalendarEvent
from app.models.user import User
from . import dav_bp
# ---------------------------------------------------------------------------
# XML namespace plumbing
# ---------------------------------------------------------------------------
# Prefix -> namespace URI for every XML vocabulary this module emits or parses.
NS = {
    'd': 'DAV:',
    'c': 'urn:ietf:params:xml:ns:caldav',
    'cs': 'http://calendarserver.org/ns/',
    'ic': 'http://apple.com/ns/ical/',
}

# Register the prefixes with ElementTree so serialised output uses them.
# 'DAV:' is registered under the empty prefix, i.e. it is written as the
# default namespace (xmlns="DAV:") rather than with a "d:" prefix.
for prefix, uri in NS.items():
    if prefix == 'd':
        ET.register_namespace('', uri)
    else:
        ET.register_namespace(prefix, uri)
def _qn(prefix: str, local: str) -> str:
    """Return the ElementTree qualified name ``{namespace-uri}local``.

    ``prefix`` must be one of the keys of ``NS``; an unknown prefix raises
    ``KeyError``.
    """
    namespace = NS[prefix]
    return '{{{}}}{}'.format(namespace, local)
def _xml_response(root: ET.Element, status: int = 207) -> Response:
    """Serialise ``root`` as UTF-8 XML and wrap it in a Flask ``Response``.

    The status defaults to 207 (Multi-Status). An XML declaration is
    prepended by hand and the DAV capability header is attached.
    """
    prolog = b'<?xml version="1.0" encoding="utf-8"?>\n'
    payload = prolog + ET.tostring(root, encoding='utf-8')
    return Response(
        payload,
        status=status,
        headers={
            'DAV': '1, 2, 3, calendar-access',
            'Content-Type': 'application/xml; charset=utf-8',
        },
    )
# ---------------------------------------------------------------------------
# Authentication (HTTP Basic over the existing user table)
# ---------------------------------------------------------------------------
def _challenge() -> Response:
    """Build the 401 reply that asks the client for HTTP Basic credentials."""
    challenge_headers = {'WWW-Authenticate': 'Basic realm="Mini-Cloud DAV"'}
    return Response('Authentication required', 401, challenge_headers)
def basic_auth(f):
    """Decorator: require valid HTTP Basic credentials for a DAV view.

    On success the matching ``User`` is stored on ``request.dav_user`` and
    the wrapped view is called. On missing or empty credentials, an unknown
    or inactive user, or a wrong password, the 401 challenge is returned
    instead.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        credentials = request.authorization
        if not (credentials and credentials.username and credentials.password):
            return _challenge()

        account = User.query.filter_by(username=credentials.username).first()
        if not (account and account.is_active
                and account.check_password(credentials.password)):
            return _challenge()

        request.dav_user = account
        return f(*args, **kwargs)

    return wrapper
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Capability header merged into the OPTIONS reply.
DAV_HEADERS = dict(DAV='1, 2, 3, calendar-access')

# Method lists per resource kind. Neither constant is referenced in the
# visible part of this module.
ALLOW_COLLECTION = 'OPTIONS, PROPFIND, REPORT, DELETE, MKCALENDAR'
ALLOW_RESOURCE = 'OPTIONS, PROPFIND, GET, PUT, DELETE'
def _etag_for_event(event: CalendarEvent) -> str:
    """Return a quoted ETag of the form ``"<id>-<milliseconds>"``.

    The timestamp is taken from ``updated_at``, falling back to
    ``created_at`` and finally to the current UTC time when neither is set.
    """
    stamp = event.updated_at or event.created_at or datetime.now(timezone.utc)
    millis = int(stamp.timestamp() * 1000)
    return '"{}-{}"'.format(event.id, millis)
def _href_calendar(username: str, cal_id: int) -> str:
    """Return the collection URL ``/dav/<username>/cal-<id>/``."""
    return '/dav/{}/cal-{}/'.format(username, cal_id)
def _href_event(username: str, cal_id: int, uid: str) -> str:
    """Return the resource URL ``/dav/<username>/cal-<id>/<uid>.ics``.

    The uid is inserted verbatim; no percent-encoding is applied.
    """
    return '/dav/{}/cal-{}/{}.ics'.format(username, cal_id, uid)
def _user_calendars(user: User):
    """Return a list of every ``Calendar`` whose ``owner_id`` is ``user.id``."""
    owned = Calendar.query.filter_by(owner_id=user.id)
    return owned.all()
def _parse_calendar_path(path_part: str):
    """Extract the numeric calendar id from a ``cal-<id>`` path segment.

    Input: "cal-42" -> 42, otherwise None.

    The whole segment must match: ``re.fullmatch`` is used because ``$``
    would also accept a trailing newline, and the digit class is restricted
    to ASCII so that only plain decimal ids are recognised.
    """
    m = re.fullmatch(r'cal-([0-9]+)', path_part)
    return int(m.group(1)) if m else None
def _calendar_for(user: User, cal_id: int):
    """Load calendar ``cal_id`` if it exists and is owned by ``user``.

    Returns the ``Calendar`` or ``None`` (unknown id or foreign owner).
    """
    candidate = db.session.get(Calendar, cal_id)
    if candidate and candidate.owner_id == user.id:
        return candidate
    return None
# ---------------------------------------------------------------------------
# OPTIONS (advertise DAV capabilities on any path)
# ---------------------------------------------------------------------------
@dav_bp.route('/', methods=['OPTIONS'])
@dav_bp.route('/<path:subpath>', methods=['OPTIONS'])
def options(subpath=''):
    """Advertise DAV capabilities and the supported methods for any path.

    ``subpath`` is accepted only so the route matches every URL; the reply
    is the same everywhere. This view is not wrapped in ``basic_auth``.

    The ``Allow`` list names every method this module routes, including
    PROPPATCH and HEAD.
    """
    headers = {
        **DAV_HEADERS,
        'Allow': 'OPTIONS, PROPFIND, PROPPATCH, REPORT, GET, HEAD, PUT, DELETE, MKCALENDAR',
    }
    return Response('', status=200, headers=headers)
# ---------------------------------------------------------------------------
# PROPFIND
# ---------------------------------------------------------------------------
def _make_response(href: str, populate_prop) -> ET.Element:
    """Build one multistatus ``<response>`` element with a single 200 propstat.

    Layout: response > (href, propstat > (prop, status)). ``populate_prop``
    is called with the empty ``<prop>`` element and is expected to append
    the property sub-elements to it.
    """
    response = ET.Element(_qn('d', 'response'))

    href_el = ET.SubElement(response, _qn('d', 'href'))
    href_el.text = href

    propstat = ET.SubElement(response, _qn('d', 'propstat'))
    populate_prop(ET.SubElement(propstat, _qn('d', 'prop')))

    status_el = ET.SubElement(propstat, _qn('d', 'status'))
    status_el.text = 'HTTP/1.1 200 OK'
    return response
def _root_response(href: str, user: User) -> ET.Element:
    """Build the ``<response>`` for the DAV root collection at ``href``.

    Carries resourcetype=collection, a fixed display name and a
    current-user-principal pointing at ``/dav/<username>/``.
    """
    principal_href = f'/dav/{user.username}/'

    def fill(prop):
        resourcetype = ET.SubElement(prop, _qn('d', 'resourcetype'))
        ET.SubElement(resourcetype, _qn('d', 'collection'))

        name_el = ET.SubElement(prop, _qn('d', 'displayname'))
        name_el.text = 'Mini-Cloud DAV'

        principal = ET.SubElement(prop, _qn('d', 'current-user-principal'))
        ET.SubElement(principal, _qn('d', 'href')).text = principal_href

    return _make_response(href, fill)
def _principal_response(user: User) -> ET.Element:
    """Build the ``<response>`` for the principal ``/dav/<username>/``.

    Besides the principal properties it advertises two distinct home-sets
    so clients (DAVx5!) don't mix calendars and addressbooks in the same
    listing.
    """
    base = f'/dav/{user.username}/'

    def href_property(parent, tag, target):
        # <tag><href>target</href></tag>
        wrapper = ET.SubElement(parent, tag)
        ET.SubElement(wrapper, _qn('d', 'href')).text = target

    def fill(prop):
        resourcetype = ET.SubElement(prop, _qn('d', 'resourcetype'))
        for kind in ('collection', 'principal'):
            ET.SubElement(resourcetype, _qn('d', kind))

        ET.SubElement(prop, _qn('d', 'displayname')).text = user.username

        href_property(prop, _qn('d', 'current-user-principal'), base)
        href_property(prop, _qn('d', 'principal-URL'), base)
        href_property(prop, _qn('c', 'calendar-home-set'),
                      f'/dav/{user.username}/calendars/')
        href_property(prop, '{urn:ietf:params:xml:ns:carddav}addressbook-home-set',
                      f'/dav/{user.username}/addressbooks/')

    return _make_response(base, fill)
def _calendar_response(user: User, cal: Calendar) -> ET.Element:
    """Build the ``<response>`` describing one calendar collection.

    Emits resourcetype, display name, description, the supported component
    set (VEVENT only), the supported reports, colour, ctag and the
    privileges granted to the current user.
    """
    reports = ('calendar-query', 'calendar-multiget')
    privileges = ('read', 'write', 'write-properties', 'write-content', 'bind', 'unbind')

    def fill(prop):
        resourcetype = ET.SubElement(prop, _qn('d', 'resourcetype'))
        ET.SubElement(resourcetype, _qn('d', 'collection'))
        ET.SubElement(resourcetype, _qn('c', 'calendar'))

        ET.SubElement(prop, _qn('d', 'displayname')).text = cal.name
        description = ET.SubElement(prop, _qn('c', 'calendar-description'))
        description.text = cal.description or ''

        component_set = ET.SubElement(prop, _qn('c', 'supported-calendar-component-set'))
        ET.SubElement(component_set, _qn('c', 'comp')).set('name', 'VEVENT')

        # supported-report-set: which REPORTs this collection handles.
        report_set = ET.SubElement(prop, _qn('d', 'supported-report-set'))
        for report_name in reports:
            supported = ET.SubElement(report_set, _qn('d', 'supported-report'))
            wrapper = ET.SubElement(supported, _qn('d', 'report'))
            ET.SubElement(wrapper, _qn('c', report_name))

        ET.SubElement(prop, _qn('ic', 'calendar-color')).text = cal.color or '#3788d8'
        ET.SubElement(prop, _qn('cs', 'getctag')).text = _calendar_ctag(cal)

        # current-user-privilege-set: DAVx5 checks this to decide between
        # read-only and read-write.
        privilege_set = ET.SubElement(prop, _qn('d', 'current-user-privilege-set'))
        for priv_name in privileges:
            privilege = ET.SubElement(privilege_set, _qn('d', 'privilege'))
            ET.SubElement(privilege, _qn('d', priv_name))

    return _make_response(_href_calendar(user.username, cal.id), fill)
def _calendar_ctag(cal: Calendar) -> str:
    """Collection tag: changes when any event in the calendar changes.

    The tag is ``"<calendar id>-<timestamp>-<event count>"``. The timestamp
    is the newest event ``updated_at``, falling back to the calendar's own
    ``updated_at``. The event count is included because deleting an event
    that is not the most recently modified one leaves ``max(updated_at)``
    untouched; without the count such a deletion would not change the tag.
    """
    last, count = (
        db.session.query(
            db.func.max(CalendarEvent.updated_at),
            db.func.count(CalendarEvent.id),
        )
        .filter(CalendarEvent.calendar_id == cal.id)
        .one()
    )
    stamp = last or cal.updated_at
    # A fixed 0 keeps the tag stable when no timestamp exists at all; using
    # the current time here would yield a different tag on every request.
    ts = int(stamp.timestamp()) if stamp else 0
    return f'"{cal.id}-{ts}-{count}"'
def _event_response(user: User, cal: Calendar, event: CalendarEvent, include_data: bool = False) -> ET.Element:
    """Build the ``<response>`` for a single event resource.

    Always contains getetag, getcontenttype and an empty resourcetype; the
    full calendar-data payload is added only when ``include_data`` is true.
    """
    def fill(prop):
        etag_el = ET.SubElement(prop, _qn('d', 'getetag'))
        etag_el.text = _etag_for_event(event)

        content_type_el = ET.SubElement(prop, _qn('d', 'getcontenttype'))
        content_type_el.text = 'text/calendar; charset=utf-8; component=VEVENT'

        # An empty resourcetype marks a regular (non-collection) resource.
        ET.SubElement(prop, _qn('d', 'resourcetype'))

        if include_data:
            data_el = ET.SubElement(prop, _qn('c', 'calendar-data'))
            data_el.text = _wrap_vcalendar(cal, event)

    return _make_response(_href_event(user.username, cal.id, event.uid), fill)
def _wrap_vcalendar(cal: Calendar, event: CalendarEvent) -> str:
    """Return a full VCALENDAR envelope around the event's ``ical_data``.

    Every content line is terminated by CRLF, including the last one.
    Line endings inside the stored data are normalised to CRLF and empty
    lines are dropped, so an event without ``ical_data`` produces a bare
    envelope and not an envelope with a blank content line.
    ``cal`` is not read.
    """
    lines = [
        'BEGIN:VCALENDAR',
        'VERSION:2.0',
        'PRODID:-//Mini-Cloud//DE',
        'CALSCALE:GREGORIAN',
    ]
    body = (event.ical_data or '').strip()
    if body:
        normalised = body.replace('\r\n', '\n').replace('\r', '\n')
        # Only truly empty lines are removed; a whitespace-only line may be
        # a folded continuation and is kept.
        lines.extend(line for line in normalised.split('\n') if line)
    lines.append('END:VCALENDAR')
    return '\r\n'.join(lines) + '\r\n'
def _plain_collection_response(href: str, displayname: str) -> ET.Element:
    """Build the ``<response>`` for a bare container collection.

    The container carries only resourcetype=collection and a display name.
    """
    def fill(prop):
        resourcetype = ET.SubElement(prop, _qn('d', 'resourcetype'))
        ET.SubElement(resourcetype, _qn('d', 'collection'))
        ET.SubElement(prop, _qn('d', 'displayname')).text = displayname

    return _make_response(href, fill)


@dav_bp.route('/', methods=['PROPFIND'])
@dav_bp.route('/<path:subpath>', methods=['PROPFIND'])
@basic_auth
def propfind(subpath=''):
    """Answer PROPFIND for every level of the DAV tree.

    Handled paths (relative to the blueprint root):

    * ``""``                          - root collection (+ principal if Depth != 0)
    * ``<username>``                  - principal only
    * ``<username>/calendars``        - container (+ calendars if Depth != 0)
    * ``<username>/addressbooks``     - container (+ addressbooks if Depth != 0)
    * ``<username>/cal-<id>``         - calendar (+ events if Depth != 0)
    * ``<username>/cal-<id>/<uid>.ics`` - single event including calendar-data

    Returns 403 when the first segment is not the authenticated user's
    name, 404 for anything unknown, otherwise a 207 multistatus. The request
    body is not inspected; each resource kind always returns the same fixed
    set of properties.
    """
    user: User = request.dav_user
    recurse = request.headers.get('Depth', '0') != '0'
    segments = [seg for seg in subpath.split('/') if seg]
    multistatus = ET.Element(_qn('d', 'multistatus'))

    # /dav/ (root) or / (when called via the app-level shortcut for DAVx5)
    if not segments:
        # Echo the actual request path so that clients such as DAVx5 see an
        # href that matches what they asked for.
        here = request.path
        if not here.endswith('/'):
            here += '/'
        multistatus.append(_root_response(here, user))
        if recurse:
            multistatus.append(_principal_response(user))
        return _xml_response(multistatus)

    # Nothing lives deeper than <username>/cal-<id>/<uid>.ics.
    if len(segments) > 3:
        return Response('Not found', 404)

    # Every remaining level belongs to exactly one user.
    if segments[0] != user.username:
        return Response('', 403)

    # /dav/<username>/ : principal only (no child collections in this listing
    # so clients don't mix calendars and addressbooks). Clients follow
    # calendar-home-set / addressbook-home-set for the actual lists.
    if len(segments) == 1:
        multistatus.append(_principal_response(user))
        return _xml_response(multistatus)

    if len(segments) == 2:
        collection = segments[1]

        # /dav/<username>/calendars/ : only calendar collections
        if collection == 'calendars':
            multistatus.append(_plain_collection_response(
                f'/dav/{user.username}/calendars/', 'Kalender'))
            if recurse:
                for cal in _user_calendars(user):
                    multistatus.append(_calendar_response(user, cal))
            return _xml_response(multistatus)

        # /dav/<username>/addressbooks/ : only addressbook collections
        if collection == 'addressbooks':
            from .carddav import _addressbook_response, _user_addressbooks
            multistatus.append(_plain_collection_response(
                f'/dav/{user.username}/addressbooks/', 'Adressbücher'))
            if recurse:
                for ab in _user_addressbooks(user):
                    multistatus.append(_addressbook_response(user, ab))
            return _xml_response(multistatus)

        # /dav/<username>/cal-<id>/ : calendar + events
        cal_id = _parse_calendar_path(collection)
        cal = None if cal_id is None else _calendar_for(user, cal_id)
        if not cal:
            return Response('Not found', 404)
        multistatus.append(_calendar_response(user, cal))
        if recurse:
            for ev in CalendarEvent.query.filter_by(calendar_id=cal.id).all():
                multistatus.append(_event_response(user, cal, ev))
        return _xml_response(multistatus)

    # /dav/<username>/cal-<id>/<uid>.ics : single event
    cal_id = _parse_calendar_path(segments[1])
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)
    uid = segments[2].removesuffix('.ics')
    ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first()
    if not ev:
        return Response('Not found', 404)
    multistatus.append(_event_response(user, cal, ev, include_data=True))
    return _xml_response(multistatus)
# ---------------------------------------------------------------------------
# REPORT (calendar-query, calendar-multiget)
# ---------------------------------------------------------------------------
@dav_bp.route('/<path:subpath>', methods=['REPORT'])
@basic_auth
def report(subpath):
    """Handle the calendar-multiget and calendar-query REPORTs.

    The path must be ``<username>/cal-<id>[/...]``. Returns 403 for a
    foreign user or too-short path, 404 for an unknown calendar, 400 for
    malformed XML, otherwise a 207 multistatus.

    * calendar-multiget: one response per requested href. Hrefs that do not
      resolve to an event get a ``404 Not Found`` response element
      (RFC 4791 section 7.9) so clients can detect removed resources.
      The href is matched verbatim first and percent-decoded second, so
      clients that encode characters such as ``@`` are handled too.
    * calendar-query: all events of the calendar, optionally narrowed by a
      time-range; calendar-data is included only if the client asked for it.
    * any other report: an empty multistatus so clients don't break.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import unquote

    user: User = request.dav_user
    parts = [p for p in subpath.split('/') if p]
    if len(parts) < 2 or parts[0] != user.username:
        return Response('', 403)
    cal_id = _parse_calendar_path(parts[1])
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    # NOTE(review): client-supplied XML is parsed with the stdlib
    # ElementTree parser; see the "XML vulnerabilities" notes in the Python
    # documentation if untrusted clients are a concern.
    try:
        root = ET.fromstring(request.data or b'<x/>')
    except ET.ParseError:
        return Response('Malformed XML', 400)

    multistatus = ET.Element(_qn('d', 'multistatus'))
    tag = root.tag
    # Check whether the client requested calendar-data. If it did not, we
    # do not send it either; DAVx5 then cleanly decides that it needs a
    # second phase (multiget).
    wants_data = root.find(f".//{_qn('c', 'calendar-data')}") is not None

    if tag == _qn('c', 'calendar-multiget'):
        for href_el in root.findall(_qn('d', 'href')):
            href = (href_el.text or '').strip()
            if not href:
                continue
            name = href.rsplit('/', 1)[-1].removesuffix('.ics')
            # Try the name exactly as sent, then its percent-decoded form.
            candidates = [name]
            decoded = unquote(name)
            if decoded != name:
                candidates.append(decoded)
            ev = None
            for uid in candidates:
                ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first()
                if ev:
                    break
            if ev:
                multistatus.append(_event_response(user, cal, ev, include_data=True))
            else:
                missing = ET.SubElement(multistatus, _qn('d', 'response'))
                ET.SubElement(missing, _qn('d', 'href')).text = href
                ET.SubElement(missing, _qn('d', 'status')).text = 'HTTP/1.1 404 Not Found'
        return _xml_response(multistatus)

    if tag == _qn('c', 'calendar-query'):
        start, end = _extract_time_range(root)
        q = CalendarEvent.query.filter_by(calendar_id=cal.id)
        if end is not None:
            q = q.filter(CalendarEvent.dtstart < end)
        if start is not None:
            # Recurring events are always included: their stored
            # dtstart/dtend only describe the first occurrence.
            q = q.filter(
                (CalendarEvent.dtend >= start) | (CalendarEvent.dtstart >= start)
                | (CalendarEvent.recurrence_rule.isnot(None))
            )
        for ev in q.all():
            multistatus.append(_event_response(user, cal, ev, include_data=wants_data))
        return _xml_response(multistatus)

    # Unknown report - return empty multistatus so clients don't break
    return _xml_response(multistatus)
def _extract_time_range(root: ET.Element):
    """Return ``(start, end)`` from the first CalDAV ``time-range`` element.

    Each bound is a naive ``datetime`` (aware values are converted to UTC
    and stripped of tzinfo) or ``None`` when the attribute is missing or
    unparseable. ``(None, None)`` is returned if there is no time-range.
    """
    time_range = root.find(f".//{_qn('c', 'time-range')}")
    if time_range is None:
        return None, None

    def to_naive_utc(text):
        if not text:
            return None
        text = text.replace('Z', '+00:00')
        # Tried in order; the first parser that accepts the text wins.
        attempts = (
            lambda: datetime.fromisoformat(text),
            lambda: datetime.strptime(text, '%Y%m%dT%H%M%S%z'),
            lambda: datetime.strptime(text[:15], '%Y%m%dT%H%M%S').replace(tzinfo=timezone.utc),
        )
        for attempt in attempts:
            try:
                moment = attempt()
            except ValueError:
                continue
            break
        else:
            return None
        # Our DB columns are tz-naive (UTC); comparing against an aware
        # value would raise TypeError, so strip the tz info.
        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
        return moment

    start = to_naive_utc(time_range.get('start'))
    end = to_naive_utc(time_range.get('end'))
    return start, end
# ---------------------------------------------------------------------------
# GET single event
# ---------------------------------------------------------------------------
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['GET', 'HEAD'])
@basic_auth
def get_event(username, cal_part, filename):
    """Return one event as a ``text/calendar`` document with its ETag.

    403 if ``username`` is not the authenticated user, 404 if the calendar
    or the event (uid = filename without ``.ics``) does not exist.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None

    event = None
    if cal:
        wanted_uid = filename.removesuffix('.ics')
        event = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=wanted_uid).first()
    if not event:
        return Response('Not found', 404)

    etag_header = {'ETag': _etag_for_event(event)}
    return Response(
        _wrap_vcalendar(cal, event),
        mimetype='text/calendar; charset=utf-8',
        headers=etag_header,
    )
# ---------------------------------------------------------------------------
# PUT event (create or update)
# ---------------------------------------------------------------------------
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['PUT'])
@basic_auth
def put_event(username, cal_part, filename):
    """Create or update an event from an uploaded iCalendar body.

    Responses:
      403 - ``username`` is not the authenticated user
      404 - unknown calendar
      400 - body contains no parseable VEVENT
      412 - ``If-None-Match: *`` but the event exists, or ``If-Match`` was
            sent and the event is missing or none of the ETags match
      201 - event was created
      204 - existing event was updated

    The UID inside the body wins over the filename when both are present.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    raw = request.get_data(as_text=True) or ''
    parsed = _parse_vevent(raw)
    if not parsed:
        return Response('Cannot parse VEVENT', 400)

    # UID inside the body wins over the filename if present
    body_uid = parsed.get('uid') or filename.removesuffix('.ics')
    existing = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=body_uid).first()

    if_match = (request.headers.get('If-Match') or '').strip()
    if_none_match = (request.headers.get('If-None-Match') or '').strip()
    if existing and if_none_match == '*':
        return Response('', 412)
    if if_match:
        # If-Match may be "*" or a comma-separated list of ETags. It can
        # never succeed against a resource that does not exist.
        accepted = {candidate.strip() for candidate in if_match.split(',')}
        if existing is None or not (
            '*' in accepted or _etag_for_event(existing) in accepted
        ):
            return Response('', 412)

    # Keep the raw VEVENT as-is so CalDAV clients round-trip faithfully.
    vevent_block = _extract_vevent_block(raw)

    created = existing is None
    if created:
        existing = CalendarEvent(calendar_id=cal.id, uid=body_uid, ical_data=vevent_block)
        db.session.add(existing)

    existing.summary = parsed.get('summary') or '(ohne Titel)'
    existing.description = parsed.get('description')
    existing.location = parsed.get('location')
    existing.dtstart = parsed.get('dtstart')
    existing.dtend = parsed.get('dtend')
    existing.all_day = parsed.get('all_day', False)
    existing.recurrence_rule = parsed.get('rrule')
    existing.exdates = ','.join(parsed.get('exdates', [])) or None
    existing.ical_data = vevent_block
    existing.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    # 201 only for a newly created resource; an update is 204 whether or
    # not the client sent If-Match.
    status = 201 if created else 204
    return Response('', status, {'ETag': _etag_for_event(existing)})
# ---------------------------------------------------------------------------
# DELETE
# ---------------------------------------------------------------------------
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['DELETE'])
@basic_auth
def delete_event(username, cal_part, filename):
    """Delete one event (uid = filename without ``.ics``).

    403 for a foreign user, 404 for an unknown calendar or event, 204 on
    success.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    target_uid = filename.removesuffix('.ics')
    doomed = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=target_uid).first()
    if not doomed:
        return Response('', 404)

    db.session.delete(doomed)
    db.session.commit()
    return Response('', 204)
@dav_bp.route('/<username>/<cal_part>/', methods=['DELETE'])
@dav_bp.route('/<username>/<cal_part>', methods=['DELETE'])
@basic_auth
def delete_calendar(username, cal_part):
    """Delete a whole calendar collection.

    403 for a foreign user, 404 for an unknown calendar, 204 on success.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    doomed = _calendar_for(user, cal_id) if cal_id else None
    if not doomed:
        return Response('', 404)

    db.session.delete(doomed)
    db.session.commit()
    return Response('', 204)
# ---------------------------------------------------------------------------
# PROPPATCH (clients like to set the display colour/name). We persist the
# calendar colour (calendar-color) and the display name; all other
# properties are acknowledged as "applied" so DAVx5/Apple are satisfied.
# ---------------------------------------------------------------------------
@dav_bp.route('/<username>/<cal_part>/', methods=['PROPPATCH'])
@dav_bp.route('/<username>/<cal_part>', methods=['PROPPATCH'])
@basic_auth
def proppatch_calendar(username, cal_part):
    """Apply a PROPPATCH to a calendar collection.

    Only calendar-color (first 7 characters) and displayname (first 255
    characters) are stored. Every property named in a ``<set>`` block is
    echoed back with status 200 whether or not it was stored.

    403 for a foreign user, 404 for an unknown calendar, 400 for malformed
    XML, otherwise a 207 multistatus.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    try:
        root = ET.fromstring(request.data or b'<x/>')
    except ET.ParseError:
        return Response('Malformed XML', 400)

    color_tag = _qn('ic', 'calendar-color')
    name_tag = _qn('d', 'displayname')
    for node in root.iter():
        if not node.text:
            continue
        if node.tag == color_tag:
            cal.color = node.text.strip()[:7]
        elif node.tag == name_tag:
            cal.name = node.text.strip()[:255]
    db.session.commit()

    # Respond with 207 marking everything as applied so the client is happy.
    multistatus = ET.Element(_qn('d', 'multistatus'))
    response = ET.SubElement(multistatus, _qn('d', 'response'))
    ET.SubElement(response, _qn('d', 'href')).text = _href_calendar(user.username, cal.id)
    propstat = ET.SubElement(response, _qn('d', 'propstat'))
    echoed = ET.SubElement(propstat, _qn('d', 'prop'))

    # Echo back (as empty elements) everything the client asked to set.
    for set_block in root.findall(_qn('d', 'set')):
        requested = set_block.find(_qn('d', 'prop'))
        if requested is None:
            continue
        for requested_prop in requested:
            ET.SubElement(echoed, requested_prop.tag)

    ET.SubElement(propstat, _qn('d', 'status')).text = 'HTTP/1.1 200 OK'
    return _xml_response(multistatus)
# ---------------------------------------------------------------------------
# MKCALENDAR (create a new calendar collection via the DAV URL)
# ---------------------------------------------------------------------------
@dav_bp.route('/<username>/<cal_part>/', methods=['MKCALENDAR'])
@dav_bp.route('/<username>/<cal_part>', methods=['MKCALENDAR'])
@basic_auth
def mkcalendar(username, cal_part):
    """Create a new calendar collection for the authenticated user.

    ``cal_part`` (the name the client asked for) is not used: the calendar
    gets a database id and its URL is returned in the ``Location`` header
    of the 201 reply. Returns 403 for a foreign user.

    The display name and colour are taken from the request body when it is
    parseable XML. They are stripped and truncated in the same way as
    ``proppatch_calendar`` does (255 / 7 characters). A missing, empty or
    malformed body falls back to the defaults.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    name = 'Neuer Kalender'
    color = '#3788d8'
    body = request.get_data()
    if body:
        try:
            root = ET.fromstring(body)
        except ET.ParseError:
            # Best effort: an unparseable body just means "use defaults".
            root = None
        if root is not None:
            dn = root.find(f".//{_qn('d', 'displayname')}")
            if dn is not None and dn.text and dn.text.strip():
                name = dn.text.strip()[:255]
            col = root.find(f".//{_qn('ic', 'calendar-color')}")
            if col is not None and col.text and col.text.strip():
                color = col.text.strip()[:7]

    cal = Calendar(owner_id=user.id, name=name, color=color)
    db.session.add(cal)
    db.session.commit()
    return Response('', 201, {'Location': _href_calendar(user.username, cal.id)})
# ---------------------------------------------------------------------------
# VEVENT parser (quick & pragmatic - covers what the major CalDAV clients send)
# ---------------------------------------------------------------------------
def _extract_vevent_block(raw: str) -> str:
    """Return the first ``BEGIN:VEVENT`` ... ``END:VEVENT`` span of ``raw``.

    Matching is case-insensitive and stops at the first ``END:VEVENT``.
    When no such span exists the input is returned unchanged.
    """
    found = re.search(r'BEGIN:VEVENT[\s\S]*?END:VEVENT', raw, flags=re.IGNORECASE)
    if found is None:
        return raw
    return found.group(0)
def _unfold(raw: str) -> list[str]:
    """Undo RFC 5545 line folding and return the logical lines.

    A physical line that starts with a space or tab is glued (minus that
    one leading character) onto the previous logical line. CRLF and LF are
    both accepted as line separators.
    """
    unfolded: list[str] = []
    for physical in raw.replace('\r\n', '\n').split('\n'):
        is_continuation = bool(unfolded) and physical[:1] in (' ', '\t')
        if is_continuation:
            unfolded[-1] = unfolded[-1] + physical[1:]
        else:
            unfolded.append(physical)
    return unfolded
def _parse_dt(value: str, params: dict) -> tuple[datetime | None, bool]:
"""Parse an iCalendar DATE or DATE-TIME. Returns (datetime, all_day)."""
if not value:
return None, False
is_date = params.get('VALUE', '').upper() == 'DATE' or len(value) == 8
if is_date:
try:
return datetime.strptime(value, '%Y%m%d'), True
except ValueError:
return None, True
# Try Z (UTC), TZID-tagged, or naive floating time
val = value.replace('Z', '')
for fmt in ('%Y%m%dT%H%M%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'):
try:
dt = datetime.strptime(val, fmt)
if value.endswith('Z'):
dt = dt.replace(tzinfo=timezone.utc)
return dt, False
except ValueError:
continue
return None, False
def _parse_vevent(raw: str) -> dict | None:
    """Extract the fields we index from the first VEVENT in ``raw``.

    Returns ``None`` when the text contains no ``BEGIN:VEVENT``. Otherwise
    a dict that always has ``uid`` (generated if the event has none) and
    ``exdates`` (list of strings), plus whichever of ``summary``,
    ``description``, ``location``, ``dtstart``, ``dtend``, ``all_day`` and
    ``rrule`` were present.

    Only properties that belong directly to the VEVENT are read.
    Properties of nested or sibling components (e.g. the DESCRIPTION of a
    VALARM, or the DTSTART of a VTIMEZONE when the whole document had to be
    scanned) are skipped so they cannot overwrite the event's own values.
    EXDATE may carry a comma-separated list; every entry is parsed.
    """
    block = _extract_vevent_block(raw)
    if 'BEGIN:VEVENT' not in block.upper():
        return None

    result: dict = {'exdates': []}
    open_components: list[str] = []  # innermost component is last
    for line in _unfold(block):
        if ':' not in line:
            continue
        key, _, value = line.partition(':')
        # Separate parameters: "DTSTART;TZID=Europe/Berlin"
        parts = key.split(';')
        name = parts[0].strip().upper()

        if name == 'BEGIN':
            open_components.append(value.strip().upper())
            continue
        if name == 'END':
            if open_components:
                open_components.pop()
            continue
        if open_components[-1:] != ['VEVENT']:
            continue

        params = {}
        for p in parts[1:]:
            if '=' in p:
                k, v = p.split('=', 1)
                params[k.upper()] = v

        if name == 'UID':
            result['uid'] = value.strip()
        elif name == 'SUMMARY':
            result['summary'] = _unescape(value)
        elif name == 'DESCRIPTION':
            result['description'] = _unescape(value)
        elif name == 'LOCATION':
            result['location'] = _unescape(value)
        elif name == 'DTSTART':
            dt, all_day = _parse_dt(value.strip(), params)
            result['dtstart'] = dt
            result['all_day'] = all_day
        elif name == 'DTEND':
            dt, _ = _parse_dt(value.strip(), params)
            result['dtend'] = dt
        elif name == 'RRULE':
            result['rrule'] = value.strip()
        elif name == 'EXDATE':
            for item in value.split(','):
                dt, all_day = _parse_dt(item.strip(), params)
                if dt:
                    result['exdates'].append(
                        dt.strftime('%Y-%m-%d' if all_day else '%Y-%m-%dT%H:%M:%S')
                    )

    if 'uid' not in result:
        result['uid'] = str(uuid.uuid4())
    return result
def _unescape(s: str) -> str:
    """Decode RFC 5545 TEXT escapes: ``\\n``/``\\N``, ``\\,``, ``\\;`` and ``\\\\``.

    The string is scanned once from left to right, so an escaped backslash
    followed by a literal "n" stays a backslash plus "n" and is not
    mistaken for a newline escape. Backslashes before any other character
    are left untouched.
    """
    def _decode(match):
        escaped = match.group(1)
        return '\n' if escaped in 'nN' else escaped

    return re.sub(r'\\([\\;,nN])', _decode, s)