Files
ARIA-AGENT/aria-brain/watcher.py
T
duffyduck 435b77e1df feat(trigger): entered_near + left_near — drei Modi fuer near()-Watcher
Stefan: bei aktuellen near()-Watcher gibt's nur "solange drin". Reale
Szenarien wollen aber differenzieren:
- VORWARNUNG vor Ziel (Blitzer-Warner 2 km vorher) → entered_near mit grossem r
- ANKUNFT exakt am Ziel → entered_near mit kleinem r
- VERLASSEN (Parkplatz, hast du was vergessen) → left_near
- KONTINUIERLICH-DRIN (bin noch in der Naehe?) → near (Default, throttled)

Zwei neue Funktionen in der Condition-Whitelist:

- entered_near(lat, lon, r): True NUR im Moment des Uebergangs
  draussen → innen. Fires einmal pro Eintritt.
- left_near(lat, lon, r): True NUR im Moment des Uebergangs innen →
  draussen. Fires einmal pro Austritt.

State-Tracking:
- pro Trigger pro near-Aufruf wird der letzte Auswertungs-Wert (true/
  false) im Watcher-Manifest gespeichert (Field "near_states", Key
  "lat.6,lon.6,radius"). Background-Loop liest's vor dem Eval, gibt's
  per collect_variables(prev_near_states=...) in die Closure, schreibt
  nach dem Eval die neuen Werte zurueck — UNABHAENGIG ob gefeuert
  wurde, sonst greift die Uebergangs-Erkennung nicht.

Background _tick:
- Aufteilung in Watcher-Pass (mit prev_near_states pro Trigger) und
  Timer-Pass (ohne State, gemeinsame vars). Bisher war collect_variables
  einmal pro Tick — jetzt einmal pro Watcher. Disk-Stats sind teuer
  aber unter 30 Watchern unkritisch; bei mehr koennen wir cachen.

ARIA-Tool-Description erweitert (trigger_watcher): erklaert die drei
Modi mit Use-Cases und empfohlenen Throttle-Werten (kurz fuer entered/
left, lang fuer near).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 18:29:39 +02:00

385 lines
15 KiB
Python

