"""Minimal CardDAV server (RFC 6352 subset). Mirror structure of caldav.py - adds addressbook collections under /dav//ab-/ and serves vCard 3.0 resources via GET/PUT/DELETE plus addressbook-query and addressbook-multiget REPORTs. Reuses the auth + XML helpers from caldav.py to stay consistent. """ from __future__ import annotations import re import uuid import xml.etree.ElementTree as ET from datetime import datetime, timezone from flask import Response, request from app.extensions import db from app.models.contact import AddressBook, Contact, AddressBookShare from app.models.user import User from app.api.contacts import ( _apply_fields_to_contact, _build_vcard, parse_vcard, _notify_addressbook, _book_recipients, ) from . import dav_bp from .caldav import ( NS, _qn, _xml_response, basic_auth, _make_response, _principal_response, # reused - we extend below ) # --------------------------------------------------------------------------- # URL helpers # --------------------------------------------------------------------------- def _href_addressbook(username: str, book_id: int) -> str: return f'/dav/{username}/ab-{book_id}/' def _href_vcard(username: str, book_id: int, uid: str) -> str: return f'/dav/{username}/ab-{book_id}/{uid}.vcf' def _parse_addressbook_path(part: str): m = re.match(r'ab-(\d+)$', part) return int(m.group(1)) if m else None def _user_addressbooks(user: User): return AddressBook.query.filter_by(owner_id=user.id).all() def _addressbook_for(user: User, book_id: int): book = db.session.get(AddressBook, book_id) if not book or book.owner_id != user.id: return None return book # --------------------------------------------------------------------------- # Property responses # --------------------------------------------------------------------------- def _addressbook_ctag(book: AddressBook) -> str: last = db.session.query(db.func.max(Contact.updated_at)).filter_by(address_book_id=book.id).scalar() ts = int((last or book.updated_at or datetime.now(timezone.utc)).timestamp()) return f'"ab{book.id}-{ts}"' def _addressbook_response(user: User, book: AddressBook) -> ET.Element: href = _href_addressbook(user.username, book.id) def populate(prop): rt = ET.SubElement(prop, _qn('d', 'resourcetype')) ET.SubElement(rt, _qn('d', 'collection')) # urn:ietf:params:xml:ns:carddav addressbook element ab = ET.SubElement(rt, '{urn:ietf:params:xml:ns:carddav}addressbook') # noqa: F841 ET.SubElement(prop, _qn('d', 'displayname')).text = book.name ET.SubElement(prop, '{urn:ietf:params:xml:ns:carddav}addressbook-description').text = book.description or '' srs = ET.SubElement(prop, _qn('d', 'supported-report-set')) for r in ('addressbook-query', 'addressbook-multiget'): sup = ET.SubElement(srs, _qn('d', 'supported-report')) rep = ET.SubElement(sup, _qn('d', 'report')) ET.SubElement(rep, '{urn:ietf:params:xml:ns:carddav}' + r) ET.SubElement(prop, _qn('ic', 'calendar-color')).text = book.color or '#3788d8' ET.SubElement(prop, _qn('cs', 'getctag')).text = _addressbook_ctag(book) cups = ET.SubElement(prop, _qn('d', 'current-user-privilege-set')) for priv in ('read', 'write', 'write-properties', 'write-content', 'bind', 'unbind'): p = ET.SubElement(cups, _qn('d', 'privilege')) ET.SubElement(p, _qn('d', priv)) return _make_response(href, populate) def _vcard_response(user: User, book: AddressBook, contact: Contact, include_data: bool = False) -> ET.Element: href = _href_vcard(user.username, book.id, contact.uid) def populate(prop): ts = int((contact.updated_at or datetime.now(timezone.utc)).timestamp() * 1000) ET.SubElement(prop, _qn('d', 'getetag')).text = f'"{contact.id}-{ts}"' ET.SubElement(prop, _qn('d', 'getcontenttype')).text = 'text/vcard; charset=utf-8' ET.SubElement(prop, _qn('d', 'resourcetype')) if include_data: ET.SubElement(prop, '{urn:ietf:params:xml:ns:carddav}address-data').text = \ contact.vcard_data or _build_vcard(contact) return _make_response(href, populate) def _etag_for_contact(contact: Contact) -> str: ts = int((contact.updated_at or contact.created_at or datetime.now(timezone.utc)).timestamp() * 1000) return f'"{contact.id}-{ts}"' # --------------------------------------------------------------------------- # Extend the principal response from caldav.py to include addressbook-home-set # This is done by wrapping the existing helper and appending the extra prop. # --------------------------------------------------------------------------- # We import caldav.propfind and add a separate URL-rule set here. For the # principal, caldav._principal_response already emits calendar-home-set; we # leave the combined principal to that function. CardDAV clients that check # addressbook-home-set via PROPFIND get it via our own route below, because # the URL `/dav//` is handled by caldav's propfind. To also return # addressbook-home-set there we monkey-patch the principal populate. # Simpler approach: re-implement the principal for our own URL-space by # hooking into the propfind dispatcher's principal branch. # Since caldav.propfind already builds the principal response, we inject the # addressbook-home-set via a wrapper. Let's override by providing our own # handler in the blueprint that augments the response. # --------------------------------------------------------------------------- # OPTIONS / PROPFIND / REPORT / GET / PUT / DELETE for /dav//ab-/... # --------------------------------------------------------------------------- _DAV_HEADERS = {'DAV': '1, 2, 3, addressbook'} @dav_bp.route('///', methods=['OPTIONS']) @dav_bp.route('//', methods=['OPTIONS']) def ab_options(username, ab_part): if not ab_part.startswith('ab-'): from .caldav import options as _cal_options return _cal_options(subpath=f'{username}/{ab_part}') return Response('', 200, { 'DAV': '1, 2, 3, addressbook', 'Allow': 'OPTIONS, PROPFIND, REPORT, GET, PUT, DELETE, PROPPATCH, MKCOL', }) @dav_bp.route('///', methods=['PROPFIND']) @dav_bp.route('//', methods=['PROPFIND']) @basic_auth def ab_propfind(username, ab_part): if not ab_part.startswith('ab-'): from .caldav import propfind as _cal_propfind return _cal_propfind(subpath=f'{username}/{ab_part}') user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) depth = request.headers.get('Depth', '0') multistatus = ET.Element(_qn('d', 'multistatus')) multistatus.append(_addressbook_response(user, book)) if depth != '0': for c in book.contacts.all(): multistatus.append(_vcard_response(user, book, c)) return _xml_response(multistatus) @dav_bp.route('///', methods=['PROPFIND']) @basic_auth def ab_contact_propfind(username, ab_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) uid = filename.removesuffix('.vcf') contact = Contact.query.filter_by(address_book_id=book.id, uid=uid).first() if not contact: return Response('Not found', 404) multistatus = ET.Element(_qn('d', 'multistatus')) multistatus.append(_vcard_response(user, book, contact, include_data=True)) return _xml_response(multistatus) @dav_bp.route('///', methods=['REPORT']) @dav_bp.route('//', methods=['REPORT']) @basic_auth def ab_report(username, ab_part): if not ab_part.startswith('ab-'): from .caldav import report as _cal_report return _cal_report(subpath=f'{username}/{ab_part}') user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) try: root = ET.fromstring(request.data or b'') except ET.ParseError: return Response('Malformed XML', 400) wants_data = root.find(f".//{{urn:ietf:params:xml:ns:carddav}}address-data") is not None multistatus = ET.Element(_qn('d', 'multistatus')) if root.tag == '{urn:ietf:params:xml:ns:carddav}addressbook-multiget': hrefs = [h.text for h in root.findall(_qn('d', 'href')) if h.text] for href in hrefs: uid = href.rsplit('/', 1)[-1].removesuffix('.vcf') contact = Contact.query.filter_by(address_book_id=book.id, uid=uid).first() if contact: multistatus.append(_vcard_response(user, book, contact, include_data=True)) return _xml_response(multistatus) if root.tag == '{urn:ietf:params:xml:ns:carddav}addressbook-query': # No filter implementation yet - return all for contact in book.contacts.all(): multistatus.append(_vcard_response(user, book, contact, include_data=wants_data)) return _xml_response(multistatus) return _xml_response(multistatus) @dav_bp.route('///', methods=['GET', 'HEAD']) @basic_auth def ab_get(username, ab_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) uid = filename.removesuffix('.vcf') contact = Contact.query.filter_by(address_book_id=book.id, uid=uid).first() if not contact: return Response('Not found', 404) return Response( contact.vcard_data or _build_vcard(contact), mimetype='text/vcard; charset=utf-8', headers={'ETag': _etag_for_contact(contact)}, ) @dav_bp.route('///', methods=['PUT']) @basic_auth def ab_put(username, ab_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) uid = filename.removesuffix('.vcf') raw = request.get_data(as_text=True) or '' parsed = parse_vcard(raw) body_uid = parsed.get('uid') or uid existing = Contact.query.filter_by(address_book_id=book.id, uid=body_uid).first() if_match = request.headers.get('If-Match') if_none_match = request.headers.get('If-None-Match') if existing and if_none_match == '*': return Response('', 412) if if_match and existing and if_match.strip() != _etag_for_contact(existing): return Response('', 412) is_new = existing is None if is_new: existing = Contact(address_book_id=book.id, uid=body_uid, vcard_data=raw) db.session.add(existing) _apply_fields_to_contact(existing, parsed) # Keep the original raw VCARD so round-tripping is faithful - but also # record the server-rebuilt version for web UI consumers. We prefer the # raw source of truth here. existing.vcard_data = raw.strip() or _build_vcard(existing) existing.updated_at = datetime.now(timezone.utc) db.session.commit() _notify_addressbook(book.owner_id, book.id, 'contact', shared_with=_book_recipients(book)) status = 201 if is_new else 204 return Response('', status, {'ETag': _etag_for_contact(existing)}) @dav_bp.route('///', methods=['DELETE']) @basic_auth def ab_delete(username, ab_part, filename): user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('Not found', 404) uid = filename.removesuffix('.vcf') contact = Contact.query.filter_by(address_book_id=book.id, uid=uid).first() if not contact: return Response('', 404) db.session.delete(contact) db.session.commit() _notify_addressbook(book.owner_id, book.id, 'contact', shared_with=_book_recipients(book)) return Response('', 204) @dav_bp.route('///', methods=['DELETE']) @dav_bp.route('//', methods=['DELETE']) @basic_auth def ab_delete_collection(username, ab_part): if not ab_part.startswith('ab-'): return Response('', 404) user: User = request.dav_user if username != user.username: return Response('', 403) book_id = _parse_addressbook_path(ab_part) book = _addressbook_for(user, book_id) if book_id else None if not book: return Response('', 404) recipients = _book_recipients(book) owner_id = book.owner_id book_id = book.id db.session.delete(book) db.session.commit() _notify_addressbook(owner_id, book_id, 'deleted', shared_with=recipients) return Response('', 204) @dav_bp.route('///', methods=['MKCOL']) @dav_bp.route('//', methods=['MKCOL']) @basic_auth def ab_mkcol(username, ab_part): """Create a new addressbook collection via MKCOL (RFC 5689 extended). Some CardDAV clients (Apple) use this instead of MKCALENDAR.""" user: User = request.dav_user if username != user.username: return Response('', 403) name = 'Neues Adressbuch' try: body = request.get_data() if body: root = ET.fromstring(body) dn = root.find(f".//{_qn('d', 'displayname')}") if dn is not None and dn.text: name = dn.text except ET.ParseError: pass book = AddressBook(owner_id=user.id, name=name) db.session.add(book) db.session.commit() _notify_addressbook(user.id, book.id, 'created') return Response('', 201, {'Location': _href_addressbook(user.username, book.id)})