"""REST endpoints for calendars, events, calendar sharing and iCal export."""
import logging
import secrets
import uuid
from datetime import datetime, timezone

from flask import request, jsonify, Response

from app.api import api_bp
from app.api.auth import token_required
from app.extensions import db, bcrypt
from app.models.calendar import Calendar, CalendarEvent, CalendarShare
from app.models.user import User
from app.services.events import notify_calendar_change

logger = logging.getLogger(__name__)


# --- Request helpers ---

def _json_body() -> dict:
    """Return the request's JSON object, or ``{}`` when there is none.

    A missing body, invalid JSON, or a JSON value that is not an object all
    yield an empty dict, so callers can use ``.get()`` unconditionally and
    answer with their own validation errors instead of crashing.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _clean_str(value) -> str:
    """Return *value* stripped when it is a string, otherwise ``''``."""
    return value.strip() if isinstance(value, str) else ''


def _parse_iso(value) -> datetime:
    """Parse an ISO-8601 date/datetime string.

    A trailing ``Z`` is accepted on every Python version by rewriting it to
    ``+00:00``. Raises ``ValueError`` for anything unparsable, including
    non-string input.
    """
    if not isinstance(value, str):
        raise ValueError('expected an ISO-8601 string')
    return datetime.fromisoformat(value.replace('Z', '+00:00'))


def _calendar_recipients(cal: Calendar):
    """Return the ids of all users the calendar is shared with."""
    shares = CalendarShare.query.filter_by(calendar_id=cal.id).all()
    return [share.shared_with_id for share in shares]


# --- Privacy helpers ---

def _redact_if_private(event_dict: dict, is_owner: bool) -> dict:
    """For shared viewers, strip summary/description/location from private
    events so only the time slot remains visible.

    The input dict is never modified; a redacted copy is returned when
    redaction applies.
    """
    if is_owner or not event_dict.get('is_private'):
        return event_dict
    redacted = dict(event_dict)
    redacted['summary'] = 'Privat'
    redacted['description'] = None
    redacted['location'] = None
    return redacted


def _property_name(line: str) -> str:
    """Return the upper-cased property name of an iCalendar content line."""
    return line.split(':', 1)[0].split(';', 1)[0].upper()


def _redact_vevent(raw: str) -> str:
    """Strip SUMMARY/DESCRIPTION/LOCATION from a VEVENT block and set
    CLASS:PRIVATE. Used for shared iCal exports and CalDAV responses.

    Folded continuation lines (lines starting with a space or tab) that
    belong to a removed or replaced property are removed as well, so no part
    of the private text survives. Lines are re-joined with CRLF.
    """
    if not raw:
        return raw

    out_lines = []
    has_class = False
    # True while we are inside a property whose original value must not
    # reach the output (it was dropped or replaced).
    suppressing = False
    for line in raw.split('\n'):
        stripped = line.rstrip('\r')
        is_continuation = stripped[:1] in (' ', '\t')
        # Besides proper folds, also drop colon-less lines directly after a
        # suppressed property: they are leftovers of unescaped multi-line
        # values and would otherwise leak the private text.
        if suppressing and (is_continuation or ':' not in stripped):
            continue
        if is_continuation:
            out_lines.append(stripped)
            continue

        name = _property_name(stripped)
        suppressing = name in ('SUMMARY', 'DESCRIPTION', 'LOCATION', 'CLASS')
        if name == 'SUMMARY':
            out_lines.append('SUMMARY:Privat')
        elif name in ('DESCRIPTION', 'LOCATION'):
            continue
        elif name == 'CLASS':
            has_class = True
            out_lines.append('CLASS:PRIVATE')
        else:
            out_lines.append(stripped)

    if not has_class:
        # Inject CLASS right after UID if possible, else before END:VEVENT.
        insert_at = None
        for idx, text in enumerate(out_lines):
            if _property_name(text) == 'UID':
                insert_at = idx + 1
                # Do not split a folded UID from its continuation lines.
                while (insert_at < len(out_lines)
                       and out_lines[insert_at][:1] in (' ', '\t')):
                    insert_at += 1
                break
        if insert_at is None:
            for idx, text in enumerate(out_lines):
                if text.upper().startswith('END:VEVENT'):
                    insert_at = idx
                    break
        if insert_at is not None:
            out_lines.insert(insert_at, 'CLASS:PRIVATE')
    return '\r\n'.join(out_lines)


def _get_calendar_or_err(cal_id, user, need_write=False):
    """Load a calendar and check that *user* may access it.

    Returns ``(calendar, None)`` on success or ``(None, (response, status))``
    on failure. The owner always has access; other users need a
    ``CalendarShare`` row, with permission ``'readwrite'`` when *need_write*
    is set.
    """
    cal = db.session.get(Calendar, cal_id)
    if not cal:
        return None, (jsonify({'error': 'Kalender nicht gefunden'}), 404)
    if cal.owner_id == user.id:
        return cal, None
    share = CalendarShare.query.filter_by(
        calendar_id=cal_id, shared_with_id=user.id
    ).first()
    if not share:
        return None, (jsonify({'error': 'Zugriff verweigert'}), 403)
    if need_write and share.permission != 'readwrite':
        return None, (jsonify({'error': 'Schreibzugriff verweigert'}), 403)
    return cal, None


# --- Calendars ---

@api_bp.route('/calendars', methods=['GET'])
@token_required
def list_calendars():
    """List the user's own calendars plus those shared with them."""
    user = request.current_user
    own = Calendar.query.filter_by(owner_id=user.id).all()

    # One query for all shares, indexed by calendar, instead of one extra
    # query per shared calendar.
    share_by_cal = {
        s.calendar_id: s
        for s in CalendarShare.query.filter_by(shared_with_id=user.id).all()
    }
    shared = (
        Calendar.query.filter(Calendar.id.in_(list(share_by_cal))).all()
        if share_by_cal else []
    )

    result = []
    for c in own:
        d = c.to_dict()
        d['permission'] = 'owner'
        result.append(d)
    for c in shared:
        d = c.to_dict()
        share = share_by_cal.get(c.id)
        d['permission'] = share.permission if share else 'read'
        # Per-user color override: the owner's color is kept in 'owner_color'
        # so the UI can show both, and 'color' reflects what this user picked.
        d['owner_color'] = c.color
        if share and share.color:
            d['color'] = share.color
        d['owner_name'] = c.owner.username
        result.append(d)
    return jsonify(result), 200


