diff --git a/alembic/versions/c14c86cbc9c0_add_rule_id_to_processed_mails.py b/alembic/versions/c14c86cbc9c0_add_rule_id_to_processed_mails.py new file mode 100644 index 0000000..1ca6d97 --- /dev/null +++ b/alembic/versions/c14c86cbc9c0_add_rule_id_to_processed_mails.py @@ -0,0 +1,40 @@ +"""add rule_id to processed_mails + +Revision ID: c14c86cbc9c0 +Revises: 0ef2a4f77557 +Create Date: 2026-03-19 15:46:32.787129 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'c14c86cbc9c0' +down_revision: Union[str, Sequence[str], None] = '0ef2a4f77557' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Alte processed_mails ohne rule_id löschen — werden beim nächsten Poll neu erstellt + op.execute("DELETE FROM processed_mails") + + with op.batch_alter_table('processed_mails', schema=None) as batch_op: + batch_op.add_column(sa.Column('rule_id', sa.Integer(), nullable=False)) + batch_op.create_index(batch_op.f('ix_processed_mails_rule_id'), ['rule_id'], unique=False) + batch_op.create_foreign_key('fk_processed_mails_rule_id', 'filter_rules', ['rule_id'], ['id'], ondelete='CASCADE') + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table('processed_mails', schema=None) as batch_op: + batch_op.drop_constraint('fk_processed_mails_rule_id', type_='foreignkey') + batch_op.drop_index(batch_op.f('ix_processed_mails_rule_id')) + batch_op.drop_column('rule_id') + + # ### end Alembic commands ### diff --git a/app/models/db_models.py b/app/models/db_models.py index c17c0ff..11e67cb 100644 --- a/app/models/db_models.py +++ b/app/models/db_models.py @@ -146,6 +146,7 @@ class ProcessedMail(Base): id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id", ondelete="CASCADE"), index=True) + rule_id: Mapped[int] = mapped_column(ForeignKey("filter_rules.id", ondelete="CASCADE"), index=True) folder: Mapped[str] = mapped_column(String(255)) mail_uid: Mapped[str] = mapped_column(String(100)) mail_subject: Mapped[str | None] = mapped_column(String(500), nullable=True) @@ -153,3 +154,4 @@ class ProcessedMail(Base): processed_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now()) account: Mapped["Account"] = relationship() + rule: Mapped["FilterRule"] = relationship() diff --git a/app/routers/filters.py b/app/routers/filters.py index fc83267..fcaf46d 100644 --- a/app/routers/filters.py +++ b/app/routers/filters.py @@ -10,15 +10,15 @@ from app.schemas.schemas import FilterRuleCreate, FilterRuleResponse, FilterRule logger = logging.getLogger(__name__) -def _reset_processed_for_folder(db: Session, account_id: int, folder: str) -> int: - """Reset processed mails for a specific account/folder so they get re-evaluated.""" +def _reset_processed_for_rule(db: Session, rule_id: int) -> int: + """Reset processed mails for a specific rule so they get re-evaluated.""" count = ( db.query(ProcessedMail) - .filter(ProcessedMail.account_id == account_id, ProcessedMail.folder == folder) + .filter(ProcessedMail.rule_id == rule_id) .delete() ) if count: - logger.info("Filter geändert: %d verarbeitete Mails in '%s' zurückgesetzt 
(Account %d)", count, folder, account_id) + logger.info("Filter geändert: %d verarbeitete Mails für Regel %d zurückgesetzt", count, rule_id) return count router = APIRouter(prefix="/api/filters", tags=["filters"]) @@ -70,8 +70,7 @@ def create_filter(data: FilterRuleCreate, db: Session = Depends(get_db)): action = FilterAction(rule_id=rule.id, **action_data.model_dump()) db.add(action) - # Neue Regel → Ordner zurücksetzen damit bestehende Mails geprüft werden - _reset_processed_for_folder(db, data.account_id, data.source_folder) + # Neue Regel → hat noch keinen processed-Status, wird automatisch alle Mails prüfen db.commit() db.refresh(rule) @@ -84,7 +83,6 @@ def update_filter(rule_id: int, data: FilterRuleUpdate, db: Session = Depends(ge if not rule: raise HTTPException(404, "Filterregel nicht gefunden") - old_folder = rule.source_folder update_data = data.model_dump(exclude_unset=True) # Update conditions if provided @@ -108,11 +106,8 @@ def update_filter(rule_id: int, data: FilterRuleUpdate, db: Session = Depends(ge for key, value in update_data.items(): setattr(rule, key, value) - # Regel geändert → betroffene Ordner zurücksetzen - _reset_processed_for_folder(db, rule.account_id, old_folder) - new_folder = rule.source_folder - if new_folder != old_folder: - _reset_processed_for_folder(db, rule.account_id, new_folder) + # Regel geändert → nur diese Regel zurücksetzen + _reset_processed_for_rule(db, rule.id) db.commit() db.refresh(rule) @@ -124,8 +119,7 @@ def delete_filter(rule_id: int, db: Session = Depends(get_db)): rule = db.get(FilterRule, rule_id) if not rule: raise HTTPException(404, "Filterregel nicht gefunden") - # Ordner zurücksetzen — andere Regeln könnten jetzt anders greifen - _reset_processed_for_folder(db, rule.account_id, rule.source_folder) + # processed-Einträge werden per CASCADE gelöscht db.delete(rule) db.commit() diff --git a/app/services/backup_service.py b/app/services/backup_service.py index 0f9eee9..eadab08 100644 --- 
a/app/services/backup_service.py +++ b/app/services/backup_service.py @@ -41,7 +41,6 @@ def export_backup(db: Session | None = None) -> str: "poll_interval_seconds": acc.poll_interval_seconds, "enabled": acc.enabled, "filter_rules": [], - "processed_mails": [], } for rule in sorted(acc.filter_rules, key=lambda r: r.priority): @@ -67,23 +66,25 @@ def export_backup(db: Session | None = None) -> str: } for action in rule.actions ], + "processed_mails": [], } - account_data["filter_rules"].append(rule_data) - # Verarbeitete Mails exportieren - processed = ( - db.query(ProcessedMail) - .filter(ProcessedMail.account_id == acc.id) - .all() - ) - for pm in processed: - account_data["processed_mails"].append({ - "folder": pm.folder, - "mail_uid": pm.mail_uid, - "mail_subject": pm.mail_subject, - "mail_from": pm.mail_from, - "processed_at": pm.processed_at.isoformat() if pm.processed_at else None, - }) + # Verarbeitete Mails pro Regel + processed = ( + db.query(ProcessedMail) + .filter(ProcessedMail.rule_id == rule.id) + .all() + ) + for pm in processed: + rule_data["processed_mails"].append({ + "folder": pm.folder, + "mail_uid": pm.mail_uid, + "mail_subject": pm.mail_subject, + "mail_from": pm.mail_from, + "processed_at": pm.processed_at.isoformat() if pm.processed_at else None, + }) + + account_data["filter_rules"].append(rule_data) data["accounts"].append(account_data) @@ -192,16 +193,17 @@ def import_backup(json_content: str, db: Session | None = None) -> dict: stats["rules_created"] += 1 - # Verarbeitete Mails wiederherstellen - for pm_data in acc_data.get("processed_mails", []): - db.add(ProcessedMail( - account_id=account.id, - folder=pm_data["folder"], - mail_uid=pm_data["mail_uid"], - mail_subject=pm_data.get("mail_subject"), - mail_from=pm_data.get("mail_from"), - )) - stats["processed_restored"] += 1 + # Verarbeitete Mails pro Regel wiederherstellen + for pm_data in rule_data.get("processed_mails", []): + db.add(ProcessedMail( + account_id=account.id, + 
rule_id=rule.id, + folder=pm_data["folder"], + mail_uid=pm_data["mail_uid"], + mail_subject=pm_data.get("mail_subject"), + mail_from=pm_data.get("mail_from"), + )) + stats["processed_restored"] += 1 db.commit() logger.info("Backup-Import abgeschlossen: %s", stats) diff --git a/app/services/scheduler.py b/app/services/scheduler.py index 0ecf0ce..8e09f8b 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from app.database import SessionLocal from app.models.db_models import Account, FilterRule, LogLevel, ProcessedMail from app.services.encryption import decrypt -from app.services.filter_engine import apply_rules +from app.services.filter_engine import evaluate_conditions, execute_action from app.services.imap_client import IMAPClient from app.services.log_service import cleanup_old_logs, write_log @@ -73,13 +73,18 @@ def _poll_account_sync(account_id: int) -> None: total_actions = 0 total_errors = 0 - with client: - for folder in source_folders: - folder_rules = [r for r in rules if r.source_folder == folder] + # Mail-Cache: einmal geladene Mails pro Ordner/UID wiederverwenden + mail_cache: dict[tuple[str, str], "MailMessage"] = {} + # UIDs pro Ordner cachen + folder_uids: dict[str, list[str]] = {} - # Alle UIDs im Ordner holen + with client: + # Phase 1: UIDs pro Ordner laden + for folder in source_folders: try: all_uids = client.get_all_uids(folder, search="ALL") + folder_uids[folder] = all_uids + total_mails += len(all_uids) except Exception as e: write_log( message=f"Fehler beim Abrufen von Ordner '{folder}'", @@ -90,104 +95,122 @@ def _poll_account_sync(account_id: int) -> None: details=str(e), db=db, ) + + # Phase 2: Pro Regel die unverarbeiteten Mails prüfen + for rule in rules: + folder = rule.source_folder + if folder not in folder_uids: continue - total_mails += len(all_uids) + all_uids = folder_uids[folder] - # Bereits verarbeitete UIDs aus DB laden + # Bereits verarbeitete UIDs für 
DIESE Regel processed_uids = set( row[0] for row in db.query(ProcessedMail.mail_uid) .filter( ProcessedMail.account_id == account.id, + ProcessedMail.rule_id == rule.id, ProcessedMail.folder == folder, ) .all() ) - # Neue (unverarbeitete) UIDs ermitteln new_uids = [uid for uid in all_uids if uid not in processed_uids] - total_new += len(new_uids) if not new_uids: write_log( - message=f"Keine neuen Mails in '{folder}' ({len(all_uids)} gesamt, alle bereits verarbeitet)", + message=f"Regel '{rule.name}': keine neuen Mails in '{folder}' ({len(all_uids)} gesamt, alle bereits geprüft)", level=LogLevel.INFO, account_id=account.id, account_name=account.name, + rule_name=rule.name, folder=folder, db=db, ) continue + # Batch-Limit: maximal 500 Mails pro Regel pro Poll + BATCH_LIMIT = 500 + batch_uids = new_uids[:BATCH_LIMIT] + remaining = len(new_uids) - len(batch_uids) + + total_new += len(batch_uids) + msg = f"Regel '{rule.name}': {len(batch_uids)} Mail(s) in '{folder}' prüfen ({len(processed_uids)} bereits geprüft)" + if remaining > 0: + msg += f" — {remaining} weitere beim nächsten Poll" write_log( - message=f"{len(new_uids)} neue Mail(s) in '{folder}' ({len(all_uids)} gesamt, {len(processed_uids)} bereits verarbeitet)", + message=msg, level=LogLevel.INFO, account_id=account.id, account_name=account.name, + rule_name=rule.name, folder=folder, db=db, ) + new_uids = batch_uids - # Neue Mails abrufen und verarbeiten for uid in new_uids: - try: - mail = client.fetch_mail(uid) - except Exception as e: - write_log( - message=f"Fehler beim Abrufen von Mail {uid}", - level=LogLevel.ERROR, - account_id=account.id, - account_name=account.name, - mail_uid=uid, - folder=folder, - details=str(e), - db=db, - ) - continue - - if not mail: - continue - - results, eval_details = apply_rules(client, mail, folder_rules, smtp_config) - - # Eval-Details für Log aufbereiten - eval_summary = [] - for ev in eval_details: - status = "TREFFER" if ev["matched"] else "kein Treffer" - checks = " | 
".join(ev["details"]) - eval_summary.append(f"Regel '{ev['rule']}': {status} [{checks}]") - - if not results: - write_log( - message=f"Keine Regel trifft zu", - level=LogLevel.INFO, - account_id=account.id, - account_name=account.name, - mail_uid=mail.uid, - mail_subject=mail.subject, - mail_from=mail.from_addr, - folder=folder, - details="\n".join(eval_summary), - db=db, - ) + # Mail aus Cache oder vom Server laden + cache_key = (folder, uid) + if cache_key in mail_cache: + mail = mail_cache[cache_key] else: + try: + # Ordner muss ausgewählt sein + if not hasattr(client, '_current_folder') or client._current_folder != folder: + client.conn.select(folder) + client._current_folder = folder + mail = client.fetch_mail(uid) + except Exception as e: + write_log( + message=f"Fehler beim Abrufen von Mail {uid}", + level=LogLevel.ERROR, + account_id=account.id, + account_name=account.name, + mail_uid=uid, + folder=folder, + details=str(e), + db=db, + ) + # Trotzdem als verarbeitet markieren damit wir nicht endlos retrien + db.add(ProcessedMail( + account_id=account.id, rule_id=rule.id, + folder=folder, mail_uid=uid, + )) + continue + + if not mail: + db.add(ProcessedMail( + account_id=account.id, rule_id=rule.id, + folder=folder, mail_uid=uid, + )) + continue + mail_cache[cache_key] = mail + + # Regel gegen Mail prüfen + matched, details = evaluate_conditions(mail, rule.conditions) + detail_str = " | ".join(details) + + if matched: total_matched += 1 - for r in results: - action_label = r["action"] - param = r.get("parameter", "") + # Aktionen ausführen + for action in rule.actions: + success = execute_action(client, mail, action, smtp_config) + action_label = action.action_type.value + param = action.parameter or "" if param: action_label += f" → {param}" - if r["success"]: + if success: total_actions += 1 write_log( message=f"Aktion ausgeführt: {action_label}", level=LogLevel.SUCCESS, account_id=account.id, account_name=account.name, - rule_name=r["rule"], - 
action_type=r["action"], - mail_uid=r["mail_uid"], + rule_name=rule.name, + action_type=action.action_type.value, + mail_uid=mail.uid, mail_subject=mail.subject, mail_from=mail.from_addr, folder=folder, @@ -201,19 +224,34 @@ def _poll_account_sync(account_id: int) -> None: level=LogLevel.ERROR, account_id=account.id, account_name=account.name, - rule_name=r["rule"], - action_type=r["action"], - mail_uid=r["mail_uid"], + rule_name=rule.name, + action_type=action.action_type.value, + mail_uid=mail.uid, mail_subject=mail.subject, mail_from=mail.from_addr, folder=folder, details=param, db=db, ) + else: + write_log( + message=f"Keine Übereinstimmung", + level=LogLevel.INFO, + account_id=account.id, + account_name=account.name, + rule_name=rule.name, + mail_uid=mail.uid, + mail_subject=mail.subject, + mail_from=mail.from_addr, + folder=folder, + details=detail_str, + db=db, + ) - # Mail als verarbeitet markieren + # Mail für DIESE Regel als verarbeitet markieren db.add(ProcessedMail( account_id=account.id, + rule_id=rule.id, folder=folder, mail_uid=mail.uid, mail_subject=mail.subject[:500] if mail.subject else None,