"""Background polling scheduler.

Registers one APScheduler interval job per enabled mail account. Each job
polls the account's IMAP folders, applies the configured filter rules to
mails that have not been processed before, logs every step via
``write_log``, and records processed mails in ``ProcessedMail`` so they are
skipped on the next poll. A daily job prunes old log entries.
"""

import asyncio
import logging
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session

from app.database import SessionLocal
from app.models.db_models import Account, FilterRule, LogLevel, ProcessedMail
from app.services.encryption import decrypt
from app.services.filter_engine import apply_rules
from app.services.imap_client import IMAPClient
from app.services.log_service import cleanup_old_logs, write_log

logger = logging.getLogger(__name__)

# Module-wide scheduler instance; jobs are added/removed by the functions below.
scheduler = AsyncIOScheduler()


def _build_smtp_config(account: Account) -> dict | None:
    """Build an SMTP configuration dict for *account*, or ``None``.

    Returns ``None`` when the account has no complete SMTP credentials
    (host, username and password must all be set). The stored password is
    decrypted; the port falls back to 465 (implicit SSL) when unset.
    """
    if account.smtp_host and account.smtp_username and account.smtp_password:
        return {
            "host": account.smtp_host,
            "port": account.smtp_port or 465,
            "username": account.smtp_username,
            "password": decrypt(account.smtp_password),
        }
    return None


def _process_mail(
    db: Session,
    client: IMAPClient,
    account: Account,
    folder: str,
    mail,
    folder_rules: list[FilterRule],
    smtp_config: dict | None,
) -> tuple[int, int, int]:
    """Apply *folder_rules* to a single fetched *mail* and log the outcome.

    The mail is always recorded as processed afterwards (flushed, not
    committed — the caller owns the transaction).

    Returns:
        ``(matched, actions_ok, actions_failed)`` where *matched* is 1 if
        at least one rule produced a result, else 0.
    """
    results, eval_details = apply_rules(client, mail, folder_rules, smtp_config)

    # Human-readable per-rule evaluation trace for the log entry.
    eval_summary = []
    for ev in eval_details:
        status = "TREFFER" if ev["matched"] else "kein Treffer"
        checks = " | ".join(ev["details"])
        eval_summary.append(f"Regel '{ev['rule']}': {status} [{checks}]")

    matched = 0
    actions_ok = 0
    actions_failed = 0

    if not results:
        write_log(
            message="Keine Regel trifft zu",
            level=LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            mail_uid=mail.uid,
            mail_subject=mail.subject,
            mail_from=mail.from_addr,
            folder=folder,
            details="\n".join(eval_summary),
            db=db,
        )
    else:
        matched = 1
        for r in results:
            action_label = r["action"]
            param = r.get("parameter", "")
            if param:
                action_label += f" → {param}"
            if r["success"]:
                actions_ok += 1
                write_log(
                    message=f"Aktion ausgeführt: {action_label}",
                    level=LogLevel.SUCCESS,
                    account_id=account.id,
                    account_name=account.name,
                    rule_name=r["rule"],
                    action_type=r["action"],
                    mail_uid=r["mail_uid"],
                    mail_subject=mail.subject,
                    mail_from=mail.from_addr,
                    folder=folder,
                    details=param,
                    db=db,
                )
            else:
                actions_failed += 1
                write_log(
                    message=f"Aktion fehlgeschlagen: {action_label}",
                    level=LogLevel.ERROR,
                    account_id=account.id,
                    account_name=account.name,
                    rule_name=r["rule"],
                    action_type=r["action"],
                    mail_uid=r["mail_uid"],
                    mail_subject=mail.subject,
                    mail_from=mail.from_addr,
                    folder=folder,
                    details=param,
                    db=db,
                )

    # Mark the mail as processed so it is never evaluated again,
    # regardless of whether any rule matched. Columns are length-limited.
    db.add(ProcessedMail(
        account_id=account.id,
        folder=folder,
        mail_uid=mail.uid,
        mail_subject=mail.subject[:500] if mail.subject else None,
        mail_from=mail.from_addr[:255] if mail.from_addr else None,
    ))
    db.flush()

    return matched, actions_ok, actions_failed


