belege-import/app/mail_processor.py

347 lines
12 KiB
Python

import imaplib
import smtplib
import ssl
import email
from email import policy
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
import logging
from app.database import get_settings, add_log_entry
logger = logging.getLogger(__name__)
def _connect_imap(settings: dict) -> imaplib.IMAP4_SSL | imaplib.IMAP4:
server = settings["imap_server"]
port = int(settings.get("imap_port", 993))
use_ssl = settings.get("imap_ssl", "true") == "true"
if use_ssl:
ctx = ssl.create_default_context()
conn = imaplib.IMAP4_SSL(server, port, ssl_context=ctx)
else:
conn = imaplib.IMAP4(server, port)
conn.login(settings["imap_username"], settings["imap_password"])
return conn
def _connect_smtp(settings: dict) -> smtplib.SMTP | smtplib.SMTP_SSL:
server = settings["smtp_server"]
port = int(settings.get("smtp_port", 587))
mode = settings.get("smtp_ssl", "starttls")
if mode == "ssl":
ctx = ssl.create_default_context()
conn = smtplib.SMTP_SSL(server, port, context=ctx)
else:
conn = smtplib.SMTP(server, port)
if mode == "starttls":
ctx = ssl.create_default_context()
conn.starttls(context=ctx)
conn.login(settings["smtp_username"], settings["smtp_password"])
return conn
def _extract_attachments(msg: email.message.Message) -> list[tuple[str, bytes]]:
attachments = []
for part in msg.walk():
content_disposition = part.get("Content-Disposition", "")
if "attachment" not in content_disposition and "inline" not in content_disposition:
continue
filename = part.get_filename()
if not filename:
continue
# Decode filename if encoded
decoded_parts = email.header.decode_header(filename)
filename = ""
for data, charset in decoded_parts:
if isinstance(data, bytes):
filename += data.decode(charset or "utf-8", errors="replace")
else:
filename += data
if not filename.lower().endswith(".pdf"):
continue
payload = part.get_payload(decode=True)
if payload:
attachments.append((filename, payload))
return attachments
def _build_forward_email(
from_addr: str,
to_addr: str,
original_subject: str,
original_from: str,
attachments: list[tuple[str, bytes]],
) -> MIMEMultipart:
msg = MIMEMultipart()
msg["From"] = from_addr
msg["To"] = to_addr
msg["Subject"] = f"Belegimport: {original_subject}"
body = (
f"Automatisch weitergeleitet von LexOffice Belegimport.\n"
f"Original-Absender: {original_from}\n"
f"Original-Betreff: {original_subject}\n"
f"Anzahl Anhänge: {len(attachments)}"
)
msg.attach(MIMEText(body, "plain", "utf-8"))
for filename, data in attachments:
part = MIMEBase("application", "octet-stream")
part.set_payload(data)
encoders.encode_base64(part)
part.add_header("Content-Disposition", "attachment", filename=filename)
msg.attach(part)
return msg
def _ensure_folder_exists(conn: imaplib.IMAP4, folder: str):
status, _ = conn.select(f'"{folder}"')
if status != "OK":
conn.create(f'"{folder}"')
conn.subscribe(f'"{folder}"')
# Go back to INBOX to not stay in the folder
conn.select("INBOX")
def _move_email(conn: imaplib.IMAP4, msg_uid: bytes, dest_folder: str):
result = conn.uid("COPY", msg_uid, f'"{dest_folder}"')
if result[0] == "OK":
conn.uid("STORE", msg_uid, "+FLAGS", "(\\Deleted)")
conn.expunge()
def _imap_since_criteria(fetch_since: str) -> str:
    """Return the IMAP ``SINCE`` search key for an ISO ``YYYY-MM-DD`` date.

    Raises:
        ValueError: ``fetch_since`` is not a valid ``YYYY-MM-DD`` date.
    """
    from datetime import datetime

    # IMAP dates need English month abbreviations. strftime("%b") follows
    # the process locale, so the abbreviation is looked up explicitly.
    months = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )
    dt = datetime.strptime(fetch_since, "%Y-%m-%d")
    return f"(SINCE {dt.day:02d}-{months[dt.month - 1]}-{dt.year:04d})"


