minmal-file-cloud-email-pim.../backend/app/api/calendar.py

810 lines
28 KiB
Python

import csv
import io
import re
import secrets
import uuid
from datetime import datetime, timezone
from flask import request, jsonify, Response
from app.api import api_bp
from app.api.auth import token_required
from app.extensions import db, bcrypt
from app.models.calendar import Calendar, CalendarEvent, CalendarShare
from app.models.user import User
from app.services.events import notify_calendar_change
def _calendar_recipients(cal: Calendar):
    """Return the ids of all users the given calendar is shared with."""
    share_rows = CalendarShare.query.filter_by(calendar_id=cal.id).all()
    recipient_ids = []
    for row in share_rows:
        recipient_ids.append(row.shared_with_id)
    return recipient_ids
def _redact_if_private(event_dict: dict, is_owner: bool) -> dict:
"""For shared viewers, strip summary/description/location from private
events so only the time slot remains visible."""
if is_owner or not event_dict.get('is_private'):
return event_dict
d = dict(event_dict)
d['summary'] = 'Privat'
d['description'] = None
d['location'] = None
return d
def _redact_vevent(raw: str) -> str:
"""Strip SUMMARY/DESCRIPTION/LOCATION from a VEVENT block and set
CLASS:PRIVATE. Used for shared iCal exports and CalDAV responses."""
if not raw:
return raw
import re as _re
out_lines = []
has_class = False
for line in raw.split('\n'):
stripped = line.rstrip('\r')
upper = stripped.split(':', 1)[0].split(';', 1)[0].upper()
if upper == 'SUMMARY':
out_lines.append('SUMMARY:Privat')
elif upper in ('DESCRIPTION', 'LOCATION'):
continue
elif upper == 'CLASS':
has_class = True
out_lines.append('CLASS:PRIVATE')
else:
out_lines.append(stripped)
if not has_class:
# Inject CLASS right after UID if possible, else before END:VEVENT
for i, l in enumerate(out_lines):
if l.startswith('UID:'):
out_lines.insert(i + 1, 'CLASS:PRIVATE')
break
else:
for i, l in enumerate(out_lines):
if l.upper().startswith('END:VEVENT'):
out_lines.insert(i, 'CLASS:PRIVATE')
break
return '\r\n'.join(out_lines)
def _get_calendar_or_err(cal_id, user, need_write=False):
    """Load a calendar and check that ``user`` may access it.

    Returns a ``(calendar, error)`` pair in which exactly one element is
    ``None``. ``error`` is a ready-to-return ``(json_response, status)``
    tuple: 404 when the calendar does not exist, 403 when the user is neither
    owner nor share recipient, or when ``need_write`` is set and the share
    permission is not 'readwrite'. Owners always pass.
    """
    def _denied(message, status):
        return None, (jsonify({'error': message}), status)

    calendar = db.session.get(Calendar, cal_id)
    if not calendar:
        return _denied('Kalender nicht gefunden', 404)
    if calendar.owner_id == user.id:
        return calendar, None

    grant = CalendarShare.query.filter_by(
        calendar_id=cal_id, shared_with_id=user.id
    ).first()
    if not grant:
        return _denied('Zugriff verweigert', 403)
    if need_write and grant.permission != 'readwrite':
        return _denied('Schreibzugriff verweigert', 403)
    return calendar, None
# --- Calendars ---
@api_bp.route('/calendars', methods=['GET'])
@token_required
def list_calendars():
    """Return every calendar the current user owns or has been given access to.

    Owned calendars are tagged with ``permission='owner'``. Shared calendars
    carry the share's permission, the owner's username in ``owner_name`` and
    the owner's color in ``owner_color``; ``color`` is replaced by the user's
    personal override when the share stores one.
    """
    user = request.current_user
    result = []

    for c in Calendar.query.filter_by(owner_id=user.id).all():
        d = c.to_dict()
        d['permission'] = 'owner'
        result.append(d)

    # Load the user's shares once and index them by calendar, so the loop
    # below needs no additional query per shared calendar.
    shares_by_calendar = {}
    for s in CalendarShare.query.filter_by(shared_with_id=user.id).all():
        shares_by_calendar.setdefault(s.calendar_id, s)

    shared = []
    if shares_by_calendar:
        shared = Calendar.query.filter(
            Calendar.id.in_(list(shares_by_calendar))
        ).all()

    for c in shared:
        share = shares_by_calendar[c.id]
        d = c.to_dict()
        d['permission'] = share.permission
        # Per-user color override: the owner's color is kept in 'owner_color'
        # so the UI can show both, and 'color' reflects what this user picked.
        d['owner_color'] = c.color
        if share.color:
            d['color'] = share.color
        d['owner_name'] = c.owner.username
        result.append(d)
    return jsonify(result), 200
