# belege-import/app/main.py
# (source-viewer metadata: 328 lines, 10 KiB, Python)

import asyncio
import json
import logging
import os
import uuid
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, Request, Form, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sse_starlette.sse import EventSourceResponse

from app.database import init_db, get_settings, save_settings, get_log_entries
from app.mail_processor import process_mailbox, send_test_email, test_imap_connection, create_imap_folder
from app.scheduler import start_scheduler, configure_job, get_scheduler_status
from app.scanner import process_scanned_pdf, generate_separator_pdf, UPLOAD_DIR
from app.smb_processor import process_smb_share, test_smb_connection, create_smb_folder, list_smb_folders
# Application-wide log format: timestamp, level, logger name, message.
# basicConfig is a no-op if the root logger already has handlers.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: set up the database and scheduler on startup.

    Initialises the database, starts the scheduler and configures the
    periodic job from the persisted ``interval_minutes`` and
    ``scheduler_enabled`` settings before the app starts serving requests.
    """
    await init_db()
    start_scheduler()
    settings = await get_settings()
    raw_interval = settings.get("interval_minutes", 5)
    try:
        interval = int(raw_interval)
    except (TypeError, ValueError):
        # A corrupt stored value must not prevent the application from
        # starting; use the same default as when the setting is missing.
        logger.warning(
            "Ungültiges gespeichertes Intervall %r, verwende 5 Minuten", raw_interval
        )
        interval = 5
    enabled = settings.get("scheduler_enabled", "false") == "true"
    configure_job(interval, enabled)
    logger.info("Belegimport gestartet")
    yield
    # NOTE(review): the scheduler started above is not explicitly stopped on
    # shutdown; confirm whether app.scheduler needs a shutdown call here.
    logger.info("Belegimport beendet")
# Resolve static files and templates relative to this module instead of the
# process's current working directory, so the app also starts when launched
# from a directory other than the project root.
_APP_DIR = Path(__file__).resolve().parent

app = FastAPI(title="Belegimport", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=str(_APP_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(_APP_DIR / "templates"))
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the settings page.

    The page shows the current settings, the 20 most recent log entries and
    the scheduler status, with no flash message.
    """
    context = {
        "request": request,
        "settings": await get_settings(),
        "logs": await get_log_entries(limit=20),
        "status": get_scheduler_status(),
        "message": None,
        "message_type": None,
    }
    return templates.TemplateResponse("settings.html", context)
async def _save_form_settings(request: Request) -> dict:
    """Read the settings form from *request*, persist it and reconfigure the job.

    Fields missing from the form fall back to the defaults below. Password
    fields submitted empty keep their currently stored value, so the UI does
    not have to echo secrets back.

    ``interval_minutes`` is validated *before* saving. A value that is not an
    integer is replaced by the currently stored interval (or 5), because it
    would otherwise be persisted and then also break the next application
    start, which parses the stored value with ``int()``.

    Returns:
        The dict of settings that was saved.
    """
    form = await request.form()
    current = await get_settings()
    data = {
        "imap_server": form.get("imap_server", ""),
        "imap_port": form.get("imap_port", "993"),
        "imap_ssl": form.get("imap_ssl", "true"),
        "imap_username": form.get("imap_username", ""),
        "imap_password": form.get("imap_password") or current.get("imap_password", ""),
        "smtp_server": form.get("smtp_server", ""),
        "smtp_port": form.get("smtp_port", "587"),
        "smtp_ssl": form.get("smtp_ssl", "starttls"),
        "smtp_username": form.get("smtp_username", ""),
        "smtp_password": form.get("smtp_password") or current.get("smtp_password", ""),
        "import_email": form.get("import_email", ""),
        "source_folder": form.get("source_folder", "Rechnungen"),
        "processed_folder": form.get("processed_folder", "Rechnungen/Verarbeitet"),
        "interval_minutes": form.get("interval_minutes", "5"),
        "scheduler_enabled": form.get("scheduler_enabled", "false"),
        "fetch_since_date": form.get("fetch_since_date", ""),
        # SMB
        "smb_enabled": form.get("smb_enabled", "false"),
        "smb_server": form.get("smb_server", ""),
        "smb_port": form.get("smb_port", "445"),
        "smb_username": form.get("smb_username", ""),
        "smb_password": form.get("smb_password") or current.get("smb_password", ""),
        "smb_domain": form.get("smb_domain", ""),
        "smb_share": form.get("smb_share", ""),
        "smb_source_path": form.get("smb_source_path", ""),
        "smb_processed_path": form.get("smb_processed_path", "Verarbeitet"),
        "smb_mode": form.get("smb_mode", "forward"),
    }

    try:
        interval = int(data["interval_minutes"])
    except (TypeError, ValueError):
        # Keep the previously stored interval; the stored value itself may be
        # unparsable too, in which case fall back to the 5-minute default.
        try:
            interval = int(current.get("interval_minutes", 5))
        except (TypeError, ValueError):
            interval = 5
        logger.warning(
            "Ungültiges Intervall %r, verwende %d Minuten",
            data["interval_minutes"],
            interval,
        )
        data["interval_minutes"] = str(interval)

    await save_settings(data)
    enabled = data["scheduler_enabled"] == "true"
    configure_job(interval, enabled)
    return data
