Files
minmal-file-cloud-email-pim…/backend/app/api/passwords.py
T
Stefan Hacker 62f550c373 feat: Mini-Cloud Plattform - komplette Implementierung Phase 0-8
Selbstgehostete Web-Cloud mit Dateiverwaltung, Kalender, Kontakte,
Email-Webclient, Office-Viewer und Passwort-Manager.

Backend (Flask/Python):
- JWT-Auth mit Access/Refresh Tokens, Benutzerverwaltung
- Dateien: Upload/Download, Ordner, Berechtigungen, Share-Links
- Kalender: CRUD, Teilen, iCal-Export, CalDAV well-known URLs
- Kontakte: Adressbuecher, vCard-Export, Teilen
- Email: IMAP/SMTP-Proxy, Multi-Account
- Office-Viewer: DOCX/XLSX/PPTX/PDF Vorschau
- Passwort-Manager: AES-256-GCM clientseitig, KeePass-Import
- Sync-API fuer Desktop/Mobile-Clients
- SQLite mit WAL-Modus

Frontend (Vue 3 + PrimeVue):
- Datei-Explorer mit Breadcrumbs und Share-Dialogen
- Monatskalender mit Event-Verwaltung
- Kontaktliste mit Adressbuch-Sidebar
- Email-Client mit 3-Spalten-Layout
- Passwort-Manager mit TOTP und Passwort-Generator
- Admin-Panel, Settings, oeffentliche Share-Seite

Docker: Multi-Stage Build, Bind Mounts (keine Volumes)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 14:53:28 +02:00

362 lines
12 KiB
Python

