# docker-repo/auth-app/app.py

import os
import secrets
import sqlite3
from functools import wraps
import bcrypt as bc
import requests as http_client
from flask import (
Flask,
flash,
redirect,
render_template,
request,
session,
url_for,
)
app = Flask(__name__)

# Session-signing key. A hard-coded fallback would let anyone who knows it
# forge a "logged_in" session cookie, so when SECRET_KEY is unset (or empty)
# a random per-process key is generated instead. Sessions then do not survive
# a restart and are not shared between processes; set SECRET_KEY explicitly
# when that matters.
app.secret_key = os.environ.get("SECRET_KEY") or secrets.token_hex(32)

# Credentials of the single admin account checked by the login view.
# NOTE(review): "changeme" is a guessable default; deployments should always
# set ADMIN_PASSWORD.
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "changeme")

# Locations of the SQLite user database, the generated htpasswd file, and the
# base URL used for registry API queries.
DB_PATH = os.environ.get("DB_PATH", "/data/users.db")
HTPASSWD_PATH = os.environ.get("HTPASSWD_PATH", "/auth/htpasswd")
REGISTRY_URL = os.environ.get("REGISTRY_URL", "http://registry:5000")

# Internal service account for registry API queries (not stored in SQLite).
# The password is regenerated on every process start; its hash is filled in
# by the startup code at the bottom of this module.
_SERVICE_USER = "_service"
_SERVICE_PASSWORD = secrets.token_hex(16)
_SERVICE_PASSWORD_HASH = None
def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read by
    name. The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the storage directories and the ``users`` table if missing.

    Ensures the parent directories of DB_PATH and HTPASSWD_PATH exist, then
    creates the ``users`` table (id, username, password_hash, created_at)
    unless it is already present.
    """
    for target in (DB_PATH, HTPASSWD_PATH):
        parent = os.path.dirname(target)
        # A bare filename has no directory part, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    conn = get_db()
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        conn.commit()
    finally:
        # Close the connection even when the DDL statement fails.
        conn.close()
def hash_password(password):
    """Return a bcrypt hash of *password* as text, tagged ``$2y$``.

    The hash is computed with 12 rounds; the ``$2b$`` identifier is swapped
    for ``$2y$`` for htpasswd compatibility.
    """
    encoded = password.encode("utf-8")
    salt = bc.gensalt(rounds=12)
    digest_text = bc.hashpw(encoded, salt).decode("utf-8")
    # count=1: rewrite at most the first occurrence of the identifier.
    return digest_text.replace("$2b$", "$2y$", 1)
def sync_htpasswd():
    """Regenerate the htpasswd file from all users in SQLite + the service user.

    Every row of the ``users`` table becomes one ``username:password_hash``
    line; the internal service account is appended last. The file at
    HTPASSWD_PATH is overwritten.
    """
    conn = get_db()
    try:
        users = conn.execute("SELECT username, password_hash FROM users").fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    lines = [f"{user['username']}:{user['password_hash']}\n" for user in users]
    # Never write the literal text "None" as a hash if the service hash has
    # not been computed yet.
    if _SERVICE_PASSWORD_HASH is not None:
        lines.append(f"{_SERVICE_USER}:{_SERVICE_PASSWORD_HASH}\n")

    # Explicit encoding keeps the output independent of the process locale;
    # a single writelines() call keeps the rewrite window short.
    with open(HTPASSWD_PATH, "w", encoding="utf-8") as f:
        f.writelines(lines)
def query_registry(path):
    """Query the registry API using the internal service account.

    Args:
        path: API path appended verbatim to REGISTRY_URL, e.g. "/v2/_catalog".

    Returns:
        The decoded JSON body of a successful response, or None when the
        request fails, the status is not OK, or the body is not valid JSON.
        Failures are logged as warnings instead of being dropped silently.
    """
    url = f"{REGISTRY_URL}{path}"
    try:
        resp = http_client.get(
            url,
            auth=(_SERVICE_USER, _SERVICE_PASSWORD),
            timeout=5,
        )
        if resp.ok:
            return resp.json()
        app.logger.warning(
            "Registry query %s returned HTTP %s", path, resp.status_code
        )
    except (http_client.RequestException, ValueError) as exc:
        # ValueError covers JSON decode errors that, depending on the
        # installed requests version, are not RequestException subclasses.
        app.logger.warning("Registry query %s failed: %s", path, exc)
    return None
def login_required(f):
    """Decorator: run the view only when the session is marked logged in.

    Requests without a truthy ``logged_in`` session entry are redirected to
    the ``login`` endpoint; otherwise the wrapped view is called with the
    original arguments.
    """

    @wraps(f)
    def guarded(*args, **kwargs):
        if session.get("logged_in"):
            return f(*args, **kwargs)
        return redirect(url_for("login"))

    return guarded
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the admin credentials (POST).

    On a successful POST the session is marked ``logged_in`` and the client
    is redirected to the user list. On failure an error is flashed and the
    form is rendered again.
    """
    if request.method == "POST":
        username = request.form.get("username", "")
        password = request.form.get("password", "")
        # Constant-time comparison avoids leaking, via response timing, how
        # much of a guess matched. Values are compared as UTF-8 bytes because
        # compare_digest rejects non-ASCII str. Both checks always run so a
        # wrong username is not distinguishable from a wrong password.
        user_ok = secrets.compare_digest(
            username.encode("utf-8"), ADMIN_USER.encode("utf-8")
        )
        password_ok = secrets.compare_digest(
            password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
        )
        if user_ok and password_ok:
            session["logged_in"] = True
            return redirect(url_for("users"))
        flash("Ungueltige Anmeldedaten.", "error")
    return render_template("login.html")
