"""Certificate Authority management web routes.""" from fastapi import APIRouter, Request, Depends, Form, UploadFile, File from fastapi.responses import HTMLResponse, RedirectResponse, Response from sqlalchemy.orm import Session from typing import Optional from ..database import get_db from ..models.user import User from ..models.certificate_authority import CertificateAuthority, CAStatus from ..services.certificate_service import CertificateService from .deps import require_admin_web, flash, get_flashed_messages router = APIRouter() @router.get("/ca", response_class=HTMLResponse) async def list_cas( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """List all Certificate Authorities.""" if current_user.is_super_admin: cas = db.query(CertificateAuthority).all() else: cas = db.query(CertificateAuthority).filter( (CertificateAuthority.tenant_id == current_user.tenant_id) | (CertificateAuthority.tenant_id == None) ).all() return request.app.state.templates.TemplateResponse( "ca/list.html", { "request": request, "current_user": current_user, "cas": cas, "flash_messages": get_flashed_messages(request) } ) @router.get("/ca/new", response_class=HTMLResponse) async def new_ca_form( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """New CA form.""" return request.app.state.templates.TemplateResponse( "ca/form.html", { "request": request, "current_user": current_user, "ca": None, "mode": "create", "flash_messages": get_flashed_messages(request) } ) @router.post("/ca/new") async def create_ca( request: Request, name: str = Form(...), description: str = Form(None), key_size: int = Form(4096), validity_days: int = Form(3650), organization: str = Form("mGuard VPN"), country: str = Form("DE"), state: str = Form("NRW"), city: str = Form("Dortmund"), is_default: bool = Form(False), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Create new CA.""" try: service = CertificateService(db) tenant_id = None if current_user.is_super_admin else current_user.tenant_id ca = service.create_ca( name=name, key_size=key_size, validity_days=validity_days, organization=organization, country=country, state=state, city=city, tenant_id=tenant_id, created_by_id=current_user.id, is_default=is_default ) flash(request, f"CA '{name}' wird erstellt. DH-Parameter werden im Hintergrund generiert...", "success") return RedirectResponse(url=f"/ca/{ca.id}", status_code=303) except Exception as e: flash(request, f"Fehler beim Erstellen der CA: {str(e)}", "danger") return RedirectResponse(url="/ca/new", status_code=303) @router.get("/ca/import", response_class=HTMLResponse) async def import_ca_form( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Import CA form.""" return request.app.state.templates.TemplateResponse( "ca/form.html", { "request": request, "current_user": current_user, "ca": None, "mode": "import", "flash_messages": get_flashed_messages(request) } ) @router.post("/ca/import") async def import_ca( request: Request, name: str = Form(...), description: str = Form(None), ca_cert: UploadFile = File(...), ca_key: UploadFile = File(...), dh_params: Optional[UploadFile] = File(None), db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Import existing CA from files.""" try: service = CertificateService(db) ca_cert_pem = (await ca_cert.read()).decode('utf-8') ca_key_pem = (await ca_key.read()).decode('utf-8') dh_params_pem = (await dh_params.read()).decode('utf-8') if dh_params else None tenant_id = None if current_user.is_super_admin else current_user.tenant_id ca = service.import_ca( name=name, ca_cert_pem=ca_cert_pem, ca_key_pem=ca_key_pem, dh_params_pem=dh_params_pem, tenant_id=tenant_id, created_by_id=current_user.id ) if dh_params_pem: flash(request, f"CA '{name}' erfolgreich importiert", "success") else: flash(request, f"CA '{name}' importiert. DH-Parameter werden generiert...", "success") return RedirectResponse(url=f"/ca/{ca.id}", status_code=303) except Exception as e: flash(request, f"Fehler beim Importieren: {str(e)}", "danger") return RedirectResponse(url="/ca/import", status_code=303) @router.get("/ca/{ca_id}", response_class=HTMLResponse) async def ca_detail( request: Request, ca_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """CA detail page.""" ca = db.query(CertificateAuthority).filter( CertificateAuthority.id == ca_id ).first() if not ca: flash(request, "CA nicht gefunden", "danger") return RedirectResponse(url="/ca", status_code=303) # Check access if not current_user.is_super_admin and ca.tenant_id != current_user.tenant_id and ca.tenant_id is not None: flash(request, "Zugriff verweigert", "danger") return RedirectResponse(url="/ca", status_code=303) return request.app.state.templates.TemplateResponse( "ca/detail.html", { "request": request, "current_user": current_user, "ca": ca, "flash_messages": get_flashed_messages(request) } ) @router.post("/ca/{ca_id}/set-default") async def set_ca_default( request: Request, ca_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Set CA as default.""" ca = db.query(CertificateAuthority).filter( CertificateAuthority.id == ca_id ).first() if not ca: flash(request, "CA nicht gefunden", "danger") return RedirectResponse(url="/ca", status_code=303) # Unset other defaults for same tenant db.query(CertificateAuthority).filter( CertificateAuthority.tenant_id == ca.tenant_id, CertificateAuthority.id != ca.id ).update({"is_default": False}) ca.is_default = True db.commit() flash(request, f"'{ca.name}' ist jetzt die Standard-CA", "success") return RedirectResponse(url=f"/ca/{ca_id}", status_code=303) @router.get("/ca/{ca_id}/download/cert") async def download_ca_cert( ca_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Download CA certificate.""" ca = db.query(CertificateAuthority).filter( CertificateAuthority.id == ca_id ).first() if not ca or not ca.ca_cert: return Response(status_code=404) return Response( content=ca.ca_cert, media_type="application/x-pem-file", headers={"Content-Disposition": f"attachment; filename={ca.name}-ca.crt"} ) @router.get("/ca/{ca_id}/download/crl") async def download_crl( ca_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Download Certificate Revocation List.""" ca = db.query(CertificateAuthority).filter( CertificateAuthority.id == ca_id ).first() if not ca: return Response(status_code=404) service = CertificateService(db) crl = service.get_crl(ca) return Response( content=crl, media_type="application/x-pem-file", headers={"Content-Disposition": f"attachment; filename={ca.name}-crl.pem"} ) @router.post("/ca/{ca_id}/delete") async def delete_ca( request: Request, ca_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_admin_web) ): """Delete CA.""" ca = db.query(CertificateAuthority).filter( CertificateAuthority.id == ca_id ).first() if not ca: flash(request, "CA nicht gefunden", "danger") return RedirectResponse(url="/ca", status_code=303) # Check if CA has servers or profiles if ca.vpn_servers: flash(request, "CA wird noch von VPN-Servern verwendet. Bitte zuerst löschen.", "danger") return RedirectResponse(url=f"/ca/{ca_id}", status_code=303) if ca.vpn_profiles: flash(request, "CA wird noch von VPN-Profilen verwendet. Bitte zuerst löschen.", "danger") return RedirectResponse(url=f"/ca/{ca_id}", status_code=303) name = ca.name db.delete(ca) db.commit() flash(request, f"CA '{name}' gelöscht", "warning") return RedirectResponse(url="/ca", status_code=303)