Files
ARIA-AGENT/aria-brain/background.py
T
duffyduck 6f80e442cf fix(trigger): near() fired bei Auto-Vorbeifahrten verpasst — Loop schneller + event-getrieben
Stefan ist mehrmals an einem 300m-near()-Watcher (DRK Kreyenbrueck)
vorbeigefahren, kein Fire. Ursache: Background-Loop tickte alle 30s,
Auto-Durchfahrt durch 600m-Durchmesser-Radius dauert bei 50-120 km/h
nur 18-43 Sekunden — der Tick konnte komplett dazwischen liegen.

Drei Fixes (A + B aus Stefans Vorschlag):

A1. Background-Loop-Frequenz: TICK_SEC 30 → 8.
    Garantiert mind. 2 Checks auch bei 120 km/h durch 300m. Loop ist
    billig (paar Dateilesungen + AST-Eval), Brain merkt das nicht.

A2. near() bekommt Age-Schutz (watcher.py NEAR_MAX_AGE_SEC=300):
    Wenn location_age_sec > 5 min, gilt die Position als unbekannt
    und near() liefert False. Verhindert Phantom-Fires wenn Tracking
    aus ist oder Mobilfunk weg war — vorher haette der letzte
    bekannte Wert weiter ausgewertet werden koennen.

B. Event-getriebener Tick:
    - background.py: tick_now()-Funktion + Module-Slot fuer
      agent_factory damit man von ausserhalb des Lifespan-Pfads
      einen Tick triggern kann
    - main.py: POST /triggers/check-now Endpoint ruft tick_now()
    - bridge: _persist_location feuert nach jedem Save ein fire-and-
      forget POST /triggers/check-now (run_in_executor, timeout 8s,
      blockt nichts wenn Brain stockt)

Damit fires near() sofort wenn die App ein location_update schickt —
Polling ist nur noch der Fallback fuer Watcher OHNE GPS-Bezug
(disk_free, hour_of_day etc.) und als Sicherheits-Tick falls
location_update mal ausfaellt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 18:16:53 +02:00

235 lines
8.4 KiB
Python