@api_bp.route('/calendars', methods=['POST'])
@token_required
def create_calendar():
    """Create a calendar owned by the current user. ``name`` is required."""
    user = request.current_user
    data = _json_body()
    name = _clean_str(data.get('name'))
    if not name:
        return jsonify({'error': 'Name erforderlich'}), 400
    cal = Calendar(
        owner_id=user.id,
        name=name,
        color=data.get('color', '#3788d8'),
        description=data.get('description', ''),
    )
    db.session.add(cal)
    db.session.commit()
    return jsonify(cal.to_dict()), 201


# NOTE(review): the URL variables below were missing from the route strings
# although the view functions require them. The ``int`` converter is inferred
# from the id comparisons / primary-key lookups in this module - confirm
# against the models.
@api_bp.route('/calendars/<int:cal_id>', methods=['PUT'])
@token_required
def update_calendar(cal_id):
    """Update name, color and/or description of a calendar (owner only)."""
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404
    data = _json_body()
    if 'name' in data:
        # Same rule as on creation: a calendar must keep a non-empty name.
        name = _clean_str(data['name'])
        if not name:
            return jsonify({'error': 'Name erforderlich'}), 400
        cal.name = name
    if 'color' in data:
        cal.color = data['color']
    if 'description' in data:
        cal.description = data['description']
    db.session.commit()
    return jsonify(cal.to_dict()), 200


