eccce7e539
- F-15 CSRF-Logout: /logout nur noch via POST mit CSRF-Token; Sidebar-Link ist jetzt ein POST-Formular. Schuetzt vor Cross-Site-Logout (SameSite=Lax greift bei Top-Level-GET nicht). - F-16 SRI: Subresource-Integrity-Hashes (sha384) + crossorigin fuer alle CDN-Ressourcen (Bootstrap CSS/JS, Bootstrap-Icons). - F-17: Permissions-Policy-Header (deaktiviert ungenutzte Browser-Features). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
652 lines
22 KiB
Python
652 lines
22 KiB
Python
import hmac
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
import secrets
|
|
import socket
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
from flask import (Flask, Response, abort, flash, redirect, render_template,
|
|
request, session, url_for)
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from database import get_db, get_setting, init_db, set_setting
|
|
from plesk import test_connection, update_dns_record
|
|
|
|
app = Flask(__name__)
# Session signing key. Using `or` instead of a .get() default makes an EMPTY
# SECRET_KEY (e.g. an unexpanded variable in a compose/env file) behave like an
# unset one; Flask treats an empty key as "no key" and sessions stop working.
# The random fallback is generated per process, so sessions signed with it do
# not survive a restart.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(32).hex()

# Behind the nginx reverse proxy: honour X-Forwarded-* so that
# request.remote_addr is the real client IP (used for login lockout & myip).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)

# Harden the session cookie. SESSION_COOKIE_SECURE can be switched off via the
# environment for local http tests (on by default; production runs behind HTTPS).
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_SECURE=os.environ.get('SESSION_COOKIE_SECURE', '1') == '1',
)
|
|
|
|
# Endpoints exempt from the CSRF token check (pure API / Basic auth, no cookies).
CSRF_EXEMPT = set(('dyndns_update',))

# Login brute-force protection: failures allowed before a lock, and the lock
# duration in minutes.
LOGIN_MAX_FAILS = 5
LOGIN_LOCK_MINUTES = 15

# Content-Security-Policy. CSS/JS live in static/, so no 'unsafe-inline' is
# needed; only the own origin and the Bootstrap CDN are allowed.
CSP = '; '.join((
    "default-src 'self'",
    "img-src 'self' data:",
    "style-src 'self' https://cdn.jsdelivr.net",
    "script-src 'self' https://cdn.jsdelivr.net",
    "font-src 'self' https://cdn.jsdelivr.net",
    "form-action 'self'",
    "frame-ancestors 'none'",
    "base-uri 'self'",
))
|
|
|
|
|
|
@app.after_request
def _security_headers(resp):
    """Attach the security headers to every outgoing response and return it."""
    unconditional = (
        ('X-Frame-Options', 'DENY'),
        ('X-Content-Type-Options', 'nosniff'),
        ('Referrer-Policy', 'no-referrer'),
        ('Permissions-Policy',
         'geolocation=(), camera=(), microphone=(), payment=(), usb=(), '
         'accelerometer=(), gyroscope=(), magnetometer=()'),
    )
    for header, value in unconditional:
        resp.headers[header] = value

    # setdefault: a response that already carries its own CSP keeps it.
    resp.headers.setdefault('Content-Security-Policy', CSP)

    # HTML pages (forms / session data) must not be cached.
    if resp.mimetype == 'text/html':
        resp.headers['Cache-Control'] = 'no-store'
    return resp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CSRF-Schutz (schlanker Session-Token, ohne externe Abhängigkeit)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.before_request
