Files
dyndns-server/app/main.py
T
Stefan Hacker c3070469c1 Security-Hardening (Pentest-Findings F-02 bis F-07)
- CSRF-Schutz: session-gebundenes Token in allen POST-Formularen, serverseitig
  per before_request geprueft; /nic/update ausgenommen (Basic-Auth-API)
- Brute-Force-Schutz: DB-gestuetzter Login-Lockout pro Client-IP
  (5 Fehlversuche -> 15 min), echte IP via ProxyFix/X-Forwarded-For
- SSRF: validate_plesk_url() erzwingt http(s) und blockt Link-Local/Metadata,
  Multicast und reservierte Ziele
- Session-Cookies: HttpOnly, SameSite=Lax, Secure (per Env abschaltbar)
- Security-Header: CSP, X-Frame-Options, X-Content-Type-Options, Referrer-Policy
- Generische Plesk-Fehlermeldungen (keine internen URLs im UI)
- CSS/JS nach static/ ausgelagert -> strikte CSP ohne 'unsafe-inline'
- login_attempts-Tabelle + README-Security-Abschnitt

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 14:45:27 +02:00

568 lines
20 KiB
Python

import hmac
import ipaddress
import os
import re
import secrets
import socket
import sqlite3
from datetime import datetime, timedelta
from functools import wraps
from urllib.parse import urlparse
from flask import (Flask, Response, abort, flash, redirect, render_template,
request, session, url_for)
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.security import check_password_hash, generate_password_hash
from database import get_db, get_setting, init_db, set_setting
from plesk import test_connection, update_dns_record
app = Flask(__name__)
# Session-signing key. An unset *or empty* SECRET_KEY (e.g. a blank value
# passed through by the deployment) falls back to a random per-process key;
# an empty string would otherwise be used as the key and leave Flask without
# a usable secret. Note that the random fallback invalidates all sessions on
# restart and differs between worker processes.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
# Behind the nginx reverse proxy: honour X-Forwarded-* so that
# request.remote_addr yields the real client IP (login lockout & myip).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
# Harden the session cookie. SESSION_COOKIE_SECURE can be switched off via
# the environment for local http tests (on by default, since production
# runs behind HTTPS).
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_SECURE=os.environ.get('SESSION_COOKIE_SECURE', '1') == '1',
)
# Endpoints that skip the CSRF token check (pure API / Basic auth, no cookies).
CSRF_EXEMPT = {'dyndns_update'}

# Login brute-force protection: failures before a lock, and lock duration.
LOGIN_MAX_FAILS = 5
LOGIN_LOCK_MINUTES = 15

# Content-Security-Policy. CSS/JS live under static/, so no 'unsafe-inline'
# is needed. Only the own origin and the Bootstrap CDN are allowed.
CSP = '; '.join((
    "default-src 'self'",
    "img-src 'self' data:",
    "style-src 'self' https://cdn.jsdelivr.net",
    "script-src 'self' https://cdn.jsdelivr.net",
    "font-src 'self' https://cdn.jsdelivr.net",
    "form-action 'self'",
    "frame-ancestors 'none'",
    "base-uri 'self'",
))
@app.after_request
def _security_headers(resp):
    """Attach the security headers to every outgoing response.

    The three fixed headers are always overwritten; the Content-Security-Policy
    is only added when the response does not already carry one.
    """
    always_set = (
        ('X-Frame-Options', 'DENY'),
        ('X-Content-Type-Options', 'nosniff'),
        ('Referrer-Policy', 'no-referrer'),
    )
    for header_name, header_value in always_set:
        resp.headers[header_name] = header_value
    # setdefault keeps a CSP that a view may have set itself.
    resp.headers.setdefault('Content-Security-Policy', CSP)
    return resp
