first release

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Stefan Hacker
2026-06-06 12:21:16 +02:00
parent ffc7876d17
commit 2542cf5455
13 changed files with 1037 additions and 0 deletions
+318
View File
@@ -0,0 +1,318 @@
import os
from datetime import datetime
from functools import wraps
from flask import (Flask, Response, flash, redirect, render_template,
request, session, url_for)
from werkzeug.security import check_password_hash, generate_password_hash
from database import get_db, get_setting, init_db, set_setting
from plesk import test_connection, update_dns_record
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', os.urandom(32).hex())
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def login_required(f):
@wraps(f)
def wrapped(*args, **kwargs):
if 'admin_id' not in session:
return redirect(url_for('login'))
return f(*args, **kwargs)
return wrapped
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
@app.route('/')
def index():
return redirect(url_for('dashboard') if 'admin_id' in session else url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
db = get_db()
admin = db.execute(
'SELECT * FROM admin_users WHERE username = ?', (username,)
).fetchone()
db.close()
if admin and check_password_hash(admin['password_hash'], password):
session['admin_id'] = admin['id']
session['admin_username'] = admin['username']
return redirect(url_for('dashboard'))
flash('Benutzername oder Passwort falsch.', 'danger')
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@app.route('/dashboard')
@login_required
def dashboard():
db = get_db()
users = db.execute('''
SELECT u.*,
(SELECT COUNT(*) FROM update_log l WHERE l.dyndns_user_id = u.id) AS update_count
FROM dyndns_users u
ORDER BY u.subdomain
''').fetchall()
logs = db.execute('''
SELECT l.*, u.subdomain, u.username AS dyndns_username
FROM update_log l
JOIN dyndns_users u ON l.dyndns_user_id = u.id
ORDER BY l.timestamp DESC
LIMIT 30
''').fetchall()
db.close()
base_domain = get_setting('plesk_base_domain')
return render_template('dashboard.html', users=users, logs=logs, base_domain=base_domain)
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@app.route('/settings')
@login_required
def settings():
return render_template('settings.html',
plesk_url=get_setting('plesk_url'),
plesk_api_key=get_setting('plesk_api_key'),
plesk_base_domain=get_setting('plesk_base_domain'),
plesk_verify_ssl=get_setting('plesk_verify_ssl', '1'),
)
@app.route('/settings/plesk', methods=['POST'])
@login_required
def settings_plesk():
plesk_url = request.form.get('plesk_url', '').strip().rstrip('/')
plesk_api_key = request.form.get('plesk_api_key', '').strip()
plesk_base_domain = request.form.get('plesk_base_domain', '').strip()
plesk_verify_ssl = '1' if request.form.get('plesk_verify_ssl') else '0'
set_setting('plesk_url', plesk_url)
set_setting('plesk_api_key', plesk_api_key)
set_setting('plesk_base_domain', plesk_base_domain)
set_setting('plesk_verify_ssl', plesk_verify_ssl)
if 'test_connection' in request.form:
try:
info = test_connection(plesk_url, plesk_api_key, verify_ssl=(plesk_verify_ssl == '1'))
ver = info.get('version', '?')
flash(f'Verbindung OK — Plesk {ver}', 'success')
except Exception as exc:
flash(f'Verbindungsfehler: {exc}', 'danger')
else:
flash('Plesk-Einstellungen gespeichert.', 'success')
return redirect(url_for('settings'))
@app.route('/settings/password', methods=['POST'])
@login_required
def settings_password():
current = request.form.get('current_password', '')
new_pw = request.form.get('new_password', '')
new_pw2 = request.form.get('new_password2', '')
db = get_db()
admin = db.execute('SELECT * FROM admin_users WHERE id = ?', (session['admin_id'],)).fetchone()
if not check_password_hash(admin['password_hash'], current):
flash('Aktuelles Passwort falsch.', 'danger')
elif new_pw != new_pw2:
flash('Neue Passwörter stimmen nicht überein.', 'danger')
elif len(new_pw) < 6:
flash('Mindestens 6 Zeichen erforderlich.', 'danger')
else:
db.execute('UPDATE admin_users SET password_hash = ? WHERE id = ?',
(generate_password_hash(new_pw), session['admin_id']))
db.commit()
flash('Passwort geändert.', 'success')
db.close()
return redirect(url_for('settings'))
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------
@app.route('/users')
@login_required
def users():
db = get_db()
user_list = db.execute('SELECT * FROM dyndns_users ORDER BY subdomain').fetchall()
db.close()
base_domain = get_setting('plesk_base_domain')
return render_template('users.html', users=user_list, base_domain=base_domain)
@app.route('/users/add', methods=['POST'])
@login_required
def user_add():
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
subdomain = request.form.get('subdomain', '').strip().lower()
if not username or not password or not subdomain:
flash('Alle Felder müssen ausgefüllt sein.', 'danger')
return redirect(url_for('users'))
db = get_db()
try:
db.execute(
'INSERT INTO dyndns_users (username, password_hash, subdomain) VALUES (?, ?, ?)',
(username, generate_password_hash(password), subdomain),
)
db.commit()
flash(f'Benutzer "{username}" angelegt.', 'success')
except Exception as exc:
flash(f'Fehler: {exc}', 'danger')
finally:
db.close()
return redirect(url_for('users'))
@app.route('/users/<int:user_id>/edit', methods=['POST'])
@login_required
def user_edit(user_id):
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
subdomain = request.form.get('subdomain', '').strip().lower()
db = get_db()
try:
if password:
db.execute(
'UPDATE dyndns_users SET username=?, password_hash=?, subdomain=? WHERE id=?',
(username, generate_password_hash(password), subdomain, user_id),
)
else:
db.execute(
'UPDATE dyndns_users SET username=?, subdomain=? WHERE id=?',
(username, subdomain, user_id),
)
db.commit()
flash('Benutzer aktualisiert.', 'success')
except Exception as exc:
flash(f'Fehler: {exc}', 'danger')
finally:
db.close()
return redirect(url_for('users'))
@app.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
def user_delete(user_id):
db = get_db()
db.execute('DELETE FROM update_log WHERE dyndns_user_id = ?', (user_id,))
db.execute('DELETE FROM dyndns_users WHERE id = ?', (user_id,))
db.commit()
db.close()
flash('Benutzer gelöscht.', 'success')
return redirect(url_for('users'))
@app.route('/users/<int:user_id>/toggle', methods=['POST'])
@login_required
def user_toggle(user_id):
db = get_db()
db.execute('UPDATE dyndns_users SET active = 1 - active WHERE id = ?', (user_id,))
db.commit()
db.close()
return redirect(url_for('users'))
# ---------------------------------------------------------------------------
# DynDNS v2 update endpoint (Speedport "Anderer Anbieter")
# ---------------------------------------------------------------------------
@app.route('/nic/update')
def dyndns_update():
auth = request.authorization
if not auth:
return Response(
'badauth',
401,
{'WWW-Authenticate': 'Basic realm="DynDNS Update"'},
mimetype='text/plain',
)
myip = request.args.get('myip') or request.args.get('ip') or request.remote_addr
db = get_db()
user = db.execute(
'SELECT * FROM dyndns_users WHERE username = ? AND active = 1',
(auth.username,),
).fetchone()
if not user or not check_password_hash(user['password_hash'], auth.password):
db.close()
return Response('badauth', 401, mimetype='text/plain')
plesk_url = get_setting('plesk_url')
plesk_api_key = get_setting('plesk_api_key')
plesk_base_domain = get_setting('plesk_base_domain')
plesk_verify_ssl = get_setting('plesk_verify_ssl', '1') == '1'
if not plesk_url or not plesk_api_key or not plesk_base_domain:
db.close()
return Response('911', 500, mimetype='text/plain')
old_ip = user['current_ip']
if old_ip == myip:
db.close()
return Response(f'nochg {myip}', 200, mimetype='text/plain')
try:
update_dns_record(
plesk_url, plesk_api_key, plesk_base_domain,
user['subdomain'], myip, verify_ssl=plesk_verify_ssl,
)
db.execute(
'UPDATE dyndns_users SET current_ip=?, last_updated=? WHERE id=?',
(myip, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user['id']),
)
db.execute(
'INSERT INTO update_log (dyndns_user_id, old_ip, new_ip, result) VALUES (?,?,?,?)',
(user['id'], old_ip, myip, 'good'),
)
db.commit()
db.close()
return Response(f'good {myip}', 200, mimetype='text/plain')
except Exception as exc:
db.execute(
'INSERT INTO update_log (dyndns_user_id, old_ip, new_ip, result) VALUES (?,?,?,?)',
(user['id'], old_ip, myip, f'error: {exc}'),
)
db.commit()
db.close()
return Response('dnserr', 500, mimetype='text/plain')
# ---------------------------------------------------------------------------
if __name__ == '__main__':
init_db()
app.run(host='0.0.0.0', port=5000, debug=False)