@app.route("/logout")
def logout():
    """Remove the login flag from the session and go back to the login page."""
    session.pop("logged_in", None)
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/")
@login_required
def users():
    """Render the user list (id, username, created_at), ordered by username."""
    query = "SELECT id, username, created_at FROM users ORDER BY username"
    db = get_db()
    rows = db.execute(query).fetchall()
    db.close()
    return render_template("users.html", users=rows)
@app.route("/images")
@login_required
def images():
    """Render the repositories known to the registry together with their tags.

    Queries ``/v2/_catalog`` and then ``/v2/<name>/tags/list`` for each
    repository. Repositories and tags are sorted; a failed or empty query
    yields an empty list rather than an error.
    """
    # NOTE(review): only a single catalog response is read; if the registry
    # paginates this endpoint, later pages are not shown - confirm.
    repos = []
    catalog = query_registry("/v2/_catalog")
    # Tolerate a missing/null "repositories" entry or a non-object body.
    names = catalog.get("repositories") if isinstance(catalog, dict) else None
    for name in sorted(names or []):
        tags_data = query_registry(f"/v2/{name}/tags/list")
        tags = tags_data.get("tags") if isinstance(tags_data, dict) else None
        repos.append({"name": name, "tags": sorted(tags or [])})
    return render_template("images.html", repos=repos)
@app.route("/help")
@login_required
def help_page():
    """Render the help page, passing the request's host as ``domain``."""
    host = request.host
    return render_template("help.html", domain=host)
@app.route("/add", methods=["POST"])
@login_required
def add_user():
    """Create a registry user from the submitted form and resync htpasswd.

    Reads ``username`` (stripped) and ``password`` from the form. Rejects
    empty values, usernames that cannot be represented safely in the
    htpasswd file, the reserved service-account name, and passwords shorter
    than 6 characters. Always redirects back to the user list, with a
    flashed message describing the outcome.
    """
    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    if not username or not password:
        flash("Benutzername und Passwort sind erforderlich.", "error")
        return redirect(url_for("users"))
    # The htpasswd file is line-based "name:hash": a colon or a control
    # character (e.g. newline) in the name would corrupt or inject entries.
    if ":" in username or not username.isprintable():
        flash("Benutzername enthaelt ungueltige Zeichen.", "error")
        return redirect(url_for("users"))
    # The service account is written to htpasswd separately; a database user
    # with the same name would produce a duplicate entry.
    if username == _SERVICE_USER:
        flash(f'Benutzername "{username}" ist reserviert.', "error")
        return redirect(url_for("users"))
    if len(password) < 6:
        flash("Passwort muss mindestens 6 Zeichen lang sein.", "error")
        return redirect(url_for("users"))

    password_h = hash_password(password)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO users (username, password_hash) VALUES (?, ?)",
            (username, password_h),
        )
        conn.commit()
        sync_htpasswd()
        flash(f'Benutzer "{username}" wurde erstellt.', "success")
    except sqlite3.IntegrityError:
        # UNIQUE constraint on username: the name is already taken.
        flash(f'Benutzer "{username}" existiert bereits.', "error")
    finally:
        conn.close()
    return redirect(url_for("users"))
@app.route("/password/<int:user_id>", methods=["POST"])
@login_required
def change_password(user_id):
    """Set a new password for the user with the given id and resync htpasswd.

    The password must be at least 6 characters long. If no user has the
    given id, an error is flashed instead of a misleading success message
    and the htpasswd file is left untouched. Always redirects to the user
    list.
    """
    password = request.form.get("password", "")
    if not password or len(password) < 6:
        flash("Passwort muss mindestens 6 Zeichen lang sein.", "error")
        return redirect(url_for("users"))

    password_h = hash_password(password)
    conn = get_db()
    try:
        cursor = conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?",
            (password_h, user_id),
        )
        conn.commit()
        # rowcount is 0 when the id matched no row (e.g. user already deleted).
        updated = cursor.rowcount > 0
    finally:
        conn.close()

    if not updated:
        flash("Benutzer wurde nicht gefunden.", "error")
        return redirect(url_for("users"))

    sync_htpasswd()
    flash("Passwort wurde geaendert.", "success")
    return redirect(url_for("users"))
@app.route("/delete/<int:user_id>", methods=["POST"])
@login_required
def delete_user(user_id):
    """Delete the user with the given id and resync htpasswd.

    If the id matches no user, nothing is changed and no message is flashed.
    Always redirects to the user list.
    """
    conn = get_db()
    try:
        user = conn.execute(
            "SELECT username FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if user is not None:
            conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
            conn.commit()
            sync_htpasswd()
            flash(f'Benutzer "{user["username"]}" wurde geloescht.', "success")
    finally:
        # Close the connection even if a query or the htpasswd rewrite fails.
        conn.close()
    return redirect(url_for("users"))
# Initialize database and htpasswd on startup.
# These statements run at import time and their order matters:
# 1. Create the storage directories and the users table.
init_db()
# 2. Hash the per-process service password so it can be written to htpasswd.
_SERVICE_PASSWORD_HASH = hash_password(_SERVICE_PASSWORD)
# 3. Write the htpasswd file from the stored users plus the service account.
sync_htpasswd()