import hmac
import ipaddress
import os
import re
import secrets
import socket
import sqlite3
from datetime import datetime, timedelta
from functools import wraps
from urllib.parse import urljoin, urlparse

from flask import (Flask, Response, abort, flash, redirect, render_template,
                   request, session, url_for)
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.security import check_password_hash, generate_password_hash

from database import get_db, get_setting, init_db, set_setting
from plesk import test_connection, update_dns_record

app = Flask(__name__)

# An empty SECRET_KEY (for example an unset variable that is passed through
# as "") is treated like a missing one, because Flask cannot sign session
# cookies with an empty key. The fallback key is random per process, so
# sessions do not survive a restart and are not shared between several
# worker processes; set SECRET_KEY explicitly for production use.
app.secret_key = os.environ.get('SECRET_KEY') or secrets.token_hex(32)

# Behind the nginx reverse proxy: evaluate X-Forwarded-* so that
# request.remote_addr yields the real client IP (for login lockout & myip).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)

# Harden the session cookie. SESSION_COOKIE_SECURE can be switched off via the
# environment for local http tests (on by default, since production runs
# behind HTTPS).
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_SECURE=os.environ.get('SESSION_COOKIE_SECURE', '1') == '1',
)

# Endpoints without CSRF token check (pure API/Basic auth, no cookies).
CSRF_EXEMPT = {'dyndns_update'}

# Login brute-force protection: number of failures that triggers a lock and
# the duration of that lock in minutes.
LOGIN_MAX_FAILS = 5
LOGIN_LOCK_MINUTES = 15

# Security headers. CSS/JS live in static/, hence no 'unsafe-inline'.
# Only the own origin and the Bootstrap CDN are allowed.
CSP = (
    "default-src 'self'; "
    "img-src 'self' data:; "
    "style-src 'self' https://cdn.jsdelivr.net; "
    "script-src 'self' https://cdn.jsdelivr.net; "
    "font-src 'self' https://cdn.jsdelivr.net; "
    "form-action 'self'; frame-ancestors 'none'; base-uri 'self'"
)


@app.after_request
def _security_headers(resp):
    """Attach the security headers to every outgoing response.

    The Content-Security-Policy is only set when the response does not carry
    one already, so a view may supply its own policy.
    """
    resp.headers['X-Frame-Options'] = 'DENY'
    resp.headers['X-Content-Type-Options'] = 'nosniff'
    resp.headers['Referrer-Policy'] = 'no-referrer'
    resp.headers.setdefault('Content-Security-Policy', CSP)
    return resp


# ---------------------------------------------------------------------------
# CSRF protection (lean session token, no external dependency)
# ---------------------------------------------------------------------------

@app.before_request
def _csrf_protect():
    """Create the per-session CSRF token and verify it on mutating requests.

    Endpoints listed in CSRF_EXEMPT are skipped. For POST/PUT/PATCH/DELETE the
    form field ``csrf_token`` must equal the session token, otherwise the
    request is aborted with HTTP 400.
    """
    if request.endpoint in CSRF_EXEMPT:
        return
    if '_csrf_token' not in session:
        session['_csrf_token'] = secrets.token_urlsafe(32)
    if request.method in ('POST', 'PUT', 'PATCH', 'DELETE'):
        token = session.get('_csrf_token', '')
        sent = request.form.get('csrf_token', '')
        # compare_digest keeps the comparison time independent of the
        # position of the first differing character.
        if not token or not sent or not hmac.compare_digest(token, sent):
            abort(400, 'CSRF-Token ungültig oder fehlt.')


@app.context_processor
def _inject_csrf():
    """Expose ``csrf_token()`` to all templates."""
    return {'csrf_token': lambda: session.get('_csrf_token', '')}


# ---------------------------------------------------------------------------
# Custom error pages (reveal no framework, no stack trace)
# ---------------------------------------------------------------------------

ERROR_MESSAGES = {
    400: 'Ungültige Anfrage.',
    403: 'Zugriff verweigert.',
    404: 'Seite nicht gefunden.',
    405: 'Methode nicht erlaubt.',
    500: 'Interner Serverfehler.',
}


def _render_error(code):
    """Render error.html for *code* and return it together with that status."""
    return render_template('error.html', code=code,
                           message=ERROR_MESSAGES.get(code, 'Fehler')), code