def _csrf_protect():
    """Issue a per-session CSRF token and enforce it on state-changing requests.

    Endpoints listed in CSRF_EXEMPT are skipped entirely. For POST/PUT/PATCH/
    DELETE the form field ``csrf_token`` must equal the session token,
    otherwise the request is aborted with HTTP 400.
    """
    if request.endpoint in CSRF_EXEMPT:
        return
    if '_csrf_token' not in session:
        session['_csrf_token'] = secrets.token_urlsafe(32)
    if request.method not in ('POST', 'PUT', 'PATCH', 'DELETE'):
        return

    token = session.get('_csrf_token', '')
    sent = request.form.get('csrf_token', '')
    # Compare as bytes: hmac.compare_digest() raises TypeError for str values
    # containing non-ASCII characters, which would turn a bogus token into a
    # 500 instead of the intended 400.
    valid = bool(token) and bool(sent) and hmac.compare_digest(
        token.encode('utf-8'), sent.encode('utf-8'))
    if not valid:
        abort(400, 'CSRF-Token ungültig oder fehlt.')
|
|
|
|
|
|
@app.context_processor
def _inject_csrf():
    """Expose ``csrf_token()`` to templates; it returns the session's token or ''."""
    def csrf_token():
        return session.get('_csrf_token', '')

    return {'csrf_token': csrf_token}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Eigene Fehlerseiten (verraten kein Framework, kein Stacktrace)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# User-facing text per HTTP status code shown on the custom error page.
ERROR_MESSAGES = dict((
    (400, 'Ungültige Anfrage.'),
    (403, 'Zugriff verweigert.'),
    (404, 'Seite nicht gefunden.'),
    (405, 'Methode nicht erlaubt.'),
    (500, 'Interner Serverfehler.'),
))
|
|
|
|
|
|
def _render_error(code):
    """Render error.html for *code* and return a ``(body, status)`` pair.

    Codes without an entry in ERROR_MESSAGES get the generic text 'Fehler'.
    """
    text = ERROR_MESSAGES.get(code, 'Fehler')
    body = render_template('error.html', code=code, message=text)
    return body, code
|
|
|
|
|
|
@app.errorhandler(400)
def _err_400(e):
    """Serve the custom error page for HTTP 400; the exception itself is ignored."""
    status = 400
    return _render_error(status)
|
|
|
|
|
|
@app.errorhandler(403)
def _err_403(e):
    """Serve the custom error page for HTTP 403; the exception itself is ignored."""
    status = 403
    return _render_error(status)
|
|
|
|
|
|
@app.errorhandler(404)
def _err_404(e):
    """Serve the custom error page for HTTP 404; the exception itself is ignored."""
    status = 404
    return _render_error(status)
|
|
|
|
|
|
@app.errorhandler(405)
def _err_405(e):
    """Serve the custom error page for HTTP 405; the exception itself is ignored."""
    status = 405
    return _render_error(status)