import base64
from flask import request, jsonify
from app.api import api_bp
from app.api.auth import token_required
from app.extensions import db
from app.models.password_vault import PasswordFolder, PasswordEntry, PasswordShare
from app.models.user import User
# --- Folders ---
@api_bp.route('/passwords/folders', methods=['GET'])
@token_required
def list_password_folders():
user = request.current_user
own = PasswordFolder.query.filter_by(owner_id=user.id).all()
# Get shared folders
shared_folder_shares = PasswordShare.query.filter_by(
shared_with_id=user.id, shareable_type='folder'
).all()
shared_ids = [s.shareable_id for s in shared_folder_shares]
shared = PasswordFolder.query.filter(PasswordFolder.id.in_(shared_ids)).all() if shared_ids else []
result = []
for f in own:
d = f.to_dict()
d['permission'] = 'owner'
result.append(d)
for f in shared:
d = f.to_dict()
share = next((s for s in shared_folder_shares if s.shareable_id == f.id), None)
d['permission'] = share.permission if share else 'read'
d['owner_name'] = f.owner.username
result.append(d)
return jsonify(result), 200
@api_bp.route('/passwords/folders', methods=['POST'])
@token_required
def create_password_folder():
user = request.current_user
data = request.get_json()
name = data.get('name', '').strip()
if not name:
return jsonify({'error': 'Name erforderlich'}), 400
folder = PasswordFolder(
owner_id=user.id,
parent_id=data.get('parent_id'),
name=name,
icon=data.get('icon'),
)
db.session.add(folder)
db.session.commit()
return jsonify(folder.to_dict()), 201
@api_bp.route('/passwords/folders/<int:folder_id>', methods=['PUT'])
@token_required
def update_password_folder(folder_id):
user = request.current_user
folder = db.session.get(PasswordFolder, folder_id)
if not folder or folder.owner_id != user.id:
return jsonify({'error': 'Nicht gefunden'}), 404
data = request.get_json()
if 'name' in data:
folder.name = data['name'].strip()
if 'icon' in data:
folder.icon = data['icon']
if 'parent_id' in data:
folder.parent_id = data['parent_id']
db.session.commit()
return jsonify(folder.to_dict()), 200
@api_bp.route('/passwords/folders/<int:folder_id>', methods=['DELETE'])
@token_required
def delete_password_folder(folder_id):
user = request.current_user
folder = db.session.get(PasswordFolder, folder_id)
if not folder or folder.owner_id != user.id:
return jsonify({'error': 'Nicht gefunden'}), 404
db.session.delete(folder)
db.session.commit()
return jsonify({'message': 'Ordner geloescht'}), 200
# --- Entries ---
@api_bp.route('/passwords/entries', methods=['GET'])
@token_required
def list_password_entries():
user = request.current_user
folder_id = request.args.get('folder_id', None, type=int)
category = request.args.get('category', None)
query = PasswordEntry.query.filter_by(user_id=user.id)
if folder_id is not None:
query = query.filter_by(folder_id=folder_id)
if category:
query = query.filter_by(category=category)
entries = query.order_by(PasswordEntry.created_at.desc()).all()
# Also get shared entries
shared_entry_shares = PasswordShare.query.filter_by(
shared_with_id=user.id, shareable_type='entry'
).all()
shared_ids = [s.shareable_id for s in shared_entry_shares]
shared = PasswordEntry.query.filter(PasswordEntry.id.in_(shared_ids)).all() if shared_ids else []
result = [e.to_dict() for e in entries]
for e in shared:
d = e.to_dict()
d['shared'] = True
share = next((s for s in shared_entry_shares if s.shareable_id == e.id), None)
d['permission'] = share.permission if share else 'read'
result.append(d)
return jsonify(result), 200
@api_bp.route('/passwords/entries', methods=['POST'])
@token_required
def create_password_entry():
user = request.current_user
data = request.get_json()
if 'title_encrypted' not in data or 'iv' not in data:
return jsonify({'error': 'Verschluesselte Daten + IV erforderlich'}), 400
entry = PasswordEntry(
user_id=user.id,
folder_id=data.get('folder_id'),
title_encrypted=base64.b64decode(data['title_encrypted']),
url_encrypted=base64.b64decode(data['url_encrypted']) if data.get('url_encrypted') else None,
username_encrypted=base64.b64decode(data['username_encrypted']) if data.get('username_encrypted') else None,
password_encrypted=base64.b64decode(data['password_encrypted']) if data.get('password_encrypted') else None,
notes_encrypted=base64.b64decode(data['notes_encrypted']) if data.get('notes_encrypted') else None,
totp_secret_encrypted=base64.b64decode(data['totp_secret_encrypted']) if data.get('totp_secret_encrypted') else None,
passkey_data_encrypted=base64.b64decode(data['passkey_data_encrypted']) if data.get('passkey_data_encrypted') else None,
iv=base64.b64decode(data['iv']),
category=data.get('category'),
)
db.session.add(entry)
db.session.commit()
return jsonify(entry.to_dict()), 201
@api_bp.route('/passwords/entries/<int:entry_id>', methods=['PUT'])
@token_required
def update_password_entry(entry_id):
user = request.current_user
entry = db.session.get(PasswordEntry, entry_id)
if not entry:
return jsonify({'error': 'Nicht gefunden'}), 404
# Check access
if entry.user_id != user.id:
share = PasswordShare.query.filter_by(
shareable_type='entry', shareable_id=entry_id, shared_with_id=user.id
).first()
if not share or share.permission not in ('write', 'manage'):
return jsonify({'error': 'Zugriff verweigert'}), 403
data = request.get_json()
for field in ['title_encrypted', 'url_encrypted', 'username_encrypted',
'password_encrypted', 'notes_encrypted', 'totp_secret_encrypted',
'passkey_data_encrypted', 'iv']:
if field in data and data[field]:
setattr(entry, field, base64.b64decode(data[field]))
if 'category' in data:
entry.category = data['category']
if 'folder_id' in data:
entry.folder_id = data['folder_id']
db.session.commit()
return jsonify(entry.to_dict()), 200
@api_bp.route('/passwords/entries/<int:entry_id>', methods=['DELETE'])
@token_required
def delete_password_entry(entry_id):
user = request.current_user
entry = db.session.get(PasswordEntry, entry_id)
if not entry:
return jsonify({'error': 'Nicht gefunden'}), 404
if entry.user_id != user.id:
return jsonify({'error': 'Zugriff verweigert'}), 403
db.session.delete(entry)
db.session.commit()
return jsonify({'message': 'Eintrag geloescht'}), 200
# --- Sharing ---
@api_bp.route('/passwords/share', methods=['POST'])
@token_required
def share_password():
user = request.current_user
data = request.get_json()
shareable_type = data.get('type') # 'entry' or 'folder'
shareable_id = data.get('id')
username = data.get('username', '').strip()
permission = data.get('permission', 'read')
if shareable_type not in ('entry', 'folder'):
return jsonify({'error': 'Typ muss "entry" oder "folder" sein'}), 400
if permission not in ('read', 'write', 'manage'):
return jsonify({'error': 'Ungueltige Berechtigung'}), 400
# Verify ownership
if shareable_type == 'entry':
obj = db.session.get(PasswordEntry, shareable_id)
if not obj or obj.user_id != user.id:
return jsonify({'error': 'Nicht gefunden'}), 404
else:
obj = db.session.get(PasswordFolder, shareable_id)
if not obj or obj.owner_id != user.id:
return jsonify({'error': 'Nicht gefunden'}), 404
target = User.query.filter_by(username=username).first()
if not target:
return jsonify({'error': 'Benutzer nicht gefunden'}), 404
if target.id == user.id:
return jsonify({'error': 'Kann nicht mit sich selbst teilen'}), 400
existing = PasswordShare.query.filter_by(
shareable_type=shareable_type, shareable_id=shareable_id,
shared_with_id=target.id
).first()
if existing:
existing.permission = permission
else:
share = PasswordShare(
shareable_type=shareable_type,
shareable_id=shareable_id,
shared_by_id=user.id,
shared_with_id=target.id,
permission=permission,
encrypted_key=base64.b64decode(data['encrypted_key']) if data.get('encrypted_key') else None,
)
db.session.add(share)
db.session.commit()
return jsonify({'message': f'Mit {username} geteilt'}), 200
@api_bp.route('/passwords/shares', methods=['GET'])
@token_required
def list_password_shares():
user = request.current_user
shareable_type = request.args.get('type')
shareable_id = request.args.get('id', type=int)
query = PasswordShare.query.filter_by(shared_by_id=user.id)
if shareable_type:
query = query.filter_by(shareable_type=shareable_type)
if shareable_id:
query = query.filter_by(shareable_id=shareable_id)
shares = query.all()
return jsonify([{
'id': s.id,
'type': s.shareable_type,
'shareable_id': s.shareable_id,
'shared_with': s.shared_with.username,
'permission': s.permission,
} for s in shares]), 200
@api_bp.route('/passwords/shares/<int:share_id>', methods=['DELETE'])
@token_required
def remove_password_share(share_id):
user = request.current_user
share = db.session.get(PasswordShare, share_id)
if not share or share.shared_by_id != user.id:
return jsonify({'error': 'Nicht gefunden'}), 404
db.session.delete(share)
db.session.commit()
return jsonify({'message': 'Freigabe entfernt'}), 200
# --- KeePass Import ---
@api_bp.route('/passwords/import/keepass', methods=['POST'])
@token_required
def import_keepass():
user = request.current_user
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei gesendet'}), 400
kdbx_file = request.files['file']
kdbx_password = request.form.get('password', '')
if not kdbx_password:
return jsonify({'error': 'KeePass-Passwort erforderlich'}), 400
try:
from pykeepass import PyKeePass
import tempfile
import os
# Save to temp file
with tempfile.NamedTemporaryFile(delete=False, suffix='.kdbx') as tmp:
kdbx_file.save(tmp.name)
tmp_path = tmp.name
try:
kp = PyKeePass(tmp_path, password=kdbx_password)
finally:
os.unlink(tmp_path)
# Return entries as plaintext - frontend will encrypt them
entries = []
groups = []
for group in kp.groups:
if group.name and group.name not in ('Root', 'Recycle Bin'):
groups.append({
'name': group.name,
'path': '/'.join(g.name for g in group.path if g.name),
'uuid': str(group.uuid),
'parent_uuid': str(group.parentgroup.uuid) if group.parentgroup else None,
})
for entry in kp.entries:
if entry.title:
group_path = '/'.join(g.name for g in entry.group.path if g.name) if entry.group else ''
entries.append({
'title': entry.title or '',
'url': entry.url or '',
'username': entry.username or '',
'password': entry.password or '',
'notes': entry.notes or '',
'totp': entry.otp or '',
'group': group_path,
'group_uuid': str(entry.group.uuid) if entry.group else None,
})
return jsonify({
'entries': entries,
'groups': groups,
'count': len(entries),
}), 200
except Exception as e:
return jsonify({'error': f'Import fehlgeschlagen: {str(e)}'}), 400