628 lines
22 KiB
Python
628 lines
22 KiB
Python
"""Minimal CalDAV server (RFC 4791 subset).
|
|
|
|
Implements the endpoints that Thunderbird, DAVx5 and Apple Calendar
|
|
actually use in practice:
|
|
|
|
OPTIONS - capability advertisement (DAV: 1, 2, 3, calendar-access)
|
|
PROPFIND Depth 0/1 - discovery chain + listings
|
|
REPORT calendar-query + calendar-multiget
|
|
GET single VCALENDAR resource
|
|
PUT create/update VCALENDAR resource
|
|
DELETE remove a resource or calendar collection
MKCALENDAR create a new calendar collection
|
|
|
|
Non-goals for this revision: ACL reports, free-busy, sync-token based
|
|
incremental sync, scheduling (iTIP/iMIP). Clients fall back to full
|
|
PROPFIND refresh when sync-token isn't advertised, which is fine for
|
|
small personal calendars.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import uuid
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
from functools import wraps
|
|
|
|
from flask import Response, request
|
|
|
|
from app.extensions import db
|
|
from app.models.calendar import Calendar, CalendarEvent
|
|
from app.models.user import User
|
|
|
|
from . import dav_bp
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# XML namespace plumbing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Prefix -> namespace URI for every XML vocabulary this module reads or emits.
NS = dict(
    d='DAV:',
    c='urn:ietf:params:xml:ns:caldav',
    cs='http://calendarserver.org/ns/',
    ic='http://apple.com/ns/ical/',
)

# Register each prefix with ElementTree for serialization; the DAV: namespace
# is registered under the empty prefix instead of "d".
for prefix, uri in NS.items():
    ET.register_namespace(prefix if prefix != 'd' else '', uri)
|
|
|
|
|
|
def _qn(prefix: str, local: str) -> str:
    """Return the ElementTree qualified name ``{namespace-uri}local``.

    *prefix* must be a key of ``NS``; an unknown prefix raises ``KeyError``.
    """
    namespace = NS[prefix]
    return f'{{{namespace}}}{local}'
|
|
|
|
|
|
def _xml_response(root: ET.Element, status: int = 207) -> Response:
    """Serialize *root* as UTF-8 XML and wrap it in a ``Response``.

    An XML declaration is prepended to the serialized tree. *status*
    defaults to 207.
    """
    declaration = b'<?xml version="1.0" encoding="utf-8"?>\n'
    payload = ET.tostring(root, encoding='utf-8')
    return Response(
        declaration + payload,
        status=status,
        mimetype='application/xml; charset=utf-8',
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Authentication (HTTP Basic over the existing user table)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _challenge() -> Response:
    """Build the 401 response asking the client for HTTP Basic credentials."""
    headers = {'WWW-Authenticate': 'Basic realm="Mini-Cloud DAV"'}
    return Response('Authentication required', 401, headers)
|
|
|
|
|
|
def basic_auth(f):
    """Decorator: require valid HTTP Basic credentials for the wrapped view.

    The credentials are checked against the ``User`` table (matching
    username, ``is_active`` set, ``check_password`` succeeds). On success the
    user is stored on ``request.dav_user`` and the view is called; in every
    other case the 401 challenge from ``_challenge()`` is returned.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        credentials = request.authorization
        if credentials and credentials.username and credentials.password:
            account = User.query.filter_by(username=credentials.username).first()
            if account and account.is_active and account.check_password(credentials.password):
                request.dav_user = account
                return f(*args, **kwargs)
        return _challenge()

    return wrapper
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Response headers advertising the DAV compliance classes (merged into the
# OPTIONS response headers).
DAV_HEADERS = dict(DAV='1, 2, 3, calendar-access')

# Method lists for collections and for single resources.
ALLOW_COLLECTION = 'OPTIONS, PROPFIND, REPORT, DELETE, MKCALENDAR'
ALLOW_RESOURCE = 'OPTIONS, PROPFIND, GET, PUT, DELETE'
|
|
|
|
|
|
def _etag_for_event(event: CalendarEvent) -> str:
|
|
ts = int((event.updated_at or event.created_at or datetime.now(timezone.utc)).timestamp() * 1000)
|
|
return f'"{event.id}-{ts}"'
|
|
|
|
|
|
def _href_calendar(username: str, cal_id: int) -> str:
|
|
return f'/dav/{username}/cal-{cal_id}/'
|
|
|
|
|
|
def _href_event(username: str, cal_id: int, uid: str) -> str:
|
|
return f'/dav/{username}/cal-{cal_id}/{uid}.ics'
|
|
|
|
|
|
def _user_calendars(user: User):
    """Return all ``Calendar`` rows whose ``owner_id`` equals *user*'s id."""
    owned = Calendar.query.filter_by(owner_id=user.id)
    return owned.all()
|
|
|
|
|
|
def _parse_calendar_path(path_part: str):
|
|
"""Input: "cal-42" -> 42, otherwise None."""
|
|
m = re.match(r'cal-(\d+)$', path_part)
|
|
return int(m.group(1)) if m else None
|
|
|
|
|
|
def _calendar_for(user: User, cal_id: int):
    """Load calendar *cal_id* if it exists and is owned by *user*.

    Returns ``None`` both for a missing calendar and for one whose
    ``owner_id`` differs from the user's id.
    """
    calendar = db.session.get(Calendar, cal_id)
    if calendar and calendar.owner_id == user.id:
        return calendar
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OPTIONS (advertise DAV capabilities on any path)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/', methods=['OPTIONS'])
@dav_bp.route('/<path:subpath>', methods=['OPTIONS'])
def options(subpath=''):
    """Answer OPTIONS on any DAV path with the DAV and Allow headers.

    The view is not wrapped in ``basic_auth``; *subpath* is accepted but
    not inspected.
    """
    headers = dict(DAV_HEADERS)
    headers['Allow'] = 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, MKCALENDAR'
    return Response('', status=200, headers=headers)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PROPFIND
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _propstat_ok(href: str, props: dict) -> ET.Element:
    """Build a ``<response>`` element with a single ``200 OK`` propstat.

    Args:
        href: Text of the ``<href>`` child identifying the resource.
        props: Mapping of property qname -> value. A value may be

            * an ``ET.Element`` whose tag equals the qname - it is used
              directly as the property element;
            * any other ``ET.Element`` - appended as the property's child;
            * a list of elements - appended as the property's children;
            * ``None`` - the property is emitted empty;
            * anything else - converted with ``str()`` and used as text.

    Returns:
        The assembled ``<response>`` element.
    """
    resp = ET.Element(_qn('d', 'response'))
    ET.SubElement(resp, _qn('d', 'href')).text = href
    propstat = ET.SubElement(resp, _qn('d', 'propstat'))
    prop = ET.SubElement(propstat, _qn('d', 'prop'))
    for qname, value in props.items():
        if isinstance(value, ET.Element) and value.tag == qname:
            # The value already *is* the property element (e.g. a prebuilt
            # <resourcetype> or <calendar-home-set>). Wrapping it in another
            # element of the same name would nest the property inside
            # itself, e.g. <resourcetype><resourcetype>...</resourcetype>
            # </resourcetype>, which is not a valid property value.
            prop.append(value)
            continue
        el = ET.SubElement(prop, qname)
        if isinstance(value, ET.Element):
            el.append(value)
        elif isinstance(value, list):
            el.extend(value)
        elif value is not None:
            el.text = str(value)
    ET.SubElement(propstat, _qn('d', 'status')).text = 'HTTP/1.1 200 OK'
    return resp
|
|
|
|
|
|
def _resource_type(*kinds: str) -> ET.Element:
    """Build a ``<resourcetype>`` element with one child per recognised kind.

    Recognised kinds are ``'collection'``, ``'calendar'`` and ``'principal'``;
    any other value is skipped. Children are added in argument order.
    """
    child_names = {
        'collection': ('d', 'collection'),
        'calendar': ('c', 'calendar'),
        'principal': ('d', 'principal'),
    }
    el = ET.Element(_qn('d', 'resourcetype'))
    for kind in kinds:
        name = child_names.get(kind)
        if name is not None:
            ET.SubElement(el, _qn(*name))
    return el
|
|
|
|
|
|
def _root_response(href: str, user: User) -> ET.Element:
    """Build the ``<response>`` for the DAV root collection.

    Besides resourcetype and displayname it carries a
    ``current-user-principal`` whose href is ``/dav/<username>/``.
    """
    principal_ptr = ET.Element(_qn('d', 'current-user-principal'))
    link = ET.SubElement(principal_ptr, _qn('d', 'href'))
    link.text = f'/dav/{user.username}/'

    properties = {}
    properties[_qn('d', 'resourcetype')] = _resource_type('collection')
    properties[_qn('d', 'displayname')] = 'Mini-Cloud DAV'
    properties[_qn('d', 'current-user-principal')] = principal_ptr
    return _propstat_ok(href, properties)
|
|
|
|
|
|
def _principal_response(user: User) -> ET.Element:
    """Build the ``<response>`` for the principal URL ``/dav/<username>/``.

    The same URL is reported as both ``calendar-home-set`` and
    ``principal-URL``.
    """
    href = f'/dav/{user.username}/'

    def pointing_here(prefix, local):
        # Property element holding a single <href> to the principal URL.
        holder = ET.Element(_qn(prefix, local))
        ET.SubElement(holder, _qn('d', 'href')).text = href
        return holder

    home = pointing_here('c', 'calendar-home-set')
    principal_url = pointing_here('d', 'principal-URL')
    return _propstat_ok(href, {
        _qn('d', 'resourcetype'): _resource_type('principal', 'collection'),
        _qn('d', 'displayname'): user.username,
        _qn('c', 'calendar-home-set'): home,
        _qn('d', 'principal-URL'): principal_url,
    })
|
|
|
|
|
|
def _calendar_response(user: User, cal: Calendar) -> ET.Element:
    """Build the ``<response>`` describing one calendar collection.

    Reported properties: resourcetype (collection + calendar), displayname,
    calendar-description, supported-calendar-component-set (VEVENT only),
    calendar-color (``#3788d8`` when the calendar has none) and getctag.
    """
    supported = ET.Element(_qn('c', 'supported-calendar-component-set'))
    ET.SubElement(supported, _qn('c', 'comp'), {'name': 'VEVENT'})

    # Colour and ctag are plain text properties, so they are passed as strings.
    colour = cal.color or '#3788d8'
    ctag = _calendar_ctag(cal)

    properties = {
        _qn('d', 'resourcetype'): _resource_type('collection', 'calendar'),
        _qn('d', 'displayname'): cal.name,
        _qn('c', 'calendar-description'): cal.description or '',
        _qn('c', 'supported-calendar-component-set'): supported,
        _qn('ic', 'calendar-color'): colour,
        _qn('cs', 'getctag'): ctag,
    }
    return _propstat_ok(_href_calendar(user.username, cal.id), properties)
|
|
|
|
|
|
def _calendar_ctag(cal: Calendar) -> str:
    """Collection tag: changes when any event in the calendar changes.

    The tag combines the calendar id, the newest event ``updated_at`` (in
    milliseconds) and the number of events in the calendar.

    Args:
        cal: The calendar collection to compute the tag for.

    Returns:
        A quoted string ``"<calendar id>-<milliseconds>-<event count>"``.
    """
    last, count = (
        db.session.query(
            db.func.max(CalendarEvent.updated_at),
            db.func.count(CalendarEvent.id),
        )
        .filter(CalendarEvent.calendar_id == cal.id)
        .one()
    )
    # With no events, fall back to the calendar's own timestamp (or "now").
    stamp = last or cal.updated_at or datetime.now(timezone.utc)
    # Milliseconds, so two changes within the same second still differ.
    ts = int(stamp.timestamp() * 1000)
    # The count makes deletions visible: removing an event that is not the
    # most recently updated one leaves max(updated_at) untouched.
    return f'"{cal.id}-{ts}-{count}"'
|
|
|
|
|
|
def _event_response(user: User, cal: Calendar, event: CalendarEvent, include_data: bool = False) -> ET.Element:
    """Build the ``<response>`` for a single event resource.

    Always reports getetag, getcontenttype and an empty resourcetype; with
    *include_data* the wrapped VCALENDAR text is added as ``calendar-data``.
    """
    props = {}
    props[_qn('d', 'getetag')] = _etag_for_event(event)
    props[_qn('d', 'getcontenttype')] = 'text/calendar; charset=utf-8; component=VEVENT'
    props[_qn('d', 'resourcetype')] = ET.Element(_qn('d', 'resourcetype'))
    if include_data:
        props[_qn('c', 'calendar-data')] = _wrap_vcalendar(cal, event)
    return _propstat_ok(_href_event(user.username, cal.id, event.uid), props)
|
|
|
|
|
|
def _wrap_vcalendar(cal: Calendar, event: CalendarEvent) -> str:
|
|
"""Return a full VCALENDAR envelope around the event's ical_data."""
|
|
lines = [
|
|
'BEGIN:VCALENDAR',
|
|
'VERSION:2.0',
|
|
'PRODID:-//Mini-Cloud//DE',
|
|
'CALSCALE:GREGORIAN',
|
|
event.ical_data.strip() if event.ical_data else '',
|
|
'END:VCALENDAR',
|
|
]
|
|
return '\r\n'.join(lines)
|
|
|
|
|
|
@dav_bp.route('/', methods=['PROPFIND'])
@dav_bp.route('/<path:subpath>', methods=['PROPFIND'])
@basic_auth
def propfind(subpath=''):
    """Answer PROPFIND for the four levels of the DAV hierarchy.

    * ``/dav/``                              - root (+ principal when Depth != 0)
    * ``/dav/<username>/``                   - principal (+ calendars when Depth != 0)
    * ``/dav/<username>/cal-<id>/``          - calendar (+ events when Depth != 0)
    * ``/dav/<username>/cal-<id>/<uid>.ics`` - single event including its data

    Returns 403 when the username segment is not the authenticated user,
    404 for unknown calendars, events or deeper paths, otherwise a
    multistatus XML response. A missing ``Depth`` header counts as ``0``.
    """
    user: User = request.dav_user
    with_children = request.headers.get('Depth', '0') != '0'
    segments = [seg for seg in subpath.split('/') if seg]
    level = len(segments)
    multistatus = ET.Element(_qn('d', 'multistatus'))

    # Nothing exists below <username>/<calendar>/<event>.
    if level > 3:
        return Response('Not found', 404)

    # Root: pointer to the principal.
    if level == 0:
        multistatus.append(_root_response('/dav/', user))
        if with_children:
            multistatus.append(_principal_response(user))
        return _xml_response(multistatus)

    # Every deeper path must sit below the authenticated user's own name.
    if segments[0] != user.username:
        return Response('', 403)

    # Principal, optionally followed by the user's calendars.
    if level == 1:
        multistatus.append(_principal_response(user))
        if with_children:
            for cal in _user_calendars(user):
                multistatus.append(_calendar_response(user, cal))
        return _xml_response(multistatus)

    cal_id = _parse_calendar_path(segments[1])

    # Calendar collection, optionally followed by its events.
    if level == 2:
        cal = _calendar_for(user, cal_id) if cal_id is not None else None
        if not cal:
            return Response('Not found', 404)
        multistatus.append(_calendar_response(user, cal))
        if with_children:
            for ev in CalendarEvent.query.filter_by(calendar_id=cal.id).all():
                multistatus.append(_event_response(user, cal, ev))
        return _xml_response(multistatus)

    # Single event resource.
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)
    uid = segments[2].removesuffix('.ics')
    ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first()
    if not ev:
        return Response('Not found', 404)
    multistatus.append(_event_response(user, cal, ev, include_data=True))
    return _xml_response(multistatus)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# REPORT (calendar-query, calendar-multiget)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/<path:subpath>', methods=['REPORT'])
