Files
minmal-file-cloud-email-pim…/backend/app/api/auth.py
T
Stefan Hacker 62f550c373 feat: Mini-Cloud Plattform - komplette Implementierung Phase 0-8
Selbstgehostete Web-Cloud mit Dateiverwaltung, Kalender, Kontakte,
Email-Webclient, Office-Viewer und Passwort-Manager.

Backend (Flask/Python):
- JWT-Auth mit Access/Refresh Tokens, Benutzerverwaltung
- Dateien: Upload/Download, Ordner, Berechtigungen, Share-Links
- Kalender: CRUD, Teilen, iCal-Export, CalDAV well-known URLs
- Kontakte: Adressbuecher, vCard-Export, Teilen
- Email: IMAP/SMTP-Proxy, Multi-Account
- Office-Viewer: DOCX/XLSX/PPTX/PDF Vorschau
- Passwort-Manager: AES-256-GCM clientseitig, KeePass-Import
- Sync-API fuer Desktop/Mobile-Clients
- SQLite mit WAL-Modus

Frontend (Vue 3 + PrimeVue):
- Datei-Explorer mit Breadcrumbs und Share-Dialogen
- Monatskalender mit Event-Verwaltung
- Kontaktliste mit Adressbuch-Sidebar
- Email-Client mit 3-Spalten-Layout
- Passwort-Manager mit TOTP und Passwort-Generator
- Admin-Panel, Settings, oeffentliche Share-Seite

Docker: Multi-Stage Build, Bind Mounts (keine Volumes)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 14:53:28 +02:00

209 lines
6.7 KiB
Python

