""" Built-in Condition-Variablen + sicherer Mini-Parser fuer Watcher-Triggers. Erlaubte Variablen + die EINE Funktion `near(lat, lon, radius_m)` kommen aus diesem Modul. Condition-Ausdruck ist ein sicheres Subset von Python (kein eval, kein exec): nur Vergleiche, Boolean-Operatoren, Whitelisted Funktionen, Variablen aus describe_variables(), Konstanten (Zahl/Bool/Str). Beispiele: disk_free_gb < 5 hour_of_day == 8 and day_of_week == "mon" is_weekend and minute_of_hour == 0 near(53.123, 7.456, 500) current_lat and location_age_sec < 120 """ from __future__ import annotations import ast import json import logging import math import os import shutil import time from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional logger = logging.getLogger(__name__) STATE_DIR = Path("/shared/state") # ─── State-Helfer (gemeinsam mit Bridge: /shared/state/*.json) ────── def _read_state(name: str) -> dict | None: f = STATE_DIR / f"{name}.json" if not f.exists(): return None try: return json.loads(f.read_text(encoding="utf-8")) except Exception: return None # ─── Variablen-Quellen ────────────────────────────────────────────── def _disk_stats() -> tuple[float, float]: """Returns (free_gb, free_pct). Schaut /shared (geteiltes Volume) — sonst /.""" target = "/shared" if os.path.exists("/shared") else "/" try: st = shutil.disk_usage(target) free_gb = st.free / (1024 ** 3) free_pct = 100.0 * st.free / st.total if st.total else 0.0 return free_gb, free_pct except Exception as e: logger.warning("disk_usage: %s", e) return 0.0, 0.0 def _uptime_sec() -> int: try: with open("/proc/uptime", "r") as f: return int(float(f.read().split()[0])) except Exception: return 0 def _ram_free_mb() -> int: """Container-RAM: MemAvailable aus /proc/meminfo (kB → MB).""" try: with open("/proc/meminfo", "r") as f: for line in f: if line.startswith("MemAvailable:"): return int(line.split()[1]) // 1024 except Exception: pass return 0 def _cpu_load_1min() -> float: """load avg ueber 1 Minute (linux). Vorsicht: das ist die HOST-load, nicht container-spezifisch.""" try: with open("/proc/loadavg", "r") as f: return float(f.read().split()[0]) except Exception: return 0.0 _DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] # Maximales GPS-Alter fuer near()-Auswertung. Wenn die App laenger nicht # gepushed hat (z.B. Tracking aus, Mobilfunk weg, App geschlossen), gilt # die Position als "unbekannt" und near() liefert False — verhindert # Phantom-Fires basierend auf einer wochen-alten Position. NEAR_MAX_AGE_SEC = 5 * 60 def _gps_state() -> dict[str, Any]: """Letzte bekannte Position aus /shared/state/location.json. Returns dict mit current_lat, current_lon (oder None), location_age_sec.""" data = _read_state("location") or {} now = int(time.time()) age = -1 lat = data.get("lat") lon = data.get("lon") ts = data.get("ts_unix") if isinstance(ts, (int, float)): age = int(now - ts) return { "current_lat": float(lat) if isinstance(lat, (int, float)) else None, "current_lon": float(lon) if isinstance(lon, (int, float)) else None, "location_age_sec": age, } def _user_activity_age() -> int: """Sekunden seit letzter User-Aktion (Chat oder Voice). -1 wenn nie.""" data = _read_state("activity") or {} ts = data.get("last_user_ts") if not isinstance(ts, (int, float)): return -1 return int(time.time() - ts) def _near_key(lat: float, lon: float, radius_m: float) -> str: """Stabiler Schluessel pro near()-Aufruf — fuer entered_near/left_near State-Tracking pro Trigger pro Aufrufstelle.""" return f"{float(lat):.6f},{float(lon):.6f},{int(float(radius_m))}" def collect_variables(prev_near_states: Optional[Dict[str, bool]] = None) -> Dict[str, Any]: """Liefert aktuellen Snapshot aller Built-in-Variablen + near()-Helper. prev_near_states: pro Trigger gespeicherter Zustand vom letzten Eval (für entered_near/left_near). Wird vom background-Loop reingegeben. Nach dem Eval kann man `vars_['_new_near_states']` auslesen, um den Update-Snapshot zurueck ins Trigger-Manifest zu schreiben.""" if prev_near_states is None: prev_near_states = {} new_near_states: Dict[str, bool] = {} free_gb, free_pct = _disk_stats() now = datetime.now() gps = _gps_state() # Memory-Counts aus der Vector-DB (lazy import, sonst zirkulaer) memory_count = 0 pinned_count = 0 try: from main import store # type: ignore s = store() memory_count = s.count() try: pinned_count = len(s.list_pinned()) except Exception: pass except Exception: pass vars_: dict[str, Any] = { # Disk + System "disk_free_gb": round(free_gb, 2), "disk_free_pct": round(free_pct, 1), "ram_free_mb": _ram_free_mb(), "cpu_load_1min": round(_cpu_load_1min(), 2), "uptime_sec": _uptime_sec(), # Zeit "hour_of_day": now.hour, "minute_of_hour": now.minute, "day_of_month": now.day, "month": now.month, "year": now.year, "day_of_week": _DAYS[now.weekday()], "is_weekend": now.weekday() >= 5, "unix_timestamp": int(time.time()), # GPS "current_lat": gps["current_lat"], "current_lon": gps["current_lon"], "location_age_sec": gps["location_age_sec"], # Activity "last_user_message_ago_sec": _user_activity_age(), # Memory "memory_count": memory_count, "pinned_count": pinned_count, # rvs_connected: kann Brain noch nicht zuverlaessig feststellen # (Bridge muesste eigenen Heartbeat-State schreiben — kommt spaeter) "rvs_connected": False, } # Funktion-Helper — wird vom Parser als ast.Call mit Name "near" erkannt. # Closure ueber die GPS-Werte, damit eval keine extra Variablen braucht. def _compute_near(lat: float, lon: float, radius_m: float) -> bool: """Haversine-Distanz: True wenn aktuelle Position < radius_m vom Punkt. Plus Age-Schutz: GPS-Daten aelter als NEAR_MAX_AGE_SEC werden als veraltet betrachtet → False.""" cur_lat = vars_.get("current_lat") cur_lon = vars_.get("current_lon") if cur_lat is None or cur_lon is None: return False age = vars_.get("location_age_sec") if isinstance(age, (int, float)) and age >= 0 and age > NEAR_MAX_AGE_SEC: return False try: R = 6371000.0 phi1 = math.radians(float(cur_lat)) phi2 = math.radians(float(lat)) dphi = math.radians(float(lat) - float(cur_lat)) dlam = math.radians(float(lon) - float(cur_lon)) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 distance = 2 * R * math.asin(math.sqrt(a)) return distance < float(radius_m) except Exception: return False def _near(lat: float, lon: float, radius_m: float) -> bool: """True solange im Radius drin. Plus State-Tracking fuer entered_near/left_near — wir merken uns das letzte Ergebnis damit Uebergaenge erkannt werden koennen.""" current = _compute_near(lat, lon, radius_m) new_near_states[_near_key(lat, lon, radius_m)] = current return current def _entered_near(lat: float, lon: float, radius_m: float) -> bool: """True NUR beim Uebergang draussen → innen. Use-Case: einmal feuern wenn der User in den Radius reinfaehrt (Blitzer-Warner, Ankunft-Erinnerung). Bei groesserem Radius = Vorwarnung.""" current = _compute_near(lat, lon, radius_m) key = _near_key(lat, lon, radius_m) new_near_states[key] = current prev = bool(prev_near_states.get(key, False)) return current and not prev def _left_near(lat: float, lon: float, radius_m: float) -> bool: """True NUR beim Uebergang innen → draussen. Use-Case: 'Hast du am Parkplatz X was vergessen?' beim Verlassen.""" current = _compute_near(lat, lon, radius_m) key = _near_key(lat, lon, radius_m) new_near_states[key] = current prev = bool(prev_near_states.get(key, False)) return prev and not current vars_["near"] = _near vars_["entered_near"] = _entered_near vars_["left_near"] = _left_near # Update-Snapshot fuer den Caller (background-Loop schreibt das pro # Trigger zurueck damit beim naechsten Tick prev_near_states stimmt) vars_["_new_near_states"] = new_near_states return vars_ def describe_variables() -> list[dict]: """Beschreibung — fuer System-Prompt + UI.""" return [ # Disk / System {"name": "disk_free_gb", "type": "number", "desc": "freier Plattenplatz in GB (auf /shared)"}, {"name": "disk_free_pct", "type": "number", "desc": "freier Plattenplatz in Prozent"}, {"name": "ram_free_mb", "type": "number", "desc": "freier RAM im Brain-Container (MB)"}, {"name": "cpu_load_1min", "type": "number", "desc": "Load-Avg 1min (Host)"}, {"name": "uptime_sec", "type": "number", "desc": "Sekunden seit Brain-Start"}, # Zeit {"name": "hour_of_day", "type": "number", "desc": "0..23, lokale Zeit"}, {"name": "minute_of_hour", "type": "number", "desc": "0..59"}, {"name": "day_of_month", "type": "number", "desc": "1..31"}, {"name": "month", "type": "number", "desc": "1..12"}, {"name": "year", "type": "number", "desc": "z.B. 2026"}, {"name": "day_of_week", "type": "string", "desc": "mon|tue|wed|thu|fri|sat|sun"}, {"name": "is_weekend", "type": "bool", "desc": "True wenn Samstag oder Sonntag"}, {"name": "unix_timestamp", "type": "number", "desc": "Sekunden seit Epoche (UTC)"}, # GPS {"name": "current_lat", "type": "number", "desc": "letzte bekannte Breitengrad (oder None)"}, {"name": "current_lon", "type": "number", "desc": "letzte bekannte Laengengrad (oder None)"}, {"name": "location_age_sec", "type": "number", "desc": "Sekunden seit letzter Position (-1 = nie)"}, # Activity {"name": "last_user_message_ago_sec", "type": "number", "desc": "Sekunden seit letztem User-Input (-1 = nie)"}, # Memory {"name": "memory_count", "type": "number", "desc": "Anzahl Memories total"}, {"name": "pinned_count", "type": "number", "desc": "Anzahl pinned (Hot Memory)"}, {"name": "rvs_connected", "type": "bool", "desc": "RVS-Verbindung (z.Zt. immer False)"}, ] def describe_functions() -> list[dict]: """Whitelisted Funktionen fuer Conditions.""" return [ { "name": "near", "signature": "near(lat, lon, radius_m)", "desc": "True SOLANGE die aktuelle GPS-Position innerhalb von radius_m " "Metern vom Punkt (lat, lon) liegt. Feuert wiederholt (mit throttle). " "Use-Case: 'bin noch in der Naehe von X?'. " "Haversine. Bei unbekannter oder > 5min alter Position: False.", }, { "name": "entered_near", "signature": "entered_near(lat, lon, radius_m)", "desc": "True NUR im Moment des Eintritts in den Radius (Uebergang " "draussen → innen). Use-Case: einmaliger Fire bei Ankunft / " "Blitzer-Warnung. Mit grossem Radius (z.B. 2000) wird das zur " "Vorwarnung bevor man am Punkt ist.", }, { "name": "left_near", "signature": "left_near(lat, lon, radius_m)", "desc": "True NUR im Moment des Verlassens des Radius (Uebergang " "innen → draussen). Use-Case: 'Hast du am Parkplatz X was " "vergessen?' beim Wegfahren.", }, ] _ALLOWED_FUNCTIONS = {f["name"] for f in describe_functions()} # ─── Sicherer Condition-Parser ────────────────────────────────────── _ALLOWED_NODES = ( ast.Expression, ast.BoolOp, ast.UnaryOp, ast.Compare, ast.Name, ast.Constant, ast.Load, ast.And, ast.Or, ast.Not, ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.Call, ) def parse_condition(expr: str) -> ast.Expression: """Parst einen Condition-Ausdruck und validiert ihn gegen das Safe-Subset. Wirft ValueError bei verbotenen Konstrukten.""" expr = (expr or "").strip() if not expr: raise ValueError("Leere Condition") if len(expr) > 500: raise ValueError("Condition zu lang (>500 Zeichen)") try: tree = ast.parse(expr, mode="eval") except SyntaxError as e: raise ValueError(f"Condition Syntax-Fehler: {e}") allowed_names = {v["name"] for v in describe_variables()} for node in ast.walk(tree): if not isinstance(node, _ALLOWED_NODES): raise ValueError(f"Verbotener Ausdruck: {type(node).__name__}") if isinstance(node, ast.Call): # Nur direkter Funktionsname, kein attribute-access (foo.bar()) if not isinstance(node.func, ast.Name): raise ValueError("Funktionsaufruf nur ueber direkten Namen erlaubt") if node.func.id not in _ALLOWED_FUNCTIONS: raise ValueError(f"Verbotene Funktion: {node.func.id}") # Args muessen Constants oder einzelne Names sein for a in node.args: if not isinstance(a, (ast.Constant, ast.Name, ast.UnaryOp)): raise ValueError(f"Argument-Typ in {node.func.id}() nicht erlaubt") if node.keywords: raise ValueError("Keyword-Argumente in Funktionen nicht erlaubt") if isinstance(node, ast.Name): if (node.id not in allowed_names and node.id not in _ALLOWED_FUNCTIONS and node.id not in ("True", "False")): raise ValueError(f"Unbekannte Variable: {node.id}") if isinstance(node, ast.Constant): if not isinstance(node.value, (int, float, str, bool)) and node.value is not None: raise ValueError(f"Verbotener Konstant-Typ: {type(node.value).__name__}") return tree def evaluate(expr: str, variables: dict[str, Any] | None = None) -> bool: """Evaluiert die Condition gegen die aktuellen Variablen. Returns bool. Bei Fehler in Variablen → False (defensiv).""" tree = parse_condition(expr) vars_ = variables if variables is not None else collect_variables() code = compile(tree, "", "eval") # Globals leer, locals enthalten Variablen + near()-Funktion → kein Builtin-Zugriff try: result = eval(code, {"__builtins__": {}}, vars_) except Exception as e: logger.warning("Condition '%s' eval-Fehler: %s", expr, e) return False return bool(result)