""" Built-in Condition-Variablen + sicherer Mini-Parser fuer Watcher-Triggers. Erlaubte Variablen + die EINE Funktion `near(lat, lon, radius_m)` kommen aus diesem Modul. Condition-Ausdruck ist ein sicheres Subset von Python (kein eval, kein exec): nur Vergleiche, Boolean-Operatoren, Whitelisted Funktionen, Variablen aus describe_variables(), Konstanten (Zahl/Bool/Str). Beispiele: disk_free_gb < 5 hour_of_day == 8 and day_of_week == "mon" is_weekend and minute_of_hour == 0 near(53.123, 7.456, 500) current_lat and location_age_sec < 120 """ from __future__ import annotations import ast import json import logging import math import os import shutil import time from datetime import datetime from pathlib import Path from typing import Any logger = logging.getLogger(__name__) STATE_DIR = Path("/shared/state") # ─── State-Helfer (gemeinsam mit Bridge: /shared/state/*.json) ────── def _read_state(name: str) -> dict | None: f = STATE_DIR / f"{name}.json" if not f.exists(): return None try: return json.loads(f.read_text(encoding="utf-8")) except Exception: return None # ─── Variablen-Quellen ────────────────────────────────────────────── def _disk_stats() -> tuple[float, float]: """Returns (free_gb, free_pct). Schaut /shared (geteiltes Volume) — sonst /.""" target = "/shared" if os.path.exists("/shared") else "/" try: st = shutil.disk_usage(target) free_gb = st.free / (1024 ** 3) free_pct = 100.0 * st.free / st.total if st.total else 0.0 return free_gb, free_pct except Exception as e: logger.warning("disk_usage: %s", e) return 0.0, 0.0 def _uptime_sec() -> int: try: with open("/proc/uptime", "r") as f: return int(float(f.read().split()[0])) except Exception: return 0 def _ram_free_mb() -> int: """Container-RAM: MemAvailable aus /proc/meminfo (kB → MB).""" try: with open("/proc/meminfo", "r") as f: for line in f: if line.startswith("MemAvailable:"): return int(line.split()[1]) // 1024 except Exception: pass return 0 def _cpu_load_1min() -> float: """load avg ueber 1 Minute (linux). Vorsicht: das ist die HOST-load, nicht container-spezifisch.""" try: with open("/proc/loadavg", "r") as f: return float(f.read().split()[0]) except Exception: return 0.0 _DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] def _gps_state() -> dict[str, Any]: """Letzte bekannte Position aus /shared/state/location.json. Returns dict mit current_lat, current_lon (oder None), location_age_sec.""" data = _read_state("location") or {} now = int(time.time()) age = -1 lat = data.get("lat") lon = data.get("lon") ts = data.get("ts_unix") if isinstance(ts, (int, float)): age = int(now - ts) return { "current_lat": float(lat) if isinstance(lat, (int, float)) else None, "current_lon": float(lon) if isinstance(lon, (int, float)) else None, "location_age_sec": age, } def _user_activity_age() -> int: """Sekunden seit letzter User-Aktion (Chat oder Voice). -1 wenn nie.""" data = _read_state("activity") or {} ts = data.get("last_user_ts") if not isinstance(ts, (int, float)): return -1 return int(time.time() - ts) def collect_variables() -> dict[str, Any]: """Liefert aktuellen Snapshot aller Built-in-Variablen + near()-Helper.""" free_gb, free_pct = _disk_stats() now = datetime.now() gps = _gps_state() # Memory-Counts aus der Vector-DB (lazy import, sonst zirkulaer) memory_count = 0 pinned_count = 0 try: from main import store # type: ignore s = store() memory_count = s.count() try: pinned_count = len(s.list_pinned()) except Exception: pass except Exception: pass vars_: dict[str, Any] = { # Disk + System "disk_free_gb": round(free_gb, 2), "disk_free_pct": round(free_pct, 1), "ram_free_mb": _ram_free_mb(), "cpu_load_1min": round(_cpu_load_1min(), 2), "uptime_sec": _uptime_sec(), # Zeit "hour_of_day": now.hour, "minute_of_hour": now.minute, "day_of_month": now.day, "month": now.month, "year": now.year, "day_of_week": _DAYS[now.weekday()], "is_weekend": now.weekday() >= 5, "unix_timestamp": int(time.time()), # GPS "current_lat": gps["current_lat"], "current_lon": gps["current_lon"], "location_age_sec": gps["location_age_sec"], # Activity "last_user_message_ago_sec": _user_activity_age(), # Memory "memory_count": memory_count, "pinned_count": pinned_count, # rvs_connected: kann Brain noch nicht zuverlaessig feststellen # (Bridge muesste eigenen Heartbeat-State schreiben — kommt spaeter) "rvs_connected": False, } # Funktion-Helper — wird vom Parser als ast.Call mit Name "near" erkannt. # Closure ueber die GPS-Werte, damit eval keine extra Variablen braucht. def _near(lat: float, lon: float, radius_m: float) -> bool: """Haversine-Distanz: True wenn aktuelle Position < radius_m vom Punkt.""" cur_lat = vars_.get("current_lat") cur_lon = vars_.get("current_lon") if cur_lat is None or cur_lon is None: return False try: R = 6371000.0 phi1 = math.radians(float(cur_lat)) phi2 = math.radians(float(lat)) dphi = math.radians(float(lat) - float(cur_lat)) dlam = math.radians(float(lon) - float(cur_lon)) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 distance = 2 * R * math.asin(math.sqrt(a)) return distance < float(radius_m) except Exception: return False vars_["near"] = _near return vars_ def describe_variables() -> list[dict]: """Beschreibung — fuer System-Prompt + UI.""" return [ # Disk / System {"name": "disk_free_gb", "type": "number", "desc": "freier Plattenplatz in GB (auf /shared)"}, {"name": "disk_free_pct", "type": "number", "desc": "freier Plattenplatz in Prozent"}, {"name": "ram_free_mb", "type": "number", "desc": "freier RAM im Brain-Container (MB)"}, {"name": "cpu_load_1min", "type": "number", "desc": "Load-Avg 1min (Host)"}, {"name": "uptime_sec", "type": "number", "desc": "Sekunden seit Brain-Start"}, # Zeit {"name": "hour_of_day", "type": "number", "desc": "0..23, lokale Zeit"}, {"name": "minute_of_hour", "type": "number", "desc": "0..59"}, {"name": "day_of_month", "type": "number", "desc": "1..31"}, {"name": "month", "type": "number", "desc": "1..12"}, {"name": "year", "type": "number", "desc": "z.B. 2026"}, {"name": "day_of_week", "type": "string", "desc": "mon|tue|wed|thu|fri|sat|sun"}, {"name": "is_weekend", "type": "bool", "desc": "True wenn Samstag oder Sonntag"}, {"name": "unix_timestamp", "type": "number", "desc": "Sekunden seit Epoche (UTC)"}, # GPS {"name": "current_lat", "type": "number", "desc": "letzte bekannte Breitengrad (oder None)"}, {"name": "current_lon", "type": "number", "desc": "letzte bekannte Laengengrad (oder None)"}, {"name": "location_age_sec", "type": "number", "desc": "Sekunden seit letzter Position (-1 = nie)"}, # Activity {"name": "last_user_message_ago_sec", "type": "number", "desc": "Sekunden seit letztem User-Input (-1 = nie)"}, # Memory {"name": "memory_count", "type": "number", "desc": "Anzahl Memories total"}, {"name": "pinned_count", "type": "number", "desc": "Anzahl pinned (Hot Memory)"}, {"name": "rvs_connected", "type": "bool", "desc": "RVS-Verbindung (z.Zt. immer False)"}, ] def describe_functions() -> list[dict]: """Whitelisted Funktionen fuer Conditions.""" return [ { "name": "near", "signature": "near(lat, lon, radius_m)", "desc": "True wenn die aktuelle GPS-Position innerhalb von radius_m Metern " "vom Punkt (lat, lon) liegt. Haversine. Bei unbekannter Position: False.", }, ] _ALLOWED_FUNCTIONS = {f["name"] for f in describe_functions()} # ─── Sicherer Condition-Parser ────────────────────────────────────── _ALLOWED_NODES = ( ast.Expression, ast.BoolOp, ast.UnaryOp, ast.Compare, ast.Name, ast.Constant, ast.Load, ast.And, ast.Or, ast.Not, ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.Call, ) def parse_condition(expr: str) -> ast.Expression: """Parst einen Condition-Ausdruck und validiert ihn gegen das Safe-Subset. Wirft ValueError bei verbotenen Konstrukten.""" expr = (expr or "").strip() if not expr: raise ValueError("Leere Condition") if len(expr) > 500: raise ValueError("Condition zu lang (>500 Zeichen)") try: tree = ast.parse(expr, mode="eval") except SyntaxError as e: raise ValueError(f"Condition Syntax-Fehler: {e}") allowed_names = {v["name"] for v in describe_variables()} for node in ast.walk(tree): if not isinstance(node, _ALLOWED_NODES): raise ValueError(f"Verbotener Ausdruck: {type(node).__name__}") if isinstance(node, ast.Call): # Nur direkter Funktionsname, kein attribute-access (foo.bar()) if not isinstance(node.func, ast.Name): raise ValueError("Funktionsaufruf nur ueber direkten Namen erlaubt") if node.func.id not in _ALLOWED_FUNCTIONS: raise ValueError(f"Verbotene Funktion: {node.func.id}") # Args muessen Constants oder einzelne Names sein for a in node.args: if not isinstance(a, (ast.Constant, ast.Name, ast.UnaryOp)): raise ValueError(f"Argument-Typ in {node.func.id}() nicht erlaubt") if node.keywords: raise ValueError("Keyword-Argumente in Funktionen nicht erlaubt") if isinstance(node, ast.Name): if (node.id not in allowed_names and node.id not in _ALLOWED_FUNCTIONS and node.id not in ("True", "False")): raise ValueError(f"Unbekannte Variable: {node.id}") if isinstance(node, ast.Constant): if not isinstance(node.value, (int, float, str, bool)) and node.value is not None: raise ValueError(f"Verbotener Konstant-Typ: {type(node.value).__name__}") return tree def evaluate(expr: str, variables: dict[str, Any] | None = None) -> bool: """Evaluiert die Condition gegen die aktuellen Variablen. Returns bool. Bei Fehler in Variablen → False (defensiv).""" tree = parse_condition(expr) vars_ = variables if variables is not None else collect_variables() code = compile(tree, "", "eval") # Globals leer, locals enthalten Variablen + near()-Funktion → kein Builtin-Zugriff try: result = eval(code, {"__builtins__": {}}, vars_) except Exception as e: logger.warning("Condition '%s' eval-Fehler: %s", expr, e) return False return bool(result)