"""Admin user management, application settings, invite links and
self-service account endpoints registered on ``api_bp``."""

import logging
import os
import secrets
import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText

from flask import request, jsonify

from app.api import api_bp
from app.api.auth import admin_required, token_required
from app.extensions import db
from app.models.user import User
from app.models.settings import AppSettings

logger = logging.getLogger(__name__)

_DEFAULT_SMTP_PORT = 587
_IMPLICIT_TLS_PORT = 465
_SMTP_TIMEOUT_SECONDS = 10
_DEFAULT_STORAGE_QUOTA_MB = 5120
_MIN_USERNAME_LENGTH = 3
_MIN_PASSWORD_LENGTH = 8
_VALID_ROLES = ('admin', 'user')
_SEARCH_LIMIT = 10
_LIKE_ESCAPE = '/'


# --- Internal helpers ---

def _json_object():
    """Return the request's JSON body if it is a JSON object, else None."""
    payload = request.get_json()
    return payload if isinstance(payload, dict) else None


def _text(value):
    """Return *value* unchanged if it is a string, otherwise ''."""
    return value if isinstance(value, str) else ''


def _clean(value):
    """Return *value* stripped of surrounding whitespace ('' for non-strings)."""
    return _text(value).strip()


def _parse_quota_mb(raw):
    """Convert a client-supplied quota to a non-negative int.

    Raises:
        ValueError: if *raw* cannot be interpreted as an integer.
    """
    try:
        return max(0, int(raw))
    except (TypeError, ValueError, OverflowError) as err:
        raise ValueError('storage quota is not an integer') from err


def _no_other_active_admin(user):
    """Return True if no active admin other than *user* exists."""
    others = User.query.filter(
        User.role == 'admin',
        User.is_active == True,  # noqa: E712 - SQL expression, not a Python bool
        User.id != user.id,
    ).count()
    return others == 0


def _smtp_port():
    """Return the stored system SMTP port, or the default if it is unusable."""
    try:
        return int(AppSettings.get('system_smtp_port', str(_DEFAULT_SMTP_PORT)))
    except (TypeError, ValueError):
        return _DEFAULT_SMTP_PORT


def _system_smtp_config():
    """Read the system SMTP settings from ``AppSettings`` into a dict."""
    return {
        'host': AppSettings.get('system_smtp_host', ''),
        'port': _smtp_port(),
        'use_ssl': AppSettings.get_bool('system_smtp_ssl', default=True),
        'username': AppSettings.get('system_smtp_username', ''),
        'password': AppSettings.get('system_smtp_password', ''),
        'from_addr': AppSettings.get('system_email_from', ''),
    }


def _open_smtp(host, port, use_ssl):
    """Open a connection to the SMTP server; the caller must close it.

    Implicit TLS is used only when SSL is enabled AND the port is 465. In
    every other case a plain connection is opened and upgraded via STARTTLS.
    """
    if use_ssl and port == _IMPLICIT_TLS_PORT:
        return smtplib.SMTP_SSL(host, port, timeout=_SMTP_TIMEOUT_SECONDS)
    server = smtplib.SMTP(host, port, timeout=_SMTP_TIMEOUT_SECONDS)
    try:
        server.starttls()
    except Exception:
        # Do not leak the socket when the TLS upgrade fails.
        server.close()
        raise
    return server


def _like_pattern(term):
    """Build a 'contains' LIKE pattern with the wildcards in *term* escaped."""
    escaped = (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace('%', _LIKE_ESCAPE + '%')
        .replace('_', _LIKE_ESCAPE + '_')
    )
    return f'%{escaped}%'


# --- User management (admin) ---

@api_bp.route('/users', methods=['GET'])
@admin_required
def list_users():
    """Return all users, newest first, including their email addresses."""
    users = User.query.order_by(User.created_at.desc()).all()
    return jsonify([u.to_dict(include_email=True) for u in users]), 200


