246 lines
8.1 KiB
Python
246 lines
8.1 KiB
Python
import os
|
|
from datetime import datetime, timezone
|
|
from functools import wraps
|
|
|
|
import jwt
|
|
from flask import request, jsonify, current_app, make_response
|
|
|
|
from app.api import api_bp
|
|
from app.extensions import db
|
|
from app.models.user import User
|
|
|
|
|
|
def create_access_token(user_id):
    """Return a signed JWT of type ``access`` for ``user_id``.

    The payload carries ``user_id``, ``type``, ``exp`` and ``iat``.  The
    lifetime is read from ``JWT_ACCESS_TOKEN_EXPIRES`` in the current app
    config and added to the current UTC time, so it must support addition
    to a ``datetime`` (presumably a ``timedelta``).  The token is signed
    with HS256 using ``JWT_SECRET_KEY``.
    """
    # Take a single timestamp so that exp - iat is exactly the configured
    # lifetime instead of drifting by the time between two now() calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'access',
        'exp': issued_at + current_app.config['JWT_ACCESS_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
|
|
|
|
|
|
def create_refresh_token(user_id):
    """Return a signed JWT of type ``refresh`` for ``user_id``.

    The payload carries ``user_id``, ``type``, ``exp`` and ``iat``.  The
    lifetime is read from ``JWT_REFRESH_TOKEN_EXPIRES`` in the current app
    config and added to the current UTC time, so it must support addition
    to a ``datetime`` (presumably a ``timedelta``).  The token is signed
    with HS256 using ``JWT_SECRET_KEY``.
    """
    # Take a single timestamp so that exp - iat is exactly the configured
    # lifetime instead of drifting by the time between two now() calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'type': 'refresh',
        'exp': issued_at + current_app.config['JWT_REFRESH_TOKEN_EXPIRES'],
        'iat': issued_at,
    }
    return jwt.encode(payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256')
|
|
|
|
|
|
def token_required(f):
    """Decorator that runs the view only for a valid access token.

    The token is read from an ``Authorization: Bearer <token>`` header or,
    failing that, from the ``token`` query parameter.  It must decode with
    ``JWT_SECRET_KEY`` (HS256), have ``type == 'access'`` and reference an
    existing, active ``User``.  On success the user is stored on
    ``request.current_user`` and the wrapped view is called; otherwise a
    JSON error with status 401 is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        bearer_prefix = 'Bearer '
        header_value = request.headers.get('Authorization', '')
        if header_value.startswith(bearer_prefix):
            raw_token = header_value[len(bearer_prefix):]
        else:
            raw_token = None

        # Fallback: token as query parameter (for direct browser downloads)
        raw_token = raw_token or request.args.get('token', '')
        if not raw_token:
            return jsonify({'error': 'Token fehlt'}), 401

        # Only the decode step can raise the JWT errors handled here.
        try:
            claims = jwt.decode(
                raw_token,
                current_app.config['JWT_SECRET_KEY'],
                algorithms=['HS256'],
            )
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token abgelaufen'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Ungueltiger Token'}), 401

        if claims.get('type') != 'access':
            return jsonify({'error': 'Falscher Token-Typ'}), 401

        account = db.session.get(User, claims['user_id'])
        if not (account and account.is_active):
            return jsonify({'error': 'Benutzer nicht gefunden oder deaktiviert'}), 401

        request.current_user = account
        return f(*args, **kwargs)

    return decorated
|
|
|
|
|
|
def admin_required(f):
    """Decorator that restricts a view to authenticated admin users.

    Authentication is delegated to ``token_required``; afterwards the view
    only runs when ``request.current_user.role`` is ``'admin'``.  Any other
    role gets a JSON error with status 403.
    """
    def decorated(*args, **kwargs):
        if request.current_user.role == 'admin':
            return f(*args, **kwargs)
        return jsonify({'error': 'Admin-Berechtigung erforderlich'}), 403

    # Same stacking as decorating with @wraps(f) above @token_required:
    # the token check wraps the role check, then f's metadata is copied on.
    return wraps(f)(token_required(decorated))
|
|
|
|
|
|
@api_bp.route('/auth/registration-status', methods=['GET'])
def registration_status():
    """Report whether a new account may currently be registered.

    Registration counts as allowed when no user exists yet or when the
    ``public_registration`` setting is true (it defaults to true).
    """
    from app.models.settings import AppSettings

    no_users_yet = User.query.count() == 0
    open_signup = AppSettings.get_bool('public_registration', default=True)

    status = {
        'allowed': no_users_yet or open_signup,
        'is_first_user': no_users_yet,
    }
    return jsonify(status), 200
|
|
|
|
|
|
@api_bp.route('/auth/register', methods=['POST'])
def register():
    """Create a user account and return tokens for it.

    Registration is permitted when no user exists yet, when a valid invite
    token is supplied (``invite`` query parameter or ``invite_token`` JSON
    field), or when the ``public_registration`` setting is true; otherwise
    403 is returned.

    The JSON body must be an object with string ``username`` (at least 3
    characters after stripping) and ``password`` (at least 8 characters);
    ``email`` is optional and may be null.  Malformed input yields 400,
    a taken username or email yields 409.

    On success (201) the response body contains the user and an access
    token, and the refresh token is set as an HttpOnly cookie.  The first
    user ever created gets the ``admin`` role.
    """
    from app.models.settings import AppSettings

    is_first_user = User.query.count() == 0

    # Check invite token (works even if public registration is off)
    invite_token = request.args.get('invite')
    if not invite_token:
        body = request.get_json()
        # A JSON array or scalar has no .get(); treat it as "no invite".
        invite_token = body.get('invite_token') if isinstance(body, dict) else None

    valid_invite = False
    if invite_token:
        valid_invite = AppSettings.get(f'invite_{invite_token}', '') == 'valid'

    # Check if registration is allowed
    if (
        not is_first_user
        and not valid_invite
        and not AppSettings.get_bool('public_registration', default=True)
    ):
        return jsonify({'error': 'Oeffentliche Registrierung ist deaktiviert'}), 403

    data = request.get_json()
    # Non-object JSON (list, string, number) is rejected like an empty body
    # instead of failing later on data.get().
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = data.get('username')
    password = data.get('password')
    email = data.get('email')

    # Null or non-string credentials are handled like missing ones; calling
    # .strip()/len() on them would otherwise raise and produce a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    # "email": null is a common way for clients to omit the field.
    if email is not None and not isinstance(email, str):
        return jsonify({'error': 'Ungueltige Email-Adresse'}), 400
    email = (email or '').strip() or None

    if len(username) < 3:
        return jsonify({'error': 'Benutzername muss mindestens 3 Zeichen lang sein'}), 400

    if len(password) < 8:
        return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Benutzername bereits vergeben'}), 409

    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email-Adresse bereits vergeben'}), 409

    # First user becomes admin. Re-counted right before creation rather
    # than reusing the value computed at the top of the request.
    is_first_user = User.query.count() == 0

    user = User(
        username=username,
        email=email,
        role='admin' if is_first_user else 'user',
        master_key_salt=os.urandom(32),
    )
    user.set_password(password)

    db.session.add(user)
    db.session.commit()

    # Invalidate invite token if used
    # NOTE(review): this happens after the user commit, so two concurrent
    # requests could presumably both consume the same invite - confirm.
    if valid_invite:
        AppSettings.set(f'invite_{invite_token}', 'used')

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
    }))

    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )

    return response, 201
|
|
|
|
|
|
@api_bp.route('/auth/login', methods=['POST'])
def login():
    """Authenticate with username and password and issue tokens.

    The JSON body must be an object with non-empty string ``username`` and
    ``password``; otherwise 400 is returned.  Unknown users and wrong
    passwords both yield the same 401 message; a deactivated account with
    correct credentials yields 403.

    On success (200) the body contains the user, an access token and the
    base64-encoded ``master_key_salt`` (or null), and the refresh token is
    set as an HttpOnly cookie.
    """
    import base64

    data = request.get_json()
    # Non-object JSON (list, string, number) is rejected like an empty body
    # instead of failing later on data.get().
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = data.get('username')
    password = data.get('password')

    # Null or non-string credentials are handled like missing ones; calling
    # .strip() on them would otherwise raise and produce a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Ungueltige Anmeldedaten'}), 401

    if not user.is_active:
        return jsonify({'error': 'Konto deaktiviert'}), 403

    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    salt = user.master_key_salt
    response = make_response(jsonify({
        'user': user.to_dict(include_email=True),
        'access_token': access_token,
        'master_key_salt': base64.b64encode(salt).decode() if salt else None,
    }))

    response.set_cookie(
        'refresh_token', refresh_token,
        httponly=True, secure=request.is_secure,
        samesite='Lax',
        max_age=int(current_app.config['JWT_REFRESH_TOKEN_EXPIRES'].total_seconds()),
    )

    return response, 200
|
|
|
|
|
|
@api_bp.route('/auth/refresh', methods=['POST'])
def refresh():
    """Exchange the ``refresh_token`` cookie for a new access token.

    The cookie must hold a JWT that decodes with ``JWT_SECRET_KEY``
    (HS256), has ``type == 'refresh'`` and references an existing, active
    user.  Any failure yields a JSON error with status 401; success
    returns ``{'access_token': ...}`` with status 200.
    """
    cookie_token = request.cookies.get('refresh_token')
    if not cookie_token:
        return jsonify({'error': 'Refresh-Token fehlt'}), 401

    # Only the decode step can raise the JWT errors handled here.
    try:
        claims = jwt.decode(
            cookie_token,
            current_app.config['JWT_SECRET_KEY'],
            algorithms=['HS256'],
        )
    except jwt.ExpiredSignatureError:
        return jsonify({'error': 'Refresh-Token abgelaufen'}), 401
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Ungueltiger Token'}), 401

    if claims.get('type') != 'refresh':
        return jsonify({'error': 'Falscher Token-Typ'}), 401

    account = db.session.get(User, claims['user_id'])
    if not (account and account.is_active):
        return jsonify({'error': 'Benutzer nicht gefunden'}), 401

    return jsonify({'access_token': create_access_token(account.id)}), 200
|
|
|
|
|
|
@api_bp.route('/auth/logout', methods=['POST'])
def logout():
    """Log the client out by deleting the ``refresh_token`` cookie.

    No authentication is checked; the response is always a 200 with a
    confirmation message.
    """
    reply = make_response(jsonify({'message': 'Abgemeldet'}))
    reply.delete_cookie('refresh_token')
    return reply, 200
|
|
|
|
|
|
@api_bp.route('/auth/me', methods=['GET'])
@token_required
def me():
    """Return the authenticated user's profile.

    The user's dict representation (including email) is extended with the
    base64-encoded ``master_key_salt`` (null when no salt is stored) and
    the number of linked email accounts.
    """
    import base64

    current = request.current_user
    profile = current.to_dict(include_email=True)

    salt = current.master_key_salt
    if salt:
        profile['master_key_salt'] = base64.b64encode(salt).decode()
    else:
        profile['master_key_salt'] = None

    profile['email_account_count'] = current.email_accounts.count()
    return jsonify(profile), 200
|