|
|
|
|
|
|
@app.errorhandler(500)
def _err_500(e):
    """Serve the custom error page for HTTP 500; the exception itself is ignored."""
    status = 500
    return _render_error(status)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def login_required(f):
    """Decorator: run *f* only when the session holds an ``admin_id``.

    Otherwise redirect to the login page, passing the requested path as
    ``next``.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login', next=request.path))

    return guarded
|
|
|
|
|
|
def is_safe_url(target):
    """Return True only for redirect targets that stay on the current origin.

    Open-redirect guard for the ``next`` parameter. *target* is resolved
    against ``request.host_url`` and must end up as http(s) on the same
    netloc. Empty/None targets are rejected.
    """
    if not target:
        return False
    # Browsers normalise '\' to '/' and drop tab/CR/LF as well as surrounding
    # whitespace, so values such as '/\evil.com' or ' //evil.com' look like a
    # local path to urlparse but navigate to a foreign host. Reject them
    # before doing the netloc comparison.
    if '\\' in target or target != target.strip():
        return False
    if any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in target):
        return False
    ref = urlparse(request.host_url)
    test = urlparse(urljoin(request.host_url, target))
    return test.scheme in ('http', 'https') and ref.netloc == test.netloc
|
|
|
|
|
|
# --- Brute-Force-Schutz (DB-gestützt, pro Scope + Client-IP) ---------------
|
|
# Der Schlüssel ist "<scope>:<ip>", damit Admin-Login ('login') und
|
|
# DynDNS-Update ('dyndns') getrennt gezählt werden.
|
|
|
|
def _attempt_key(scope, ip):
|
|
return f'{scope}:{ip}'
|
|
|
|
|
|
def _login_locked_for(scope, ip):
    """Return the seconds until *scope*/*ip* is unlocked, or 0 if not locked.

    A missing row, an empty ``locked_until`` or a timestamp that does not
    parse as '%Y-%m-%d %H:%M:%S' all count as "not locked".
    """
    db = get_db()
    try:
        row = db.execute('SELECT locked_until FROM login_attempts WHERE ip = ?',
                         (_attempt_key(scope, ip),)).fetchone()
    finally:
        # Close even when the query raises, so the connection is not leaked.
        db.close()

    if not row or not row['locked_until']:
        return 0
    try:
        until = datetime.strptime(row['locked_until'], '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return 0
    remaining = (until - datetime.now()).total_seconds()
    return int(remaining) if remaining > 0 else 0
|
|
|
|
|
|
def _record_login_fail(scope, ip):
    """Count one failed attempt for *scope*/*ip* and lock once the limit is hit.

    The counter is incremented in a single UPSERT statement so that
    concurrent failures (several workers) cannot read the same old value and
    under-count. When the new count reaches LOGIN_MAX_FAILS, ``locked_until``
    is set LOGIN_LOCK_MINUTES into the future; below the limit it is NULL.
    """
    key = _attempt_key(scope, ip)
    lock_until = (datetime.now() + timedelta(minutes=LOGIN_LOCK_MINUTES)
                  ).strftime('%Y-%m-%d %H:%M:%S')
    # Lock value for a brand-new row (count == 1); only relevant if the limit is 1.
    first_lock = lock_until if LOGIN_MAX_FAILS <= 1 else None

    db = get_db()
    try:
        db.execute(
            'INSERT INTO login_attempts (ip, fails, locked_until) VALUES (?, 1, ?) '
            'ON CONFLICT(ip) DO UPDATE SET '
            # Unqualified `fails` is the value stored before this update.
            'fails = fails + 1, '
            'locked_until = CASE WHEN fails + 1 >= ? THEN ? ELSE NULL END',
            (key, first_lock, LOGIN_MAX_FAILS, lock_until),
        )
        db.commit()
    finally:
        # Always release the connection, also when execute/commit raises.
        db.close()
|
|
|
|
|
|
def _reset_login_fails(scope, ip):
    """Delete the failed-attempt record for *scope*/*ip* (if any)."""
    db = get_db()
    try:
        db.execute('DELETE FROM login_attempts WHERE ip = ?',
                   (_attempt_key(scope, ip),))
        db.commit()
    finally:
        # Close even when the statement raises, so the connection is not leaked.
        db.close()
|
|
|
|
|
|
# --- SSRF-Schutz für die admin-konfigurierte Plesk-URL ---------------------
|
|
|
|
def validate_plesk_url(url):
    """Check an admin-supplied Plesk URL; return ``(ok, error_message)``.

    Only http(s) URLs with a host are accepted. The host is resolved and
    every resulting address is checked: link-local / cloud-metadata
    (169.254.0.0/16), multicast, reserved and unspecified targets are
    rejected. Private and loopback addresses stay allowed because Plesk
    often runs internally or on the same host.

    Malformed URLs (bad IPv6 bracket syntax, non-numeric or out-of-range
    port) yield ``(False, ...)`` instead of raising.
    """
    try:
        p = urlparse(url)
        host = p.hostname
        # .port validates lazily and raises ValueError for junk / out of range.
        port = p.port
    except ValueError:
        return False, 'Nur gültige http(s)-URLs erlaubt.'
    if p.scheme not in ('http', 'https') or not host:
        return False, 'Nur gültige http(s)-URLs erlaubt.'

    try:
        infos = socket.getaddrinfo(host, port or (443 if p.scheme == 'https' else 80))
    except (socket.gaierror, UnicodeError):
        return False, 'Hostname nicht auflösbar.'

    for info in infos:
        # Drop an IPv6 zone id ('fe80::1%eth0'); older ipaddress versions
        # cannot parse it.
        literal = info[4][0].split('%', 1)[0]
        try:
            ip = ipaddress.ip_address(literal)
        except ValueError:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
        # Judge IPv4-mapped IPv6 (::ffff:a.b.c.d) by the embedded IPv4 address,
        # otherwise ::ffff:169.254.169.254 slips past the link-local check on
        # Python versions that do not unwrap it themselves.
        mapped = getattr(ip, 'ipv4_mapped', None)
        if mapped is not None:
            ip = mapped
        if ip.is_link_local or ip.is_multicast or ip.is_reserved or ip.is_unspecified:
            return False, 'Ziel-Adresse nicht erlaubt (Link-Local/Reserved).'
    return True, ''
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route('/')
def index():
    """Send logged-in admins to the dashboard, everyone else to the login page."""
    endpoint = 'dashboard' if 'admin_id' in session else 'login'
    return redirect(url_for(endpoint))
|
|
|
|
|
|
@app.route('/favicon.ico')
def favicon():
    """Answer /favicon.ico with the static file favicon.svg."""
    icon_name = 'favicon.svg'
    return app.send_static_file(icon_name)
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Admin login form.

    GET renders the form. POST first checks the per-IP lockout, then verifies
    username/password; on success the session is rebuilt and the user is
    redirected to a safe ``next`` target or the dashboard, on failure the
    attempt is recorded and the form is shown again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    ip = request.remote_addr or 'unknown'
    locked = _login_locked_for('login', ip)
    if locked:
        flash(f'Zu viele Fehlversuche. Bitte in {locked // 60 + 1} Minuten erneut versuchen.', 'danger')
        return render_template('login.html')

    username = request.form.get('username', '').strip()
    password = request.form.get('password', '')

    db = get_db()
    admin = db.execute(
        'SELECT * FROM admin_users WHERE username = ?', (username,)
    ).fetchone()
    db.close()

    credentials_ok = admin and check_password_hash(admin['password_hash'], password)
    if not credentials_ok:
        _record_login_fail('login', ip)
        flash('Benutzername oder Passwort falsch.', 'danger')
        return render_template('login.html')

    _reset_login_fails('login', ip)
    # Start from an empty session before storing the admin identity.
    session.clear()
    session['admin_id'] = admin['id']
    session['admin_username'] = admin['username']

    nxt = request.form.get('next') or request.args.get('next')
    if is_safe_url(nxt):
        return redirect(nxt)
    return redirect(url_for('dashboard'))
|
|
|
|
|
|
@app.route('/logout', methods=['POST'])
def logout():
    """Drop the whole session and go back to the login page (POST only)."""
    session.clear()
    login_page = url_for('login')
    return redirect(login_page)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dashboard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route('/dashboard')
@login_required
def dashboard():
    """Overview page: all subdomains with owner and update count, the number
    of DynDNS users and the 30 most recent update-log entries."""
    subdomain_sql = '''
        SELECT s.*, u.username, u.active,
               (SELECT COUNT(*) FROM update_log l WHERE l.subdomain_id = s.id) AS update_count
        FROM subdomains s
        JOIN dyndns_users u ON s.dyndns_user_id = u.id
        ORDER BY s.subdomain
    '''
    log_sql = '''
        SELECT l.*, s.subdomain, u.username AS dyndns_username
        FROM update_log l
        JOIN dyndns_users u ON l.dyndns_user_id = u.id
        LEFT JOIN subdomains s ON l.subdomain_id = s.id
        ORDER BY l.timestamp DESC
        LIMIT 30
    '''

    db = get_db()
    subdomains = db.execute(subdomain_sql).fetchall()
    count_row = db.execute('SELECT COUNT(*) AS c FROM dyndns_users').fetchone()
    user_count = count_row['c']
    logs = db.execute(log_sql).fetchall()
    db.close()

    context = {
        'subdomains': subdomains,
        'user_count': user_count,
        'logs': logs,
        'base_domain': get_setting('plesk_base_domain'),
    }
    return render_template('dashboard.html', **context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route('/settings')
@login_required
def settings():
    """Render the settings page with the stored Plesk configuration."""
    context = {
        'plesk_url': get_setting('plesk_url'),
        'plesk_api_key': get_setting('plesk_api_key'),
        'plesk_base_domain': get_setting('plesk_base_domain'),
        # SSL verification defaults to enabled when nothing is stored.
        'plesk_verify_ssl': get_setting('plesk_verify_ssl', '1'),
    }
    return render_template('settings.html', **context)
|
|
|
|
|
|
@app.route('/settings/plesk', methods=['POST'])
@login_required
def settings_plesk():
    """Validate and store the Plesk settings; optionally test the connection.

    The URL must pass validate_plesk_url(), otherwise nothing is saved. When
    the form contains ``test_connection`` the saved values are tried against
    Plesk and the outcome is flashed; failure details go to the log only.
    """
    form = request.form
    plesk_url = form.get('plesk_url', '').strip().rstrip('/')
    api_key = form.get('plesk_api_key', '').strip()
    base_domain = form.get('plesk_base_domain', '').strip()
    verify_flag = '1' if form.get('plesk_verify_ssl') else '0'

    ok, err = validate_plesk_url(plesk_url)
    if not ok:
        flash(f'Plesk-URL abgelehnt: {err}', 'danger')
        return redirect(url_for('settings'))

    to_store = (
        ('plesk_url', plesk_url),
        ('plesk_api_key', api_key),
        ('plesk_base_domain', base_domain),
        ('plesk_verify_ssl', verify_flag),
    )
    for key, value in to_store:
        set_setting(key, value)

    if 'test_connection' not in form:
        flash('Plesk-Einstellungen gespeichert.', 'success')
        return redirect(url_for('settings'))

    try:
        info = test_connection(plesk_url, api_key, verify_ssl=(verify_flag == '1'))
        ver = info.get('version', '?')
        flash(f'Verbindung OK — Plesk {ver}', 'success')
    except Exception as exc:
        # Keep the technical reason in the log; the user gets a generic hint.
        app.logger.warning('Plesk-Verbindungstest fehlgeschlagen: %s', exc)
        flash('Verbindungsfehler — bitte URL, API-Schlüssel und Erreichbarkeit prüfen.', 'danger')
    return redirect(url_for('settings'))
|
|
|
|
|
|
@app.route('/settings/password', methods=['POST'])
@login_required
def settings_password():
    """Change the logged-in admin's password.

    Requires the correct current password, two matching new passwords and a
    minimum length of 6 characters. Every outcome is reported via flash and
    ends with a redirect to the settings page.
    """
    current = request.form.get('current_password', '')
    new_pw = request.form.get('new_password', '')
    new_pw2 = request.form.get('new_password2', '')

    db = get_db()
    try:
        admin = db.execute('SELECT * FROM admin_users WHERE id = ?',
                           (session['admin_id'],)).fetchone()

        # A session whose admin row no longer exists is treated like a wrong
        # password instead of crashing on admin['password_hash'].
        if admin is None or not check_password_hash(admin['password_hash'], current):
            flash('Aktuelles Passwort falsch.', 'danger')
        elif new_pw != new_pw2:
            flash('Neue Passwörter stimmen nicht überein.', 'danger')
        elif len(new_pw) < 6:
            flash('Mindestens 6 Zeichen erforderlich.', 'danger')
        else:
            db.execute('UPDATE admin_users SET password_hash = ? WHERE id = ?',
                       (generate_password_hash(new_pw), session['admin_id']))
            db.commit()
            flash('Passwort geändert.', 'success')
    finally:
        # Release the connection on every path, including exceptions.
        db.close()
    return redirect(url_for('settings'))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Users
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Accepts one or more dot-separated labels made of lowercase letters, digits
# and '-', where each label starts and ends with a letter or digit. Label and
# total length are not limited by this pattern.
SUBDOMAIN_RE = re.compile(r'^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$')
|
|
|
|
|
|
def _parse_subdomains(raw):
|
|
"""Zerlegt eine Eingabe (Komma/Leerzeichen/Zeilenumbruch-getrennt) in
|
|
eine deduplizierte Liste gültiger Subdomain-Labels."""
|
|
names, seen = [], set()
|
|
for part in re.split(r'[\s,]+', raw.strip().lower()):
|
|
part = part.strip().rstrip('.')
|
|
if part and part not in seen:
|
|
seen.add(part)
|
|
names.append(part)
|
|
return names
|
|
|
|
|
|
@app.route('/users')
@login_required
def users():
    """List all DynDNS users, each paired with its subdomains."""
    db = get_db()
    user_rows = db.execute('SELECT * FROM dyndns_users ORDER BY username').fetchall()
    sub_rows = db.execute('SELECT * FROM subdomains ORDER BY subdomain').fetchall()
    db.close()

    # Group subdomain rows by owning user id (order within a group is kept).
    grouped = {}
    for sub in sub_rows:
        owner = sub['dyndns_user_id']
        if owner not in grouped:
            grouped[owner] = []
        grouped[owner].append(sub)

    entries = []
    for user in user_rows:
        entries.append({'row': user, 'subdomains': grouped.get(user['id'], [])})

    return render_template('users.html', users=entries,
                           base_domain=get_setting('plesk_base_domain'))
|
|
|
|
|
|
@app.route('/users/add', methods=['POST'])
@login_required
def user_add():
    """Create a DynDNS user together with at least one subdomain.

    All inserts happen in one transaction; any error rolls everything back
    and is reported via flash.
    """
    back_to_list = redirect(url_for('users'))

    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    names = _parse_subdomains(request.form.get('subdomains', ''))

    if not (username and password and names):
        flash('Benutzername, Passwort und mindestens eine Subdomain sind erforderlich.', 'danger')
        return back_to_list

    invalid = [n for n in names if not SUBDOMAIN_RE.match(n)]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return back_to_list

    db = get_db()
    try:
        new_user = db.execute(
            'INSERT INTO dyndns_users (username, password_hash) VALUES (?, ?)',
            (username, generate_password_hash(password)),
        )
        user_id = new_user.lastrowid
        for name in names:
            db.execute(
                'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                (user_id, name),
            )
        db.commit()
        flash(f'Benutzer "{username}" mit {len(names)} Subdomain(s) angelegt.', 'success')
    except Exception as exc:
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return back_to_list
|
|
|
|
|
|
@app.route('/users/<int:user_id>/edit', methods=['POST'])
@login_required
def user_edit(user_id):
    """Rename a DynDNS user and, if a password is submitted, replace its hash.

    An empty username is rejected (same requirement as when creating a
    user). Database errors roll the transaction back and are reported via
    flash.
    """
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()

    # Without this check the account could be renamed to '' and would no
    # longer be usable for Basic auth.
    if not username:
        flash('Benutzername ist erforderlich.', 'danger')
        return redirect(url_for('users'))

    db = get_db()
    try:
        if password:
            db.execute(
                'UPDATE dyndns_users SET username=?, password_hash=? WHERE id=?',
                (username, generate_password_hash(password), user_id),
            )
        else:
            db.execute('UPDATE dyndns_users SET username=? WHERE id=?', (username, user_id))
        db.commit()
        flash('Benutzer aktualisiert.', 'success')
    except Exception as exc:
        # Mirror user_add(): undo the failed statement before closing.
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))
|
|
|
|
|
|
@app.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
def user_delete(user_id):
    """Delete a DynDNS user together with its log entries and subdomains."""
    # Dependent rows first, the user row last.
    statements = (
        'DELETE FROM update_log WHERE dyndns_user_id = ?',
        'DELETE FROM subdomains WHERE dyndns_user_id = ?',
        'DELETE FROM dyndns_users WHERE id = ?',
    )
    db = get_db()
    for sql in statements:
        db.execute(sql, (user_id,))
    db.commit()
    db.close()

    flash('Benutzer gelöscht.', 'success')
    return redirect(url_for('users'))
|
|
|
|
|
|
@app.route('/users/<int:user_id>/subdomains/add', methods=['POST'])
@login_required
def subdomain_add(user_id):
    """Attach one or more subdomains to an existing DynDNS user.

    Names that violate a database integrity constraint are skipped with a
    warning; the remaining ones are still inserted and committed.
    """
    back_to_list = redirect(url_for('users'))

    names = _parse_subdomains(request.form.get('subdomains', ''))
    if not names:
        flash('Keine Subdomain angegeben.', 'danger')
        return back_to_list

    invalid = [n for n in names if not SUBDOMAIN_RE.match(n)]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return back_to_list

    insert_sql = 'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)'
    added = 0
    db = get_db()
    try:
        for name in names:
            try:
                db.execute(insert_sql, (user_id, name))
            except sqlite3.IntegrityError:
                flash(f'Subdomain "{name}" existiert bereits.', 'warning')
            else:
                added += 1
        db.commit()
        if added:
            flash(f'{added} Subdomain(s) hinzugefügt.', 'success')
    finally:
        db.close()
    return back_to_list
|
|
|
|
|
|
@app.route('/subdomains/<int:subdomain_id>/delete', methods=['POST'])
@login_required
def subdomain_delete(subdomain_id):
    """Delete a subdomain; its log entries are kept but detached from it."""
    key = (subdomain_id,)
    db = get_db()
    # Null the reference in the log first, then remove the subdomain row.
    db.execute('UPDATE update_log SET subdomain_id = NULL WHERE subdomain_id = ?', key)
    db.execute('DELETE FROM subdomains WHERE id = ?', key)
    db.commit()
    db.close()

    flash('Subdomain gelöscht.', 'success')
    return redirect(url_for('users'))
|
|
|
|
|
|
@app.route('/users/<int:user_id>/toggle', methods=['POST'])
@login_required
def user_toggle(user_id):
    """Flip the ``active`` flag (0 <-> 1) of a DynDNS user."""
    toggle_sql = 'UPDATE dyndns_users SET active = 1 - active WHERE id = ?'
    db = get_db()
    db.execute(toggle_sql, (user_id,))
    db.commit()
    db.close()
    return redirect(url_for('users'))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DynDNS v2 update endpoint (Speedport "Anderer Anbieter")
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _is_ip_literal(value):
    """Return True when *value* parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(value)
    except ValueError:
        return False
    return True


@app.route('/nic/update')
def dyndns_update():
    """DynDNS v2 update endpoint (HTTP Basic auth, plain-text answers).

    Flow: reject locked client IPs ('abuse'), authenticate the DynDNS user
    ('badauth' on failure, counted for the lockout), pick the user's
    subdomains (all, or those named in ``?hostname=``) and push the new IP
    to Plesk for each one. The body has one line per target: 'good <ip>',
    'nochg <ip>' or 'dnserr'; the status is 500 if any target failed.

    The IP is taken from ``?myip=`` / ``?ip=`` when that value is a
    well-formed address; anything else is ignored in favour of the client
    address.
    """
    ip = request.remote_addr or 'unknown'

    # Brute-force protection: refuse locked client IPs right away.
    if _login_locked_for('dyndns', ip):
        return Response('abuse', 403, mimetype='text/plain')

    auth = request.authorization
    if not auth:
        _record_login_fail('dyndns', ip)
        return Response(
            'badauth',
            401,
            {'WWW-Authenticate': 'Basic realm="DynDNS Update"'},
            mimetype='text/plain',
        )

    # The query string is untrusted: only a syntactically valid address may
    # reach Plesk and the database. Malformed values fall back to the
    # connection's address instead of being written into DNS.
    supplied_ip = (request.args.get('myip') or request.args.get('ip') or '').strip()
    myip = supplied_ip if _is_ip_literal(supplied_ip) else ip

    db = get_db()
    try:
        user = db.execute(
            'SELECT * FROM dyndns_users WHERE username = ? AND active = 1',
            (auth.username,),
        ).fetchone()

        if not user or not check_password_hash(user['password_hash'], auth.password):
            # Release this handle before the helper opens its own connection
            # to write the failure (same order as before; the close in
            # `finally` is then a no-op).
            db.close()
            _record_login_fail('dyndns', ip)
            return Response('badauth', 401, mimetype='text/plain')

        # Successful authentication -> reset the failure counter.
        _reset_login_fails('dyndns', ip)

        plesk_url = get_setting('plesk_url')
        plesk_api_key = get_setting('plesk_api_key')
        plesk_base_domain = get_setting('plesk_base_domain')
        plesk_verify_ssl = get_setting('plesk_verify_ssl', '1') == '1'

        if not plesk_url or not plesk_api_key or not plesk_base_domain:
            return Response('911', 500, mimetype='text/plain')

        subs = db.execute(
            'SELECT * FROM subdomains WHERE dyndns_user_id = ?', (user['id'],)
        ).fetchall()

        # ?hostname= may select specific subdomains, either as FQDN
        # ("mypc.example.com") or as bare label ("mypc"); several may be
        # given comma-separated. Without it ALL subdomains are updated.
        base = plesk_base_domain.lower().rstrip('.')
        requested = [h for h in re.split(r'[\s,]+', request.args.get('hostname', '').strip().lower()) if h]
        if requested:
            wanted = {h.rstrip('.') for h in requested}
            targets = [
                s for s in subs
                if s['subdomain'] in wanted or f"{s['subdomain']}.{base}" in wanted
            ]
        else:
            targets = list(subs)

        if not targets:
            return Response('nohost', 200, mimetype='text/plain')

        log_sql = ('INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                   ' VALUES (?,?,?,?,?)')
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines = []
        for s in targets:
            old_ip = s['current_ip']
            if old_ip == myip:
                lines.append(f'nochg {myip}')
                continue
            try:
                update_dns_record(
                    plesk_url, plesk_api_key, plesk_base_domain,
                    s['subdomain'], myip, verify_ssl=plesk_verify_ssl,
                )
                db.execute(
                    'UPDATE subdomains SET current_ip=?, last_updated=? WHERE id=?',
                    (myip, now, s['id']),
                )
                db.execute(log_sql, (user['id'], s['id'], old_ip, myip, 'good'))
                lines.append(f'good {myip}')
            except Exception as exc:
                # Record the failure for the admin log; the client only sees 'dnserr'.
                db.execute(log_sql, (user['id'], s['id'], old_ip, myip, f'error: {exc}'))
                lines.append('dnserr')

        db.commit()
        status = 500 if any(line == 'dnserr' for line in lines) else 200
        return Response('\n'.join(lines) + '\n', status, mimetype='text/plain')
    finally:
        # Close on every path, including unexpected exceptions.
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == '__main__':
    # Direct start: initialise the database, then serve on all interfaces,
    # port 5000, with debug mode off.
    init_db()
    run_options = {'host': '0.0.0.0', 'port': 5000, 'debug': False}
    app.run(**run_options)
|