def _process_folder(
    db: Session,
    client: IMAPClient,
    account: Account,
    folder: str,
    folder_rules: list[FilterRule],
    smtp_config: dict | None,
) -> tuple[int, int, int, int, int]:
    """Poll one IMAP *folder*: fetch unseen-by-us mails and apply rules.

    Returns:
        ``(total_in_folder, new, matched, actions_ok, actions_failed)``.
        All zeros when the folder's UID listing fails (the error is logged).
    """
    try:
        all_uids = client.get_all_uids(folder, search="ALL")
    except Exception as e:
        write_log(
            message=f"Fehler beim Abrufen von Ordner '{folder}'",
            level=LogLevel.ERROR,
            account_id=account.id,
            account_name=account.name,
            folder=folder,
            details=str(e),
            db=db,
        )
        return 0, 0, 0, 0, 0

    # UIDs already handled in earlier polls — skip them below.
    processed_uids = {
        uid
        for (uid,) in db.query(ProcessedMail.mail_uid)
        .filter(
            ProcessedMail.account_id == account.id,
            ProcessedMail.folder == folder,
        )
        .all()
    }
    new_uids = [uid for uid in all_uids if uid not in processed_uids]

    if not new_uids:
        write_log(
            message=f"Keine neuen Mails in '{folder}' ({len(all_uids)} gesamt, alle bereits verarbeitet)",
            level=LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            folder=folder,
            db=db,
        )
        return len(all_uids), 0, 0, 0, 0

    write_log(
        message=f"{len(new_uids)} neue Mail(s) in '{folder}' ({len(all_uids)} gesamt, {len(processed_uids)} bereits verarbeitet)",
        level=LogLevel.INFO,
        account_id=account.id,
        account_name=account.name,
        folder=folder,
        db=db,
    )

    matched = 0
    actions_ok = 0
    actions_failed = 0
    for uid in new_uids:
        try:
            mail = client.fetch_mail(uid)
        except Exception as e:
            # Fetch failures are logged but do not count as action errors.
            write_log(
                message=f"Fehler beim Abrufen von Mail {uid}",
                level=LogLevel.ERROR,
                account_id=account.id,
                account_name=account.name,
                mail_uid=uid,
                folder=folder,
                details=str(e),
                db=db,
            )
            continue
        if not mail:
            continue
        m, ok, failed = _process_mail(
            db, client, account, folder, mail, folder_rules, smtp_config
        )
        matched += m
        actions_ok += ok
        actions_failed += failed

    return len(all_uids), len(new_uids), matched, actions_ok, actions_failed