@app.post("/settings", response_class=HTMLResponse)
async def save(request: Request):
    """Persist the submitted settings form and re-render the settings page.

    The re-rendered page shows a success message.
    """
    await _save_form_settings(request)
    page_context = dict(
        request=request,
        settings=await get_settings(),
        logs=await get_log_entries(limit=20),
        status=get_scheduler_status(),
        message="Einstellungen gespeichert",
        message_type="success",
    )
    return templates.TemplateResponse("settings.html", page_context)
@app.post("/api/save-settings")
async def api_save_settings(request: Request):
    """Persist the submitted settings form (AJAX variant).

    Responds with ``{"success": true}`` once the settings are saved.
    """
    await _save_form_settings(request)
    payload = {"success": True}
    return JSONResponse(payload)
@app.post("/api/test-imap")
async def api_test_imap(request: Request):
    """Persist the submitted settings, then test the IMAP connection.

    test_imap_connection() takes no arguments, so it presumably works from
    the settings just saved. Its result dict is returned verbatim as JSON.
    """
    await _save_form_settings(request)
    return JSONResponse(await test_imap_connection())
@app.post("/api/test-email")
async def api_test_email(request: Request):
    """Persist the submitted settings, then send a test e-mail.

    send_test_email() takes no arguments, so it presumably uses the settings
    just saved. Its result dict is returned as JSON.
    """
    await _save_form_settings(request)
    outcome = await send_test_email()
    return JSONResponse(content=outcome)
@app.post("/api/process")
async def api_process(request: Request):
    """Persist the submitted settings, then run one mailbox processing pass.

    The result dict of process_mailbox() is returned as JSON.
    """
    await _save_form_settings(request)
    run_result = await process_mailbox()
    return JSONResponse(content=run_result)
@app.post("/api/create-folder")
async def api_create_folder(request: Request):
    """Create an IMAP folder and, on success, attach the refreshed folder list.

    Expects a JSON object body ``{"folder_name": "..."}``. A body that is not
    valid JSON or not an object, or a blank folder name, is rejected with
    HTTP 400 instead of raising (which would surface as a 500).

    Otherwise the result dict of create_imap_folder() is returned as JSON.
    When that result reports success, ``folders`` is set to the list returned
    by test_imap_connection().
    """
    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError are ValueErrors
        body = None
    folder_name = body.get("folder_name", "") if isinstance(body, dict) else ""
    if not isinstance(folder_name, str) or not folder_name.strip():
        return JSONResponse(
            {"success": False, "error": "Ungültiger Ordnername"}, status_code=400
        )
    result = await create_imap_folder(folder_name)
    if result.get("success"):
        # Return updated folder list
        folders_result = await test_imap_connection()
        result["folders"] = folders_result.get("folders", [])
    return JSONResponse(result)
@app.post("/api/test-smb")
async def api_test_smb(request: Request):
    """Persist the submitted settings, then test the SMB connection.

    test_smb_connection() takes no arguments, so it presumably uses the
    settings just saved. Its result dict is returned as JSON.
    """
    await _save_form_settings(request)
    return JSONResponse(content=await test_smb_connection())
@app.post("/api/process-smb")
async def api_process_smb(request: Request):
    """Persist the submitted settings, then process the SMB share once.

    The result dict of process_smb_share() is returned as JSON.
    """
    await _save_form_settings(request)
    share_result = await process_smb_share()
    return JSONResponse(share_result)