"""
Built-in Condition-Variablen + sicherer Mini-Parser fuer Watcher-Triggers.
Erlaubte Variablen + die EINE Funktion `near(lat, lon, radius_m)` kommen
aus diesem Modul. Condition-Ausdruck ist ein sicheres Subset von Python
(kein eval, kein exec): nur Vergleiche, Boolean-Operatoren, Whitelisted
Funktionen, Variablen aus describe_variables(), Konstanten (Zahl/Bool/Str).
Beispiele:
disk_free_gb < 5
hour_of_day == 8 and day_of_week == "mon"
is_weekend and minute_of_hour == 0
near(53.123, 7.456, 500)
current_lat and location_age_sec < 120
"""
from __future__ import annotations
import ast
import json
import logging
import math
import os
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
STATE_DIR = Path("/shared/state")
# ─── State-Helfer (gemeinsam mit Bridge: /shared/state/*.json) ──────
def _read_state(name: str) -> dict | None:
f = STATE_DIR / f"{name}.json"
if not f.exists():
return None
try:
return json.loads(f.read_text(encoding="utf-8"))
except Exception:
return None
# ─── Variablen-Quellen ──────────────────────────────────────────────
def _disk_stats() -> tuple[float, float]:
"""Returns (free_gb, free_pct). Schaut /shared (geteiltes Volume) — sonst /."""
target = "/shared" if os.path.exists("/shared") else "/"
try:
st = shutil.disk_usage(target)
free_gb = st.free / (1024 ** 3)
free_pct = 100.0 * st.free / st.total if st.total else 0.0
return free_gb, free_pct
except Exception as e:
logger.warning("disk_usage: %s", e)
return 0.0, 0.0
def _uptime_sec() -> int:
try:
with open("/proc/uptime", "r") as f:
return int(float(f.read().split()[0]))
except Exception:
return 0
def _ram_free_mb() -> int:
"""Container-RAM: MemAvailable aus /proc/meminfo (kB → MB)."""
try:
with open("/proc/meminfo", "r") as f:
for line in f:
if line.startswith("MemAvailable:"):
return int(line.split()[1]) // 1024
except Exception:
pass
return 0
def _cpu_load_1min() -> float:
"""load avg ueber 1 Minute (linux). Vorsicht: das ist die HOST-load,
nicht container-spezifisch."""
try:
with open("/proc/loadavg", "r") as f:
return float(f.read().split()[0])
except Exception:
return 0.0
_DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
# Maximales GPS-Alter fuer near()-Auswertung. Wenn die App laenger nicht
# gepushed hat (z.B. Tracking aus, Mobilfunk weg, App geschlossen), gilt
# die Position als "unbekannt" und near() liefert False — verhindert
# Phantom-Fires basierend auf einer wochen-alten Position.
NEAR_MAX_AGE_SEC = 5 * 60
def _gps_state() -> dict[str, Any]:
"""Letzte bekannte Position aus /shared/state/location.json.
Returns dict mit current_lat, current_lon (oder None), location_age_sec."""
data = _read_state("location") or {}
now = int(time.time())
age = -1
lat = data.get("lat")
lon = data.get("lon")
ts = data.get("ts_unix")
if isinstance(ts, (int, float)):
age = int(now - ts)
return {
"current_lat": float(lat) if isinstance(lat, (int, float)) else None,
"current_lon": float(lon) if isinstance(lon, (int, float)) else None,
"location_age_sec": age,
}
def _user_activity_age() -> int:
"""Sekunden seit letzter User-Aktion (Chat oder Voice). -1 wenn nie."""
data = _read_state("activity") or {}
ts = data.get("last_user_ts")
if not isinstance(ts, (int, float)):
return -1
return int(time.time() - ts)
def _near_key(lat: float, lon: float, radius_m: float) -> str:
"""Stabiler Schluessel pro near()-Aufruf — fuer entered_near/left_near
State-Tracking pro Trigger pro Aufrufstelle."""
return f"{float(lat):.6f},{float(lon):.6f},{int(float(radius_m))}"
def collect_variables(prev_near_states: Optional[Dict[str, bool]] = None) -> Dict[str, Any]:
"""Liefert aktuellen Snapshot aller Built-in-Variablen + near()-Helper.
prev_near_states: pro Trigger gespeicherter Zustand vom letzten Eval
(für entered_near/left_near). Wird vom background-Loop reingegeben.
Nach dem Eval kann man `vars_['_new_near_states']` auslesen, um den
Update-Snapshot zurueck ins Trigger-Manifest zu schreiben."""
if prev_near_states is None:
prev_near_states = {}
new_near_states: Dict[str, bool] = {}
free_gb, free_pct = _disk_stats()
now = datetime.now()
gps = _gps_state()
# Memory-Counts aus der Vector-DB (lazy import, sonst zirkulaer)
memory_count = 0
pinned_count = 0
try:
from main import store # type: ignore
s = store()
memory_count = s.count()
try:
pinned_count = len(s.list_pinned())
except Exception:
pass
except Exception:
pass
vars_: dict[str, Any] = {
# Disk + System
"disk_free_gb": round(free_gb, 2),
"disk_free_pct": round(free_pct, 1),
"ram_free_mb": _ram_free_mb(),
"cpu_load_1min": round(_cpu_load_1min(), 2),
"uptime_sec": _uptime_sec(),
# Zeit
"hour_of_day": now.hour,
"minute_of_hour": now.minute,
"day_of_month": now.day,
"month": now.month,
"year": now.year,
"day_of_week": _DAYS[now.weekday()],
"is_weekend": now.weekday() >= 5,
"unix_timestamp": int(time.time()),
# GPS
"current_lat": gps["current_lat"],
"current_lon": gps["current_lon"],
"location_age_sec": gps["location_age_sec"],
# Activity
"last_user_message_ago_sec": _user_activity_age(),
# Memory
"memory_count": memory_count,
"pinned_count": pinned_count,
# rvs_connected: kann Brain noch nicht zuverlaessig feststellen
# (Bridge muesste eigenen Heartbeat-State schreiben — kommt spaeter)
"rvs_connected": False,
}
# Funktion-Helper — wird vom Parser als ast.Call mit Name "near" erkannt.
# Closure ueber die GPS-Werte, damit eval keine extra Variablen braucht.
def _compute_near(lat: float, lon: float, radius_m: float) -> bool:
"""Haversine-Distanz: True wenn aktuelle Position < radius_m vom Punkt.
Plus Age-Schutz: GPS-Daten aelter als NEAR_MAX_AGE_SEC werden als
veraltet betrachtet → False."""
cur_lat = vars_.get("current_lat")
cur_lon = vars_.get("current_lon")
if cur_lat is None or cur_lon is None:
return False
age = vars_.get("location_age_sec")
if isinstance(age, (int, float)) and age >= 0 and age > NEAR_MAX_AGE_SEC:
return False
try:
R = 6371000.0
phi1 = math.radians(float(cur_lat))
phi2 = math.radians(float(lat))
dphi = math.radians(float(lat) - float(cur_lat))
dlam = math.radians(float(lon) - float(cur_lon))
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
distance = 2 * R * math.asin(math.sqrt(a))
return distance < float(radius_m)
except Exception:
return False
def _near(lat: float, lon: float, radius_m: float) -> bool:
"""True solange im Radius drin. Plus State-Tracking fuer
entered_near/left_near — wir merken uns das letzte Ergebnis
damit Uebergaenge erkannt werden koennen."""
current = _compute_near(lat, lon, radius_m)
new_near_states[_near_key(lat, lon, radius_m)] = current
return current
def _entered_near(lat: float, lon: float, radius_m: float) -> bool:
"""True NUR beim Uebergang draussen → innen. Use-Case: einmal
feuern wenn der User in den Radius reinfaehrt (Blitzer-Warner,
Ankunft-Erinnerung). Bei groesserem Radius = Vorwarnung."""
current = _compute_near(lat, lon, radius_m)
key = _near_key(lat, lon, radius_m)
new_near_states[key] = current
prev = bool(prev_near_states.get(key, False))
return current and not prev
def _left_near(lat: float, lon: float, radius_m: float) -> bool:
"""True NUR beim Uebergang innen → draussen. Use-Case: 'Hast
du am Parkplatz X was vergessen?' beim Verlassen."""
current = _compute_near(lat, lon, radius_m)
key = _near_key(lat, lon, radius_m)
new_near_states[key] = current
prev = bool(prev_near_states.get(key, False))
return prev and not current
vars_["near"] = _near
vars_["entered_near"] = _entered_near
vars_["left_near"] = _left_near
# Update-Snapshot fuer den Caller (background-Loop schreibt das pro
# Trigger zurueck damit beim naechsten Tick prev_near_states stimmt)
vars_["_new_near_states"] = new_near_states
return vars_
def describe_variables() -> list[dict]:
"""Beschreibung — fuer System-Prompt + UI."""
return [
# Disk / System
{"name": "disk_free_gb", "type": "number", "desc": "freier Plattenplatz in GB (auf /shared)"},
{"name": "disk_free_pct", "type": "number", "desc": "freier Plattenplatz in Prozent"},
{"name": "ram_free_mb", "type": "number", "desc": "freier RAM im Brain-Container (MB)"},
{"name": "cpu_load_1min", "type": "number", "desc": "Load-Avg 1min (Host)"},
{"name": "uptime_sec", "type": "number", "desc": "Sekunden seit Brain-Start"},
# Zeit
{"name": "hour_of_day", "type": "number", "desc": "0..23, lokale Zeit"},
{"name": "minute_of_hour", "type": "number", "desc": "0..59"},
{"name": "day_of_month", "type": "number", "desc": "1..31"},
{"name": "month", "type": "number", "desc": "1..12"},
{"name": "year", "type": "number", "desc": "z.B. 2026"},
{"name": "day_of_week", "type": "string", "desc": "mon|tue|wed|thu|fri|sat|sun"},
{"name": "is_weekend", "type": "bool", "desc": "True wenn Samstag oder Sonntag"},
{"name": "unix_timestamp", "type": "number", "desc": "Sekunden seit Epoche (UTC)"},
# GPS
{"name": "current_lat", "type": "number", "desc": "letzte bekannte Breitengrad (oder None)"},
{"name": "current_lon", "type": "number", "desc": "letzte bekannte Laengengrad (oder None)"},
{"name": "location_age_sec", "type": "number", "desc": "Sekunden seit letzter Position (-1 = nie)"},
# Activity
{"name": "last_user_message_ago_sec", "type": "number",
"desc": "Sekunden seit letztem User-Input (-1 = nie)"},
# Memory
{"name": "memory_count", "type": "number", "desc": "Anzahl Memories total"},
{"name": "pinned_count", "type": "number", "desc": "Anzahl pinned (Hot Memory)"},
{"name": "rvs_connected", "type": "bool", "desc": "RVS-Verbindung (z.Zt. immer False)"},
]
def describe_functions() -> list[dict]:
"""Whitelisted Funktionen fuer Conditions."""
return [
{
"name": "near",
"signature": "near(lat, lon, radius_m)",
"desc": "True SOLANGE die aktuelle GPS-Position innerhalb von radius_m "
"Metern vom Punkt (lat, lon) liegt. Feuert wiederholt (mit throttle). "
"Use-Case: 'bin noch in der Naehe von X?'. "
"Haversine. Bei unbekannter oder > 5min alter Position: False.",
},
{
"name": "entered_near",
"signature": "entered_near(lat, lon, radius_m)",
"desc": "True NUR im Moment des Eintritts in den Radius (Uebergang "
"draussen → innen). Use-Case: einmaliger Fire bei Ankunft / "
"Blitzer-Warnung. Mit grossem Radius (z.B. 2000) wird das zur "
"Vorwarnung bevor man am Punkt ist.",
},
{
"name": "left_near",
"signature": "left_near(lat, lon, radius_m)",
"desc": "True NUR im Moment des Verlassens des Radius (Uebergang "
"innen → draussen). Use-Case: 'Hast du am Parkplatz X was "
"vergessen?' beim Wegfahren.",
},
]
_ALLOWED_FUNCTIONS = {f["name"] for f in describe_functions()}
# ─── Sicherer Condition-Parser ──────────────────────────────────────
_ALLOWED_NODES = (
ast.Expression, ast.BoolOp, ast.UnaryOp, ast.Compare,
ast.Name, ast.Constant, ast.Load,
ast.And, ast.Or, ast.Not,
ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
ast.Call,
)
def parse_condition(expr: str) -> ast.Expression:
"""Parst einen Condition-Ausdruck und validiert ihn gegen das Safe-Subset.
Wirft ValueError bei verbotenen Konstrukten."""
expr = (expr or "").strip()
if not expr:
raise ValueError("Leere Condition")
if len(expr) > 500:
raise ValueError("Condition zu lang (>500 Zeichen)")
try:
tree = ast.parse(expr, mode="eval")
except SyntaxError as e:
raise ValueError(f"Condition Syntax-Fehler: {e}")
allowed_names = {v["name"] for v in describe_variables()}
for node in ast.walk(tree):
if not isinstance(node, _ALLOWED_NODES):
raise ValueError(f"Verbotener Ausdruck: {type(node).__name__}")
if isinstance(node, ast.Call):
# Nur direkter Funktionsname, kein attribute-access (foo.bar())
if not isinstance(node.func, ast.Name):
raise ValueError("Funktionsaufruf nur ueber direkten Namen erlaubt")
if node.func.id not in _ALLOWED_FUNCTIONS:
raise ValueError(f"Verbotene Funktion: {node.func.id}")
# Args muessen Constants oder einzelne Names sein
for a in node.args:
if not isinstance(a, (ast.Constant, ast.Name, ast.UnaryOp)):
raise ValueError(f"Argument-Typ in {node.func.id}() nicht erlaubt")
if node.keywords:
raise ValueError("Keyword-Argumente in Funktionen nicht erlaubt")
if isinstance(node, ast.Name):
if (node.id not in allowed_names
and node.id not in _ALLOWED_FUNCTIONS
and node.id not in ("True", "False")):
raise ValueError(f"Unbekannte Variable: {node.id}")
if isinstance(node, ast.Constant):
if not isinstance(node.value, (int, float, str, bool)) and node.value is not None:
raise ValueError(f"Verbotener Konstant-Typ: {type(node.value).__name__}")
return tree
def evaluate(expr: str, variables: dict[str, Any] | None = None) -> bool:
"""Evaluiert die Condition gegen die aktuellen Variablen.
Returns bool. Bei Fehler in Variablen → False (defensiv)."""
tree = parse_condition(expr)
vars_ = variables if variables is not None else collect_variables()
code = compile(tree, "<condition>", "eval")
# Globals leer, locals enthalten Variablen + near()-Funktion → kein Builtin-Zugriff
try:
result = eval(code, {"__builtins__": {}}, vars_)
except Exception as e:
logger.warning("Condition '%s' eval-Fehler: %s", expr, e)
return False
return bool(result)