import os
from datetime import datetime, timezone
from functools import wraps
import jwt
from flask import request, jsonify, current_app, make_response
from app.api import api_bp
from app.extensions import db
from app.models.user import User
def create_access_token(user_id):
    """Create a signed access JWT for ``user_id``.

    The payload carries ``user_id``, ``type='access'``, ``iat`` (current UTC
    time) and ``exp`` (``iat`` plus the ``JWT_ACCESS_TOKEN_EXPIRES`` config
    value). The token is signed with ``JWT_SECRET_KEY`` using HS256.

    Uses ``current_app``, so it must be called while a Flask application
    context is active.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Read the clock once so that exp - iat is exactly the configured lifetime.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'access',
        'exp': issued_at + current_app.config['JWT_ACCESS_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
def create_refresh_token(user_id):
    """Create a signed refresh JWT for ``user_id``.

    The payload carries ``user_id``, ``type='refresh'``, ``iat`` (current UTC
    time) and ``exp`` (``iat`` plus the ``JWT_REFRESH_TOKEN_EXPIRES`` config
    value). The token is signed with ``JWT_SECRET_KEY`` using HS256.

    Uses ``current_app``, so it must be called while a Flask application
    context is active.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Read the clock once so that exp - iat is exactly the configured lifetime.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'refresh',
        'exp': issued_at + current_app.config['JWT_REFRESH_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
def token_required(f):
    """Decorator that only admits requests carrying a valid access token.

    The token is taken from an ``Authorization: Bearer <token>`` header,
    verified with ``JWT_SECRET_KEY`` (HS256) and must have
    ``type == 'access'`` and a ``user_id`` claim. The referenced user is
    loaded via ``db.session.get`` and must exist and be active.

    On success the user is stored on ``request.current_user`` and the wrapped
    view is called with its original arguments. On any failure a JSON error
    body with HTTP status 401 is returned instead.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        token = auth_header[7:] if auth_header.startswith('Bearer ') else None
        if not token:
            return jsonify({'error': 'Token fehlt'}), 401

        # Only the decode step can raise the JWT errors handled below.
        try:
            payload = jwt.decode(token, current_app.config['JWT_SECRET_KEY'],
                                 algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token abgelaufen'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Ungueltiger Token'}), 401

        if payload.get('type') != 'access':
            return jsonify({'error': 'Falscher Token-Typ'}), 401

        # A correctly signed token without a user_id claim must be rejected
        # with 401 rather than escaping as a KeyError (HTTP 500).
        user_id = payload.get('user_id')
        if user_id is None:
            return jsonify({'error': 'Ungueltiger Token'}), 401

        user = db.session.get(User, user_id)
        if not user or not user.is_active:
            return jsonify({'error': 'Benutzer nicht gefunden oder deaktiviert'}), 401

        request.current_user = user
        return f(*args, **kwargs)
    return decorated
def admin_required(f):
    """Decorator that restricts a view to authenticated admin users.

    The request first passes through ``token_required``; afterwards the
    ``role`` of ``request.current_user`` must equal ``'admin'``, otherwise a
    JSON error with HTTP status 403 is returned.
    """
    def admin_only(*args, **kwargs):
        if request.current_user.role == 'admin':
            return f(*args, **kwargs)
        return jsonify({'error': 'Admin-Berechtigung erforderlich'}), 403

    # Authenticate first, then expose the wrapped view's metadata.
    return wraps(f)(token_required(admin_only))
@api_bp.route('/auth/register', methods=['POST'])
def register():
    """Register a new user from a JSON body and log them in.

    Expected JSON object keys: ``username`` (at least 3 characters after
    stripping), ``password`` (at least 8 characters) and optional ``email``.
    The first user ever created gets the ``'admin'`` role, all later users
    get ``'user'``.

    Returns:
        201 with ``user`` and ``access_token`` in the JSON body and the
        refresh token in an HttpOnly ``refresh_token`` cookie;
        400 for a missing/malformed body or invalid field values;
        409 if the username or email is already taken.
    """
    # silent=True turns malformed or non-JSON bodies into None so that they
    # get the same JSON error response as an empty body.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = data.get('username', '')
    password = data.get('password', '')
    email = data.get('email')

    # Reject non-string values (e.g. null or numbers) instead of crashing on
    # .strip() / len() further down.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    if email is not None and not isinstance(email, str):
        return jsonify({'error': 'Ungueltige Email-Adresse'}), 400

    username = username.strip()
    email = (email.strip() or None) if email else None

    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    if len(username) < 3:
        return jsonify({'error': 'Benutzername muss mindestens 3 Zeichen lang sein'}), 400
    if len(password) < 8:
        return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400

    # NOTE(review): these checks and the insert below are separate statements,
    # so concurrent registrations can race; presumably the model has unique
    # constraints as a backstop - verify.
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Benutzername bereits vergeben'}), 409
    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email-Adresse bereits vergeben'}), 409

    # First user becomes admin
    is_first_user = User.query.count() == 0
    user = User(
        username=username,
        email=email,
        role='admin' if is_first_user else 'user',
        master_key_salt=os.urandom(32),
    )
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
    }))
    # The refresh token only travels in an HttpOnly cookie; the Secure flag
    # mirrors whether the current request itself was made over HTTPS.
    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )
    return response, 201
@api_bp.route('/auth/login', methods=['POST'])
def login():
    """Authenticate a user by username and password.

    Expects a JSON object with string ``username`` and ``password``.

    Returns:
        200 with ``user``, ``access_token`` and the base64-encoded
        ``master_key_salt`` (or ``None``) in the JSON body and the refresh
        token in an HttpOnly ``refresh_token`` cookie;
        400 for a missing/malformed body or missing credentials;
        401 for unknown user or wrong password;
        403 if the credentials are correct but the account is deactivated.
    """
    import base64

    # silent=True turns malformed or non-JSON bodies into None so that they
    # get the same JSON error response as an empty body.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = data.get('username', '')
    password = data.get('password', '')
    # Reject non-string values (e.g. null or numbers) instead of crashing on
    # .strip() below.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Ungueltige Anmeldedaten'}), 401
    # Checked after the password so the deactivated state is only revealed
    # to callers who know the correct credentials.
    if not user.is_active:
        return jsonify({'error': 'Konto deaktiviert'}), 403

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    salt = user.master_key_salt
    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
        'master_key_salt': base64.b64encode(salt).decode() if salt else None,
    }))
    # The refresh token only travels in an HttpOnly cookie; the Secure flag
    # mirrors whether the current request itself was made over HTTPS.
    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )
    return response, 200
@api_bp.route('/auth/refresh', methods=['POST'])
def refresh():
    """Issue a new access token from the ``refresh_token`` cookie.

    The cookie value is verified with ``JWT_SECRET_KEY`` (HS256) and must
    have ``type == 'refresh'`` and a ``user_id`` claim that refers to an
    existing, active user.

    Returns:
        200 with a fresh ``access_token`` in the JSON body, or 401 with a
        JSON error if the cookie is missing, expired, invalid, of the wrong
        type, or the user is unknown/inactive.
    """
    refresh_token = request.cookies.get('refresh_token')
    if not refresh_token:
        return jsonify({'error': 'Refresh-Token fehlt'}), 401

    # Only the decode step can raise the JWT errors handled below.
    try:
        payload = jwt.decode(refresh_token, current_app.config['JWT_SECRET_KEY'],
                             algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return jsonify({'error': 'Refresh-Token abgelaufen'}), 401
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Ungueltiger Token'}), 401

    if payload.get('type') != 'refresh':
        return jsonify({'error': 'Falscher Token-Typ'}), 401

    # A correctly signed token without a user_id claim must be rejected
    # with 401 rather than escaping as a KeyError (HTTP 500).
    user_id = payload.get('user_id')
    if user_id is None:
        return jsonify({'error': 'Ungueltiger Token'}), 401

    user = db.session.get(User, user_id)
    if not user or not user.is_active:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 401

    access_token = create_access_token(user.id)
    return jsonify({'access_token': access_token}), 200
@api_bp.route('/auth/logout', methods=['POST'])
def logout():
    """Log the client out by clearing the ``refresh_token`` cookie.

    No token is required or inspected here; the response is always 200 with
    a confirmation message in the JSON body.
    """
    body = jsonify({'message': 'Abgemeldet'})
    resp = make_response(body)
    resp.delete_cookie('refresh_token')
    return resp, 200
@api_bp.route('/auth/me', methods=['GET'])
@token_required
def me():
    """Return the profile of the currently authenticated user.

    The JSON body is ``user.to_dict(include_email=True)`` extended with the
    base64-encoded ``master_key_salt`` (``None`` when no salt is stored) and
    ``email_account_count``, the result of ``user.email_accounts.count()``.
    """
    import base64

    current = request.current_user
    profile = current.to_dict(include_email=True)

    salt = current.master_key_salt
    if salt:
        encoded_salt = base64.b64encode(salt).decode()
    else:
        encoded_salt = None

    profile['master_key_salt'] = encoded_salt
    profile['email_account_count'] = current.email_accounts.count()
    return jsonify(profile), 200