# https-proxy-with-self-signe.../app/app.py
#
# 319 lines
# 11 KiB
# Python

import json
import os
import subprocess
from functools import wraps
from pathlib import Path
from flask import Flask, jsonify, redirect, render_template, request, url_for
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)

# JSON file in which the list of proxy targets is persisted.
CONFIG_FILE = "/data/proxy_config.json"

# Directory and file that the generated nginx configuration is written to.
NGINX_CONF_DIR = "/etc/nginx/conf.d"
NGINX_UPSTREAM_CONF = NGINX_CONF_DIR + "/proxy-targets.conf"

# HTTP Basic credentials checked by ``check_auth``; both can be overridden
# through the environment.
# NOTE(review): the fallback values are well-known defaults -- set
# WEBUI_USERNAME / WEBUI_PASSWORD explicitly in any real deployment.
USERNAME = os.getenv("WEBUI_USERNAME", "admin")
PASSWORD = os.getenv("WEBUI_PASSWORD", "admin123")
def load_config():
    """Load the proxy configuration from ``CONFIG_FILE``.

    Returns:
        dict: The stored configuration. A ``"targets"`` list is always
        present so callers can index ``config["targets"]`` unconditionally:
        it is empty when the file does not exist yet, and it is (re)created
        when the stored object lacks the key or holds a non-list value.

    Raises:
        ValueError: If the file exists but does not contain a JSON object.
            This includes ``json.JSONDecodeError`` for unparseable content.
            The error is deliberately not swallowed, so a damaged file is
            noticed instead of being silently replaced by the next save.
    """
    # EAFP: opening directly avoids the exists()/open() race of a pre-check.
    try:
        with open(CONFIG_FILE, encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        return {"targets": []}

    if not isinstance(config, dict):
        raise ValueError(
            f"{CONFIG_FILE} must contain a JSON object, "
            f"got {type(config).__name__}"
        )

    # Every caller does config["targets"]; guarantee that it is a list.
    if not isinstance(config.get("targets"), list):
        config["targets"] = []
    return config
def save_config(config):
    """Persist *config* as indented JSON in ``CONFIG_FILE``.

    The parent directory of ``CONFIG_FILE`` is created first if it is missing.
    """
    config_dir = os.path.dirname(CONFIG_FILE)
    os.makedirs(config_dir, exist_ok=True)
    with open(CONFIG_FILE, "w") as handle:
        json.dump(config, handle, indent=2)
def _nginx_token_ok(value, allow_spaces=False):
    """Return True if *value* is safe to interpolate into an nginx directive.

    Characters that would terminate or restructure a directive (``;``, braces,
    ``#``, quotes, line breaks) are rejected. Whitespace is rejected too unless
    *allow_spaces* is set (``server_name`` accepts several names).
    """
    text = str(value)
    if not text or any(ch in ";{}#\"'\r\n" for ch in text):
        return False
    return allow_spaces or not any(ch.isspace() for ch in text)


def _render_server_block(listen_port, server_name, proxy_target):
    """Return the lines of one TLS ``server`` block proxying to *proxy_target*."""
    return [
        "server {",
        f" listen {listen_port} ssl;",
        f" server_name {server_name};",
        "",
        " ssl_certificate /certs/server.crt;",
        " ssl_certificate_key /certs/server.key;",
        " ssl_protocols TLSv1.2 TLSv1.3;",
        "",
        " location / {",
        f" proxy_pass {proxy_target};",
        " proxy_set_header Host $host;",
        " proxy_set_header X-Real-IP $remote_addr;",
        " proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;",
        " proxy_set_header X-Forwarded-Proto $scheme;",
        " proxy_http_version 1.1;",
        " proxy_set_header Upgrade $http_upgrade;",
        ' proxy_set_header Connection "upgrade";',
        " }",
        "}",
        "",
    ]


def generate_nginx_config(config):
    """Generate nginx upstream/server blocks from config and write them out.

    For every enabled target with a ``target_host`` this emits one
    ``upstream`` block, one TLS ``server`` block per configured domain, and
    (when ``listen_port`` is set) one catch-all TLS ``server`` block.

    One malformed entry must not invalidate the whole file, so:

    * the upstream name is reduced to letters, digits, ``_``, ``-`` and ``.``
      (other characters become ``_``), and de-duplicated, because nginx
      rejects duplicate upstream names;
    * targets, domains and listen ports whose values contain characters that
      would break out of a directive are skipped.

    Args:
        config: Mapping with a ``"targets"`` list of target dicts.

    Returns:
        str: The generated configuration text, which is also written to
        ``NGINX_UPSTREAM_CONF`` (``NGINX_CONF_DIR`` is created if missing).
    """
    lines = []
    used_upstreams = set()

    for i, target in enumerate(config.get("targets", [])):
        target_host = target.get("target_host", "")
        if not target_host or not target.get("enabled", True):
            continue

        target_port = target.get("target_port") or 80
        target_scheme = target.get("target_scheme") or "http"
        if not (
            _nginx_token_ok(target_host)
            and _nginx_token_ok(target_port)
            and _nginx_token_ok(target_scheme)
        ):
            # Leave a trace in the generated file without echoing the bad value.
            lines.append(
                f"# target {i} skipped: host, port or scheme contains "
                "characters that are unsafe in an nginx directive"
            )
            lines.append("")
            continue

        raw_name = str(target.get("name") or f"target_{i}")
        safe_name = "".join(
            ch if ch.isalnum() or ch in "_-." else "_" for ch in raw_name
        )
        upstream_name = f"upstream_{safe_name}"
        while upstream_name in used_upstreams:
            upstream_name = f"{upstream_name}_{i}"
        used_upstreams.add(upstream_name)

        lines.append(f"upstream {upstream_name} {{")
        lines.append(f" server {target_host}:{target_port};")
        lines.append("}")
        lines.append("")

        proxy_target = f"{target_scheme}://{upstream_name}"

        # Domain-based routing
        for domain_entry in target.get("domains") or []:
            domain = domain_entry.get("domain", "")
            domain_port = domain_entry.get("port") or 443
            if not domain:
                continue
            if not (
                _nginx_token_ok(domain, allow_spaces=True)
                and _nginx_token_ok(domain_port)
            ):
                continue
            lines.extend(_render_server_block(domain_port, domain, proxy_target))

        # IP/Port-based routing
        listen_port = target.get("listen_port", 0)
        if listen_port and _nginx_token_ok(listen_port):
            lines.extend(_render_server_block(listen_port, "_", proxy_target))

    conf_content = "\n".join(lines)
    os.makedirs(NGINX_CONF_DIR, exist_ok=True)
    with open(NGINX_UPSTREAM_CONF, "w", encoding="utf-8") as f:
        f.write(conf_content)
    return conf_content
def reload_nginx():
    """Validate the nginx configuration and, if valid, signal a reload.

    Returns:
        tuple[bool, str]: ``(True, message)`` when both ``nginx -t`` and
        ``nginx -s reload`` exit with status 0. Otherwise ``(False, message)``,
        where the message carries the failing command's stderr or the text of
        the exception raised while running it.
    """
    steps = (
        (["nginx", "-t"], "Nginx config test failed"),
        (["nginx", "-s", "reload"], "Nginx reload failed"),
    )
    try:
        for command, failure_prefix in steps:
            completed = subprocess.run(
                command, capture_output=True, text=True, timeout=10
            )
            if completed.returncode != 0:
                return False, f"{failure_prefix}: {completed.stderr}"
    except Exception as exc:
        # Covers a missing nginx binary, timeouts, and similar failures.
        return False, str(exc)
    return True, "Nginx reloaded successfully"
def check_auth(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The credentials in the request's ``Authorization`` header must match the
    module-level ``USERNAME`` and ``PASSWORD``. On a mismatch, or when no
    credentials are supplied, the wrapped view is not called and a 401
    response with a ``WWW-Authenticate`` challenge is returned instead.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    import hmac  # local import keeps this block self-contained

    def _matches(supplied, expected):
        # A missing username/password can never match, even if the configured
        # value is the empty string.
        if not isinstance(supplied, str):
            return False
        # compare_digest runs in time independent of where the values differ;
        # encoding to bytes lets it accept non-ASCII input.
        return hmac.compare_digest(
            supplied.encode("utf-8"), expected.encode("utf-8")
        )

    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if auth:
            # Evaluate both checks before combining them so the response time
            # does not reveal which of the two fields was wrong.
            user_ok = _matches(auth.username, USERNAME)
            password_ok = _matches(auth.password, PASSWORD)
            if user_ok and password_ok:
                return f(*args, **kwargs)
        return (
            "Authentication required",
            401,
            {"WWW-Authenticate": 'Basic realm="Proxy Admin"'},
        )

    return decorated
# ==================== WebUI Routes ====================
@app.route("/")
@check_auth
def index():
    """Render the admin page with the currently stored configuration."""
    return render_template("index.html", config=load_config())
@app.route("/target/add", methods=["POST"])
@check_auth
def add_target():
    """Create a proxy target from the submitted WebUI form.

    Form fields read: ``name``, ``target_host``, ``target_port``,
    ``target_scheme``, ``listen_port`` and the parallel lists
    ``domain_name[]`` / ``domain_port[]``. On success the target is appended
    to the stored config, the nginx config is regenerated and nginx is
    reloaded.

    Returns:
        A redirect to the index page. This is also returned, without storing
        anything, when ``name`` or ``target_host`` is empty. A plain-text 400
        response is returned when a port field is not a valid port number.
    """
    form = request.form
    name = form.get("name", "").strip().replace(" ", "_")
    target_host = form.get("target_host", "").strip()
    if not name or not target_host:
        return redirect(url_for("index"))

    # int() raises ValueError on non-numeric input; report that as a client
    # error instead of letting it surface as a 500.
    try:
        domains = [
            {"domain": domain.strip(), "port": int(port) if port else 443}
            for domain, port in zip(
                form.getlist("domain_name[]"), form.getlist("domain_port[]")
            )
            if domain.strip()
        ]
        target_port = int(form.get("target_port") or 80)
        listen_port = int(form.get("listen_port") or 0)
    except ValueError:
        return "Port fields must be whole numbers", 400

    # listen_port == 0 means "no dedicated listener" and is therefore exempt.
    ports_in_use = [target_port] + [entry["port"] for entry in domains]
    if listen_port:
        ports_in_use.append(listen_port)
    if not all(1 <= port <= 65535 for port in ports_in_use):
        return "Port fields must be between 1 and 65535", 400

    target = {
        "name": name,
        "target_host": target_host,
        "target_port": target_port,
        "target_scheme": form.get("target_scheme", "http"),
        "listen_port": listen_port,
        "domains": domains,
        "enabled": True,
    }

    config = load_config()
    config["targets"].append(target)
    save_config(config)
    generate_nginx_config(config)
    reload_nginx()
    return redirect(url_for("index"))
@app.route("/target/<int:idx>/delete", methods=["POST"])
@check_auth
def delete_target(idx):
    """Remove the target at position *idx*, then redirect to the index page.

    An out-of-range *idx* leaves the stored config and nginx untouched.
    """
    config = load_config()
    targets = config["targets"]
    in_range = 0 <= idx < len(targets)
    if not in_range:
        return redirect(url_for("index"))

    del targets[idx]
    save_config(config)
    generate_nginx_config(config)
    reload_nginx()
    return redirect(url_for("index"))
@app.route("/target/<int:idx>/toggle", methods=["POST"])
@check_auth
def toggle_target(idx):
    """Flip the ``enabled`` flag of the target at *idx*, then redirect.

    A target without an ``enabled`` key counts as enabled, so toggling it
    disables it. An out-of-range *idx* changes nothing.
    """
    config = load_config()
    targets = config["targets"]
    in_range = 0 <= idx < len(targets)
    if not in_range:
        return redirect(url_for("index"))

    entry = targets[idx]
    currently_enabled = entry.get("enabled", True)
    entry["enabled"] = not currently_enabled
    save_config(config)
    generate_nginx_config(config)
    reload_nginx()
    return redirect(url_for("index"))
@app.route("/target/<int:idx>/edit", methods=["POST"])
@check_auth
def edit_target(idx):
    """Replace the target at *idx* with the values of the submitted form.

    Reads the same form fields as the add form (``name``, ``target_host``,
    ``target_port``, ``target_scheme``, ``listen_port``, ``domain_name[]``,
    ``domain_port[]``). The target's current ``enabled`` flag is preserved.

    Returns:
        A redirect to the index page. Nothing is changed when *idx* is out of
        range or when ``name`` / ``target_host`` is empty (the same rule the
        add form applies). A plain-text 400 response is returned when a port
        field is not a valid port number.
    """
    config = load_config()
    targets = config["targets"]
    if not 0 <= idx < len(targets):
        return redirect(url_for("index"))

    form = request.form
    name = form.get("name", "").strip().replace(" ", "_")
    target_host = form.get("target_host", "").strip()
    if not name or not target_host:
        return redirect(url_for("index"))

    # int() raises ValueError on non-numeric input; report that as a client
    # error instead of letting it surface as a 500.
    try:
        domains = [
            {"domain": domain.strip(), "port": int(port) if port else 443}
            for domain, port in zip(
                form.getlist("domain_name[]"), form.getlist("domain_port[]")
            )
            if domain.strip()
        ]
        target_port = int(form.get("target_port") or 80)
        listen_port = int(form.get("listen_port") or 0)
    except ValueError:
        return "Port fields must be whole numbers", 400

    # listen_port == 0 means "no dedicated listener" and is therefore exempt.
    ports_in_use = [target_port] + [entry["port"] for entry in domains]
    if listen_port:
        ports_in_use.append(listen_port)
    if not all(1 <= port <= 65535 for port in ports_in_use):
        return "Port fields must be between 1 and 65535", 400

    targets[idx] = {
        "name": name,
        "target_host": target_host,
        "target_port": target_port,
        "target_scheme": form.get("target_scheme", "http"),
        "listen_port": listen_port,
        "domains": domains,
        "enabled": targets[idx].get("enabled", True),
    }
    save_config(config)
    generate_nginx_config(config)
    reload_nginx()
    return redirect(url_for("index"))
# ==================== API Routes ====================
@app.route("/api/targets", methods=["GET"])
@check_auth
def api_list_targets():
    """Return the complete stored configuration as a JSON response."""
    return jsonify(load_config())
@app.route("/api/targets", methods=["POST"])
@check_auth
def api_add_target():
    """Create a proxy target from a JSON request body.

    The body must be a JSON object with non-empty string ``name`` and
    ``target_host``. The name is normalized as in the WebUI form (trimmed,
    spaces replaced by ``_``). Missing ``target_port``, ``target_scheme``,
    ``listen_port``, ``domains`` and ``enabled`` get defaults.

    Returns:
        201 with ``status`` (``"ok"`` or ``"warning"``), the nginx reload
        ``message`` and the stored ``target``; or 400 with an ``error``
        message when the body is not acceptable.
    """
    # silent=True turns unparseable bodies into None, so every failure is
    # answered with the JSON error below.
    target = request.get_json(silent=True)
    if not target or not isinstance(target, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    name = target.get("name")
    target_host = target.get("target_host")
    if not (
        isinstance(name, str)
        and name.strip()
        and isinstance(target_host, str)
        and target_host.strip()
    ):
        return jsonify({"error": "name and target_host are required"}), 400
    target["name"] = name.strip().replace(" ", "_")
    target["target_host"] = target_host.strip()

    target.setdefault("target_port", 80)
    target.setdefault("target_scheme", "http")
    target.setdefault("listen_port", 0)
    target.setdefault("domains", [])
    target.setdefault("enabled", True)

    # Config generation iterates the domains and calls .get() on each entry;
    # a wrongly shaped value would be stored and then break every later
    # regeneration, so reject it here.
    domains = target["domains"]
    if not isinstance(domains, list) or not all(
        isinstance(entry, dict) for entry in domains
    ):
        return jsonify({"error": "domains must be a list of objects"}), 400

    config = load_config()
    config["targets"].append(target)
    save_config(config)
    generate_nginx_config(config)
    success, msg = reload_nginx()
    return jsonify({"status": "ok" if success else "warning", "message": msg, "target": target}), 201
@app.route("/api/targets/<int:idx>", methods=["PUT"])
@check_auth
def api_update_target(idx):
    """Replace the target at *idx* with the JSON request body.

    The body is validated and defaulted the same way as when creating a
    target: it must be a JSON object with non-empty string ``name`` and
    ``target_host``, and missing optional fields get defaults. If the body
    omits ``enabled``, the target's current flag is kept.

    Returns:
        200 with ``status`` (``"ok"`` or ``"warning"``) and the nginx reload
        ``message``; 404 when *idx* is out of range; 400 with an ``error``
        message when the body is not acceptable.
    """
    config = load_config()
    targets = config["targets"]
    if idx < 0 or idx >= len(targets):
        return jsonify({"error": "Target not found"}), 404

    # silent=True turns unparseable bodies into None, so every failure is
    # answered with the JSON error below.
    target = request.get_json(silent=True)
    if not target or not isinstance(target, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    name = target.get("name")
    target_host = target.get("target_host")
    if not (
        isinstance(name, str)
        and name.strip()
        and isinstance(target_host, str)
        and target_host.strip()
    ):
        return jsonify({"error": "name and target_host are required"}), 400
    target["name"] = name.strip().replace(" ", "_")
    target["target_host"] = target_host.strip()

    target.setdefault("target_port", 80)
    target.setdefault("target_scheme", "http")
    target.setdefault("listen_port", 0)
    target.setdefault("domains", [])
    # An update that does not mention "enabled" must not change the flag.
    target.setdefault("enabled", targets[idx].get("enabled", True))

    # Config generation iterates the domains and calls .get() on each entry;
    # a wrongly shaped value would be stored and then break every later
    # regeneration, so reject it here.
    domains = target["domains"]
    if not isinstance(domains, list) or not all(
        isinstance(entry, dict) for entry in domains
    ):
        return jsonify({"error": "domains must be a list of objects"}), 400

    targets[idx] = target
    save_config(config)
    generate_nginx_config(config)
    success, msg = reload_nginx()
    return jsonify({"status": "ok" if success else "warning", "message": msg})
@app.route("/api/targets/<int:idx>", methods=["DELETE"])
@check_auth
def api_delete_target(idx):
    """Delete the target at *idx* and report the nginx reload outcome.

    Returns:
        A JSON body with ``status`` (``"ok"`` or ``"warning"``), the reload
        ``message`` and the ``removed`` target; or 404 with an ``error``
        message when *idx* is out of range.
    """
    config = load_config()
    targets = config["targets"]
    if not (0 <= idx < len(targets)):
        return jsonify({"error": "Target not found"}), 404

    removed = targets.pop(idx)
    save_config(config)
    generate_nginx_config(config)
    reloaded, detail = reload_nginx()
    status = "ok" if reloaded else "warning"
    return jsonify({"status": status, "message": detail, "removed": removed})
@app.route("/api/reload", methods=["POST"])
@check_auth
def api_reload():
    """Regenerate the nginx config from the stored targets and reload nginx.

    Returns:
        A JSON body with ``status`` (``"ok"`` or ``"error"``) and the reload
        ``message``.
    """
    generate_nginx_config(load_config())
    reloaded, detail = reload_nginx()
    status = "ok" if reloaded else "error"
    return jsonify({"status": status, "message": detail})
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it must not be on by default while listening on
    # all interfaces. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)