@api_bp.route('/calendars/<int:cal_id>/my-color', methods=['PUT'])
@token_required
def set_my_calendar_color(cal_id):
    """Personal display color for a shared calendar.

    Doesn't affect the owner's calendar color or any other user's view.
    When called by the owner, the calendar's own color is updated instead.
    An empty color resets a sharee's override.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal:
        return jsonify({'error': 'Nicht gefunden'}), 404
    color = _clean_str(_json_body().get('color'))
    if cal.owner_id == user.id:
        # Owner -> update the calendar itself
        if color:
            cal.color = color
            db.session.commit()
        return jsonify({'color': cal.color}), 200
    share = CalendarShare.query.filter_by(
        calendar_id=cal_id, shared_with_id=user.id
    ).first()
    if not share:
        return jsonify({'error': 'Kein Zugriff'}), 403
    share.color = color or None
    db.session.commit()
    return jsonify({'color': share.color or cal.color}), 200


@api_bp.route('/calendars/<int:cal_id>', methods=['DELETE'])
@token_required
def delete_calendar(cal_id):
    """Delete a calendar (owner only) and notify everyone it was shared with."""
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404
    # Capture everything needed for the notification before the row is gone.
    recipients = _calendar_recipients(cal)
    owner_id = cal.owner_id
    cal_id = cal.id
    db.session.delete(cal)
    db.session.commit()
    notify_calendar_change(owner_id, cal_id, 'deleted', shared_with=recipients)
    return jsonify({'message': 'Kalender geloescht'}), 200


# --- Events ---

@api_bp.route('/calendars/<int:cal_id>/events', methods=['GET'])
@token_required
def list_events(cal_id):
    """List a calendar's events, optionally limited to ``start``/``end``.

    Unparsable range bounds are ignored. Private events are redacted for
    everyone but the calendar owner.
    """
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user)
    if err:
        return err

    start = request.args.get('start')
    end = request.args.get('end')
    query = CalendarEvent.query.filter_by(calendar_id=cal_id)
    # Recurring events must not be filtered by range: the FullCalendar RRULE
    # plugin expands them in the frontend and needs the master event even
    # when its dtstart lies before the visible range.
    is_recurring = CalendarEvent.recurrence_rule.isnot(None)
    if start:
        try:
            start_dt = _parse_iso(start)
        except ValueError:
            start_dt = None  # best effort: ignore an unparsable bound
        if start_dt is not None:
            # Events without an end are treated as ending at their start,
            # otherwise the NULL comparison would silently hide them.
            effective_end = db.func.coalesce(
                CalendarEvent.dtend, CalendarEvent.dtstart
            )
            query = query.filter(db.or_(is_recurring, effective_end >= start_dt))
    if end:
        try:
            end_dt = _parse_iso(end)
        except ValueError:
            end_dt = None  # best effort: ignore an unparsable bound
        if end_dt is not None:
            query = query.filter(
                db.or_(is_recurring, CalendarEvent.dtstart <= end_dt)
            )

    events = query.order_by(CalendarEvent.dtstart).all()
    is_owner = (cal.owner_id == user.id)
    return jsonify(
        [_redact_if_private(e.to_dict(), is_owner) for e in events]
    ), 200


@api_bp.route('/calendars/<int:cal_id>/events', methods=['POST'])
@token_required
def create_event(cal_id):
    """Create an event in a calendar the user may write to.

    Requires ``summary`` and ``dtstart``; ``dtend`` defaults to ``dtstart``.
    """
    user = request.current_user
    cal, err = _get_calendar_or_err(cal_id, user, need_write=True)
    if err:
        return err

    data = _json_body()
    summary = _clean_str(data.get('summary'))
    if not summary:
        return jsonify({'error': 'Zusammenfassung erforderlich'}), 400
    dtstart = data.get('dtstart')
    dtend = data.get('dtend')
    all_day = bool(data.get('all_day', False))
    if not dtstart:
        return jsonify({'error': 'Startdatum erforderlich'}), 400
    try:
        dtstart_dt = _parse_iso(dtstart)
        dtend_dt = _parse_iso(dtend) if dtend else dtstart_dt
    except ValueError:
        return jsonify({'error': 'Ungueltiges Datumsformat'}), 400

    event_uid = str(uuid.uuid4())
    description = _clean_str(data.get('description'))
    location = _clean_str(data.get('location'))
    rrule = _clean_str(data.get('recurrence_rule'))
    ical_data = _build_ical(event_uid, summary, dtstart_dt, dtend_dt, all_day,
                            description, location, rrule, None)
    event = CalendarEvent(
        calendar_id=cal_id,
        uid=event_uid,
        ical_data=ical_data,
        summary=summary,
        description=description or None,
        location=location or None,
        dtstart=dtstart_dt,
        dtend=dtend_dt,
        all_day=all_day,
        recurrence_rule=rrule or None,
        is_private=bool(data.get('is_private', False)),
    )
    db.session.add(event)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify(event.to_dict()), 201


@api_bp.route('/events/<int:event_id>', methods=['PUT'])
@token_required
def update_event(event_id):
    """Partially update an event; only keys present in the body are changed.

    All input is validated before the event is touched, so a rejected
    request leaves the event unmodified. Moving the event to another
    calendar requires write access to the target calendar as well.
    """
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err

    data = _json_body()
    changes = {}

    if 'summary' in data:
        summary = _clean_str(data['summary'])
        if not summary:
            return jsonify({'error': 'Zusammenfassung erforderlich'}), 400
        changes['summary'] = summary
    try:
        if 'dtstart' in data:
            changes['dtstart'] = _parse_iso(data['dtstart'])
        if 'dtend' in data:
            changes['dtend'] = _parse_iso(data['dtend']) if data['dtend'] else None
    except ValueError:
        return jsonify({'error': 'Ungueltiges Datumsformat'}), 400

    new_cal = None
    if 'calendar_id' in data:
        new_cal, cerr = _get_calendar_or_err(data['calendar_id'], user,
                                             need_write=True)
        if cerr:
            return cerr
        changes['calendar_id'] = new_cal.id

    if 'description' in data:
        changes['description'] = _clean_str(data['description']) or None
    if 'location' in data:
        changes['location'] = _clean_str(data['location']) or None
    if 'all_day' in data:
        changes['all_day'] = bool(data['all_day'])
    if 'recurrence_rule' in data:
        changes['recurrence_rule'] = _clean_str(data['recurrence_rule']) or None
    if 'is_private' in data:
        changes['is_private'] = bool(data['is_private'])

    for field, value in changes.items():
        setattr(event, field, value)

    # Keep the stored iCal representation in sync with the columns.
    event.ical_data = _build_ical(
        event.uid, event.summary, event.dtstart, event.dtend, event.all_day,
        event.description or '', event.location or '',
        event.recurrence_rule or '',
        event.exdates.split(',') if event.exdates else None,
    )
    event.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    if new_cal is not None and new_cal.id != cal.id:
        # The event moved: viewers of the target calendar must refresh too.
        notify_calendar_change(new_cal.owner_id, new_cal.id, 'event',
                               shared_with=_calendar_recipients(new_cal))
    return jsonify(event.to_dict()), 200


@api_bp.route('/events/<int:event_id>/exception', methods=['POST'])
@token_required
def add_event_exception(event_id):
    """Exclude a single occurrence of a recurring event ("only this one").

    Optionally creates a standalone replacement event for that date. The
    replacement inherits summary, description, location, all-day flag and
    privacy from the series unless the request overrides them.
    """
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err
    if not event.recurrence_rule:
        return jsonify({'error': 'Kein Serientermin'}), 400

    data = _json_body()
    occurrence_date = data.get('occurrence_date')  # ISO date or datetime
    if not occurrence_date or not isinstance(occurrence_date, str):
        return jsonify({'error': 'occurrence_date erforderlich'}), 400

    # Normalize to a stable storage key (date for all-day events, otherwise
    # date and time).
    try:
        parsed = _parse_iso(occurrence_date)
        key = parsed.strftime(
            '%Y-%m-%d' if event.all_day else '%Y-%m-%dT%H:%M:%S'
        )
    except ValueError:
        # Keep the raw value as before, but never one that would corrupt the
        # comma-separated exdates column or the generated iCal lines.
        key = occurrence_date.strip()
        if not key or any(ch in key for ch in ',\r\n'):
            return jsonify({'error': 'Ungueltiges Datumsformat'}), 400

    # Validate the optional replacement before modifying anything.
    rep = data.get('replacement')
    rep_start = rep_end = None
    if rep:
        if not isinstance(rep, dict) or not rep.get('dtstart'):
            return jsonify({'error': 'Startdatum erforderlich'}), 400
        try:
            rep_start = _parse_iso(rep['dtstart'])
            rep_end = _parse_iso(rep['dtend']) if rep.get('dtend') else rep_start
        except ValueError:
            return jsonify({'error': 'Ungueltiges Datumsformat'}), 400

    existing = event.exdates.split(',') if event.exdates else []
    if key not in existing:
        existing.append(key)
    event.exdates = ','.join(filter(None, existing))

    # Optional: create replacement single event
    replacement = None
    if rep:
        rep_uid = str(uuid.uuid4())
        replacement = CalendarEvent(
            calendar_id=event.calendar_id,
            uid=rep_uid,
            summary=rep.get('summary', event.summary),
            description=rep.get('description', event.description),
            location=rep.get('location', event.location),
            dtstart=rep_start,
            dtend=rep_end,
            all_day=rep.get('all_day', event.all_day),
            recurrence_rule=None,
            # A replacement for an occurrence of a private series must not
            # become visible to sharees by accident.
            is_private=bool(rep.get('is_private', event.is_private)),
            ical_data='',
        )
        replacement.ical_data = _build_ical(
            rep_uid, replacement.summary, rep_start, rep_end,
            replacement.all_day,
            replacement.description or '', replacement.location or '', '',
        )
        db.session.add(replacement)

    event.ical_data = _build_ical(
        event.uid, event.summary, event.dtstart, event.dtend, event.all_day,
        event.description or '', event.location or '',
        event.recurrence_rule or '',
        event.exdates.split(',') if event.exdates else None,
    )
    event.updated_at = datetime.now(timezone.utc)
    db.session.commit()
    # Same change notification as the other event-mutating endpoints.
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify({
        'event': event.to_dict(),
        'replacement': replacement.to_dict() if replacement else None,
    }), 200


@api_bp.route('/events/<int:event_id>', methods=['DELETE'])
@token_required
def delete_event(event_id):
    """Delete an event from a calendar the user may write to."""
    user = request.current_user
    event = db.session.get(CalendarEvent, event_id)
    if not event:
        return jsonify({'error': 'Event nicht gefunden'}), 404
    cal, err = _get_calendar_or_err(event.calendar_id, user, need_write=True)
    if err:
        return err
    db.session.delete(event)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'event',
                           shared_with=_calendar_recipients(cal))
    return jsonify({'message': 'Event geloescht'}), 200


# --- Calendar sharing ---

@api_bp.route('/calendars/<int:cal_id>/share', methods=['POST'])
@token_required
def share_calendar(cal_id):
    """Share a calendar with another user, or update an existing share.

    Only the owner may share. The mail notification for a new share is
    best-effort: a failure is logged and does not fail the request.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nur der Eigentuemer kann teilen'}), 403

    data = _json_body()
    username = _clean_str(data.get('username'))
    permission = data.get('permission', 'read')
    if permission not in ('read', 'readwrite'):
        return jsonify({'error': 'Ungueltige Berechtigung'}), 400
    target = User.query.filter_by(username=username).first()
    if not target:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404
    if target.id == user.id:
        return jsonify({'error': 'Kann nicht mit sich selbst teilen'}), 400

    existing = CalendarShare.query.filter_by(
        calendar_id=cal_id, shared_with_id=target.id
    ).first()
    is_new = not existing
    if existing:
        existing.permission = permission
    else:
        share = CalendarShare(
            calendar_id=cal_id, shared_with_id=target.id, permission=permission
        )
        db.session.add(share)
    db.session.commit()

    if is_new:
        try:
            from app.services.system_mail import notify_calendar_shared
            notify_calendar_shared(cal.name, user.username, target, permission)
        except Exception:
            # Best effort: sharing succeeded, only the mail did not.
            logger.exception('Could not send calendar share notification mail')

    # The target is usually part of the recipients already; de-duplicate
    # while keeping the order.
    recipients = list(dict.fromkeys([target.id, *_calendar_recipients(cal)]))
    notify_calendar_change(cal.owner_id, cal.id, 'share', shared_with=recipients)
    return jsonify({'message': f'Kalender mit {username} geteilt'}), 200


