import asyncio import logging import os import uuid from contextlib import asynccontextmanager from pathlib import Path from fastapi import FastAPI, Request, Form, UploadFile from fastapi.responses import HTMLResponse, JSONResponse, Response from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sse_starlette.sse import EventSourceResponse from app.database import init_db, get_settings, save_settings, get_log_entries, clear_log_entries, reset_downloaded_invoices from app.mail_processor import process_mailbox, send_test_email, test_imap_connection, create_imap_folder from app.scheduler import start_scheduler, configure_job, get_scheduler_status from app.scanner import process_scanned_pdf, generate_separator_pdf, UPLOAD_DIR from app.smb_processor import process_smb_share, test_smb_connection, create_smb_folder, list_smb_folders from app.amazon_processor import ( start_login as amazon_start_login, submit_otp as amazon_submit_otp, get_login_state as amazon_get_login_state, check_session_valid as amazon_check_session, clear_session as amazon_clear_session, process_amazon, start_interactive_login as amazon_start_interactive, get_browser_screenshot as amazon_get_screenshot, send_browser_click as amazon_browser_click, send_browser_type as amazon_browser_type, send_browser_key as amazon_browser_key, close_interactive_login as amazon_close_interactive, is_interactive_login_active as amazon_login_active, ) from app.amazon_api import ( get_oauth_authorize_url, exchange_auth_code, check_api_configured, process_amazon_api, ) logging.basicConfig( level=getattr(logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): await init_db() start_scheduler() settings = await get_settings() interval = int(settings.get("interval_minutes", 5)) enabled = settings.get("scheduler_enabled", "false") == "true" configure_job(interval, enabled) logger.info("Belegimport gestartet") yield logger.info("Belegimport beendet") app = FastAPI(title="Belegimport", lifespan=lifespan) app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("scan.html", {"request": request}) @app.get("/settings", response_class=HTMLResponse) async def settings_page(request: Request): settings = await get_settings() logs = await get_log_entries(limit=20) status = get_scheduler_status() return templates.TemplateResponse("settings.html", { "request": request, "settings": settings, "logs": logs, "status": status, "message": None, "message_type": None, }) async def _save_form_settings(request: Request) -> dict: """Extract form data and save settings. Returns the saved data dict.""" form = await request.form() current = await get_settings() data = { "imap_server": form.get("imap_server", ""), "imap_port": form.get("imap_port", "993"), "imap_ssl": form.get("imap_ssl", "true"), "imap_username": form.get("imap_username", ""), "imap_password": form.get("imap_password") or current.get("imap_password", ""), "smtp_server": form.get("smtp_server", ""), "smtp_port": form.get("smtp_port", "587"), "smtp_ssl": form.get("smtp_ssl", "starttls"), "smtp_username": form.get("smtp_username", ""), "smtp_password": form.get("smtp_password") or current.get("smtp_password", ""), "import_email": form.get("import_email", ""), "import_email_eingang": form.get("import_email_eingang", ""), "import_email_ausgang": form.get("import_email_ausgang", ""), "source_folder": form.get("source_folder", "Rechnungen"), "processed_folder": form.get("processed_folder", "Rechnungen/Verarbeitet"), "source_folder_ausgang": form.get("source_folder_ausgang", ""), "processed_folder_ausgang": form.get("processed_folder_ausgang", ""), "interval_minutes": form.get("interval_minutes", "5"), "scheduler_enabled": form.get("scheduler_enabled", "false"), "fetch_since_date": form.get("fetch_since_date", ""), # SMB "smb_enabled": form.get("smb_enabled", "false"), "smb_server": form.get("smb_server", ""), "smb_port": form.get("smb_port", "445"), "smb_username": form.get("smb_username", ""), "smb_password": form.get("smb_password") or current.get("smb_password", ""), "smb_domain": form.get("smb_domain", ""), "smb_share": form.get("smb_share", ""), "smb_source_path": form.get("smb_source_path", ""), "smb_processed_path": form.get("smb_processed_path", "Verarbeitet"), "smb_source_path_ausgang": form.get("smb_source_path_ausgang", ""), "smb_processed_path_ausgang": form.get("smb_processed_path_ausgang", ""), "smb_mode": form.get("smb_mode", "forward"), # Debug "debug_save_amazon_pdfs": form.get("debug_save_amazon_pdfs", "false"), } await save_settings(data) interval = int(data["interval_minutes"]) enabled = data["scheduler_enabled"] == "true" configure_job(interval, enabled) return data @app.post("/settings", response_class=HTMLResponse) async def save(request: Request): await _save_form_settings(request) settings = await get_settings() logs = await get_log_entries(limit=20) status = get_scheduler_status() return templates.TemplateResponse("settings.html", { "request": request, "settings": settings, "logs": logs, "status": status, "message": "Einstellungen gespeichert", "message_type": "success", }) @app.post("/api/save-settings") async def api_save_settings(request: Request): await _save_form_settings(request) return JSONResponse({"success": True}) @app.post("/api/test-imap") async def api_test_imap(request: Request): # Save settings first, then test await _save_form_settings(request) result = await test_imap_connection() return JSONResponse(result) @app.post("/api/test-email") async def api_test_email(request: Request): # Save settings first, then test await _save_form_settings(request) result = await send_test_email() return JSONResponse(result) @app.post("/api/process") async def api_process(request: Request): # Save settings first, then process await _save_form_settings(request) result = await process_mailbox() return JSONResponse(result) @app.post("/api/create-folder") async def api_create_folder(request: Request): body = await request.json() folder_name = body.get("folder_name", "") result = await create_imap_folder(folder_name) if result["success"]: # Return updated folder list folders_result = await test_imap_connection() result["folders"] = folders_result.get("folders", []) return JSONResponse(result) @app.post("/api/test-smb") async def api_test_smb(request: Request): await _save_form_settings(request) result = await test_smb_connection() return JSONResponse(result) @app.post("/api/process-smb") async def api_process_smb(request: Request): await _save_form_settings(request) result = await process_smb_share() return JSONResponse(result) @app.post("/api/create-smb-folder") async def api_create_smb_folder(request: Request): body = await request.json() folder_name = body.get("folder_name", "") result = await create_smb_folder(folder_name) if result["success"]: folders_result = await list_smb_folders() result["folders"] = folders_result.get("folders", []) return JSONResponse(result) @app.get("/log", response_class=HTMLResponse) async def log_page(request: Request): logs = await get_log_entries(limit=500) return templates.TemplateResponse("log.html", { "request": request, "logs": logs, }) @app.post("/api/clear-log") async def api_clear_log(): count = await clear_log_entries() return JSONResponse({"success": True, "count": count}) @app.get("/api/status") async def api_status(): return get_scheduler_status() # --- Scan Upload --- # In-memory progress store for SSE _scan_progress: dict[str, list[dict]] = {} @app.get("/scan", response_class=HTMLResponse) async def scan_page(request: Request): return templates.TemplateResponse("scan.html", {"request": request}) @app.post("/api/scan-upload-chunk") async def scan_upload_chunk( request: Request, file: UploadFile = Form(...), chunk_index: int = Form(...), total_chunks: int = Form(...), upload_id: str = Form(...), filename: str = Form(...), ): # Validate upload_id is UUID-like to prevent path traversal try: uuid.UUID(upload_id) except ValueError: return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400) UPLOAD_DIR.mkdir(parents=True, exist_ok=True) part_path = UPLOAD_DIR / f"{upload_id}.part" # Append chunk to file mode = "ab" if chunk_index > 0 else "wb" content = await file.read() with open(part_path, mode) as f: f.write(content) # If last chunk, rename to .pdf if chunk_index >= total_chunks - 1: pdf_path = UPLOAD_DIR / f"{upload_id}.pdf" part_path.rename(pdf_path) return JSONResponse({"status": "complete", "upload_id": upload_id}) return JSONResponse({"status": "ok", "chunk": chunk_index}) @app.post("/api/scan-process") async def scan_process(request: Request): body = await request.json() upload_id = body.get("upload_id", "") beleg_type = body.get("beleg_type", "eingang") try: uuid.UUID(upload_id) except ValueError: return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400) pdf_path = UPLOAD_DIR / f"{upload_id}.pdf" if not pdf_path.exists(): return JSONResponse({"error": "Upload nicht gefunden"}, status_code=404) # Initialize progress _scan_progress[upload_id] = [] def progress_callback(stage, current, total, message=None): entry = {"stage": stage, "current": current, "total": total} if message: entry["message"] = message _scan_progress.setdefault(upload_id, []).append(entry) # Process in background task async def _process(): try: result = await process_scanned_pdf(str(pdf_path), progress_callback, beleg_type=beleg_type) _scan_progress.setdefault(upload_id, []).append({ "stage": "done", "result": result }) except Exception as e: logger.error(f"Scan-Verarbeitung fehlgeschlagen: {e}") _scan_progress.setdefault(upload_id, []).append({ "stage": "error", "message": str(e) }) finally: # Cleanup uploaded file try: pdf_path.unlink(missing_ok=True) except Exception: pass asyncio.create_task(_process()) return JSONResponse({"status": "processing", "upload_id": upload_id}) @app.get("/api/scan-status/{upload_id}") async def scan_status_sse(upload_id: str): try: uuid.UUID(upload_id) except ValueError: return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400) async def event_generator(): seen = 0 while True: entries = _scan_progress.get(upload_id, []) while seen < len(entries): entry = entries[seen] seen += 1 import json yield {"event": entry.get("stage", "status"), "data": json.dumps(entry)} if entry.get("stage") in ("done", "error"): # Cleanup progress data _scan_progress.pop(upload_id, None) return await asyncio.sleep(0.3) return EventSourceResponse(event_generator()) @app.get("/api/separator-pdf") async def separator_pdf(): pdf_bytes = generate_separator_pdf() return Response( content=pdf_bytes, media_type="application/pdf", headers={"Content-Disposition": "attachment; filename=Trennseite.pdf"}, ) # --- Plattformen (Amazon) --- @app.get("/platforms", response_class=HTMLResponse) async def platforms_page(request: Request): settings = await get_settings() status = get_scheduler_status() return templates.TemplateResponse("platforms.html", { "request": request, "settings": settings, "status": status, "message": None, "message_type": None, }) @app.post("/api/amazon-settings") async def api_amazon_settings(request: Request): body = await request.json() current = await get_settings() data = { "amazon_enabled": body.get("amazon_enabled", "false"), "amazon_domain": body.get("amazon_domain", "amazon.de"), "amazon_email": body.get("amazon_email", ""), "amazon_password": body.get("amazon_password") or current.get("amazon_password", ""), "amazon_since_date": body.get("amazon_since_date", ""), "amazon_mode": body.get("amazon_mode", "browser"), "amazon_app_id": body.get("amazon_app_id", ""), "amazon_client_id": body.get("amazon_client_id", ""), "amazon_client_secret": body.get("amazon_client_secret") or current.get("amazon_client_secret", ""), } await save_settings(data) return JSONResponse({"success": True}) @app.get("/api/amazon-status") async def api_amazon_status(): settings = await get_settings() mode = settings.get("amazon_mode", "browser") if mode == "api": api_status = await check_api_configured() return JSONResponse({ "mode": "api", "session_valid": api_status.get("authorized", False), "login_active": False, "api_configured": api_status.get("configured", False), "api_authorized": api_status.get("authorized", False), }) else: valid = await amazon_check_session() login_active = amazon_login_active() return JSONResponse({ "mode": "browser", "session_valid": valid, "login_active": login_active, }) def _get_oauth_redirect_uri(request: Request) -> str: """Get OAuth redirect URI from env var or request.""" base = os.environ.get("OAUTH_REDIRECT_BASE", "").rstrip("/") if not base: base = str(request.base_url).rstrip("/") return f"{base}/api/amazon-oauth-callback" @app.get("/api/amazon-oauth-url") async def api_amazon_oauth_url(request: Request): """Generate OAuth authorization URL for Amazon Business API.""" settings = await get_settings() app_id = settings.get("amazon_app_id", "") if not app_id: return JSONResponse({"error": "App-ID nicht konfiguriert"}, status_code=400) redirect_uri = _get_oauth_redirect_uri(request) domain = settings.get("amazon_domain", "amazon.de") state = str(uuid.uuid4()) url = get_oauth_authorize_url(app_id, redirect_uri, domain, state) return JSONResponse({"url": url, "state": state}) @app.get("/api/amazon-oauth-callback") async def api_amazon_oauth_callback(request: Request): """Handle OAuth callback from Amazon.""" code = request.query_params.get("spapi_oauth_code") or request.query_params.get("code", "") error = request.query_params.get("error", "") if error: return HTMLResponse(f"
{error}
Fenster kann geschlossen werden.
") if not code: return HTMLResponse("Fenster kann geschlossen werden.
") settings = await get_settings() client_id = settings.get("amazon_client_id", "") client_secret = settings.get("amazon_client_secret", "") redirect_uri = _get_oauth_redirect_uri(request) result = await exchange_auth_code(code, client_id, client_secret, redirect_uri) if "error" in result: return HTMLResponse(f"{result['error']}
") refresh_token = result.get("refresh_token", "") if refresh_token: await save_settings({"amazon_refresh_token": refresh_token}) return HTMLResponse( "Refresh-Token wurde gespeichert. Dieses Fenster kann geschlossen werden.
" "" ) return HTMLResponse("