def _poll_account_sync(account_id: int) -> None:
    """Poll a single account synchronously (runs in a worker thread).

    Loads the account and its enabled rules, walks every distinct source
    folder, applies the rules to new mails and writes a summary log entry.
    All database work happens in one session/transaction; on any unexpected
    error the transaction is rolled back and the failure is logged.
    """
    db: Session = SessionLocal()
    try:
        account = db.get(Account, account_id)
        if not account or not account.enabled:
            return

        rules = (
            db.query(FilterRule)
            .filter(FilterRule.account_id == account_id, FilterRule.enabled.is_(True))
            .order_by(FilterRule.priority)
            .all()
        )

        write_log(
            message=f"Poll gestartet ({len(rules)} aktive Regel(n))",
            level=LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            details=", ".join(r.name for r in rules) if rules else "Keine Regeln konfiguriert",
            db=db,
        )

        if not rules:
            # Nothing to do, but still record that the poll happened.
            # NOTE(review): naive UTC timestamp kept for schema
            # compatibility — confirm before switching to aware datetimes.
            account.last_poll_at = datetime.utcnow()
            db.commit()
            return

        source_folders = list({r.source_folder for r in rules})
        smtp_config = _build_smtp_config(account)

        client = IMAPClient(
            host=account.imap_host,
            port=account.imap_port,
            username=account.username,
            password=decrypt(account.password),
            use_ssl=account.use_ssl,
        )

        total_mails = 0
        total_new = 0
        total_matched = 0
        total_actions = 0
        total_errors = 0

        with client:
            for folder in source_folders:
                folder_rules = [r for r in rules if r.source_folder == folder]
                mails, new, matched, ok, failed = _process_folder(
                    db, client, account, folder, folder_rules, smtp_config
                )
                total_mails += mails
                total_new += new
                total_matched += matched
                total_actions += ok
                total_errors += failed

        # Poll summary: level escalates with errors, celebrates actions.
        summary_parts = [
            f"{total_mails} Mail(s) im Ordner",
            f"{total_new} neu",
            f"{total_matched} Treffer",
            f"{total_actions} Aktion(en)",
        ]
        if total_errors > 0:
            summary_parts.append(f"{total_errors} Fehler")

        write_log(
            message=f"Poll abgeschlossen: {', '.join(summary_parts)}",
            level=LogLevel.ERROR if total_errors > 0 else LogLevel.SUCCESS if total_actions > 0 else LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            db=db,
        )

        account.last_poll_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        # Top-level boundary: log with traceback, roll back, and record the
        # failure via write_log (which opens its own session — db was rolled back).
        logger.exception("Fehler beim Polling von Konto %s: %s", account_id, e)
        db.rollback()
        write_log(
            message="Polling fehlgeschlagen",
            level=LogLevel.ERROR,
            account_id=account_id,
            details=str(e),
        )
    finally:
        db.close()


async def poll_account(account_id: int) -> None:
    """Async job entry point: run the blocking poll in a worker thread."""
    await asyncio.to_thread(_poll_account_sync, account_id)


async def _cleanup_logs_job() -> None:
    """Daily maintenance job: delete log entries older than 30 days.

    Runs the blocking cleanup in a worker thread so the event loop is not
    stalled. (Replaces a fire-and-forget ``run_in_executor`` lambda whose
    future was never awaited and which relied on the deprecated
    ``asyncio.get_event_loop()``.)
    """
    await asyncio.to_thread(cleanup_old_logs, 30)


def add_account_job(account: Account) -> None:
    """(Re-)register the interval polling job for *account*.

    An existing job for the account is removed first; a new one is added
    only when the account is enabled.
    """
    job_id = f"poll_account_{account.id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
    if account.enabled:
        scheduler.add_job(
            poll_account,
            "interval",
            seconds=account.poll_interval_seconds,
            id=job_id,
            args=[account.id],
            replace_existing=True,
        )
        logger.info(
            "Job für Konto '%s' registriert (alle %ds)",
            account.name,
            account.poll_interval_seconds,
        )


def remove_account_job(account_id: int) -> None:
    """Remove the polling job for *account_id*, if one is scheduled."""
    job_id = f"poll_account_{account_id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        logger.info("Job für Konto %s entfernt", account_id)


def start_scheduler() -> None:
    """Register jobs for all enabled accounts and start the scheduler."""
    db = SessionLocal()
    try:
        accounts = db.query(Account).filter(Account.enabled.is_(True)).all()
        for account in accounts:
            add_account_job(account)
    finally:
        db.close()

    # Daily cleanup of old log entries.
    scheduler.add_job(
        _cleanup_logs_job,
        "interval",
        hours=24,
        id="cleanup_logs",
        replace_existing=True,
    )

    scheduler.start()
    logger.info("Scheduler gestartet mit %d Jobs", len(scheduler.get_jobs()))


def stop_scheduler() -> None:
    """Shut the scheduler down without waiting for running jobs."""
    scheduler.shutdown(wait=False)
    logger.info("Scheduler gestoppt")