fix(trigger): Trigger-Antworten landen jetzt im Chat — Brain → Bridge Push

Bug: Wenn der Brain-Background-Loop einen Timer/Watcher feuert, ruft
er agent.chat() direkt im eigenen Prozess. Die Antwort wurde nur ins
Trigger-Log geschrieben — kein RVS-Broadcast, kein TTS, nichts in
App/Diagnostic sichtbar.

Fix: Bridge ↔ Brain bekommen einen internen HTTP-Push-Kanal.

Bridge (Port 8090, nicht exposed, nur aria-net intern):
  asyncio.start_server-basierter HTTP-Listener.
  POST /internal/trigger-fired
    body: {reply, trigger_name, type, events}
  → _handle_trigger_fired feuert Side-Channel-Events
    (trigger_created/skill_created/location_tracking) erst,
    dann _process_core_response(reply) — exakt der gleiche Pfad
    wie normale Chat-Antworten (Chat-Bubble + TTS + chat_backup).

Brain background.py:
  Nach agent.chat() in _fire wird agent.pop_events() ausgelesen
  und zusammen mit dem Reply via urllib an aria-bridge:8090
  gepostet (run_in_executor damit es den asyncio-Loop nicht
  blockiert). Failures werden geloggt, der Trigger selbst bleibt
  trotzdem als 'fired' markiert.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 01:50:32 +02:00