@basic_auth
def report(subpath):
    """Handle REPORT (calendar-multiget / calendar-query) on a calendar.

    Args:
        subpath: Path below the blueprint root; must start with
            ``<username>/cal-<id>``.

    Returns:
        * 403 when the path has fewer than two segments or the username
          segment is not the authenticated user;
        * 404 when the calendar does not exist or is not owned by the user;
        * 400 when the request body is not well-formed XML;
        * otherwise a multistatus response. ``calendar-multiget`` yields one
          response per requested href (a 404 status for hrefs that match no
          event); ``calendar-query`` yields every event of the calendar,
          narrowed by an optional time-range; any other report yields an
          empty multistatus.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import unquote

    user: User = request.dav_user
    parts = [p for p in subpath.split('/') if p]
    if len(parts) < 2 or parts[0] != user.username:
        return Response('', 403)
    cal_id = _parse_calendar_path(parts[1])
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    try:
        root = ET.fromstring(request.data or b'<x/>')
    except ET.ParseError:
        return Response('Malformed XML', 400)

    multistatus = ET.Element(_qn('d', 'multistatus'))
    tag = root.tag

    if tag == _qn('c', 'calendar-multiget'):
        for href_el in root.findall(_qn('d', 'href')):
            href = (href_el.text or '').strip()
            if not href:
                continue
            last_segment = href.rsplit('/', 1)[-1]
            # Try the segment verbatim first (this server emits hrefs
            # unencoded), then percent-decoded, because clients may send the
            # href back in URL-encoded form.
            candidates = [last_segment.removesuffix('.ics')]
            decoded = unquote(last_segment).removesuffix('.ics')
            if decoded not in candidates:
                candidates.append(decoded)
            ev = None
            for uid in candidates:
                ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first()
                if ev:
                    break
            if ev:
                multistatus.append(_event_response(user, cal, ev, include_data=True))
            else:
                # Report unknown hrefs explicitly so the client can tell that
                # the resource is gone instead of silently getting nothing.
                missing = ET.SubElement(multistatus, _qn('d', 'response'))
                ET.SubElement(missing, _qn('d', 'href')).text = href
                ET.SubElement(missing, _qn('d', 'status')).text = 'HTTP/1.1 404 Not Found'
        return _xml_response(multistatus)

    if tag == _qn('c', 'calendar-query'):
        # Optional time-range; either bound may be absent.
        start, end = _extract_time_range(root)
        q = CalendarEvent.query.filter_by(calendar_id=cal.id)
        if end is not None:
            # Only applied when an end bound exists: comparing against NULL
            # would match no row at all.
            q = q.filter(CalendarEvent.dtstart < end)
        if start is not None:
            # Recurring events are always kept: later occurrences may fall
            # into the range even if the first one does not.
            q = q.filter(
                (CalendarEvent.dtend >= start) | (CalendarEvent.dtstart >= start)
                | (CalendarEvent.recurrence_rule.isnot(None))
            )
        for ev in q.all():
            multistatus.append(_event_response(user, cal, ev, include_data=True))
        return _xml_response(multistatus)

    # Unknown report - return empty multistatus so clients don't break
    return _xml_response(multistatus)
|
|
|
|
|
|
def _extract_time_range(root: ET.Element):
    """Return ``(start, end)`` from the first CalDAV ``time-range`` in *root*.

    Both values are ``None`` when no ``time-range`` element exists; a single
    value is ``None`` when its attribute is missing or cannot be parsed.
    """
    tr = root.find(f".//{_qn('c', 'time-range')}")
    if tr is None:
        return None, None

    def parse(s):
        if not s:
            return None
        s = s.replace('Z', '+00:00')
        # Tried in order: ISO format, compact ICS form with offset
        # (20260412T120000Z), then the first 15 characters as a compact
        # timestamp that is tagged as UTC.
        attempts = (
            lambda: datetime.fromisoformat(s),
            lambda: datetime.strptime(s, '%Y%m%dT%H%M%S%z'),
            lambda: datetime.strptime(s[:15], '%Y%m%dT%H%M%S').replace(tzinfo=timezone.utc),
        )
        for attempt in attempts:
            try:
                return attempt()
            except ValueError:
                continue
        return None

    return parse(tr.get('start')), parse(tr.get('end'))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET single event
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['GET'])
@basic_auth
def get_event(username, cal_part, filename):
    """Serve one event as a ``text/calendar`` document with its ETag.

    Returns 403 when *username* is not the authenticated user and 404 when
    the calendar or the event (looked up by *filename* without ``.ics``)
    cannot be found.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    ev = None
    if cal:
        uid = filename.removesuffix('.ics')
        ev = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=uid).first()
    if not ev:
        return Response('Not found', 404)

    return Response(
        _wrap_vcalendar(cal, ev),
        mimetype='text/calendar; charset=utf-8',
        headers={'ETag': _etag_for_event(ev)},
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT event (create or update)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['PUT'])
@basic_auth
def put_event(username, cal_part, filename):
    """Create or update a single event from an uploaded iCalendar body.

    Args:
        username: Username segment of the URL; must be the authenticated user.
        cal_part: Calendar segment of the URL (``cal-<id>``).
        filename: Resource name, normally ``<uid>.ics``.

    Returns:
        * 403 - *username* is not the authenticated user;
        * 404 - the calendar does not exist or is not owned by the user;
        * 400 - the body contains no parseable VEVENT;
        * 412 - ``If-None-Match: *`` was sent but the event exists, or
          ``If-Match`` was sent and the event is missing or its ETag is not
          among the listed ones;
        * 201 - the event was created, 204 - it was updated; both carry the
          new ``ETag`` header.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)
    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    uid = filename.removesuffix('.ics')
    raw = request.get_data(as_text=True) or ''
    parsed = _parse_vevent(raw)
    if not parsed:
        return Response('Cannot parse VEVENT', 400)

    # UID inside the body wins over the filename if present
    body_uid = parsed.get('uid') or uid

    existing = CalendarEvent.query.filter_by(calendar_id=cal.id, uid=body_uid).first()
    if_match = request.headers.get('If-Match')
    if_none_match = request.headers.get('If-None-Match')

    if existing and (if_none_match or '').strip() == '*':
        return Response('', 412)
    if if_match:
        # If-Match can only succeed against an existing resource; it must
        # never turn into a silent create.
        if not existing:
            return Response('', 412)
        # The header may be "*" or a comma-separated list of entity tags.
        accepted = [candidate.strip() for candidate in if_match.split(',')]
        if '*' not in accepted and _etag_for_event(existing) not in accepted:
            return Response('', 412)

    created = existing is None
    if created:
        existing = CalendarEvent(calendar_id=cal.id, uid=body_uid, ical_data=raw)
        db.session.add(existing)

    existing.summary = parsed.get('summary') or '(ohne Titel)'
    existing.description = parsed.get('description')
    existing.location = parsed.get('location')
    existing.dtstart = parsed.get('dtstart')
    existing.dtend = parsed.get('dtend')
    existing.all_day = parsed.get('all_day', False)
    existing.recurrence_rule = parsed.get('rrule')
    existing.exdates = ','.join(parsed.get('exdates', [])) or None
    # Keep the raw VEVENT as-is so CalDAV clients round-trip faithfully.
    existing.ical_data = _extract_vevent_block(raw)
    existing.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    # 201 only for a newly created resource; an update answers 204 whether
    # or not the client sent If-Match.
    status = 201 if created else 204
    return Response('', status, {'ETag': _etag_for_event(existing)})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/<username>/<cal_part>/<filename>', methods=['DELETE'])
@basic_auth
def delete_event(username, cal_part, filename):
    """Delete one event resource and answer 204.

    Returns 403 when *username* is not the authenticated user and 404 when
    the calendar or the event cannot be found.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    cal = _calendar_for(user, cal_id) if cal_id else None
    if not cal:
        return Response('Not found', 404)

    target = CalendarEvent.query.filter_by(
        calendar_id=cal.id,
        uid=filename.removesuffix('.ics'),
    ).first()
    if not target:
        return Response('', 404)

    db.session.delete(target)
    db.session.commit()
    return Response('', 204)