async def process_mailbox() -> dict:
    """Forward PDF attachments from the source folder and archive the mails.

    For every message found in ``source_folder`` (optionally restricted by
    ``fetch_since_date``) the PDF attachments are extracted, sent to
    ``lexoffice_email`` over SMTP, and the message is moved to
    ``processed_folder``. Messages without PDF attachments are counted as
    skipped and left in place. Each outcome is recorded via
    ``add_log_entry``.

    Returns:
        A dict with the counters ``processed``, ``skipped`` and ``errors``.
        An additional ``error`` key is present when the service is not
        configured or when a connection-level failure aborted the run.

    NOTE(review): the imaplib/smtplib calls below are blocking and run
    directly inside this coroutine — confirm that is acceptable for the
    event loop this is scheduled on.
    """
    settings = await get_settings()
    if not settings.get("imap_server") or not settings.get("lexoffice_email"):
        logger.warning("IMAP oder LexOffice-Email nicht konfiguriert")
        return {"processed": 0, "skipped": 0, "errors": 0, "error": "Nicht konfiguriert"}

    source_folder = settings.get("source_folder", "INBOX")
    processed_folder = settings.get("processed_folder", "INBOX/Verarbeitet")
    lexoffice_email = settings["lexoffice_email"]
    smtp_from = settings.get("smtp_username", "")

    processed = 0
    skipped = 0
    errors = 0
    imap_conn = None
    smtp_conn = None
    try:
        imap_conn = _connect_imap(settings)
        smtp_conn = _connect_smtp(settings)
        _ensure_folder_exists(imap_conn, processed_folder)
        status, _ = imap_conn.select(f'"{source_folder}"')
        if status != "OK":
            raise Exception(f"Ordner '{source_folder}' konnte nicht geöffnet werden")

        # Build IMAP search criteria
        search_criteria = "ALL"
        fetch_since = settings.get("fetch_since_date", "")
        if fetch_since:
            try:
                search_criteria = _imap_since_criteria(fetch_since)
            except ValueError:
                logger.warning(f"Ungültiges Datum: {fetch_since}, verwende ALL")

        status, data = imap_conn.uid("SEARCH", None, search_criteria)
        if status != "OK" or not data[0]:
            logger.info("Keine Emails im Ordner gefunden")
            return {"processed": 0, "skipped": 0, "errors": 0}

        msg_uids = data[0].split()
        logger.info(f"{len(msg_uids)} Email(s) im Ordner '{source_folder}' gefunden")

        for msg_uid in msg_uids:
            # Reset per message so a failure before the headers are parsed
            # is not logged with the previous message's subject/sender.
            subject = "?"
            from_addr = "?"
            try:
                status, msg_data = imap_conn.uid("FETCH", msg_uid, "(RFC822)")
                if status != "OK":
                    # Not counted as skipped or error (unchanged behavior).
                    continue
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email, policy=policy.default)
                subject = str(msg.get("Subject", "(Kein Betreff)"))
                from_addr = str(msg.get("From", "(Unbekannt)"))

                attachments = _extract_attachments(msg)
                if not attachments:
                    skipped += 1
                    logger.debug(f"Übersprungen (keine Anhänge): {subject}")
                    continue

                forward_msg = _build_forward_email(
                    from_addr=smtp_from,
                    to_addr=lexoffice_email,
                    original_subject=subject,
                    original_from=from_addr,
                    attachments=attachments,
                )
                smtp_conn.send_message(forward_msg)

                # Re-select source folder before move (in case _ensure_folder changed it)
                imap_conn.select(f'"{source_folder}"')
                _move_email(imap_conn, msg_uid, processed_folder)
                # Re-select after expunge to keep UIDs valid
                imap_conn.select(f'"{source_folder}"')

                processed += 1
                logger.info(
                    f"Verarbeitet: {subject} ({len(attachments)} Anhänge)"
                )
                await add_log_entry(
                    email_subject=subject,
                    email_from=from_addr,
                    attachments_count=len(attachments),
                    status="success",
                )
            except Exception as e:
                errors += 1
                logger.error(f"Fehler bei Email UID {msg_uid}: {e}")
                try:
                    await add_log_entry(
                        email_subject=subject,
                        email_from=from_addr,
                        attachments_count=0,
                        status="error",
                        error_message=str(e),
                    )
                except Exception:
                    # Logging to the database is best-effort, but the
                    # failure should at least be visible in the app log.
                    logger.exception("Log-Eintrag konnte nicht geschrieben werden")
    except Exception as e:
        logger.error(f"Verbindungsfehler: {e}")
        try:
            await add_log_entry(
                email_subject="",
                email_from="",
                attachments_count=0,
                status="error",
                error_message=f"Verbindungsfehler: {e}",
            )
        except Exception:
            # Same best-effort rule as for per-message log entries.
            logger.exception("Log-Eintrag konnte nicht geschrieben werden")
        return {"processed": processed, "skipped": skipped, "errors": errors + 1, "error": str(e)}
    finally:
        # Closing is best-effort: the server may already have dropped us.
        if imap_conn:
            try:
                imap_conn.logout()
            except Exception:
                pass
        if smtp_conn:
            try:
                smtp_conn.quit()
            except Exception:
                pass

    logger.info(
        f"Fertig: {processed} verarbeitet, {skipped} übersprungen, {errors} Fehler"
    )
    return {"processed": processed, "skipped": skipped, "errors": errors}