@app.errorhandler(400)
def _err_400(e):
    return _render_error(400)


@app.errorhandler(403)
def _err_403(e):
    return _render_error(403)


@app.errorhandler(404)
def _err_404(e):
    return _render_error(404)


@app.errorhandler(405)
def _err_405(e):
    return _render_error(405)


@app.errorhandler(500)
def _err_500(e):
    return _render_error(500)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def login_required(f):
    """Decorator: redirect to the login page unless an admin is logged in.

    The originally requested path is handed over as ``next``.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        if 'admin_id' not in session:
            return redirect(url_for('login', next=request.path))
        return f(*args, **kwargs)
    return wrapped


def is_safe_url(target):
    """Return True only for redirect targets on the own origin.

    Open-redirect protection for the ``next`` parameter. Empty targets are
    rejected, as are targets that contain a backslash or control characters
    or that start with whitespace.
    """
    if not target:
        return False
    # urljoin/urlparse report the own host for input such as "/\\host",
    # while browsers commonly treat "\" like "/" and drop control characters,
    # which would turn the redirect into a scheme-relative URL.
    if '\\' in target or target[0].isspace():
        return False
    if any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in target):
        return False
    ref = urlparse(request.host_url)
    test = urlparse(urljoin(request.host_url, target))
    return test.scheme in ('http', 'https') and ref.netloc == test.netloc


# --- Brute-force protection (DB backed, per scope + client IP) -------------
# The key is "<scope>:<ip>" so that the admin login ('login') and the
# DynDNS update ('dyndns') are counted separately.

def _attempt_key(scope, ip):
    """Build the login_attempts key for *scope* and client *ip*."""
    return f'{scope}:{ip}'


def _login_locked_for(scope, ip):
    """Return the seconds until the lock expires, or 0 when not locked."""
    db = get_db()
    try:
        row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?',
                         (_attempt_key(scope, ip),)).fetchone()
    finally:
        # Close the connection even when the query raises.
        db.close()
    if row and row['locked_until']:
        try:
            until = datetime.strptime(row['locked_until'], '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # An unparsable timestamp is treated as "not locked".
            return 0
        remaining = (until - datetime.now()).total_seconds()
        return int(remaining) if remaining > 0 else 0
    return 0


def _record_login_fail(scope, ip):
    """Increment the failure counter and lock once LOGIN_MAX_FAILS is reached.

    The counter is only cleared by _reset_login_fails, so every further
    failure after the limit sets a fresh lock of LOGIN_LOCK_MINUTES.
    """
    key = _attempt_key(scope, ip)
    db = get_db()
    try:
        row = db.execute('SELECT fails FROM login_attempts WHERE ip = ?',
                         (key,)).fetchone()
        fails = (row['fails'] if row else 0) + 1
        locked_until = None
        if fails >= LOGIN_MAX_FAILS:
            locked_until = (datetime.now() + timedelta(minutes=LOGIN_LOCK_MINUTES)
                            ).strftime('%Y-%m-%d %H:%M:%S')
        db.execute(
            'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, ?, ?) '
            'ON CONFLICT(ip) DO UPDATE SET fails = excluded.fails, locked_until = excluded.locked_until',
            (key, fails, locked_until),
        )
        db.commit()
    finally:
        db.close()


def _reset_login_fails(scope, ip):
    """Delete the failure record for *scope* and *ip*."""
    db = get_db()
    try:
        db.execute('DELETE FROM login_attempts WHERE ip = ?',
                   (_attempt_key(scope, ip),))
        db.commit()
    finally:
        db.close()


# --- SSRF protection for the admin-configured Plesk URL --------------------

def validate_plesk_url(url):
    """Check the Plesk URL and return ``(ok, error_message)``.

    Only http(s) URLs are accepted. Link-local / cloud-metadata
    (169.254.0.0/16), multicast, reserved and unspecified targets are
    blocked. Private and loopback addresses stay allowed, because Plesk often
    runs internally or on the same host.
    """
    try:
        p = urlparse(url)
        scheme, hostname = p.scheme, p.hostname
        # Accessing .port raises ValueError for a non-numeric or out-of-range
        # port; urlparse itself raises it for a broken bracketed host.
        port = p.port
    except ValueError:
        return False, 'Nur gültige http(s)-URLs erlaubt.'
    if scheme not in ('http', 'https') or not hostname:
        return False, 'Nur gültige http(s)-URLs erlaubt.'
    try:
        infos = socket.getaddrinfo(hostname, port or (443 if scheme == 'https' else 80))
    except (socket.gaierror, UnicodeError):
        return False, 'Hostname nicht auflösbar.'
    for info in infos:
        # Drop an IPv6 scope id ("fe80::1%eth0"); older Python versions
        # cannot parse it.
        raw = info[4][0].split('%', 1)[0]
        try:
            ip = ipaddress.ip_address(raw)
        except ValueError:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
        # Judge IPv4-mapped IPv6 addresses by the embedded IPv4 address.
        mapped = getattr(ip, 'ipv4_mapped', None)
        if mapped is not None:
            ip = mapped
        # Loopback is allowed on purpose. It has to be checked first because
        # ::1 lies inside the reserved ::/8 range and would be rejected by
        # is_reserved otherwise.
        if ip.is_loopback:
            continue
        if ip.is_link_local or ip.is_multicast or ip.is_reserved or ip.is_unspecified:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
    return True, ''


# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------

@app.route('/')
def index():
    """Send logged-in admins to the dashboard, everyone else to the login."""
    return redirect(url_for('dashboard') if 'admin_id' in session else url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and handle the admin login.

    Failed attempts are counted per client IP; a locked client only gets the
    form with a notice. On success the session is reset before the admin is
    stored in it, and the user is redirected to ``next`` when that target is
    safe, otherwise to the dashboard.
    """
    if request.method == 'POST':
        ip = request.remote_addr or 'unknown'
        locked = _login_locked_for('login', ip)
        if locked:
            flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger')
            return render_template('login.html')

        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        db = get_db()
        try:
            admin = db.execute(
                'SELECT * FROM admin_users WHERE username = ?', (username,)
            ).fetchone()
        finally:
            db.close()

        if admin and check_password_hash(admin['password_hash'], password):
            _reset_login_fails('login', ip)
            # Start from an empty session so nothing from before the login
            # is carried over.
            session.clear()
            session['admin_id'] = admin['id']
            session['admin_username'] = admin['username']
            nxt = request.form.get('next') or request.args.get('next')
            return redirect(nxt if is_safe_url(nxt) else url_for('dashboard'))

        _record_login_fail('login', ip)
        flash('Benutzername oder Passwort falsch.', 'danger')
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Drop the session and return to the login page."""
    session.clear()
    return redirect(url_for('login'))


# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------

@app.route('/dashboard')
@login_required
def dashboard():
    """Render the overview: all subdomains, the user count, the last 30 log rows."""
    db = get_db()
    try:
        subdomains = db.execute('''
            SELECT s.*, u.username, u.active,
                   (SELECT COUNT(*) FROM update_log l WHERE l.subdomain_id = s.id) AS update_count
            FROM subdomains s
            JOIN dyndns_users u ON s.dyndns_user_id = u.id
            ORDER BY s.subdomain
        ''').fetchall()
        user_count = db.execute('SELECT COUNT(*) AS c FROM dyndns_users').fetchone()['c']
        logs = db.execute('''
            SELECT l.*, s.subdomain, u.username AS dyndns_username
            FROM update_log l
            JOIN dyndns_users u ON l.dyndns_user_id = u.id
            LEFT JOIN subdomains s ON l.subdomain_id = s.id
            ORDER BY l.timestamp DESC
            LIMIT 30
        ''').fetchall()
    finally:
        db.close()
    base_domain = get_setting('plesk_base_domain')
    return render_template('dashboard.html', subdomains=subdomains,
                           user_count=user_count, logs=logs,
                           base_domain=base_domain)


# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------

@app.route('/settings')
@login_required
def settings():
    """Render the settings page with the stored Plesk configuration."""
    return render_template(
        'settings.html',
        plesk_url=get_setting('plesk_url'),
        plesk_api_key=get_setting('plesk_api_key'),
        plesk_base_domain=get_setting('plesk_base_domain'),
        plesk_verify_ssl=get_setting('plesk_verify_ssl', '1'),
    )


@app.route('/settings/plesk', methods=['POST'])
@login_required
def settings_plesk():
    """Validate and store the Plesk settings, optionally testing the connection.

    The URL has to pass validate_plesk_url before anything is saved. When the
    form contains ``test_connection``, the connection is tested right after
    saving; details of a failure go to the log only.
    """
    plesk_url = request.form.get('plesk_url', '').strip().rstrip('/')
    plesk_api_key = request.form.get('plesk_api_key', '').strip()
    plesk_base_domain = request.form.get('plesk_base_domain', '').strip()
    plesk_verify_ssl = '1' if request.form.get('plesk_verify_ssl') else '0'

    ok, err = validate_plesk_url(plesk_url)
    if not ok:
        flash(f'Plesk-URL abgelehnt: {err}', 'danger')
        return redirect(url_for('settings'))

    set_setting('plesk_url', plesk_url)
    set_setting('plesk_api_key', plesk_api_key)
    set_setting('plesk_base_domain', plesk_base_domain)
    set_setting('plesk_verify_ssl', plesk_verify_ssl)

    if 'test_connection' in request.form:
        try:
            info = test_connection(plesk_url, plesk_api_key,
                                   verify_ssl=(plesk_verify_ssl == '1'))
            ver = info.get('version', '?')
            flash(f'Verbindung OK — Plesk {ver}', 'success')
        except Exception as exc:
            # Keep the exception text out of the UI; it is logged instead.
            app.logger.warning('Plesk-Verbindungstest fehlgeschlagen: %s', exc)
            flash('Verbindungsfehler — bitte URL, API-Schlüssel und Erreichbarkeit prüfen.', 'danger')
    else:
        flash('Plesk-Einstellungen gespeichert.', 'success')
    return redirect(url_for('settings'))


@app.route('/settings/password', methods=['POST'])
@login_required
def settings_password():
    """Change the password of the logged-in admin.

    Requires the current password, two matching new passwords and a minimum
    length of 6 characters. A session whose admin row no longer exists is
    handled like a wrong current password instead of raising.
    """
    current = request.form.get('current_password', '')
    new_pw = request.form.get('new_password', '')
    new_pw2 = request.form.get('new_password2', '')
    db = get_db()
    try:
        admin = db.execute('SELECT * FROM admin_users WHERE id = ?',
                           (session['admin_id'],)).fetchone()
        if admin is None or not check_password_hash(admin['password_hash'], current):
            flash('Aktuelles Passwort falsch.', 'danger')
        elif new_pw != new_pw2:
            flash('Neue Passwörter stimmen nicht überein.', 'danger')
        elif len(new_pw) < 6:
            flash('Mindestens 6 Zeichen erforderlich.', 'danger')
        else:
            db.execute('UPDATE admin_users SET password_hash = ? WHERE id = ?',
                       (generate_password_hash(new_pw), session['admin_id']))
            db.commit()
            flash('Passwort geändert.', 'success')
    finally:
        db.close()
    return redirect(url_for('settings'))


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

# Lower-case DNS labels (letters, digits, inner hyphens), optionally several
# of them joined by dots.
SUBDOMAIN_RE = re.compile(r'^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$')


def _parse_subdomains(raw):
    """Split *raw* (comma/space/newline separated) into a de-duplicated list.

    Names are lower-cased and stripped of trailing dots; the input order is
    kept. Validation against SUBDOMAIN_RE is left to the caller.
    """
    names, seen = [], set()
    for part in re.split(r'[\s,]+', raw.strip().lower()):
        part = part.strip().rstrip('.')
        if part and part not in seen:
            seen.add(part)
            names.append(part)
    return names


@app.route('/users')
@login_required
def users():
    """List all DynDNS users together with their subdomains."""
    db = get_db()
    try:
        user_rows = db.execute('SELECT * FROM dyndns_users ORDER BY username').fetchall()
        subs = db.execute('SELECT * FROM subdomains ORDER BY subdomain').fetchall()
    finally:
        db.close()
    by_user = {}
    for s in subs:
        by_user.setdefault(s['dyndns_user_id'], []).append(s)
    user_list = [{'row': u, 'subdomains': by_user.get(u['id'], [])} for u in user_rows]
    base_domain = get_setting('plesk_base_domain')
    return render_template('users.html', users=user_list, base_domain=base_domain)


@app.route('/users/add', methods=['POST'])
@login_required
def user_add():
    """Create a DynDNS user with at least one subdomain in one transaction."""
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    names = _parse_subdomains(request.form.get('subdomains', ''))

    if not username or not password or not names:
        flash('Benutzername, Passwort und mindestens eine Subdomain sind erforderlich.', 'danger')
        return redirect(url_for('users'))

    invalid = [n for n in names if not SUBDOMAIN_RE.match(n)]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))

    db = get_db()
    try:
        cur = db.execute(
            'INSERT INTO dyndns_users (username, password_hash) VALUES (?, ?)',
            (username, generate_password_hash(password)),
        )
        user_id = cur.lastrowid
        for n in names:
            db.execute(
                'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                (user_id, n),
            )
        db.commit()
        flash(f'Benutzer "{username}" mit {len(names)} Subdomain(s) angelegt.', 'success')
    except Exception as exc:
        # Undo the partially created user so no half-configured row remains.
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))


@app.route('/users/<int:user_id>/edit', methods=['POST'])
@login_required
def user_edit(user_id):
    """Rename a DynDNS user and, when a password is given, replace it.

    An empty username is rejected; an empty password keeps the stored hash.
    """
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    if not username:
        flash('Benutzername ist erforderlich.', 'danger')
        return redirect(url_for('users'))

    db = get_db()
    try:
        if password:
            db.execute(
                'UPDATE dyndns_users SET username=?, password_hash=? WHERE id=?',
                (username, generate_password_hash(password), user_id),
            )
        else:
            db.execute('UPDATE dyndns_users SET username=? WHERE id=?',
                       (username, user_id))
        db.commit()
        flash('Benutzer aktualisiert.', 'success')
    except Exception as exc:
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))


@app.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
def user_delete(user_id):
    """Delete a DynDNS user including its log entries and subdomains."""
    db = get_db()
    try:
        # Dependent rows are removed before the user row itself.
        db.execute('DELETE FROM update_log WHERE dyndns_user_id = ?', (user_id,))
        db.execute('DELETE FROM subdomains WHERE dyndns_user_id = ?', (user_id,))
        db.execute('DELETE FROM dyndns_users WHERE id = ?', (user_id,))
        db.commit()
    finally:
        db.close()
    flash('Benutzer gelöscht.', 'success')
    return redirect(url_for('users'))


@app.route('/users/<int:user_id>/subdomains/add', methods=['POST'])
@login_required
def subdomain_add(user_id):
    """Add one or more subdomains to an existing DynDNS user.

    Names that violate a database constraint are reported as already
    existing and skipped; the remaining names are still added.
    """
    names = _parse_subdomains(request.form.get('subdomains', ''))
    if not names:
        flash('Keine Subdomain angegeben.', 'danger')
        return redirect(url_for('users'))

    invalid = [n for n in names if not SUBDOMAIN_RE.match(n)]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))

    db = get_db()
    added = 0
    try:
        for n in names:
            try:
                db.execute(
                    'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                    (user_id, n),
                )
                added += 1
            except sqlite3.IntegrityError:
                flash(f'Subdomain "{n}" existiert bereits.', 'warning')
        db.commit()
        if added:
            flash(f'{added} Subdomain(s) hinzugefügt.', 'success')
    finally:
        db.close()
    return redirect(url_for('users'))


@app.route('/subdomains/<int:subdomain_id>/delete', methods=['POST'])
@login_required
def subdomain_delete(subdomain_id):
    """Delete a subdomain; its log entries are kept but detached from it."""
    db = get_db()
    try:
        db.execute('UPDATE update_log SET subdomain_id = NULL WHERE subdomain_id = ?',
                   (subdomain_id,))
        db.execute('DELETE FROM subdomains WHERE id = ?', (subdomain_id,))
        db.commit()
    finally:
        db.close()
    flash('Subdomain gelöscht.', 'success')
    return redirect(url_for('users'))


@app.route('/users/<int:user_id>/toggle', methods=['POST'])
@login_required
def user_toggle(user_id):
    """Flip the ``active`` flag of a DynDNS user between 0 and 1."""
    db = get_db()
    try:
        db.execute('UPDATE dyndns_users SET active = 1 - active WHERE id = ?',
                   (user_id,))
        db.commit()
    finally:
        db.close()
    return redirect(url_for('users'))


# ---------------------------------------------------------------------------
# DynDNS v2 update endpoint (Speedport "Anderer Anbieter")
# ---------------------------------------------------------------------------

def _normalize_ip(value):
    """Return the canonical text form of an IP address, or None if invalid."""
    try:
        return str(ipaddress.ip_address(value))
    except ValueError:
        return None


@app.route('/nic/update')
def dyndns_update():
    """Handle a DynDNS v2 update request authenticated via HTTP Basic auth.

    Responds in plain text with dyndns2 style codes: ``abuse`` for locked
    clients, ``badauth`` for missing or wrong credentials, ``911`` when Plesk
    is not configured or no usable IP address can be determined, ``nohost``
    when no subdomain matches, and otherwise one ``good``/``nochg``/``dnserr``
    line per updated subdomain.
    """
    ip = request.remote_addr or 'unknown'

    # Brute-force protection: reject locked client IPs right away
    # (dyndns2: "abuse").
    if _login_locked_for('dyndns', ip):
        return Response('abuse', 403, mimetype='text/plain')

    auth = request.authorization
    if not auth:
        _record_login_fail('dyndns', ip)
        return Response(
            'badauth', 401,
            {'WWW-Authenticate': 'Basic realm="DynDNS Update"'},
            mimetype='text/plain',
        )

    db = get_db()
    try:
        user = db.execute(
            'SELECT * FROM dyndns_users WHERE username = ? AND active = 1',
            (auth.username,),
        ).fetchone()
        if not user or not check_password_hash(user['password_hash'], auth.password):
            _record_login_fail('dyndns', ip)
            return Response('badauth', 401, mimetype='text/plain')

        # Successful authentication -> reset the failure counter.
        _reset_login_fails('dyndns', ip)

        # myip comes from the query string and is untrusted: only a
        # well-formed address is forwarded to Plesk and stored. A malformed
        # value is ignored in favour of the client address, which matches
        # the dyndns2 convention of falling back to the server's best guess.
        requested_ip = request.args.get('myip') or request.args.get('ip') or ''
        myip = _normalize_ip(requested_ip) or _normalize_ip(ip)
        if myip is None:
            return Response('911', 500, mimetype='text/plain')

        plesk_url = get_setting('plesk_url')
        plesk_api_key = get_setting('plesk_api_key')
        plesk_base_domain = get_setting('plesk_base_domain')
        plesk_verify_ssl = get_setting('plesk_verify_ssl', '1') == '1'

        if not plesk_url or not plesk_api_key or not plesk_base_domain:
            return Response('911', 500, mimetype='text/plain')

        subs = db.execute(
            'SELECT * FROM subdomains WHERE dyndns_user_id = ?', (user['id'],)
        ).fetchall()

        # The client may pick specific subdomains via ?hostname= (an FQDN
        # such as "mypc.example.com" or just the label "mypc"). Several
        # comma-separated names are allowed. Without hostname ALL subdomains
        # of the user are updated.
        base = plesk_base_domain.lower().rstrip('.')
        requested = [h for h in re.split(r'[\s,]+',
                                         request.args.get('hostname', '').strip().lower()) if h]
        if requested:
            wanted = {h.rstrip('.') for h in requested}
            targets = [
                s for s in subs
                if s['subdomain'] in wanted or f"{s['subdomain']}.{base}" in wanted
            ]
        else:
            targets = list(subs)

        if not targets:
            return Response('nohost', 200, mimetype='text/plain')

        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines = []
        for s in targets:
            old_ip = s['current_ip']
            if old_ip == myip:
                # Nothing to do for this subdomain; Plesk is not contacted.
                lines.append(f'nochg {myip}')
                continue
            try:
                update_dns_record(
                    plesk_url, plesk_api_key, plesk_base_domain,
                    s['subdomain'], myip, verify_ssl=plesk_verify_ssl,
                )
                db.execute(
                    'UPDATE subdomains SET current_ip=?, last_updated=? WHERE id=?',
                    (myip, now, s['id']),
                )
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, 'good'),
                )
                lines.append(f'good {myip}')
            except Exception as exc:
                # Best effort per subdomain: record the failure in the log
                # table and carry on with the remaining targets.
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, f'error: {exc}'),
                )
                lines.append('dnserr')

        db.commit()
    finally:
        # Runs on every exit path, including the early returns above.
        db.close()

    status = 500 if any(line == 'dnserr' for line in lines) else 200
    return Response('\n'.join(lines) + '\n', status, mimetype='text/plain')


# ---------------------------------------------------------------------------

if __name__ == '__main__':
    init_db()
    app.run(host='0.0.0.0', port=5000, debug=False)