feat(brain+ui+app): Triggers — passive Aufweck-Quellen fuer ARIA

ARIA hatte bisher nur ein "User fragt → Brain antwortet"-Modell. Neu:
Trigger laufen passiv im Hintergrund (kein LLM-Call) und wecken ARIA
nur dann auf wenn ein Event tatsaechlich passiert.

Drei Typen, zwei aktuell implementiert:
  timer   — einmalig zu festem ISO-Timestamp ("erinner mich in 10min")
  watcher — Polling alle N Sek einer Condition, feuert bei True mit Throttle
            (z.B. "disk_free_gb < 5", max 1x/h)
  cron    — Platzhalter fuer spaeter

aria-brain/triggers.py
  CRUD auf /data/triggers/<name>.json + /data/triggers/logs/<name>.jsonl.
  create_timer, create_watcher, mark_fired, list_logs, etc.

aria-brain/watcher.py
  Built-in Condition-Variablen: disk_free_gb, disk_free_pct, uptime_sec,
  hour_of_day, day_of_week, rvs_connected, memory_count.
  Sicherer Condition-Parser via ast — Whitelist auf Vergleich + BoolOp +
  Name + Const. Kein eval, kein exec, keine Builtins.

aria-brain/background.py
  Async Loop laeuft alle 30s, sammelt einmalig Variables, geht durch
  Trigger-Liste, _should_fire-Check (Timer: fires_at vergangen / Watcher:
  check_interval + throttle respektiert + condition true). Fire ruft
  agent.chat(prompt, source="trigger") — ARIA bekommt das wie eine
  Push-Nachricht und antwortet via Bridge → RVS → App.

aria-brain/main.py
  /triggers/list, /{name}, /{name}/logs, /timer, /watcher, PATCH, DELETE,
  /triggers/conditions (Variablen + aktuelle Werte). Lifespan-Handler
  startet den Background-Loop beim Container-Start, stoppt beim Shutdown.

aria-brain/agent.py
  Meta-Tools fuer ARIA: trigger_timer, trigger_watcher, trigger_cancel,
  trigger_list. ARIA legt Trigger via Tool-Call selbst an wenn Stefan das
  wuenscht. Side-Channel-Event 'trigger_created' wird in chat-Response
  mitgeschickt damit App + Diagnostic eine Bubble zeigen.

aria-brain/prompts.py
  Neue System-Prompt-Section: Liste aktiver Triggers + verfuegbare
  Condition-Variablen mit aktuellen Werten + Operatoren-Erklaerung.
  ARIA weiss damit immer was es schon gibt und welche Vars sie nutzen kann.

bridge/aria_bridge.py + rvs/server.js
  trigger_created als neuer RVS-Message-Type, Bridge forwarded das aus
  data.events analog zu skill_created.

diagnostic/index.html
  Neuer Top-Tab "Trigger". Liste mit Type-Badges (⏱ TIMER / 👁 WATCHER),
  Status, Fire-Count, last_fired. Aktivieren/Deaktivieren + Löschen pro
  Trigger. "+ Neu"-Modal mit Type-Dropdown, Timer-Minuten oder
  Watcher-Condition + Vars-Anzeige + Throttle. Info-Modal-Eintrag mit
  Erklaerung. Live-Bubble im Chat wenn ARIA selbst einen anlegt.

