# minmal-file-cloud-email-pim.../backend/app/api/contacts.py
#
# 732 lines
# 28 KiB
# Python

import csv
import io
import json
import re
import uuid
import zipfile
from datetime import datetime, timezone
from flask import request, jsonify, Response
from app.api import api_bp
from app.api.auth import token_required
from app.extensions import db
from app.models.contact import AddressBook, Contact, AddressBookShare
from app.models.user import User
from app.services.events import broadcaster
def _notify_addressbook(owner_id: int, book_id: int, change: str, shared_with=()):
    """Publish an SSE event of type 'addressbook' for a vCard or share change.

    The event goes to the owner followed by every id in ``shared_with`` and
    carries the kind of change together with the affected address book id.
    It re-uses the calendar event infrastructure with its own event type.
    """
    payload = dict(type='addressbook', change=change, address_book_id=book_id)
    targets = [owner_id]
    targets.extend(shared_with)
    broadcaster.publish(targets, payload)
def _book_recipients(book: AddressBook):
    """Return the user ids of everyone the given address book is shared with."""
    shares = AddressBookShare.query.filter_by(address_book_id=book.id).all()
    return [share.shared_with_id for share in shares]
def _get_addressbook_or_err(book_id, user, need_write=False):
    """Load an address book and check that ``user`` may access it.

    Returns ``(book, None)`` when access is granted, otherwise
    ``(None, (json_response, status))`` so the caller can return the error
    tuple directly. The owner always has access; other users need a share
    row, and with ``need_write`` that share must have permission 'readwrite'.
    """
    def _refuse(message, status):
        return None, (jsonify({'error': message}), status)

    book = db.session.get(AddressBook, book_id)
    if not book:
        return _refuse('Adressbuch nicht gefunden', 404)

    if book.owner_id != user.id:
        grant = AddressBookShare.query.filter_by(
            address_book_id=book_id, shared_with_id=user.id
        ).first()
        if not grant:
            return _refuse('Zugriff verweigert', 403)
        if need_write and grant.permission != 'readwrite':
            return _refuse('Schreibzugriff verweigert', 403)

    return book, None
# ---------------------------------------------------------------------------
# vCard helpers
# ---------------------------------------------------------------------------
def _escape(s):
    """Escape a value for use inside a vCard content line.

    ``None`` becomes the empty string; any other value is converted with
    ``str()``. Backslash, comma and semicolon are prefixed with a backslash
    and line breaks are written as the two characters ``\\n``. CRLF and lone
    CR are treated as line breaks too, so that no raw carriage return ends
    up inside the emitted line.
    """
    if s is None:
        return ''
    text = str(s).replace('\r\n', '\n').replace('\r', '\n')
    # The backslash must be handled first, otherwise the backslashes added
    # by the later replacements would be doubled again.
    for raw, escaped in (('\\', '\\\\'), (',', '\\,'), (';', '\\;'), ('\n', '\\n')):
        text = text.replace(raw, escaped)
    return text
# One escape sequence: a backslash followed by one of the characters that
# _escape produces (plus the upper-case 'N' variant of the newline escape).
_VCARD_ESCAPE_RE = re.compile(r'\\([\\,;nN])')


def _unescape(s):
    """Reverse vCard value escaping.

    Falsy input yields the empty string. The text is decoded in a single
    left-to-right pass, so an escaped backslash followed by a literal ``n``
    is decoded as backslash + ``n`` and not mistaken for a newline escape
    (which chained ``str.replace`` calls get wrong). Backslashes in front of
    any other character are left untouched.
    """
    if not s:
        return ''

    def _decode(match):
        ch = match.group(1)
        return '\n' if ch in 'nN' else ch

    return _VCARD_ESCAPE_RE.sub(_decode, s)
def _first_value(entries):
    """Return the 'value' of the first list entry, or None if absent or malformed."""
    if not entries or not isinstance(entries, (list, tuple)):
        return None
    head = entries[0]
    if not isinstance(head, dict):
        return None
    return head.get('value')