@api_bp.route('/calendars/<int:cal_id>/shares', methods=['GET'])
@token_required
def list_calendar_shares(cal_id):
    """List all shares of a calendar (owner only)."""
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden'}), 404
    shares = CalendarShare.query.filter_by(calendar_id=cal_id).all()
    return jsonify([{
        'id': s.id,
        'user_id': s.shared_with_id,
        'username': s.shared_with.username,
        'permission': s.permission,
    } for s in shares]), 200


@api_bp.route('/calendars/<int:cal_id>/shares/<int:share_id>', methods=['DELETE'])
@token_required
def remove_calendar_share(cal_id, share_id):
    """Remove one share of a calendar (owner only)."""
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden'}), 404
    share = db.session.get(CalendarShare, share_id)
    if not share or share.calendar_id != cal_id:
        return jsonify({'error': 'Freigabe nicht gefunden'}), 404
    # Remember the removed user: after the delete they are no longer among
    # the calendar's recipients but still have to be notified.
    target_id = share.shared_with_id
    db.session.delete(share)
    db.session.commit()
    notify_calendar_change(cal.owner_id, cal.id, 'share',
                           shared_with=[target_id, *_calendar_recipients(cal)])
    return jsonify({'message': 'Freigabe entfernt'}), 200


# --- iCal Export ---