@api_bp.route('/calendars', methods=['POST'])
@token_required
def create_calendar():
    """Create a calendar owned by the current user.

    Expects a JSON object with ``name`` (required, non-empty string) and
    optional ``color`` and ``description``. Responds 400 when the name is
    missing, empty or not a string, otherwise 201 with the new calendar.
    """
    user = request.current_user
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}
    raw_name = data.get('name')
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'error': 'Name erforderlich'}), 400
    cal = Calendar(
        owner_id=user.id,
        name=name,
        # `or` also covers an explicit null sent by the client.
        color=data.get('color') or '#3788d8',
        description=data.get('description') or '',
    )
    db.session.add(cal)
    db.session.commit()
    return jsonify(cal.to_dict()), 201
@api_bp.route('/calendars/<int:cal_id>', methods=['PUT'])
@token_required
def update_calendar(cal_id):
    """Update name, color and/or description of a calendar (owner only).

    Responds 404 when the calendar does not exist or belongs to someone else,
    400 when ``name`` is present but empty or not a string (matching the rule
    applied on creation), otherwise 200 with the updated calendar.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}
    if 'name' in data:
        raw_name = data['name']
        name = raw_name.strip() if isinstance(raw_name, str) else ''
        if not name:
            return jsonify({'error': 'Name erforderlich'}), 400
        cal.name = name
    if 'color' in data:
        cal.color = data['color']
    if 'description' in data:
        cal.description = data['description']
    db.session.commit()
    return jsonify(cal.to_dict()), 200
@api_bp.route('/calendars/<int:cal_id>/my-color', methods=['PUT'])
@token_required
def set_my_calendar_color(cal_id):
    """Set the caller's display color for a calendar.

    For the owner this updates the calendar's own color (an empty value
    leaves it unchanged). For a share recipient it stores a personal override
    on the share; an empty, null or non-string value clears the override.
    Responds 404 for an unknown calendar and 403 when the caller has no
    access. The response contains the color that is now in effect.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal:
        return jsonify({'error': 'Nicht gefunden'}), 404
    data = request.get_json()
    raw_color = data.get('color') if isinstance(data, dict) else None
    # Anything that is not a string (null, number, ...) counts as "no color".
    color = raw_color.strip() if isinstance(raw_color, str) else ''
    if cal.owner_id == user.id:
        # Owner -> update the calendar itself
        if color:
            cal.color = color
            db.session.commit()
        return jsonify({'color': cal.color}), 200
    share = CalendarShare.query.filter_by(calendar_id=cal_id, shared_with_id=user.id).first()
    if not share:
        return jsonify({'error': 'Kein Zugriff'}), 403
    share.color = color or None
    db.session.commit()
    return jsonify({'color': share.color or cal.color}), 200
