ARIA-AGENT/bridge/aria_bridge.py

1849 lines
78 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ARIA Voice Bridge — Hauptmodul.
Verbindet die Android App (via RVS) mit ARIA-Core und bietet
lokale Spracheingabe (Wake-Word + Whisper STT) und Sprachausgabe (Piper TTS).
Nachrichtenfluss:
App → RVS → Bridge → aria-core
aria-core → Bridge → RVS → App
→ Lautsprecher (TTS)
Stimmen:
- Ramona (de_DE-ramona-low) — Alltag, Gespraeche
- Thorsten (de_DE-thorsten-high) — epische Momente, Alarme
"""
from __future__ import annotations

import asyncio
import base64
import json
import logging
import os
import signal
import ssl
import subprocess
import sys
import tempfile
import time
import urllib.request
import uuid
from pathlib import Path
from typing import Optional

import numpy as np
import sounddevice as sd
import websockets
from faster_whisper import WhisperModel
from openwakeword.model import Model as WakeWordModel

from modes import Mode, canonical_id, detect_mode_switch, mode_from_id, should_speak
# ── Logging ──────────────────────────────────────────────────
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("aria-bridge")

# ── Configuration ────────────────────────────────────────────
# Fixed paths.
CONFIG_PATH = Path("/config/aria.env")
VOICES_DIR = Path("/voices")

# Defaults taken from the process environment.
CORE_WS_URL = os.environ.get("ARIA_CORE_WS", "ws://127.0.0.1:18789")
CORE_AUTH_TOKEN = os.environ.get("ARIA_AUTH_TOKEN", "")  # OpenClaw gateway token
RVS_HOST = os.environ.get("RVS_HOST", "")  # e.g. rvs.hackersoft.de
RVS_PORT = os.environ.get("RVS_PORT", "443")  # port of the RVS
RVS_TLS = os.environ.get("RVS_TLS", "true")  # true = wss://, false = ws://
RVS_TLS_FALLBACK = os.environ.get("RVS_TLS_FALLBACK", "true")  # try ws:// on TLS errors
RVS_TOKEN = os.environ.get("RVS_TOKEN", "")  # pairing token (same as in the app)
DIAGNOSTIC_URL = os.environ.get("DIAGNOSTIC_URL", "http://127.0.0.1:3001")  # Diagnostic API
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "medium")
WHISPER_LANGUAGE = os.environ.get("WHISPER_LANGUAGE", "de")

# Audio parameters
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCK_SIZE = 1280  # 80 ms at 16 kHz — works well for wake-word detection
RECORD_SECONDS = 8  # maximum recording duration after the wake word
def load_config() -> dict[str, str]:
    """Load the bridge configuration.

    Sources, lowest priority first:
      1. /config/aria.env (bind mount): ``KEY=value`` lines; blank lines and
         lines starting with ``#`` are skipped.
      2. /shared/config/runtime.json (maintained centrally via the Diagnostic UI).

    Values from runtime.json override the env file. ``None`` and empty-string
    values in runtime.json are ignored; all other values are stringified.
    Both files are decoded as UTF-8 so the result does not depend on the
    container locale.

    Returns:
        Mapping of configuration keys to string values (possibly empty).
    """
    config: dict[str, str] = {}
    if CONFIG_PATH.exists():
        for line in CONFIG_PATH.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                key, _, value = line.partition("=")
                config[key.strip()] = value.strip()
        logger.info("Konfiguration geladen aus %s", CONFIG_PATH)
    else:
        logger.warning("Keine Konfiguration gefunden: %s", CONFIG_PATH)

    # Runtime overrides from the central shared volume (Diagnostic UI).
    runtime_path = Path("/shared/config/runtime.json")
    if runtime_path.exists():
        # Best effort: a broken runtime.json must never prevent startup.
        try:
            runtime = json.loads(runtime_path.read_text(encoding="utf-8"))
            if not isinstance(runtime, dict):
                raise ValueError(
                    f"Top-Level ist kein JSON-Objekt ({type(runtime).__name__})"
                )
            overrides = {k: str(v) for k, v in runtime.items() if v not in (None, "")}
            if overrides:
                config.update(overrides)
                logger.info("Runtime-Overrides geladen: %s", sorted(overrides.keys()))
        except Exception as e:
            logger.warning("runtime.json konnte nicht gelesen werden: %s", e)
    return config
# ── Voice Engine ─────────────────────────────────────────────
import re as _re_tts
# German number words for 0-20 (ASCII transliteration, e.g. "fuenf").
_NUM_WORDS_DE = {
    0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fuenf",
    6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn",
    11: "elf", 12: "zwoelf", 13: "dreizehn", 14: "vierzehn", 15: "fuenfzehn",
    16: "sechzehn", 17: "siebzehn", 18: "achtzehn", 19: "neunzehn", 20: "zwanzig",
}
# Tens used to build the compounds 21-59.
_TENS_DE = {20: "zwanzig", 30: "dreissig", 40: "vierzig", 50: "fuenfzig"}


def _num_to_words_de(n: int) -> str:
    """Return the German word for ``n`` in 0-59 (clock times, small ranges).

    Values outside 0-59 are returned as their decimal string.
    """
    if n in _NUM_WORDS_DE:
        return _NUM_WORDS_DE[n]
    if 21 <= n <= 59:
        tens, ones = divmod(n, 10)
        tens_word = _TENS_DE[tens * 10]
        if ones == 0:
            return tens_word
        # In compounds 1 is "ein" ("einundzwanzig"), not the standalone "eins".
        ones_word = "ein" if ones == 1 else _NUM_WORDS_DE[ones]
        return f"{ones_word}und{tens_word}"
    return str(n)
def _time_range_to_words(m):
    """Spell out a matched clock-time range in German.

    Expects a match with groups (start hour, optional ':MM', end hour,
    optional ':MM').

    '8:00-9:00 Uhr' and '8-9 Uhr' -> 'acht bis neun Uhr'
    '8:30-9:15 Uhr' -> 'acht Uhr dreissig bis neun Uhr fuenfzehn'
    """
    def _minutes(group):
        # The group is ':MM' or None; a missing part counts as a full hour.
        return int(group[1:]) if group else 0

    start_hour = int(m.group(1))
    end_hour = int(m.group(3))
    start_min = _minutes(m.group(2))
    end_min = _minutes(m.group(4))

    if start_min == 0 and end_min == 0:
        return f"{_num_to_words_de(start_hour)} bis {_num_to_words_de(end_hour)} Uhr"

    # At least one side carries minutes: speak both sides as full clock times
    # so the minutes are not silently dropped.
    def _clock(hour, minute):
        words = f"{_num_to_words_de(hour)} Uhr"
        if minute:
            words += f" {_num_to_words_de(minute)}"
        return words

    return f"{_clock(start_hour, start_min)} bis {_clock(end_hour, end_min)}"
def _small_range_to_words(m):
    """Spell out a small ascending number range: '5-6' -> 'fuenf bis sechs'.

    The match is returned unchanged unless both numbers are <= 24 and the
    first is strictly smaller than the second.
    """
    low = int(m.group(1))
    high = int(m.group(2))
    speakable = low <= 24 and high <= 24 and low < high
    if not speakable:
        return m.group(0)
    return _num_to_words_de(low) + " bis " + _num_to_words_de(high)
def _decimal_to_words(m):
    """Spell out a matched decimal number in German.

    '0.1' / '0,1' -> 'null komma eins', '1,25' -> 'eins komma zwei fuenf'.
    Group 1 is the integer part, group 2 the fractional digits, which are
    read one by one.
    """
    whole = int(m.group(1))
    if 0 <= whole <= 59:
        whole_word = _num_to_words_de(whole)
    else:
        # Outside the range covered by the word table: keep the digits.
        whole_word = str(whole)
    digit_words = [_num_to_words_de(int(digit)) for digit in m.group(2)]
    return whole_word + " komma " + " ".join(digit_words)
# (regex, replacement) pairs applied in order by clean_text_for_tts.
# Longer units must precede their prefixes (km/h before km).
_UNIT_WORDS = [
    (r'\bTB\b', 'Terabyte'),
    (r'\bGB\b', 'Gigabyte'),
    (r'\bMB\b', 'Megabyte'),
    (r'\bKB\b', 'Kilobyte'),
    (r'\bkB\b', 'Kilobyte'),
    (r'\bms\b', 'Millisekunden'),
    (r'\bkm/h\b', 'Kilometer pro Stunde'),
    (r'\bkm\b', 'Kilometer'),
    (r'\bm/s\b', 'Meter pro Sekunde'),
    (r'\bkg\b', 'Kilogramm'),
    # A single rule with a leading space: the former r'\b°C\b' rule matched
    # "22°C" first and produced "22Grad Celsius" (number glued to the word).
    (r'°C', ' Grad Celsius'),
    (r'\bMbps\b', 'Megabit pro Sekunde'),
    (r'\bGbps\b', 'Gigabit pro Sekunde'),
    (r'\bMhz\b|\bMHz\b', 'Megahertz'),
    (r'\bGhz\b|\bGHz\b', 'Gigahertz'),
    (r'%', ' Prozent'),
    # Abbreviations that are spelled letter by letter.
    (r'\bCPU\b', 'C P U'),
    (r'\bGPU\b', 'G P U'),
    (r'\bRAM\b', 'R A M'),
    (r'\bSSD\b', 'S S D'),
    (r'\bHDD\b', 'H D D'),
    (r'\bURL\b', 'U R L'),
    (r'\bAPI\b', 'A P I'),
    (r'\bRVS\b', 'R V S'),
    (r'\bSSH\b', 'S S H'),
    (r'\bVM\b', 'V M'),
    (r'\bUI\b', 'U I'),
    (r'\bTTS\b', 'T T S'),
    (r'\bSTT\b', 'S T T'),
    (r'\bTLS\b', 'T L S'),
]
def clean_text_for_tts(text: str) -> str:
    """Prepare chat text for speech output.

    Steps, in order:
      - If a ``<voice>...</voice>`` tag is present, ONLY its content is spoken.
      - Fenced code blocks and inline code are removed.
      - Markdown (bold, italics, links, headings, quotes, list bullets) is
        stripped; URLs are replaced by "ein Link".
      - Clock times, time ranges, small number ranges and decimals are spelled
        out in German.
      - Units and common abbreviations are expanded (22GB -> 22 Gigabyte);
        remaining 2-5 letter all-caps words are spelled letter by letter.
      - Quotes are dropped; whitespace and line breaks are normalised.

    Args:
        text: Raw chat text (may contain Markdown).

    Returns:
        The text to hand to the TTS engine; "" for empty input.
    """
    if not text:
        return ""

    # <voice>...</voice> present -> speak only that part.
    voice_match = _re_tts.search(r'<voice>([\s\S]*?)</voice>', text, _re_tts.IGNORECASE)
    if voice_match:
        text = voice_match.group(1)
    t = text

    # Drop code blocks entirely (sentence break instead of a placeholder,
    # otherwise the sentence logic breaks).
    t = _re_tts.sub(r'```[\s\S]*?```', '. ', t)
    t = _re_tts.sub(r'`[^`]+`', '', t)

    # Markdown
    t = _re_tts.sub(r'\*\*([^*]+)\*\*', r'\1', t)
    t = _re_tts.sub(r'\*([^*]+)\*', r'\1', t)
    t = _re_tts.sub(r'__([^_]+)__', r'\1', t)
    t = _re_tts.sub(r'\[([^\]]+)\]\((https?://[^)]+)\)', r'\1, ein Link', t)
    t = _re_tts.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', t)
    t = _re_tts.sub(r'https?://\S+', 'ein Link', t)
    t = _re_tts.sub(r'^#{1,6}\s*', '', t, flags=_re_tts.MULTILINE)
    t = _re_tts.sub(r'^>\s*', '', t, flags=_re_tts.MULTILINE)
    t = _re_tts.sub(r'^[\-\*]\s+', '', t, flags=_re_tts.MULTILINE)

    def _hour_word(hour: int) -> str:
        # German says "ein Uhr", not "eins Uhr".
        return "ein" if hour == 1 else _num_to_words_de(hour)

    # Time ranges: "8:00-9:00 Uhr" / "8-9 Uhr" -> "acht bis neun Uhr".
    # \u2013 (en dash) is accepted as well as the ASCII hyphen.
    t = _re_tts.sub(
        r'\b(\d{1,2})(:\d{2})?\s*[-\u2013]\s*(\d{1,2})(:\d{2})?\s*Uhr\b',
        _time_range_to_words,
        t,
    )

    # Clock times with minutes: "8:30 Uhr" -> "acht Uhr dreissig", "8:00 Uhr" -> "acht Uhr".
    def _single_time(m):
        words = _hour_word(int(m.group(1))) + " Uhr"
        minutes = int(m.group(2))
        if minutes > 0:
            words += " " + _num_to_words_de(minutes)
        return words

    t = _re_tts.sub(r'\b(\d{1,2}):(\d{2})\s*Uhr\b', _single_time, t)

    # Full hours without ":" — "15 Uhr" -> "fuenfzehn Uhr".
    t = _re_tts.sub(
        r'\b(\d{1,2})\s+Uhr\b',
        lambda m: f"{_hour_word(int(m.group(1)))} Uhr",
        t,
    )

    # Small number ranges without "Uhr": "5-6" -> "fuenf bis sechs".
    # The lookarounds keep fragments of dates or phone numbers
    # ("2024-05-06", "555-12-34") from being read as ranges.
    t = _re_tts.sub(
        r'(?<![\d\-\u2013])\b(\d{1,2})\s*[-\u2013]\s*(\d{1,2})\b(?![-\u2013]\d)',
        _small_range_to_words,
        t,
    )

    # Decimals: "0.1" / "0,5" / "1,25" -> "null komma eins" / ...
    # Must run before "number + unit", otherwise the unit rule eats the fraction.
    # Lookbehind AND lookahead are both needed so that no part of an IP, date
    # or version string (192.168.1.1, 12.05.2024, 1.2.3) is matched — the
    # lookahead alone still let the trailing "1.1" through.
    t = _re_tts.sub(r'(?<![.,\d])\b(\d+)[.,](\d+)(?![.,\d])', _decimal_to_words, t)

    # Number + unit: "22GB" -> "22 GB" (insert a space so the unit rules match).
    t = _re_tts.sub(r'(\d+)([A-Za-z]{1,4})\b', r'\1 \2', t)

    # Expand units/abbreviations.
    for pat, repl in _UNIT_WORDS:
        t = _re_tts.sub(pat, repl, t)

    # Generic spelling: every remaining 2-5 letter all-caps word
    # (XTTS, USB, DNS, JSON, HTML, ...) -> "X T T S". Runs AFTER the explicit
    # list so TTS/GPU/... are already resolved. Acronyms that are pronounced as
    # a word (e.g. "WLAN") can be overridden explicitly in _UNIT_WORDS.
    t = _re_tts.sub(r'\b([A-Z]{2,5})\b', lambda m: " ".join(m.group(1)), t)

    # Quotation marks: straight, curly (\u201c \u201d), low-9 (\u201e) and backtick.
    t = _re_tts.sub(r'["\u201c\u201d\u201e`]', '', t)

    # Normalise paragraphs/line breaks.
    t = _re_tts.sub(r'\n{2,}', '. ', t)
    t = _re_tts.sub(r'\n', ', ', t)
    t = _re_tts.sub(r'\s{2,}', ' ', t)
    t = _re_tts.sub(r'\s*\.\s*\.\s*', '. ', t)
    return t.strip()
# ── STT Engine ───────────────────────────────────────────────
class STTEngine:
    """Whisper speech-to-text running entirely on the local machine."""

    def __init__(self, model_size: str = "small", language: str = "de") -> None:
        self.model_size = model_size
        self.language = language
        # None until initialize()/reload() succeeded or transcribe() lazy-loaded it.
        self.model: Optional[WhisperModel] = None

    def initialize(self) -> None:
        """Load the Whisper model named by ``self.model_size`` (CPU, int8)."""
        logger.info(
            "Lade Whisper-Modell '%s' (Sprache: %s)...",
            self.model_size,
            self.language,
        )
        self.model = WhisperModel(self.model_size, device="cpu", compute_type="int8")
        logger.info("Whisper-Modell geladen")

    def reload(self, model_size: str) -> bool:
        """Switch to a different Whisper model (after a config change).

        Returns:
            True if a new model was loaded. False if the requested model is
            already loaded, the name is not allowed, or loading failed.
        """
        if model_size == self.model_size and self.model is not None:
            return False
        allowed = {"tiny", "base", "small", "medium", "large-v3"}
        if model_size not in allowed:
            logger.warning("Ungueltiges Whisper-Modell: %s (erlaubt: %s)", model_size, allowed)
            return False
        logger.info("Lade Whisper-Modell neu: %s -> %s", self.model_size, model_size)
        self.model_size = model_size
        # Drop the reference to the old model before constructing the new one.
        self.model = None
        try:
            self.model = WhisperModel(model_size, device="cpu", compute_type="int8")
            logger.info("Whisper-Modell '%s' geladen", model_size)
            return True
        except Exception:
            logger.exception("Whisper-Modell '%s' konnte nicht geladen werden", model_size)
            return False

    def transcribe(self, audio_data: np.ndarray) -> str:
        """Transcribe audio to text.

        Args:
            audio_data: NumPy array with 16 kHz mono audio. Integer input is
                treated as int16 PCM and scaled to [-1, 1); floating-point
                input is assumed to be normalised already and is only cast
                to float32.

        Returns:
            The recognised text, or "" if no model is available or
            transcription failed.
        """
        if self.model is None:
            # Lazy load: STT normally runs remotely on the Gamebox. The local
            # model is only loaded once this fallback is actually needed.
            logger.info("Lokales Whisper-Fallback — Modell wird nachgeladen...")
            try:
                self.initialize()
            except Exception:
                logger.exception("Lokales Whisper konnte nicht geladen werden")
                return ""
        if self.model is None:
            return ""
        try:
            if np.issubdtype(audio_data.dtype, np.integer):
                # int16 PCM -> float32 in [-1, 1)
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype != np.float32:
                # e.g. float64: already normalised, must NOT be scaled again.
                audio_data = audio_data.astype(np.float32)
            segments, info = self.model.transcribe(
                audio_data,
                language=self.language,
                beam_size=5,
                vad_filter=True,
            )
            text = " ".join(segment.text.strip() for segment in segments)
            logger.info("STT: '%s' (Sprache: %s, Dauer: %.1fs)", text, info.language, info.duration)
            return text
        except Exception:
            logger.exception("STT-Fehler")
            return ""
# ── Wake-Word Erkennung ──────────────────────────────────────
class WakeWordDetector:
    """Detect the wake word in the audio stream.

    Only the models built into openwakeword are loaded; 'hey_jarvis' is
    preferred. CUSTOM_MODEL_PATH names the location reserved for a custom
    'aria' model, but this class does not load it.
    """

    CUSTOM_MODEL_PATH = "/voices/wake_aria.onnx"
    FALLBACK_MODEL = "hey_jarvis"
    THRESHOLD = 0.5

    def __init__(self) -> None:
        self.model: Optional[WakeWordModel] = None
        # Key of the model whose score decides in detect().
        self.wake_word_key: str = ""

    def initialize(self) -> None:
        """Load the wake-word models and select the active one.

        Note: WakeWordModel() is called WITHOUT arguments. Older openwakeword
        versions forward unknown kwargs to AudioFeatures, which crashes.
        """
        logger.info("Lade Wake-Word-Modell...")
        # Load all built-in models (no kwargs — compatibility!).
        self.model = WakeWordModel()
        available = list(self.model.models.keys()) if hasattr(self.model, 'models') else []
        logger.info("Verfuegbare Wake-Words: %s", ", ".join(available) if available else "(keine)")
        # Prefer the fallback model, otherwise take the first available one.
        if self.FALLBACK_MODEL in available:
            self.wake_word_key = self.FALLBACK_MODEL
        elif available:
            self.wake_word_key = available[0]
        else:
            self.wake_word_key = self.FALLBACK_MODEL
        logger.info("Wake-Word aktiv: '%s'", self.wake_word_key)
        logger.info(
            "Tipp: Custom 'aria' Wake-Word trainieren → "
            "https://github.com/dscripka/openWakeWord#training-new-models"
        )

    def detect(self, audio_chunk: np.ndarray) -> bool:
        """Check whether the active wake word occurs in the audio chunk.

        Args:
            audio_chunk: Audio data (int16, 16 kHz).

        Returns:
            True if the score of the active wake word exceeds THRESHOLD.
            If the prediction has no entry for ``wake_word_key``, every
            returned score is checked instead.
        """
        if self.model is None:
            return False
        prediction = self.model.predict(audio_chunk)
        # openwakeword returns one score per loaded model. Only the selected
        # model may trigger; otherwise every other built-in wake word would
        # activate the bridge too.
        if self.wake_word_key in prediction:
            candidates = {self.wake_word_key: prediction[self.wake_word_key]}
        else:
            candidates = prediction
        for key, score in candidates.items():
            if score > self.THRESHOLD:
                logger.info("Wake-Word '%s' erkannt! (Score: %.2f)", key, score)
                return True
        return False
# ── Audio-Aufnahme ───────────────────────────────────────────
def record_audio(duration: float = RECORD_SECONDS) -> np.ndarray:
    """Record audio from the microphone and block until recording is done.

    Args:
        duration: Recording length in seconds.

    Returns:
        One-dimensional NumPy array of the recorded samples (recorded as
        int16 at SAMPLE_RATE with CHANNELS channels).
    """
    logger.info("Aufnahme laeuft... (%d Sekunden)", duration)
    frame_count = int(duration * SAMPLE_RATE)
    recording = sd.rec(
        frame_count,
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
    )
    sd.wait()  # sd.rec() returns immediately; wait for the recording to finish
    logger.info("Aufnahme beendet")
    return recording.flatten()
# ── Bridge Hauptklasse ───────────────────────────────────────
class ARIABridge:
"""ARIA Voice Bridge — verbindet App (via RVS) und Sprache mit ARIA-Core.
Drei parallele Aufgaben:
1. connect_to_core() — WebSocket zu aria-core (lokal)
2. connect_to_rvs() — WebSocket zum RVS (oeffentlich, fuer die App)
3. audio_loop() — Wake-Word + STT (lokales Mikrofon)
"""
def __init__(self) -> None:
    """Read configuration and set up state; no connections are opened here.

    Reads load_config(), the persisted mode and
    /shared/config/voice_config.json. A missing, unreadable or malformed
    voice config is logged and replaced by defaults.
    """
    self.config = load_config()
    self.ws_url = self.config.get("ARIA_CORE_WS", CORE_WS_URL)
    self.core_auth_token = self.config.get("ARIA_AUTH_TOKEN", CORE_AUTH_TOKEN)
    self._req_id_counter = 0
    self._session_key = "main"  # fallback, updated via the Diagnostic API
    self._diagnostic_url = self.config.get("DIAGNOSTIC_URL", DIAGNOSTIC_URL)

    # RVS connection info from config or environment.
    rvs_host = self.config.get("RVS_HOST", RVS_HOST)
    rvs_port = self.config.get("RVS_PORT", RVS_PORT)
    rvs_tls = self.config.get("RVS_TLS", RVS_TLS).lower() == "true"
    self.rvs_tls_fallback = self.config.get("RVS_TLS_FALLBACK", RVS_TLS_FALLBACK).lower() == "true"
    self.rvs_token = self.config.get("RVS_TOKEN", RVS_TOKEN)

    # Build the URLs (primary + fallback).
    if rvs_host:
        proto = "wss" if rvs_tls else "ws"
        self.rvs_url = f"{proto}://{rvs_host}:{rvs_port}"
        # Plain-ws fallback URL only when TLS is on and the fallback is allowed.
        if rvs_tls and self.rvs_tls_fallback:
            self.rvs_url_fallback = f"ws://{rvs_host}:{rvs_port}"
        else:
            self.rvs_url_fallback = ""
    else:
        self.rvs_url = ""
        self.rvs_url_fallback = ""

    # Load the mode from shared config (persists across container restarts).
    self.current_mode = self._load_persisted_mode()
    self.running = False

    # Components (TTS: always remote XTTS, Piper was removed).
    self.tts_enabled = True
    self.xtts_voice = ""
    self._f5tts_config: dict = {}
    vc: dict = {}

    # Load the stored voice config.
    try:
        vc_path = "/shared/config/voice_config.json"
        if os.path.exists(vc_path):
            with open(vc_path, encoding="utf-8") as f:
                loaded = json.load(f)
            # Only accept a JSON object; otherwise `vc` would stay a list or
            # scalar and the whisperModel lookup below would crash.
            if not isinstance(loaded, dict):
                raise ValueError(f"JSON-Objekt erwartet, {type(loaded).__name__} erhalten")
            vc = loaded
            self.tts_enabled = vc.get("ttsEnabled", True)
            self.xtts_voice = vc.get("xttsVoice", "")
            # Collect the F5-TTS fields (rebroadcast later via RVS so the
            # f5tts-bridge on the Gamebox gets its settings back after a
            # restart — otherwise it would run on hard defaults).
            for k in ("f5ttsModel", "f5ttsCkptFile", "f5ttsVocabFile",
                      "f5ttsCfgStrength", "f5ttsNfeStep"):
                if k in vc:
                    self._f5tts_config[k] = vc[k]
            logger.info("Voice-Config geladen: tts=%s voice=%s f5tts=%s",
                        self.tts_enabled, self.xtts_voice or "default",
                        self._f5tts_config or "defaults")
    except Exception as e:
        logger.warning("Voice-Config laden fehlgeschlagen: %s", e)

    # Whisper model: voice config wins, then env/default (medium).
    whisper_model = vc.get("whisperModel") or self.config.get("WHISPER_MODEL", WHISPER_MODEL)
    self.stt_engine = STTEngine(
        model_size=whisper_model,
        language=self.config.get("WHISPER_LANGUAGE", WHISPER_LANGUAGE),
    )
    self.wake_word = WakeWordDetector()

    # WebSocket connections.
    self.ws_core: Optional[websockets.WebSocketClientProtocol] = None
    self.ws_rvs: Optional[websockets.WebSocketClientProtocol] = None

    # Last agent_activity state that was sent (for de-duplication).
    self._last_activity_state: Optional[tuple] = None
    # Timestamp of the last chat:final — trailing agent events are suppressed
    # for 3 s afterwards (core sometimes cleans up late).
    self._last_chat_final_at: float = 0.0
    # requestId -> messageId map for the XTTS audio cache (app-side matching).
    self._xtts_request_to_message: dict[str, str] = {}
    # Voice override from the last chat message of an app. Used for the
    # directly following ARIA answer and then reset, so each device can get
    # its preferred voice (per request).
    self._next_voice_override: Optional[str] = None
    # Same logic for playback speed (F5-TTS speed parameter, app setting
    # aria_tts_speed, 1.0 = normal).
    self._next_speed_override: Optional[float] = None
    # STT requests currently waiting for the whisper-bridge (Gamebox).
    # requestId -> Future with the text (or None on error).
    self._pending_stt: dict[str, asyncio.Future] = {}
    # whisper-bridge service_status: True when ready, False while loading or
    # unknown. Influences the stt_request timeout — while "loading" we wait
    # longer because the model may still be downloading (~1-2 min).
    self._remote_stt_ready: bool = False
def initialize(self) -> None:
    """Initialise all components.

    The local audio components are optional: without an audio device (e.g.
    a VM without a sound card) or when wake-word loading fails, the bridge
    keeps running as a pure RVS relay and ``audio_available`` stays False.
    """
    logger.info("=" * 50)
    logger.info("ARIA Voice Bridge startet...")
    logger.info("=" * 50)

    # STT is handled by the whisper-bridge (Gamebox) by default. Local Whisper
    # is only a fallback and is lazy-loaded when the remote side does not
    # answer. That saves RAM on the VM and startup time.

    # Probe the audio hardware (local microphone/loudspeaker).
    self.audio_available = False
    try:
        sd.query_devices()
        sd.query_devices(kind='output')
        logger.info("Audio-Geraet gefunden — Wake-Word und lokale TTS aktiv")
        self.wake_word.initialize()
        # Set only after wake-word loading succeeded, so the flag can never
        # claim local audio is usable when it was in fact disabled below.
        self.audio_available = True
    except Exception as exc:  # includes sd.PortAudioError
        self.audio_available = False
        logger.warning("Kein Audio-Geraet — Wake-Word und lokale Wiedergabe deaktiviert")
        logger.debug("Audio-Initialisierung fehlgeschlagen: %s", exc)
        logger.info("TTS rendert fuer App (via RVS), STT verarbeitet App-Audio")

    logger.info("Alle Komponenten initialisiert")
    logger.info("aria-core: %s", self.ws_url)
    if self.rvs_url and self.rvs_token:
        logger.info("RVS: %s (Token: %s...)", self.rvs_url, self.rvs_token[:8])
    else:
        logger.warning("RVS nicht konfiguriert — App-Verbindung deaktiviert")
        logger.warning(" Setze RVS_HOST, RVS_PORT, RVS_TOKEN in /config/aria.env")
    logger.info("Modus: %s %s", self.current_mode.config.emoji, self.current_mode.config.name)
# ── aria-core Verbindung (OpenClaw Gateway Protokoll) ───
def _next_req_id(self) -> str:
"""Erzeugt eine eindeutige Request-ID fuer das OpenClaw-Protokoll."""
self._req_id_counter += 1
return f"bridge-{self._req_id_counter}"
async def _openclaw_handshake(self, ws: websockets.WebSocketClientProtocol) -> bool:
    """Perform the OpenClaw gateway handshake on a fresh connection.

    1. Wait for the connect.challenge event from the gateway.
    2. Send the connect request (with auth token if one is configured).
    3. Wait for the response and check its ``ok`` flag.

    Each wait is limited to 10 seconds.

    Returns:
        True if the handshake succeeded, otherwise False (timeouts and
        errors are logged, never raised).
    """
    try:
        # Step 1: wait for the challenge.
        first_frame = await asyncio.wait_for(ws.recv(), timeout=10.0)
        challenge = json.loads(first_frame)
        is_challenge = (
            challenge.get("type") == "event"
            and challenge.get("event") == "connect.challenge"
        )
        if not is_challenge:
            logger.error("[core] Unerwartete erste Nachricht: %s", first_frame[:200])
            return False
        nonce = challenge.get("payload", {}).get("nonce", "")
        logger.info("[core] Challenge empfangen (nonce: %s...)", nonce[:8] if nonce else "?")

        # Step 2: send the connect request.
        auth = {"token": self.core_auth_token} if self.core_auth_token else {}
        client_info = {
            "id": "gateway-client",
            "version": "0.0.3",
            "platform": "linux",
            "mode": "backend",
        }
        params = {
            "minProtocol": 3,
            "maxProtocol": 3,
            "client": client_info,
            "role": "operator",
            "scopes": ["operator.read", "operator.write"],
            "caps": ["voice"],
            "commands": [],
            "permissions": {},
            "auth": auth,
            "locale": "de-DE",
            "userAgent": "aria-bridge/0.0.3",
        }
        request = {
            "type": "req",
            "id": self._next_req_id(),
            "method": "connect",
            "params": params,
        }
        await ws.send(json.dumps(request))
        logger.info("[core] Connect-Request gesendet")

        # Step 3: wait for the response.
        reply_frame = await asyncio.wait_for(ws.recv(), timeout=10.0)
        reply = json.loads(reply_frame)
        if reply.get("type") == "res" and reply.get("ok"):
            logger.info("[core] Handshake erfolgreich — hello-ok empfangen")
            return True
        failure = reply.get("error", reply)
        logger.error("[core] Handshake fehlgeschlagen: %s", json.dumps(failure)[:200])
        return False
    except asyncio.TimeoutError:
        logger.error("[core] Handshake-Timeout (10s)")
        return False
    except Exception:
        logger.exception("[core] Handshake-Fehler")
        return False
async def connect_to_core(self) -> None:
    """Maintain a persistent WebSocket connection to aria-core (OpenClaw gateway).

    Runs until ``self.running`` becomes false. Reconnects with exponential
    backoff (2 s doubling up to 30 s, reset after a successful handshake).
    After ``max_conn_failures`` consecutive connection failures the process
    exits with status 1.
    """
    retry_delay = 2
    max_conn_failures = 6
    conn_fail_count = 0
    while self.running:
        try:
            logger.info("[core] Verbinde: %s", self.ws_url)
            async with websockets.connect(self.ws_url) as ws:
                # Perform the OpenClaw handshake.
                if not await self._openclaw_handshake(ws):
                    logger.error("[core] Handshake fehlgeschlagen — Reconnect")
                    # `continue` skips the shared reconnect delay after the
                    # try statement, so this path sleeps and backs off itself.
                    # A failed handshake does not count as a connection failure.
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)
                    continue
                self.ws_core = ws
                retry_delay = 2
                conn_fail_count = 0
                logger.info("[core] Verbunden und authentifiziert")
                async for message in ws:
                    await self._handle_core_message(message)
        except websockets.ConnectionClosed:
            logger.warning("[core] Verbindung verloren")
            conn_fail_count += 1
        except ConnectionRefusedError:
            logger.warning("[core] Nicht erreichbar (%s)", self.ws_url)
            conn_fail_count += 1
        except Exception:
            # Also reached when _handle_core_message raises.
            logger.exception("[core] WebSocket-Fehler")
            conn_fail_count += 1
        finally:
            # The connection is gone on every exit path.
            self.ws_core = None
        # After N consecutive failures: exit so Docker restarts the container
        # (it gets a new network namespace if aria-core was restarted).
        if conn_fail_count >= max_conn_failures:
            logger.error(
                "[core] %dx nicht erreichbar — Exit fuer Docker-Restart",
                conn_fail_count,
            )
            sys.exit(1)
        if self.running:
            logger.info("[core] Reconnect in %ds...", retry_delay)
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 30)
async def _handle_core_message(self, raw_message: str) -> None:
    """Dispatch one frame received from aria-core (OpenClaw gateway protocol).

    Handled frames:
      - ``res``: result of one of our requests (e.g. the chat.send ack); only logged.
      - ``event`` / ``agent``: streaming deltas; forwarded as an activity signal.
      - ``event`` / ``chat``: snapshots with ``state`` delta|final|error.
      - ``event`` / ``chat:delta``, ``chat:final``, ``chat:error``: legacy names.
      - ``event`` / ``tick``, ``health``: ignored.

    Malformed frames (invalid JSON, non-object frame) are logged and dropped;
    a missing or non-object ``payload``/``data`` is treated as empty. This
    method is awaited inside connect_to_core's receive loop, where an
    exception ends the connection.
    """
    try:
        message = json.loads(raw_message)
    except ValueError:  # JSONDecodeError and undecodable bytes
        message = None
    if not isinstance(message, dict):
        logger.error("[core] Ungueltige JSON: %s", raw_message[:100])
        return

    frame_type = message.get("type", "")
    logger.info("[core] <<< Frame: type=%s event=%s method=%s | %s",
                frame_type, message.get("event", "-"), message.get("method", "-"),
                raw_message[:200])

    # ── Response to one of our requests (e.g. chat.send ack) ──
    if frame_type == "res":
        req_id = message.get("id", "")
        if message.get("ok"):
            logger.debug("[core] Request %s bestätigt", req_id)
        else:
            error = message.get("error", "Unbekannt")
            logger.error("[core] Request %s fehlgeschlagen: %s", req_id, error)
        return

    # ── Events from the gateway ──
    if frame_type != "event":
        logger.debug("[core] Unbekannter Frame-Typ: %s", frame_type)
        return

    event_name = message.get("event", "")
    payload = message.get("payload")
    if not isinstance(payload, dict):
        payload = {}

    def _error_chat_frame(error) -> dict:
        # Chat message that surfaces a core error in the app. The timestamp is
        # wall-clock epoch milliseconds (the event loop clock is monotonic and
        # has no defined epoch).
        return {
            "type": "chat",
            "payload": {
                "text": f"[Fehler] {error}",
                "sender": "aria",
            },
            "timestamp": int(time.time() * 1000),
        }

    # ── agent events: streaming deltas from the LLM ──
    if event_name == "agent":
        data = payload.get("data")
        if not isinstance(data, dict):
            data = {}
        delta = data.get("delta", "")
        stream = payload.get("stream", "")
        if delta and stream == "assistant" and isinstance(delta, str):
            logger.debug("[core] Delta: '%s'", delta[:40])
        # Activity signal for the app (de-duplicated).
        tool_name = data.get("name") or data.get("tool") or payload.get("tool") or ""
        if stream == "tool_use" or data.get("type") == "tool_use":
            activity = "tool"
        elif stream == "assistant":
            activity = "assistant"
        else:
            activity = "thinking"
        await self._emit_activity(activity, tool_name)
        return

    # ── chat events: snapshots with state=delta|final|error ──
    if event_name == "chat":
        state = payload.get("state", "")
        if state == "final":
            text = self._extract_chat_text(payload)
            # Loop time on purpose: this value only measures an interval.
            self._last_chat_final_at = asyncio.get_event_loop().time()
            await self._emit_activity("idle", "")
            if not text:
                logger.warning("[core] chat final ohne Text: %s", json.dumps(payload)[:200])
                return
            logger.info("[core] Antwort: '%s'", text[:80])
            await self._process_core_response(text, payload)
            return
        if state == "error":
            error = payload.get("error", "Unbekannt")
            logger.error("[core] Chat-Fehler: %s", error)
            self._last_chat_final_at = asyncio.get_event_loop().time()
            await self._emit_activity("idle", "")
            await self._send_to_rvs(_error_chat_frame(error))
            return
        # state=delta — periodic snapshot, ignored.
        return

    # ── Legacy event names (chat:delta, chat:final, chat:error) ──
    if event_name == "chat:delta":
        delta = payload.get("delta", payload.get("text", ""))
        if delta and isinstance(delta, str):
            logger.debug("[core] Delta (legacy): '%s'", delta[:40])
        return

    if event_name == "chat:final":
        text = payload.get("text", payload.get("message", ""))
        # "message" may be an OpenClaw message object rather than a string;
        # slicing a dict below would raise.
        if not text or not isinstance(text, str):
            text = self._extract_chat_text(payload)
        if not text or not isinstance(text, str):
            logger.warning("[core] chat:final ohne Text: %s", json.dumps(payload)[:200])
            return
        logger.info("[core] Antwort (legacy): '%s'", text[:80])
        await self._process_core_response(text, payload)
        return

    if event_name == "chat:error":
        error = payload.get("error", payload.get("message", "Unbekannt"))
        logger.error("[core] Chat-Fehler (legacy): %s", error)
        await self._send_to_rvs(_error_chat_frame(error))
        return

    # tick, health, etc. — ignored.
    if event_name in ("tick", "health"):
        return
    logger.debug("[core] Event: %s", event_name)
@staticmethod
def _extract_chat_text(payload: dict) -> str:
"""Extrahiert Text aus OpenClaw chat-Event message.content Array."""
try:
content = payload.get("message", {}).get("content", [])
if isinstance(content, list):
return "".join(
c.get("text", "") for c in content if c.get("type") == "text"
)
if isinstance(content, str):
return content
except (AttributeError, TypeError):
pass
return payload.get("text", "")
async def _process_core_response(self, text: str, payload: dict) -> None:
    """Process a finished answer from aria-core.

    - A NO_REPLY answer is discarded silently (no chat message, no TTS).
    - A mode-switch command in the text updates, persists and broadcasts the mode.
    - The answer is forwarded to the app as a chat message (via RVS).
    - If TTS is enabled and the mode allows speaking, an xtts_request with
      the cleaned-up text is sent.

    Args:
        text: The answer text.
        payload: The event payload; ``metadata.critical`` is read from it.
    """
    # NO_REPLY token: ARIA explicitly signals "do not answer". Tolerates
    # variations: case, trailing dot, surrounding quotes/asterisks.
    stripped = text.strip().strip('."\'`*').upper()
    if stripped.startswith("NO_REPLY"):
        logger.info("[core] NO_REPLY empfangen — Antwort still verworfen")
        return

    # The per-request overrides belong to exactly this answer. Consume them
    # now, so they cannot leak into a later answer when TTS is suppressed or
    # skipped below.
    voice_override = self._next_voice_override
    speed_override = self._next_speed_override
    self._next_voice_override = None
    self._next_speed_override = None

    metadata = payload.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}
    is_critical = metadata.get("critical", False)

    # Check for a mode switch (voice command inside the text).
    new_mode = detect_mode_switch(text)
    if new_mode is not None:
        self.current_mode = new_mode
        self._persist_mode()
        logger.info(
            "[core] Modus → %s %s",
            self.current_mode.config.emoji,
            self.current_mode.config.name,
        )
        await self._broadcast_current_mode()

    # Unique message ID for audio-cache matching.
    message_id = str(uuid.uuid4())
    tts_text = clean_text_for_tts(text)

    # Forward the answer to the app (as a chat message).
    await self._send_to_rvs({
        "type": "chat",
        "payload": {
            "text": text,
            "sender": "aria",
            "messageId": message_id,
            # Debug: text as prepared for TTS (only sent when it differs).
            "ttsText": tts_text if tts_text != text else "",
        },
        # Wall-clock epoch milliseconds; the event loop clock is monotonic.
        "timestamp": int(time.time() * 1000),
    })

    # TTS via XTTS (XTTS bridge on the gaming PC).
    if not (getattr(self, 'tts_enabled', True) and should_speak(self.current_mode, is_critical)):
        logger.info("[core] TTS unterdrueckt (Modus: %s)", self.current_mode.config.name)
        return

    # Voice: app override for this request > global default voice.
    xtts_voice = voice_override or getattr(self, 'xtts_voice', '')
    if voice_override:
        logger.info("[core] Nutze Voice-Override: %s", voice_override)
    # Speed likewise from the app override (fallback 1.0).
    xtts_speed = speed_override or 1.0
    if speed_override:
        logger.info("[core] Nutze Speed-Override: %.2fx", speed_override)

    # Nothing speakable left (e.g. the answer was only a code block): skip
    # instead of falling back to the raw text and reading markup aloud.
    if not tts_text:
        logger.info("[core] TTS-Text leer nach Cleanup — uebersprungen")
        return

    try:
        xtts_request_id = str(uuid.uuid4())
        self._xtts_request_to_message[xtts_request_id] = message_id
        # Bound the map: drop the oldest entry beyond 100.
        if len(self._xtts_request_to_message) > 100:
            oldest = next(iter(self._xtts_request_to_message))
            self._xtts_request_to_message.pop(oldest, None)
        await self._send_to_rvs({
            "type": "xtts_request",
            "payload": {
                "text": tts_text,
                "voice": xtts_voice,
                "speed": xtts_speed,
                "language": "de",
                "requestId": xtts_request_id,
                "messageId": message_id,
            },
            "timestamp": int(time.time() * 1000),
        })
        logger.info("[core] XTTS-Request gesendet (voice=%s, speed=%.2fx): '%s'",
                    xtts_voice or "default", xtts_speed, tts_text[:60])
    except Exception as e:
        logger.error("[core] XTTS-Request fehlgeschlagen: %s — kein Audio", e)
# ── Mode Persistence (global, nicht pro Geraet) ──────
# Shared file that stores the active mode.
_MODE_FILE = "/shared/config/mode.json"

def _load_persisted_mode(self) -> Mode:
    """Return the mode stored in the shared config, or Mode.NORMAL.

    Mode.NORMAL is returned when the file is missing, unreadable, or names
    an unknown mode; read errors are logged as warnings.
    """
    try:
        if os.path.exists(self._MODE_FILE):
            stored = json.loads(Path(self._MODE_FILE).read_text())
            wanted = stored.get("mode", "NORMAL")
            found = next((mode for mode in Mode if mode.name == wanted), None)
            if found is not None:
                logger.info("[mode] Persistierter Modus geladen: %s", found.config.name)
                return found
    except Exception as e:
        logger.warning("[mode] Laden fehlgeschlagen: %s", e)
    return Mode.NORMAL
def _persist_mode(self) -> None:
    """Write the current mode's name to the shared config (best effort).

    Failures are logged as warnings and never raised.
    """
    try:
        os.makedirs("/shared/config", exist_ok=True)
        target = Path(self._MODE_FILE)
        snapshot = {"mode": self.current_mode.name}
        target.write_text(json.dumps(snapshot))
    except Exception as e:
        logger.warning("[mode] Speichern fehlgeschlagen: %s", e)
async def _broadcast_current_mode(self) -> None:
    """Broadcast the current mode to all RVS clients (app + Diagnostic).

    Best effort: failures are logged at debug level and never raised.
    """
    try:
        await self._send_to_rvs({
            "type": "mode",
            "payload": {
                "mode": canonical_id(self.current_mode),
                "name": self.current_mode.config.name,
                "emoji": self.current_mode.config.emoji,
                "sender": "bridge",  # lets the mode handler filter out loops
            },
            # Wall-clock epoch milliseconds; the event loop clock is monotonic
            # and has no defined epoch.
            "timestamp": int(time.time() * 1000),
        })
    except Exception as e:
        logger.debug("[mode] Broadcast fehlgeschlagen: %s", e)
async def _broadcast_persisted_config(self) -> None:
    """Broadcast the bridge's current voice settings as a ``config`` message.

    The payload is assembled from the in-memory attributes ``tts_enabled``,
    ``xtts_voice``, ``stt_engine.model_size`` and, if present, the collected
    ``_f5tts_config`` fields. It is sent after an RVS connect and in reply
    to ``config_request`` so that other bridges can pick up the settings
    last chosen in Diagnostic.

    The success message is logged only when the send actually went out;
    every failure is logged at debug level and never raised.
    """
    try:
        payload = {
            "ttsEnabled": getattr(self, "tts_enabled", True),
            "xttsVoice": getattr(self, "xtts_voice", ""),
            "whisperModel": self.stt_engine.model_size,
        }
        payload.update(getattr(self, "_f5tts_config", {}) or {})
        sent = await self._send_to_rvs({
            "type": "config",
            "payload": payload,
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        })
        if sent:
            logger.info("[rvs] Persistierte Config broadcastet: %s", payload)
        else:
            # _send_to_rvs reports failure via its return value, not an exception.
            logger.debug("[rvs] Config-Broadcast fehlgeschlagen: %s", "RVS nicht verbunden")
    except Exception as e:
        logger.debug("[rvs] Config-Broadcast fehlgeschlagen: %s", e)
def _fetch_active_session(self) -> None:
    """Refresh ``_session_key`` from the Diagnostic ``/api/session`` endpoint.

    Performs a synchronous HTTP GET with a 2 second timeout. If the response
    JSON carries a non-empty ``sessionKey`` that differs from the current
    one, the new key is stored. Any failure keeps the current key and is
    logged at debug level.
    """
    try:
        endpoint = f"{self._diagnostic_url}/api/session"
        request = urllib.request.Request(endpoint, method="GET")
        with urllib.request.urlopen(request, timeout=2) as resp:
            body = resp.read().decode()
        reported = json.loads(body).get("sessionKey", "")
        if not reported or reported == self._session_key:
            return
        logger.info("[session] Aktive Session gewechselt: %s -> %s", self._session_key, reported)
        self._session_key = reported
    except Exception as e:
        logger.debug("[session] Diagnostic nicht erreichbar (%s) — nutze '%s'", e, self._session_key)
async def send_to_core(self, text: str, source: str = "bridge") -> None:
    """Send ``text`` to aria-core as a ``chat.send`` request.

    Args:
        text: The message text to forward.
        source: Label of the origin, used only for logging.

    The message is dropped (with an error log) when no core connection
    exists. Send errors are logged and swallowed.
    """
    if self.ws_core is None:
        logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
        return
    # The session lookup is a blocking HTTP request (up to 2s); run it in the
    # default executor so the event loop keeps serving heartbeats and sockets.
    await asyncio.get_running_loop().run_in_executor(None, self._fetch_active_session)
    # The connection may have gone away while we were waiting for the lookup.
    core_ws = self.ws_core
    if core_ws is None:
        logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
        return
    req_id = self._next_req_id()
    message = json.dumps({
        "type": "req",
        "id": req_id,
        "method": "chat.send",
        "params": {
            "sessionKey": self._session_key,
            "message": text,
            "idempotencyKey": str(uuid.uuid4()),
        },
    })
    try:
        await core_ws.send(message)
        logger.info("[core] chat.send (%s, id=%s): '%s'", source, req_id, text[:80])
    except Exception:
        logger.exception("[core] Sendefehler")
# ── RVS Verbindung (App-Relay) ──────────────────────────
async def connect_to_rvs(self) -> None:
    """Keep a WebSocket connection to the RVS open, reconnecting on failure.

    Returns immediately when no RVS URL or token is configured. Otherwise it
    loops while ``self.running`` is true: connect, broadcast the current
    mode and voice config, start the heartbeat, and feed every incoming
    message to ``_handle_rvs_message``.

    On an ``ssl.SSLError``/``OSError`` the loop switches once to
    ``rvs_url_fallback`` (if set) and stays on it. The reconnect delay
    starts at 2s, doubles after each attempt, and is capped at 30s.

    An exception raised while handling a single message is logged and does
    not drop the connection.
    """
    if not self.rvs_url or not self.rvs_token:
        logger.info("[rvs] Nicht konfiguriert — ueberspringe")
        return
    retry_delay = 2
    current_url = self.rvs_url
    using_fallback = False
    while self.running:
        try:
            url = f"{current_url}?token={self.rvs_token}"
            # Log the URL without the token.
            logger.info("[rvs] Verbinde: %s", current_url)
            async with websockets.connect(url) as ws:
                self.ws_rvs = ws
                retry_delay = 2
                logger.info("[rvs] Verbunden — warte auf App-Nachrichten")
                # Let freshly connected apps/Diagnostic sync their UI state.
                await self._broadcast_current_mode()
                # Keep references to the background tasks: the event loop only
                # holds weak references, and we need them for cleanup below.
                background = [
                    # Re-announce the voice config for bridges that just restarted.
                    asyncio.create_task(self._broadcast_persisted_config()),
                    # Keep-alive pings/heartbeats for this connection.
                    asyncio.create_task(self._rvs_heartbeat()),
                ]
                try:
                    async for raw_message in ws:
                        try:
                            await self._handle_rvs_message(raw_message)
                        except asyncio.CancelledError:
                            raise
                        except Exception:
                            # One bad message must not tear down the connection.
                            logger.exception("[rvs] Fehler bei der Verarbeitung einer Nachricht")
                finally:
                    for task in background:
                        task.cancel()
                    # Wait for the cancellations to finish; results are ignored.
                    await asyncio.gather(*background, return_exceptions=True)
        except websockets.ConnectionClosed:
            logger.warning("[rvs] Verbindung verloren")
        except ConnectionRefusedError:
            # Must precede the OSError clause below (it is an OSError subclass).
            logger.warning("[rvs] Nicht erreichbar")
        except (ssl.SSLError, OSError) as e:
            if not using_fallback and self.rvs_url_fallback:
                logger.warning("[rvs] TLS-Fehler: %s", e)
                logger.warning("[rvs] TLS gewollt aber nicht verfuegbar — Fallback auf ws://")
                current_url = self.rvs_url_fallback
                using_fallback = True
                retry_delay = 1  # retry almost immediately on the fallback URL
            else:
                logger.error("[rvs] SSL-Fehler (kein Fallback): %s", e)
        except Exception:
            logger.exception("[rvs] WebSocket-Fehler")
        finally:
            self.ws_rvs = None
        if self.running:
            logger.info("[rvs] Reconnect in %ds...", retry_delay)
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 30)
async def _rvs_heartbeat(self) -> None:
    """Every 15s send a WebSocket ping and a ``heartbeat`` message to the RVS.

    If the pong does not arrive within 10s, the socket is closed,
    ``ws_rvs`` is cleared and the coroutine ends. It also ends when sending
    the heartbeat message fails.
    """
    while True:
        await asyncio.sleep(15)
        if not self.ws_rvs:
            continue
        try:
            # Protocol-level ping; the returned waiter resolves on the pong.
            pong_waiter = await self.ws_rvs.ping()
            await asyncio.wait_for(pong_waiter, timeout=10)
        except Exception:
            logger.warning("[rvs] Ping fehlgeschlagen — Verbindung tot, erzwinge Reconnect")
            try:
                await self.ws_rvs.close()
            except Exception:
                pass
            self.ws_rvs = None
            return
        beat = json.dumps({
            "type": "heartbeat",
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        })
        try:
            await self.ws_rvs.send(beat)
        except Exception:
            return
async def _handle_rvs_message(self, raw_message: str) -> None:
    """Parse one raw RVS message and act on its ``type``.

    Handled types:
      - chat: user text → forwarded to aria-core (sender aria/stt ignored)
      - cancel_request: trigger Diagnostic cancel, emit idle activity
      - audio_pcm: ignored (the app receives these chunks directly)
      - xtts_response: legacy WAV reply → re-sent to the app as ``audio``
      - tts_request: on-demand TTS → sent out as ``xtts_request``
      - config: apply and persist voice settings
      - mode: switch mode by id or activation phrase
      - location: log GPS data and pass the raw message to aria-core
      - file: store an upload under /shared/uploads and tell aria-core
      - file_request: send a stored file back to the app
      - audio: recorded speech → STT in a background task
      - stt_response: resolve the pending remote-STT future
      - service_status: track whether the whisper bridge is ready
      - config_request: re-broadcast the voice config

    Messages that are not valid JSON objects are logged and dropped; a
    ``payload`` that is not an object is treated as empty.
    """
    try:
        message = json.loads(raw_message)
    except ValueError:
        # Covers JSONDecodeError as well as undecodable binary frames.
        logger.error("[rvs] Ungueltige JSON: %s", raw_message[:100])
        return
    if not isinstance(message, dict):
        # Valid JSON but not an object (list, number, ...): no type/payload to read.
        logger.error("[rvs] Ungueltige JSON: %s", raw_message[:100])
        return
    msg_type = message.get("type", "")
    payload = message.get("payload", {})
    if not isinstance(payload, dict):
        payload = {}

    if msg_type == "chat":
        # Forward user messages only; relaying ARIA/STT echoes would loop.
        sender = payload.get("sender", "")
        if sender in ("aria", "stt"):
            return
        text = payload.get("text", "")
        # Remember the voice override for the next ARIA reply.
        voice_override = payload.get("voice", "")
        if voice_override:
            self._next_voice_override = voice_override
            logger.info("[rvs] Voice-Override fuer naechste Antwort: %s", voice_override)
        # Speed override (TTS playback speed, per device).
        try:
            speed = float(payload.get("speed", 0) or 0)
            if 0.1 <= speed <= 5.0:
                self._next_speed_override = speed
        except (TypeError, ValueError):
            pass
        if text:
            logger.info("[rvs] App-Chat: '%s'", text[:80])
            await self.send_to_core(text, source="app")
        return

    elif msg_type == "cancel_request":
        logger.info("[rvs] Cancel-Request von App — rufe Diagnostic /api/cancel auf")
        await self._cancel_via_diagnostic()
        await self._emit_activity("idle", "")
        return

    elif msg_type == "audio_pcm":
        # PCM chunks travel from the XTTS bridge to the app directly via the
        # RVS broadcast; re-sending them here would duplicate every chunk.
        return

    elif msg_type == "xtts_response":
        # Legacy path (old XTTS bridge replying with a WAV): forward as
        # type "audio" for the app's existing WAV queue player.
        audio_b64 = payload.get("base64", "")
        error = payload.get("error", "")
        req_id_full = payload.get("requestId", "")
        req_id_base = req_id_full.rsplit("_", 1)[0] if "_" in req_id_full else req_id_full
        linked_message_id = self._xtts_request_to_message.get(req_id_base, "")
        if error:
            logger.warning("[rvs] XTTS Fehler: %s", error)
            return
        if audio_b64:
            logger.info("[rvs] XTTS-Audio legacy empfangen: %dKB", len(audio_b64) // 1365)
            await self._send_to_rvs({
                "type": "audio",
                "payload": {
                    "base64": audio_b64,
                    "mimeType": payload.get("mimeType", "audio/wav"),
                    "voice": payload.get("voice", "xtts"),
                    "messageId": linked_message_id,
                },
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })
        return

    elif msg_type == "tts_request":
        # The app asks for TTS audio for a given text (play button) → XTTS.
        text = payload.get("text", "")
        message_id = payload.get("messageId", "")
        if not text:
            return
        tts_text = clean_text_for_tts(text) or text
        # Voice and speed from the app payload, else the global/default value.
        xtts_voice = payload.get("voice", "") or getattr(self, 'xtts_voice', '')
        try:
            xtts_speed = float(payload.get("speed", 0) or 0)
            if not (0.1 <= xtts_speed <= 5.0):
                xtts_speed = 1.0
        except (TypeError, ValueError):
            xtts_speed = 1.0
        try:
            xtts_request_id = str(uuid.uuid4())
            if message_id:
                self._xtts_request_to_message[xtts_request_id] = message_id
                # Cap the mapping at 100 entries by evicting the oldest one,
                # otherwise it grows with every play-button request.
                if len(self._xtts_request_to_message) > 100:
                    oldest = next(iter(self._xtts_request_to_message))
                    self._xtts_request_to_message.pop(oldest, None)
            await self._send_to_rvs({
                "type": "xtts_request",
                "payload": {
                    "text": tts_text,
                    "voice": xtts_voice,
                    "speed": xtts_speed,
                    "language": "de",
                    "requestId": xtts_request_id,
                    "messageId": message_id,
                },
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })
            logger.info("[rvs] TTS on-demand via XTTS: '%s'", tts_text[:60])
        except Exception as e:
            logger.warning("[rvs] TTS on-demand fehlgeschlagen: %s", e)
        return

    elif msg_type == "config":
        # Settings from app/Diagnostic: apply and persist. The f5tts* fields
        # are only stored here and included in the persisted/broadcast config.
        changed = False
        if "ttsEnabled" in payload:
            self.tts_enabled = bool(payload["ttsEnabled"])
            logger.info("[rvs] TTS %s", "aktiviert" if self.tts_enabled else "deaktiviert")
            changed = True
        if "xttsVoice" in payload:
            self.xtts_voice = payload["xttsVoice"]
            logger.info("[rvs] XTTS-Stimme: %s", self.xtts_voice or "default")
            changed = True
        if "whisperModel" in payload:
            new_model = payload["whisperModel"]
            allowed = {"tiny", "base", "small", "medium", "large-v3"}
            if new_model in allowed and new_model != self.stt_engine.model_size:
                logger.info("[rvs] Whisper-Modell → %s (nur Config; Modell laedt Gamebox)",
                            new_model)
                self.stt_engine.model_size = new_model
                # Drop the loaded model reference.
                self.stt_engine.model = None
                changed = True
        for k in ("f5ttsModel", "f5ttsCkptFile", "f5ttsVocabFile",
                  "f5ttsCfgStrength", "f5ttsNfeStep"):
            if k in payload:
                if not hasattr(self, "_f5tts_config"):
                    self._f5tts_config = {}
                self._f5tts_config[k] = payload[k]
                changed = True
        # Persist to the shared volume.
        if changed:
            try:
                os.makedirs("/shared/config", exist_ok=True)
                config_data = {
                    "ttsEnabled": getattr(self, "tts_enabled", True),
                    "xttsVoice": getattr(self, "xtts_voice", ""),
                    "whisperModel": self.stt_engine.model_size,
                }
                config_data.update(getattr(self, "_f5tts_config", {}))
                with open("/shared/config/voice_config.json", "w") as f:
                    json.dump(config_data, f, indent=2)
                logger.info("[rvs] Voice-Config gespeichert: %s", config_data)
            except Exception as e:
                logger.warning("[rvs] Config speichern fehlgeschlagen: %s", e)
        return

    elif msg_type == "mode":
        # Mode switch from the app: an id ('normal', 'dnd', ...) or an
        # activation phrase.
        mode_name = payload.get("mode", "")
        # Ignore the bridge's own broadcast so clients cannot loop.
        if payload.get("sender") == "bridge":
            return
        new_mode = mode_from_id(mode_name) or detect_mode_switch(mode_name)
        if new_mode is not None and new_mode != self.current_mode:
            self.current_mode = new_mode
            self._persist_mode()
            logger.info(
                "[rvs] Modus → %s %s (von App)",
                self.current_mode.config.emoji,
                self.current_mode.config.name,
            )
            # Broadcast to all clients so every UI stays in sync.
            await self._broadcast_current_mode()
        elif new_mode is None:
            logger.warning("[rvs] Unbekannter Modus: '%s'", mode_name)

    elif msg_type == "location":
        # GPS data from the app.
        lat = payload.get("lat")
        lng = payload.get("lng")
        speed = payload.get("speed")
        logger.info("[rvs] GPS: lat=%.4f lng=%.4f speed=%s", lat or 0, lng or 0, speed)
        # Pass the raw message on to aria-core.
        if self.ws_core:
            await self.ws_core.send(raw_message)

    elif msg_type == "file":
        # File from the app → stored on disk, announced to aria-core as text.
        file_name = str(payload.get("name") or "unbekannt")
        file_type = str(payload.get("type") or "")
        file_b64 = payload.get("base64", "")
        width = payload.get("width", 0)
        height = payload.get("height", 0)
        logger.info("[rvs] Datei empfangen: %s (%s, %dKB)",
                    file_name, file_type, len(file_b64) // 1365 if file_b64 else 0)
        if not file_b64:
            text = f"Stefan hat eine Datei gesendet ({file_name}, {file_type}) aber die Daten sind leer angekommen."
            await self.send_to_core(text, source="app-file")
            return

        SHARED_DIR = "/shared/uploads"
        is_image = file_type.startswith("image/")
        stamp = int(asyncio.get_event_loop().time())
        # Flatten path separators so the upload cannot leave SHARED_DIR.
        flat_name = file_name.replace('/', '_')
        if is_image:
            ext = ".jpg" if "jpeg" in file_type or "jpg" in file_type else ".png"
            safe_name = f"img_{stamp}_{flat_name}"
            if not safe_name.endswith(ext):
                safe_name += ext
        else:
            safe_name = f"file_{stamp}_{flat_name}"
        file_path = os.path.join(SHARED_DIR, safe_name)
        try:
            os.makedirs(SHARED_DIR, exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(base64.b64decode(file_b64))
        except (ValueError, OSError) as e:
            # Invalid base64 (binascii.Error is a ValueError) or an I/O failure
            # must not escape the handler.
            logger.error("[rvs] Datei konnte nicht gespeichert werden: %s", e)
            return
        size_kb = len(file_b64) // 1365

        if is_image:
            logger.info("[rvs] Bild gespeichert: %s (%dKB)", file_path, size_kb)
            dims = f" ({width}x{height}px)" if width else ""
            text = (f"Stefan hat dir ein Bild geschickt: {file_name}"
                    f"{dims}"
                    f", {size_kb}KB."
                    f" Das Bild liegt unter: {file_path}"
                    f" Warte auf Stefans Anweisung was du damit tun sollst.")
        else:
            logger.info("[rvs] Datei gespeichert: %s (%dKB)", file_path, size_kb)
            text = (f"Stefan hat dir eine Datei geschickt: {file_name}"
                    f" ({file_type}, {size_kb}KB)."
                    f" Die Datei liegt unter: {file_path}"
                    f" Warte auf Stefans Anweisung was du damit tun sollst.")
        # Tell aria-core first (the important step) ...
        await self.send_to_core(text, source="app-file")
        # ... then inform the app; this part is optional and must not crash.
        try:
            await self._send_to_rvs({
                "type": "file_saved",
                "payload": {"name": file_name, "serverPath": file_path, "mimeType": file_type},
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })
        except Exception as e:
            logger.warning("[rvs] file_saved konnte nicht an App gesendet werden: %s", e)

    elif msg_type == "file_request":
        # The app asks for a stored file again (re-download).
        server_path = payload.get("serverPath", "")
        req_id = payload.get("requestId", "")
        if not isinstance(server_path, str) or not server_path.startswith("/shared/"):
            logger.warning("[rvs] Ungueltiger file_request: %s", server_path)
            return
        # A prefix check alone lets "/shared/../etc/passwd" through. Resolve
        # ".." segments and symlinks, then require the result to stay strictly
        # inside the shared root.
        try:
            shared_root = os.path.realpath("/shared")
            real_path = os.path.realpath(server_path)
            inside = (real_path != shared_root
                      and os.path.commonpath([shared_root, real_path]) == shared_root)
        except ValueError:
            # e.g. an embedded NUL byte in the requested path
            inside = False
        if not inside:
            logger.warning("[rvs] Ungueltiger file_request: %s", server_path)
            return
        if not os.path.isfile(real_path):
            logger.warning("[rvs] Datei nicht gefunden: %s", server_path)
            await self._send_to_rvs({
                "type": "file_response",
                "payload": {"requestId": req_id, "error": "Datei nicht gefunden"},
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })
            return
        with open(real_path, "rb") as f:
            file_b64 = base64.b64encode(f.read()).decode("ascii")
        logger.info("[rvs] Re-Download: %s (%dKB)", server_path, len(file_b64) // 1365)
        await self._send_to_rvs({
            "type": "file_response",
            "payload": {
                "requestId": req_id,
                # Echo the requested path unchanged so the app can correlate.
                "serverPath": server_path,
                "base64": file_b64,
                "name": os.path.basename(real_path),
            },
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        })

    elif msg_type == "audio":
        # Audio from the app → STT → aria-core.
        audio_b64 = payload.get("base64", "")
        mime_type = payload.get("mimeType", "audio/mp4")
        duration_ms = payload.get("durationMs", 0)
        if not audio_b64:
            logger.warning("[rvs] Audio ohne Daten empfangen")
            return
        # Voice override for the upcoming ARIA reply (chosen in the app).
        voice_override = payload.get("voice", "")
        if voice_override:
            self._next_voice_override = voice_override
            logger.info("[rvs] Voice-Override (via Audio): %s", voice_override)
        try:
            speed = float(payload.get("speed", 0) or 0)
            if 0.1 <= speed <= 5.0:
                self._next_speed_override = speed
        except (TypeError, ValueError):
            pass
        logger.info("[rvs] Audio empfangen: %s, %dms, %dKB",
                    mime_type, duration_ms, len(audio_b64) // 1365)
        # Run STT in the background so the receive loop is not blocked.
        asyncio.create_task(self._process_app_audio(audio_b64, mime_type))

    elif msg_type == "stt_response":
        # Reply of the whisper bridge to one of our stt_request messages.
        request_id = payload.get("requestId", "")
        future = self._pending_stt.get(request_id)
        if future is None or future.done():
            return
        error = payload.get("error", "")
        if error:
            logger.warning("[rvs] stt_response Fehler: %s", error)
            future.set_result(None)
        else:
            text = payload.get("text", "")
            stt_ms = payload.get("sttMs", 0)
            model = payload.get("model", "?")
            logger.info("[rvs] Remote-STT OK (%s, %dms): '%s'", model, stt_ms, (text or "")[:80])
            future.set_result(text)
        return

    elif msg_type == "service_status":
        # Other bridges report their load state. The whisper state selects
        # the remote-STT timeout (longer while the model is still loading).
        svc = payload.get("service", "")
        state = payload.get("state", "")
        if svc == "whisper":
            was_ready = self._remote_stt_ready
            self._remote_stt_ready = (state == "ready")
            if self._remote_stt_ready != was_ready:
                logger.info("[rvs] whisper-bridge -> %s", state)
        return

    elif msg_type == "config_request":
        # Another bridge asks for the current voice config, e.g. because it
        # connected after our own connect-time broadcast.
        requester = payload.get("service", "?")
        logger.info("[rvs] config_request von %s — broadcaste Voice-Config", requester)
        asyncio.create_task(self._broadcast_persisted_config())
        return

    else:
        logger.debug("[rvs] Unbekannter Typ: %s", msg_type)
# STT orchestration: try remote (Gamebox) first, fall back to local.
# Two timeouts (seconds):
# ready=True → 45s is enough even for long recordings
# ready=False → 300s, because the model may still be downloading
# (large-v3 ~3GB, can take 1-2 minutes on the Gamebox).
_STT_REMOTE_TIMEOUT_READY_S = 45.0
_STT_REMOTE_TIMEOUT_LOADING_S = 300.0
async def _process_app_audio(self, audio_b64: str, mime_type: str) -> None:
    """Transcribe app audio and forward the text to aria-core.

    Remote STT (whisper bridge via RVS) is tried first; if it yields
    ``None`` the local Whisper fallback runs. A non-blank transcript is
    sent to aria-core and then echoed to RVS as a ``chat`` message with
    ``sender="stt"`` (which the chat handler ignores, so no loop).
    """
    transcript = await self._stt_remote(audio_b64, mime_type)
    if transcript is None:
        # No usable remote answer → local Whisper.
        logger.warning("[rvs] Remote-STT nicht verfuegbar — Fallback auf lokales Whisper")
        transcript = await self._stt_local(audio_b64, mime_type)
        if transcript is None:
            return

    if not transcript.strip():
        logger.info("[rvs] Keine Sprache erkannt — ignoriert")
        return

    logger.info("[rvs] STT Ergebnis: '%s'", transcript[:80])
    # Core first: this is the step that matters.
    await self.send_to_core(transcript, source="app-voice")
    # Then publish the recognised text for display in app and Diagnostic.
    stt_echo = {
        "type": "chat",
        "payload": {
            "text": transcript,
            "sender": "stt",
        },
        "timestamp": int(asyncio.get_event_loop().time() * 1000),
    }
    try:
        await self._send_to_rvs(stt_echo)
    except Exception as e:
        logger.warning("[rvs] STT-Text konnte nicht an RVS gesendet werden: %s", e)
async def _stt_remote(self, audio_b64: str, mime_type: str) -> Optional[str]:
    """Send audio to the whisper bridge and wait for its ``stt_response``.

    Args:
        audio_b64: Base64-encoded audio as received from the app.
        mime_type: MIME type of the audio, passed through to the bridge.

    Returns:
        The recognised text (possibly empty), or ``None`` when there is no
        RVS connection, the request could not be sent, the bridge reported
        an error, or no answer arrived in time. ``None`` tells the caller to
        use the local fallback.

    The wait uses ``_STT_REMOTE_TIMEOUT_READY_S`` when the whisper bridge
    reported ``ready`` and ``_STT_REMOTE_TIMEOUT_LOADING_S`` otherwise.
    """
    if self.ws_rvs is None:
        return None
    request_id = str(uuid.uuid4())
    loop = asyncio.get_event_loop()
    future: asyncio.Future = loop.create_future()
    # The stt_response handler resolves this future by request id.
    self._pending_stt[request_id] = future
    try:
        model = getattr(self.stt_engine, "model_size", "small")
        logger.info("[rvs] stt_request → whisper-bridge (id=%s, model=%s, %dKB)",
                    request_id[:8], model, len(audio_b64) // 1365)
        ok = await self._send_to_rvs({
            "type": "stt_request",
            "payload": {
                "requestId": request_id,
                "audio": audio_b64,
                "mimeType": mime_type,
                "model": model,
                "language": getattr(self.stt_engine, "language", "de"),
            },
            "timestamp": int(loop.time() * 1000),
        })
        if not ok:
            logger.warning("[rvs] stt_request konnte nicht gesendet werden — skip Remote")
            return None
        timeout_s = (self._STT_REMOTE_TIMEOUT_READY_S
                     if self._remote_stt_ready
                     else self._STT_REMOTE_TIMEOUT_LOADING_S)
        logger.info("[rvs] STT-Timeout %ds (whisper-bridge %s)",
                    int(timeout_s), "ready" if self._remote_stt_ready else "loading")
        try:
            return await asyncio.wait_for(future, timeout=timeout_s)
        except asyncio.TimeoutError:
            # Report the timeout that was actually applied. There is no single
            # _STT_REMOTE_TIMEOUT_S attribute; referencing it here would raise
            # AttributeError and suppress the local fallback.
            logger.warning("[rvs] Remote-STT Timeout (%.0fs)", timeout_s)
            return None
    except Exception as e:
        logger.warning("[rvs] Remote-STT Fehler: %s", e)
        return None
    finally:
        self._pending_stt.pop(request_id, None)
async def _stt_local(self, audio_b64: str, mime_type: str) -> Optional[str]:
    """Local Whisper fallback: FFmpeg → 16 kHz mono float32 → transcribe.

    Args:
        audio_b64: Base64-encoded audio as received from the app.
        mime_type: MIME type; selects the input file extension
            (``.mp4``, ``.wav``, otherwise ``.ogg``).

    Returns:
        The result of ``stt_engine.transcribe``, or ``None`` when decoding,
        conversion or transcription fails or the converted audio is empty.

    All intermediate files live in a temporary directory that is removed
    on every exit path.
    """
    loop = asyncio.get_event_loop()
    try:
        ext = ".mp4" if "mp4" in mime_type else ".wav" if "wav" in mime_type else ".ogg"
        with tempfile.TemporaryDirectory(prefix="aria_stt_") as workdir:
            src_path = os.path.join(workdir, f"input{ext}")
            raw_path = os.path.join(workdir, "output.raw")
            with open(src_path, "wb") as fh:
                fh.write(base64.b64decode(audio_b64))
            cmd = [
                "ffmpeg", "-y", "-i", src_path,
                "-ar", "16000", "-ac", "1", "-f", "f32le",
                raw_path,
            ]
            # FFmpeg is a blocking subprocess; keep it off the event loop.
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(cmd, capture_output=True, timeout=30),
            )
            if result.returncode != 0:
                # stderr is not guaranteed to be valid UTF-8.
                logger.error("[rvs] FFmpeg Fehler: %s",
                             result.stderr.decode(errors="replace")[:200])
                return None
            # Load the samples before the temporary directory is removed.
            audio_data = np.fromfile(raw_path, dtype=np.float32)
        if len(audio_data) == 0:
            logger.warning("[rvs] Leere Audio-Daten nach Konvertierung")
            return None
        duration_s = len(audio_data) / 16000.0
        logger.info("[rvs] Lokal-STT: %.1fs Audio, model=%s", duration_s, self.stt_engine.model_size)
        return await loop.run_in_executor(None, self.stt_engine.transcribe, audio_data)
    except Exception:
        logger.exception("[rvs] Lokales STT fehlgeschlagen")
        return None
async def _send_to_rvs(self, message: dict) -> bool:
    """Send ``message`` as JSON over the RVS socket after a liveness ping.

    Args:
        message: JSON-serialisable message dict.

    Returns:
        ``True`` when the message was sent; ``False`` when there is no
        connection, the ping got no pong within 5s, or sending failed.

    After a failed ping the socket is closed and ``ws_rvs`` is cleared so
    that the reconnect loop takes over. The socket is captured once at the
    start, and ``ws_rvs`` is only cleared if it still refers to that socket,
    so a connection established in the meantime is never discarded.
    """
    ws = self.ws_rvs
    if ws is None:
        return False
    # Ping check: is the connection really alive?
    try:
        pong = await ws.ping()
        await asyncio.wait_for(pong, timeout=5)
    except Exception:
        logger.warning("[rvs] Ping fehlgeschlagen — Verbindung tot, erzwinge Reconnect")
        try:
            await ws.close()
        except Exception:
            pass
        if self.ws_rvs is ws:
            self.ws_rvs = None
        # Reconnecting is handled by the connect_to_rvs loop.
        return False
    try:
        await ws.send(json.dumps(message))
        return True
    except Exception:
        logger.warning("[rvs] Sendefehler — RVS nicht erreichbar")
        return False
# ── Log-Streaming an die App ─────────────────────────────
async def _cancel_via_diagnostic(self) -> None:
    """POST to the Diagnostic ``/api/cancel`` endpoint and log the outcome.

    The blocking HTTP request (5s timeout) runs in the default executor.
    The logged value is the HTTP status on success or an ``"error: ..."``
    string on failure; nothing is raised.
    """
    def _post_cancel():
        try:
            request = urllib.request.Request(
                f"{self._diagnostic_url}/api/cancel",
                data=b"",
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=5) as response:
                return response.status
        except Exception as e:
            return f"error: {e}"

    loop = asyncio.get_event_loop()
    outcome = await loop.run_in_executor(None, _post_cancel)
    logger.info("[cancel] Diagnostic /api/cancel: %s", outcome)
async def _emit_activity(self, activity: str, tool: str = "") -> None:
    """Send an ``agent_activity`` message, but only when the state changed.

    Non-idle activities arriving less than 3 seconds after
    ``_last_chat_final_at`` are dropped; ``idle`` is never dropped by that
    rule. A state equal to the last emitted ``(activity, tool)`` pair is
    not sent again.
    """
    loop = asyncio.get_event_loop()
    may_be_trailing = activity != "idle" and self._last_chat_final_at > 0
    if may_be_trailing and loop.time() - self._last_chat_final_at < 3.0:
        return

    current = (activity, tool)
    if current == self._last_activity_state:
        return
    self._last_activity_state = current

    stamp = int(asyncio.get_event_loop().time() * 1000)
    await self._send_to_rvs({
        "type": "agent_activity",
        "payload": {"activity": activity, "tool": tool},
        "timestamp": stamp,
    })
async def send_log_to_app(self, source: str, message: str, level: str = "info") -> None:
    """Send a ``log`` message (source, message, level) to the app via RVS."""
    entry = {
        "source": source,
        "message": message,
        "level": level,
    }
    stamp = int(asyncio.get_event_loop().time() * 1000)
    await self._send_to_rvs({"type": "log", "payload": entry, "timestamp": stamp})
async def send_event_to_app(self, title: str, description: str) -> None:
    """Send an ``event`` message (title, description) to the app via RVS."""
    details = {
        "title": title,
        "description": description,
    }
    stamp = int(asyncio.get_event_loop().time() * 1000)
    await self._send_to_rvs({"type": "event", "payload": details, "timestamp": stamp})
# ── Audio-Schleife (lokales Mikrofon) ────────────────────
async def audio_loop(self) -> None:
    """Listen on the local microphone: wake word → record → STT → aria-core.

    Runs while ``self.running`` is true. Each iteration captures one audio
    block and checks it for the wake word; on detection the app is
    notified, a recording is made and transcribed, and a non-empty
    transcript is sent to aria-core. All blocking audio and model calls run
    in the default executor so the event loop stays responsive.

    A ``PortAudioError`` is warned about once and followed by a 60s pause;
    any other error is logged and followed by a 1s pause.
    """
    logger.info("Audio-Schleife gestartet — warte auf Wake-Word '%s'...", self.wake_word.wake_word_key)
    loop = asyncio.get_event_loop()

    def _capture_block():
        # sd.wait() blocks until the recording is finished, so rec + wait
        # must run off the event loop.
        chunk = sd.rec(
            BLOCK_SIZE,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
        )
        sd.wait()
        return chunk

    while self.running:
        try:
            audio_chunk = await loop.run_in_executor(None, _capture_block)
            detected = await loop.run_in_executor(
                None, self.wake_word.detect, audio_chunk.flatten()
            )
            if not detected:
                continue
            logger.info("Wake-Word erkannt — starte Aufnahme")
            await self.send_event_to_app(
                "Wake-Word erkannt",
                "ARIA hoert zu...",
            )
            audio_data = await loop.run_in_executor(None, record_audio)
            text = await loop.run_in_executor(
                None, self.stt_engine.transcribe, audio_data
            )
            # Guard against a None transcript as well as a blank one.
            if text and text.strip():
                new_mode = detect_mode_switch(text)
                if new_mode is not None:
                    # NOTE(review): unlike the RVS "mode" handler this neither
                    # persists nor broadcasts the switch — confirm it is intended.
                    self.current_mode = new_mode
                await self.send_to_core(text, source="bridge")
            else:
                logger.info("Keine Sprache erkannt — ignoriert")
        except sd.PortAudioError:
            if not hasattr(self, '_audio_warned'):
                logger.warning("Audio-Geraet nicht verfuegbar — lokales Mikrofon deaktiviert (kein Spam mehr)")
                self._audio_warned = True
            await asyncio.sleep(60)  # long pause to avoid log spam
        except Exception:
            logger.exception("Fehler in der Audio-Schleife")
            await asyncio.sleep(1)
# ── Run & Shutdown ───────────────────────────────────────
async def run(self) -> None:
    """Run the core and RVS connections (and the audio loop) concurrently.

    Sets ``running`` to true, starts one task per component and waits for
    all of them. Without an audio device only core + RVS run (pure relay
    mode). Cancellation of the gathered tasks is logged and swallowed.
    """
    self.running = True
    pending = [
        asyncio.create_task(start())
        for start in (self.connect_to_core, self.connect_to_rvs)
    ]
    if not self.audio_available:
        logger.info("Audio-Loop deaktiviert — kein Audio-Geraet")
    else:
        pending.append(asyncio.create_task(self.audio_loop()))
    try:
        await asyncio.gather(*pending)
    except asyncio.CancelledError:
        logger.info("Bridge-Tasks abgebrochen")
def shutdown(self) -> None:
    """Request a clean stop: log it and clear the ``running`` flag.

    The connection and audio loops check ``running`` between iterations.
    """
    logger.info("Bridge wird heruntergefahren...")
    self.running = False
# ── Hauptprogramm ────────────────────────────────────────────
def main() -> None:
    """Start the ARIA Voice Bridge and run it until a signal stops it.

    Steps:
      1. Install plain signal handlers that clear the bridge's ``running``
         flag; these cover the synchronous initialisation phase.
      2. Initialise the bridge; exit with status 1 on failure.
      3. Run the event loop. Inside the loop SIGTERM/SIGINT are re-routed
         through ``loop.add_signal_handler`` so that they also cancel the
         main task. Clearing the flag alone cannot stop coroutines that are
         waiting on a socket.
    """
    bridge = ARIABridge()

    # Signal handler for the phase before the event loop is running.
    def handle_signal(signum: int, _frame: object) -> None:
        sig_name = signal.Signals(signum).name
        logger.info("Signal %s empfangen — fahre herunter", sig_name)
        bridge.shutdown()

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    # Initialisation is synchronous and happens before the loop starts.
    try:
        bridge.initialize()
    except Exception:
        logger.exception("Initialisierung fehlgeschlagen")
        sys.exit(1)

    async def _run_until_signal() -> None:
        loop = asyncio.get_running_loop()
        runner = asyncio.current_task()

        def _request_stop(signum: int) -> None:
            logger.info("Signal %s empfangen — fahre herunter", signal.Signals(signum).name)
            bridge.shutdown()
            # Cancelling the main task cancels the gathered bridge tasks;
            # bridge.run() catches the CancelledError and returns.
            runner.cancel()

        for signum in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(signum, _request_stop, signum)
            except (NotImplementedError, RuntimeError):
                # Not supported on this platform/thread: the signal.signal
                # handler installed above stays in effect.
                pass
        await bridge.run()

    # Start the event loop.
    try:
        asyncio.run(_run_until_signal())
    except KeyboardInterrupt:
        logger.info("Keyboard Interrupt — Bridge beendet")
    finally:
        logger.info("ARIA Voice Bridge beendet")


if __name__ == "__main__":
    main()