@api_bp.route('/calendars/<int:cal_id>/ical-link', methods=['POST'])
@token_required
def generate_ical_link(cal_id):
    """Create (or return) the calendar's public iCal link (owner only).

    An existing token is reused. ``password`` sets a password for the link,
    ``clear_password`` removes it.
    """
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden'}), 404
    data = _json_body()
    password = _clean_str(data.get('password'))
    if not cal.ical_token:
        cal.ical_token = secrets.token_urlsafe(32)
    if password:
        cal.ical_password_hash = (
            bcrypt.generate_password_hash(password).decode('utf-8')
        )
    elif data.get('clear_password'):
        cal.ical_password_hash = None
    db.session.commit()
    return jsonify({
        'ical_url': f'/ical/{cal.ical_token}',
        'token': cal.ical_token,
        'has_password': bool(cal.ical_password_hash),
    }), 200


@api_bp.route('/calendars/<int:cal_id>/ical-link', methods=['DELETE'])
@token_required
def revoke_ical_link(cal_id):
    """Invalidate the calendar's iCal link and its password (owner only)."""
    user = request.current_user
    cal = db.session.get(Calendar, cal_id)
    if not cal or cal.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden'}), 404
    cal.ical_token = None
    cal.ical_password_hash = None
    db.session.commit()
    return jsonify({'message': 'Link zurueckgezogen'}), 200