def _apply_fields_to_contact(contact: "Contact", data: dict):
    """Copy fields from a JSON request into a Contact model instance.

    Only keys present in ``data`` are written. Plain text fields are stripped
    and stored as None when empty; the list fields are stored as JSON text
    (None when empty). Afterwards the denormalised ``primary_email`` /
    ``primary_phone`` and the legacy ``email`` / ``phone`` columns are
    refreshed from the first e-mail / phone entry, and ``display_name`` is
    composed from the name parts (or the organization) when it is empty.

    A first entry that is not a dict or has no 'value' key yields None for
    the primary field instead of raising.
    """
    for field in ('prefix', 'first_name', 'middle_name', 'last_name', 'suffix',
                  'nickname', 'organization', 'department', 'job_title',
                  'notes', 'photo', 'birthday', 'anniversary'):
        if field in data:
            value = data[field]
            if isinstance(value, str):
                value = value.strip()
            setattr(contact, field, value or None)

    if 'display_name' in data:
        contact.display_name = (data['display_name'] or '').strip() or None

    for jsonfield in ('emails', 'phones', 'addresses', 'websites', 'impp', 'categories'):
        if jsonfield in data:
            value = data[jsonfield] or []
            setattr(contact, jsonfield, json.dumps(value) if value else None)

    # Denormalised primary fields for list display: prefer the incoming
    # lists, otherwise fall back to what is already stored on the contact.
    if 'emails' in data:
        emails = data.get('emails')
    else:
        emails = json.loads(contact.emails) if contact.emails else []
    if 'phones' in data:
        phones = data.get('phones')
    else:
        phones = json.loads(contact.phones) if contact.phones else []
    contact.primary_email = _first_value(emails)
    contact.primary_phone = _first_value(phones)

    # Legacy columns
    contact.email = contact.primary_email
    contact.phone = contact.primary_phone

    # Compose display name if not provided
    if not contact.display_name:
        parts = [contact.prefix, contact.first_name, contact.middle_name,
                 contact.last_name, contact.suffix]
        contact.display_name = ' '.join(p for p in parts if p) or contact.organization or None
def _build_vcard(contact: Contact) -> str:
    """Render a Contact into vCard 3.0 text.

    Lines are joined with CRLF. Properties are only written for fields that
    hold a value; the JSON list columns (emails, phones, addresses,
    websites, impp, categories) are decoded and written one property per
    entry. A REV property with the current UTC time is always appended.
    """
    def _stored(raw_json):
        # The list columns hold JSON text, or nothing when there are no entries.
        return json.loads(raw_json) if raw_json else []

    out = ['BEGIN:VCARD', 'VERSION:3.0', f'UID:{contact.uid}']
    emit = out.append

    if contact.display_name:
        emit(f'FN:{_escape(contact.display_name)}')

    # N: lastname;firstname;middle;prefix;suffix
    name_fields = [_escape(getattr(contact, attr)) for attr in
                   ('last_name', 'first_name', 'middle_name', 'prefix', 'suffix')]
    if any(name_fields):
        emit('N:' + ';'.join(name_fields))

    if contact.nickname:
        emit(f'NICKNAME:{_escape(contact.nickname)}')
    if contact.organization or contact.department:
        organization = _escape(contact.organization or "")
        department = _escape(contact.department or "")
        emit(f'ORG:{organization};{department}')
    if contact.job_title:
        emit(f'TITLE:{_escape(contact.job_title)}')

    for entry in _stored(contact.emails):
        kind = (entry.get('type') or 'home').upper()
        emit(f'EMAIL;TYPE={kind}:{_escape(entry.get("value", ""))}')

    for entry in _stored(contact.phones):
        kind = (entry.get('type') or 'cell').upper()
        emit(f'TEL;TYPE={kind}:{_escape(entry.get("value", ""))}')

    for entry in _stored(contact.addresses):
        kind = (entry.get('type') or 'home').upper()
        # ADR: po_box;extended;street;city;region;postal_code;country
        # (the "extended" component is always written empty)
        components = [_escape(entry.get('po_box', '')), '']
        for key in ('street', 'city', 'region', 'postal_code', 'country'):
            components.append(_escape(entry.get(key, '')))
        emit(f'ADR;TYPE={kind}:' + ';'.join(components))

    for entry in _stored(contact.websites):
        kind = (entry.get('type') or '').upper()
        prop = f'URL;TYPE={kind}' if kind else 'URL'
        emit(f'{prop}:{_escape(entry.get("value", ""))}')

    for entry in _stored(contact.impp):
        scheme = (entry.get('protocol') or 'xmpp').lower()
        emit(f'IMPP:{scheme}:{_escape(entry.get("value", ""))}')

    if contact.birthday:
        emit(f'BDAY:{contact.birthday}')
    if contact.anniversary:
        emit(f'ANNIVERSARY:{contact.anniversary}')

    categories = _stored(contact.categories)
    if categories:
        emit('CATEGORIES:' + ','.join(_escape(c) for c in categories))

    if contact.notes:
        emit(f'NOTE:{_escape(contact.notes)}')
    if contact.photo:
        # Photo can be a data: URL or http URL. In vCard 3.0 we use PHOTO;VALUE=uri.
        emit(f'PHOTO;VALUE=uri:{contact.photo}')

    revision = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    emit(f'REV:{revision}')
    emit('END:VCARD')
    return '\r\n'.join(out)
