imap-mail-filter-service/app/services/scheduler.py

350 lines
13 KiB
Python

import asyncio
import logging
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models.db_models import Account, FilterRule, LogLevel, ProcessedMail
from app.services.encryption import decrypt
from app.services.filter_engine import evaluate_conditions, execute_action
from app.services.imap_client import IMAPClient
from app.services.log_service import cleanup_old_logs, write_log
# Module-level logger and the single shared scheduler instance used by all
# job-management helpers below.
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler()
def _build_smtp_config(account: Account) -> dict | None:
if account.smtp_host and account.smtp_username and account.smtp_password:
return {
"host": account.smtp_host,
"port": account.smtp_port or 465,
"username": account.smtp_username,
"password": decrypt(account.smtp_password),
}
return None
def _poll_account_sync(account_id: int) -> None:
    """Poll one mail account synchronously: fetch mails, evaluate filter rules,
    execute matching actions, and record every checked mail in ProcessedMail.

    Designed to run in a worker thread (see poll_account). Opens its own DB
    session, writes progress/log entries via write_log, and commits once at
    the end; any unhandled error rolls the session back and is logged.
    """
    db: Session = SessionLocal()
    try:
        account = db.get(Account, account_id)
        # Account may have been deleted or disabled since the job was scheduled.
        if not account or not account.enabled:
            return
        # Enabled rules for this account, evaluated in ascending priority order.
        rules = (
            db.query(FilterRule)
            .filter(FilterRule.account_id == account_id, FilterRule.enabled.is_(True))
            .order_by(FilterRule.priority)
            .all()
        )
        write_log(
            message=f"Poll gestartet ({len(rules)} aktive Regel(n))",
            level=LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            details=", ".join(r.name for r in rules) if rules else "Keine Regeln konfiguriert",
            db=db,
        )
        if not rules:
            # Nothing to evaluate, but still record that a poll happened.
            account.last_poll_at = datetime.utcnow()
            db.commit()
            return
        # Distinct source folders across all rules (each folder fetched once).
        source_folders = list({r.source_folder for r in rules})
        smtp_config = _build_smtp_config(account)
        client = IMAPClient(
            host=account.imap_host,
            port=account.imap_port,
            username=account.username,
            password=decrypt(account.password),
            use_ssl=account.use_ssl,
        )
        # Counters for the end-of-poll summary log entry.
        total_mails = 0
        total_new = 0
        total_matched = 0
        total_actions = 0
        total_errors = 0
        # Mail cache: reuse mails already fetched, keyed by (folder, UID),
        # so multiple rules on the same folder fetch each mail only once.
        mail_cache: dict[tuple[str, str], "MailMessage"] = {}
        # Cache the UID listing per folder (result of phase 1).
        folder_uids: dict[str, list[str]] = {}
        with client:
            # Phase 1: load the full UID list for each source folder.
            for folder in source_folders:
                try:
                    all_uids = client.get_all_uids(folder, search="ALL")
                    folder_uids[folder] = all_uids
                    total_mails += len(all_uids)
                except Exception as e:
                    write_log(
                        message=f"Fehler beim Abrufen von Ordner '{folder}'",
                        level=LogLevel.ERROR,
                        account_id=account.id,
                        account_name=account.name,
                        folder=folder,
                        details=str(e),
                        db=db,
                    )
            # Phase 2: per rule, check only the not-yet-processed mails.
            for rule in rules:
                folder = rule.source_folder
                # Folder listing failed in phase 1 — skip this rule this round.
                if folder not in folder_uids:
                    continue
                all_uids = folder_uids[folder]
                # UIDs already processed for THIS rule (processing is tracked
                # per (account, rule, folder), not globally per mail).
                processed_uids = set(
                    row[0] for row in db.query(ProcessedMail.mail_uid)
                    .filter(
                        ProcessedMail.account_id == account.id,
                        ProcessedMail.rule_id == rule.id,
                        ProcessedMail.folder == folder,
                    )
                    .all()
                )
                new_uids = [uid for uid in all_uids if uid not in processed_uids]
                if not new_uids:
                    write_log(
                        message=f"Regel '{rule.name}': keine neuen Mails in '{folder}' ({len(all_uids)} gesamt, alle bereits geprüft)",
                        level=LogLevel.INFO,
                        account_id=account.id,
                        account_name=account.name,
                        rule_name=rule.name,
                        folder=folder,
                        db=db,
                    )
                    continue
                # Batch limit: at most 500 mails per rule per poll; the rest
                # is picked up on the next poll run.
                BATCH_LIMIT = 500
                batch_uids = new_uids[:BATCH_LIMIT]
                remaining = len(new_uids) - len(batch_uids)
                total_new += len(batch_uids)
                msg = f"Regel '{rule.name}': {len(batch_uids)} Mail(s) in '{folder}' prüfen ({len(processed_uids)} bereits geprüft)"
                if remaining > 0:
                    # NOTE(review): no separator before this fragment — the log
                    # message reads glued together ("...prüfen)500 weitere...");
                    # likely a ", " was lost. Confirm intended wording.
                    msg += f"{remaining} weitere beim nächsten Poll"
                write_log(
                    message=msg,
                    level=LogLevel.INFO,
                    account_id=account.id,
                    account_name=account.name,
                    rule_name=rule.name,
                    folder=folder,
                    db=db,
                )
                new_uids = batch_uids
                for uid in new_uids:
                    # Load mail from cache or from the server.
                    cache_key = (folder, uid)
                    if cache_key in mail_cache:
                        mail = mail_cache[cache_key]
                    else:
                        try:
                            # The folder must be selected before fetching.
                            # NOTE(review): this pokes IMAPClient internals
                            # (_current_folder, conn) — consider a public
                            # select_folder() API on IMAPClient instead.
                            if not hasattr(client, '_current_folder') or client._current_folder != folder:
                                client.conn.select(folder)
                                client._current_folder = folder
                            mail = client.fetch_mail(uid)
                        except Exception as e:
                            write_log(
                                message=f"Fehler beim Abrufen von Mail {uid}",
                                level=LogLevel.ERROR,
                                account_id=account.id,
                                account_name=account.name,
                                mail_uid=uid,
                                folder=folder,
                                details=str(e),
                                db=db,
                            )
                            # Mark as processed anyway so we don't retry forever.
                            db.add(ProcessedMail(
                                account_id=account.id, rule_id=rule.id,
                                folder=folder, mail_uid=uid,
                            ))
                            continue
                        # fetch_mail returned nothing — also mark processed.
                        if not mail:
                            db.add(ProcessedMail(
                                account_id=account.id, rule_id=rule.id,
                                folder=folder, mail_uid=uid,
                            ))
                            continue
                        mail_cache[cache_key] = mail
                    # Evaluate the rule's conditions against this mail.
                    matched, details = evaluate_conditions(mail, rule.conditions)
                    detail_str = " | ".join(details)
                    if matched:
                        total_matched += 1
                        # Execute all actions of the matching rule in order.
                        for action in rule.actions:
                            success = execute_action(client, mail, action, smtp_config)
                            action_label = action.action_type.value
                            param = action.parameter or ""
                            if param:
                                # NOTE(review): parameter is appended with no
                                # separator (e.g. "moveArchive"); a space or
                                # parentheses were probably intended — confirm.
                                action_label += f"{param}"
                            if success:
                                total_actions += 1
                                write_log(
                                    message=f"Aktion ausgeführt: {action_label}",
                                    level=LogLevel.SUCCESS,
                                    account_id=account.id,
                                    account_name=account.name,
                                    rule_name=rule.name,
                                    action_type=action.action_type.value,
                                    mail_uid=mail.uid,
                                    mail_subject=mail.subject,
                                    mail_from=mail.from_addr,
                                    folder=folder,
                                    details=param,
                                    db=db,
                                )
                            else:
                                total_errors += 1
                                write_log(
                                    message=f"Aktion fehlgeschlagen: {action_label}",
                                    level=LogLevel.ERROR,
                                    account_id=account.id,
                                    account_name=account.name,
                                    rule_name=rule.name,
                                    action_type=action.action_type.value,
                                    mail_uid=mail.uid,
                                    mail_subject=mail.subject,
                                    mail_from=mail.from_addr,
                                    folder=folder,
                                    details=param,
                                    db=db,
                                )
                    else:
                        write_log(
                            message=f"Keine Übereinstimmung",
                            level=LogLevel.INFO,
                            account_id=account.id,
                            account_name=account.name,
                            rule_name=rule.name,
                            mail_uid=mail.uid,
                            mail_subject=mail.subject,
                            mail_from=mail.from_addr,
                            folder=folder,
                            details=detail_str,
                            db=db,
                        )
                    # Mark the mail as processed for THIS rule (truncated to
                    # the DB column sizes).
                    db.add(ProcessedMail(
                        account_id=account.id,
                        rule_id=rule.id,
                        folder=folder,
                        mail_uid=mail.uid,
                        mail_subject=mail.subject[:500] if mail.subject else None,
                        mail_from=mail.from_addr[:255] if mail.from_addr else None,
                    ))
                    # Flush per mail so duplicates surface early; the commit
                    # happens once after the whole poll.
                    db.flush()
        # End-of-poll summary.
        summary_parts = [
            f"{total_mails} Mail(s) im Ordner",
            f"{total_new} neu",
            f"{total_matched} Treffer",
            f"{total_actions} Aktion(en)",
        ]
        if total_errors > 0:
            summary_parts.append(f"{total_errors} Fehler")
        write_log(
            message=f"Poll abgeschlossen: {', '.join(summary_parts)}",
            level=LogLevel.ERROR if total_errors > 0 else LogLevel.SUCCESS if total_actions > 0 else LogLevel.INFO,
            account_id=account.id,
            account_name=account.name,
            db=db,
        )
        account.last_poll_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        logger.error("Fehler beim Polling von Konto %s: %s", account_id, e)
        db.rollback()
        # No db= here: after rollback, write_log presumably opens its own
        # session — TODO confirm against log_service.write_log.
        write_log(
            message=f"Polling fehlgeschlagen",
            level=LogLevel.ERROR,
            account_id=account_id,
            details=str(e),
        )
    finally:
        db.close()