@app.post("/api/create-smb-folder")
async def api_create_smb_folder(request: Request):
    """Create a folder on the SMB share and, on success, attach the folder list.

    Expects a JSON object body ``{"folder_name": "..."}``. A body that is not
    valid JSON or not an object, or a blank folder name, is rejected with
    HTTP 400 instead of raising (which would surface as a 500).

    Otherwise the result dict of create_smb_folder() is returned as JSON.
    When that result reports success, ``folders`` is set to the list returned
    by list_smb_folders().
    """
    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError are ValueErrors
        body = None
    folder_name = body.get("folder_name", "") if isinstance(body, dict) else ""
    if not isinstance(folder_name, str) or not folder_name.strip():
        return JSONResponse(
            {"success": False, "error": "Ungültiger Ordnername"}, status_code=400
        )
    result = await create_smb_folder(folder_name)
    if result.get("success"):
        folders_result = await list_smb_folders()
        result["folders"] = folders_result.get("folders", [])
    return JSONResponse(result)
@app.get("/log", response_class=HTMLResponse)
async def log_page(request: Request):
    """Render the log page with up to 500 log entries."""
    entries = await get_log_entries(limit=500)
    return templates.TemplateResponse(
        "log.html",
        {"request": request, "logs": entries},
    )
@app.get("/api/status")
async def api_status():
    """Return the scheduler status; FastAPI serialises it to JSON."""
    current_status = get_scheduler_status()
    return current_status
# --- Scan Upload ---
# In-memory progress store for the scan SSE stream, keyed by upload_id.
# Each value is the list of progress entries recorded so far for that upload.
_scan_progress: dict[str, list[dict]] = dict()
@app.get("/scan", response_class=HTMLResponse)
async def scan_page(request: Request):
    """Render the scan upload page."""
    context = {"request": request}
    return templates.TemplateResponse("scan.html", context)
@app.post("/api/scan-upload-chunk")
async def scan_upload_chunk(
    request: Request,
    file: UploadFile = Form(...),
    chunk_index: int = Form(...),
    total_chunks: int = Form(...),
    upload_id: str = Form(...),
    filename: str = Form(...),
):
    """Receive one chunk of a scanned-PDF upload and append it to disk.

    Chunks go to ``UPLOAD_DIR/<upload_id>.part``. Chunk 0 (re)creates the
    file and later chunks append, so chunks must arrive in order. After the
    last chunk (``chunk_index == total_chunks - 1``) the part file is moved
    to ``<upload_id>.pdf``. ``filename`` is accepted but not used.

    Returns JSON:
      - ``{"status": "ok", "chunk": i}`` for an intermediate chunk;
      - ``{"status": "complete", "upload_id": ...}`` after the last chunk;
      - a 400/404 error for an invalid ID, an invalid index, or a missing
        part file.
    """
    # Validate upload_id is UUID-like to prevent path traversal
    try:
        uuid.UUID(upload_id)
    except ValueError:
        return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400)
    # A negative index would truncate an in-progress upload ("wb"), and an
    # index past the end would finalise an incomplete file.
    if total_chunks < 1 or not 0 <= chunk_index < total_chunks:
        return JSONResponse({"error": "Ungültiger Chunk-Index"}, status_code=400)

    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    part_path = UPLOAD_DIR / f"{upload_id}.part"
    # Appending to a missing part file (chunk 0 never arrived, or the upload
    # was already finalised) would silently produce a truncated PDF.
    if chunk_index > 0 and not part_path.exists():
        return JSONResponse({"error": "Upload nicht gefunden"}, status_code=404)

    content = await file.read()
    mode = "ab" if chunk_index > 0 else "wb"
    with open(part_path, mode) as f:
        f.write(content)

    if chunk_index == total_chunks - 1:
        pdf_path = UPLOAD_DIR / f"{upload_id}.pdf"
        # replace() overwrites an existing target on all platforms; rename()
        # raises on Windows when pdf_path already exists.
        part_path.replace(pdf_path)
        return JSONResponse({"status": "complete", "upload_id": upload_id})
    return JSONResponse({"status": "ok", "chunk": chunk_index})
# Strong references to running scan tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_scan_tasks: set[asyncio.Task] = set()