|
|
|
|
|
|
@dav_bp.route('/<username>/<cal_part>/', methods=['DELETE'])
@dav_bp.route('/<username>/<cal_part>', methods=['DELETE'])
@basic_auth
def delete_calendar(username, cal_part):
    """Delete a calendar collection owned by the authenticated user.

    Returns 403 for a foreign *username*, 404 when the calendar cannot be
    found, and 204 after the calendar row was deleted and committed.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    cal_id = _parse_calendar_path(cal_part)
    target = _calendar_for(user, cal_id) if cal_id else None
    if not target:
        return Response('', 404)

    db.session.delete(target)
    db.session.commit()
    return Response('', 204)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MKCALENDAR (create a new calendar collection via the DAV URL)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dav_bp.route('/<username>/<cal_part>/', methods=['MKCALENDAR'])
@dav_bp.route('/<username>/<cal_part>', methods=['MKCALENDAR'])
@basic_auth
def mkcalendar(username, cal_part):
    """Create a new calendar for the authenticated user.

    Display name and colour are read from the request body when it is
    well-formed XML; otherwise the defaults are used. *cal_part* is not used:
    the URL of the created calendar is returned in the ``Location`` header
    of the 201 response. A foreign *username* yields 403.
    """
    user: User = request.dav_user
    if username != user.username:
        return Response('', 403)

    name, color = 'Neuer Kalender', '#3788d8'

    # Best effort: an unparseable body simply leaves the defaults in place.
    body = request.get_data()
    if body:
        try:
            root = ET.fromstring(body)
        except ET.ParseError:
            root = None
        if root is not None:
            dn = root.find(f".//{_qn('d', 'displayname')}")
            if dn is not None and dn.text:
                name = dn.text
            col = root.find(f".//{_qn('ic', 'calendar-color')}")
            if col is not None and col.text:
                # Only the first seven characters of the colour are kept.
                color = col.text[:7]

    cal = Calendar(owner_id=user.id, name=name, color=color)
    db.session.add(cal)
    db.session.commit()
    return Response('', 201, {'Location': _href_calendar(user.username, cal.id)})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VEVENT parser (quick & pragmatic - covers what the major CalDAV clients send)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_vevent_block(raw: str) -> str:
|
|
"""Return only the VEVENT block from a full VCALENDAR body. If none
|
|
is found the input is returned as-is."""
|
|
m = re.search(r'BEGIN:VEVENT[\s\S]*?END:VEVENT', raw, flags=re.IGNORECASE)
|
|
return m.group(0) if m else raw
|
|
|
|
|
|
def _unfold(raw: str) -> list[str]:
|
|
"""Undo RFC 5545 line folding (continuation lines start with space/tab)."""
|
|
lines = []
|
|
for line in raw.replace('\r\n', '\n').split('\n'):
|
|
if line.startswith((' ', '\t')) and lines:
|
|
lines[-1] += line[1:]
|
|
else:
|
|
lines.append(line)
|
|
return lines
|
|
|
|
|
|
def _parse_dt(value: str, params: dict) -> tuple[datetime | None, bool]:
|
|
"""Parse an iCalendar DATE or DATE-TIME. Returns (datetime, all_day)."""
|
|
if not value:
|
|
return None, False
|
|
is_date = params.get('VALUE', '').upper() == 'DATE' or len(value) == 8
|
|
if is_date:
|
|
try:
|
|
return datetime.strptime(value, '%Y%m%d'), True
|
|
except ValueError:
|
|
return None, True
|
|
# Try Z (UTC), TZID-tagged, or naive floating time
|
|
val = value.replace('Z', '')
|
|
for fmt in ('%Y%m%dT%H%M%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'):
|
|
try:
|
|
dt = datetime.strptime(val, fmt)
|
|
if value.endswith('Z'):
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt, False
|
|
except ValueError:
|
|
continue
|
|
return None, False
|
|
|
|
|
|
def _parse_vevent(raw: str) -> dict | None:
    """Parse the first VEVENT of an iCalendar body into a plain dict.

    Args:
        raw: The uploaded iCalendar text (a full VCALENDAR or a bare VEVENT).

    Returns:
        ``None`` when *raw* contains no ``BEGIN:VEVENT``. Otherwise a dict
        that always has ``'uid'`` (a fresh UUID4 string when the event has
        none) and ``'exdates'`` (list of formatted strings), plus - when the
        property is present - ``'summary'``, ``'description'``,
        ``'location'``, ``'dtstart'``, ``'all_day'``, ``'dtend'`` and
        ``'rrule'``.
    """
    block = _extract_vevent_block(raw)
    if 'BEGIN:VEVENT' not in block.upper():
        return None
    result: dict = {'exdates': []}
    # Stack of currently open components, innermost last.
    open_components: list[str] = []
    for line in _unfold(block):
        if ':' not in line:
            continue
        key, _, value = line.partition(':')
        # Separate parameters: "DTSTART;TZID=Europe/Berlin"
        parts = key.split(';')
        name = parts[0].upper()

        if name == 'BEGIN':
            open_components.append(value.strip().upper())
            continue
        if name == 'END':
            if open_components:
                open_components.pop()
            continue
        if not open_components or open_components[-1] != 'VEVENT':
            # Properties of nested components (e.g. the DESCRIPTION of a
            # VALARM) must not overwrite the event's own properties.
            continue

        params = {}
        for p in parts[1:]:
            if '=' in p:
                k, v = p.split('=', 1)
                params[k.upper()] = v

        if name == 'UID':
            result['uid'] = value.strip()
        elif name == 'SUMMARY':
            result['summary'] = _unescape(value)
        elif name == 'DESCRIPTION':
            result['description'] = _unescape(value)
        elif name == 'LOCATION':
            result['location'] = _unescape(value)
        elif name == 'DTSTART':
            dt, all_day = _parse_dt(value, params)
            result['dtstart'] = dt
            result['all_day'] = all_day
        elif name == 'DTEND':
            dt, _ = _parse_dt(value, params)
            result['dtend'] = dt
        elif name == 'RRULE':
            result['rrule'] = value.strip()
        elif name == 'EXDATE':
            # One EXDATE property may carry a comma-separated list of values.
            for item in value.split(','):
                dt, all_day = _parse_dt(item.strip(), params)
                if dt:
                    result['exdates'].append(
                        dt.strftime('%Y-%m-%d' if all_day else '%Y-%m-%dT%H:%M:%S')
                    )
    if 'uid' not in result:
        result['uid'] = str(uuid.uuid4())
    return result
|
|
|
|
|
|
def _unescape(s: str) -> str:
|
|
return s.replace('\\n', '\n').replace('\\,', ',').replace('\\;', ';').replace('\\\\', '\\')
|