def _unfold_vcard(raw: str):
    """Undo RFC 6350 line folding and return the logical lines as a list.

    CRLF is normalised to LF first. A physical line that begins with a space
    or tab is glued onto the previous logical line with that one leading
    character removed; a leading continuation with nothing before it is kept
    as its own line.
    """
    logical = []
    for physical in raw.replace('\r\n', '\n').split('\n'):
        continues_previous = bool(logical) and physical[:1] in (' ', '\t')
        if continues_previous:
            logical[-1] = logical[-1] + physical[1:]
        else:
            logical.append(physical)
    return logical
def _split_unescaped(value: str, sep: str) -> list:
    r"""Split ``value`` on ``sep`` while skipping backslash-escaped characters.

    A backslash and the character after it are copied through unchanged, so
    ``\;`` or ``\,`` inside a component is not taken for a separator. The
    returned pieces are still escaped; the caller unescapes them.
    """
    pieces, current = [], []
    pos, end = 0, len(value)
    while pos < end:
        ch = value[pos]
        if ch == '\\' and pos + 1 < end:
            current.append(value[pos:pos + 2])
            pos += 2
        elif ch == sep:
            pieces.append(''.join(current))
            current = []
            pos += 1
        else:
            current.append(ch)
            pos += 1
    pieces.append(''.join(current))
    return pieces


def parse_vcard(raw: str) -> dict:
    """Parse vCard text into a dict of fields usable by _apply_fields_to_contact.

    The list keys ('emails', 'phones', 'addresses', 'websites', 'impp',
    'categories') are always present. Scalar keys and 'uid' appear only when
    the matching property occurs in the input. Lines without a colon and
    unrecognised properties are ignored.

    Structured values (N, ORG, ADR, CATEGORIES) are split on unescaped
    separators only, a group prefix such as ``item1.`` is removed from the
    property name, and an N value with fewer than five components is padded
    with empty components instead of being dropped.
    """
    result = {
        'emails': [], 'phones': [], 'addresses': [],
        'websites': [], 'impp': [], 'categories': [],
    }
    for line in _unfold_vcard(raw):
        if ':' not in line:
            continue
        key, _, value = line.partition(':')
        parts = key.split(';')
        # "item1.EMAIL" -> "EMAIL": drop an optional group prefix.
        name = parts[0].rpartition('.')[2].upper()
        params = {}
        for p in parts[1:]:
            if '=' in p:
                k, v = p.split('=', 1)
                params[k.upper()] = v.upper()

        if name == 'UID':
            result['uid'] = value.strip()
        elif name == 'FN':
            result['display_name'] = _unescape(value)
        elif name == 'N':
            # N: lastname;firstname;middle;prefix;suffix - trailing components
            # may be omitted by the producer.
            fields = _split_unescaped(value, ';')
            fields += [''] * (5 - len(fields))
            result['last_name'] = _unescape(fields[0]) or None
            result['first_name'] = _unescape(fields[1]) or None
            result['middle_name'] = _unescape(fields[2]) or None
            result['prefix'] = _unescape(fields[3]) or None
            result['suffix'] = _unescape(fields[4]) or None
        elif name == 'NICKNAME':
            result['nickname'] = _unescape(value)
        elif name == 'ORG':
            fields = _split_unescaped(value, ';')
            result['organization'] = _unescape(fields[0]) if fields else None
            if len(fields) > 1:
                result['department'] = _unescape(fields[1]) or None
        elif name == 'TITLE':
            result['job_title'] = _unescape(value)
        elif name == 'EMAIL':
            result['emails'].append({
                'type': (params.get('TYPE') or 'home').lower(),
                'value': _unescape(value),
            })
        elif name == 'TEL':
            result['phones'].append({
                'type': (params.get('TYPE') or 'cell').lower(),
                'value': _unescape(value),
            })
        elif name == 'ADR':
            # ADR: po_box;extended;street;city;region;postal_code;country
            fields = _split_unescaped(value, ';')
            pad = fields + [''] * (7 - len(fields))
            result['addresses'].append({
                'type': (params.get('TYPE') or 'home').lower(),
                'po_box': _unescape(pad[0]),
                'street': _unescape(pad[2]),
                'city': _unescape(pad[3]),
                'region': _unescape(pad[4]),
                'postal_code': _unescape(pad[5]),
                'country': _unescape(pad[6]),
            })
        elif name == 'URL':
            result['websites'].append({
                'type': (params.get('TYPE') or '').lower(),
                'value': _unescape(value),
            })
        elif name == 'IMPP':
            proto, _, addr = value.partition(':')
            result['impp'].append({'protocol': proto.lower(), 'value': _unescape(addr or value)})
        elif name == 'CATEGORIES':
            result['categories'] = [_unescape(c).strip()
                                    for c in _split_unescaped(value, ',') if c.strip()]
        elif name == 'BDAY':
            result['birthday'] = _normalise_date(value)
        elif name == 'ANNIVERSARY':
            result['anniversary'] = _normalise_date(value)
        elif name == 'NOTE':
            result['notes'] = _unescape(value)
        elif name == 'PHOTO':
            result['photo'] = value.strip() or None
    return result
