Files
dyndns-server/app/main.py
T
Stefan Hacker 9c631992af Mehrere Subdomains pro Benutzer + README
- subdomains-Tabelle (n DNS-Namen je Benutzer) inkl. Migration vom alten
  Einzel-Subdomain-Schema in database.init_db()
- Benutzeranlage/Verwaltung: mehrere Subdomains hinzufuegen/entfernen
- /nic/update aktualisiert alle Subdomains des Benutzers bzw. die per
  ?hostname= gewaehlte(n); eine Antwortzeile je Subdomain
- Dashboard/Users-Templates auf das neue Modell umgestellt
- README.md mit Setup, Plesk-Konfig, Router-Einrichtung und Endpoint-Doku
- .gitignore: __pycache__/ und *.pyc

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 14:13:48 +02:00

424 lines
14 KiB
Python

import os
import re
import sqlite3
from datetime import datetime
from functools import wraps
from flask import (Flask, Response, flash, redirect, render_template,
request, session, url_for)
from werkzeug.security import check_password_hash, generate_password_hash
from database import get_db, get_setting, init_db, set_setting
from plesk import test_connection, update_dns_record
# Flask application object; the route handlers below register themselves on it.
app = Flask(__name__)
# Session-signing key. Using `or` instead of a .get() default also covers a
# SECRET_KEY variable that is set but empty, which would otherwise leave Flask
# without a usable key and break every session access. The random fallback is
# regenerated on each start, so sessions only survive a restart when
# SECRET_KEY is configured.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def login_required(f):
    """Decorator that restricts a view to logged-in admins.

    A request whose session carries no ``admin_id`` is redirected to the
    login page; otherwise the wrapped view is called with its original
    arguments and its result is returned unchanged.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
@app.route('/')
def index():
    """Redirect logged-in admins to the dashboard, everyone else to the login page."""
    if 'admin_id' in session:
        target = url_for('dashboard')
    else:
        target = url_for('login')
    return redirect(target)
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the admin login form and process submitted credentials.

    GET renders ``login.html``. POST looks up the submitted username in
    ``admin_users`` and checks the password against the stored hash. On
    success the admin's id and username are stored in the session and the
    client is redirected to the dashboard; on failure an error message is
    flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        db = get_db()
        try:
            admin = db.execute(
                'SELECT * FROM admin_users WHERE username = ?', (username,)
            ).fetchone()
        finally:
            # Release the connection even when the lookup raises.
            db.close()
        if admin and check_password_hash(admin['password_hash'], password):
            session['admin_id'] = admin['id']
            session['admin_username'] = admin['username']
            return redirect(url_for('dashboard'))
        flash('Benutzername oder Passwort falsch.', 'danger')
    return render_template('login.html')
@app.route('/logout')
def logout():
    """Discard all session data and send the client back to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the admin overview page.

    Passes to ``dashboard.html``:
      * every subdomain joined with its owner's username and active flag,
        plus the number of ``update_log`` rows referencing it,
      * the total number of DynDNS users,
      * the 30 most recent ``update_log`` entries (newest first),
      * the configured Plesk base domain.
    """
    db = get_db()
    try:
        subdomains = db.execute('''
            SELECT s.*, u.username, u.active,
                   (SELECT COUNT(*) FROM update_log l WHERE l.subdomain_id = s.id) AS update_count
            FROM subdomains s
            JOIN dyndns_users u ON s.dyndns_user_id = u.id
            ORDER BY s.subdomain
        ''').fetchall()
        user_count = db.execute('SELECT COUNT(*) AS c FROM dyndns_users').fetchone()['c']
        # LEFT JOIN: log rows whose subdomain was deleted (subdomain_id NULL)
        # still show up, just without a subdomain name.
        logs = db.execute('''
            SELECT l.*, s.subdomain, u.username AS dyndns_username
            FROM update_log l
            JOIN dyndns_users u ON l.dyndns_user_id = u.id
            LEFT JOIN subdomains s ON l.subdomain_id = s.id
            ORDER BY l.timestamp DESC
            LIMIT 30
        ''').fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        db.close()
    base_domain = get_setting('plesk_base_domain')
    return render_template('dashboard.html', subdomains=subdomains,
                           user_count=user_count, logs=logs, base_domain=base_domain)
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@app.route('/settings')
@login_required
def settings():
    """Render the settings page pre-filled with the stored Plesk settings."""
    context = {
        'plesk_url': get_setting('plesk_url'),
        'plesk_api_key': get_setting('plesk_api_key'),
        'plesk_base_domain': get_setting('plesk_base_domain'),
        # '1' is passed as the default for the SSL-verification flag.
        'plesk_verify_ssl': get_setting('plesk_verify_ssl', '1'),
    }
    return render_template('settings.html', **context)
@app.route('/settings/plesk', methods=['POST'])
@login_required
def settings_plesk():
    """Store the submitted Plesk settings and optionally test the connection.

    The four settings are always saved first. When the form contains a
    ``test_connection`` field, the Plesk API is contacted with the submitted
    values and the outcome is flashed; otherwise only a "saved" message is
    flashed. The client is always redirected back to the settings page.
    """
    form = request.form
    values = {
        # A trailing slash is removed from the URL before it is stored.
        'plesk_url': form.get('plesk_url', '').strip().rstrip('/'),
        'plesk_api_key': form.get('plesk_api_key', '').strip(),
        'plesk_base_domain': form.get('plesk_base_domain', '').strip(),
        # Any non-empty value for the field is stored as '1', otherwise '0'.
        'plesk_verify_ssl': '1' if form.get('plesk_verify_ssl') else '0',
    }
    for key, value in values.items():
        set_setting(key, value)

    if 'test_connection' not in form:
        flash('Plesk-Einstellungen gespeichert.', 'success')
        return redirect(url_for('settings'))

    try:
        info = test_connection(
            values['plesk_url'],
            values['plesk_api_key'],
            verify_ssl=(values['plesk_verify_ssl'] == '1'),
        )
        ver = info.get('version', '?')
        flash(f'Verbindung OK — Plesk {ver}', 'success')
    except Exception as exc:
        flash(f'Verbindungsfehler: {exc}', 'danger')
    return redirect(url_for('settings'))
@app.route('/settings/password', methods=['POST'])
@login_required
def settings_password():
    """Change the password of the currently logged-in admin.

    The current password must match, both new-password fields must be equal
    and the new password must have at least 6 characters. The outcome is
    flashed and the client is redirected to the settings page. If the admin
    row behind the session no longer exists, the session is cleared and the
    client is sent to the login page instead.
    """
    current = request.form.get('current_password', '')
    new_pw = request.form.get('new_password', '')
    new_pw2 = request.form.get('new_password2', '')
    db = get_db()
    try:
        admin = db.execute(
            'SELECT * FROM admin_users WHERE id = ?', (session['admin_id'],)
        ).fetchone()
        if admin is None:
            # The account was removed while this session was still valid;
            # without this guard the hash lookup below raises a TypeError.
            session.clear()
            flash('Sitzung ungültig. Bitte erneut anmelden.', 'danger')
            return redirect(url_for('login'))
        if not check_password_hash(admin['password_hash'], current):
            flash('Aktuelles Passwort falsch.', 'danger')
        elif new_pw != new_pw2:
            flash('Neue Passwörter stimmen nicht überein.', 'danger')
        elif len(new_pw) < 6:
            flash('Mindestens 6 Zeichen erforderlich.', 'danger')
        else:
            db.execute('UPDATE admin_users SET password_hash = ? WHERE id = ?',
                       (generate_password_hash(new_pw), session['admin_id']))
            db.commit()
            flash('Passwort geändert.', 'success')
    finally:
        # Release the connection on every path, including exceptions.
        db.close()
    return redirect(url_for('settings'))
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------
# Accepts one or more dot-separated labels. Each label consists of lowercase
# ASCII letters, digits and hyphens and must begin and end with a letter or
# digit. Uppercase input does not match, and no label or total length limit
# is enforced by this pattern.
SUBDOMAIN_RE = re.compile(r'^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$')
def _parse_subdomains(raw):
"""Zerlegt eine Eingabe (Komma/Leerzeichen/Zeilenumbruch-getrennt) in
eine deduplizierte Liste gültiger Subdomain-Labels."""
names, seen = [], set()
for part in re.split(r'[\s,]+', raw.strip().lower()):
part = part.strip().rstrip('.')
if part and part not in seen:
seen.add(part)
names.append(part)
return names
@app.route('/users')
@login_required
def users():
    """Render the user management page.

    Each entry handed to ``users.html`` is a dict with the user's database
    row under ``'row'`` and the list of that user's subdomain rows (sorted by
    subdomain name) under ``'subdomains'``.
    """
    db = get_db()
    user_rows = db.execute('SELECT * FROM dyndns_users ORDER BY username').fetchall()
    sub_rows = db.execute('SELECT * FROM subdomains ORDER BY subdomain').fetchall()
    db.close()

    # Group the subdomain rows by the id of the user that owns them.
    grouped = {}
    for sub in sub_rows:
        owner_id = sub['dyndns_user_id']
        if owner_id not in grouped:
            grouped[owner_id] = []
        grouped[owner_id].append(sub)

    entries = []
    for row in user_rows:
        entries.append({'row': row, 'subdomains': grouped.get(row['id'], [])})

    return render_template(
        'users.html',
        users=entries,
        base_domain=get_setting('plesk_base_domain'),
    )
@app.route('/users/add', methods=['POST'])
@login_required
def user_add():
    """Create a DynDNS user together with one or more subdomains.

    Username, password and at least one subdomain are required, and every
    subdomain must match ``SUBDOMAIN_RE``. The user row and all subdomain
    rows are committed together; on any error the transaction is rolled back
    and the error is flashed. Always redirects to the users page.
    """
    form = request.form
    username = form.get('username', '').strip()
    password = form.get('password', '').strip()
    names = _parse_subdomains(form.get('subdomains', ''))

    if not (username and password and names):
        flash('Benutzername, Passwort und mindestens eine Subdomain sind erforderlich.', 'danger')
        return redirect(url_for('users'))

    invalid = [n for n in names if SUBDOMAIN_RE.match(n) is None]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))

    insert_subdomain = 'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)'
    db = get_db()
    try:
        # The id of the new user row is needed for the subdomain rows below.
        user_id = db.execute(
            'INSERT INTO dyndns_users (username, password_hash) VALUES (?, ?)',
            (username, generate_password_hash(password)),
        ).lastrowid
        for name in names:
            db.execute(insert_subdomain, (user_id, name))
        db.commit()
        flash(f'Benutzer "{username}" mit {len(names)} Subdomain(s) angelegt.', 'success')
    except Exception as exc:
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/edit', methods=['POST'])
@login_required
def user_edit(user_id):
    """Update a DynDNS user's username and, if provided, password.

    An empty username is rejected. When the password field is empty the
    stored hash is left untouched. On a database error the transaction is
    rolled back and the error is flashed; a nonexistent ``user_id`` is
    reported instead of being flashed as a success. Always redirects to the
    users page.

    Args:
        user_id: Primary key of the ``dyndns_users`` row to update.
    """
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    if not username:
        # Without this guard the username would silently be set to ''.
        flash('Benutzername darf nicht leer sein.', 'danger')
        return redirect(url_for('users'))
    db = get_db()
    try:
        if password:
            cur = db.execute(
                'UPDATE dyndns_users SET username=?, password_hash=? WHERE id=?',
                (username, generate_password_hash(password), user_id),
            )
        else:
            cur = db.execute(
                'UPDATE dyndns_users SET username=? WHERE id=?', (username, user_id)
            )
        db.commit()
        # rowcount is 0 when no row has this id.
        if cur.rowcount:
            flash('Benutzer aktualisiert.', 'success')
        else:
            flash('Benutzer nicht gefunden.', 'danger')
    except Exception as exc:
        # Same error handling as user_add: undo the partial write first.
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
def user_delete(user_id):
    """Delete a DynDNS user together with their subdomains and log entries.

    The three deletes are committed together. On a database error the
    transaction is rolled back and the error is flashed, matching the error
    handling of ``user_add``. A nonexistent ``user_id`` produces a warning
    instead of a success message. Always redirects to the users page.

    Args:
        user_id: Primary key of the ``dyndns_users`` row to delete.
    """
    db = get_db()
    try:
        # Dependent rows go first so no row is left pointing at a deleted user.
        db.execute('DELETE FROM update_log WHERE dyndns_user_id = ?', (user_id,))
        db.execute('DELETE FROM subdomains WHERE dyndns_user_id = ?', (user_id,))
        cur = db.execute('DELETE FROM dyndns_users WHERE id = ?', (user_id,))
        db.commit()
        if cur.rowcount:
            flash('Benutzer gelöscht.', 'success')
        else:
            flash('Benutzer nicht gefunden.', 'warning')
    except Exception as exc:
        db.rollback()
        flash(f'Fehler: {exc}', 'danger')
    finally:
        db.close()
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/subdomains/add', methods=['POST'])
@login_required
def subdomain_add(user_id):
    """Attach one or more additional subdomains to an existing DynDNS user.

    Every submitted name must match ``SUBDOMAIN_RE``. The target user must
    exist; otherwise nothing is inserted. Names that violate a database
    integrity constraint are reported individually with a warning while the
    remaining names are still added. Always redirects to the users page.

    Args:
        user_id: Primary key of the ``dyndns_users`` row that receives the
            subdomains.
    """
    names = _parse_subdomains(request.form.get('subdomains', ''))
    if not names:
        flash('Keine Subdomain angegeben.', 'danger')
        return redirect(url_for('users'))
    invalid = [n for n in names if not SUBDOMAIN_RE.match(n)]
    if invalid:
        flash(f'Ungültige Subdomain(s): {", ".join(invalid)}', 'danger')
        return redirect(url_for('users'))
    db = get_db()
    added = 0
    try:
        # Guard against a stale or forged user id: without this check the
        # inserts could create rows without an owner, or fail with an
        # integrity error that would be reported as "already exists".
        owner = db.execute(
            'SELECT id FROM dyndns_users WHERE id = ?', (user_id,)
        ).fetchone()
        if owner is None:
            flash('Benutzer nicht gefunden.', 'danger')
            return redirect(url_for('users'))
        for n in names:
            try:
                db.execute(
                    'INSERT INTO subdomains (dyndns_user_id, subdomain) VALUES (?, ?)',
                    (user_id, n),
                )
                added += 1
            except sqlite3.IntegrityError:
                flash(f'Subdomain "{n}" existiert bereits.', 'warning')
        db.commit()
        if added:
            flash(f'{added} Subdomain(s) hinzugefügt.', 'success')
    finally:
        db.close()
    return redirect(url_for('users'))
@app.route('/subdomains/<int:subdomain_id>/delete', methods=['POST'])
@login_required
def subdomain_delete(subdomain_id):
    """Delete a single subdomain while keeping its update history.

    Log entries that reference the subdomain are kept; their
    ``subdomain_id`` is set to NULL before the subdomain row is removed.

    Args:
        subdomain_id: Primary key of the ``subdomains`` row to delete.
    """
    params = (subdomain_id,)
    db = get_db()
    db.execute('UPDATE update_log SET subdomain_id = NULL WHERE subdomain_id = ?', params)
    db.execute('DELETE FROM subdomains WHERE id = ?', params)
    db.commit()
    db.close()
    flash('Subdomain gelöscht.', 'success')
    return redirect(url_for('users'))
@app.route('/users/<int:user_id>/toggle', methods=['POST'])
@login_required
def user_toggle(user_id):
    """Flip a DynDNS user's ``active`` flag between 0 and 1.

    Args:
        user_id: Primary key of the ``dyndns_users`` row to toggle.
    """
    # `1 - active` maps 1 -> 0 and 0 -> 1 in a single statement.
    toggle_sql = 'UPDATE dyndns_users SET active = 1 - active WHERE id = ?'
    db = get_db()
    db.execute(toggle_sql, (user_id,))
    db.commit()
    db.close()
    return redirect(url_for('users'))
# ---------------------------------------------------------------------------
# DynDNS v2 update endpoint (Speedport "Anderer Anbieter")
# ---------------------------------------------------------------------------
def _resolve_client_ip(candidate, fallback):
    """Return ``candidate`` as a normalized IP address string, else ``fallback``.

    Args:
        candidate: Client-supplied address text; may be ``None`` or malformed.
        fallback: Value to return when ``candidate`` is not a valid IPv4 or
            IPv6 address.
    """
    # Local import so this block does not depend on the file-level imports.
    import ipaddress

    try:
        return str(ipaddress.ip_address((candidate or '').strip()))
    except ValueError:
        return fallback


@app.route('/nic/update')
def dyndns_update():
    """DynDNS-v2-style update endpoint, authenticated via HTTP Basic auth.

    Query parameters:
        myip / ip: Address to set. A missing or malformed value falls back
            to the remote address of the request, so arbitrary text never
            reaches the DNS API or the database.
        hostname: Optional selection of subdomains, separated by commas or
            whitespace, each either a bare name ("mypc") or a name qualified
            with the configured base domain ("mypc.example.com"). Without it
            ALL subdomains of the user are updated.

    Responses (``text/plain``):
        ``badauth`` (401): missing or wrong credentials, or inactive user.
        ``911`` (500): the Plesk settings are incomplete.
        ``nohost`` (200): no subdomain of the user matches the request.
        Otherwise one line per targeted subdomain: ``good <ip>``,
        ``nochg <ip>`` or ``dnserr``. The status is 500 if any line is
        ``dnserr``, else 200.
    """
    auth = request.authorization
    # Username/password may be None (e.g. for a non-Basic Authorization
    # header); treat that like missing credentials instead of letting the
    # hash check below fail with a 500.
    if not auth or auth.username is None or auth.password is None:
        return Response(
            'badauth',
            401,
            {'WWW-Authenticate': 'Basic realm="DynDNS Update"'},
            mimetype='text/plain',
        )
    myip = _resolve_client_ip(
        request.args.get('myip') or request.args.get('ip'),
        request.remote_addr,
    )
    db = get_db()
    try:
        user = db.execute(
            'SELECT * FROM dyndns_users WHERE username = ? AND active = 1',
            (auth.username,),
        ).fetchone()
        if not user or not check_password_hash(user['password_hash'], auth.password):
            return Response('badauth', 401, mimetype='text/plain')

        plesk_url = get_setting('plesk_url')
        plesk_api_key = get_setting('plesk_api_key')
        plesk_base_domain = get_setting('plesk_base_domain')
        plesk_verify_ssl = get_setting('plesk_verify_ssl', '1') == '1'
        if not plesk_url or not plesk_api_key or not plesk_base_domain:
            return Response('911', 500, mimetype='text/plain')

        subs = db.execute(
            'SELECT * FROM subdomains WHERE dyndns_user_id = ?', (user['id'],)
        ).fetchall()

        # Optionally the client selects specific subdomains via ?hostname=
        # (qualified like "mypc.example.com" or just "mypc"); several may be
        # given. Without hostname ALL subdomains are updated.
        base = plesk_base_domain.lower().rstrip('.')
        raw_hostnames = request.args.get('hostname', '').strip().lower()
        wanted = {h.rstrip('.') for h in re.split(r'[\s,]+', raw_hostnames) if h}
        if wanted:
            targets = [
                s for s in subs
                if s['subdomain'] in wanted or f"{s['subdomain']}.{base}" in wanted
            ]
        else:
            targets = list(subs)
        if not targets:
            return Response('nohost', 200, mimetype='text/plain')

        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines = []
        for s in targets:
            old_ip = s['current_ip']
            if old_ip == myip:
                # Address unchanged: skip the DNS API call and the log entry.
                lines.append(f'nochg {myip}')
                continue
            try:
                update_dns_record(
                    plesk_url, plesk_api_key, plesk_base_domain,
                    s['subdomain'], myip, verify_ssl=plesk_verify_ssl,
                )
                db.execute(
                    'UPDATE subdomains SET current_ip=?, last_updated=? WHERE id=?',
                    (myip, now, s['id']),
                )
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, 'good'),
                )
                lines.append(f'good {myip}')
            except Exception as exc:
                # A failure for one subdomain is logged and reported as
                # dnserr; the remaining subdomains are still processed.
                db.execute(
                    'INSERT INTO update_log (dyndns_user_id, subdomain_id, old_ip, new_ip, result)'
                    ' VALUES (?,?,?,?,?)',
                    (user['id'], s['id'], old_ip, myip, f'error: {exc}'),
                )
                lines.append('dnserr')
        db.commit()
    finally:
        # Release the connection on every path, including early returns.
        db.close()
    status = 500 if 'dnserr' in lines else 200
    return Response('\n'.join(lines) + '\n', status, mimetype='text/plain')
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Runs only when the file is executed directly as a script.
    # init_db() is called before the server starts; presumably it creates or
    # migrates the schema — see database.init_db to confirm.
    init_db()
    # Listen on all interfaces, port 5000, with the Flask debugger disabled.
    app.run(debug=False, port=5000, host='0.0.0.0')