c3070469c1
- CSRF-Schutz: session-gebundenes Token in allen POST-Formularen, serverseitig per before_request geprueft; /nic/update ausgenommen (Basic-Auth-API) - Brute-Force-Schutz: DB-gestuetzter Login-Lockout pro Client-IP (5 Fehlversuche -> 15 min), echte IP via ProxyFix/X-Forwarded-For - SSRF: validate_plesk_url() erzwingt http(s) und blockt Link-Local/Metadata, Multicast und reservierte Ziele - Session-Cookies: HttpOnly, SameSite=Lax, Secure (per Env abschaltbar) - Security-Header: CSP, X-Frame-Options, X-Content-Type-Options, Referrer-Policy - Generische Plesk-Fehlermeldungen (keine internen URLs im UI) - CSS/JS nach static/ ausgelagert -> strikte CSP ohne 'unsafe-inline' - login_attempts-Tabelle + README-Security-Abschnitt Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
121 lines
4.2 KiB
Python
121 lines
4.2 KiB
Python
import sqlite3
|
|
import os
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
DB_PATH = os.environ.get('DB_PATH', '/data/dyndns.db')
|
|
|
|
|
|
def get_db():
    """Open a SQLite connection to ``DB_PATH`` and return it.

    The parent directory of ``DB_PATH`` is created first when the path has
    a directory component. Rows fetched through the returned connection are
    ``sqlite3.Row`` objects, so columns can be read by name.
    The caller is responsible for closing the connection.
    """
    parent_dir = os.path.dirname(DB_PATH)
    # A bare filename has no directory part; nothing to create in that case.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _columns(db, table):
|
|
return [r['name'] for r in db.execute(f'PRAGMA table_info({table})').fetchall()]
|
|
|
|
|
|
def init_db():
    """Create the database schema, migrate legacy layouts and seed an admin.

    Steps, in order:

    1. Create every table with ``CREATE TABLE IF NOT EXISTS``.
    2. If ``dyndns_users`` still carries the legacy ``subdomain`` column,
       copy subdomain/current_ip/last_updated into ``subdomains`` and
       rebuild ``dyndns_users`` without those columns. The whole migration
       runs in one transaction so a failure cannot leave the database
       without a ``dyndns_users`` table.
    3. Add ``update_log.subdomain_id`` when the column is missing.
    4. Insert a default admin account when ``admin_users`` is empty.

    The connection is closed on every path, including errors; uncommitted
    work is discarded when the connection closes.
    """
    db = get_db()
    try:
        db.executescript('''
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL DEFAULT ''
            );

            CREATE TABLE IF NOT EXISTS admin_users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS dyndns_users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                active INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );

            CREATE TABLE IF NOT EXISTS subdomains (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                dyndns_user_id INTEGER NOT NULL,
                subdomain TEXT UNIQUE NOT NULL,
                current_ip TEXT,
                last_updated TEXT,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY (dyndns_user_id) REFERENCES dyndns_users(id)
            );

            CREATE TABLE IF NOT EXISTS login_attempts (
                ip TEXT PRIMARY KEY,
                fails INTEGER NOT NULL DEFAULT 0,
                locked_until TEXT
            );

            CREATE TABLE IF NOT EXISTS update_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                dyndns_user_id INTEGER NOT NULL,
                subdomain_id INTEGER,
                old_ip TEXT,
                new_ip TEXT NOT NULL,
                result TEXT NOT NULL,
                timestamp TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY (dyndns_user_id) REFERENCES dyndns_users(id)
            );
        ''')

        # --- Migration from the old schema (exactly one subdomain per user) ---
        # dyndns_users used to carry subdomain/current_ip/last_updated
        # directly. Those values are moved into the subdomains table.
        if 'subdomain' in _columns(db, 'dyndns_users'):
            # executescript runs statements in autocommit mode, so the
            # explicit BEGIN/COMMIT is what makes the copy + table rebuild
            # atomic. If any statement fails, the open transaction is rolled
            # back when the connection is closed below.
            # A dyndns_users_new left behind by an interrupted earlier run
            # can only be a partial copy (the legacy table still exists), so
            # it is safe to drop before rebuilding.
            db.executescript('''
                BEGIN;

                INSERT OR IGNORE INTO subdomains
                    (dyndns_user_id, subdomain, current_ip, last_updated)
                    SELECT id, subdomain, current_ip, last_updated
                    FROM dyndns_users;

                DROP TABLE IF EXISTS dyndns_users_new;

                CREATE TABLE dyndns_users_new (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    active INTEGER NOT NULL DEFAULT 1,
                    created_at TEXT NOT NULL DEFAULT (datetime('now'))
                );

                INSERT INTO dyndns_users_new
                    (id, username, password_hash, active, created_at)
                    SELECT id, username, password_hash, active, created_at
                    FROM dyndns_users;

                DROP TABLE dyndns_users;
                ALTER TABLE dyndns_users_new RENAME TO dyndns_users;

                COMMIT;
            ''')

        # update_log: add subdomain_id if the table still has the old layout.
        if 'subdomain_id' not in _columns(db, 'update_log'):
            db.execute('ALTER TABLE update_log ADD COLUMN subdomain_id INTEGER')

        # Seed a first admin account so the UI is reachable after install.
        # NOTE(review): this creates the well-known credentials admin/admin.
        # They must be changed right after the first login; consider taking
        # the initial password from the environment instead.
        has_admin = db.execute('SELECT id FROM admin_users LIMIT 1').fetchone()
        if has_admin is None:
            db.execute(
                'INSERT INTO admin_users (username, password_hash) VALUES (?, ?)',
                ('admin', generate_password_hash('admin')),
            )

        db.commit()
    finally:
        # Always release the connection, even when a statement above raised.
        db.close()
|
|
|
|
|
|
def get_setting(key, default=''):
    """Return the stored value for *key* from the ``settings`` table.

    Args:
        key: Setting name to look up.
        default: Value returned when no row with that key exists.

    Returns:
        The ``value`` column of the matching row, or *default*.
    """
    db = get_db()
    try:
        row = db.execute(
            'SELECT value FROM settings WHERE key = ?', (key,)
        ).fetchone()
    finally:
        # Close the connection even if the query raises (e.g. missing table).
        db.close()
    return default if row is None else row['value']
|
|
|
|
|
|
def set_setting(key, value):
    """Store *value* under *key* in the ``settings`` table.

    An existing row with the same key is replaced (``INSERT OR REPLACE``).

    Args:
        key: Setting name (primary key of the table).
        value: Value to store.
    """
    db = get_db()
    try:
        db.execute(
            'INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)',
            (key, value),
        )
        db.commit()
    finally:
        # Close the connection even if the write or the commit raises.
        db.close()
|