parent e26226f370
commit 7237f05344
2 changed files with 178 additions and 0 deletions
+37
View File
@@ -14,7 +14,11 @@ Feuern bedeutet:
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import json
import logging import logging
import os
import urllib.error
import urllib.request
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional
@@ -24,6 +28,34 @@ import watcher as watcher_mod
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TICK_SEC = 30 TICK_SEC = 30
BRIDGE_URL = os.environ.get("BRIDGE_URL", "http://aria-bridge:8090")
def _push_to_bridge(reply: str, trigger_name: str, ttype: str, events: list) -> None:
"""POSTed eine Trigger-Antwort an die Bridge fuer RVS-Broadcast + TTS.
Synchron via urllib — wird per run_in_executor aus dem async-Loop
gerufen. Failures werden geloggt, brechen aber nicht ab.
"""
payload = json.dumps({
"reply": reply,
"trigger_name": trigger_name,
"type": ttype,
"events": events or [],
}).encode("utf-8")
url = f"{BRIDGE_URL}/internal/trigger-fired"
try:
req = urllib.request.Request(
url, data=payload, method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status != 200:
logger.warning("[trigger-push] Bridge hat %s zurueckgegeben", resp.status)
except urllib.error.URLError as exc:
logger.warning("[trigger-push] Bridge unerreichbar (%s): %s", url, exc)
except Exception as exc:
logger.warning("[trigger-push] Push fehlgeschlagen: %s", exc)
def _now_iso() -> str: def _now_iso() -> str:
@@ -114,8 +146,13 @@ async def _fire(trigger: dict, agent_factory) -> None:
try: try:
agent = agent_factory() agent = agent_factory()
reply = agent.chat(prompt, source="trigger") reply = agent.chat(prompt, source="trigger")
events = agent.pop_events()
logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80]) logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80])
triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]}) triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]})
# Reply an die Bridge pushen, damit App + Diagnostic + TTS sie kriegen.
# Ohne diesen Push wuerde die Antwort nur im Brain-Log landen.
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _push_to_bridge, reply, name, ttype, events)
except Exception as e: except Exception as e:
logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e) logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e)
triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]}) triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]})
+141
View File
@@ -2392,6 +2392,145 @@ class ARIABridge:
logger.exception("Fehler in der Audio-Schleife") logger.exception("Fehler in der Audio-Schleife")
await asyncio.sleep(1) await asyncio.sleep(1)
# ── Internal HTTP (Brain → Bridge: Trigger-Feuer-Push) ───
async def _serve_internal_http(self) -> None:
"""Kleiner asyncio HTTP-Listener auf Port 8090.
Empfaengt Push-Events vom Brain wenn ein Trigger feuert. Nicht
nach aussen exposed — nur erreichbar im docker-internen aria-net.
Endpoint:
POST /internal/trigger-fired
{ "reply": "...", "trigger_name": "...", "type": "timer",
"events": [{"type":"trigger_created",...}, ...] }
"""
host, port = "0.0.0.0", 8090
async def _send_response(writer, status: int, payload: dict) -> None:
body = json.dumps(payload).encode("utf-8")
status_text = "OK" if status == 200 else "Error"
writer.write(
f"HTTP/1.1 {status} {status_text}\r\n"
f"Content-Type: application/json\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n".encode("utf-8")
)
writer.write(body)
await writer.drain()
async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
try:
request_line = await asyncio.wait_for(reader.readline(), timeout=10)
if not request_line:
return
try:
method, path, _ver = request_line.decode("utf-8", "ignore").strip().split(" ", 2)
except ValueError:
await _send_response(writer, 400, {"error": "bad request line"})
return
headers: dict[str, str] = {}
while True:
line = await asyncio.wait_for(reader.readline(), timeout=5)
if not line or line in (b"\r\n", b"\n"):
break
name, _, value = line.decode("utf-8", "ignore").partition(":")
headers[name.strip().lower()] = value.strip()
content_length = int(headers.get("content-length", "0") or "0")
body = await reader.readexactly(content_length) if content_length else b""
if method == "POST" and path == "/internal/trigger-fired":
try:
data = json.loads(body.decode("utf-8", "ignore"))
except Exception as exc:
await _send_response(writer, 400, {"error": f"bad json: {exc}"})
return
reply = (data.get("reply") or "").strip()
trigger_name = data.get("trigger_name", "")
ttype = data.get("type", "trigger")
events = data.get("events") or []
logger.info("[bridge ← brain] Trigger '%s' (%s) gefeuert, reply=%d chars, events=%d",
trigger_name, ttype, len(reply), len(events))
# Async-spawn — HTTP-Antwort nicht durch RVS-Broadcast blockieren
asyncio.create_task(
self._handle_trigger_fired(reply, trigger_name, ttype, events)
)
await _send_response(writer, 200, {"ok": True})
elif method == "GET" and path == "/health":
await _send_response(writer, 200, {"ok": True, "service": "bridge-internal"})
else:
await _send_response(writer, 404, {"error": "not found"})
except asyncio.TimeoutError:
logger.warning("[bridge http] Timeout beim Request-Lesen")
except Exception as exc:
logger.exception("[bridge http] Fehler: %s", exc)
try:
await _send_response(writer, 500, {"error": str(exc)[:200]})
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
try:
server = await asyncio.start_server(handle, host, port)
logger.info("[bridge] Internal HTTP-Listener auf %s:%d (Brain-Push)", host, port)
async with server:
await server.serve_forever()
except Exception:
logger.exception("[bridge] Internal HTTP-Listener konnte nicht starten")
async def _handle_trigger_fired(self, reply: str, trigger_name: str,
ttype: str, events: list) -> None:
"""Spiegelt eine Brain-Trigger-Antwort wie eine normale ARIA-Antwort.
Side-Channel-Events zuerst (trigger_created, location_tracking, ...),
dann _process_core_response (Chat-Bubble, TTS, chat_backup).
"""
# Side-Channel-Events erst (gleich wie in send_to_core)
for event in events or []:
etype = event.get("type")
try:
if etype == "skill_created":
await self._send_to_rvs({
"type": "skill_created",
"payload": event.get("skill", {}),
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
elif etype == "trigger_created":
await self._send_to_rvs({
"type": "trigger_created",
"payload": event.get("trigger", {}),
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
elif etype == "location_tracking":
await self._send_to_rvs({
"type": "location_tracking",
"payload": {
"on": bool(event.get("on")),
"reason": event.get("reason") or "",
},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
except Exception:
logger.exception("[trigger-fire] Side-Channel-Event %s fehlgeschlagen", etype)
if not reply:
logger.info("[trigger-fire] Trigger '%s' hat leeren Reply — nichts zu broadcasten",
trigger_name)
return
# Reply wie eine normale ARIA-Antwort behandeln
try:
await self._process_core_response(
reply,
{"metadata": {"trigger_name": trigger_name, "trigger_type": ttype}},
)
except Exception:
logger.exception("[trigger-fire] _process_core_response fehlgeschlagen")
# ── Run & Shutdown ─────────────────────────────────────── # ── Run & Shutdown ───────────────────────────────────────
async def run(self) -> None: async def run(self) -> None:
@@ -2405,6 +2544,8 @@ class ARIABridge:
# connect_to_core entfaellt — Bridge ruft jetzt aria-brain ueber # connect_to_core entfaellt — Bridge ruft jetzt aria-brain ueber
# HTTP (siehe send_to_core). Keine persistente WS-Verbindung mehr. # HTTP (siehe send_to_core). Keine persistente WS-Verbindung mehr.
asyncio.create_task(self.connect_to_rvs()), asyncio.create_task(self.connect_to_rvs()),
# Interner HTTP-Listener — empfaengt Trigger-Feuer-Pushes vom Brain.
asyncio.create_task(self._serve_internal_http()),
] ]
if self.audio_available: if self.audio_available: