import hashlib import json import os import subprocess from functools import wraps from pathlib import Path from flask import (Flask, jsonify, redirect, render_template, request, send_file, session, url_for) app = Flask(__name__) # Stable secret key: derive from username+password so it survives restarts # but changes when credentials change _username = os.environ.get("WEBUI_USERNAME", "admin") _password = os.environ.get("WEBUI_PASSWORD", "admin123") app.secret_key = hashlib.sha256(f"{_username}:{_password}:proxy-secret".encode()).hexdigest() CONFIG_FILE = "/data/proxy_config.json" NGINX_CONF_DIR = "/etc/nginx/conf.d" NGINX_UPSTREAM_CONF = f"{NGINX_CONF_DIR}/proxy-targets.conf" NGINX_ACCESS_LOG = "/var/log/nginx/access.log" NGINX_ERROR_LOG = "/var/log/nginx/error.log" USERNAME = os.environ.get("WEBUI_USERNAME", "admin") PASSWORD = os.environ.get("WEBUI_PASSWORD", "admin123") def load_config(): if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE) as f: return json.load(f) return {"targets": []} def save_config(config): os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) with open(CONFIG_FILE, "w") as f: json.dump(config, f, indent=2) def _proxy_location_block(target_scheme, upstream_name): """Generate proxy location block with all necessary directives.""" return f""" location / {{ proxy_pass {target_scheme}://{upstream_name}; # Headers proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-Forwarded-Host $host; proxy_set_header X-Forwarded-Port $server_port; # WebSocket support proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; # Timeouts proxy_connect_timeout 60s; proxy_send_timeout 120s; proxy_read_timeout 120s; # Buffers proxy_buffer_size 128k; proxy_buffers 4 256k; proxy_busy_buffers_size 256k; # Don't limit upload size client_max_body_size 0; # Pass redirects through proxy_redirect default; # Disable SSL verification for HTTPS targets proxy_ssl_verify off; }}""" def generate_nginx_config(config): """Generate nginx upstream/server blocks from config.""" lines = [] # Connection upgrade map for WebSocket support lines.append("map $http_upgrade $connection_upgrade {") lines.append(" default upgrade;") lines.append(" '' close;") lines.append("}") lines.append("") for i, target in enumerate(config.get("targets", [])): name = target.get("name", f"target_{i}") target_host = target.get("target_host", "") target_port = target.get("target_port", 80) listen_port = target.get("listen_port", 0) domains = target.get("domains", []) target_scheme = target.get("target_scheme", "http") if not target_host or not target.get("enabled", True): continue upstream_name = f"upstream_{name}" lines.append(f"upstream {upstream_name} {{") lines.append(f" server {target_host}:{target_port};") lines.append("}") lines.append("") location_block = _proxy_location_block(target_scheme, upstream_name) http_redirect = target.get("http_redirect", False) http_port = target.get("http_port", 0) # Domain-based routing if domains: for domain_entry in domains: domain = domain_entry.get("domain", "") domain_port = domain_entry.get("port", 443) if not domain: continue lines.append("server {") lines.append(f" listen {domain_port} ssl;") lines.append(f" server_name {domain};") lines.append("") lines.append(" ssl_certificate /certs/server.crt;") lines.append(" ssl_certificate_key /certs/server.key;") lines.append(" ssl_protocols TLSv1.2 TLSv1.3;") lines.append("") lines.append(f" access_log /var/log/nginx/access.log main;") lines.append(f" error_log /var/log/nginx/error.log warn;") lines.append("") lines.append(location_block) lines.append("}") lines.append("") # HTTP -> HTTPS redirect for this domain if http_redirect and http_port: lines.append("server {") lines.append(f" listen {http_port};") lines.append(f" server_name {domain};") lines.append(f" return 301 https://$host:{domain_port}$request_uri;") lines.append("}") lines.append("") # IP/Port-based routing if listen_port: lines.append("server {") lines.append(f" listen {listen_port} ssl;") lines.append(f" server_name _;") lines.append("") lines.append(" ssl_certificate /certs/server.crt;") lines.append(" ssl_certificate_key /certs/server.key;") lines.append(" ssl_protocols TLSv1.2 TLSv1.3;") lines.append("") lines.append(f" access_log /var/log/nginx/access.log main;") lines.append(f" error_log /var/log/nginx/error.log warn;") lines.append("") lines.append(location_block) lines.append("}") lines.append("") # HTTP -> HTTPS redirect for this port if http_redirect and http_port: lines.append("server {") lines.append(f" listen {http_port};") lines.append(f" server_name _;") lines.append(f" return 301 https://$host:{listen_port}$request_uri;") lines.append("}") lines.append("") conf_content = "\n".join(lines) os.makedirs(NGINX_CONF_DIR, exist_ok=True) with open(NGINX_UPSTREAM_CONF, "w") as f: f.write(conf_content) return conf_content def reload_nginx(): """Reload nginx configuration.""" try: result = subprocess.run( ["nginx", "-t"], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: return False, f"Nginx config test failed: {result.stderr}" result = subprocess.run( ["nginx", "-s", "reload"], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: return False, f"Nginx reload failed: {result.stderr}" return True, "Nginx reloaded successfully" except Exception as e: return False, str(e) def read_log_tail(filepath, lines=100): """Read the last N lines of a log file.""" try: result = subprocess.run( ["tail", "-n", str(lines), filepath], capture_output=True, text=True, timeout=5 ) return result.stdout except Exception: return "" def login_required(f): @wraps(f) def decorated(*args, **kwargs): if not session.get("logged_in"): # API requests get 401, browser requests get redirected if request.path.startswith("/api/"): auth = request.authorization if auth and auth.username == USERNAME and auth.password == PASSWORD: return f(*args, **kwargs) return jsonify({"error": "Authentication required"}), 401 return redirect(url_for("login")) return f(*args, **kwargs) return decorated # ==================== Auth Routes ==================== @app.route("/login", methods=["GET", "POST"]) def login(): error = None if request.method == "POST": if (request.form.get("username") == USERNAME and request.form.get("password") == PASSWORD): session["logged_in"] = True return redirect(url_for("index")) error = "Falscher Benutzername oder Passwort" return render_template("login.html", error=error) @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) # ==================== Public Routes ==================== @app.route("/ca.crt") def download_ca(): """Download CA certificate - no auth required so devices can easily fetch it.""" ca_path = "/certs/ca.crt" if not os.path.exists(ca_path): return "CA certificate not found", 404 return send_file(ca_path, as_attachment=True, download_name="ca.crt", mimetype="application/x-x509-ca-cert") # ==================== WebUI Routes ==================== @app.route("/") @login_required def index(): config = load_config() return render_template("index.html", config=config) @app.route("/logs") @login_required def logs(): lines = int(request.args.get("lines", 100)) log_type = request.args.get("type", "access") log_file = NGINX_ACCESS_LOG if log_type == "access" else NGINX_ERROR_LOG log_content = read_log_tail(log_file, lines) return render_template("logs.html", log_content=log_content, log_type=log_type, lines=lines) @app.route("/target/add", methods=["POST"]) @login_required def add_target(): config = load_config() domains = [] domain_names = request.form.getlist("domain_name[]") domain_ports = request.form.getlist("domain_port[]") for name, port in zip(domain_names, domain_ports): if name.strip(): domains.append({"domain": name.strip(), "port": int(port) if port else 443}) target = { "name": request.form.get("name", "").strip().replace(" ", "_"), "target_host": request.form.get("target_host", "").strip(), "target_port": int(request.form.get("target_port", 80)), "target_scheme": request.form.get("target_scheme", "http"), "listen_port": int(request.form.get("listen_port", 0) or 0), "http_redirect": request.form.get("http_redirect") == "on", "http_port": int(request.form.get("http_port", 0) or 0), "domains": domains, "enabled": True, } if not target["name"] or not target["target_host"]: return redirect(url_for("index")) config["targets"].append(target) save_config(config) generate_nginx_config(config) reload_nginx() return redirect(url_for("index")) @app.route("/target//delete", methods=["POST"]) @login_required def delete_target(idx): config = load_config() if 0 <= idx < len(config["targets"]): config["targets"].pop(idx) save_config(config) generate_nginx_config(config) reload_nginx() return redirect(url_for("index")) @app.route("/target//toggle", methods=["POST"]) @login_required def toggle_target(idx): config = load_config() if 0 <= idx < len(config["targets"]): config["targets"][idx]["enabled"] = not config["targets"][idx].get("enabled", True) save_config(config) generate_nginx_config(config) reload_nginx() return redirect(url_for("index")) @app.route("/target//edit", methods=["POST"]) @login_required def edit_target(idx): config = load_config() if 0 <= idx < len(config["targets"]): domains = [] domain_names = request.form.getlist("domain_name[]") domain_ports = request.form.getlist("domain_port[]") for name, port in zip(domain_names, domain_ports): if name.strip(): domains.append({"domain": name.strip(), "port": int(port) if port else 443}) config["targets"][idx] = { "name": request.form.get("name", "").strip().replace(" ", "_"), "target_host": request.form.get("target_host", "").strip(), "target_port": int(request.form.get("target_port", 80)), "target_scheme": request.form.get("target_scheme", "http"), "listen_port": int(request.form.get("listen_port", 0) or 0), "http_redirect": request.form.get("http_redirect") == "on", "http_port": int(request.form.get("http_port", 0) or 0), "domains": domains, "enabled": config["targets"][idx].get("enabled", True), } save_config(config) generate_nginx_config(config) reload_nginx() return redirect(url_for("index")) # ==================== API Routes ==================== @app.route("/api/targets", methods=["GET"]) @login_required def api_list_targets(): config = load_config() return jsonify(config) @app.route("/api/targets", methods=["POST"]) @login_required def api_add_target(): config = load_config() target = request.get_json() if not target: return jsonify({"error": "Invalid JSON"}), 400 if not target.get("name") or not target.get("target_host"): return jsonify({"error": "name and target_host are required"}), 400 target.setdefault("target_port", 80) target.setdefault("target_scheme", "http") target.setdefault("listen_port", 0) target.setdefault("domains", []) target.setdefault("enabled", True) config["targets"].append(target) save_config(config) generate_nginx_config(config) success, msg = reload_nginx() return jsonify({"status": "ok" if success else "warning", "message": msg, "target": target}), 201 @app.route("/api/targets/", methods=["PUT"]) @login_required def api_update_target(idx): config = load_config() if idx < 0 or idx >= len(config["targets"]): return jsonify({"error": "Target not found"}), 404 target = request.get_json() if not target: return jsonify({"error": "Invalid JSON"}), 400 config["targets"][idx] = target save_config(config) generate_nginx_config(config) success, msg = reload_nginx() return jsonify({"status": "ok" if success else "warning", "message": msg}) @app.route("/api/targets/", methods=["DELETE"]) @login_required def api_delete_target(idx): config = load_config() if idx < 0 or idx >= len(config["targets"]): return jsonify({"error": "Target not found"}), 404 removed = config["targets"].pop(idx) save_config(config) generate_nginx_config(config) success, msg = reload_nginx() return jsonify({"status": "ok" if success else "warning", "message": msg, "removed": removed}) @app.route("/api/reload", methods=["POST"]) @login_required def api_reload(): config = load_config() generate_nginx_config(config) success, msg = reload_nginx() return jsonify({"status": "ok" if success else "error", "message": msg}) @app.route("/api/logs", methods=["GET"]) @login_required def api_logs(): lines = int(request.args.get("lines", 100)) log_type = request.args.get("type", "access") log_file = NGINX_ACCESS_LOG if log_type == "access" else NGINX_ERROR_LOG return jsonify({"type": log_type, "content": read_log_tail(log_file, lines)}) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)