def _normalise_date(s: str):
    """Normalise a vCard date value to ``YYYY-MM-DD``; return None if not a date.

    Accepts ``YYYYMMDD`` and ``YYYY-MM-DD`` (anything after the tenth
    character of the dashed form is ignored), and date-times whose date part
    is followed by ``T``, e.g. ``19900101T120000Z``. Month and day values
    are not range-checked. Empty or None input yields None.
    """
    if not s:
        return None
    s = s.strip()
    m = (re.match(r'^(\d{4})-?(\d{2})-?(\d{2})$', s[:10])
         # Basic-format date-times need more than ten characters to reach the
         # 'T', so the slice above cannot recognise them.
         or re.match(r'^(\d{4})-?(\d{2})-?(\d{2})[Tt]', s))
    if m:
        return f'{m.group(1)}-{m.group(2)}-{m.group(3)}'
    return None
# ---------------------------------------------------------------------------
# Address books
# ---------------------------------------------------------------------------
@api_bp.route('/addressbooks', methods=['GET'])
@token_required
def list_addressbooks():
    """List the caller's own address books followed by those shared with them.

    Each entry is the book's ``to_dict()`` plus ``permission`` and
    ``contact_count``. Shared entries also carry the owner's colour as
    ``owner_color``, the owner's names, and use the caller's personal colour
    from the share row when one is set.
    """
    user = request.current_user
    own = AddressBook.query.filter_by(owner_id=user.id).all()

    # Load the caller's share rows once and index them by book id rather than
    # running one more share query per shared book inside the loop below.
    share_by_book = {}
    for s in AddressBookShare.query.filter_by(shared_with_id=user.id).all():
        share_by_book.setdefault(s.address_book_id, s)
    shared = (AddressBook.query.filter(AddressBook.id.in_(list(share_by_book))).all()
              if share_by_book else [])

    result = []
    for b in own:
        d = b.to_dict()
        d['permission'] = 'owner'
        d['contact_count'] = b.contacts.count()
        result.append(d)
    for b in shared:
        d = b.to_dict()
        share = share_by_book.get(b.id)
        d['permission'] = share.permission if share else 'read'
        d['owner_color'] = d.get('color')
        if share and share.color:
            d['color'] = share.color
        d['owner_name'] = b.owner.username
        d['owner_full_name'] = b.owner.full_name
        d['owner_display_name'] = b.owner.display_name
        d['contact_count'] = b.contacts.count()
        result.append(d)
    return jsonify(result), 200
@api_bp.route('/addressbooks', methods=['POST'])
@token_required
def create_addressbook():
    """Create an address book owned by the caller.

    Expects JSON with ``name`` (required, non-blank) and optional ``color``
    and ``description``. Responds 400 when the name is missing, otherwise
    201 with the new book's ``to_dict()``.
    """
    user = request.current_user
    # A missing or null body would otherwise fail on ``.get``; the other
    # handlers in this module use the same ``or {}`` fallback.
    data = request.get_json() or {}
    name = (data.get('name') or '').strip()
    if not name:
        return jsonify({'error': 'Name erforderlich'}), 400
    book = AddressBook(
        owner_id=user.id,
        name=name,
        # An explicit null colour falls back to the default as well.
        color=data.get('color') or '#3788d8',
        description=data.get('description') or None,
    )
    db.session.add(book)
    db.session.commit()
    _notify_addressbook(user.id, book.id, 'created')
    return jsonify(book.to_dict()), 201
@api_bp.route('/addressbooks/<int:book_id>', methods=['PUT'])
@token_required
def update_addressbook(book_id):
    """Update name, description and/or colour of an address book the caller owns.

    Only keys present in the JSON body are applied. Responds 404 when the
    book does not exist or belongs to someone else, and 400 when ``name`` is
    supplied but blank (the same rule as on creation). On success the owner
    and all share recipients are notified.
    """
    user = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book or book.owner_id != user.id:
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404
    # Tolerate a missing or null body, as the other handlers here do.
    data = request.get_json() or {}
    if 'name' in data:
        name = (data['name'] or '').strip()
        if not name:
            return jsonify({'error': 'Name erforderlich'}), 400
        book.name = name
    if 'description' in data:
        book.description = data['description'] or None
    if 'color' in data:
        book.color = data['color']
    db.session.commit()
    _notify_addressbook(book.owner_id, book.id, 'updated',
                        shared_with=_book_recipients(book))
    return jsonify(book.to_dict()), 200