async def poll_account(account_id: int) -> None:
    """Async job entry point: run the blocking poll in a worker thread."""
    await asyncio.to_thread(_poll_account_sync, account_id)
def add_account_job(account: Account) -> None:
    """(Re-)register the periodic poll job for *account*.

    A previously registered job for this account is dropped first; a new
    interval job is scheduled only while the account is enabled.
    """
    job_id = f"poll_account_{account.id}"
    existing = scheduler.get_job(job_id)
    if existing:
        scheduler.remove_job(job_id)
    if not account.enabled:
        return
    scheduler.add_job(
        poll_account,
        "interval",
        seconds=account.poll_interval_seconds,
        id=job_id,
        args=[account.id],
        replace_existing=True,
    )
    logger.info(
        "Job für Konto '%s' registriert (alle %ds)",
        account.name, account.poll_interval_seconds,
    )
def remove_account_job(account_id: int) -> None:
    """Unschedule the poll job for the given account id, if one exists."""
    job_id = f"poll_account_{account_id}"
    if scheduler.get_job(job_id) is None:
        return
    scheduler.remove_job(job_id)
    logger.info("Job für Konto %s entfernt", account_id)
async def _cleanup_logs_job() -> None:
    """Daily maintenance job: purge logs older than 30 days off the event loop."""
    await asyncio.to_thread(cleanup_old_logs, 30)


def start_scheduler() -> None:
    """Start the global scheduler: one poll job per enabled account plus a
    daily log-cleanup job.

    Reads enabled accounts from the DB in a short-lived session, registers
    their interval jobs, then starts the AsyncIOScheduler.
    """
    db = SessionLocal()
    try:
        accounts = db.query(Account).filter(Account.enabled.is_(True)).all()
        for account in accounts:
            add_account_job(account)
    finally:
        db.close()
    # Daily cleanup of old logs. Scheduled as a coroutine so the scheduler
    # awaits it; the previous sync lambda used the deprecated
    # asyncio.get_event_loop() and discarded the run_in_executor future,
    # silently losing any cleanup errors.
    scheduler.add_job(
        _cleanup_logs_job,
        "interval",
        hours=24,
        id="cleanup_logs",
        replace_existing=True,
    )
    scheduler.start()
    logger.info("Scheduler gestartet mit %d Jobs", len(scheduler.get_jobs()))
def stop_scheduler() -> None:
    """Shut down the module-level scheduler without waiting for running jobs."""
    scheduler.shutdown(wait=False)
    logger.info("Scheduler gestoppt")