"""
Background-Loop fuer Triggers.
Laeuft alle TICK_SEC Sekunden in einem asyncio Task, geht ueber alle
active Triggers und entscheidet ob sie feuern muessen.
Feuern bedeutet:
1. Trigger-Manifest update (fire_count++, last_fired_at, ggf. deaktivieren)
2. Log-Eintrag schreiben
3. agent.chat() mit einem system-Praefix aufrufen (NICHT als 'user'!)
→ ARIA bekommt das wie eine Push-Nachricht und kann antworten
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Optional
import triggers as triggers_mod
import watcher as watcher_mod
logger = logging.getLogger(__name__)
# Polling-Frequenz des Background-Loops. Vorher 30s → Auto-Vorbeifahrt
# durch einen 300m-Radius bei >50 km/h konnte zwischen zwei Ticks komplett
# verpasst werden. Mit 8s ist auch eine 18-Sekunden-Durchfahrt (120 km/h
# durch 300m) garantiert mind. einmal getroffen. Der Loop ist billig
# (paar Dateilesungen + AST-Eval), das macht Brain nicht warm.
TICK_SEC = 8
BRIDGE_URL = os.environ.get("BRIDGE_URL", "http://aria-bridge:8090")
def _push_to_bridge(reply: str, trigger_name: str, ttype: str, events: list) -> None:
"""POSTed eine Trigger-Antwort an die Bridge fuer RVS-Broadcast + TTS.
Synchron via urllib — wird per run_in_executor aus dem async-Loop
gerufen. Failures werden geloggt, brechen aber nicht ab.
"""
payload = json.dumps({
"reply": reply,
"trigger_name": trigger_name,
"type": ttype,
"events": events or [],
}).encode("utf-8")
url = f"{BRIDGE_URL}/internal/trigger-fired"
try:
req = urllib.request.Request(
url, data=payload, method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status != 200:
logger.warning("[trigger-push] Bridge hat %s zurueckgegeben", resp.status)
except urllib.error.URLError as exc:
logger.warning("[trigger-push] Bridge unerreichbar (%s): %s", url, exc)
except Exception as exc:
logger.warning("[trigger-push] Push fehlgeschlagen: %s", exc)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _parse_iso(s: str) -> Optional[datetime]:
if not s:
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _should_fire(trigger: dict, vars_: dict, now: datetime) -> bool:
if not trigger.get("active", True):
return False
t = trigger.get("type", "")
if t == "timer":
fires_at = _parse_iso(trigger.get("fires_at", ""))
if not fires_at:
return False
if fires_at.tzinfo is None:
fires_at = fires_at.replace(tzinfo=timezone.utc)
return now >= fires_at
if t == "watcher":
# Check-Interval respektieren (sonst pollen wir zu hektisch)
check_interval = int(trigger.get("check_interval_sec", 300))
last_checked = _parse_iso(trigger.get("last_checked_at", ""))
if last_checked:
if last_checked.tzinfo is None:
last_checked = last_checked.replace(tzinfo=timezone.utc)
if (now - last_checked).total_seconds() < check_interval:
return False
# Throttle: erst feuern wenn last_fired lange genug her ist
last_fired = _parse_iso(trigger.get("last_fired_at", ""))
throttle = int(trigger.get("throttle_sec", 3600))
if last_fired:
if last_fired.tzinfo is None:
last_fired = last_fired.replace(tzinfo=timezone.utc)
if (now - last_fired).total_seconds() < throttle:
return False
# Condition pruefen
cond = (trigger.get("condition") or "").strip()
if not cond:
return False
try:
return watcher_mod.evaluate(cond, vars_)
except Exception as e:
logger.warning("Trigger %s: Condition '%s' fehlerhaft: %s",
trigger.get("name"), cond, e)
return False
if t == "cron":
# TODO: später, wenn jemand Bock auf Cron-Parser hat
return False
return False
async def _fire(trigger: dict, agent_factory) -> None:
"""Ruft ARIA mit einer System-Praefix-Nachricht auf."""
name = trigger.get("name", "?")
message = trigger.get("message") or "(ohne Nachricht)"
ttype = trigger.get("type", "?")
# Manifest updaten
try:
triggers_mod.mark_fired(name)
except Exception as e:
logger.warning("mark_fired %s: %s", name, e)
# Log
triggers_mod.append_log(name, {"event": "fired", "type": ttype, "message": message})
# System-Nachricht an ARIA: nicht als User, sondern als Hinweis
prompt = (
f"[Trigger ausgelöst: '{name}', Typ: {ttype}] "
f"Geplante Nachricht: \"{message}\". "
f"Sage Stefan jetzt diese Information, in deinem Stil. "
f"Wenn der Trigger ein Watcher war (Bedingung wurde erfuellt), "
f"erwaehne kurz worum es geht. Antworte direkt, keine Rueckfrage."
)
try:
agent = agent_factory()
reply = agent.chat(prompt, source="trigger")
events = agent.pop_events()
logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80])
triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]})
# Reply an die Bridge pushen, damit App + Diagnostic + TTS sie kriegen.
# Ohne diesen Push wuerde die Antwort nur im Brain-Log landen.
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _push_to_bridge, reply, name, ttype, events)
except Exception as e:
logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e)
triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]})
async def _tick(agent_factory) -> None:
"""Ein Pruefdurchlauf. Geht ueber alle Triggers, feuert was zu feuern ist."""
try:
all_triggers = triggers_mod.list_triggers(active_only=True)
except Exception as e:
logger.warning("triggers.list: %s", e)
return
if not all_triggers:
return
now = datetime.now(timezone.utc)
# Variablen einmal pro Tick sammeln (nicht pro Trigger — Disk-Stat ist teuer)
try:
vars_ = watcher_mod.collect_variables()
except Exception as e:
logger.warning("collect_variables: %s", e)
vars_ = {}
# Watcher: last_checked_at jetzt updaten (auch wenn nicht gefeuert wird,
# damit der Check-Interval respektiert wird)
for t in all_triggers:
if t.get("type") == "watcher":
try:
t["last_checked_at"] = _now_iso()
triggers_mod.write(t["name"], t)
except Exception:
pass
for trigger in all_triggers:
try:
if _should_fire(trigger, vars_, now):
# Feuern als eigener Task — wenn ARIA langsam antwortet,
# darf der naechste Tick nicht blockieren
asyncio.create_task(_fire(trigger, agent_factory))
except Exception as e:
logger.warning("Trigger-Check %s: %s", trigger.get("name"), e)
# Module-Level-Slot fuer die agent_factory damit on-demand-Ticks (von
# z.B. POST /triggers/check-now) Zugang haben ohne durch den ganzen
# Lifespan-Pfad geschleust zu werden.
_AGENT_FACTORY = None
async def tick_now() -> dict:
"""Sofortiger Trigger-Check — nicht warten auf den naechsten Loop-Tick.
Wird genutzt wenn ein neues GPS-Update reinkommt: Bridge ruft das nach
_persist_location, damit Watcher mit near() den frischen Wert sofort
sehen statt bis zu TICK_SEC Sekunden zu warten."""
if _AGENT_FACTORY is None:
return {"ok": False, "error": "Background-Loop noch nicht gestartet"}
try:
await _tick(_AGENT_FACTORY)
return {"ok": True}
except Exception as exc:
logger.exception("tick_now: %s", exc)
return {"ok": False, "error": str(exc)}
async def run_loop(agent_factory) -> None:
"""Endlosschleife — wird vom main lifespan gestartet + gestoppt."""
global _AGENT_FACTORY
_AGENT_FACTORY = agent_factory
logger.info("Trigger-Loop gestartet (TICK_SEC=%d)", TICK_SEC)
while True:
try:
await _tick(agent_factory)
except Exception as e:
logger.exception("Tick-Fehler: %s", e)
await asyncio.sleep(TICK_SEC)