@api_bp.route('/addressbooks/<int:book_id>/my-color', methods=['PUT'])
@token_required
def set_my_addressbook_color(book_id):
    """Set the colour the caller sees for an address book.

    For the owner a non-empty colour replaces the book's own colour. For a
    share recipient the colour is stored on the share row, and an empty
    value clears that override. The response reports the colour now in
    effect for the caller.
    """
    caller = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book:
        return jsonify({'error': 'Nicht gefunden'}), 404

    body = request.get_json() or {}
    chosen = (body.get('color') or '').strip()

    if book.owner_id != caller.id:
        grant = AddressBookShare.query.filter_by(
            address_book_id=book_id, shared_with_id=caller.id
        ).first()
        if not grant:
            return jsonify({'error': 'Kein Zugriff'}), 403
        grant.color = chosen or None
        db.session.commit()
        return jsonify({'color': grant.color or book.color}), 200

    # Owner: an empty colour leaves the book unchanged.
    if chosen:
        book.color = chosen
        db.session.commit()
    return jsonify({'color': book.color}), 200
@api_bp.route('/addressbooks/<int:book_id>', methods=['DELETE'])
@token_required
def delete_addressbook(book_id):
    """Delete an address book owned by the caller and notify everyone affected.

    Responds 404 when the book is missing or not owned by the caller.
    """
    caller = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book or book.owner_id != caller.id:
        return jsonify({'error': 'Nicht gefunden oder keine Berechtigung'}), 404

    # Capture what the notification needs before the row is deleted.
    notify_args = (book.owner_id, book.id, 'deleted')
    share_recipients = _book_recipients(book)

    db.session.delete(book)
    db.session.commit()
    _notify_addressbook(*notify_args, shared_with=share_recipients)
    return jsonify({'message': 'Adressbuch geloescht'}), 200
# ---------------------------------------------------------------------------
# Contacts
# ---------------------------------------------------------------------------
@api_bp.route('/addressbooks/<int:book_id>/contacts', methods=['GET'])
@token_required
def list_contacts(book_id):
    """Return the contacts of an address book, ordered by display name.

    The optional query parameter ``q`` restricts the result to contacts
    whose display name, primary e-mail or organization contains the term
    (case-insensitive ``ilike`` match).
    """
    book, err = _get_addressbook_or_err(book_id, request.current_user)
    if err:
        return err

    term = (request.args.get('q') or '').strip()
    query = Contact.query.filter_by(address_book_id=book_id)
    if term:
        pattern = f'%{term}%'
        by_name = Contact.display_name.ilike(pattern)
        by_email = Contact.primary_email.ilike(pattern)
        by_org = Contact.organization.ilike(pattern)
        query = query.filter(by_name | by_email | by_org)

    rows = query.order_by(Contact.display_name).all()
    return jsonify([row.to_dict() for row in rows]), 200
@api_bp.route('/addressbooks/<int:book_id>/export', methods=['GET'])
@token_required
def export_addressbook(book_id):
    """Export contacts as a single .vcf, a .zip with one .vcf per contact, or .csv.

    The ``format`` query parameter selects 'vcf' (default), 'vcf-zip' or
    'csv'; any other value yields a 400. The download file name is the book
    name with every run of characters outside ``A-Za-z0-9._-`` replaced by
    an underscore.
    """
    user = request.current_user
    book, err = _get_addressbook_or_err(book_id, user)
    if err:
        return err

    def _card_text(contact):
        # Prefer the stored vCard text; render one when nothing is stored.
        return (contact.vcard_data or _build_vcard(contact)).strip()

    def _csv_cell(v):
        # Lists of dicts become "value (type); ...", other lists are joined
        # with commas, None becomes the empty string.
        if isinstance(v, list):
            if v and isinstance(v[0], dict):
                v = '; '.join(
                    (x.get('value') or x.get('street') or '') +
                    (f" ({x.get('type')})" if x.get('type') else '')
                    for x in v if isinstance(x, dict)
                )
            else:
                v = ', '.join(str(x) for x in v)
        return '' if v is None else str(v)

    fmt = (request.args.get('format') or 'vcf').lower()
    contacts = (Contact.query
                .filter_by(address_book_id=book_id)
                .order_by(Contact.display_name)
                .all())
    safe_name = re.sub(r'[^A-Za-z0-9._-]+', '_', book.name or 'kontakte') or 'kontakte'

    if fmt == 'vcf':
        cards = [_card_text(c) for c in contacts]
        return Response(
            '\r\n'.join(cards) + '\r\n',
            mimetype='text/vcard; charset=utf-8',
            headers={'Content-Disposition': f'attachment; filename="{safe_name}.vcf"'},
        )

    if fmt == 'vcf-zip':
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zf:
            times_used = {}
            for c in contacts:
                stem = re.sub(r'[^A-Za-z0-9._-]+', '_', c.display_name or c.uid) or c.uid
                occurrence = times_used.get(stem, 0) + 1
                times_used[stem] = occurrence
                # The second and later contacts with the same stem get a counter suffix.
                member = f"{stem}.vcf" if occurrence == 1 else f"{stem}_{occurrence}.vcf"
                zf.writestr(member, _card_text(c) + '\r\n')
        archive.seek(0)
        return Response(
            archive.read(),
            mimetype='application/zip',
            headers={'Content-Disposition': f'attachment; filename="{safe_name}.zip"'},
        )

    if fmt == 'csv':
        cols = ['display_name', 'prefix', 'first_name', 'middle_name', 'last_name', 'suffix',
                'nickname', 'organization', 'department', 'job_title',
                'primary_email', 'primary_phone', 'birthday', 'anniversary',
                'emails', 'phones', 'addresses', 'websites', 'categories', 'notes']
        sheet = io.StringIO()
        writer = csv.writer(sheet, delimiter=';', quoting=csv.QUOTE_ALL)
        writer.writerow(cols)
        for c in contacts:
            record = c.to_dict()
            writer.writerow([_csv_cell(record.get(col, '')) for col in cols])
        # The output starts with a byte order mark character.
        return Response(
            '\ufeff' + sheet.getvalue(),
            mimetype='text/csv; charset=utf-8',
            headers={'Content-Disposition': f'attachment; filename="{safe_name}.csv"'},
        )

    return jsonify({'error': 'Unbekanntes Format'}), 400