android/src/screens/ChatScreen.tsx
  trigger_created RVS-Handler → eigene Bubble (gelber Border, " ARIA
  hat einen Trigger angelegt", Type/Detail/Message/Zeit). ChatMessage
  bekam triggerCreated-Feld. Lokal-only-Schutz beim Server-Sync analog
  zu skill_created.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 00:38:58 +02:00
parent 87cb687610
commit 31aa86a2a9
10 changed files with 1190 additions and 3 deletions
+141 -1
View File
@@ -25,6 +25,8 @@ from memory import Embedder, VectorStore, MemoryPoint
from prompts import build_system_prompt
from proxy_client import ProxyClient, Message as ProxyMessage
import skills as skills_mod
import triggers as triggers_mod
import watcher as watcher_mod
logger = logging.getLogger(__name__)
@@ -90,6 +92,90 @@ META_TOOLS = [
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "trigger_timer",
"description": (
"Lege einen Timer-Trigger an — feuert EINMALIG zum angegebenen Zeitpunkt "
"und ruft dich selbst auf (Push-Nachricht an Stefan). "
"Use-Case: 'erinnere mich in 10min', 'sag mir um 14:30 Bescheid'."
),
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "kurzer kebab-case-Name, a-z 0-9 - _"},
"fires_at": {
"type": "string",
"description": (
"Absoluter ISO-Timestamp UTC, z.B. '2026-05-12T14:30:00Z'. "
"Berechne aus relativer Angabe ('in 10min') selbst — die "
"aktuelle Zeit findest du im System-Prompt nicht, also nutze "
"Bash: `date -u -d '+10 minutes' --iso-8601=seconds`."
),
},
"message": {"type": "string", "description": "Was soll bei der Erinnerung gesagt werden"},
},
"required": ["name", "fires_at", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_watcher",
"description": (
"Lege einen Watcher-Trigger an — pollt alle paar Minuten eine Condition, "
"feuert wenn sie wahr wird (mit Throttle damit's nicht spammt). "
"Use-Case: 'sag bescheid wenn Disk unter 5GB', 'pingt mich wenn um 8 Uhr'. "
"Welche Variablen verfuegbar sind und ihre Bedeutung steht im System-Prompt."
),
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "kurzer Name"},
"condition": {
"type": "string",
"description": (
"Boolescher Ausdruck mit den erlaubten Variablen, z.B. "
"'disk_free_gb < 5', 'hour_of_day == 8 and day_of_week == \"mon\"'. "
"Operatoren: < > <= >= == != and or not"
),
},
"message": {"type": "string", "description": "Was soll bei Erfuellung gesagt werden"},
"check_interval_sec": {
"type": "integer",
"description": "Wie oft Condition pruefen (Default 300 = alle 5min, min 30)",
},
"throttle_sec": {
"type": "integer",
"description": "Mindestabstand zwischen 2 Feuerungen (Default 3600 = max 1x/h)",
},
},
"required": ["name", "condition", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_cancel",
"description": "Loescht einen Trigger (Timer abbrechen oder Watcher entfernen).",
"parameters": {
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_list",
"description": "Zeigt alle Trigger (active + inaktiv). Selten noetig — Stefan sieht sie im Diagnostic.",
"parameters": {"type": "object", "properties": {}},
},
},
]
@@ -175,8 +261,14 @@ class Agent:
active_skills = [s for s in all_skills if s.get("active", True)]
tools = list(META_TOOLS) + [_skill_to_tool(s) for s in active_skills]
# Trigger-Liste + Variablen-Info fuer den System-Prompt
all_triggers = triggers_mod.list_triggers(active_only=False)
condition_vars = watcher_mod.describe_variables()
# 5. System-Prompt + Window-Messages
system_prompt = build_system_prompt(hot, cold, skills=all_skills)
system_prompt = build_system_prompt(hot, cold, skills=all_skills,
triggers=all_triggers,
condition_vars=condition_vars)
messages = [ProxyMessage(role="system", content=system_prompt)]
for t in self.conversation.window():
messages.append(ProxyMessage(role=t.role, content=t.content))
@@ -273,6 +365,54 @@ class Agent:
if err:
out += f"\nstderr:\n{err}"
return out
if name == "trigger_timer":
t = triggers_mod.create_timer(
name=arguments["name"],
fires_at_iso=arguments["fires_at"],
message=arguments["message"],
author="aria",
)
self._pending_events.append({
"type": "trigger_created",
"trigger": {"name": t["name"], "type": "timer",
"fires_at": t["fires_at"], "message": t["message"]},
})
return f"OK — Timer '{t['name']}' angelegt, feuert um {t['fires_at']}."
if name == "trigger_watcher":
t = triggers_mod.create_watcher(
name=arguments["name"],
condition=arguments["condition"],
message=arguments["message"],
check_interval_sec=int(arguments.get("check_interval_sec", 300)),
throttle_sec=int(arguments.get("throttle_sec", 3600)),
author="aria",
)
self._pending_events.append({
"type": "trigger_created",
"trigger": {"name": t["name"], "type": "watcher",
"condition": t["condition"], "message": t["message"]},
})
return f"OK — Watcher '{t['name']}' angelegt: feuert wenn '{t['condition']}'."
if name == "trigger_cancel":
try:
triggers_mod.delete(arguments["name"])
return f"OK — Trigger '{arguments['name']}' geloescht."
except ValueError as e:
return f"FEHLER: {e}"
if name == "trigger_list":
items = triggers_mod.list_triggers(active_only=False)
if not items:
return "(keine Trigger vorhanden)"
lines = []
for t in items:
state = "aktiv" if t.get("active", True) else "DEAKTIVIERT"
if t["type"] == "timer":
lines.append(f"- {t['name']} (timer, {state}): feuert {t.get('fires_at')}\"{t.get('message','')[:50]}\"")
elif t["type"] == "watcher":
lines.append(f"- {t['name']} (watcher, {state}): cond=\"{t.get('condition')}\", throttle={t.get('throttle_sec')}s")
else:
lines.append(f"- {t['name']} ({t['type']}, {state})")
return "\n".join(lines)
return f"Unbekanntes Tool: {name}"
except Exception as exc:
logger.exception("Tool '%s' fehlgeschlagen", name)
+169
View File
@@ -0,0 +1,169 @@
"""
Background-Loop fuer Triggers.
Laeuft alle TICK_SEC Sekunden in einem asyncio Task, geht ueber alle
active Triggers und entscheidet ob sie feuern muessen.
Feuern bedeutet:
1. Trigger-Manifest update (fire_count++, last_fired_at, ggf. deaktivieren)
2. Log-Eintrag schreiben
3. agent.chat() mit einem system-Praefix aufrufen (NICHT als 'user'!)
→ ARIA bekommt das wie eine Push-Nachricht und kann antworten
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
import triggers as triggers_mod
import watcher as watcher_mod
logger = logging.getLogger(__name__)
TICK_SEC = 30
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _parse_iso(s: str) -> Optional[datetime]:
if not s:
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _should_fire(trigger: dict, vars_: dict, now: datetime) -> bool:
if not trigger.get("active", True):
return False
t = trigger.get("type", "")
if t == "timer":
fires_at = _parse_iso(trigger.get("fires_at", ""))
if not fires_at:
return False
if fires_at.tzinfo is None:
fires_at = fires_at.replace(tzinfo=timezone.utc)
return now >= fires_at
if t == "watcher":
# Check-Interval respektieren (sonst pollen wir zu hektisch)
check_interval = int(trigger.get("check_interval_sec", 300))
last_checked = _parse_iso(trigger.get("last_checked_at", ""))
if last_checked:
if last_checked.tzinfo is None:
last_checked = last_checked.replace(tzinfo=timezone.utc)
if (now - last_checked).total_seconds() < check_interval:
return False
# Throttle: erst feuern wenn last_fired lange genug her ist
last_fired = _parse_iso(trigger.get("last_fired_at", ""))
throttle = int(trigger.get("throttle_sec", 3600))
if last_fired:
if last_fired.tzinfo is None:
last_fired = last_fired.replace(tzinfo=timezone.utc)
if (now - last_fired).total_seconds() < throttle:
return False
# Condition pruefen
cond = (trigger.get("condition") or "").strip()
if not cond:
return False
try:
return watcher_mod.evaluate(cond, vars_)
except Exception as e:
logger.warning("Trigger %s: Condition '%s' fehlerhaft: %s",
trigger.get("name"), cond, e)
return False
if t == "cron":
# TODO: später, wenn jemand Bock auf Cron-Parser hat
return False
return False
async def _fire(trigger: dict, agent_factory) -> None:
"""Ruft ARIA mit einer System-Praefix-Nachricht auf."""
name = trigger.get("name", "?")
message = trigger.get("message") or "(ohne Nachricht)"
ttype = trigger.get("type", "?")
# Manifest updaten
try:
triggers_mod.mark_fired(name)
except Exception as e:
logger.warning("mark_fired %s: %s", name, e)
# Log
triggers_mod.append_log(name, {"event": "fired", "type": ttype, "message": message})
# System-Nachricht an ARIA: nicht als User, sondern als Hinweis
prompt = (
f"[Trigger ausgelöst: '{name}', Typ: {ttype}] "
f"Geplante Nachricht: \"{message}\". "
f"Sage Stefan jetzt diese Information, in deinem Stil. "
f"Wenn der Trigger ein Watcher war (Bedingung wurde erfuellt), "
f"erwaehne kurz worum es geht. Antworte direkt, keine Rueckfrage."
)
try:
agent = agent_factory()
reply = agent.chat(prompt, source="trigger")
logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80])
triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]})
except Exception as e:
logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e)
triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]})
async def _tick(agent_factory) -> None:
"""Ein Pruefdurchlauf. Geht ueber alle Triggers, feuert was zu feuern ist."""
try:
all_triggers = triggers_mod.list_triggers(active_only=True)
except Exception as e:
logger.warning("triggers.list: %s", e)
return
if not all_triggers:
return
now = datetime.now(timezone.utc)
# Variablen einmal pro Tick sammeln (nicht pro Trigger — Disk-Stat ist teuer)
try:
vars_ = watcher_mod.collect_variables()
except Exception as e:
logger.warning("collect_variables: %s", e)
vars_ = {}
# Watcher: last_checked_at jetzt updaten (auch wenn nicht gefeuert wird,
# damit der Check-Interval respektiert wird)
for t in all_triggers:
if t.get("type") == "watcher":
try:
t["last_checked_at"] = _now_iso()
triggers_mod.write(t["name"], t)
except Exception:
pass
for trigger in all_triggers:
try:
if _should_fire(trigger, vars_, now):
# Feuern als eigener Task — wenn ARIA langsam antwortet,
# darf der naechste Tick nicht blockieren
asyncio.create_task(_fire(trigger, agent_factory))
except Exception as e:
logger.warning("Trigger-Check %s: %s", trigger.get("name"), e)
async def run_loop(agent_factory) -> None:
"""Endlosschleife — wird vom main lifespan gestartet + gestoppt."""
logger.info("Trigger-Loop gestartet (TICK_SEC=%d)", TICK_SEC)
while True:
try:
await _tick(agent_factory)
except Exception as e:
logger.exception("Tick-Fehler: %s", e)
await asyncio.sleep(TICK_SEC)
+120 -1
View File
@@ -20,6 +20,9 @@ import logging
import os
from typing import List, Optional
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import Response
from pydantic import BaseModel, Field
@@ -30,6 +33,9 @@ from proxy_client import ProxyClient
from agent import Agent
import skills as skills_mod
import metrics as metrics_mod
import triggers as triggers_mod
import watcher as watcher_mod
import background as background_mod
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("aria-brain")
@@ -37,7 +43,23 @@ logger = logging.getLogger("aria-brain")
QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
app = FastAPI(title="ARIA Brain", version="0.1.0")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Beim Brain-Start: Trigger-Background-Loop anwerfen. Beim Shutdown: stoppen."""
task = asyncio.create_task(background_mod.run_loop(agent))
logger.info("Lifespan: Trigger-Loop gestartet")
try:
yield
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.info("Lifespan: Trigger-Loop gestoppt")
app = FastAPI(title="ARIA Brain", version="0.1.0", lifespan=lifespan)
_embedder: Optional[Embedder] = None
_store: Optional[VectorStore] = None
@@ -414,6 +436,103 @@ def metrics_calls():
return metrics_mod.stats()
# ─── Triggers (passive Aufweck-Quellen) ─────────────────────────────
class TriggerTimerBody(BaseModel):
name: str
fires_at: str # ISO timestamp
message: str
author: str = "stefan"
class TriggerWatcherBody(BaseModel):
name: str
condition: str
message: str
check_interval_sec: int = 300
throttle_sec: int = 3600
author: str = "stefan"
class TriggerPatch(BaseModel):
active: bool | None = None
message: str | None = None
condition: str | None = None
throttle_sec: int | None = None
check_interval_sec: int | None = None
fires_at: str | None = None
@app.get("/triggers/list")
def triggers_list(active_only: bool = False):
return {"triggers": triggers_mod.list_triggers(active_only=active_only)}
@app.get("/triggers/conditions")
def triggers_conditions():
"""Verfuegbare Variablen fuer Watcher-Conditions (mit aktuellen Werten)."""
return {
"variables": watcher_mod.describe_variables(),
"current": watcher_mod.collect_variables(),
}
@app.get("/triggers/{name}")
def triggers_get(name: str):
t = triggers_mod.read(name)
if t is None:
raise HTTPException(404, f"Trigger '{name}' nicht gefunden")
return t
@app.get("/triggers/{name}/logs")
def triggers_get_logs(name: str, limit: int = 50):
return {"logs": triggers_mod.list_logs(name, limit=limit)}
@app.post("/triggers/timer")
def triggers_create_timer(body: TriggerTimerBody):
try:
return triggers_mod.create_timer(
name=body.name, fires_at_iso=body.fires_at,
message=body.message, author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/triggers/watcher")
def triggers_create_watcher(body: TriggerWatcherBody):
try:
return triggers_mod.create_watcher(
name=body.name, condition=body.condition,
message=body.message,
check_interval_sec=body.check_interval_sec,
throttle_sec=body.throttle_sec,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/triggers/{name}")
def triggers_patch(name: str, body: TriggerPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return triggers_mod.update(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/triggers/{name}")
def triggers_delete(name: str):
try:
triggers_mod.delete(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
# ─── Skills ─────────────────────────────────────────────────────────
class SkillCreate(BaseModel):
+38 -1
View File
@@ -115,16 +115,53 @@ def build_skills_section(skills: List[dict]) -> str:
return "\n".join(lines)
def build_triggers_section(triggers: List[dict], condition_vars: List[dict]) -> str:
"""Triggers (passive Aufweck-Quellen) + verfuegbare Condition-Variablen."""
lines = ["## Trigger (passive Aufweck-Quellen)"]
lines.append("")
lines.append("Trigger sind ANDERS als Skills: das System ruft DICH wenn ein Event passiert. "
"Du legst sie an wenn Stefan sagt 'erinner mich an X' oder 'sag bescheid wenn Y'.")
lines.append("")
if triggers:
lines.append("### Aktuelle Trigger")
for t in triggers:
active = t.get("active", True)
mark = "" if active else " [INAKTIV]"
if t["type"] == "timer":
lines.append(f"- **{t['name']}**{mark} (timer) feuert {t.get('fires_at')}: \"{t.get('message','')[:80]}\"")
elif t["type"] == "watcher":
lines.append(f"- **{t['name']}**{mark} (watcher) cond=`{t.get('condition')}`: \"{t.get('message','')[:80]}\"")
lines.append("")
lines.append("### Verfuegbare Condition-Variablen (fuer Watcher)")
for v in condition_vars:
lines.append(f"- `{v['name']}` ({v['type']}) — {v['desc']}")
lines.append("")
lines.append("Operatoren in Conditions: `<` `>` `<=` `>=` `==` `!=` `and` `or` `not`. "
"Beispiel: `disk_free_gb < 5 and hour_of_day >= 8`. "
"String-Werte in Quotes: `day_of_week == \"mon\"`.")
lines.append("")
lines.append("### Wann welcher Typ?")
lines.append("- **Timer** fuer einmalige Erinnerungen mit konkreter Zeit ('in 10min', 'um 14:30').")
lines.append("- **Watcher** fuer 'wenn X passiert' (Disk voll, bestimmte Tageszeit).")
lines.append("- ARIA legt Trigger NUR auf Stefan-Wunsch an, nicht eigenmaechtig.")
return "\n".join(lines)
def build_system_prompt(
pinned: List[MemoryPoint],
cold: List[MemoryPoint] | None = None,
skills: List[dict] | None = None,
triggers: List[dict] | None = None,
condition_vars: List[dict] | None = None,
) -> str:
"""Kompletter System-Prompt: Hot + Cold + Skills."""
"""Kompletter System-Prompt: Hot + Cold + Skills + Triggers."""
parts = [build_hot_memory_section(pinned)]
if skills:
parts.append("")
parts.append(build_skills_section(skills))
if condition_vars:
parts.append("")
parts.append(build_triggers_section(triggers or [], condition_vars))
if cold:
parts.append("")
parts.append(build_cold_memory_section(cold))
+229
View File
@@ -0,0 +1,229 @@
"""
Triggers — passive Aufweck-Quellen fuer ARIA.
Skills sind aktiv (ARIA ruft sie). Triggers sind passiv — das System ruft
ARIA wenn ein Event passiert. Drei Typen:
timer Einmalig zu einem festen Zeitpunkt
watcher Recurring: Condition pruefen, bei True → feuern (mit Throttle)
cron Cron-Expression (vorerst nicht implementiert, Platzhalter)
Layout:
/data/triggers/<name>.json Manifest pro Trigger
/data/triggers/logs/<name>.jsonl Append-only Log pro Feuerung
Polling-Kosten: Brain-internes Background-Polling (kein LLM-Call).
ARIA wird nur aufgeweckt wenn ein Trigger tatsaechlich feuert.
"""
from __future__ import annotations
import json
import logging
import os
import re
import shutil
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
TRIGGERS_DIR = Path(os.environ.get("TRIGGERS_DIR", "/data/triggers"))
LOGS_DIR = TRIGGERS_DIR / "logs"
NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$")
VALID_TYPES = {"timer", "watcher", "cron"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _safe_name(name: str) -> str:
if not isinstance(name, str) or not NAME_RE.match(name):
raise ValueError(f"Ungueltiger Trigger-Name: {name!r}")
return name
def _path(name: str) -> Path:
return TRIGGERS_DIR / f"{_safe_name(name)}.json"
def _ensure_dirs():
TRIGGERS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)
# ─── CRUD ───────────────────────────────────────────────────────────
def list_triggers(active_only: bool = False) -> list[dict]:
if not TRIGGERS_DIR.exists():
return []
out: list[dict] = []
for f in sorted(TRIGGERS_DIR.glob("*.json")):
try:
data = json.loads(f.read_text(encoding="utf-8"))
if active_only and not data.get("active", True):
continue
out.append(data)
except Exception as e:
logger.warning("Trigger lesen %s: %s", f, e)
return out
def read(name: str) -> Optional[dict]:
p = _path(name)
if not p.exists():
return None
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception as e:
logger.warning("Trigger %s lesen: %s", name, e)
return None
def write(name: str, data: dict) -> None:
_ensure_dirs()
data["updated_at"] = _now_iso()
p = _path(name)
tmp = p.with_suffix(".tmp")
tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
tmp.replace(p)
def delete(name: str) -> None:
p = _path(name)
if not p.exists():
raise ValueError(f"Trigger '{name}' nicht gefunden")
p.unlink()
# Logs auch wegraeumen
log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl"
if log_file.exists():
log_file.unlink()
def update(name: str, patch: dict) -> dict:
data = read(name)
if data is None:
raise ValueError(f"Trigger '{name}' nicht gefunden")
allowed = {"active", "message", "condition", "throttle_sec",
"check_interval_sec", "fires_at"}
for k, v in patch.items():
if k in allowed:
data[k] = v
write(name, data)
return data
# ─── Create-Helpers (typ-spezifisch) ────────────────────────────────
def create_timer(
name: str,
fires_at_iso: str,
message: str,
author: str = "aria",
) -> dict:
_safe_name(name)
if _path(name).exists():
raise ValueError(f"Trigger '{name}' existiert schon")
# ISO validieren
try:
datetime.fromisoformat(fires_at_iso.replace("Z", "+00:00"))
except Exception:
raise ValueError(f"fires_at_iso ungueltig: {fires_at_iso}")
data = {
"name": name,
"type": "timer",
"active": True,
"author": author,
"created_at": _now_iso(),
"fires_at": fires_at_iso,
"message": message,
"fire_count": 0,
"last_fired_at": None,
}
write(name, data)
logger.info("Trigger angelegt: %s (timer, fires_at=%s)", name, fires_at_iso)
return data
def create_watcher(
name: str,
condition: str,
message: str,
check_interval_sec: int = 300,
throttle_sec: int = 3600,
author: str = "aria",
) -> dict:
_safe_name(name)
if _path(name).exists():
raise ValueError(f"Trigger '{name}' existiert schon")
# Condition parsen-pruefen (wirft bei Syntax-Fehler)
from watcher import parse_condition
parse_condition(condition) # nur Validate
if check_interval_sec < 30:
check_interval_sec = 30 # nicht oefter als alle 30s pruefen
if throttle_sec < 0:
throttle_sec = 0
data = {
"name": name,
"type": "watcher",
"active": True,
"author": author,
"created_at": _now_iso(),
"condition": condition,
"check_interval_sec": int(check_interval_sec),
"throttle_sec": int(throttle_sec),
"message": message,
"fire_count": 0,
"last_fired_at": None,
"last_checked_at": None,
}
write(name, data)
logger.info("Trigger angelegt: %s (watcher, cond='%s')", name, condition)
return data
# ─── Feuern + Log ───────────────────────────────────────────────────
def mark_fired(name: str) -> dict:
data = read(name)
if data is None:
raise ValueError(f"Trigger '{name}' nicht gefunden")
data["fire_count"] = int(data.get("fire_count", 0)) + 1
data["last_fired_at"] = _now_iso()
# Timer: nach Feuern auto-deaktivieren (one-shot)
if data.get("type") == "timer":
data["active"] = False
write(name, data)
return data
def append_log(name: str, entry: dict) -> None:
_ensure_dirs()
log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl"
record = {"ts": _now_iso()}
record.update(entry)
try:
with log_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.warning("Trigger-Log append %s: %s", name, e)
def list_logs(name: str, limit: int = 50) -> list[dict]:
log_file = LOGS_DIR / f"{_safe_name(name)}.jsonl"
if not log_file.exists():
return []
try:
lines = log_file.read_text(encoding="utf-8").splitlines()
out: list[dict] = []
for line in lines[-limit:]:
try:
out.append(json.loads(line))
except Exception:
pass
return out
except Exception:
return []
+150
View File
@@ -0,0 +1,150 @@
"""
Built-in Condition-Variablen + sicherer Mini-Parser fuer Watcher-Triggers.
Erlaubte Variablen kommen aus diesem Modul. Condition-Ausdruck ist ein
sicheres Subset von Python (kein eval, kein exec): nur Vergleiche und
Boolean-Operatoren, nur die hier deklarierten Variablen, nur Zahlen +
String-Literale als rechte Seite.
Beispiele:
disk_free_gb < 5
hour_of_day == 8 and day_of_week == "mon"
rvs_connected == False
(disk_free_pct < 10 and uptime_sec > 3600)
"""
from __future__ import annotations
import ast
import logging
import os
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ─── Variablen-Quellen ──────────────────────────────────────────────
def _disk_stats() -> tuple[float, float]:
"""Returns (free_gb, free_pct). Schaut /shared (geteiltes Volume) — sonst /."""
target = "/shared" if os.path.exists("/shared") else "/"
try:
st = shutil.disk_usage(target)
free_gb = st.free / (1024 ** 3)
free_pct = 100.0 * st.free / st.total if st.total else 0.0
return free_gb, free_pct
except Exception as e:
logger.warning("disk_usage: %s", e)
return 0.0, 0.0
def _uptime_sec() -> int:
try:
with open("/proc/uptime", "r") as f:
return int(float(f.read().split()[0]))
except Exception:
return 0
def _rvs_connected() -> bool:
"""Liest /shared/config/runtime.json oder ein Bridge-State-File.
Aktuell: wir koennen das nicht zuverlaessig aus dem Brain-Container
bestimmen — gibt False als sicheren Default zurueck.
Spaeter: Bridge schreibt einen Heartbeat-File den wir hier lesen."""
return False
_DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
def collect_variables() -> dict[str, Any]:
"""Liefert aktuellen Snapshot aller Built-in-Variablen."""
free_gb, free_pct = _disk_stats()
now = datetime.now()
# Memory-Count aus der Vector-DB (importiert lazy um zirkulaere Imports
# zu vermeiden — beim Modul-Load gibt's noch keinen Store)
memory_count = 0
try:
from main import store # type: ignore
s = store()
memory_count = s.count()
except Exception:
pass
return {
"disk_free_gb": round(free_gb, 2),
"disk_free_pct": round(free_pct, 1),
"uptime_sec": _uptime_sec(),
"hour_of_day": now.hour,
"day_of_week": _DAYS[now.weekday()],
"rvs_connected": _rvs_connected(),
"memory_count": memory_count,
}
def describe_variables() -> list[dict]:
"""Liste der verfuegbaren Variablen + Beschreibung — fuer System-Prompt + UI."""
return [
{"name": "disk_free_gb", "type": "number", "desc": "freier Plattenplatz in GB (auf /shared)"},
{"name": "disk_free_pct", "type": "number", "desc": "freier Plattenplatz in Prozent"},
{"name": "uptime_sec", "type": "number", "desc": "Sekunden seit Brain-Start"},
{"name": "hour_of_day", "type": "number", "desc": "0..23, lokale Zeit"},
{"name": "day_of_week", "type": "string", "desc": "mon|tue|wed|thu|fri|sat|sun"},
{"name": "rvs_connected", "type": "bool", "desc": "True wenn RVS-Verbindung steht"},
{"name": "memory_count", "type": "number", "desc": "Anzahl Memories in der Vector-DB"},
]
# ─── Sicherer Condition-Parser ──────────────────────────────────────
_ALLOWED_NODES = (
ast.Expression, ast.BoolOp, ast.UnaryOp, ast.Compare,
ast.Name, ast.Constant, ast.Load,
ast.And, ast.Or, ast.Not,
ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
)
def parse_condition(expr: str) -> ast.Expression:
"""Parst einen Condition-Ausdruck und validiert ihn gegen das Safe-Subset.
Wirft ValueError bei verbotenen Konstrukten."""
expr = (expr or "").strip()
if not expr:
raise ValueError("Leere Condition")
if len(expr) > 500:
raise ValueError("Condition zu lang (>500 Zeichen)")
try:
tree = ast.parse(expr, mode="eval")
except SyntaxError as e:
raise ValueError(f"Condition Syntax-Fehler: {e}")
# Whitelist-Walk
allowed_names = {v["name"] for v in describe_variables()}
for node in ast.walk(tree):
if not isinstance(node, _ALLOWED_NODES):
raise ValueError(f"Verbotener Ausdruck: {type(node).__name__}")
if isinstance(node, ast.Name):
if node.id not in allowed_names and node.id not in ("True", "False"):
raise ValueError(f"Unbekannte Variable: {node.id}")
if isinstance(node, ast.Constant):
if not isinstance(node.value, (int, float, str, bool)) and node.value is not None:
raise ValueError(f"Verbotener Konstant-Typ: {type(node.value).__name__}")
return tree
def evaluate(expr: str, variables: dict[str, Any] | None = None) -> bool:
"""Evaluiert die Condition gegen die aktuellen Variablen.
Returns bool. Bei Fehler in Variablen → False (defensiv)."""
tree = parse_condition(expr)
vars_ = variables if variables is not None else collect_variables()
code = compile(tree, "<condition>", "eval")
# Globals leer, locals nur die erlaubten Variablen → kein Builtin-Zugriff
try:
result = eval(code, {"__builtins__": {}}, vars_)
except Exception as e:
logger.warning("Condition '%s' eval-Fehler: %s", expr, e)
return False
return bool(result)