def _basic_auth_challenge():
    """Return a 401 response asking the client for HTTP Basic credentials."""
    return Response(
        'Kalender erfordert Passwort', 401,
        {'WWW-Authenticate': 'Basic realm="Mini-Cloud Kalender"'}
    )


def _header_safe_filename(name) -> str:
    """Return *name* with characters replaced that are unsafe inside a quoted
    ``Content-Disposition`` filename (quotes, backslashes, control
    characters, anything outside latin-1)."""
    cleaned = ''.join(
        ch if (ch.isprintable() and ord(ch) < 256 and ch not in '"\\') else '_'
        for ch in (name or '')
    )
    return cleaned or 'calendar'


# NOTE(review): no route decorator is visible here; presumably this view is
# registered elsewhere under the '/ical/<token>' URL returned by
# generate_ical_link - confirm.
def ical_export(token):
    """Serve a calendar as an iCalendar file, addressed by its link token.

    Private events are always exported in redacted form.
    """
    cal = Calendar.query.filter_by(ical_token=token).first()
    if not cal:
        return jsonify({'error': 'Nicht gefunden'}), 404

    # Password protection via HTTP Basic (compatible with DAVx5, Apple Cal,
    # Thunderbird, curl, etc.). Username is ignored.
    if cal.ical_password_hash:
        auth = request.authorization
        if not auth or not auth.password:
            return _basic_auth_challenge()
        if not bcrypt.check_password_hash(cal.ical_password_hash, auth.password):
            return _basic_auth_challenge()

    events = CalendarEvent.query.filter_by(calendar_id=cal.id).all()
    lines = [
        'BEGIN:VCALENDAR',
        'VERSION:2.0',
        'PRODID:-//Mini-Cloud//DE',
        f'X-WR-CALNAME:{_escape_ical_text(cal.name)}',
    ]
    for e in events:
        if e.ical_data:
            block = _redact_vevent(e.ical_data) if e.is_private else e.ical_data
            lines.append(block)
        elif e.is_private:
            lines.append(_build_vevent(e.uid, 'Privat', e.dtstart, e.dtend, e.all_day))
        else:
            lines.append(_build_vevent(e.uid, e.summary, e.dtstart, e.dtend, e.all_day))
    lines.append('END:VCALENDAR')

    filename = _header_safe_filename(cal.name)
    return Response(
        '\r\n'.join(lines),
        mimetype='text/calendar',
        headers={'Content-Disposition': f'attachment; filename="{filename}.ics"'},
    )