# ---------------------------------------------------------------------------
# CSRF-Schutz (schlanker Session-Token, ohne externe Abhängigkeit)
# ---------------------------------------------------------------------------
@app.before_request
def _csrf_protect():
    """Issue a per-session CSRF token and verify it on state-changing requests.

    Endpoints listed in CSRF_EXEMPT are skipped entirely. For POST, PUT,
    PATCH and DELETE the form field ``csrf_token`` must equal the token stored
    in the session, otherwise the request is aborted with HTTP 400.
    """
    if request.endpoint in CSRF_EXEMPT:
        return
    if '_csrf_token' not in session:
        session['_csrf_token'] = secrets.token_urlsafe(32)
    if request.method not in ('POST', 'PUT', 'PATCH', 'DELETE'):
        return
    expected = session.get('_csrf_token', '')
    submitted = request.form.get('csrf_token', '')
    # Constant-time comparison; empty values never count as a match.
    if not (expected and submitted and hmac.compare_digest(expected, submitted)):
        abort(400, 'CSRF-Token ungültig oder fehlt.')
@app.context_processor
def _inject_csrf():
    """Make ``csrf_token()`` callable from every template."""
    def current_token():
        # Looked up at call time, so the value stored in the session when the
        # template renders is the one that gets embedded.
        return session.get('_csrf_token', '')

    return {'csrf_token': current_token}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def login_required(f):
    """Decorator: run *f* only when the session carries an ``admin_id``.

    Requests without ``admin_id`` in the session are redirected to the
    ``login`` endpoint instead of reaching the view.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded
# --- Login-Brute-Force-Schutz (DB-gestützt, pro Client-IP) -----------------
def _login_locked_for(ip):
    """Return the whole seconds until *ip* is unlocked, or 0 if not locked.

    A missing row, an empty ``locked_until`` value, an unparsable timestamp
    or a timestamp that is not in the future all count as "not locked".
    """
    conn = get_db()
    entry = conn.execute(
        'SELECT locked_until FROM login_attempts WHERE ip = ?', (ip,)
    ).fetchone()
    conn.close()
    if not entry or not entry['locked_until']:
        return 0
    try:
        unlock_at = datetime.strptime(entry['locked_until'], '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return 0
    seconds_left = (unlock_at - datetime.now()).total_seconds()
    if seconds_left > 0:
        return int(seconds_left)
    return 0
def _record_login_fail(ip):
    """Count one more failed login for *ip* and lock it once the limit is hit.

    The stored counter is incremented by one. When it reaches LOGIN_MAX_FAILS
    the row's ``locked_until`` is set LOGIN_LOCK_MINUTES into the future,
    otherwise ``locked_until`` is written as NULL.

    NOTE(review): nothing here lowers the counter after a lock has expired,
    so once the limit was reached every further failure re-locks the IP
    until the row is deleted — confirm that this is intended.
    """
    conn = get_db()
    existing = conn.execute(
        'SELECT fails FROM login_attempts WHERE ip = ?', (ip,)
    ).fetchone()
    previous_fails = existing['fails'] if existing else 0
    fails = previous_fails + 1
    if fails >= LOGIN_MAX_FAILS:
        lock_end = datetime.now() + timedelta(minutes=LOGIN_LOCK_MINUTES)
        locked_until = lock_end.strftime('%Y-%m-%d %H:%M:%S')
    else:
        locked_until = None
    conn.execute(
        'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, ?, ?) '
        'ON CONFLICT(ip) DO UPDATE SET fails = excluded.fails, locked_until = excluded.locked_until',
        (ip, fails, locked_until),
    )
    conn.commit()
    conn.close()
def _reset_login_fails(ip):
    """Delete the failed-login record of *ip* (counter and lock)."""
    conn = get_db()
    conn.execute('DELETE FROM login_attempts WHERE ip = ?', (ip,))
    conn.commit()
    conn.close()
# --- SSRF-Schutz für die admin-konfigurierte Plesk-URL ---------------------
def validate_plesk_url(url):
    """Check that *url* is an acceptable target for Plesk API requests.

    Only http(s) URLs with a host are accepted. The host is resolved and the
    URL is rejected if any resulting address is link-local / cloud-metadata
    (169.254.0.0/16, fe80::/10), multicast, reserved or unspecified.
    Private and loopback addresses stay allowed, because Plesk often runs
    internally or on the same host.

    Args:
        url: The admin-supplied Plesk base URL.

    Returns:
        A tuple ``(ok, error_message)``; the message is ``''`` when ok is True.
    """
    try:
        p = urlparse(url)
        # .port raises ValueError for a non-numeric or out-of-range port, and
        # urlparse itself raises it for malformed IPv6 literals.
        port = p.port
    except ValueError:
        return False, 'Nur gültige http(s)-URLs erlaubt.'
    if p.scheme not in ('http', 'https') or not p.hostname:
        return False, 'Nur gültige http(s)-URLs erlaubt.'
    try:
        infos = socket.getaddrinfo(p.hostname, port or (443 if p.scheme == 'https' else 80))
    except (socket.gaierror, ValueError):
        # ValueError also covers UnicodeError (IDNA encoding failures).
        return False, 'Hostname nicht auflösbar.'
    for info in infos:
        try:
            # Drop an IPv6 zone id ("fe80::1%eth0") before parsing.
            ip = ipaddress.ip_address(info[4][0].split('%', 1)[0])
        except ValueError:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
        # Judge IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) by the embedded
        # IPv4 address so the result does not depend on the Python version.
        mapped = getattr(ip, 'ipv4_mapped', None)
        if mapped is not None:
            ip = mapped
        # Loopback is explicitly allowed. Without this guard ::1 would be
        # rejected, because ipaddress reports it as reserved (::/8).
        if ip.is_loopback:
            continue
        if ip.is_link_local or ip.is_multicast or ip.is_reserved or ip.is_unspecified:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
    return True, ''
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
@app.route('/')
def index():
    """Send logged-in admins to the dashboard and everyone else to the login."""
    target_endpoint = 'dashboard' if 'admin_id' in session else 'login'
    return redirect(url_for(target_endpoint))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate an admin (POST).

    A POST from a currently locked client IP is refused before any password
    check. A failed attempt is recorded for the IP; a successful one clears
    the IP's failure record, starts a fresh session and redirects to the
    dashboard.
    """
    if request.method != 'POST':
        return render_template('login.html')

    ip = request.remote_addr or 'unknown'
    locked = _login_locked_for(ip)
    if locked:
        flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger')
        return render_template('login.html')

    submitted_name = request.form.get('username', '').strip()
    submitted_password = request.form.get('password', '')
    conn = get_db()
    admin = conn.execute(
        'SELECT * FROM admin_users WHERE username = ?', (submitted_name,)
    ).fetchone()
    conn.close()

    if not (admin and check_password_hash(admin['password_hash'], submitted_password)):
        _record_login_fail(ip)
        flash('Benutzername oder Passwort falsch.', 'danger')
        return render_template('login.html')

    _reset_login_fails(ip)
    # Drop everything from the pre-login session before storing the identity.
    session.clear()
    session['admin_id'] = admin['id']
    session['admin_username'] = admin['username']
    return redirect(url_for('dashboard'))
