Files
ARIA-AGENT/aria-brain/triggers.py
T
duffyduck 31aa86a2a9 feat(brain+ui+app): Triggers — passive Aufweck-Quellen fuer ARIA
ARIA hatte bisher nur ein "User fragt → Brain antwortet"-Modell. Neu:
Trigger laufen passiv im Hintergrund (kein LLM-Call) und wecken ARIA
nur dann auf wenn ein Event tatsaechlich passiert.

Drei Typen, zwei aktuell implementiert:
  timer   — einmalig zu festem ISO-Timestamp ("erinner mich in 10min")
  watcher — Polling alle N Sek einer Condition, feuert bei True mit Throttle
            (z.B. "disk_free_gb < 5", max 1x/h)
  cron    — Platzhalter fuer spaeter

aria-brain/triggers.py
  CRUD auf /data/triggers/<name>.json + /data/triggers/logs/<name>.jsonl.
  create_timer, create_watcher, mark_fired, list_logs, etc.

aria-brain/watcher.py
  Built-in Condition-Variablen: disk_free_gb, disk_free_pct, uptime_sec,
  hour_of_day, day_of_week, rvs_connected, memory_count.
  Sicherer Condition-Parser via ast — Whitelist auf Vergleich + BoolOp +
  Name + Const. Kein eval, kein exec, keine Builtins.

aria-brain/background.py
  Async Loop laeuft alle 30s, sammelt einmalig Variables, geht durch
  Trigger-Liste, _should_fire-Check (Timer: fires_at vergangen / Watcher:
  check_interval + throttle respektiert + condition true). Fire ruft
  agent.chat(prompt, source="trigger") — ARIA bekommt das wie eine
  Push-Nachricht und antwortet via Bridge → RVS → App.

aria-brain/main.py
  /triggers/list, /{name}, /{name}/logs, /timer, /watcher, PATCH, DELETE,
  /triggers/conditions (Variablen + aktuelle Werte). Lifespan-Handler
  startet den Background-Loop beim Container-Start, stoppt beim Shutdown.

aria-brain/agent.py
  Meta-Tools fuer ARIA: trigger_timer, trigger_watcher, trigger_cancel,
  trigger_list. ARIA legt Trigger via Tool-Call selbst an wenn Stefan das
  wuenscht. Side-Channel-Event 'trigger_created' wird in chat-Response
  mitgeschickt damit App + Diagnostic eine Bubble zeigen.

aria-brain/prompts.py
  Neue System-Prompt-Section: Liste aktiver Triggers + verfuegbare
  Condition-Variablen mit aktuellen Werten + Operatoren-Erklaerung.
  ARIA weiss damit immer was es schon gibt und welche Vars sie nutzen kann.

bridge/aria_bridge.py + rvs/server.js
  trigger_created als neuer RVS-Message-Type, Bridge forwarded das aus
  data.events analog zu skill_created.

diagnostic/index.html
  Neuer Top-Tab "Trigger". Liste mit Type-Badges (⏱ TIMER / 👁 WATCHER),
  Status, Fire-Count, last_fired. Aktivieren/Deaktivieren + Löschen pro
  Trigger. "+ Neu"-Modal mit Type-Dropdown, Timer-Minuten oder
  Watcher-Condition + Vars-Anzeige + Throttle. Info-Modal-Eintrag mit
  Erklaerung. Live-Bubble im Chat wenn ARIA selbst einen anlegt.

