591 lines
22 KiB
Python
591 lines
22 KiB
Python
"""REST API for task lists / tasks (VTODO).
|
|
|
|
Mirror der calendar.py-Architektur: TaskList = Calendar-aehnliche Sammlung,
|
|
Task = VTODO. CalDAV-Anbindung erfolgt in app/dav/caldav.py: TaskLists
|
|
erscheinen als Kalender-Collection mit supported-calendar-component-set
|
|
auf VTODO und unter URL /dav/<user>/tl-<id>/.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from flask import request, jsonify, Response
|
|
|
|
from app.api import api_bp
|
|
from app.api.auth import token_required
|
|
from app.extensions import db
|
|
from app.models.task import TaskList, Task, TaskListShare
|
|
from app.models.user import User
|
|
from app.services.events import notify_tasklist_change
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Access helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _list_recipients(tl: TaskList):
|
|
return [s.shared_with_id for s in
|
|
TaskListShare.query.filter_by(task_list_id=tl.id).all()]
|
|
|
|
|
|
def _get_list_or_err(list_id, user, need_write=False):
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl:
|
|
return None, (jsonify({'error': 'Aufgabenliste nicht gefunden'}), 404)
|
|
if tl.owner_id == user.id:
|
|
return tl, None
|
|
share = TaskListShare.query.filter_by(
|
|
task_list_id=list_id, shared_with_id=user.id
|
|
).first()
|
|
if not share:
|
|
return None, (jsonify({'error': 'Zugriff verweigert'}), 403)
|
|
if need_write and share.permission != 'readwrite':
|
|
return None, (jsonify({'error': 'Schreibzugriff verweigert'}), 403)
|
|
return tl, None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VTODO build / parse
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _fmt_dt(dt: datetime | None) -> str | None:
|
|
if not dt:
|
|
return None
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
|
|
|
|
|
|
def build_vtodo(task: Task) -> str:
|
|
lines = ['BEGIN:VTODO', f'UID:{task.uid}',
|
|
f'DTSTAMP:{_fmt_dt(datetime.now(timezone.utc))}',
|
|
f'SUMMARY:{(task.summary or "").replace(chr(10), " ")}']
|
|
if task.description:
|
|
lines.append(f'DESCRIPTION:{task.description.replace(chr(10), chr(92) + "n")}')
|
|
if task.status:
|
|
lines.append(f'STATUS:{task.status}')
|
|
if task.priority is not None:
|
|
lines.append(f'PRIORITY:{task.priority}')
|
|
if task.percent_complete is not None:
|
|
lines.append(f'PERCENT-COMPLETE:{task.percent_complete}')
|
|
if task.due:
|
|
lines.append(f'DUE:{_fmt_dt(task.due)}')
|
|
if task.dtstart:
|
|
lines.append(f'DTSTART:{_fmt_dt(task.dtstart)}')
|
|
if task.completed_at:
|
|
lines.append(f'COMPLETED:{_fmt_dt(task.completed_at)}')
|
|
if task.categories:
|
|
lines.append(f'CATEGORIES:{task.categories}')
|
|
lines.append('END:VTODO')
|
|
return '\r\n'.join(lines)
|
|
|
|
|
|
def _unfold(text: str):
|
|
out, current = [], ''
|
|
for line in text.replace('\r\n', '\n').split('\n'):
|
|
if line.startswith((' ', '\t')) and current:
|
|
current += line[1:]
|
|
else:
|
|
if current:
|
|
out.append(current)
|
|
current = line
|
|
if current:
|
|
out.append(current)
|
|
return out
|
|
|
|
|
|
def _parse_dt(value: str) -> datetime | None:
|
|
value = value.strip()
|
|
try:
|
|
if value.endswith('Z'):
|
|
return datetime.strptime(value, '%Y%m%dT%H%M%SZ').replace(tzinfo=timezone.utc)
|
|
if 'T' in value:
|
|
return datetime.strptime(value, '%Y%m%dT%H%M%S')
|
|
return datetime.strptime(value, '%Y%m%d')
|
|
except ValueError:
|
|
try:
|
|
return datetime.fromisoformat(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def parse_vtodo(raw: str) -> dict | None:
|
|
if 'BEGIN:VTODO' not in raw.upper():
|
|
return None
|
|
result: dict = {}
|
|
in_block = False
|
|
for line in _unfold(raw):
|
|
upper = line.upper()
|
|
if upper.startswith('BEGIN:VTODO'):
|
|
in_block = True
|
|
continue
|
|
if upper.startswith('END:VTODO'):
|
|
break
|
|
if not in_block or ':' not in line:
|
|
continue
|
|
key, _, value = line.partition(':')
|
|
name = key.split(';')[0].upper()
|
|
if name == 'UID':
|
|
result['uid'] = value.strip()
|
|
elif name == 'SUMMARY':
|
|
result['summary'] = value.strip()
|
|
elif name == 'DESCRIPTION':
|
|
result['description'] = value.replace('\\n', '\n').replace('\\,', ',').strip()
|
|
elif name == 'STATUS':
|
|
result['status'] = value.strip().upper()
|
|
elif name == 'PRIORITY':
|
|
try:
|
|
result['priority'] = int(value.strip())
|
|
except ValueError:
|
|
pass
|
|
elif name == 'PERCENT-COMPLETE':
|
|
try:
|
|
result['percent_complete'] = int(value.strip())
|
|
except ValueError:
|
|
pass
|
|
elif name == 'DUE':
|
|
result['due'] = _parse_dt(value)
|
|
elif name == 'DTSTART':
|
|
result['dtstart'] = _parse_dt(value)
|
|
elif name == 'COMPLETED':
|
|
result['completed_at'] = _parse_dt(value)
|
|
elif name == 'CATEGORIES':
|
|
result['categories'] = value.strip()
|
|
return result if result.get('summary') or result.get('uid') else None
|
|
|
|
|
|
def _apply(task: Task, data: dict):
|
|
if 'summary' in data:
|
|
task.summary = (data.get('summary') or '').strip() or None
|
|
if 'description' in data:
|
|
task.description = (data.get('description') or '').strip() or None
|
|
if 'status' in data:
|
|
s = (data.get('status') or '').upper().strip() or None
|
|
task.status = s
|
|
if s == 'COMPLETED' and not task.completed_at:
|
|
task.completed_at = datetime.now(timezone.utc)
|
|
task.percent_complete = 100
|
|
elif s != 'COMPLETED':
|
|
task.completed_at = None
|
|
if 'priority' in data:
|
|
task.priority = data['priority'] if data['priority'] is not None else None
|
|
if 'percent_complete' in data:
|
|
task.percent_complete = data['percent_complete']
|
|
if 'due' in data:
|
|
v = data['due']
|
|
task.due = datetime.fromisoformat(v) if v else None
|
|
if 'dtstart' in data:
|
|
v = data['dtstart']
|
|
task.dtstart = datetime.fromisoformat(v) if v else None
|
|
if 'completed_at' in data:
|
|
v = data['completed_at']
|
|
task.completed_at = datetime.fromisoformat(v) if v else None
|
|
if 'categories' in data:
|
|
cats = data['categories']
|
|
if isinstance(cats, list):
|
|
task.categories = ','.join(c.strip() for c in cats if c and c.strip()) or None
|
|
else:
|
|
task.categories = (cats or '').strip() or None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# REST endpoints - lists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@api_bp.route('/tasklists', methods=['GET'])
|
|
@token_required
|
|
def list_tasklists():
|
|
user = request.current_user
|
|
own = TaskList.query.filter_by(owner_id=user.id).all()
|
|
shared = TaskListShare.query.filter_by(shared_with_id=user.id).all()
|
|
out = []
|
|
for tl in own:
|
|
d = tl.to_dict()
|
|
d['permission'] = 'owner'
|
|
d['task_count'] = tl.tasks.count()
|
|
out.append(d)
|
|
for s in shared:
|
|
tl = s.task_list
|
|
if not tl:
|
|
continue
|
|
d = tl.to_dict()
|
|
d['permission'] = s.permission
|
|
owner = tl.owner
|
|
d['owner_name'] = owner.username if owner else ''
|
|
d['owner_full_name'] = owner.full_name if owner else ''
|
|
d['owner_display_name'] = owner.display_name if owner else ''
|
|
d['task_count'] = tl.tasks.count()
|
|
if s.color:
|
|
d['color'] = s.color
|
|
out.append(d)
|
|
return jsonify(out), 200
|
|
|
|
|
|
@api_bp.route('/tasklists', methods=['POST'])
|
|
@token_required
|
|
def create_tasklist():
|
|
user = request.current_user
|
|
data = request.get_json() or {}
|
|
name = (data.get('name') or '').strip()
|
|
if not name:
|
|
return jsonify({'error': 'Name erforderlich'}), 400
|
|
tl = TaskList(owner_id=user.id, name=name,
|
|
color=data.get('color') or '#10b981',
|
|
description=(data.get('description') or '').strip() or None)
|
|
db.session.add(tl)
|
|
db.session.commit()
|
|
notify_tasklist_change(user.id, tl.id, 'created')
|
|
return jsonify(tl.to_dict()), 201
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>', methods=['PUT'])
|
|
@token_required
|
|
def update_tasklist(list_id):
|
|
user = request.current_user
|
|
tl, err = _get_list_or_err(list_id, user, need_write=True)
|
|
if err:
|
|
return err
|
|
if tl.owner_id != user.id:
|
|
return jsonify({'error': 'Nur Eigentuemer kann die Liste umbenennen'}), 403
|
|
data = request.get_json() or {}
|
|
if 'name' in data:
|
|
tl.name = data['name'].strip()
|
|
if 'color' in data:
|
|
tl.color = data['color']
|
|
if 'description' in data:
|
|
tl.description = (data['description'] or '').strip() or None
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'updated', shared_with=_list_recipients(tl))
|
|
return jsonify(tl.to_dict()), 200
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/my-color', methods=['PUT'])
|
|
@token_required
|
|
def set_my_tasklist_color(list_id):
|
|
user = request.current_user
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl:
|
|
return jsonify({'error': 'Nicht gefunden'}), 404
|
|
color = (request.get_json() or {}).get('color')
|
|
if not color:
|
|
return jsonify({'error': 'color erforderlich'}), 400
|
|
if tl.owner_id == user.id:
|
|
tl.color = color
|
|
db.session.commit()
|
|
return jsonify({'color': tl.color}), 200
|
|
share = TaskListShare.query.filter_by(task_list_id=list_id, shared_with_id=user.id).first()
|
|
if not share:
|
|
return jsonify({'error': 'Zugriff verweigert'}), 403
|
|
share.color = color
|
|
db.session.commit()
|
|
return jsonify({'color': share.color}), 200
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>', methods=['DELETE'])
|
|
@token_required
|
|
def delete_tasklist(list_id):
|
|
user = request.current_user
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl or tl.owner_id != user.id:
|
|
return jsonify({'error': 'Nur Eigentuemer kann loeschen'}), 403
|
|
recipients = _list_recipients(tl)
|
|
db.session.delete(tl)
|
|
db.session.commit()
|
|
notify_tasklist_change(user.id, list_id, 'deleted', shared_with=recipients)
|
|
return jsonify({'message': 'Aufgabenliste geloescht'}), 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# REST endpoints - tasks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/tasks', methods=['GET'])
|
|
@token_required
|
|
def list_tasks(list_id):
|
|
user = request.current_user
|
|
tl, err = _get_list_or_err(list_id, user)
|
|
if err:
|
|
return err
|
|
show_done = (request.args.get('include_done') or 'true').lower() != 'false'
|
|
q = Task.query.filter_by(task_list_id=list_id)
|
|
if not show_done:
|
|
q = q.filter((Task.status.is_(None)) | (Task.status != 'COMPLETED'))
|
|
tasks = q.order_by(Task.due.asc().nullslast(), Task.priority.desc().nullslast(), Task.id).all()
|
|
return jsonify([t.to_dict() for t in tasks]), 200
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/tasks', methods=['POST'])
|
|
@token_required
|
|
def create_task(list_id):
|
|
user = request.current_user
|
|
tl, err = _get_list_or_err(list_id, user, need_write=True)
|
|
if err:
|
|
return err
|
|
data = request.get_json() or {}
|
|
if not (data.get('summary') or '').strip():
|
|
return jsonify({'error': 'Titel erforderlich'}), 400
|
|
task = Task(task_list_id=list_id, uid=str(uuid.uuid4()), ical_data='')
|
|
_apply(task, data)
|
|
if not task.status:
|
|
task.status = 'NEEDS-ACTION'
|
|
task.ical_data = build_vtodo(task)
|
|
db.session.add(task)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return jsonify(task.to_dict()), 201
|
|
|
|
|
|
@api_bp.route('/tasks/<int:task_id>', methods=['GET'])
|
|
@token_required
|
|
def get_task(task_id):
|
|
user = request.current_user
|
|
task = db.session.get(Task, task_id)
|
|
if not task:
|
|
return jsonify({'error': 'Aufgabe nicht gefunden'}), 404
|
|
tl, err = _get_list_or_err(task.task_list_id, user)
|
|
if err:
|
|
return err
|
|
return jsonify(task.to_dict()), 200
|
|
|
|
|
|
@api_bp.route('/tasks/<int:task_id>', methods=['PUT'])
|
|
@token_required
|
|
def update_task(task_id):
|
|
user = request.current_user
|
|
task = db.session.get(Task, task_id)
|
|
if not task:
|
|
return jsonify({'error': 'Aufgabe nicht gefunden'}), 404
|
|
tl, err = _get_list_or_err(task.task_list_id, user, need_write=True)
|
|
if err:
|
|
return err
|
|
data = request.get_json() or {}
|
|
if 'task_list_id' in data and data['task_list_id'] != task.task_list_id:
|
|
new_tl, e2 = _get_list_or_err(data['task_list_id'], user, need_write=True)
|
|
if e2:
|
|
return e2
|
|
task.task_list_id = data['task_list_id']
|
|
_apply(task, data)
|
|
task.ical_data = build_vtodo(task)
|
|
task.updated_at = datetime.now(timezone.utc)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return jsonify(task.to_dict()), 200
|
|
|
|
|
|
@api_bp.route('/tasks/<int:task_id>', methods=['DELETE'])
|
|
@token_required
|
|
def delete_task(task_id):
|
|
user = request.current_user
|
|
task = db.session.get(Task, task_id)
|
|
if not task:
|
|
return jsonify({'error': 'Aufgabe nicht gefunden'}), 404
|
|
tl, err = _get_list_or_err(task.task_list_id, user, need_write=True)
|
|
if err:
|
|
return err
|
|
db.session.delete(task)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return jsonify({'message': 'Aufgabe geloescht'}), 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sharing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/share', methods=['POST'])
|
|
@token_required
|
|
def share_tasklist(list_id):
|
|
user = request.current_user
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl or tl.owner_id != user.id:
|
|
return jsonify({'error': 'Nur Eigentuemer kann teilen'}), 403
|
|
data = request.get_json() or {}
|
|
username = (data.get('username') or '').strip()
|
|
permission = data.get('permission', 'read')
|
|
if permission not in ('read', 'readwrite'):
|
|
return jsonify({'error': 'Ungueltige Berechtigung'}), 400
|
|
target = User.query.filter_by(username=username).first()
|
|
if not target:
|
|
return jsonify({'error': 'Benutzer nicht gefunden'}), 404
|
|
if target.id == user.id:
|
|
return jsonify({'error': 'Kann nicht mit sich selbst teilen'}), 400
|
|
existing = TaskListShare.query.filter_by(task_list_id=list_id, shared_with_id=target.id).first()
|
|
if existing:
|
|
existing.permission = permission
|
|
else:
|
|
db.session.add(TaskListShare(task_list_id=list_id, shared_with_id=target.id,
|
|
permission=permission))
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'share',
|
|
shared_with=[target.id, *_list_recipients(tl)])
|
|
return jsonify({'message': f'Geteilt mit {username}'}), 200
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/shares', methods=['GET'])
|
|
@token_required
|
|
def list_tasklist_shares(list_id):
|
|
user = request.current_user
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl or tl.owner_id != user.id:
|
|
return jsonify({'error': 'Nicht gefunden'}), 404
|
|
shares = TaskListShare.query.filter_by(task_list_id=list_id).all()
|
|
return jsonify([{
|
|
'id': s.id, 'user_id': s.shared_with_id,
|
|
'username': s.shared_with.username, 'permission': s.permission,
|
|
} for s in shares]), 200
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/shares/<int:share_id>', methods=['DELETE'])
|
|
@token_required
|
|
def remove_tasklist_share(list_id, share_id):
|
|
user = request.current_user
|
|
tl = db.session.get(TaskList, list_id)
|
|
if not tl or tl.owner_id != user.id:
|
|
return jsonify({'error': 'Nicht gefunden'}), 404
|
|
share = db.session.get(TaskListShare, share_id)
|
|
if not share or share.task_list_id != list_id:
|
|
return jsonify({'error': 'Freigabe nicht gefunden'}), 404
|
|
target_id = share.shared_with_id
|
|
db.session.delete(share)
|
|
db.session.commit()
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'share',
|
|
shared_with=[target_id, *_list_recipients(tl)])
|
|
return jsonify({'message': 'Freigabe entfernt'}), 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import / Export (.ics with VTODO; CSV)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/export', methods=['GET'])
|
|
@token_required
|
|
def export_tasklist(list_id):
|
|
import csv
|
|
import io
|
|
user = request.current_user
|
|
tl, err = _get_list_or_err(list_id, user)
|
|
if err:
|
|
return err
|
|
fmt = (request.args.get('format') or 'ics').lower()
|
|
tasks = Task.query.filter_by(task_list_id=list_id).all()
|
|
safe = re.sub(r'[^A-Za-z0-9._-]+', '_', tl.name or 'aufgaben') or 'aufgaben'
|
|
|
|
if fmt == 'ics':
|
|
lines = ['BEGIN:VCALENDAR', 'VERSION:2.0', 'PRODID:-//Mini-Cloud//DE', 'CALSCALE:GREGORIAN']
|
|
for t in tasks:
|
|
block = (t.ical_data or '').strip() or build_vtodo(t)
|
|
lines.append(block)
|
|
lines.append('END:VCALENDAR')
|
|
return Response(
|
|
'\r\n'.join(lines) + '\r\n',
|
|
mimetype='text/calendar; charset=utf-8',
|
|
headers={'Content-Disposition': f'attachment; filename="{safe}.ics"'},
|
|
)
|
|
if fmt == 'csv':
|
|
out = io.StringIO()
|
|
w = csv.writer(out, delimiter=';', quoting=csv.QUOTE_ALL)
|
|
w.writerow(['summary', 'status', 'priority', 'percent_complete',
|
|
'due', 'dtstart', 'completed_at', 'categories', 'description', 'uid'])
|
|
for t in tasks:
|
|
w.writerow([
|
|
t.summary or '', t.status or '',
|
|
t.priority if t.priority is not None else '',
|
|
t.percent_complete if t.percent_complete is not None else '',
|
|
t.due.isoformat() if t.due else '',
|
|
t.dtstart.isoformat() if t.dtstart else '',
|
|
t.completed_at.isoformat() if t.completed_at else '',
|
|
t.categories or '',
|
|
(t.description or '').replace('\r\n', ' ').replace('\n', ' '),
|
|
t.uid or '',
|
|
])
|
|
return Response(
|
|
'\ufeff' + out.getvalue(), mimetype='text/csv; charset=utf-8',
|
|
headers={'Content-Disposition': f'attachment; filename="{safe}.csv"'},
|
|
)
|
|
return jsonify({'error': 'Unbekanntes Format'}), 400
|
|
|
|
|
|
@api_bp.route('/tasklists/<int:list_id>/import', methods=['POST'])
|
|
@token_required
|
|
def import_tasklist(list_id):
|
|
import csv
|
|
import io
|
|
user = request.current_user
|
|
tl, err = _get_list_or_err(list_id, user, need_write=True)
|
|
if err:
|
|
return err
|
|
file = request.files.get('file')
|
|
if not file:
|
|
return jsonify({'error': 'Keine Datei'}), 400
|
|
raw = file.read()
|
|
try:
|
|
text = raw.decode('utf-8-sig')
|
|
except UnicodeDecodeError:
|
|
text = raw.decode('latin-1', errors='replace')
|
|
name = (file.filename or '').lower()
|
|
imported, skipped = 0, 0
|
|
|
|
def _save(parsed: dict, ical_block: str | None = None):
|
|
nonlocal imported, skipped
|
|
if not parsed.get('summary'):
|
|
skipped += 1
|
|
return
|
|
uid = parsed.get('uid') or str(uuid.uuid4())
|
|
existing = Task.query.filter_by(task_list_id=list_id, uid=uid).first()
|
|
t = existing or Task(task_list_id=list_id, uid=uid, ical_data='')
|
|
t.summary = parsed.get('summary')
|
|
t.description = parsed.get('description')
|
|
t.status = parsed.get('status') or 'NEEDS-ACTION'
|
|
t.priority = parsed.get('priority')
|
|
t.percent_complete = parsed.get('percent_complete')
|
|
t.due = parsed.get('due')
|
|
t.dtstart = parsed.get('dtstart')
|
|
t.completed_at = parsed.get('completed_at')
|
|
cats = parsed.get('categories')
|
|
if isinstance(cats, list):
|
|
t.categories = ','.join(cats)
|
|
elif isinstance(cats, str):
|
|
t.categories = cats or None
|
|
t.ical_data = (ical_block or '').strip() or build_vtodo(t)
|
|
if not existing:
|
|
db.session.add(t)
|
|
imported += 1
|
|
|
|
if name.endswith('.csv') or (b';' in raw[:200] and b'BEGIN:VCALENDAR' not in raw[:200]):
|
|
reader = csv.DictReader(__import__('io').StringIO(text), delimiter=';')
|
|
if not reader.fieldnames or len(reader.fieldnames) < 2:
|
|
reader = csv.DictReader(__import__('io').StringIO(text), delimiter=',')
|
|
for row in reader:
|
|
row = {k.strip().lower(): (v or '').strip() for k, v in row.items() if k}
|
|
try:
|
|
due = datetime.fromisoformat(row['due']) if row.get('due') else None
|
|
except ValueError:
|
|
due = None
|
|
_save({
|
|
'uid': row.get('uid'),
|
|
'summary': row.get('summary') or row.get('titel'),
|
|
'description': row.get('description') or row.get('beschreibung'),
|
|
'status': (row.get('status') or '').upper() or None,
|
|
'priority': int(row['priority']) if row.get('priority', '').isdigit() else None,
|
|
'percent_complete': int(row['percent_complete']) if row.get('percent_complete', '').isdigit() else None,
|
|
'due': due,
|
|
'categories': row.get('categories') or row.get('kategorien'),
|
|
})
|
|
else:
|
|
blocks = re.findall(r'BEGIN:VTODO.*?END:VTODO', text, flags=re.DOTALL | re.IGNORECASE)
|
|
if not blocks:
|
|
return jsonify({'error': 'Keine VTODO-Daten gefunden'}), 400
|
|
for block in blocks:
|
|
parsed = parse_vtodo(block)
|
|
if not parsed:
|
|
skipped += 1
|
|
continue
|
|
_save(parsed, ical_block=block)
|
|
|
|
db.session.commit()
|
|
if imported:
|
|
notify_tasklist_change(tl.owner_id, tl.id, 'task', shared_with=_list_recipients(tl))
|
|
return jsonify({'imported': imported, 'skipped': skipped}), 200
|