@app.route('/logout')
def logout():
    """Empty the session and redirect to the login page."""
    session.clear()
    login_page = url_for('login')
    return redirect(login_page)
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the overview: all subdomains, the user count and recent updates.

    Each subdomain row is joined with its owning user and carries the number
    of its update_log entries; the log list holds the 30 newest entries.
    """
    subdomain_query = '''
        SELECT s.*, u.username, u.active,
        (SELECT COUNT(*) FROM update_log l WHERE l.subdomain_id = s.id) AS update_count
        FROM subdomains s
        JOIN dyndns_users u ON s.dyndns_user_id = u.id
        ORDER BY s.subdomain
    '''
    log_query = '''
        SELECT l.*, s.subdomain, u.username AS dyndns_username
        FROM update_log l
        JOIN dyndns_users u ON l.dyndns_user_id = u.id
        LEFT JOIN subdomains s ON l.subdomain_id = s.id
        ORDER BY l.timestamp DESC
        LIMIT 30
    '''
    conn = get_db()
    subdomain_rows = conn.execute(subdomain_query).fetchall()
    total_users = conn.execute('SELECT COUNT(*) AS c FROM dyndns_users').fetchone()['c']
    recent_logs = conn.execute(log_query).fetchall()
    conn.close()
    return render_template(
        'dashboard.html',
        subdomains=subdomain_rows,
        user_count=total_users,
        logs=recent_logs,
        base_domain=get_setting('plesk_base_domain'),
    )
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@app.route('/settings')
@login_required
def settings():
    """Render the settings page pre-filled with the stored Plesk settings."""
    template_values = {
        'plesk_url': get_setting('plesk_url'),
        'plesk_api_key': get_setting('plesk_api_key'),
        'plesk_base_domain': get_setting('plesk_base_domain'),
        # SSL verification counts as enabled ('1') when nothing is stored.
        'plesk_verify_ssl': get_setting('plesk_verify_ssl', '1'),
    }
    return render_template('settings.html', **template_values)
@app.route('/settings/plesk', methods=['POST'])
@login_required
def settings_plesk():
    """Validate and store the Plesk settings, optionally testing the connection.

    The URL must pass validate_plesk_url() before anything is saved. When the
    form contains ``test_connection`` the saved settings are tried right away
    and only a generic error is shown to the user; the detail goes to the log.
    """
    form = request.form
    plesk_url = form.get('plesk_url', '').strip().rstrip('/')
    plesk_api_key = form.get('plesk_api_key', '').strip()
    plesk_base_domain = form.get('plesk_base_domain', '').strip()
    plesk_verify_ssl = '1' if form.get('plesk_verify_ssl') else '0'

    ok, err = validate_plesk_url(plesk_url)
    if not ok:
        flash(f'Plesk-URL abgelehnt: {err}', 'danger')
        return redirect(url_for('settings'))

    to_store = (
        ('plesk_url', plesk_url),
        ('plesk_api_key', plesk_api_key),
        ('plesk_base_domain', plesk_base_domain),
        ('plesk_verify_ssl', plesk_verify_ssl),
    )
    for setting_key, setting_value in to_store:
        set_setting(setting_key, setting_value)

    if 'test_connection' not in form:
        flash('Plesk-Einstellungen gespeichert.', 'success')
        return redirect(url_for('settings'))

    try:
        info = test_connection(plesk_url, plesk_api_key, verify_ssl=(plesk_verify_ssl == '1'))
        ver = info.get('version', '?')
        flash(f'Verbindung OK — Plesk {ver}', 'success')
    except Exception as exc:
        # Keep internal URLs / error details out of the UI; log them instead.
        app.logger.warning('Plesk-Verbindungstest fehlgeschlagen: %s', exc)
        flash('Verbindungsfehler — bitte URL, API-Schlüssel und Erreichbarkeit prüfen.', 'danger')
    return redirect(url_for('settings'))
@app.route('/settings/password', methods=['POST'])
@login_required
def settings_password():
    """Change the password of the logged-in admin.

    The current password must match, both new entries must be equal and at
    least 6 characters long. The first failing check is reported; otherwise
    the new hash is stored.
    """
    form = request.form
    current = form.get('current_password', '')
    new_pw = form.get('new_password', '')
    new_pw2 = form.get('new_password2', '')

    conn = get_db()
    admin = conn.execute(
        'SELECT * FROM admin_users WHERE id = ?', (session['admin_id'],)
    ).fetchone()

    problem = None
    if not check_password_hash(admin['password_hash'], current):
        problem = 'Aktuelles Passwort falsch.'
    elif new_pw != new_pw2:
        problem = 'Neue Passwörter stimmen nicht überein.'
    elif len(new_pw) < 6:
        problem = 'Mindestens 6 Zeichen erforderlich.'

    if problem is not None:
        flash(problem, 'danger')
    else:
        conn.execute(
            'UPDATE admin_users SET password_hash = ? WHERE id = ?',
            (generate_password_hash(new_pw), session['admin_id']),
        )
        conn.commit()
        flash('Passwort geändert.', 'success')
    conn.close()
    return redirect(url_for('settings'))
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------
# One or more dot-separated labels of lowercase letters, digits and hyphens;
# a label may not start or end with a hyphen. Anchored with \Z rather than $,
# because $ would also match just before a trailing newline ("home\n").
SUBDOMAIN_RE = re.compile(r'^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*\Z')
def _parse_subdomains(raw):
"""Zerlegt eine Eingabe (Komma/Leerzeichen/Zeilenumbruch-getrennt) in
eine deduplizierte Liste gültiger Subdomain-Labels."""
names, seen = [], set()
for part in re.split(r'[\s,]+', raw.strip().lower()):
part = part.strip().rstrip('.')
if part and part not in seen:
seen.add(part)
names.append(part)
return names
@app.route('/users')
@login_required
def users():
    """Render the user list, each user together with their subdomains."""
    conn = get_db()
    user_rows = conn.execute('SELECT * FROM dyndns_users ORDER BY username').fetchall()
    subdomain_rows = conn.execute('SELECT * FROM subdomains ORDER BY subdomain').fetchall()
    conn.close()

    # Group the subdomain rows by the id of the owning user.
    subdomains_of = {}
    for sub in subdomain_rows:
        owner_id = sub['dyndns_user_id']
        if owner_id not in subdomains_of:
            subdomains_of[owner_id] = []
        subdomains_of[owner_id].append(sub)

    entries = [
        {'row': user, 'subdomains': subdomains_of.get(user['id'], [])}
        for user in user_rows
    ]
    return render_template(
        'users.html',
        users=entries,
        base_domain=get_setting('plesk_base_domain'),
    )
@app.route('/users/add', methods=['POST'])
@login_required
def user_add():
    """Create a DynDNS user together with one or more subdomains.

    Username, password and at least one subdomain are required and every
    subdomain must match SUBDOMAIN_RE. User and subdomains are committed
    together; any error rolls the whole insert back and is shown as a flash
    message.
    """
    form = request.form
    username = form.get('username', '').strip()
    password = form.get('password', '').strip()
    names = _parse_subdomains(form.get('subdomains', ''))

    if not (username and password and names):
        flash('Benutzername, Passwort und mindestens eine Subdomain sind erforderlich.', 'danger')
        return redirect(url_for('users'))

    invalid = [n for n in names if SUBDOMAIN_RE.match(n) is None]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))

    conn = get_db()
    try:
        new_user_id = conn.execute(
            'INSERT INTO dyndns_users (username, password_hash) VALUES (?, ?)',
            (username, generate_password_hash(password)),
        ).lastrowid
        for name in names:
            conn.execute(
                'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                (new_user_id, name),
            )
        conn.commit()
        flash(f'Benutzer "{username}" mit {len(names)} Subdomain(s) angelegt.', 'success')
    except Exception as exc:
        conn.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        conn.close()
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/edit', methods=['POST'])
@login_required
def user_edit(user_id):
    """Rename a DynDNS user and optionally set a new password.

    Args:
        user_id: Primary key of the dyndns_users row to update.

    The username is required, as it is when creating a user; a blank one is
    rejected instead of being written to the database. The password is only
    replaced when a non-empty value was submitted. On a database error the
    transaction is rolled back and the error is shown as a flash message.
    """
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    if not username:
        flash('Benutzername ist erforderlich.', 'danger')
        return redirect(url_for('users'))
    db = get_db()
    try:
        if password:
            db.execute(
                'UPDATE dyndns_users SET username=?, password_hash=? WHERE id=?',
                (username, generate_password_hash(password), user_id),
            )
        else:
            db.execute('UPDATE dyndns_users SET username=? WHERE id=?', (username, user_id))
        db.commit()
        flash('Benutzer aktualisiert.', 'success')
    except Exception as exc:
        # Same handling as user_add: undo the partial change before reporting.
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
def user_delete(user_id):
    """Delete a DynDNS user along with their log entries and subdomains."""
    # Dependent rows are removed before the user row itself.
    delete_statements = (
        'DELETE FROM update_log WHERE dyndns_user_id = ?',
        'DELETE FROM subdomains WHERE dyndns_user_id = ?',
        'DELETE FROM dyndns_users WHERE id = ?',
    )
    conn = get_db()
    for statement in delete_statements:
        conn.execute(statement, (user_id,))
    conn.commit()
    conn.close()
    flash('Benutzer gelöscht.', 'success')
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/subdomains/add', methods=['POST'])
@login_required
def subdomain_add(user_id):
    """Attach additional subdomains to an existing DynDNS user.

    All submitted names must match SUBDOMAIN_RE, otherwise nothing is added.
    A name whose insert raises an IntegrityError is reported as already
    existing and skipped; the remaining names are still inserted.
    """
    names = _parse_subdomains(request.form.get('subdomains', ''))
    if not names:
        flash('Keine Subdomain angegeben.', 'danger')
        return redirect(url_for('users'))

    invalid = [n for n in names if SUBDOMAIN_RE.match(n) is None]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))

    conn = get_db()
    added = 0
    try:
        for n in names:
            try:
                conn.execute(
                    'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                    (user_id, n),
                )
            except sqlite3.IntegrityError:
                flash(f'Subdomain "{n}" existiert bereits.', 'warning')
            else:
                added += 1
        conn.commit()
        if added:
            flash(f'{added} Subdomain(s) hinzugefügt.', 'success')
    finally:
        conn.close()
    return redirect(url_for('users'))
@app.route('/subdomains/<int:subdomain_id>/delete', methods=['POST'])
@login_required
def subdomain_delete(subdomain_id):
    """Delete one subdomain while keeping its update history.

    Log entries pointing at the subdomain are detached (subdomain_id set to
    NULL) before the subdomain row is removed.
    """
    key = (subdomain_id,)
    conn = get_db()
    conn.execute('UPDATE update_log SET subdomain_id = NULL WHERE subdomain_id = ?', key)
    conn.execute('DELETE FROM subdomains WHERE id = ?', key)
    conn.commit()
    conn.close()
    flash('Subdomain gelöscht.', 'success')
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/toggle', methods=['POST'])
@login_required
def user_toggle(user_id):
    """Flip the ``active`` flag of a DynDNS user between 0 and 1."""
    conn = get_db()
    # "1 - active" inverts the flag inside the database in one statement.
    conn.execute(
        'UPDATE dyndns_users SET active = 1 - active WHERE id = ?',
        (user_id,),
    )
    conn.commit()
    conn.close()
    return redirect(url_for('users'))
# ---------------------------------------------------------------------------
# DynDNS v2 update endpoint (Speedport "Anderer Anbieter")
# ---------------------------------------------------------------------------
def _resolve_update_ip():
    """Return the IP to publish for the current request, or None.

    Candidates are tried in order: ``?myip=``, ``?ip=``, then the client
    address. The first one that parses as an IPv4/IPv6 address wins, so a
    malformed client-supplied value is never forwarded to Plesk.
    """
    candidates = (
        request.args.get('myip'),
        request.args.get('ip'),
        request.remote_addr,
    )
    for candidate in candidates:
        candidate = (candidate or '').strip()
        if not candidate:
            continue
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            continue
        return candidate
    return None


def _select_targets(subs, hostname_arg, base_domain):
    """Return the subdomain rows addressed by *hostname_arg*.

    *hostname_arg* may list several names separated by commas/whitespace,
    each either a bare label ("mypc") or an FQDN below *base_domain*
    ("mypc.example.com"). An empty argument selects all rows.
    """
    requested = [h for h in re.split(r'[\s,]+', hostname_arg.strip().lower()) if h]
    if not requested:
        return list(subs)
    base = base_domain.lower().rstrip('.')
    wanted = {h.rstrip('.') for h in requested}
    return [
        s for s in subs
        if s['subdomain'] in wanted or f"{s['subdomain']}.{base}" in wanted
    ]


@app.route('/nic/update')
def dyndns_update():
    """DynDNS v2 update endpoint, authenticated via HTTP Basic auth.

    Responses (text/plain):
        badauth (401)  missing/incomplete credentials, unknown or inactive
                       user, or wrong password
        911 (500)      Plesk settings incomplete, or no valid IP address
                       could be determined for the request
        nohost (200)   the user has no (matching) subdomain
        otherwise one line per target subdomain: ``nochg <ip>``,
        ``good <ip>`` or ``dnserr``; the status is 500 if any line is
        ``dnserr``, else 200.

    The IP comes from ``?myip=`` / ``?ip=`` when that value is a valid IP
    address, otherwise from the client address. ``?hostname=`` optionally
    restricts the update to specific subdomains; without it all subdomains
    of the user are updated.
    """
    auth = request.authorization
    # Non-Basic schemes can yield credentials without a username/password;
    # treat them like missing credentials instead of hashing None.
    if not auth or auth.username is None or auth.password is None:
        return Response(
            'badauth',
            401,
            {'WWW-Authenticate': 'Basic realm="DynDNS Update"'},
            mimetype='text/plain',
        )
    myip = _resolve_update_ip()
    db = get_db()
    try:
        user = db.execute(
            'SELECT * FROM dyndns_users WHERE username = ? AND active = 1',
            (auth.username,),
        ).fetchone()
        if not user or not check_password_hash(user['password_hash'], auth.password):
            return Response('badauth', 401, mimetype='text/plain')
        if myip is None:
            return Response('911', 500, mimetype='text/plain')
        plesk_url = get_setting('plesk_url')
        plesk_api_key = get_setting('plesk_api_key')
        plesk_base_domain = get_setting('plesk_base_domain')
        plesk_verify_ssl = get_setting('plesk_verify_ssl', '1') == '1'
        if not plesk_url or not plesk_api_key or not plesk_base_domain:
            return Response('911', 500, mimetype='text/plain')
        subs = db.execute(
            'SELECT * FROM subdomains WHERE dyndns_user_id = ?', (user['id'],)
        ).fetchall()
        targets = _select_targets(subs, request.args.get('hostname', ''), plesk_base_domain)
        if not targets:
            return Response('nohost', 200, mimetype='text/plain')
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines = []
        for s in targets:
            old_ip = s['current_ip']
            if old_ip == myip:
                # Unchanged address: no Plesk call and no log entry.
                lines.append(f'nochg {myip}')
                continue
            try:
                update_dns_record(
                    plesk_url, plesk_api_key, plesk_base_domain,
                    s['subdomain'], myip, verify_ssl=plesk_verify_ssl,
                )
                db.execute(
                    'UPDATE subdomains SET current_ip=?, last_updated=? WHERE id=?',
                    (myip, now, s['id']),
                )
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, 'good'),
                )
                lines.append(f'good {myip}')
            except Exception as exc:
                # One failing subdomain must not abort the others; the detail
                # goes to the update log, the client only sees "dnserr".
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, f'error: {exc}'),
                )
                lines.append('dnserr')
        db.commit()
    finally:
        # Close the connection on every exit path, including exceptions.
        db.close()
    status = 500 if 'dnserr' in lines else 200
    return Response('\n'.join(lines) + '\n', status, mimetype='text/plain')
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Direct execution only: initialise the database via init_db(), then
    # serve on all interfaces, port 5000, with the debugger disabled.
    init_db()
    app.run(debug=False, port=5000, host='0.0.0.0')