""" Triggers — passive Aufweck-Quellen fuer ARIA. Skills sind aktiv (ARIA ruft sie). Triggers sind passiv — das System ruft ARIA wenn ein Event passiert. Drei Typen: timer Einmalig zu einem festen Zeitpunkt watcher Recurring: Condition pruefen, bei True → feuern (mit Throttle) cron Cron-Expression (vorerst nicht implementiert, Platzhalter) Layout: /data/triggers/.json Manifest pro Trigger /data/triggers/logs/.jsonl Append-only Log pro Feuerung Polling-Kosten: Brain-internes Background-Polling (kein LLM-Call). ARIA wird nur aufgeweckt wenn ein Trigger tatsaechlich feuert. """ from __future__ import annotations import json import logging import os import re import shutil import time from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) TRIGGERS_DIR = Path(os.environ.get("TRIGGERS_DIR", "/data/triggers")) LOGS_DIR = TRIGGERS_DIR / "logs" NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$") VALID_TYPES = {"timer", "watcher", "cron"} def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _safe_name(name: str) -> str: if not isinstance(name, str) or not NAME_RE.match(name): raise ValueError(f"Ungueltiger Trigger-Name: {name!r}") return name def _path(name: str) -> Path: return TRIGGERS_DIR / f"{_safe_name(name)}.json" def _ensure_dirs(): TRIGGERS_DIR.mkdir(parents=True, exist_ok=True) LOGS_DIR.mkdir(parents=True, exist_ok=True) # ─── CRUD ─────────────────────────────────────────────────────────── def list_triggers(active_only: bool = False) -> list[dict]: if not TRIGGERS_DIR.exists(): return [] out: list[dict] = [] for f in sorted(TRIGGERS_DIR.glob("*.json")): try: data = json.loads(f.read_text(encoding="utf-8")) if active_only and not data.get("active", True): continue out.append(data) except Exception as e: logger.warning("Trigger lesen %s: %s", f, e) return out def read(name: str) -> Optional[dict]: p = _path(name) if not p.exists(): return None try: return json.loads(p.read_text(encoding="utf-8")) except Exception as e: logger.warning("Trigger %s lesen: %s", name, e) return None def write(name: str, data: dict) -> None: _ensure_dirs() data["updated_at"] = _now_iso() p = _path(name) tmp = p.with_suffix(".tmp") tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") tmp.replace(p) def delete(name: str) -> None: p = _path(name) if not p.exists(): raise ValueError(f"Trigger '{name}' nicht gefunden") p.unlink() # Logs auch wegraeumen log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl" if log_file.exists(): log_file.unlink() def update(name: str, patch: dict) -> dict: data = read(name) if data is None: raise ValueError(f"Trigger '{name}' nicht gefunden") allowed = {"active", "message", "condition", "throttle_sec", "check_interval_sec", "fires_at"} for k, v in patch.items(): if k in allowed: data[k] = v write(name, data) return data # ─── Create-Helpers (typ-spezifisch) ──────────────────────────────── def create_timer( name: str, fires_at_iso: str, message: str, author: str = "aria", ) -> dict: _safe_name(name) if _path(name).exists(): raise ValueError(f"Trigger '{name}' existiert schon") # ISO validieren try: datetime.fromisoformat(fires_at_iso.replace("Z", "+00:00")) except Exception: raise ValueError(f"fires_at_iso ungueltig: {fires_at_iso}") data = { "name": name, "type": "timer", "active": True, "author": author, "created_at": _now_iso(), "fires_at": fires_at_iso, "message": message, "fire_count": 0, "last_fired_at": None, } write(name, data) logger.info("Trigger angelegt: %s (timer, fires_at=%s)", name, fires_at_iso) return data def create_watcher( name: str, condition: str, message: str, check_interval_sec: int = 300, throttle_sec: int = 3600, author: str = "aria", ) -> dict: _safe_name(name) if _path(name).exists(): raise ValueError(f"Trigger '{name}' existiert schon") # Condition parsen-pruefen (wirft bei Syntax-Fehler) from watcher import parse_condition parse_condition(condition) # nur Validate if check_interval_sec < 30: check_interval_sec = 30 # nicht oefter als alle 30s pruefen if throttle_sec < 0: throttle_sec = 0 data = { "name": name, "type": "watcher", "active": True, "author": author, "created_at": _now_iso(), "condition": condition, "check_interval_sec": int(check_interval_sec), "throttle_sec": int(throttle_sec), "message": message, "fire_count": 0, "last_fired_at": None, "last_checked_at": None, } write(name, data) logger.info("Trigger angelegt: %s (watcher, cond='%s')", name, condition) return data # ─── Feuern + Log ─────────────────────────────────────────────────── def mark_fired(name: str) -> dict: data = read(name) if data is None: raise ValueError(f"Trigger '{name}' nicht gefunden") data["fire_count"] = int(data.get("fire_count", 0)) + 1 data["last_fired_at"] = _now_iso() # Timer: nach Feuern auto-deaktivieren (one-shot) if data.get("type") == "timer": data["active"] = False write(name, data) return data def append_log(name: str, entry: dict) -> None: _ensure_dirs() log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl" record = {"ts": _now_iso()} record.update(entry) try: with log_file.open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") except Exception as e: logger.warning("Trigger-Log append %s: %s", name, e) def list_logs(name: str, limit: int = 50) -> list[dict]: log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl" if not log_file.exists(): return [] try: lines = log_file.read_text(encoding="utf-8").splitlines() out: list[dict] = [] for line in lines[-limit:]: try: out.append(json.loads(line)) except Exception: pass return out except Exception: return []