F-08: Brute-Force-Schutz fuer /nic/update

- Lockout-Mechanismus um 'scope' erweitert (login vs. dyndns getrennt gezaehlt)
- /nic/update sperrt Client-IPs nach 5 fehlgeschlagenen Basic-Auth-Versuchen
  (dyndns2-Antwort 'abuse', 403); erfolgreiche Auth setzt den Zaehler zurueck

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Stefan Hacker
2026-06-06 14:55:26 +02:00
parent c3070469c1
commit a5787a5393
+31 -12
View File
@@ -95,12 +95,19 @@ def login_required(f):
return wrapped return wrapped
# --- Login-Brute-Force-Schutz (DB-gestützt, pro Client-IP) ----------------- # --- Brute-Force-Schutz (DB-gestützt, pro Scope + Client-IP) ---------------
# Der Schlüssel ist "<scope>:<ip>", damit Admin-Login ('login') und
# DynDNS-Update ('dyndns') getrennt gezählt werden.
def _login_locked_for(ip): def _attempt_key(scope, ip):
return f'{scope}:{ip}'
def _login_locked_for(scope, ip):
"""Sekunden bis zur Entsperrung, oder 0 wenn nicht gesperrt.""" """Sekunden bis zur Entsperrung, oder 0 wenn nicht gesperrt."""
db = get_db() db = get_db()
row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?', (ip,)).fetchone() row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?',
(_attempt_key(scope, ip),)).fetchone()
db.close() db.close()
if row and row['locked_until']: if row and row['locked_until']:
try: try:
@@ -112,9 +119,10 @@ def _login_locked_for(ip):
return 0 return 0
def _record_login_fail(ip): def _record_login_fail(scope, ip):
key = _attempt_key(scope, ip)
db = get_db() db = get_db()
row = db.execute('SELECT fails FROM login_attempts WHERE ip = ?', (ip,)).fetchone() row = db.execute('SELECT fails FROM login_attempts WHERE ip = ?', (key,)).fetchone()
fails = (row['fails'] if row else 0) + 1 fails = (row['fails'] if row else 0) + 1
locked_until = None locked_until = None
if fails >= LOGIN_MAX_FAILS: if fails >= LOGIN_MAX_FAILS:
@@ -123,15 +131,15 @@ def _record_login_fail(ip):
db.execute( db.execute(
'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, ?, ?) ' 'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, ?, ?) '
'ON CONFLICT(ip) DO UPDATE SET fails = excluded.fails, locked_until = excluded.locked_until', 'ON CONFLICT(ip) DO UPDATE SET fails = excluded.fails, locked_until = excluded.locked_until',
(ip, fails, locked_until), (key, fails, locked_until),
) )
db.commit() db.commit()
db.close() db.close()
def _reset_login_fails(ip): def _reset_login_fails(scope, ip):
db = get_db() db = get_db()
db.execute('DELETE FROM login_attempts WHERE ip = ?', (ip,)) db.execute('DELETE FROM login_attempts WHERE ip = ?', (_attempt_key(scope, ip),))
db.commit() db.commit()
db.close() db.close()
@@ -169,7 +177,7 @@ def index():
def login(): def login():
if request.method == 'POST': if request.method == 'POST':
ip = request.remote_addr or 'unknown' ip = request.remote_addr or 'unknown'
locked = _login_locked_for(ip) locked = _login_locked_for('login', ip)
if locked: if locked:
flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger') flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger')
return render_template('login.html') return render_template('login.html')
@@ -182,13 +190,13 @@ def login():
).fetchone() ).fetchone()
db.close() db.close()
if admin and check_password_hash(admin['password_hash'], password): if admin and check_password_hash(admin['password_hash'], password):
_reset_login_fails(ip) _reset_login_fails('login', ip)
session.clear() session.clear()
session['admin_id'] = admin['id'] session['admin_id'] = admin['id']
session['admin_username'] = admin['username'] session['admin_username'] = admin['username']
return redirect(url_for('dashboard')) return redirect(url_for('dashboard'))
_record_login_fail(ip) _record_login_fail('login', ip)
flash('Benutzername oder Passwort falsch.', 'danger') flash('Benutzername oder Passwort falsch.', 'danger')
return render_template('login.html') return render_template('login.html')
@@ -472,8 +480,15 @@ def user_toggle(user_id):
@app.route('/nic/update') @app.route('/nic/update')
def dyndns_update(): def dyndns_update():
ip = request.remote_addr or 'unknown'
# Brute-Force-Schutz: gesperrte Client-IPs sofort abweisen (dyndns2: "abuse")
if _login_locked_for('dyndns', ip):
return Response('abuse', 403, mimetype='text/plain')
auth = request.authorization auth = request.authorization
if not auth: if not auth:
_record_login_fail('dyndns', ip)
return Response( return Response(
'badauth', 'badauth',
401, 401,
@@ -481,7 +496,7 @@ def dyndns_update():
mimetype='text/plain', mimetype='text/plain',
) )
myip = request.args.get('myip') or request.args.get('ip') or request.remote_addr myip = request.args.get('myip') or request.args.get('ip') or ip
db = get_db() db = get_db()
user = db.execute( user = db.execute(
@@ -491,8 +506,12 @@ def dyndns_update():
if not user or not check_password_hash(user['password_hash'], auth.password): if not user or not check_password_hash(user['password_hash'], auth.password):
db.close() db.close()
_record_login_fail('dyndns', ip)
return Response('badauth', 401, mimetype='text/plain') return Response('badauth', 401, mimetype='text/plain')
# Erfolgreiche Authentifizierung -> Fehlversuchszähler zurücksetzen
_reset_login_fails('dyndns', ip)
plesk_url = get_setting('plesk_url') plesk_url = get_setting('plesk_url')
plesk_api_key = get_setting('plesk_api_key') plesk_api_key = get_setting('plesk_api_key')
plesk_base_domain = get_setting('plesk_base_domain') plesk_base_domain = get_setting('plesk_base_domain')