From 7237f05344153a89187ddb0d0cf9ea79054a1a83 Mon Sep 17 00:00:00 2001 From: duffyduck Date: Tue, 12 May 2026 01:50:32 +0200 Subject: [PATCH] =?UTF-8?q?fix(trigger):=20Trigger-Antworten=20landen=20je?= =?UTF-8?q?tzt=20im=20Chat=20=E2=80=94=20Brain=20=E2=86=92=20Bridge=20Push?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: Wenn der Brain-Background-Loop einen Timer/Watcher feuert, ruft er agent.chat() direkt im eigenen Prozess. Die Antwort wurde nur ins Trigger-Log geschrieben — kein RVS-Broadcast, kein TTS, nichts in App/Diagnostic sichtbar. Fix: Bridge ↔ Brain bekommen einen internen HTTP-Push-Kanal. Bridge (Port 8090, nicht exposed, nur aria-net intern): asyncio.start_server-basierter HTTP-Listener. POST /internal/trigger-fired body: {reply, trigger_name, type, events} → _handle_trigger_fired feuert Side-Channel-Events (trigger_created/skill_created/location_tracking) erst, dann _process_core_response(reply) — exakt der gleiche Pfad wie normale Chat-Antworten (Chat-Bubble + TTS + chat_backup). Brain background.py: Nach agent.chat() in _fire wird agent.pop_events() ausgelesen und zusammen mit dem Reply via urllib an aria-bridge:8090 gepostet (run_in_executor damit es den asyncio-Loop nicht blockiert). Failures werden geloggt, der Trigger selbst bleibt trotzdem als 'fired' markiert. Co-Authored-By: Claude Opus 4.7 (1M context) --- aria-brain/background.py | 37 ++++++++++ bridge/aria_bridge.py | 141 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+) diff --git a/aria-brain/background.py b/aria-brain/background.py index 1c03bbc..c2e131f 100644 --- a/aria-brain/background.py +++ b/aria-brain/background.py @@ -14,7 +14,11 @@ Feuern bedeutet: from __future__ import annotations import asyncio +import json import logging +import os +import urllib.error +import urllib.request from datetime import datetime, timezone from typing import Optional @@ -24,6 +28,34 @@ import watcher as watcher_mod logger = logging.getLogger(__name__) TICK_SEC = 30 +BRIDGE_URL = os.environ.get("BRIDGE_URL", "http://aria-bridge:8090") + + +def _push_to_bridge(reply: str, trigger_name: str, ttype: str, events: list) -> None: + """POSTed eine Trigger-Antwort an die Bridge fuer RVS-Broadcast + TTS. + + Synchron via urllib — wird per run_in_executor aus dem async-Loop + gerufen. Failures werden geloggt, brechen aber nicht ab. + """ + payload = json.dumps({ + "reply": reply, + "trigger_name": trigger_name, + "type": ttype, + "events": events or [], + }).encode("utf-8") + url = f"{BRIDGE_URL}/internal/trigger-fired" + try: + req = urllib.request.Request( + url, data=payload, method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=15) as resp: + if resp.status != 200: + logger.warning("[trigger-push] Bridge hat %s zurueckgegeben", resp.status) + except urllib.error.URLError as exc: + logger.warning("[trigger-push] Bridge unerreichbar (%s): %s", url, exc) + except Exception as exc: + logger.warning("[trigger-push] Push fehlgeschlagen: %s", exc) def _now_iso() -> str: @@ -114,8 +146,13 @@ async def _fire(trigger: dict, agent_factory) -> None: try: agent = agent_factory() reply = agent.chat(prompt, source="trigger") + events = agent.pop_events() logger.info("[trigger] %s gefeuert → ARIA-Reply: %s", name, reply[:80]) triggers_mod.append_log(name, {"event": "reply", "text": reply[:500]}) + # Reply an die Bridge pushen, damit App + Diagnostic + TTS sie kriegen. + # Ohne diesen Push wuerde die Antwort nur im Brain-Log landen. + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _push_to_bridge, reply, name, ttype, events) except Exception as e: logger.exception("Trigger %s feuern fehlgeschlagen: %s", name, e) triggers_mod.append_log(name, {"event": "error", "error": str(e)[:300]}) diff --git a/bridge/aria_bridge.py b/bridge/aria_bridge.py index cb66761..63f3b5f 100644 --- a/bridge/aria_bridge.py +++ b/bridge/aria_bridge.py @@ -2392,6 +2392,145 @@ class ARIABridge: logger.exception("Fehler in der Audio-Schleife") await asyncio.sleep(1) + # ── Internal HTTP (Brain → Bridge: Trigger-Feuer-Push) ─── + + async def _serve_internal_http(self) -> None: + """Kleiner asyncio HTTP-Listener auf Port 8090. + + Empfaengt Push-Events vom Brain wenn ein Trigger feuert. Nicht + nach aussen exposed — nur erreichbar im docker-internen aria-net. + Endpoint: + POST /internal/trigger-fired + { "reply": "...", "trigger_name": "...", "type": "timer", + "events": [{"type":"trigger_created",...}, ...] } + """ + host, port = "0.0.0.0", 8090 + + async def _send_response(writer, status: int, payload: dict) -> None: + body = json.dumps(payload).encode("utf-8") + status_text = "OK" if status == 200 else "Error" + writer.write( + f"HTTP/1.1 {status} {status_text}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {len(body)}\r\n" + f"Connection: close\r\n\r\n".encode("utf-8") + ) + writer.write(body) + await writer.drain() + + async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + try: + request_line = await asyncio.wait_for(reader.readline(), timeout=10) + if not request_line: + return + try: + method, path, _ver = request_line.decode("utf-8", "ignore").strip().split(" ", 2) + except ValueError: + await _send_response(writer, 400, {"error": "bad request line"}) + return + headers: dict[str, str] = {} + while True: + line = await asyncio.wait_for(reader.readline(), timeout=5) + if not line or line in (b"\r\n", b"\n"): + break + name, _, value = line.decode("utf-8", "ignore").partition(":") + headers[name.strip().lower()] = value.strip() + content_length = int(headers.get("content-length", "0") or "0") + body = await reader.readexactly(content_length) if content_length else b"" + + if method == "POST" and path == "/internal/trigger-fired": + try: + data = json.loads(body.decode("utf-8", "ignore")) + except Exception as exc: + await _send_response(writer, 400, {"error": f"bad json: {exc}"}) + return + reply = (data.get("reply") or "").strip() + trigger_name = data.get("trigger_name", "") + ttype = data.get("type", "trigger") + events = data.get("events") or [] + logger.info("[bridge ← brain] Trigger '%s' (%s) gefeuert, reply=%d chars, events=%d", + trigger_name, ttype, len(reply), len(events)) + # Async-spawn — HTTP-Antwort nicht durch RVS-Broadcast blockieren + asyncio.create_task( + self._handle_trigger_fired(reply, trigger_name, ttype, events) + ) + await _send_response(writer, 200, {"ok": True}) + elif method == "GET" and path == "/health": + await _send_response(writer, 200, {"ok": True, "service": "bridge-internal"}) + else: + await _send_response(writer, 404, {"error": "not found"}) + except asyncio.TimeoutError: + logger.warning("[bridge http] Timeout beim Request-Lesen") + except Exception as exc: + logger.exception("[bridge http] Fehler: %s", exc) + try: + await _send_response(writer, 500, {"error": str(exc)[:200]}) + except Exception: + pass + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + try: + server = await asyncio.start_server(handle, host, port) + logger.info("[bridge] Internal HTTP-Listener auf %s:%d (Brain-Push)", host, port) + async with server: + await server.serve_forever() + except Exception: + logger.exception("[bridge] Internal HTTP-Listener konnte nicht starten") + + async def _handle_trigger_fired(self, reply: str, trigger_name: str, + ttype: str, events: list) -> None: + """Spiegelt eine Brain-Trigger-Antwort wie eine normale ARIA-Antwort. + + Side-Channel-Events zuerst (trigger_created, location_tracking, ...), + dann _process_core_response (Chat-Bubble, TTS, chat_backup). + """ + # Side-Channel-Events erst (gleich wie in send_to_core) + for event in events or []: + etype = event.get("type") + try: + if etype == "skill_created": + await self._send_to_rvs({ + "type": "skill_created", + "payload": event.get("skill", {}), + "timestamp": int(asyncio.get_event_loop().time() * 1000), + }) + elif etype == "trigger_created": + await self._send_to_rvs({ + "type": "trigger_created", + "payload": event.get("trigger", {}), + "timestamp": int(asyncio.get_event_loop().time() * 1000), + }) + elif etype == "location_tracking": + await self._send_to_rvs({ + "type": "location_tracking", + "payload": { + "on": bool(event.get("on")), + "reason": event.get("reason") or "", + }, + "timestamp": int(asyncio.get_event_loop().time() * 1000), + }) + except Exception: + logger.exception("[trigger-fire] Side-Channel-Event %s fehlgeschlagen", etype) + + if not reply: + logger.info("[trigger-fire] Trigger '%s' hat leeren Reply — nichts zu broadcasten", + trigger_name) + return + + # Reply wie eine normale ARIA-Antwort behandeln + try: + await self._process_core_response( + reply, + {"metadata": {"trigger_name": trigger_name, "trigger_type": ttype}}, + ) + except Exception: + logger.exception("[trigger-fire] _process_core_response fehlgeschlagen") + # ── Run & Shutdown ─────────────────────────────────────── async def run(self) -> None: @@ -2405,6 +2544,8 @@ class ARIABridge: # connect_to_core entfaellt — Bridge ruft jetzt aria-brain ueber # HTTP (siehe send_to_core). Keine persistente WS-Verbindung mehr. asyncio.create_task(self.connect_to_rvs()), + # Interner HTTP-Listener — empfaengt Trigger-Feuer-Pushes vom Brain. + asyncio.create_task(self._serve_internal_http()), ] if self.audio_available: