316 lines
11 KiB
Python
316 lines
11 KiB
Python
import os
|
|
|
|
from flask import request, jsonify
|
|
|
|
from app.api import api_bp
|
|
from app.api.auth import admin_required, token_required
|
|
from app.extensions import db
|
|
from app.models.user import User
|
|
from app.models.settings import AppSettings
|
|
|
|
|
|
@api_bp.route('/users', methods=['GET'])
@admin_required
def list_users():
    """Return all users as a JSON list, most recently created first."""
    newest_first = User.query.order_by(User.created_at.desc())
    payload = []
    for account in newest_first.all():
        payload.append(account.to_dict(include_email=True))
    return jsonify(payload), 200
|
|
|
|
|
|
@api_bp.route('/users', methods=['POST'])
@admin_required
def create_user():
    """Admin creates a new user.

    Reads a JSON object with ``username``, ``password`` and the optional
    fields ``email``, ``role`` (default ``'user'``) and ``storage_quota_mb``
    (default 5120).

    Returns:
        201 with the serialized user on success,
        400 when the payload is missing or fails validation,
        409 when the username or email is already taken.
    """
    import logging

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it like a missing payload.
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    # An explicit JSON null is handled like an absent field.
    username = data.get('username')
    password = data.get('password')
    email = data.get('email')
    username = '' if username is None else username
    password = '' if password is None else password
    email = '' if email is None else email
    if not all(isinstance(value, str) for value in (username, password, email)):
        return jsonify({'error': 'Ungueltige Eingabe'}), 400

    username = username.strip()
    email = email.strip() or None
    role = data.get('role', 'user')

    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400

    if len(username) < 3:
        return jsonify({'error': 'Benutzername muss mindestens 3 Zeichen lang sein'}), 400

    if len(password) < 8:
        return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400

    if role not in ('admin', 'user'):
        return jsonify({'error': 'Rolle muss "admin" oder "user" sein'}), 400

    # Normalize the quota the same way update_user does (non-negative int).
    # An explicit null is passed through unchanged, as before.
    quota = data.get('storage_quota_mb', 5120)
    if quota is not None:
        try:
            quota = max(0, int(quota))
        except (TypeError, ValueError):
            return jsonify({'error': 'Ungueltiges Speicherkontingent'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Benutzername bereits vergeben'}), 409

    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email-Adresse bereits vergeben'}), 409

    user = User(
        username=username,
        email=email,
        role=role,
        master_key_salt=os.urandom(32),
        storage_quota_mb=quota,
    )
    user.set_password(password)

    db.session.add(user)
    db.session.commit()

    # Notifying the new user is best-effort: a mail failure must not undo
    # the already committed account, but it should be visible in the logs.
    try:
        from app.services.system_mail import notify_user_created
        notify_user_created(user, request.current_user.username)
    except Exception:
        logging.getLogger(__name__).exception(
            'Sending the account-created notification failed'
        )

    return jsonify(user.to_dict(include_email=True)), 201
|
|
|
|
|
|
@api_bp.route('/users/<int:user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
    """Return the user with primary key ``user_id`` as JSON, or a 404 error."""
    account = db.session.get(User, user_id)
    if account:
        return jsonify(account.to_dict(include_email=True)), 200
    return jsonify({'error': 'Benutzer nicht gefunden'}), 404
|
|
|
|
|
|
@api_bp.route('/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update selected fields of an existing user.

    Recognized JSON keys: ``email``, ``role``, ``is_active``,
    ``storage_quota_mb`` and ``password``. All submitted values are
    validated first; the user object is only modified once every check has
    passed, so a rejected request never leaves partial changes behind.

    Returns:
        200 with the serialized user on success,
        400 on missing/invalid input or a forbidden change,
        404 when the user does not exist,
        409 when the new email belongs to another account.
    """
    user = db.session.get(User, user_id)
    if not user:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404

    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    changes = {}
    new_password = None

    if 'email' in data:
        raw_email = data['email']
        if raw_email is not None and not isinstance(raw_email, str):
            return jsonify({'error': 'Ungueltige Email-Adresse'}), 400
        # JSON null and blank strings both clear the stored address.
        email = (raw_email or '').strip() or None
        if email and email != user.email:
            if User.query.filter_by(email=email).first():
                return jsonify({'error': 'Email bereits vergeben'}), 409
        changes['email'] = email

    # NOTE(review): an unknown role value is silently ignored here, whereas
    # create_user rejects it with 400 - kept as-is for client compatibility.
    if 'role' in data and data['role'] in ('admin', 'user'):
        if user.role == 'admin' and data['role'] == 'user':
            admin_count = User.query.filter_by(role='admin', is_active=True).count()
            if admin_count <= 1:
                return jsonify({'error': 'Letzter Admin kann nicht herabgestuft werden'}), 400
        changes['role'] = data['role']

    if 'is_active' in data:
        if user.id == request.current_user.id and not data['is_active']:
            return jsonify({'error': 'Eigenes Konto kann nicht deaktiviert werden'}), 400
        changes['is_active'] = data['is_active']

    if 'storage_quota_mb' in data:
        try:
            changes['storage_quota_mb'] = max(0, int(data['storage_quota_mb']))
        except (TypeError, ValueError):
            return jsonify({'error': 'Ungueltiges Speicherkontingent'}), 400

    # An empty/absent password means "leave the password unchanged".
    if 'password' in data and data['password']:
        candidate = data['password']
        if not isinstance(candidate, str) or len(candidate) < 8:
            return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400
        new_password = candidate

    for attribute, value in changes.items():
        setattr(user, attribute, value)
    if new_password is not None:
        user.set_password(new_password)

    db.session.commit()
    return jsonify(user.to_dict(include_email=True)), 200
|
|
|
|
|
|
@api_bp.route('/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Delete a user account.

    Refuses with 400 when the caller targets their own account, or when the
    target is an admin and at most one active admin exists. Answers 404 for
    an unknown id and 200 after a successful delete.
    """
    target = db.session.get(User, user_id)
    if not target:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404

    deleting_self = target.id == request.current_user.id
    if deleting_self:
        return jsonify({'error': 'Eigenes Konto kann nicht geloescht werden'}), 400

    # The admin count is only queried when the target is an admin.
    if (
        target.role == 'admin'
        and User.query.filter_by(role='admin', is_active=True).count() <= 1
    ):
        return jsonify({'error': 'Letzter Admin kann nicht geloescht werden'}), 400

    db.session.delete(target)
    db.session.commit()
    return jsonify({'message': 'Benutzer geloescht'}), 200
|
|
|
|
|
|
# --- App Settings ---
|
|
|
|
@api_bp.route('/settings', methods=['GET'])
@admin_required
def get_settings():
    """Return the application settings as JSON.

    The SMTP password itself is never returned; ``system_smtp_password_set``
    only reports whether a non-empty one is stored. The OnlyOffice values
    come from the ``ONLYOFFICE_URL`` environment variable.
    """
    # A non-numeric stored port must not make the whole settings endpoint
    # fail, otherwise the value could not be inspected and corrected.
    try:
        smtp_port = int(AppSettings.get('system_smtp_port', '587'))
    except (TypeError, ValueError):
        smtp_port = 587

    onlyoffice_url = os.environ.get('ONLYOFFICE_URL', '')

    return jsonify({
        'public_registration': AppSettings.get_bool('public_registration', default=True),
        'system_smtp_host': AppSettings.get('system_smtp_host', ''),
        'system_smtp_port': smtp_port,
        'system_smtp_ssl': AppSettings.get_bool('system_smtp_ssl', default=True),
        'system_smtp_username': AppSettings.get('system_smtp_username', ''),
        'system_smtp_password_set': bool(AppSettings.get('system_smtp_password', '')),
        'system_email_from': AppSettings.get('system_email_from', ''),
        'onlyoffice_url': onlyoffice_url,
        'onlyoffice_configured': bool(onlyoffice_url),
    }), 200
|
|
|
|
|
|
@api_bp.route('/settings', methods=['PUT'])
@admin_required
def update_settings():
    """Store the application settings submitted as a JSON object.

    Only keys present in the payload are written. The SMTP password is only
    overwritten when a non-empty value is sent. The SMTP port is validated
    before anything is written, so an invalid port leaves all settings
    untouched.

    Returns:
        200 after the settings were stored,
        400 when the body is not a JSON object or the port is invalid.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    smtp_port = None
    if 'system_smtp_port' in data:
        try:
            smtp_port = int(data['system_smtp_port'])
        except (TypeError, ValueError):
            smtp_port = None
        if smtp_port is None or not 1 <= smtp_port <= 65535:
            return jsonify({'error': 'Ungueltiger SMTP-Port'}), 400

    if 'public_registration' in data:
        AppSettings.set('public_registration', str(data['public_registration']).lower())

    if smtp_port is not None:
        # Store the normalized integer so readers can always int() it.
        AppSettings.set('system_smtp_port', str(smtp_port))

    if 'system_smtp_ssl' in data:
        # Lower-cased like public_registration so both boolean flags share
        # one stored representation.
        AppSettings.set('system_smtp_ssl', str(data['system_smtp_ssl']).lower())

    for key in ['system_smtp_host', 'system_smtp_username', 'system_email_from']:
        if key in data:
            value = data[key]
            # JSON null clears the setting instead of storing the text 'None'.
            AppSettings.set(key, '' if value is None else str(value))

    if 'system_smtp_password' in data and data['system_smtp_password']:
        AppSettings.set('system_smtp_password', data['system_smtp_password'])

    return jsonify({'message': 'Einstellungen gespeichert'}), 200
|
|
|
|
|
|
@api_bp.route('/settings/test-email', methods=['POST'])
@admin_required
def test_system_email():
    """Test system SMTP connection.

    Connects to the stored SMTP host and logs in with the stored
    credentials; no message is sent. Port 465 with SSL enabled uses an
    implicit-TLS connection, every other combination connects in plain
    text and upgrades with STARTTLS.

    Returns:
        200 when the login succeeded,
        400 when the settings are incomplete/invalid or the connection or
        login failed (the error text is included in the response).
    """
    import smtplib

    host = AppSettings.get('system_smtp_host', '')
    use_ssl = AppSettings.get_bool('system_smtp_ssl', default=True)
    username = AppSettings.get('system_smtp_username', '')
    password = AppSettings.get('system_smtp_password', '')

    if not host or not username or not password:
        return jsonify({'error': 'SMTP-Einstellungen unvollstaendig'}), 400

    try:
        port = int(AppSettings.get('system_smtp_port', '587'))
    except (TypeError, ValueError):
        return jsonify({'error': 'Ungueltiger SMTP-Port'}), 400

    server = None
    try:
        if use_ssl and port == 465:
            server = smtplib.SMTP_SSL(host, port, timeout=10)
        else:
            server = smtplib.SMTP(host, port, timeout=10)
            server.starttls()
        server.login(username, password)
        server.quit()
        return jsonify({'message': 'SMTP-Verbindung erfolgreich'}), 200
    except Exception as e:
        return jsonify({'error': f'Verbindungsfehler: {str(e)}'}), 400
    finally:
        # Release the socket when starttls/login failed before quit();
        # close() is harmless after a successful quit().
        if server is not None:
            server.close()
|
|
|
|
|
|
# --- Invite Links ---
|
|
|
|
@api_bp.route('/settings/invite', methods=['POST'])
@admin_required
def create_invite_link():
    """Generate a one-time registration link.

    The optional JSON field ``send_to_email`` names a recipient that should
    receive the link through the system SMTP account. The request body is
    read before the token is stored, so a rejected request does not leave
    an unused invite behind.

    Returns:
        201 with ``token`` and ``url`` (plus ``email_sent`` and, on failure,
        ``email_error`` when mailing was requested),
        200 with the link and ``email_sent: False`` when mailing was
        requested but the system mail account is not configured,
        400 when ``send_to_email`` is not a usable address string.
    """
    import secrets

    # The body is optional: a missing or non-JSON body means "no email".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    send_email = data.get('send_to_email', '')

    if send_email:
        # Line breaks in the recipient could inject additional mail headers.
        if not isinstance(send_email, str) or '\r' in send_email or '\n' in send_email:
            return jsonify({'error': 'Ungueltige Email-Adresse'}), 400

    token = secrets.token_urlsafe(32)
    AppSettings.set(f'invite_{token}', 'valid')

    result = {
        'token': token,
        'url': f'/register?invite={token}',
    }

    # Optionally send via system email
    if send_email:
        import smtplib
        from email.mime.text import MIMEText

        host = AppSettings.get('system_smtp_host', '')
        use_ssl = AppSettings.get_bool('system_smtp_ssl', default=True)
        username = AppSettings.get('system_smtp_username', '')
        password = AppSettings.get('system_smtp_password', '')
        from_addr = AppSettings.get('system_email_from', '')

        if not host or not password:
            return jsonify({**result, 'email_sent': False,
                            'email_error': 'System-Email nicht konfiguriert'}), 200

        # Build full URL from request
        base_url = request.host_url.rstrip('/')
        full_url = f'{base_url}/register?invite={token}'

        body = (
            f'Du wurdest zur Mini-Cloud eingeladen!\n\n'
            f'Klicke auf folgenden Link, um dich zu registrieren:\n'
            f'{full_url}\n\n'
            f'Dieser Link ist nur einmal verwendbar.'
        )
        msg = MIMEText(body, 'plain', 'utf-8')
        msg['From'] = from_addr or username
        msg['To'] = send_email
        msg['Subject'] = 'Einladung zur Mini-Cloud'

        server = None
        try:
            # Parsed inside the try so a bad stored port is reported as a
            # mail error instead of failing the whole request.
            port = int(AppSettings.get('system_smtp_port', '587'))
            if use_ssl and port == 465:
                server = smtplib.SMTP_SSL(host, port, timeout=10)
            else:
                server = smtplib.SMTP(host, port, timeout=10)
                server.starttls()
            server.login(username, password)
            server.sendmail(from_addr or username, [send_email], msg.as_string())
            server.quit()
            result['email_sent'] = True
        except Exception as e:
            # The invite itself stays valid; only the delivery failed.
            result['email_sent'] = False
            result['email_error'] = str(e)
        finally:
            # Release the socket on failure; harmless after quit().
            if server is not None:
                server.close()

    return jsonify(result), 201
|
|
|
|
|
|
# --- User search (for sharing dialogs) ---
|
|
|
|
@api_bp.route('/users/search', methods=['GET'])
@token_required
def search_users():
    """Search users by username - for sharing dialogs.

    Reads the search term from the ``q`` query parameter. Terms shorter
    than two characters yield an empty list. The match is a
    case-insensitive substring search over active users other than the
    caller, ordered by username and capped at ten results.

    Returns:
        200 with a list of ``{'id', 'username'}`` objects.
    """
    query = request.args.get('q', '').strip()
    if len(query) < 2:
        return jsonify([]), 200

    # Escape LIKE metacharacters so '%' and '_' in the term match literally
    # instead of acting as wildcards (e.g. '__' would match every user).
    escaped = query.replace('/', '//').replace('%', '/%').replace('_', '/_')

    users = User.query.filter(
        User.username.ilike(f'%{escaped}%', escape='/'),
        User.id != request.current_user.id,
        User.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
    ).order_by(User.username).limit(10).all()

    return jsonify([{'id': u.id, 'username': u.username} for u in users]), 200
|
|
|
|
|
|
# --- Change password (non-admin, own account) ---
|
|
|
|
@api_bp.route('/auth/change-password', methods=['POST'])
@token_required
def change_password():
    """Change the password of the currently authenticated user.

    Expects a JSON object with ``current_password`` and ``new_password``.

    Returns:
        200 after the password was changed,
        400 when the payload is missing, a field is absent or not a string,
        or the new password is shorter than 8 characters,
        401 when the current password does not match.
    """
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    current_password = data.get('current_password', '')
    new_password = data.get('new_password', '')

    if not current_password or not new_password:
        return jsonify({'error': 'Aktuelles und neues Passwort erforderlich'}), 400

    # Non-string values (numbers, lists, ...) would otherwise fail in len()
    # or in the password check and surface as a server error.
    if not isinstance(current_password, str) or not isinstance(new_password, str):
        return jsonify({'error': 'Aktuelles und neues Passwort erforderlich'}), 400

    if len(new_password) < 8:
        return jsonify({'error': 'Neues Passwort muss mindestens 8 Zeichen lang sein'}), 400

    user = request.current_user
    if not user.check_password(current_password):
        return jsonify({'error': 'Aktuelles Passwort falsch'}), 401

    user.set_password(new_password)
    db.session.commit()

    return jsonify({'message': 'Passwort geaendert'}), 200
|