""" Background-Loop fuer Triggers. Laeuft alle TICK_SEC Sekunden in einem asyncio Task, geht ueber alle active Triggers und entscheidet ob sie feuern muessen. Feuern bedeutet: 1. Trigger-Manifest update (fire_count++, last_fired_at, ggf. deaktivieren) 2. Log-Eintrag schreiben 3. agent.chat() mit einem system-Praefix aufrufen (NICHT als 'user'!) → ARIA bekommt das wie eine Push-Nachricht und kann antworten """ from __future__ import annotations import asyncio import json import logging import os import urllib.error import urllib.request from datetime import datetime, timezone from typing import Optional import triggers as triggers_mod import watcher as watcher_mod logger = logging.getLogger(__name__) # Polling-Frequenz des Background-Loops. Vorher 30s → Auto-Vorbeifahrt # durch einen 300m-Radius bei >50 km/h konnte zwischen zwei Ticks komplett # verpasst werden. Mit 8s ist auch eine 18-Sekunden-Durchfahrt (120 km/h # durch 300m) garantiert mind. einmal getroffen. Der Loop ist billig # (paar Dateilesungen + AST-Eval), das macht Brain nicht warm. TICK_SEC = 8 BRIDGE_URL = os.environ.get("BRIDGE_URL", "http://aria-bridge:8090") def _push_to_bridge(reply: str, trigger_name: str, ttype: str, events: list) -> None: """POSTed eine Trigger-Antwort an die Bridge fuer RVS-Broadcast + TTS. Synchron via urllib — wird per run_in_executor aus dem async-Loop gerufen. Failures werden geloggt, brechen aber nicht ab. """ payload = json.dumps({ "reply": reply, "trigger_name": trigger_name, "type": ttype, "events": events or [], }).encode("utf-8") url = f"{BRIDGE_URL}/internal/trigger-fired" try: req = urllib.request.Request( url, data=payload, method="POST", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=15) as resp: if resp.status != 200: logger.warning("[trigger-push] Bridge hat %s zurueckgegeben", resp.status) except urllib.error.URLError as exc: logger.warning("[trigger-push] Bridge unerreichbar (%s): %s", url, exc) except Exception as exc: logger.warning("[trigger-push] Push fehlgeschlagen: %s", exc) def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _parse_iso(s: str) -> Optional[datetime]: if not s: return None try: return datetime.fromisoformat(s.replace("Z", "+00:00")) except Exception: return None def _should_fire(trigger: dict, vars_: dict, now: datetime) -> bool: if not trigger.get("active", True): return False t = trigger.get("type", "") if t == "timer": fires_at = _parse_iso(trigger.get("fires_at", "")) if not fires_at: return False if fires_at.tzinfo is None: fires_at = fires_at.replace(tzinfo=timezone.utc) return now >= fires_at if t == "watcher": # Check-Interval respektieren (sonst pollen wir zu hektisch) check_interval = int(trigger.get("check_interval_sec", 300)) last_checked = _parse_iso(trigger.get("last_checked_at", "")) if last_checked: if last_checked.tzinfo is None: last_checked = last_checked.replace(tzinfo=timezone.utc) if (now - last_checked).total_seconds() < check_interval: return False # Throttle: erst feuern wenn last_fired lange genug her ist last_fired = _parse_iso(trigger.get("last_fired_at", "")) throttle = int(trigger.get("throttle_sec", 3600)) if last_fired: if last_fired.tzinfo is None: last_fired = last_fired.replace(tzinfo=timezone.utc) if (now - last_fired).total_seconds() < throttle: return False # Condition pruefen cond = (trigger.get("condition") or "").strip() if not cond: return False try: return watcher_mod.evaluate(cond, vars_) except Exception as e: logger.warning("Trigger %s: Condition '%s' fehlerhaft: %s", trigger.get("name"), cond, e) return False if t == "cron": # TODO: später, wenn jemand Bock auf Cron-Parser hat return False return False async def _fire(trigger: dict, agent_factory) -> None: """Ruft ARIA mit einer System-Praefix-Nachricht auf.""" name = trigger.get("name", "?") message = trigger.get("message") or "(ohne Nachricht)" ttype = trigger.get("type", "?") # Manifest updaten try: triggers_mod.mark_fired(name) except Exception as e: logger.warning("mark_fired %s: %s", name, e) # Log triggers_mod.append_log(name, {"event": "fired", "type": ttype, "message": message}) # System-Nachricht an ARIA: nicht als User, sondern als Hinweis prompt = ( f"[Trigger ausgelöst: '{name}', Typ: {ttype}] " f"Geplante Nachricht: \"{message}\". " f"Sage Stefan jetzt diese Information, in deinem Stil. " f"Wenn der Trigger ein Watcher war (Bedingung wurde erfuellt), " f"erwaehne kurz worum es geht. Antworte direkt, keine Rueckfrage." ) try: agent = agent_factory() reply = agent.chat(prompt, source="trigger") events = agent.pop_events() logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80]) triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]}) # Reply an die Bridge pushen, damit App + Diagnostic + TTS sie kriegen. # Ohne diesen Push wuerde die Antwort nur im Brain-Log landen. loop = asyncio.get_event_loop() await loop.run_in_executor(None, _push_to_bridge, reply, name, ttype, events) except Exception as e: logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e) triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]}) async def _tick(agent_factory) -> None: """Ein Pruefdurchlauf. Geht ueber alle Triggers, feuert was zu feuern ist. near()-State-Tracking: entered_near/left_near brauchen die Information ob ein near()-Aufruf beim letzten Tick true war (Uebergang erkennen). Wir halten das pro Trigger als near_states-Dict im Manifest und aktualisieren es nach jedem Eval — auch wenn nicht gefeuert wird.""" try: all_triggers = triggers_mod.list_triggers(active_only=True) except Exception as e: logger.warning("triggers.list: %s", e) return if not all_triggers: return now = datetime.now(timezone.utc) for trigger in all_triggers: if trigger.get("type") != "watcher": continue try: # Variablen pro Trigger sammeln — wegen prev_near_states-Closure prev = trigger.get("near_states") or {} vars_ = watcher_mod.collect_variables(prev_near_states=prev) # Condition evaluieren via _should_fire (intern ruft watcher.evaluate) fired = _should_fire(trigger, vars_, now) # State immer updaten, egal ob gefeuert wurde — sonst greift # entered_near/left_near nicht new_states = vars_.get("_new_near_states") or {} trigger["near_states"] = new_states trigger["last_checked_at"] = _now_iso() try: triggers_mod.write(trigger["name"], trigger) except Exception as e: logger.warning("trigger.write %s: %s", trigger.get("name"), e) if fired: # Feuern als eigener Task — wenn ARIA langsam antwortet, # darf der naechste Tick nicht blockieren asyncio.create_task(_fire(trigger, agent_factory)) except Exception as e: logger.warning("Trigger-Check %s: %s", trigger.get("name"), e) # Timer (one-shot) — separat ohne near-State timer_vars = None for trigger in all_triggers: if trigger.get("type") != "timer": continue try: if timer_vars is None: timer_vars = watcher_mod.collect_variables() if _should_fire(trigger, timer_vars, now): asyncio.create_task(_fire(trigger, agent_factory)) except Exception as e: logger.warning("Timer-Check %s: %s", trigger.get("name"), e) # Module-Level-Slot fuer die agent_factory damit on-demand-Ticks (von # z.B. POST /triggers/check-now) Zugang haben ohne durch den ganzen # Lifespan-Pfad geschleust zu werden. _AGENT_FACTORY = None async def tick_now() -> dict: """Sofortiger Trigger-Check — nicht warten auf den naechsten Loop-Tick. Wird genutzt wenn ein neues GPS-Update reinkommt: Bridge ruft das nach _persist_location, damit Watcher mit near() den frischen Wert sofort sehen statt bis zu TICK_SEC Sekunden zu warten.""" if _AGENT_FACTORY is None: return {"ok": False, "error": "Background-Loop noch nicht gestartet"} try: await _tick(_AGENT_FACTORY) return {"ok": True} except Exception as exc: logger.exception("tick_now: %s", exc) return {"ok": False, "error": str(exc)} async def run_loop(agent_factory) -> None: """Endlosschleife — wird vom main lifespan gestartet + gestoppt.""" global _AGENT_FACTORY _AGENT_FACTORY = agent_factory logger.info("Trigger-Loop gestartet (TICK_SEC=%d)", TICK_SEC) while True: try: await _tick(agent_factory) except Exception as e: logger.exception("Tick-Fehler: %s", e) await asyncio.sleep(TICK_SEC)