async def send_test_email() -> dict:
    """Send a fixed test message to the configured LexOffice address.

    Returns:
        ``{"success": True}`` when the message was handed to the SMTP
        server, otherwise ``{"success": False, "error": <text>}``.
    """
    settings = await get_settings()
    if not settings.get("smtp_server") or not settings.get("lexoffice_email"):
        return {"success": False, "error": "SMTP oder LexOffice-Email nicht konfiguriert"}
    try:
        msg = MIMEMultipart()
        msg["From"] = settings["smtp_username"]
        msg["To"] = settings["lexoffice_email"]
        msg["Subject"] = "LexOffice Belegimport - Test-Email"
        msg.attach(MIMEText(
            "Dies ist eine Test-Email vom LexOffice Belegimport Service.\n"
            "Wenn Sie diese Email erhalten, funktioniert die SMTP-Verbindung.",
            "plain",
            "utf-8",
        ))
        # The context manager sends QUIT and closes the socket even when
        # send_message raises, so a failed send no longer leaks it.
        with _connect_smtp(settings) as smtp_conn:
            smtp_conn.send_message(msg)
        return {"success": True}
    except Exception as e:
        logger.error(f"Test-Email fehlgeschlagen: {e}")
        return {"success": False, "error": str(e)}
async def create_imap_folder(folder_name: str) -> dict:
    """Create ``folder_name`` on the configured IMAP server and subscribe to it.

    Surrounding whitespace is stripped from the name. Names that are empty
    or that contain a double quote, a backslash or a control character are
    rejected before connecting, because the name is sent as an IMAP quoted
    string without escaping.

    Returns:
        ``{"success": True}`` when CREATE returned ``"OK"``, otherwise
        ``{"success": False, "error": <text>}``.
    """
    settings = await get_settings()
    if not settings.get("imap_server"):
        return {"success": False, "error": "IMAP nicht konfiguriert"}
    if not folder_name or not folder_name.strip():
        return {"success": False, "error": "Ordnername darf nicht leer sein"}
    folder_name = folder_name.strip()
    if any(ch in '"\\' or ord(ch) < 32 or ord(ch) == 127 for ch in folder_name):
        # Such characters would terminate the quoted string or the command
        # line and let the rest of the name be read as IMAP syntax.
        return {"success": False, "error": "Ordnername enthält ungültige Zeichen"}
    try:
        conn = _connect_imap(settings)
        try:
            status, response = conn.create(f'"{folder_name}"')
            if status == "OK":
                conn.subscribe(f'"{folder_name}"')
        finally:
            # Always release the connection, also when CREATE/SUBSCRIBE
            # raised; a failing logout must not hide the real outcome.
            try:
                conn.logout()
            except Exception:
                logger.debug("IMAP-Logout fehlgeschlagen", exc_info=True)
        if status == "OK":
            return {"success": True}
        msg = response[0].decode() if response and isinstance(response[0], bytes) else str(response)
        return {"success": False, "error": msg}
    except Exception as e:
        logger.error(f"Ordner erstellen fehlgeschlagen: {e}")
        return {"success": False, "error": str(e)}
async def test_imap_connection() -> dict:
    """Log in to the configured IMAP server and list its folders.

    Returns:
        On success ``{"success": True, "folders": <sorted names>,
        "delimiter": <hierarchy delimiter>}``. On failure
        ``{"success": False, "error": <text>, "folders": [],
        "delimiter": "."}``.
    """
    settings = await get_settings()
    if not settings.get("imap_server"):
        return {
            "success": False,
            "error": "IMAP nicht konfiguriert",
            "folders": [],
            "delimiter": ".",
        }
    try:
        conn = _connect_imap(settings)
        try:
            status, folder_data = conn.list()
        finally:
            # Always release the connection, also when LIST raised.
            try:
                conn.logout()
            except Exception:
                logger.debug("IMAP-Logout fehlgeschlagen", exc_info=True)

        folders = []
        delimiter = "."
        if status == "OK":
            for item in folder_data:
                literal_name = None
                if isinstance(item, tuple):
                    # imaplib returns (header, literal) when the server
                    # sends the folder name as an IMAP literal.
                    item, literal_name = item[0], item[1]
                if isinstance(item, bytes):
                    decoded = item.decode()
                elif isinstance(item, str):
                    decoded = item
                else:
                    # e.g. None when the server returned no LIST lines
                    continue

                # Parse IMAP LIST response: (\\flags) "delimiter" "name"
                parts = decoded.split('"')
                found_delimiter = None
                name = None
                if len(parts) >= 4:
                    # parts[1] is the delimiter, parts[-2] is the folder name
                    found_delimiter, name = parts[1], parts[-2]
                elif len(parts) == 3 and parts[2].strip():
                    # (\\flags) "delimiter" name  -- name sent unquoted
                    found_delimiter, name = parts[1], parts[2].strip()
                elif len(parts) == 3:
                    # (\\flags) NIL "name"  -- no hierarchy delimiter
                    name = parts[1]
                elif len(parts) == 2:
                    name = parts[-1].strip()

                if literal_name is not None:
                    name = literal_name.decode() if isinstance(literal_name, bytes) else literal_name
                if found_delimiter is not None and (not delimiter or delimiter == "."):
                    delimiter = found_delimiter
                if name:
                    folders.append(name)
        return {"success": True, "folders": sorted(folders), "delimiter": delimiter}
    except Exception as e:
        logger.error(f"IMAP-Test fehlgeschlagen: {e}")
        return {"success": False, "error": str(e), "folders": [], "delimiter": "."}