import os import re import secrets import shutil import sqlite3 import subprocess from functools import wraps import bcrypt as bc import requests as http_client import yaml from flask import ( Flask, flash, redirect, render_template, request, session, url_for, ) app = Flask(__name__) app.secret_key = os.environ.get("SECRET_KEY", "default-secret-key") ADMIN_USER = os.environ.get("ADMIN_USER", "admin") ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "changeme") DB_PATH = os.environ.get("DB_PATH", "/data/users.db") HTPASSWD_PATH = os.environ.get("HTPASSWD_PATH", "/auth/htpasswd") GC_CONFIG = os.environ.get("GC_CONFIG", "/app/registry-gc-config.yml") REGISTRY_DATA = os.environ.get("REGISTRY_DATA", "/var/lib/registry") REGISTRY_URL = os.environ.get("REGISTRY_URL", "http://registry:5000") # Host:Port ohne Schema, fuer skopeo-Ziel (z. B. "registry:5000") REGISTRY_HOST = REGISTRY_URL.split("://", 1)[-1] # Internal service account for registry API queries (not stored in SQLite) _SERVICE_USER = "_service" _SERVICE_PASSWORD = secrets.token_hex(16) _SERVICE_PASSWORD_HASH = None def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) os.makedirs(os.path.dirname(HTPASSWD_PATH), exist_ok=True) conn = get_db() conn.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) conn.commit() conn.close() def hash_password(password): """Hash password with bcrypt, using $2y$ identifier for htpasswd compatibility.""" hashed = bc.hashpw(password.encode("utf-8"), bc.gensalt(rounds=12)) return hashed.decode("utf-8").replace("$2b$", "$2y$", 1) def sync_htpasswd(): """Regenerate htpasswd file from all users in SQLite + internal service user.""" conn = get_db() users = conn.execute("SELECT username, password_hash FROM users").fetchall() conn.close() with open(HTPASSWD_PATH, "w") as f: for user in users: f.write(f"{user['username']}:{user['password_hash']}\n") f.write(f"{_SERVICE_USER}:{_SERVICE_PASSWORD_HASH}\n") def query_registry(path): """Query the registry API using the internal service account.""" try: resp = http_client.get( f"{REGISTRY_URL}{path}", auth=(_SERVICE_USER, _SERVICE_PASSWORD), timeout=5, ) if resp.ok: return resp.json() except http_client.RequestException: pass return None # Erlaubte Zeichen in einer Image-Referenz (verhindert Shell-/Argument-Injection) _IMAGE_REF_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:/@-]*$") def strip_registry_host(ref): """Entfernt einen optionalen Registry-Host (z. B. docker.io/, ghcr.io:443/).""" if "/" in ref: first, rest = ref.split("/", 1) if "." in first or ":" in first or first == "localhost": return rest return ref def derive_dest(source): """Leitet den Ziel-Namen in unserer Registry aus der Quell-Referenz ab. Beispiele: nginx -> nginx:latest bitnami/redis:7 -> bitnami/redis:7 ghcr.io/owner/app:v1 -> owner/app:v1 """ dest = strip_registry_host(source) last_segment = dest.rsplit("/", 1)[-1] if ":" not in last_segment and "@" not in last_segment: dest += ":latest" return dest def split_dest(dest): """Teilt einen Ziel-Namen "repo:tag" in (repo, tag). dest hat immer einen Tag.""" repo, _, tag = dest.rpartition(":") return repo, tag # Loest ${VAR:-default} und ${VAR-default} zum Default-Wert auf. # ${VAR}, ${VAR:?...} usw. (ohne Default) werden NICHT aufgeloest. _VAR_DEFAULT_RE = re.compile(r"\$\{[A-Za-z_][A-Za-z0-9_]*:?-([^}]*)\}") def resolve_compose_image(image): """Ersetzt Compose-Variablen mit Default (${VAR:-default}) durch den Default.""" return _VAR_DEFAULT_RE.sub(lambda m: m.group(1), image) def extract_compose_images(content): """Liest alle 'image:'-Eintraege aus den Services einer docker-compose. Gibt (images, ignored) zurueck. images ist None bei ungueltigem YAML. Variablen mit Default (${VAR:-wert}) werden aufgeloest. Eintraege mit nicht aufloesbaren Variablen oder auf die eigene Registry landen in ignored. """ try: data = yaml.safe_load(content) except yaml.YAMLError: return None, [] if not isinstance(data, dict): return None, [] services = data.get("services") if not isinstance(services, dict): return [], [] images, ignored = [], [] for svc in services.values(): if not isinstance(svc, dict): continue raw = svc.get("image") if not isinstance(raw, str) or not raw.strip(): continue raw = raw.strip() image = resolve_compose_image(raw) host = image.split("/", 1)[0] if "/" in image else "" if not image or "$" in image or "{" in image or "}" in image: ignored.append(raw) # Variable ohne Default - nicht aufloesbar elif host and host in (REGISTRY_HOST, request.host): ignored.append(image) # zeigt bereits auf unsere Registry elif not _IMAGE_REF_RE.match(image): ignored.append(image) elif image not in images: images.append(image) return images, ignored _MANIFEST_ACCEPT = ", ".join( [ "application/vnd.docker.distribution.manifest.v2+json", "application/vnd.docker.distribution.manifest.list.v2+json", "application/vnd.oci.image.manifest.v1+json", "application/vnd.oci.image.index.v1+json", ] ) def get_manifest_digest(name, reference): """Liefert den Manifest-Digest eines Tags (fuer das Loeschen benoetigt).""" try: resp = http_client.get( f"{REGISTRY_URL}/v2/{name}/manifests/{reference}", auth=(_SERVICE_USER, _SERVICE_PASSWORD), headers={"Accept": _MANIFEST_ACCEPT}, timeout=5, ) if resp.ok: return resp.headers.get("Docker-Content-Digest") except http_client.RequestException: pass return None def delete_manifest(name, digest): """Loescht ein Manifest per Digest aus der Registry.""" try: resp = http_client.delete( f"{REGISTRY_URL}/v2/{name}/manifests/{digest}", auth=(_SERVICE_USER, _SERVICE_PASSWORD), timeout=5, ) return resp.status_code in (200, 202) except http_client.RequestException: return False def repo_has_tags(name): """True, wenn das Repository noch mindestens einen Tag hat.""" data = query_registry(f"/v2/{name}/tags/list") return bool(data and data.get("tags")) def _repositories_base(): return os.path.realpath( os.path.join(REGISTRY_DATA, "docker", "registry", "v2", "repositories") ) def _safe_repo_path(*parts): """Absoluter Pfad innerhalb des repositories-Verzeichnisses oder None (Schutz vor ../).""" base = _repositories_base() target = os.path.realpath(os.path.join(base, *parts)) if target != base and not target.startswith(base + os.sep): return None return target def remove_repository_storage(name): """Entfernt das leere Repository-Verzeichnis aus dem Registry-Speicher. Die Registry-API kennt kein "Repository loeschen"; nach dem Loeschen des letzten Tags bleibt sonst ein leerer Eintrag im Katalog stehen. """ target = _safe_repo_path(*name.split("/")) if not target or not os.path.isdir(target): return False shutil.rmtree(target, ignore_errors=True) # Leere Eltern-Verzeichnisse (z. B. "bitnami" nach "bitnami/redis") aufraeumen base = _repositories_base() parent = os.path.dirname(target) while parent != base and os.path.isdir(parent) and not os.listdir(parent): try: os.rmdir(parent) except OSError: break parent = os.path.dirname(parent) return True def remove_tag_storage(name, tag): """Entfernt den Tag-Verweis direkt aus dem Speicher. Noetig fuer verwaiste Tags, die noch im Katalog stehen, deren Manifest aber nicht mehr abrufbar ist (Loeschen ueber die API schlaegt dann fehl). """ target = _safe_repo_path(*name.split("/"), "_manifests", "tags", tag) if not target or not os.path.isdir(target): return False shutil.rmtree(target, ignore_errors=True) return True def prune_empty_repositories(): """Entfernt alle Repositories ohne Tags aus dem Speicher. Gibt Anzahl zurueck.""" data = query_registry("/v2/_catalog") if not data: return 0 removed = 0 for name in data.get("repositories", []): if not repo_has_tags(name) and remove_repository_storage(name): removed += 1 return removed def remote_digest(source, src_creds=None, src_token=None): """Liefert den Manifest-Digest eines Quell-Images, ohne es herunterzuladen.""" cmd = ["skopeo", "inspect", "--format", "{{.Digest}}", f"docker://{source}"] if src_token: cmd += ["--registry-token", src_token] elif src_creds: cmd += ["--creds", src_creds] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if result.returncode == 0: return result.stdout.strip() except (subprocess.SubprocessError, OSError): pass return None def copy_image(source, src_creds=None, src_token=None): """Kopiert ein Image in unsere Registry. Gibt (status, info) zurueck: ("pulled", dest) - kopiert ("skipped", dest) - bereits mit gleichem Digest vorhanden ("error", grund) - fehlgeschlagen """ if not _IMAGE_REF_RE.match(source): return ("error", f"{source}: ungueltige Image-Referenz") dest = derive_dest(source) dest_name, dest_tag = split_dest(dest) # Doppel-Upload vermeiden: gleicher Digest schon vorhanden? (z. B. bei "latest") digest = remote_digest(source, src_creds, src_token) if digest and get_manifest_digest(dest_name, dest_tag) == digest: return ("skipped", dest) # --all kopiert alle Architekturen und erhaelt die Manifest-Liste, # damit der Digest mit der Quelle uebereinstimmt. cmd = [ "skopeo", "copy", "--all", "--dest-tls-verify=false", f"docker://{source}", f"docker://{REGISTRY_HOST}/{dest}", "--dest-creds", f"{_SERVICE_USER}:{_SERVICE_PASSWORD}", ] if src_token: cmd += ["--src-registry-token", src_token] elif src_creds: cmd += ["--src-creds", src_creds] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800) if result.returncode == 0: return ("pulled", dest) detail = (result.stderr or result.stdout).strip().splitlines() return ("error", f"{source}: {detail[-1] if detail else 'Unbekannter Fehler'}") except subprocess.TimeoutExpired: return ("error", f"{source}: Zeitueberschreitung") except FileNotFoundError: return ("error", "skopeo ist nicht installiert") def login_required(f): @wraps(f) def decorated(*args, **kwargs): if not session.get("logged_in"): return redirect(url_for("login")) return f(*args, **kwargs) return decorated @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = request.form.get("username", "") password = request.form.get("password", "") if username == ADMIN_USER and password == ADMIN_PASSWORD: session["logged_in"] = True return redirect(url_for("users")) flash("Ungueltige Anmeldedaten.", "error") return render_template("login.html") @app.route("/logout") def logout(): session.pop("logged_in", None) return redirect(url_for("login")) @app.route("/") @login_required def users(): conn = get_db() user_list = conn.execute( "SELECT id, username, created_at FROM users ORDER BY username" ).fetchall() conn.close() return render_template("users.html", users=user_list) @app.route("/images") @login_required def images(): repos = [] data = query_registry("/v2/_catalog") if data: for name in sorted(data.get("repositories", [])): tags_data = query_registry(f"/v2/{name}/tags/list") tags = sorted(tags_data.get("tags", [])) if tags_data and tags_data.get("tags") else [] repos.append({"name": name, "tags": tags}) return render_template("images.html", repos=repos) @app.route("/pull", methods=["POST"]) @login_required def pull_image(): source = request.form.get("source", "").strip() use_creds = request.form.get("use_creds") == "on" use_token = request.form.get("use_token") == "on" src_user = request.form.get("src_username", "").strip() src_pass = request.form.get("src_password", "") src_token = request.form.get("src_token", "").strip() if not source: flash("Bitte ein Quell-Image angeben.", "error") return redirect(url_for("images")) # Quell-Zugangsdaten nur anwenden, wenn die jeweilige Checkbox aktiviert ist. creds = f"{src_user}:{src_pass}" if (use_creds and src_user) else None token = src_token if (use_token and src_token) else None status, info = copy_image(source, creds, token) if status == "pulled": flash(f'Image "{source}" wurde als "{info}" geholt.', "success") elif status == "skipped": flash( f'Image "{info}" ist bereits mit gleichem Digest vorhanden - uebersprungen.', "success", ) else: flash(f"Fehler beim Holen: {info}", "error") return redirect(url_for("images")) @app.route("/pull-compose", methods=["POST"]) @login_required def pull_compose(): content = request.form.get("compose", "") uploaded = request.files.get("compose_file") if uploaded and uploaded.filename: try: content = uploaded.read().decode("utf-8") except UnicodeDecodeError: flash("Die hochgeladene Datei ist keine gueltige Textdatei.", "error") return redirect(url_for("images")) if not content.strip(): flash("Bitte eine docker-compose einfuegen oder hochladen.", "error") return redirect(url_for("images")) images_list, ignored = extract_compose_images(content) if images_list is None: flash("Die docker-compose konnte nicht gelesen werden (ungueltiges YAML).", "error") return redirect(url_for("images")) if not images_list: flash("Keine verwertbaren Images in der docker-compose gefunden.", "error") return redirect(url_for("images")) use_creds = request.form.get("bulk_use_creds") == "on" use_token = request.form.get("bulk_use_token") == "on" src_user = request.form.get("bulk_src_username", "").strip() src_pass = request.form.get("bulk_src_password", "") src_token = request.form.get("bulk_src_token", "").strip() creds = f"{src_user}:{src_pass}" if (use_creds and src_user) else None token = src_token if (use_token and src_token) else None pulled, skipped, errors = [], [], [] for image in images_list: status, info = copy_image(image, creds, token) if status == "pulled": pulled.append(info) elif status == "skipped": skipped.append(info) else: errors.append(info) if pulled: flash(f"{len(pulled)} Image(s) geholt: " + ", ".join(pulled), "success") if skipped: flash( f"{len(skipped)} bereits vorhanden (gleicher Digest), uebersprungen: " + ", ".join(skipped), "success", ) if ignored: flash( f"{len(ignored)} Eintrag/Eintraege ignoriert (Variable oder eigene Registry): " + ", ".join(ignored), "error", ) for err in errors: flash(f"Fehler: {err}", "error") return redirect(url_for("images")) @app.route("/images/delete", methods=["POST"]) @login_required def delete_image(): name = request.form.get("name", "").strip() tag = request.form.get("tag", "").strip() if not name or not tag: flash("Image und Tag sind erforderlich.", "error") return redirect(url_for("images")) digest = get_manifest_digest(name, tag) # Normalfall: Manifest per Digest loeschen. Faellt der API-Lookup aus # (verwaister Tag: im Katalog gelistet, Manifest aber nicht abrufbar), # den Tag-Verweis direkt aus dem Speicher entfernen. manifest_deleted = delete_manifest(name, digest) if digest else False tag_removed = remove_tag_storage(name, tag) if not manifest_deleted and not tag_removed: flash(f'Tag "{tag}" von "{name}" wurde nicht gefunden.', "error") return redirect(url_for("images")) # War das der letzte Tag? Dann das leere Repository entfernen, damit # es nicht weiter (ohne Tags) im Katalog stehen bleibt. if not repo_has_tags(name) and remove_repository_storage(name): flash( f'Image "{name}:{tag}" geloescht - Repository "{name}" entfernt ' "(keine Tags mehr).", "success", ) else: flash(f'Image "{name}:{tag}" wurde geloescht.', "success") return redirect(url_for("images")) @app.route("/images/gc", methods=["POST"]) @login_required def garbage_collect(): # Entfernt Blobs/Layer, die von keinem Manifest mehr referenziert werden # (z. B. nach dem Loeschen von Tags). Nutzt die registry-Binary direkt auf # dem gemounteten Speicher - kein Docker-Socket noetig. # # WICHTIG: KEIN --delete-untagged! Dieses Flag loescht bei Multi-Arch-Images # die per-Architektur-Manifeste (die nur ueber die Manifest-Liste referenziert # sind) und macht die Images damit unbrauchbar ("manifest unknown" beim Pull). try: result = subprocess.run( ["registry", "garbage-collect", GC_CONFIG], capture_output=True, text=True, timeout=600, ) if result.returncode == 0: freed = sum( 1 for line in result.stdout.splitlines() if line.startswith("blob eligible for deletion") ) pruned = prune_empty_repositories() extra = f" {pruned} leere(s) Repository entfernt." if pruned else "" flash( "Garbage Collection abgeschlossen. " f"{freed} nicht mehr benoetigte Blob(s) wurden entfernt.{extra}", "success", ) else: detail = (result.stderr or result.stdout).strip().splitlines() reason = detail[-1] if detail else "Unbekannter Fehler" # Frische, leere Registry: nichts zu tun if "Path not found" in reason and "repositories" in reason: flash("Garbage Collection: keine Images vorhanden, nichts zu tun.", "success") else: flash(f"Fehler bei der Garbage Collection: {reason}", "error") except subprocess.TimeoutExpired: flash("Zeitueberschreitung bei der Garbage Collection.", "error") except FileNotFoundError: flash("registry-Binary ist nicht verfuegbar.", "error") return redirect(url_for("images")) @app.route("/help") @login_required def help_page(): return render_template("help.html", domain=request.host) @app.route("/add", methods=["POST"]) @login_required def add_user(): username = request.form.get("username", "").strip() password = request.form.get("password", "") if not username or not password: flash("Benutzername und Passwort sind erforderlich.", "error") return redirect(url_for("users")) if len(password) < 6: flash("Passwort muss mindestens 6 Zeichen lang sein.", "error") return redirect(url_for("users")) password_h = hash_password(password) conn = get_db() try: conn.execute( "INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, password_h), ) conn.commit() sync_htpasswd() flash(f'Benutzer "{username}" wurde erstellt.', "success") except sqlite3.IntegrityError: flash(f'Benutzer "{username}" existiert bereits.', "error") finally: conn.close() return redirect(url_for("users")) @app.route("/password/", methods=["POST"]) @login_required def change_password(user_id): password = request.form.get("password", "") if not password or len(password) < 6: flash("Passwort muss mindestens 6 Zeichen lang sein.", "error") return redirect(url_for("users")) password_h = hash_password(password) conn = get_db() conn.execute( "UPDATE users SET password_hash = ? WHERE id = ?", (password_h, user_id), ) conn.commit() conn.close() sync_htpasswd() flash("Passwort wurde geaendert.", "success") return redirect(url_for("users")) @app.route("/delete/", methods=["POST"]) @login_required def delete_user(user_id): conn = get_db() user = conn.execute( "SELECT username FROM users WHERE id = ?", (user_id,) ).fetchone() if user: conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) conn.commit() sync_htpasswd() flash(f'Benutzer "{user["username"]}" wurde geloescht.', "success") conn.close() return redirect(url_for("users")) # Initialize database and htpasswd on startup init_db() _SERVICE_PASSWORD_HASH = hash_password(_SERVICE_PASSWORD) sync_htpasswd()