@app.post("/api/scan-process")
async def scan_process(request: Request):
    """Start background processing of a fully uploaded scan.

    Expects a JSON object body ``{"upload_id": "<uuid>"}`` naming an upload
    whose ``<upload_id>.pdf`` exists in ``UPLOAD_DIR``. Progress entries are
    appended to ``_scan_progress[upload_id]`` for the SSE endpoint. The run
    ends with a ``"done"`` entry carrying the result, or an ``"error"`` entry
    with the message. The uploaded PDF is deleted afterwards either way.

    Returns ``{"status": "processing", "upload_id": ...}`` immediately.
    Returns 400 for a missing or invalid ID and 404 when the PDF is not found.
    """
    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError are ValueErrors
        body = None
    upload_id = body.get("upload_id", "") if isinstance(body, dict) else ""
    try:
        # uuid.UUID() raises AttributeError/TypeError for non-strings, so
        # reject those explicitly to keep a single 400 path.
        if not isinstance(upload_id, str):
            raise ValueError("upload_id must be a string")
        uuid.UUID(upload_id)
    except ValueError:
        return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400)
    pdf_path = UPLOAD_DIR / f"{upload_id}.pdf"
    if not pdf_path.exists():
        return JSONResponse({"error": "Upload nicht gefunden"}, status_code=404)

    # Initialize progress
    _scan_progress[upload_id] = []

    def progress_callback(stage, current, total, message=None):
        """Record one progress step for the SSE endpoint to stream."""
        entry = {"stage": stage, "current": current, "total": total}
        if message:
            entry["message"] = message
        _scan_progress.setdefault(upload_id, []).append(entry)

    async def _process():
        """Process the PDF and record a terminal "done" or "error" entry."""
        try:
            result = await process_scanned_pdf(str(pdf_path), progress_callback)
            _scan_progress.setdefault(upload_id, []).append({
                "stage": "done", "result": result
            })
        except Exception as e:
            # Log with traceback; the client only receives str(e).
            logger.exception("Scan-Verarbeitung fehlgeschlagen: %s", e)
            _scan_progress.setdefault(upload_id, []).append({
                "stage": "error", "message": str(e)
            })
        finally:
            # Cleanup uploaded file (best effort, but not silently).
            try:
                pdf_path.unlink(missing_ok=True)
            except OSError as cleanup_error:
                logger.warning(
                    "Upload-Datei %s konnte nicht gelöscht werden: %s",
                    pdf_path,
                    cleanup_error,
                )

    task = asyncio.create_task(_process())
    _scan_tasks.add(task)
    task.add_done_callback(_scan_tasks.discard)
    return JSONResponse({"status": "processing", "upload_id": upload_id})
@app.get("/api/scan-status/{upload_id}")
async def scan_status_sse(upload_id: str):
    """Stream the progress of a scan as Server-Sent Events.

    Polls ``_scan_progress[upload_id]`` every 0.3 s. Each new entry is
    emitted as an event named after its ``stage`` (``"status"`` if absent),
    with the JSON-encoded entry as data. After a ``"done"`` or ``"error"``
    entry the stream ends and the progress data for this upload is removed.

    Returns 400 for an invalid upload ID.
    """
    try:
        uuid.UUID(upload_id)
    except ValueError:
        return JSONResponse({"error": "Ungültige Upload-ID"}, status_code=400)

    async def event_generator():
        seen = 0  # number of entries already sent to this client
        while True:
            entries = _scan_progress.get(upload_id, [])
            while seen < len(entries):
                entry = entries[seen]
                seen += 1
                yield {"event": entry.get("stage", "status"), "data": json.dumps(entry)}
                if entry.get("stage") in ("done", "error"):
                    # Terminal entry: drop the progress data and end the stream.
                    _scan_progress.pop(upload_id, None)
                    return
            # NOTE(review): if no progress ever appears for this upload_id,
            # this keeps polling until the client disconnects.
            await asyncio.sleep(0.3)

    return EventSourceResponse(event_generator())
@app.get("/api/separator-pdf")
async def separator_pdf():
    """Serve the generated separator sheet as a PDF download (Trennseite.pdf)."""
    document = generate_separator_pdf()
    download_headers = {"Content-Disposition": "attachment; filename=Trennseite.pdf"}
    return Response(
        content=document,
        media_type="application/pdf",
        headers=download_headers,
    )