@api_bp.route('/addressbooks/<int:book_id>/import', methods=['POST'])
@token_required
def import_addressbook(book_id):
    """Import contacts from an uploaded vCard file (one or several cards) or a CSV file.

    The upload is read from the multipart field ``file`` and decoded as
    UTF-8 (with optional BOM), falling back to Latin-1. It is treated as CSV
    when the file name ends in ``.csv`` or when the first 200 bytes contain
    a comma but no ``BEGIN:VCARD``; otherwise as vCard text. Records without
    any of display name, first name, last name or organization are skipped.
    A record whose UID already exists in the book updates that contact.
    Responds with the ``imported`` and ``skipped`` counts.
    """
    user = request.current_user
    book, err = _get_addressbook_or_err(book_id, user, need_write=True)
    if err:
        return err

    upload = request.files.get('file')
    if not upload:
        return jsonify({'error': 'Keine Datei'}), 400
    raw = upload.read()
    filename = (upload.filename or '').lower()
    try:
        text = raw.decode('utf-8-sig')
    except UnicodeDecodeError:
        text = raw.decode('latin-1', errors='replace')

    tally = {'imported': 0, 'skipped': 0}

    def _store(parsed: dict, raw_text: str | None = None) -> bool:
        """Create or update one contact from parsed fields; return False if skipped."""
        identifiable = (parsed.get('display_name') or parsed.get('first_name')
                        or parsed.get('last_name') or parsed.get('organization'))
        if not identifiable:
            tally['skipped'] += 1
            return False
        uid = parsed.get('uid') or str(uuid.uuid4())
        contact = Contact.query.filter_by(address_book_id=book_id, uid=uid).first()
        is_new = not contact
        if is_new:
            contact = Contact(address_book_id=book_id, uid=uid, vcard_data='')
        _apply_fields_to_contact(contact, parsed)
        # Keep the uploaded card text when there is one, else render our own.
        contact.vcard_data = (raw_text or '').strip() or _build_vcard(contact)
        contact.updated_at = datetime.now(timezone.utc)
        if is_new:
            db.session.add(contact)
        tally['imported'] += 1
        return True

    def _col(row, *names):
        """Return the first non-empty column among ``names`` (the last one tried otherwise)."""
        value = None
        for column in names:
            value = row.get(column)
            if value:
                break
        return value

    head = raw[:200]
    if filename.endswith('.csv') or (b',' in head and b'BEGIN:VCARD' not in head):
        # CSV import: try semicolon-delimited first, then comma.
        reader = csv.DictReader(io.StringIO(text), delimiter=';')
        if not reader.fieldnames or len(reader.fieldnames) < 2:
            reader = csv.DictReader(io.StringIO(text), delimiter=',')
        for record in reader:
            row = {k.strip().lower(): (v or '').strip() for k, v in record.items() if k}
            parsed = {
                'display_name': _col(row, 'display_name', 'name', 'vollname', 'full name'),
                'first_name': _col(row, 'first_name', 'vorname'),
                'last_name': _col(row, 'last_name', 'nachname'),
                'middle_name': _col(row, 'middle_name'),
                'prefix': _col(row, 'prefix', 'anrede'),
                'suffix': _col(row, 'suffix'),
                'nickname': _col(row, 'nickname', 'spitzname'),
                'organization': _col(row, 'organization', 'firma', 'company'),
                'department': _col(row, 'department', 'abteilung'),
                'job_title': _col(row, 'job_title', 'position', 'title'),
                'birthday': _col(row, 'birthday', 'geburtstag'),
                'notes': _col(row, 'notes', 'notizen'),
                'emails': [], 'phones': [], 'addresses': [], 'websites': [], 'categories': [],
            }
            email = _col(row, 'primary_email', 'email', 'e-mail')
            if email:
                parsed['emails'].append({'type': 'home', 'value': email})
            phone = _col(row, 'primary_phone', 'phone', 'telefon', 'mobil')
            if phone:
                parsed['phones'].append({'type': 'cell', 'value': phone})
            cats = _col(row, 'categories', 'kategorien')
            if cats:
                parsed['categories'] = [c.strip() for c in cats.split(',') if c.strip()]
            _store(parsed)
    else:
        # vCard: one or several cards in the file.
        cards = re.findall(r'BEGIN:VCARD.*?END:VCARD', text, flags=re.DOTALL | re.IGNORECASE)
        if not cards:
            return jsonify({'error': 'Keine VCARD-Daten gefunden'}), 400
        for card in cards:
            try:
                parsed = parse_vcard(card)
            except Exception:
                # Best effort: a card that cannot be parsed is counted and skipped.
                tally['skipped'] += 1
            else:
                _store(parsed, raw_text=card)

    db.session.commit()
    if tally['imported']:
        _notify_addressbook(book.owner_id, book.id, 'contact',
                            shared_with=_book_recipients(book))
    return jsonify({'imported': tally['imported'], 'skipped': tally['skipped']}), 200
