docker-repo/auth-app/app.py

179 lines
4.8 KiB
Python

import os
import sqlite3
from functools import wraps
import bcrypt as bc
from flask import (
Flask,
flash,
redirect,
render_template,
request,
session,
url_for,
)
app = Flask(__name__)
app.secret_key = os.environ.get("SECRET_KEY", "default-secret-key")
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "changeme")
DB_PATH = os.environ.get("DB_PATH", "/data/users.db")
HTPASSWD_PATH = os.environ.get("HTPASSWD_PATH", "/auth/htpasswd")
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
os.makedirs(os.path.dirname(HTPASSWD_PATH), exist_ok=True)
conn = get_db()
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
conn.close()
def hash_password(password):
"""Hash password with bcrypt, using $2y$ identifier for htpasswd compatibility."""
hashed = bc.hashpw(password.encode("utf-8"), bc.gensalt(rounds=12))
return hashed.decode("utf-8").replace("$2b$", "$2y$", 1)
def sync_htpasswd():
"""Regenerate htpasswd file from all users in SQLite."""
conn = get_db()
users = conn.execute("SELECT username, password_hash FROM users").fetchall()
conn.close()
with open(HTPASSWD_PATH, "w") as f:
for user in users:
f.write(f"{user['username']}:{user['password_hash']}\n")
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("login"))
return f(*args, **kwargs)
return decorated
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username", "")
password = request.form.get("password", "")
if username == ADMIN_USER and password == ADMIN_PASSWORD:
session["logged_in"] = True
return redirect(url_for("users"))
flash("Ungueltige Anmeldedaten.", "error")
return render_template("login.html")
@app.route("/logout")
def logout():
session.pop("logged_in", None)
return redirect(url_for("login"))
@app.route("/")
@login_required
def users():
conn = get_db()
user_list = conn.execute(
"SELECT id, username, created_at FROM users ORDER BY username"
).fetchall()
conn.close()
return render_template("users.html", users=user_list)
@app.route("/add", methods=["POST"])
@login_required
def add_user():
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
if not username or not password:
flash("Benutzername und Passwort sind erforderlich.", "error")
return redirect(url_for("users"))
if len(password) < 6:
flash("Passwort muss mindestens 6 Zeichen lang sein.", "error")
return redirect(url_for("users"))
password_h = hash_password(password)
conn = get_db()
try:
conn.execute(
"INSERT INTO users (username, password_hash) VALUES (?, ?)",
(username, password_h),
)
conn.commit()
sync_htpasswd()
flash(f'Benutzer "{username}" wurde erstellt.', "success")
except sqlite3.IntegrityError:
flash(f'Benutzer "{username}" existiert bereits.', "error")
finally:
conn.close()
return redirect(url_for("users"))
@app.route("/password/<int:user_id>", methods=["POST"])
@login_required
def change_password(user_id):
password = request.form.get("password", "")
if not password or len(password) < 6:
flash("Passwort muss mindestens 6 Zeichen lang sein.", "error")
return redirect(url_for("users"))
password_h = hash_password(password)
conn = get_db()
conn.execute(
"UPDATE users SET password_hash = ? WHERE id = ?",
(password_h, user_id),
)
conn.commit()
conn.close()
sync_htpasswd()
flash("Passwort wurde geaendert.", "success")
return redirect(url_for("users"))
@app.route("/delete/<int:user_id>", methods=["POST"])
@login_required
def delete_user(user_id):
conn = get_db()
user = conn.execute(
"SELECT username FROM users WHERE id = ?", (user_id,)
).fetchone()
if user:
conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
sync_htpasswd()
flash(f'Benutzer "{user["username"]}" wurde geloescht.', "success")
conn.close()
return redirect(url_for("users"))
# Initialize database and htpasswd on startup
init_db()
sync_htpasswd()