android/src/screens/ChatScreen.tsx
  trigger_created RVS-Handler → eigene Bubble (gelber Border, " ARIA
  hat einen Trigger angelegt", Type/Detail/Message/Zeit). ChatMessage
  bekam triggerCreated-Feld. Lokal-only-Schutz beim Server-Sync analog
  zu skill_created.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 00:38:58 +02:00

230 lines
6.7 KiB
Python

"""
Triggers — passive Aufweck-Quellen fuer ARIA.
Skills sind aktiv (ARIA ruft sie). Triggers sind passiv — das System ruft
ARIA wenn ein Event passiert. Drei Typen:
timer Einmalig zu einem festen Zeitpunkt
watcher Recurring: Condition pruefen, bei True → feuern (mit Throttle)
cron Cron-Expression (vorerst nicht implementiert, Platzhalter)
Layout:
/data/triggers/<name>.json Manifest pro Trigger
/data/triggers/logs/<name>.jsonl Append-only Log pro Feuerung
Polling-Kosten: Brain-internes Background-Polling (kein LLM-Call).
ARIA wird nur aufgeweckt wenn ein Trigger tatsaechlich feuert.
"""
from __future__ import annotations
import json
import logging
import os
import re
import shutil
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
TRIGGERS_DIR = Path(os.environ.get("TRIGGERS_DIR", "/data/triggers"))
LOGS_DIR = TRIGGERS_DIR / "logs"
NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$")
VALID_TYPES = {"timer", "watcher", "cron"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _safe_name(name: str) -> str:
    """Validate *name* against the trigger-name whitelist and return it.

    Raises:
        ValueError: For non-strings or names not matching NAME_RE.
    """
    if isinstance(name, str) and NAME_RE.match(name):
        return name
    raise ValueError(f"Ungueltiger Trigger-Name: {name!r}")
def _path(name: str) -> Path:
    """Manifest file path for a (validated) trigger name."""
    filename = f"{_safe_name(name)}.json"
    return TRIGGERS_DIR / filename
def _ensure_dirs():
    """Create the trigger and log directories if they are missing."""
    for directory in (TRIGGERS_DIR, LOGS_DIR):
        directory.mkdir(parents=True, exist_ok=True)
# ─── CRUD ───────────────────────────────────────────────────────────
def list_triggers(active_only: bool = False) -> list[dict]:
    """Load all trigger manifests, sorted by filename.

    Args:
        active_only: When True, manifests with active == False are skipped
            (missing 'active' counts as active).

    Returns:
        Parsed manifest dicts; unreadable files are logged and skipped.
    """
    if not TRIGGERS_DIR.exists():
        return []
    triggers: list[dict] = []
    for manifest in sorted(TRIGGERS_DIR.glob("*.json")):
        try:
            parsed = json.loads(manifest.read_text(encoding="utf-8"))
            if active_only and not parsed.get("active", True):
                continue
            triggers.append(parsed)
        except Exception as exc:
            logger.warning("Trigger lesen %s: %s", manifest, exc)
    return triggers
def read(name: str) -> Optional[dict]:
    """Load one trigger manifest; return None if absent or unreadable."""
    manifest = _path(name)
    if not manifest.exists():
        return None
    try:
        raw = manifest.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception as exc:
        logger.warning("Trigger %s lesen: %s", name, exc)
        return None
def write(name: str, data: dict) -> None:
    """Persist a trigger manifest atomically (write tmp file, then rename).

    NOTE: mutates *data* in place by stamping 'updated_at' before writing,
    so callers that return the same dict expose the fresh timestamp.
    """
    _ensure_dirs()
    data["updated_at"] = _now_iso()
    target = _path(name)
    staging = target.with_suffix(".tmp")
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    staging.write_text(payload, encoding="utf-8")
    # Atomic on POSIX: readers never observe a half-written manifest.
    staging.replace(target)
def delete(name: str) -> None:
    """Remove a trigger manifest and its fire log.

    Raises:
        ValueError: If no manifest exists for *name*.
    """
    manifest = _path(name)
    if not manifest.exists():
        raise ValueError(f"Trigger '{name}' nicht gefunden")
    manifest.unlink()
    # Clean up the per-trigger log as well
    log_path = LOGS_DIR / f"{_safe_name(name)}.jsonl"
    if log_path.exists():
        log_path.unlink()
def update(name: str, patch: dict) -> dict:
    """Apply a whitelisted partial update to a trigger and persist it.

    Only a fixed set of mutable fields may be patched; unknown keys in
    *patch* are silently ignored.

    Raises:
        ValueError: If no trigger named *name* exists.
    """
    data = read(name)
    if data is None:
        raise ValueError(f"Trigger '{name}' nicht gefunden")
    mutable = {"active", "message", "condition", "throttle_sec",
               "check_interval_sec", "fires_at"}
    data.update({k: v for k, v in patch.items() if k in mutable})
    write(name, data)
    return data
# ─── Create-Helpers (typ-spezifisch) ────────────────────────────────
def create_timer(
    name: str,
    fires_at_iso: str,
    message: str,
    author: str = "aria",
) -> dict:
    """Create a one-shot timer trigger that fires at *fires_at_iso*.

    Args:
        name: Trigger name (validated against the name whitelist).
        fires_at_iso: ISO-8601 timestamp; a trailing 'Z' is accepted.
        message: Prompt text delivered to ARIA when the timer fires.
        author: Who created the trigger (defaults to "aria").

    Returns:
        The persisted manifest dict.

    Raises:
        ValueError: On an invalid name, a duplicate trigger, or an
            unparseable timestamp.
    """
    _safe_name(name)
    if _path(name).exists():
        raise ValueError(f"Trigger '{name}' existiert schon")
    # Validate the timestamp up front; 'Z' is normalized to '+00:00'
    # because datetime.fromisoformat() (pre-3.11) rejects the suffix.
    try:
        datetime.fromisoformat(fires_at_iso.replace("Z", "+00:00"))
    except Exception as exc:
        # Fix: chain the original parse error (B904) instead of dropping it.
        raise ValueError(f"fires_at_iso ungueltig: {fires_at_iso}") from exc
    data = {
        "name": name,
        "type": "timer",
        "active": True,
        "author": author,
        "created_at": _now_iso(),
        "fires_at": fires_at_iso,
        "message": message,
        "fire_count": 0,
        "last_fired_at": None,
    }
    write(name, data)
    logger.info("Trigger angelegt: %s (timer, fires_at=%s)", name, fires_at_iso)
    return data
def create_watcher(
    name: str,
    condition: str,
    message: str,
    check_interval_sec: int = 300,
    throttle_sec: int = 3600,
    author: str = "aria",
) -> dict:
    """Create a recurring watcher trigger for *condition*.

    The condition is syntax-checked via watcher.parse_condition before
    anything is written. The poll interval is clamped to >= 30s and the
    throttle to >= 0s.

    Raises:
        ValueError: On an invalid name, a duplicate trigger, or a
            malformed condition.
    """
    _safe_name(name)
    if _path(name).exists():
        raise ValueError(f"Trigger '{name}' existiert schon")
    # Syntax check only — raises on a malformed condition.
    from watcher import parse_condition
    parse_condition(condition)
    interval = max(int(check_interval_sec), 30)  # never poll more often than 30s
    throttle = max(int(throttle_sec), 0)
    data = {
        "name": name,
        "type": "watcher",
        "active": True,
        "author": author,
        "created_at": _now_iso(),
        "condition": condition,
        "check_interval_sec": interval,
        "throttle_sec": throttle,
        "message": message,
        "fire_count": 0,
        "last_fired_at": None,
        "last_checked_at": None,
    }
    write(name, data)
    logger.info("Trigger angelegt: %s (watcher, cond='%s')", name, condition)
    return data
# ─── Feuern + Log ───────────────────────────────────────────────────
def mark_fired(name: str) -> dict:
    """Record a firing: bump fire_count and stamp last_fired_at.

    Timer triggers are deactivated after firing (one-shot semantics).

    Raises:
        ValueError: If no trigger named *name* exists.
    """
    record = read(name)
    if record is None:
        raise ValueError(f"Trigger '{name}' nicht gefunden")
    fired_so_far = int(record.get("fire_count", 0))
    record["fire_count"] = fired_so_far + 1
    record["last_fired_at"] = _now_iso()
    if record.get("type") == "timer":
        record["active"] = False  # a fired timer must not fire again
    write(name, record)
    return record
def append_log(name: str, entry: dict) -> None:
    """Append one timestamped JSONL record to the trigger's fire log.

    Best-effort: any failure is logged as a warning, never raised.
    """
    _ensure_dirs()
    log_path = LOGS_DIR / f"{_safe_name(name)}.jsonl"
    record = {"ts": _now_iso(), **entry}
    try:
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")
    except Exception as exc:
        logger.warning("Trigger-Log append %s: %s", name, exc)
def list_logs(name: str, limit: int = 50) -> list[dict]:
    """Return up to *limit* most recent log entries for a trigger.

    Args:
        name: Trigger name (validated against the name whitelist).
        limit: Maximum number of trailing entries; values <= 0 yield [].

    Returns:
        Parsed JSONL records, oldest first within the tail window.
        Malformed lines are skipped; read errors yield an empty list.
    """
    log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl"
    # Fix: with limit == 0 the old code sliced lines[-0:], i.e. lines[0:],
    # and returned the ENTIRE log instead of nothing.
    if limit <= 0 or not log_file.exists():
        return []
    try:
        lines = log_file.read_text(encoding="utf-8").splitlines()
    except Exception:
        return []
    out: list[dict] = []
    for line in lines[-limit:]:
        try:
            out.append(json.loads(line))
        except Exception:
            pass
    return out