@api_bp.route('/addressbooks/<int:book_id>/contacts', methods=['POST'])
@token_required
def create_contact(book_id):
    """Create a contact in an address book the caller may write to.

    The JSON body is applied through ``_apply_fields_to_contact``. If no
    display name results (neither given nor derivable), the request is
    rejected with 400. Otherwise the vCard text is rendered, the contact is
    stored, and the owner and share recipients are notified.
    """
    caller = request.current_user
    book, err = _get_addressbook_or_err(book_id, caller, need_write=True)
    if err:
        return err

    payload = request.get_json() or {}
    new_contact = Contact(address_book_id=book_id, uid=str(uuid.uuid4()), vcard_data='')
    _apply_fields_to_contact(new_contact, payload)
    if not new_contact.display_name:
        return jsonify({'error': 'Name oder Firma erforderlich'}), 400

    new_contact.vcard_data = _build_vcard(new_contact)
    db.session.add(new_contact)
    db.session.commit()

    recipients = _book_recipients(book)
    _notify_addressbook(book.owner_id, book.id, 'contact', shared_with=recipients)
    return jsonify(new_contact.to_dict()), 201
@api_bp.route('/contacts/<int:contact_id>', methods=['GET'])
@token_required
def get_contact(contact_id):
    """Return one contact, including its stored vCard text.

    Access is checked against the address book the contact belongs to.
    """
    caller = request.current_user
    contact = db.session.get(Contact, contact_id)
    if not contact:
        return jsonify({'error': 'Kontakt nicht gefunden'}), 404

    _book, err = _get_addressbook_or_err(contact.address_book_id, caller)
    if err:
        return err

    payload = contact.to_dict()
    payload['vcard_data'] = contact.vcard_data
    return jsonify(payload), 200
@api_bp.route('/contacts/<int:contact_id>', methods=['PUT'])
@token_required
def update_contact(contact_id):
    """Apply the JSON body to an existing contact and re-render its vCard text.

    Requires write access to the contact's address book. The update
    timestamp is set to the current UTC time, and the owner and share
    recipients are notified after the commit.
    """
    caller = request.current_user
    contact = db.session.get(Contact, contact_id)
    if not contact:
        return jsonify({'error': 'Kontakt nicht gefunden'}), 404

    book, err = _get_addressbook_or_err(contact.address_book_id, caller, need_write=True)
    if err:
        return err

    changes = request.get_json() or {}
    _apply_fields_to_contact(contact, changes)
    contact.vcard_data = _build_vcard(contact)
    contact.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    recipients = _book_recipients(book)
    _notify_addressbook(book.owner_id, book.id, 'contact', shared_with=recipients)
    return jsonify(contact.to_dict()), 200
@api_bp.route('/contacts/<int:contact_id>', methods=['DELETE'])
@token_required
def delete_contact(contact_id):
    """Delete a contact from an address book the caller may write to.

    The owner and share recipients of the book are notified afterwards.
    """
    caller = request.current_user
    contact = db.session.get(Contact, contact_id)
    if not contact:
        return jsonify({'error': 'Kontakt nicht gefunden'}), 404

    book, err = _get_addressbook_or_err(contact.address_book_id, caller, need_write=True)
    if err:
        return err

    db.session.delete(contact)
    db.session.commit()

    recipients = _book_recipients(book)
    _notify_addressbook(book.owner_id, book.id, 'contact', shared_with=recipients)
    return jsonify({'message': 'Kontakt geloescht'}), 200
