minmal-file-cloud-email-pim.../backend/app/api/auth.py

242 lines
7.9 KiB
Python

import os
from datetime import datetime, timezone
from functools import wraps
import jwt
from flask import request, jsonify, current_app, make_response
from app.api import api_bp
from app.extensions import db
from app.models.user import User
def create_access_token(user_id):
    """Return a signed HS256 JWT access token for ``user_id``.

    The payload contains ``user_id``, ``type='access'`` and the ``iat`` /
    ``exp`` timestamps. The lifetime is read from the Flask config key
    ``JWT_ACCESS_TOKEN_EXPIRES`` (added to a datetime, so presumably a
    ``timedelta`` -- confirm in the app config) and the signing key from
    ``JWT_SECRET_KEY``.

    Must be called inside a Flask application context.
    """
    # Read the clock once so that ``exp - iat`` is exactly the configured
    # lifetime instead of drifting by the time between two ``now()`` calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'access',
        'exp': issued_at + current_app.config['JWT_ACCESS_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
def create_refresh_token(user_id):
    """Return a signed HS256 JWT refresh token for ``user_id``.

    The payload contains ``user_id``, ``type='refresh'`` and the ``iat`` /
    ``exp`` timestamps. The lifetime is read from the Flask config key
    ``JWT_REFRESH_TOKEN_EXPIRES`` and the signing key from ``JWT_SECRET_KEY``.

    Must be called inside a Flask application context.
    """
    # Read the clock once so that ``exp - iat`` is exactly the configured
    # lifetime instead of drifting by the time between two ``now()`` calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'refresh',
        'exp': issued_at + current_app.config['JWT_REFRESH_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
def token_required(f):
    """Decorator: require a valid access token in the ``Authorization`` header.

    The header must have the form ``Bearer <jwt>``. The token is verified with
    ``JWT_SECRET_KEY`` (HS256), must carry ``type == 'access'`` and must
    reference an existing, active user. On success the user object is stored
    on ``request.current_user`` and the wrapped view is called; otherwise a
    JSON error with status 401 is returned and the view is not invoked.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        # len('Bearer ') == 7; everything after the prefix is the token.
        token = auth_header[7:] if auth_header.startswith('Bearer ') else None
        if not token:
            return jsonify({'error': 'Token fehlt'}), 401

        # Keep the try body limited to the only call that raises JWT errors.
        try:
            payload = jwt.decode(token, current_app.config['JWT_SECRET_KEY'],
                                 algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token abgelaufen'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Ungueltiger Token'}), 401

        if payload.get('type') != 'access':
            return jsonify({'error': 'Falscher Token-Typ'}), 401

        # A correctly signed token without a user_id claim is still invalid;
        # answer 401 instead of letting a KeyError surface as a 500.
        user_id = payload.get('user_id')
        if user_id is None:
            return jsonify({'error': 'Ungueltiger Token'}), 401

        user = db.session.get(User, user_id)
        if not user or not user.is_active:
            return jsonify({'error': 'Benutzer nicht gefunden oder deaktiviert'}), 401

        request.current_user = user
        return f(*args, **kwargs)
    return decorated
def admin_required(f):
    """Decorator: allow the wrapped view only for authenticated admins.

    Authentication is delegated to ``token_required``; afterwards the role of
    ``request.current_user`` must be ``'admin'``, otherwise a JSON error with
    status 403 is returned.
    """
    @token_required
    def _admin_only(*args, **kwargs):
        if request.current_user.role == 'admin':
            return f(*args, **kwargs)
        return jsonify({'error': 'Admin-Berechtigung erforderlich'}), 403

    # Copy the metadata of the original view onto the outermost wrapper.
    return wraps(f)(_admin_only)
@api_bp.route('/auth/registration-status', methods=['GET'])
def registration_status():
    """Report whether a new account may currently be registered.

    Responds with ``allowed`` (true when no user exists yet or the
    ``public_registration`` setting is on) and ``is_first_user``.
    """
    from app.models.settings import AppSettings

    no_users_yet = User.query.count() == 0
    open_signup = AppSettings.get_bool('public_registration', default=True)

    body = {
        'allowed': no_users_yet or open_signup,
        'is_first_user': no_users_yet,
    }
    return jsonify(body), 200
@api_bp.route('/auth/register', methods=['POST'])
def register():
    """Create a new user account and sign it in.

    Expects a JSON object with ``username`` (at least 3 characters after
    stripping), ``password`` (at least 8 characters) and optionally ``email``
    and ``invite_token``. An invite token may also be passed as the ``invite``
    query parameter.

    Registration is permitted when no user exists yet, when the
    ``public_registration`` setting is on, or when a stored invite token with
    the value ``'valid'`` is supplied. The first user gets the ``admin`` role.

    Returns:
        201 with ``user`` and ``access_token`` in the JSON body and the
        refresh token in an HTTP-only ``refresh_token`` cookie;
        400 for missing/invalid input, 403 when registration is closed,
        409 when the username or email is already taken.
    """
    from app.models.settings import AppSettings

    # silent=True: a missing, malformed or non-JSON body yields None instead
    # of aborting the request, so it is reported by the JSON 400 below.
    data = request.get_json(silent=True)
    # Anything that is not a JSON object is treated like an empty body.
    body = data if isinstance(data, dict) else {}

    is_first_user = User.query.count() == 0

    # Check invite token (works even if public registration is off)
    invite_token = request.args.get('invite') or body.get('invite_token')
    valid_invite = False
    if invite_token:
        valid_invite = AppSettings.get(f'invite_{invite_token}', '') == 'valid'

    # Check if registration is allowed
    if (not is_first_user and not valid_invite
            and not AppSettings.get_bool('public_registration', default=True)):
        return jsonify({'error': 'Oeffentliche Registrierung ist deaktiviert'}), 403

    if not body:
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = body.get('username')
    password = body.get('password')
    email = body.get('email')

    # Null or non-string credentials count as missing rather than crashing
    # in .strip() / len().
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    if len(username) < 3:
        return jsonify({'error': 'Benutzername muss mindestens 3 Zeichen lang sein'}), 400
    if len(password) < 8:
        return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400

    # The email is optional: null and blank both mean "no email".
    if email is not None and not isinstance(email, str):
        return jsonify({'error': 'Ungueltige Email-Adresse'}), 400
    email = (email or '').strip() or None

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Benutzername bereits vergeben'}), 409
    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email-Adresse bereits vergeben'}), 409

    # First user becomes admin. Re-checked right before the insert so the
    # role is based on the most recent count.
    is_first_user = User.query.count() == 0
    user = User(
        username=username,
        email=email,
        role='admin' if is_first_user else 'user',
        master_key_salt=os.urandom(32),
    )
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    # Invalidate invite token if used
    # NOTE(review): check-then-set is not atomic; two concurrent requests
    # could presumably redeem the same invite -- confirm whether that matters.
    if valid_invite and invite_token:
        AppSettings.set(f'invite_{invite_token}', 'used')

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
    }))
    # The refresh token is only handed out as an HTTP-only cookie; the Secure
    # flag follows the scheme of the current request.
    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )
    return response, 201
@api_bp.route('/auth/login', methods=['POST'])
def login():
    """Authenticate a user with username and password.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        200 with ``user``, ``access_token`` and the base64-encoded
        ``master_key_salt`` (or ``None`` when the user has none) in the JSON
        body, plus the refresh token in an HTTP-only ``refresh_token`` cookie;
        400 for missing/invalid input, 401 for wrong credentials,
        403 when the account is deactivated.
    """
    import base64

    # silent=True: a missing, malformed or non-JSON body yields None instead
    # of aborting the request, so it is reported by the JSON 400 below.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = data.get('username')
    password = data.get('password')
    # Null or non-string credentials count as missing rather than crashing
    # in .strip().
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Ungueltige Anmeldedaten'}), 401
    if not user.is_active:
        return jsonify({'error': 'Konto deaktiviert'}), 403

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    salt = user.master_key_salt
    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
        'master_key_salt': base64.b64encode(salt).decode() if salt else None,
    }))
    # The refresh token is only handed out as an HTTP-only cookie; the Secure
    # flag follows the scheme of the current request.
    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )
    return response, 200
@api_bp.route('/auth/refresh', methods=['POST'])
def refresh():
    """Issue a new access token from the ``refresh_token`` cookie.

    The cookie value is verified with ``JWT_SECRET_KEY`` (HS256), must carry
    ``type == 'refresh'`` and must reference an existing, active user. The
    refresh token itself is not renewed by this endpoint.

    Returns:
        200 with ``access_token`` in the JSON body, or 401 with a JSON error
        when the cookie is missing or the token is expired or invalid.
    """
    refresh_token = request.cookies.get('refresh_token')
    if not refresh_token:
        return jsonify({'error': 'Refresh-Token fehlt'}), 401

    # Keep the try body limited to the only call that raises JWT errors.
    try:
        payload = jwt.decode(refresh_token, current_app.config['JWT_SECRET_KEY'],
                             algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return jsonify({'error': 'Refresh-Token abgelaufen'}), 401
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Ungueltiger Token'}), 401

    if payload.get('type') != 'refresh':
        return jsonify({'error': 'Falscher Token-Typ'}), 401

    # A correctly signed token without a user_id claim is still invalid;
    # answer 401 instead of letting a KeyError surface as a 500.
    user_id = payload.get('user_id')
    if user_id is None:
        return jsonify({'error': 'Ungueltiger Token'}), 401

    user = db.session.get(User, user_id)
    if not user or not user.is_active:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 401

    access_token = create_access_token(user.id)
    return jsonify({'access_token': access_token}), 200
@api_bp.route('/auth/logout', methods=['POST'])
def logout():
    """Sign the client out by clearing the ``refresh_token`` cookie.

    Always answers 200 with a confirmation message; no token is inspected.
    """
    confirmation = jsonify({'message': 'Abgemeldet'})
    resp = make_response(confirmation)
    resp.delete_cookie('refresh_token')
    return resp, 200
@api_bp.route('/auth/me', methods=['GET'])
@token_required
def me():
    """Return the profile of the authenticated user.

    The response is the user's dict representation (including the email),
    extended by the base64-encoded ``master_key_salt`` (``None`` when the
    user has none) and ``email_account_count``.
    """
    import base64

    current = request.current_user
    profile = current.to_dict(include_email=True)

    salt = current.master_key_salt
    if salt:
        profile['master_key_salt'] = base64.b64encode(salt).decode()
    else:
        profile['master_key_salt'] = None

    profile['email_account_count'] = current.email_accounts.count()
    return jsonify(profile), 200