@api_bp.route('/users', methods=['POST'])
@admin_required
def create_user():
    """Admin creates a new user.

    Responds with 400 on invalid input, 409 if the username or email is
    already taken, and 201 with the created user otherwise.
    """
    data = _json_object()
    if not data:
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    username = _clean(data.get('username'))
    password = _text(data.get('password'))
    # JSON null and '' both mean "no email".
    email = _clean(data.get('email')) or None
    role = data.get('role', 'user')

    if not username or not password:
        return jsonify({'error': 'Benutzername und Passwort erforderlich'}), 400
    if len(username) < _MIN_USERNAME_LENGTH:
        return jsonify({'error': 'Benutzername muss mindestens 3 Zeichen lang sein'}), 400
    if len(password) < _MIN_PASSWORD_LENGTH:
        return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400
    if role not in _VALID_ROLES:
        return jsonify({'error': 'Rolle muss "admin" oder "user" sein'}), 400

    raw_quota = data.get('storage_quota_mb')
    if raw_quota is None:
        raw_quota = _DEFAULT_STORAGE_QUOTA_MB
    try:
        quota_mb = _parse_quota_mb(raw_quota)
    except ValueError:
        return jsonify({'error': 'Speicherkontingent muss eine ganze Zahl sein'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Benutzername bereits vergeben'}), 409
    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email-Adresse bereits vergeben'}), 409

    user = User(
        username=username,
        email=email,
        role=role,
        master_key_salt=os.urandom(32),
        storage_quota_mb=quota_mb,
    )
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    # Notify the new user via email. This is best-effort: a mail failure must
    # not fail the (already committed) account creation, but it is logged.
    try:
        from app.services.system_mail import notify_user_created
        notify_user_created(user, request.current_user.username)
    except Exception:
        logger.exception('Could not send account-created notification')

    return jsonify(user.to_dict(include_email=True)), 201


# NOTE(review): the URL variable was missing from the three '/users/' rules
# below although the views require ``user_id``. The ``int`` converter is an
# assumption - confirm it matches the type of ``User.id``.
@api_bp.route('/users/<int:user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
    """Return a single user by id, or 404 if it does not exist."""
    user = db.session.get(User, user_id)
    if not user:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404
    return jsonify(user.to_dict(include_email=True)), 200


@api_bp.route('/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update email, role, active flag, quota and/or password of a user.

    Only keys present in the JSON body are touched. Nothing is committed
    when any validation step fails.
    """
    user = db.session.get(User, user_id)
    if not user:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404

    data = _json_object()
    if not data:
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    if 'email' in data:
        email = _clean(data['email']) or None
        if email and email != user.email:
            if User.query.filter_by(email=email).first():
                return jsonify({'error': 'Email bereits vergeben'}), 409
        user.email = email

    if 'role' in data and data['role'] in _VALID_ROLES:
        demoting = user.role == 'admin' and data['role'] == 'user'
        # At least one other active admin must remain after a demotion.
        if demoting and _no_other_active_admin(user):
            return jsonify({'error': 'Letzter Admin kann nicht herabgestuft werden'}), 400
        user.role = data['role']

    if 'is_active' in data:
        if user.id == request.current_user.id and not data['is_active']:
            return jsonify({'error': 'Eigenes Konto kann nicht deaktiviert werden'}), 400
        user.is_active = data['is_active']

    if 'storage_quota_mb' in data:
        try:
            user.storage_quota_mb = _parse_quota_mb(data['storage_quota_mb'])
        except ValueError:
            return jsonify({'error': 'Speicherkontingent muss eine ganze Zahl sein'}), 400

    new_password = _text(data.get('password'))
    if new_password:
        if len(new_password) < _MIN_PASSWORD_LENGTH:
            return jsonify({'error': 'Passwort muss mindestens 8 Zeichen lang sein'}), 400
        user.set_password(new_password)

    db.session.commit()
    return jsonify(user.to_dict(include_email=True)), 200


@api_bp.route('/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Delete a user; refuses to delete the caller or the last active admin."""
    user = db.session.get(User, user_id)
    if not user:
        return jsonify({'error': 'Benutzer nicht gefunden'}), 404
    if user.id == request.current_user.id:
        return jsonify({'error': 'Eigenes Konto kann nicht geloescht werden'}), 400
    if user.role == 'admin' and _no_other_active_admin(user):
        return jsonify({'error': 'Letzter Admin kann nicht geloescht werden'}), 400

    db.session.delete(user)
    db.session.commit()
    return jsonify({'message': 'Benutzer geloescht'}), 200


# --- App Settings ---

@api_bp.route('/settings', methods=['GET'])
@admin_required
def get_settings():
    """Return the application settings plus read-only system information.

    The SMTP password itself is never returned, only whether one is set.
    """
    try:
        tzname = time.strftime('%Z')
    except Exception:
        # Best-effort only: the abbreviation is purely informational.
        tzname = ''

    onlyoffice_url = os.environ.get('ONLYOFFICE_URL', '')

    return jsonify({
        'public_registration': AppSettings.get_bool('public_registration', default=True),
        'system_smtp_host': AppSettings.get('system_smtp_host', ''),
        'system_smtp_port': _smtp_port(),
        'system_smtp_ssl': AppSettings.get_bool('system_smtp_ssl', default=True),
        'system_smtp_username': AppSettings.get('system_smtp_username', ''),
        'system_smtp_password_set': bool(AppSettings.get('system_smtp_password', '')),
        'system_email_from': AppSettings.get('system_email_from', ''),
        'onlyoffice_url': onlyoffice_url,
        'onlyoffice_configured': bool(onlyoffice_url),
        # Read-only system info taken from the environment (.env)
        'timezone': os.environ.get('TZ', 'Europe/Berlin'),
        'timezone_abbr': tzname,
        'server_time': datetime.now().isoformat(timespec='seconds'),
        'ntp_server': os.environ.get('NTP_SERVER', ''),
    }), 200


@api_bp.route('/settings', methods=['PUT'])
@admin_required
def update_settings():
    """Persist the settings present in the JSON body.

    The SMTP port is validated before anything is written so that a bad
    value cannot leave the settings half-updated. An empty or missing
    ``system_smtp_password`` leaves the stored password untouched.
    """
    data = _json_object()
    if data is None:
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    port = None
    if 'system_smtp_port' in data:
        try:
            port = int(data['system_smtp_port'])
        except (TypeError, ValueError):
            port = None
        if port is None or not 1 <= port <= 65535:
            return jsonify({'error': 'Ungueltiger SMTP-Port'}), 400

    if 'public_registration' in data:
        AppSettings.set('public_registration', str(data['public_registration']).lower())

    if port is not None:
        AppSettings.set('system_smtp_port', str(port))

    if 'system_smtp_ssl' in data:
        # Stored lower-cased, the same way 'public_registration' is.
        AppSettings.set('system_smtp_ssl', str(data['system_smtp_ssl']).lower())

    for key in ('system_smtp_host', 'system_smtp_username', 'system_email_from'):
        if key in data:
            value = data[key]
            # JSON null clears the value instead of storing the text 'None'.
            AppSettings.set(key, '' if value is None else str(value))

    if 'system_smtp_password' in data and data['system_smtp_password']:
        AppSettings.set('system_smtp_password', data['system_smtp_password'])

    return jsonify({'message': 'Einstellungen gespeichert'}), 200


@api_bp.route('/settings/test-email', methods=['POST'])
@admin_required
def test_system_email():
    """Test system SMTP connection.

    Connects and logs in with the stored credentials without sending a
    message. Returns 400 with the error text if anything fails.
    """
    cfg = _system_smtp_config()

    if not cfg['host'] or not cfg['username'] or not cfg['password']:
        return jsonify({'error': 'SMTP-Einstellungen unvollstaendig'}), 400

    try:
        # The context manager sends QUIT and closes the socket on every path.
        with _open_smtp(cfg['host'], cfg['port'], cfg['use_ssl']) as server:
            server.login(cfg['username'], cfg['password'])
    except Exception as e:
        return jsonify({'error': f'Verbindungsfehler: {e}'}), 400

    return jsonify({'message': 'SMTP-Verbindung erfolgreich'}), 200


# --- Invite Links ---

@api_bp.route('/settings/invite', methods=['POST'])
@admin_required
def create_invite_link():
    """Generate a one-time registration link.

    If ``send_to_email`` is given, the link is also mailed via the system
    SMTP account; the mail outcome is reported in ``email_sent`` /
    ``email_error`` and never fails the request.
    """
    # Parse the body first so a rejected request does not leave an orphaned
    # invite token behind.
    data = _json_object() or {}
    send_email = data.get('send_to_email', '')

    token = secrets.token_urlsafe(32)
    AppSettings.set(f'invite_{token}', 'valid')

    result = {
        'token': token,
        'url': f'/register?invite={token}',
    }

    # Without a recipient there is nothing left to do.
    if not send_email:
        return jsonify(result), 201

    cfg = _system_smtp_config()
    if not cfg['host'] or not cfg['password']:
        return jsonify({**result, 'email_sent': False,
                        'email_error': 'System-Email nicht konfiguriert'}), 200

    # Build full URL from request
    base_url = request.host_url.rstrip('/')
    full_url = f'{base_url}/register?invite={token}'

    body = (
        'Du wurdest zur Mini-Cloud eingeladen!\n\n'
        'Klicke auf folgenden Link, um dich zu registrieren:\n'
        f'{full_url}\n\n'
        'Dieser Link ist nur einmal verwendbar.'
    )
    sender = cfg['from_addr'] or cfg['username']
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['From'] = sender
    msg['To'] = send_email
    msg['Subject'] = 'Einladung zur Mini-Cloud'

    try:
        with _open_smtp(cfg['host'], cfg['port'], cfg['use_ssl']) as server:
            server.login(cfg['username'], cfg['password'])
            server.sendmail(sender, [send_email], msg.as_string())
        result['email_sent'] = True
    except Exception as e:
        result['email_sent'] = False
        result['email_error'] = str(e)

    return jsonify(result), 201


# --- Own profile ---

@api_bp.route('/auth/me', methods=['GET'])
@token_required
def get_me():
    """Return the authenticated user's own profile."""
    return jsonify(request.current_user.to_dict(include_email=True)), 200


@api_bp.route('/auth/me', methods=['PUT'])
@token_required
def update_me():
    """Update the authenticated user's first name, last name and/or email.

    Empty or null values clear the respective field.
    """
    user = request.current_user
    data = _json_object() or {}

    if 'first_name' in data:
        user.first_name = _clean(data.get('first_name')) or None
    if 'last_name' in data:
        user.last_name = _clean(data.get('last_name')) or None
    if 'email' in data:
        email = _clean(data.get('email')) or None
        if email and email != user.email:
            if User.query.filter(User.email == email, User.id != user.id).first():
                return jsonify({'error': 'E-Mail ist bereits vergeben'}), 409
        user.email = email

    db.session.commit()
    return jsonify(user.to_dict(include_email=True)), 200


# --- User search (for sharing dialogs) ---

@api_bp.route('/users/search', methods=['GET'])
@token_required
def search_users():
    """Search users by username, first name or last name - for sharing dialogs.

    Queries shorter than two characters return an empty list. The caller
    and inactive users are excluded; at most 10 matches are returned.
    """
    query = request.args.get('q', '').strip()
    if len(query) < 2:
        return jsonify([]), 200

    # Escape LIKE wildcards so '%' and '_' typed by the user match literally.
    pattern = _like_pattern(query)
    users = User.query.filter(
        (User.username.ilike(pattern, escape=_LIKE_ESCAPE))
        | (User.first_name.ilike(pattern, escape=_LIKE_ESCAPE))
        | (User.last_name.ilike(pattern, escape=_LIKE_ESCAPE)),
        User.id != request.current_user.id,
        User.is_active == True,  # noqa: E712 - SQL expression, not a Python bool
    ).limit(_SEARCH_LIMIT).all()

    return jsonify([{
        'id': u.id,
        'username': u.username,
        'full_name': u.full_name,
        'display_name': u.display_name,
    } for u in users]), 200


# --- Change password (non-admin, own account) ---

@api_bp.route('/auth/change-password', methods=['POST'])
@token_required
def change_password():
    """Change the authenticated user's password.

    Requires the current password; responds with 401 if it does not match.
    """
    data = _json_object()
    if not data:
        return jsonify({'error': 'Keine Daten gesendet'}), 400

    current_password = _text(data.get('current_password'))
    new_password = _text(data.get('new_password'))

    if not current_password or not new_password:
        return jsonify({'error': 'Aktuelles und neues Passwort erforderlich'}), 400

    if len(new_password) < _MIN_PASSWORD_LENGTH:
        return jsonify({'error': 'Neues Passwort muss mindestens 8 Zeichen lang sein'}), 400

    user = request.current_user
    if not user.check_password(current_password):
        return jsonify({'error': 'Aktuelles Passwort falsch'}), 401

    user.set_password(new_password)
    db.session.commit()

    return jsonify({'message': 'Passwort geaendert'}), 200