# ---------------------------------------------------------------------------
# Sharing
# ---------------------------------------------------------------------------
@api_bp.route('/addressbooks/<int:book_id>/share', methods=['POST'])
@token_required
def share_addressbook(book_id):
    """Share one of the caller's address books with another user.

    Expects JSON with ``username`` and an optional ``permission`` ('read' by
    default, or 'readwrite'). An existing share for that user has its
    permission overwritten; otherwise a new share row is created. Only the
    owner may share, and not with themselves.
    """
    owner = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book or book.owner_id != owner.id:
        return jsonify({'error': 'Nur der Eigentuemer kann teilen'}), 403

    payload = request.get_json() or {}
    username = (payload.get('username') or '').strip()
    permission = payload.get('permission', 'read')
    if permission not in ('read', 'readwrite'):
        return jsonify({'error': 'Ungueltige Berechtigung'}), 400

    target = User.query.filter_by(username=username).first()
    if not target:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404
    if target.id == owner.id:
        return jsonify({'error': 'Kann nicht mit sich selbst teilen'}), 400

    grant = AddressBookShare.query.filter_by(
        address_book_id=book_id, shared_with_id=target.id
    ).first()
    if not grant:
        grant = AddressBookShare(
            address_book_id=book_id, shared_with_id=target.id, permission=permission
        )
        db.session.add(grant)
    else:
        grant.permission = permission
    db.session.commit()

    audience = [target.id]
    audience.extend(_book_recipients(book))
    _notify_addressbook(book.owner_id, book.id, 'share', shared_with=audience)
    return jsonify({'message': f'Adressbuch mit {username} geteilt'}), 200
@api_bp.route('/addressbooks/<int:book_id>/shares', methods=['GET'])
@token_required
def list_addressbook_shares(book_id):
    """List the shares of an address book; only available to its owner."""
    caller = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book or book.owner_id != caller.id:
        return jsonify({'error': 'Nicht gefunden'}), 404

    listing = []
    for share in AddressBookShare.query.filter_by(address_book_id=book_id).all():
        listing.append({
            'id': share.id,
            'user_id': share.shared_with_id,
            'username': share.shared_with.username,
            'permission': share.permission,
        })
    return jsonify(listing), 200
@api_bp.route('/addressbooks/<int:book_id>/shares/<int:share_id>', methods=['DELETE'])
@token_required
def remove_addressbook_share(book_id, share_id):
    """Remove one share of an address book owned by the caller.

    The share must belong to the given book. The user who lost access is
    included in the notification together with the remaining recipients.
    """
    caller = request.current_user
    book = db.session.get(AddressBook, book_id)
    if not book or book.owner_id != caller.id:
        return jsonify({'error': 'Nicht gefunden'}), 404

    grant = db.session.get(AddressBookShare, share_id)
    if not grant or grant.address_book_id != book_id:
        return jsonify({'error': 'Freigabe nicht gefunden'}), 404

    # Remember the user before the row is gone so they are still notified.
    removed_user_id = grant.shared_with_id
    db.session.delete(grant)
    db.session.commit()

    audience = [removed_user_id]
    audience.extend(_book_recipients(book))
    _notify_addressbook(book.owner_id, book.id, 'share', shared_with=audience)
    return jsonify({'message': 'Freigabe entfernt'}), 200
# ---------------------------------------------------------------------------
# vCard export (all contacts of a book)
# ---------------------------------------------------------------------------
# NOTE(review): export_addressbook earlier in this module is registered for
# the same rule and method ('/addressbooks/<int:book_id>/export', GET).
# Presumably only one of the two views is ever dispatched - confirm which one
# is intended and remove the other.
@api_bp.route('/addressbooks/<int:book_id>/export', methods=['GET'])
@token_required
def export_contacts(book_id):
    """Export all contacts of an address book as one vCard (.vcf) download.

    A contact without stored vCard text is rendered on the fly instead of
    breaking the join. The download file name is the book name with every
    run of characters outside ``A-Za-z0-9._-`` replaced by an underscore, so
    quotes or control characters in the name cannot reach the
    Content-Disposition header.
    """
    user = request.current_user
    book, err = _get_addressbook_or_err(book_id, user)
    if err:
        return err
    parts = [(c.vcard_data or _build_vcard(c)).strip() for c in book.contacts]
    safe_name = re.sub(r'[^A-Za-z0-9._-]+', '_', book.name or 'kontakte') or 'kontakte'
    return Response(
        '\r\n'.join(parts) + '\r\n',
        mimetype='text/vcard; charset=utf-8',
        headers={'Content-Disposition': f'attachment; filename="{safe_name}.vcf"'},
    )