@api_bp.route('/calendars/<int:cal_id>', methods=['DELETE'])
@token_required
def delete_calendar(cal_id):
    """Delete a calendar (owner only) and announce the deletion.

    Responds 404 when the calendar does not exist or the caller is not its
    owner. The share recipients and the ids are read before the row is
    deleted, then passed to ``notify_calendar_change`` after the commit.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not (cal and cal.owner_id == user.id):
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404

    # Snapshot what the notification needs while the row still exists.
    recipients = _calendar_recipients(cal)
    owner_id, removed_id = cal.owner_id, cal.id

    db.session.delete(cal)
    db.session.commit()
    notify_calendar_change(owner_id, removed_id, 'deleted', shared_with=recipients)
    return jsonify({'message': 'Kalender geloescht'}), 200
# --- Events ---
@api_bp.route('/calendars/<int:cal_id>/events', methods=['GET'])
@token_required
def list_events(cal_id):
    """List the events of a calendar, optionally limited to a time range.

    Query parameters ``start`` and ``end`` are ISO datetimes (a trailing 'Z'
    is accepted); a malformed bound is ignored rather than rejected. Private
    events are redacted for callers who are not the calendar owner.
    """
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user)
    if err:
        return err

    def _parse_bound(raw_value):
        """Parse one range bound; None when absent or malformed."""
        if not raw_value:
            return None
        try:
            # fromisoformat() only understands 'Z' from Python 3.11 on.
            return datetime.fromisoformat(raw_value.replace('Z', '+00:00'))
        except ValueError:
            return None

    start_dt = _parse_bound(request.args.get('start'))
    end_dt = _parse_bound(request.args.get('end'))
    query = CalendarEvent.query.filter_by(calendar_id=cal_id)
    # Recurring events must not be filtered by range: the FullCalendar RRULE
    # plugin expands them in the frontend and needs the master event even
    # when its dtstart lies before the visible range.
    is_recurring = CalendarEvent.recurrence_rule.isnot(None)
    if start_dt is not None:
        # Events stored without an end would compare as NULL and vanish from
        # the result; treat their start as their end instead.
        effective_end = db.func.coalesce(CalendarEvent.dtend, CalendarEvent.dtstart)
        query = query.filter(db.or_(is_recurring, effective_end >= start_dt))
    if end_dt is not None:
        query = query.filter(db.or_(is_recurring, CalendarEvent.dtstart <= end_dt))
    events = query.order_by(CalendarEvent.dtstart).all()
    is_owner = (cal.owner_id == user.id)
    return jsonify([_redact_if_private(e.to_dict(), is_owner) for e in events]), 200
@api_bp.route('/calendars/<int:cal_id>/export', methods=['GET'])
@token_required
def export_calendar(cal_id):
    """Export the events of a calendar as a .ics or .csv download.

    The ``format`` query parameter selects 'ics' (default) or 'csv'; any other
    value yields 400. Callers who are not the calendar owner receive private
    events in redacted form (summary 'Privat', no description or location),
    matching what the event listing shows them.
    """
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user)
    if err:
        return err
    is_owner = (cal.owner_id == user.id)
    fmt = (request.args.get('format') or 'ics').lower()
    events = CalendarEvent.query.filter_by(calendar_id=cal_id).order_by(CalendarEvent.dtstart).all()
    safe_name = re.sub(r'[^A-Za-z0-9._-]+', '_', cal.name or 'kalender') or 'kalender'
    if fmt == 'ics':
        lines = ['BEGIN:VCALENDAR', 'VERSION:2.0', 'PRODID:-//Mini-Cloud//DE', 'CALSCALE:GREGORIAN']
        for e in events:
            hide = bool(e.is_private) and not is_owner
            block = (e.ical_data or '').strip()
            if block:
                if hide:
                    block = _redact_vevent(block)
            else:
                # No stored iCal text: rebuild the VEVENT from the columns.
                exdates = e.exdates.split(',') if e.exdates else None
                if hide:
                    block = _build_vevent(e.uid, 'Privat', e.dtstart, e.dtend,
                                          e.all_day, '', '',
                                          e.recurrence_rule or '', exdates)
                else:
                    block = _build_vevent(e.uid, e.summary or '', e.dtstart, e.dtend,
                                          e.all_day, e.description or '', e.location or '',
                                          e.recurrence_rule or '', exdates)
            # Make sure block contains BEGIN/END VEVENT
            if 'BEGIN:VEVENT' not in block.upper():
                continue
            lines.append(block.strip())
        lines.append('END:VCALENDAR')
        body = '\r\n'.join(lines) + '\r\n'
        return Response(
            body, mimetype='text/calendar; charset=utf-8',
            headers={'Content-Disposition': f'attachment; filename="{safe_name}.ics"'},
        )
    if fmt == 'csv':
        out = io.StringIO()
        cols = ['summary', 'dtstart', 'dtend', 'all_day', 'location',
                'description', 'recurrence_rule', 'uid']
        w = csv.writer(out, delimiter=';', quoting=csv.QUOTE_ALL)
        w.writerow(cols)
        for e in events:
            hide = bool(e.is_private) and not is_owner
            w.writerow([
                'Privat' if hide else (e.summary or ''),
                e.dtstart.isoformat() if e.dtstart else '',
                e.dtend.isoformat() if e.dtend else '',
                '1' if e.all_day else '0',
                '' if hide else (e.location or ''),
                '' if hide else (e.description or '').replace('\r\n', ' ').replace('\n', ' '),
                e.recurrence_rule or '',
                e.uid or '',
            ])
        # The leading BOM is kept as in the original output.
        return Response(
            '\ufeff' + out.getvalue(), mimetype='text/csv; charset=utf-8',
            headers={'Content-Disposition': f'attachment; filename="{safe_name}.csv"'},
        )
    return jsonify({'error': 'Unbekanntes Format'}), 400
@api_bp.route('/calendars/<int:cal_id>/import', methods=['POST'])
@token_required
def import_calendar(cal_id):
    """Import events from an uploaded .ics or .csv file into a calendar.

    Requires write access. The upload is read from the multipart field
    ``file`` and decoded as UTF-8 (BOM tolerated), falling back to Latin-1.
    A file is treated as CSV when its name ends in '.csv' or its first 200
    bytes contain ';' but no 'BEGIN:VCALENDAR'; otherwise every VEVENT block
    is parsed. Events are matched to existing rows by UID within the calendar
    and updated in place, otherwise inserted. Entries without summary or
    start are skipped. Responds with the ``imported`` and ``skipped`` counts.
    """
    from app.dav.caldav import _parse_vevent, _extract_vevent_block
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user, need_write=True)
    if err:
        return err
    upload = request.files.get('file')
    if not upload:
        return jsonify({'error': 'Keine Datei'}), 400
    raw = upload.read()
    name = (upload.filename or '').lower()
    try:
        text = raw.decode('utf-8-sig')
    except UnicodeDecodeError:
        text = raw.decode('latin-1', errors='replace')

    counts = {'imported': 0, 'skipped': 0}

    def _save(parsed: dict, ical_block: str | None = None):
        """Insert or update one event from a parsed dict."""
        if not (parsed.get('summary') and parsed.get('dtstart')):
            counts['skipped'] += 1
            return
        uid = parsed.get('uid') or str(uuid.uuid4())
        existing = CalendarEvent.query.filter_by(calendar_id=cal_id, uid=uid).first()
        ev = existing or CalendarEvent(calendar_id=cal_id, uid=uid, ical_data='')
        ev.summary = parsed.get('summary') or '(ohne Titel)'
        ev.description = parsed.get('description')
        ev.location = parsed.get('location')
        ev.dtstart = parsed.get('dtstart')
        ev.dtend = parsed.get('dtend')
        ev.all_day = parsed.get('all_day', False)
        ev.recurrence_rule = parsed.get('rrule')
        ev.exdates = ','.join(parsed.get('exdates', [])) or None
        # Keep the uploaded VEVENT text when there is one; only synthesize a
        # block from the columns when nothing usable was supplied.
        ical_text = (ical_block or '').strip()
        if not ical_text:
            ical_text = _build_vevent(
                uid, ev.summary, ev.dtstart, ev.dtend, ev.all_day,
                ev.description or '', ev.location or '', ev.recurrence_rule or '',
                ev.exdates.split(',') if ev.exdates else None,
            )
        ev.ical_data = ical_text
        ev.updated_at = datetime.now(timezone.utc)
        if not existing:
            db.session.add(ev)
        counts['imported'] += 1

    head = raw[:200]
    is_csv = name.endswith('.csv') or (b';' in head and b'BEGIN:VCALENDAR' not in head)
    if is_csv:
        reader = csv.DictReader(io.StringIO(text), delimiter=';')
        # Fewer than two columns with ';' suggests a comma-separated file.
        if not reader.fieldnames or len(reader.fieldnames) < 2:
            reader = csv.DictReader(io.StringIO(text), delimiter=',')
        for record in reader:
            row = {}
            for k, v in record.items():
                if k:
                    row[k.strip().lower()] = (v or '').strip()
            try:
                dtstart = datetime.fromisoformat(row.get('dtstart') or row.get('start') or '')
            except (ValueError, TypeError):
                counts['skipped'] += 1
                continue
            end_text = row.get('dtend') or row.get('end')
            try:
                dtend = datetime.fromisoformat(end_text) if end_text else None
            except ValueError:
                dtend = None
            all_day_text = (row.get('all_day') or '').lower()
            _save({
                'uid': row.get('uid'),
                'summary': row.get('summary') or row.get('titel') or row.get('title'),
                'description': row.get('description') or row.get('beschreibung'),
                'location': row.get('location') or row.get('ort'),
                'dtstart': dtstart,
                'dtend': dtend,
                'all_day': all_day_text in ('1', 'true', 'ja', 'yes'),
                'rrule': row.get('recurrence_rule') or row.get('rrule'),
                'exdates': [],
            })
    else:
        # iCal: a calendar file holding any number of VEVENTs
        blocks = re.findall(r'BEGIN:VEVENT.*?END:VEVENT', text, flags=re.DOTALL | re.IGNORECASE)
        if not blocks:
            return jsonify({'error': 'Keine VEVENT-Daten gefunden'}), 400
        for block in blocks:
            try:
                parsed = _parse_vevent(block)
            except Exception:
                parsed = None
            if parsed:
                _save(parsed, ical_block=block)
            else:
                counts['skipped'] += 1

    db.session.commit()
    if counts['imported']:
        notify_calendar_change(cal.owner_id, cal.id, 'event',
                               shared_with=_calendar_recipients(cal))
    return jsonify({'imported': counts['imported'], 'skipped': counts['skipped']}), 200
@api_bp.route('/calendars/<int:cal_id>/events', methods=['POST'])
@token_required
def create_event(cal_id):
    """Create an event in a calendar the caller may write to.

    Expects a JSON object with ``summary`` and ``dtstart`` (required) plus
    optional ``dtend``, ``all_day``, ``description``, ``location``,
    ``recurrence_rule`` and ``is_private``. Dates are ISO strings; a trailing
    'Z' is accepted. A missing end defaults to the start. Responds 400 for a
    missing summary/start or an unparsable date, otherwise 201 with the event.
    """
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user, need_write=True)
    if err:
        return err
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}

    def _text(key):
        """Stripped string value of ``key``; '' for missing/null/non-string."""
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ''

    summary = _text('summary')
    if not summary:
        return jsonify({'error': 'Zusammenfassung erforderlich'}), 400
    dtstart = data.get('dtstart')
    dtend = data.get('dtend')
    all_day = bool(data.get('all_day', False))
    if not dtstart:
        return jsonify({'error': 'Startdatum erforderlich'}), 400
    try:
        dtstart_dt = datetime.fromisoformat(dtstart.replace('Z', '+00:00'))
        dtend_dt = datetime.fromisoformat(dtend.replace('Z', '+00:00')) if dtend else dtstart_dt
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: the client sent a non-string date.
        return jsonify({'error': 'Ungueltiges Datumsformat'}), 400
    event_uid = str(uuid.uuid4())
    description = _text('description')
    location = _text('location')
    rrule = _text('recurrence_rule')
    ical_data = _build_ical(event_uid, summary, dtstart_dt, dtend_dt, all_day,
                            description, location, rrule, None)
    event = CalendarEvent(
        calendar_id=cal_id,
        uid=event_uid,
        ical_data=ical_data,
        summary=summary,
        description=description or None,
        location=location or None,
        dtstart=dtstart_dt,
        dtend=dtend_dt,
        all_day=all_day,
        recurrence_rule=rrule or None,
        is_private=bool(data.get('is_private', False)),
    )
    db.session.add(event)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify(event.to_dict()), 201
@api_bp.route('/events/<int:event_id>', methods=['PUT'])
@token_required
def update_event(event_id):
    """Update fields of an event and optionally move it to another calendar.

    Only keys present in the JSON body are applied. The caller needs write
    access to the event's calendar and, when ``calendar_id`` is given, to the
    target calendar as well. All input is validated before the event is
    touched: an empty or non-string summary and unparsable dates yield 400.
    The stored iCal text is rebuilt from the resulting field values. When the
    event moves, both the old and the new calendar are notified.
    """
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}

    def _parse_iso(value):
        """Parse an ISO datetime string; ValueError for anything else."""
        if not isinstance(value, str):
            raise ValueError('datetime must be an ISO string')
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    # --- validate first, so a rejected request leaves the event untouched ---
    if 'summary' in data:
        new_summary = data['summary']
        if not isinstance(new_summary, str) or not new_summary.strip():
            return jsonify({'error': 'Zusammenfassung erforderlich'}), 400
    try:
        new_start = _parse_iso(data['dtstart']) if 'dtstart' in data else None
        new_end = _parse_iso(data['dtend']) if data.get('dtend') else None
    except ValueError:
        return jsonify({'error': 'Ungueltiges Datumsformat'}), 400
    target_cal = None
    if 'calendar_id' in data:
        target_cal, cerr = _get_calendar_or_err(data['calendar_id'], user, need_write=True)
        if cerr:
            return cerr

    # --- apply ---
    if 'summary' in data:
        event.summary = data['summary'].strip()
    if 'description' in data:
        event.description = (data['description'] or '').strip() or None
    if 'location' in data:
        event.location = (data['location'] or '').strip() or None
    if 'dtstart' in data:
        event.dtstart = new_start
    if 'dtend' in data:
        event.dtend = new_end
    if 'all_day' in data:
        event.all_day = data['all_day']
    if 'recurrence_rule' in data:
        event.recurrence_rule = (data['recurrence_rule'] or '').strip() or None
    if 'is_private' in data:
        event.is_private = bool(data['is_private'])
    if target_cal is not None:
        event.calendar_id = target_cal.id
    event.ical_data = _build_ical(
        event.uid, event.summary, event.dtstart, event.dtend,
        event.all_day, event.description or '', event.location or '',
        event.recurrence_rule or '',
        event.exdates.split(',') if event.exdates else None,
    )
    event.updated_at = datetime.now(timezone.utc)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    if target_cal is not None and target_cal.id != cal.id:
        # Viewers of the destination calendar must learn about the new event.
        notify_calendar_change(target_cal.owner_id, target_cal.id, 'event',
                               shared_with=_calendar_recipients(target_cal))
    return jsonify(event.to_dict()), 200
@api_bp.route('/events/<int:event_id>/exception', methods=['POST'])
@token_required
def add_event_exception(event_id):
    """Exclude a single occurrence of a recurring event ("only this one").

    The JSON body carries ``occurrence_date`` (ISO date or datetime) and may
    carry ``replacement``, an object describing a standalone event to create
    in place of the excluded occurrence (``dtstart`` required; other fields
    default to the master event's values, including its privacy flag).
    Responds 400 for a non-recurring event, a missing occurrence date or an
    invalid replacement; otherwise 200 with the updated master event and the
    replacement (or null).
    """
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err
    if not event.recurrence_rule:
        return jsonify({'error': 'Kein Serientermin'}), 400
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}
    occurrence_date = data.get('occurrence_date')  # ISO date or datetime
    if not occurrence_date or not isinstance(occurrence_date, str):
        return jsonify({'error': 'occurrence_date erforderlich'}), 400
    # Normalize to a date (all-day) or second-resolution datetime storage key;
    # an unparsable value is stored as sent.
    try:
        parsed = datetime.fromisoformat(occurrence_date.replace('Z', '+00:00'))
        key = parsed.strftime('%Y-%m-%d' if event.all_day else '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        key = occurrence_date

    # Validate the optional replacement before changing anything.
    r = data.get('replacement')
    rep_start = rep_end = None
    if r:
        try:
            rep_start = datetime.fromisoformat(r['dtstart'])
            rep_end = datetime.fromisoformat(r['dtend']) if r.get('dtend') else rep_start
        except (AttributeError, KeyError, TypeError, ValueError):
            return jsonify({'error': 'Ungueltiges Datumsformat'}), 400

    existing = event.exdates.split(',') if event.exdates else []
    if key not in existing:
        existing.append(key)
    event.exdates = ','.join(filter(None, existing))

    # Optional: create replacement single event
    replacement = None
    if r:
        rep_uid = str(uuid.uuid4())
        replacement = CalendarEvent(
            calendar_id=event.calendar_id,
            uid=rep_uid,
            summary=r.get('summary', event.summary),
            description=r.get('description', event.description),
            location=r.get('location', event.location),
            dtstart=rep_start,
            dtend=rep_end,
            all_day=r.get('all_day', event.all_day),
            recurrence_rule=None,
            # A private series must not turn public through its exception.
            is_private=bool(r.get('is_private', event.is_private)),
            ical_data='',
        )
        replacement.ical_data = _build_ical(
            rep_uid, replacement.summary, rep_start, rep_end,
            replacement.all_day, replacement.description or '',
            replacement.location or '', '',
        )
        db.session.add(replacement)
    event.ical_data = _build_ical(
        event.uid, event.summary, event.dtstart, event.dtend,
        event.all_day, event.description or '', event.location or '',
        event.recurrence_rule or '',
        event.exdates.split(',') if event.exdates else None,
    )
    event.updated_at = datetime.now(timezone.utc)
    db.session.commit()
    # Same change notification as the other event-mutating endpoints.
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify({
        'event': event.to_dict(),
        'replacement': replacement.to_dict() if replacement else None,
    }), 200
@api_bp.route('/events/<int:event_id>', methods=['DELETE'])
@token_required
def delete_event(event_id):
    """Delete an event from a calendar the caller may write to.

    Responds 404 for an unknown event and passes on the access error from
    ``_get_calendar_or_err``. After the commit the calendar's owner and share
    recipients are notified.
    """
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    # `cal` is guaranteed to be set when no error is returned, so it is used
    # directly for the notification instead of being looked up a second time.
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err
    db.session.delete(event)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify({'message': 'Event geloescht'}), 200
# --- Calendar sharing ---
@api_bp.route('/calendars/<int:cal_id>/share', methods=['POST'])
@token_required
def share_calendar(cal_id):
    """Share a calendar with another user, or change an existing share.

    Owner only (403 otherwise). The JSON body carries ``username`` and an
    optional ``permission`` ('read', the default, or 'readwrite'). Responds
    400 for an invalid permission or when sharing with oneself, and 404 for
    an unknown user. A mail notification is attempted for newly created
    shares only; its failure is logged and does not fail the request.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nur der Eigentuemer kann teilen'}), 403
    data = request.get_json()
    # A JSON body of `null`, a list or a scalar must not crash the handler.
    if not isinstance(data, dict):
        data = {}
    raw_username = data.get('username')
    username = raw_username.strip() if isinstance(raw_username, str) else ''
    permission = data.get('permission', 'read')
    if permission not in ('read', 'readwrite'):
        return jsonify({'error': 'Ungueltige Berechtigung'}), 400
    target = User.query.filter_by(username=username).first()
    if not target:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404
    if target.id == user.id:
        return jsonify({'error': 'Kann nicht mit sich selbst teilen'}), 400
    existing = CalendarShare.query.filter_by(
        calendar_id=cal_id, shared_with_id=target.id
    ).first()
    is_new = not existing
    if existing:
        existing.permission = permission
    else:
        share = CalendarShare(
            calendar_id=cal_id, shared_with_id=target.id, permission=permission
        )
        db.session.add(share)
    db.session.commit()
    if is_new:
        try:
            from app.services.system_mail import notify_calendar_shared
            notify_calendar_shared(cal.name, user.username, target, permission)
        except Exception:
            # Best effort: the share is already committed, so a mail problem
            # must not fail the request - but it should leave a trace.
            import logging
            logging.getLogger(__name__).exception(
                'Sending the calendar-share mail failed (calendar %s)', cal_id)
    # The target is part of the share list after the commit; drop the
    # duplicate while keeping the order.
    recipients = list(dict.fromkeys([target.id, *_calendar_recipients(cal)]))
    notify_calendar_change(cal.owner_id, cal.id, 'share', shared_with=recipients)
    return jsonify({'message': f'Kalender mit {username} geteilt'}), 200
@api_bp.route('/calendars/<int:cal_id>/shares', methods=['GET'])
@token_required
def list_calendar_shares(cal_id):
    """List the shares of a calendar (owner only).

    Responds 404 when the calendar does not exist or the caller is not its
    owner; otherwise 200 with one entry per share holding the share id, the
    recipient's id and username, and the permission.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not (cal and cal.owner_id == user.id):
        return jsonify({'error': 'Nicht gefunden'}), 404

    entries = []
    for grant in CalendarShare.query.filter_by(calendar_id=cal_id).all():
        entries.append({
            'id': grant.id,
            'user_id': grant.shared_with_id,
            'username': grant.shared_with.username,
            'permission': grant.permission,
        })
    return jsonify(entries), 200
@api_bp.route('/calendars/<int:cal_id>/shares/<int:share_id>', methods=['DELETE'])
@token_required
def remove_calendar_share(cal_id, share_id):
    """Remove one share from a calendar (owner only).

    Responds 404 when the calendar is unknown or not owned by the caller, or
    when the share does not exist or belongs to a different calendar. The
    removed user is included in the change notification along with the
    remaining share recipients.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not (cal and cal.owner_id == user.id):
        return jsonify({'error': 'Nicht gefunden'}), 404

    grant = db.session.get(CalendarShare, share_id)
    if not (grant and grant.calendar_id == cal_id):
        return jsonify({'error': 'Freigabe nicht gefunden'}), 404

    # Remember the recipient: the row is gone after the commit.
    removed_user_id = grant.shared_with_id
    db.session.delete(grant)
    db.session.commit()

    notified = [removed_user_id, *_calendar_recipients(cal)]
    notify_calendar_change(cal.owner_id, cal.id, 'share', shared_with=notified)
    return jsonify({'message': 'Freigabe entfernt'}), 200
# --- iCal Export ---
@api_bp.route('/calendars/<int:cal_id>/ical-link', methods=['POST'])
@token_required
def generate_ical_link(cal_id):
    """Create (or return) the public iCal link of a calendar (owner only).

    An existing token is reused; otherwise a new URL-safe token is generated.
    An optional JSON body may set a ``password`` (stored as a bcrypt hash) or,
    when no password is given, remove the current one via ``clear_password``.
    Responds 404 when the calendar is unknown or not owned by the caller.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not (cal and cal.owner_id == user.id):
        return jsonify({'error': 'Nicht gefunden'}), 404

    payload = request.get_json(silent=True) or {}
    new_password = (payload.get('password') or '').strip()

    if not cal.ical_token:
        cal.ical_token = secrets.token_urlsafe(32)

    if new_password:
        hashed = bcrypt.generate_password_hash(new_password)
        cal.ical_password_hash = hashed.decode('utf-8')
    elif payload.get('clear_password'):
        cal.ical_password_hash = None

    db.session.commit()
    link_info = {
        'ical_url': f'/ical/{cal.ical_token}',
        'token': cal.ical_token,
        'has_password': bool(cal.ical_password_hash),
    }
    return jsonify(link_info), 200
@api_bp.route('/calendars/<int:cal_id>/ical-link', methods=['DELETE'])
@token_required
def revoke_ical_link(cal_id):
    """Revoke the public iCal link of a calendar (owner only).

    Clears both the link token and its password hash. Responds 404 when the
    calendar is unknown or not owned by the caller.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not (cal and cal.owner_id == user.id):
        return jsonify({'error': 'Nicht gefunden'}), 404

    for column in ('ical_token', 'ical_password_hash'):
        setattr(cal, column, None)
    db.session.commit()
    return jsonify({'message': 'Link zurueckgezogen'}), 200
def _basic_auth_challenge():
    """Build the 401 response that asks the client for HTTP Basic credentials."""
    challenge_headers = {'WWW-Authenticate': 'Basic realm="Mini-Cloud Kalender"'}
    return Response(
        'Kalender erfordert Passwort',
        status=401,
        headers=challenge_headers,
    )
def ical_export(token):
    """Serve a calendar as an iCalendar feed, addressed by its link token.

    Responds 404 for an unknown token. When the calendar has a password hash,
    HTTP Basic credentials are required (the username is ignored) and a 401
    challenge is returned until the password matches. Private events are
    always redacted in this feed.

    NOTE(review): no route decorator is visible on this function; presumably
    it is registered elsewhere - confirm.
    """
    cal = Calendar.query.filter_by(ical_token=token).first()
    if not cal:
        return jsonify({'error': 'Nicht gefunden'}), 404
    # Password protection via HTTP Basic (compatible with DAVx5, Apple Cal,
    # Thunderbird, curl, etc.). Username is ignored.
    if cal.ical_password_hash:
        auth = request.authorization
        if not auth or not auth.password:
            return _basic_auth_challenge()
        if not bcrypt.check_password_hash(cal.ical_password_hash, auth.password):
            return _basic_auth_challenge()
    events = CalendarEvent.query.filter_by(calendar_id=cal.id).all()
    # The calendar name is user-controlled: keep line breaks out of the iCal
    # property and restrict the download filename to a safe character set
    # (same rule as the authenticated export).
    display_name = re.sub(r'[\r\n]+', ' ', cal.name or '')
    safe_name = re.sub(r'[^A-Za-z0-9._-]+', '_', cal.name or 'kalender') or 'kalender'
    lines = [
        'BEGIN:VCALENDAR',
        'VERSION:2.0',
        'PRODID:-//Mini-Cloud//DE',
        f'X-WR-CALNAME:{display_name}',
    ]
    for e in events:
        if e.ical_data:
            block = _redact_vevent(e.ical_data) if e.is_private else e.ical_data
        else:
            # No stored iCal text: rebuild from the columns, keeping the
            # recurrence information so series stay series in the feed.
            exdates = e.exdates.split(',') if e.exdates else None
            if e.is_private:
                block = _build_vevent(e.uid, 'Privat', e.dtstart, e.dtend, e.all_day,
                                      '', '', e.recurrence_rule or '', exdates)
            else:
                block = _build_vevent(e.uid, e.summary or '', e.dtstart, e.dtend, e.all_day,
                                      e.description or '', e.location or '',
                                      e.recurrence_rule or '', exdates)
        lines.append(block.strip())
    lines.append('END:VCALENDAR')
    return Response(
        '\r\n'.join(lines) + '\r\n',
        mimetype='text/calendar',
        headers={'Content-Disposition': f'attachment; filename="{safe_name}.ics"'},
    )
# --- Helpers ---
def _format_dt(dt, all_day=False):
if all_day:
return dt.strftime('%Y%m%d')
return dt.strftime('%Y%m%dT%H%M%SZ')
def _build_vevent(uid, summary, dtstart, dtend, all_day, description='', location='', rrule='', exdates=None):
    """Build the text of a single VEVENT block (CRLF-separated lines).

    Args:
        uid: Value for the UID property.
        summary: Event title; ``None`` is written as an empty SUMMARY.
        dtstart: Start date/datetime.
        dtend: End date/datetime; falls back to ``dtstart`` when falsy.
        all_day: Emit DATE values instead of UTC DATE-TIME values.
        description: Optional DESCRIPTION text.
        location: Optional LOCATION text.
        rrule: Optional recurrence rule, written verbatim after ``RRULE:``.
        exdates: Optional iterable of excluded occurrences as ISO strings;
            empty or unparsable entries are skipped.

    SUMMARY, DESCRIPTION and LOCATION are escaped as iCalendar TEXT
    (backslash, semicolon, comma, line breaks), so user input can neither
    break the block nor inject additional properties.
    """
    def _escape_text(value):
        text = '' if value is None else str(value)
        # Backslash first, otherwise the escapes added below get re-escaped.
        text = text.replace('\\', '\\\\').replace(';', '\\;').replace(',', '\\,')
        return text.replace('\r\n', '\\n').replace('\n', '\\n').replace('\r', '\\n')

    if not dtend:
        dtend = dtstart
    lines = [
        'BEGIN:VEVENT',
        f'UID:{uid}',
    ]
    if all_day:
        lines.append(f'DTSTART;VALUE=DATE:{_format_dt(dtstart, True)}')
        lines.append(f'DTEND;VALUE=DATE:{_format_dt(dtend, True)}')
    else:
        lines.append(f'DTSTART:{_format_dt(dtstart)}')
        lines.append(f'DTEND:{_format_dt(dtend)}')
    lines.append(f'SUMMARY:{_escape_text(summary)}')
    if description:
        lines.append(f'DESCRIPTION:{_escape_text(description)}')
    if location:
        lines.append(f'LOCATION:{_escape_text(location)}')
    if rrule:
        lines.append(f'RRULE:{rrule}')
    for ex in exdates or ():
        if not ex:
            continue
        if all_day:
            lines.append(f'EXDATE;VALUE=DATE:{ex.replace("-", "")}')
            continue
        # Convert ISO datetime (with or without TZ) into YYYYMMDDTHHMMSSZ
        try:
            dt = datetime.fromisoformat(ex.replace('Z', '+00:00'))
        except ValueError:
            continue
        if dt.tzinfo is not None and dt.utcoffset() is not None:
            # The 'Z' suffix promises UTC, so convert aware values first.
            dt = dt.astimezone(timezone.utc)
        lines.append(f'EXDATE:{dt.strftime("%Y%m%dT%H%M%SZ")}')
    lines.append(f'DTSTAMP:{datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")}')
    lines.append('END:VEVENT')
    return '\r\n'.join(lines)
def _build_ical(uid, summary, dtstart, dtend, all_day, description='', location='', rrule='', exdates=None):
    """Build the stored iCal text for an event; delegates to ``_build_vevent``."""
    return _build_vevent(
        uid,
        summary,
        dtstart,
        dtend,
        all_day,
        description=description,
        location=location,
        rrule=rrule,
        exdates=exdates,
    )