# --- Helpers ---

def _escape_ical_text(value) -> str:
    """Escape a value for use as iCalendar TEXT.

    Backslash, semicolon, comma and line breaks are escaped so that user
    text can neither break the line structure nor inject extra properties.
    """
    text = '' if value is None else str(value)
    text = text.replace('\\', '\\\\')
    text = text.replace('\r\n', '\n').replace('\r', '\n').replace('\n', '\\n')
    return text.replace(';', '\\;').replace(',', '\\,')


def _format_dt(dt, all_day=False):
    """Format a datetime as an iCalendar DATE or UTC DATE-TIME value.

    Timezone-aware values are converted to UTC before the ``Z`` suffix is
    added. Naive values are emitted unchanged, i.e. treated as already
    being UTC.
    """
    if all_day:
        return dt.strftime('%Y%m%d')
    if dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime('%Y%m%dT%H%M%SZ')


def _build_vevent(uid, summary, dtstart, dtend, all_day,
                  description='', location='', rrule='', exdates=None):
    """Build a VEVENT block (CRLF-separated, no trailing line break).

    *dtend* falls back to *dtstart* when empty. *exdates* is an iterable of
    the stored exception keys (ISO date or datetime strings); entries that
    cannot be interpreted are skipped.
    """
    if not dtend:
        dtend = dtstart
    lines = [
        'BEGIN:VEVENT',
        f'UID:{uid}',
    ]
    if all_day:
        lines.append(f'DTSTART;VALUE=DATE:{_format_dt(dtstart, True)}')
        lines.append(f'DTEND;VALUE=DATE:{_format_dt(dtend, True)}')
    else:
        lines.append(f'DTSTART:{_format_dt(dtstart)}')
        lines.append(f'DTEND:{_format_dt(dtend)}')
    lines.append(f'SUMMARY:{_escape_ical_text(summary)}')
    if description:
        lines.append(f'DESCRIPTION:{_escape_ical_text(description)}')
    if location:
        lines.append(f'LOCATION:{_escape_ical_text(location)}')
    if rrule:
        # A recurrence rule is a single line; drop stray line breaks so it
        # cannot inject further properties.
        lines.append('RRULE:' + rrule.replace('\r', '').replace('\n', ''))
    if exdates:
        for ex in exdates:
            if all_day:
                # Reduce the key to its YYYYMMDD digits; this also covers
                # keys stored with a time part or without dashes.
                digits = ''.join(ch for ch in ex if ch.isdigit())[:8]
                if len(digits) == 8:
                    lines.append(f'EXDATE;VALUE=DATE:{digits}')
            else:
                # Convert ISO datetime (with or without TZ) into YYYYMMDDTHHMMSSZ
                try:
                    lines.append(f'EXDATE:{_format_dt(_parse_iso(ex))}')
                except ValueError:
                    pass  # unusable key: leave it out of the export
    lines.append(f'DTSTAMP:{datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")}')
    lines.append('END:VEVENT')
    return '\r\n'.join(lines)


def _build_ical(uid, summary, dtstart, dtend, all_day,
                description='', location='', rrule='', exdates=None):
    """Build the iCal data stored with an event (currently a bare VEVENT)."""
    return _build_vevent(uid, summary, dtstart, dtend, all_day,
                         description, location, rrule, exdates)