diff --git a/app/main.py b/app/main.py index 1e8c6c4..829f71e 100644 --- a/app/main.py +++ b/app/main.py @@ -95,12 +95,19 @@ def login_required(f): return wrapped -# --- Login-Brute-Force-Schutz (DB-gestützt, pro Client-IP) ----------------- +# --- Brute-Force-Schutz (DB-gestützt, pro Scope + Client-IP) --------------- +# Der Schlüssel ist ":", damit Admin-Login ('login') und +# DynDNS-Update ('dyndns') getrennt gezählt werden. -def _login_locked_for(ip): +def _attempt_key(scope, ip): + return f'{scope}:{ip}' + + +def _login_locked_for(scope, ip): """Sekunden bis zur Entsperrung, oder 0 wenn nicht gesperrt.""" db = get_db() - row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?', (ip,)).fetchone() + row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?', + (_attempt_key(scope, ip),)).fetchone() db.close() if row and row['locked_until']: try: @@ -112,9 +119,10 @@ def _login_locked_for(ip): return 0 -def _record_login_fail(ip): +def _record_login_fail(scope, ip): + key = _attempt_key(scope, ip) db = get_db() - row = db.execute('SELECT fails FROM login_attempts WHERE ip = ?', (ip,)).fetchone() + row = db.execute('SELECT fails FROM login_attempts WHERE ip = ?', (key,)).fetchone() fails = (row['fails'] if row else 0) + 1 locked_until = None if fails >= LOGIN_MAX_FAILS: @@ -123,15 +131,15 @@ def _record_login_fail(ip): db.execute( 'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, ?, ?) ' 'ON CONFLICT(ip) DO UPDATE SET fails = excluded.fails, locked_until = excluded.locked_until', - (ip, fails, locked_until), + (key, fails, locked_until), ) db.commit() db.close() -def _reset_login_fails(ip): +def _reset_login_fails(scope, ip): db = get_db() - db.execute('DELETE FROM login_attempts WHERE ip = ?', (ip,)) + db.execute('DELETE FROM login_attempts WHERE ip = ?', (_attempt_key(scope, ip),)) db.commit() db.close() @@ -169,7 +177,7 @@ def index(): def login(): if request.method == 'POST': ip = request.remote_addr or 'unknown' - locked = _login_locked_for(ip) + locked = _login_locked_for('login', ip) if locked: flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger') return render_template('login.html') @@ -182,13 +190,13 @@ def login(): ).fetchone() db.close() if admin and check_password_hash(admin['password_hash'], password): - _reset_login_fails(ip) + _reset_login_fails('login', ip) session.clear() session['admin_id'] = admin['id'] session['admin_username'] = admin['username'] return redirect(url_for('dashboard')) - _record_login_fail(ip) + _record_login_fail('login', ip) flash('Benutzername oder Passwort falsch.', 'danger') return render_template('login.html') @@ -472,8 +480,15 @@ def user_toggle(user_id): @app.route('/nic/update') def dyndns_update(): + ip = request.remote_addr or 'unknown' + + # Brute-Force-Schutz: gesperrte Client-IPs sofort abweisen (dyndns2: "abuse") + if _login_locked_for('dyndns', ip): + return Response('abuse', 403, mimetype='text/plain') + auth = request.authorization if not auth: + _record_login_fail('dyndns', ip) return Response( 'badauth', 401, @@ -481,7 +496,7 @@ def dyndns_update(): mimetype='text/plain', ) - myip = request.args.get('myip') or request.args.get('ip') or request.remote_addr + myip = request.args.get('myip') or request.args.get('ip') or ip db = get_db() user = db.execute( @@ -491,8 +506,12 @@ def dyndns_update(): if not user or not check_password_hash(user['password_hash'], auth.password): db.close() + _record_login_fail('dyndns', ip) return Response('badauth', 401, mimetype='text/plain') + # Erfolgreiche Authentifizierung -> Fehlversuchszähler zurücksetzen + _reset_login_fails('dyndns', ip) + plesk_url = get_setting('plesk_url') plesk_api_key = get_setting('plesk_api_key') plesk_base_domain = get_setting('plesk_base_domain')