"""Periodic polling of mail accounts.

Registers one APScheduler interval job per enabled account. Each job run
opens the account's IMAP connection, fetches unseen messages from every
folder referenced by an active filter rule and hands them to the filter
engine.
"""

import asyncio
import logging
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session

from app.database import SessionLocal
from app.models.db_models import Account, FilterRule
from app.services.encryption import decrypt
from app.services.filter_engine import apply_rules
from app.services.imap_client import IMAPClient

logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler()


def _build_smtp_config(account: Account) -> dict | None:
    """Return the SMTP settings for *account*, or ``None`` if incomplete.

    A config is only built when host, username and password are all set.
    The port falls back to 465 when the account has none, and the stored
    password is passed through ``decrypt`` before being returned.
    """
    if not (account.smtp_host and account.smtp_username and account.smtp_password):
        return None
    return {
        "host": account.smtp_host,
        "port": account.smtp_port or 465,
        "username": account.smtp_username,
        "password": decrypt(account.smtp_password),
    }


def _group_rules_by_folder(rules: list[FilterRule]) -> dict[str, list[FilterRule]]:
    """Group *rules* by their ``source_folder``, keeping the input order.

    Dict insertion order makes the folder order deterministic (first
    appearance in *rules*), and the rules of each folder stay in the order
    in which they were passed in.
    """
    grouped: dict[str, list[FilterRule]] = {}
    for rule in rules:
        grouped.setdefault(rule.source_folder, []).append(rule)
    return grouped


def _log_rule_results(account_name: str, results) -> None:
    """Log one line per rule result: INFO on success, ERROR otherwise.

    Each result is read as a mapping with the keys ``"success"``,
    ``"action"``, ``"rule"`` and an optional ``"parameter"``.
    """
    for result in results:
        succeeded = result["success"]
        logger.log(
            logging.INFO if succeeded else logging.ERROR,
            "Konto '%s': %s %s -> %s (%s)",
            account_name,
            result["action"],
            result.get("parameter", ""),
            "OK" if succeeded else "FEHLER",
            result["rule"],
        )


def _poll_account_sync(account_id: int) -> None:
    """Poll one account synchronously and apply its active filter rules.

    Does nothing when the account does not exist, is disabled, or has no
    enabled rules. A failure to fetch a single folder is logged and that
    folder is skipped. Any other exception is logged with its traceback
    and swallowed, so this function never raises into the scheduler. The
    database session is always closed.
    """
    db: Session = SessionLocal()
    try:
        account = db.get(Account, account_id)
        if not account or not account.enabled:
            return

        rules = (
            db.query(FilterRule)
            .filter(FilterRule.account_id == account_id, FilterRule.enabled.is_(True))
            .order_by(FilterRule.priority)
            .all()
        )
        if not rules:
            logger.debug("Keine aktiven Regeln für Konto '%s'", account.name)
            return

        # One pass over the rules instead of re-filtering them per folder;
        # also gives a stable folder order (a set would not).
        rules_by_folder = _group_rules_by_folder(rules)
        smtp_config = _build_smtp_config(account)

        client = IMAPClient(
            host=account.imap_host,
            port=account.imap_port,
            username=account.username,
            password=decrypt(account.password),
            use_ssl=account.use_ssl,
        )

        with client:
            for folder, folder_rules in rules_by_folder.items():
                try:
                    messages = client.fetch_unseen(folder)
                except Exception as e:
                    # Best effort: one broken folder must not stop the others.
                    logger.error(
                        "Fehler beim Abrufen von %s/%s: %s", account.name, folder, e
                    )
                    continue

                if not messages:
                    continue

                logger.info(
                    "Konto '%s', Ordner '%s': %d ungelesene Mails",
                    account.name,
                    folder,
                    len(messages),
                )
                for mail in messages:
                    results = apply_rules(client, mail, folder_rules, smtp_config)
                    _log_rule_results(account.name, results)

        # NOTE(review): naive UTC timestamp kept as in the original; switching
        # to an aware datetime depends on the column type — confirm first.
        account.last_poll_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        # Top-level boundary of the job: keep the traceback in the log.
        logger.exception("Fehler beim Polling von Konto %s: %s", account_id, e)
    finally:
        db.close()


async def poll_account(account_id: int) -> None:
    """Run the blocking poll for *account_id* in a worker thread."""
    await asyncio.to_thread(_poll_account_sync, account_id)


def add_account_job(account: Account) -> None:
    """Register or refresh the polling job of *account*.

    An existing job with the same id is removed first. A new interval job
    is only added when the account is enabled, so calling this for a
    disabled account just removes its job.
    """
    job_id = f"poll_account_{account.id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    if not account.enabled:
        return

    scheduler.add_job(
        poll_account,
        "interval",
        seconds=account.poll_interval_seconds,
        id=job_id,
        args=[account.id],
        replace_existing=True,
    )
    logger.info(
        "Job für Konto '%s' registriert (alle %ds)",
        account.name,
        account.poll_interval_seconds,
    )


def remove_account_job(account_id: int) -> None:
    """Remove the polling job of *account_id* if one is registered."""
    job_id = f"poll_account_{account_id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        logger.info("Job für Konto %s entfernt", account_id)


def start_scheduler() -> None:
    """Register a job for every enabled account and start the scheduler."""
    db = SessionLocal()
    try:
        accounts = db.query(Account).filter(Account.enabled.is_(True)).all()
        for account in accounts:
            add_account_job(account)
    finally:
        db.close()

    scheduler.start()
    logger.info("Scheduler gestartet mit %d Jobs", len(scheduler.get_jobs()))


def stop_scheduler() -> None:
    """Shut the scheduler down without waiting for running jobs.

    Does nothing when the scheduler is not running, so it is safe to call
    from shutdown paths even if start-up never completed.
    """
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("Scheduler gestoppt")