# ARIA-AGENT/bridge/aria_bridge.py
#
# 1100 lines
# 40 KiB
# Python

"""
ARIA Voice Bridge — main module.

Connects the Android app (via RVS) to ARIA-Core and provides local
speech input (wake word + Whisper STT) and speech output (Piper TTS).

Message flow:
    App → RVS → Bridge → aria-core
    aria-core → Bridge → RVS → App
                       → speaker (TTS)

Voices:
    - Ramona (de_DE-ramona-low) — everyday use, conversations
    - Thorsten (de_DE-thorsten-high) — epic moments, alarms
"""
from __future__ import annotations

import asyncio
import base64
import io
import json
import logging
import os
import signal
import ssl
import sys
import tempfile
import time
import uuid
import wave
from pathlib import Path
from typing import Optional

import numpy as np
import sounddevice as sd
import websockets
from faster_whisper import WhisperModel
from openwakeword.model import Model as WakeWordModel
from piper import PiperVoice

from modes import Mode, detect_mode_switch, should_speak
# ── Logging ──────────────────────────────────────────────────
# Send INFO-and-above records to stdout using one shared line format.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Module-wide logger used by every component in this file.
logger = logging.getLogger("aria-bridge")
# ── Configuration ────────────────────────────────────────────
# Fixed locations of the env-style config file and the voice models.
CONFIG_PATH = Path("/config/aria.env")
VOICES_DIR = Path("/voices")

# Environment-provided defaults.
CORE_WS_URL = os.environ.get("ARIA_CORE_WS", "ws://127.0.0.1:18789")
CORE_AUTH_TOKEN = os.environ.get("ARIA_AUTH_TOKEN", "")  # OpenClaw gateway token
RVS_HOST = os.environ.get("RVS_HOST", "")  # e.g. rvs.hackersoft.de
RVS_PORT = os.environ.get("RVS_PORT", "443")  # RVS port
RVS_TLS = os.environ.get("RVS_TLS", "true")  # true = wss://, false = ws://
RVS_TLS_FALLBACK = os.environ.get("RVS_TLS_FALLBACK", "true")  # try ws:// on TLS errors
RVS_TOKEN = os.environ.get("RVS_TOKEN", "")  # pairing token (same as in the app)
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "small")
WHISPER_LANGUAGE = os.environ.get("WHISPER_LANGUAGE", "de")

# Audio parameters
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCK_SIZE = 1280  # 80 ms at 16 kHz — chunk size fed to wake-word detection
RECORD_SECONDS = 8  # maximum recording length after the wake word

# Epic triggers — when one of these substrings occurs, Thorsten speaks.
EPIC_TRIGGERS = [
    "deploy",
    "erfolgreich",
    "alarm",
    "so soll es sein",
    "kritisch",
    "server down",
    "sicherheitswarnung",
    "ticket geloest",
    "aufgabe abgeschlossen",
]
def load_config() -> dict[str, str]:
    """Load ``KEY=VALUE`` settings from ``CONFIG_PATH``.

    Blank lines, lines starting with ``#``, lines without ``=`` and lines
    with an empty key are skipped. Keys and values are whitespace-stripped.

    Returns:
        Mapping of setting name to raw string value; empty when the file is
        missing or cannot be read.
    """
    config: dict[str, str] = {}
    if not CONFIG_PATH.exists():
        logger.warning("Keine Konfiguration gefunden: %s", CONFIG_PATH)
        return config

    try:
        # Decode explicitly as UTF-8: the platform default depends on the
        # process locale. Undecodable bytes are replaced rather than fatal.
        content = CONFIG_PATH.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        logger.warning("Konfiguration nicht lesbar: %s (%s)", CONFIG_PATH, exc)
        return config

    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        config[key] = value.strip()

    logger.info("Konfiguration geladen aus %s", CONFIG_PATH)
    return config
# ── Voice Engine ─────────────────────────────────────────────
class VoiceEngine:
    """Manage Piper TTS with two voices: Ramona and Thorsten."""

    def __init__(self, voices_dir: Path) -> None:
        """Remember the model directory; voices are loaded by ``initialize``."""
        self.voices_dir = voices_dir
        self.voices: dict[str, PiperVoice] = {}

    def initialize(self) -> None:
        """Load the Piper voices found in the voices directory.

        A missing or unloadable model is logged and skipped so that the
        remaining voice (if any) stays usable.
        """
        voice_configs = {
            "ramona": "de_DE-ramona-low",
            "thorsten": "de_DE-thorsten-high",
        }
        for name, model_name in voice_configs.items():
            model_path = self.voices_dir / f"{model_name}.onnx"
            config_path = self.voices_dir / f"{model_name}.onnx.json"
            if not model_path.exists():
                logger.error("Stimme nicht gefunden: %s", model_path)
                continue
            try:
                self.voices[name] = PiperVoice.load(
                    str(model_path),
                    config_path=str(config_path) if config_path.exists() else None,
                )
            except Exception:
                # One broken model must not take down the whole bridge.
                logger.exception("Stimme konnte nicht geladen werden: %s", model_path)
                continue
            logger.info("Stimme geladen: %s (%s)", name, model_name)

        if not self.voices:
            logger.error("Keine Stimmen geladen — TTS deaktiviert")

    def select_voice(
        self, text: str, requested_voice: Optional[str] = None
    ) -> str:
        """Pick the voice for ``text``.

        An explicitly requested voice wins when it is loaded. Otherwise
        Thorsten is chosen if the text contains an epic trigger, else Ramona.
        If the chosen voice is not loaded but another one is, that other
        voice is returned instead.

        Args:
            text: Text to be spoken (scanned for epic triggers).
            requested_voice: Explicitly requested voice name, if any.

        Returns:
            Name of the selected voice.
        """
        if requested_voice and requested_voice in self.voices:
            return requested_voice

        text_lower = text.lower()
        preferred = "ramona"
        for trigger in EPIC_TRIGGERS:
            if trigger in text_lower:
                logger.info("Epischer Trigger erkannt: '%s' — Thorsten spricht", trigger)
                preferred = "thorsten"
                break

        if preferred in self.voices or not self.voices:
            return preferred

        # Preferred voice is unavailable: degrade to whichever voice loaded.
        fallback = next(iter(self.voices))
        logger.warning("Stimme '%s' nicht geladen — verwende '%s'", preferred, fallback)
        return fallback

    def synthesize(self, text: str, voice_name: str = "ramona") -> Optional[bytes]:
        """Render ``text`` to WAV bytes with the given voice.

        Args:
            text: Text to be spoken.
            voice_name: Name of a loaded voice.

        Returns:
            WAV data as bytes, or None if the voice is unavailable or
            synthesis fails.
        """
        voice = self.voices.get(voice_name)
        if voice is None:
            logger.error("Stimme '%s' nicht verfuegbar", voice_name)
            return None

        try:
            # Write the WAV container into memory; no temp file that could be
            # left behind when synthesis raises.
            buffer = io.BytesIO()
            with wave.open(buffer, "wb") as wav_file:
                voice.synthesize(text, wav_file)
            audio_data = buffer.getvalue()
            logger.info(
                "TTS: %d bytes erzeugt mit %s: '%s'",
                len(audio_data),
                voice_name,
                text[:60],
            )
            return audio_data
        except Exception:
            logger.exception("TTS-Fehler bei Stimme '%s'", voice_name)
            return None

    def speak(self, text: str, requested_voice: Optional[str] = None) -> None:
        """Speak ``text`` on the local audio device and block until done.

        Args:
            text: Text to be spoken.
            requested_voice: Optional explicit voice choice.
        """
        voice_name = self.select_voice(text, requested_voice)
        audio_data = self.synthesize(text, voice_name)
        if audio_data is None:
            return

        try:
            with wave.open(io.BytesIO(audio_data), "rb") as wf:
                frames = wf.readframes(wf.getnframes())
                sample_width = wf.getsampwidth()
                rate = wf.getframerate()
                channels = wf.getnchannels()

            # 8-bit WAV PCM is unsigned; 16- and 32-bit are signed.
            dtype_map = {1: np.uint8, 2: np.int16, 4: np.int32}
            dtype = dtype_map.get(sample_width)
            if dtype is None:
                logger.error("Nicht unterstuetzte Sample-Breite: %d Byte", sample_width)
                return

            audio_array = np.frombuffer(frames, dtype=dtype)
            if channels > 1:
                audio_array = audio_array.reshape(-1, channels)
            sd.play(audio_array, samplerate=rate)
            sd.wait()  # wait until playback has finished
        except Exception:
            logger.exception("Audio-Wiedergabe fehlgeschlagen")
# ── STT Engine ───────────────────────────────────────────────
class STTEngine:
    """Whisper speech-to-text, loaded with ``device="cpu"``."""

    def __init__(self, model_size: str = "small", language: str = "de") -> None:
        """Store model size and language; the model is loaded by ``initialize``."""
        self.model_size = model_size
        self.language = language
        self.model: Optional[WhisperModel] = None

    def initialize(self) -> None:
        """Load the Whisper model (CPU, int8 compute type)."""
        logger.info(
            "Lade Whisper-Modell '%s' (Sprache: %s)...",
            self.model_size,
            self.language,
        )
        self.model = WhisperModel(self.model_size, device="cpu", compute_type="int8")
        logger.info("Whisper-Modell geladen")

    @staticmethod
    def _to_float32(audio_data: np.ndarray) -> np.ndarray:
        """Convert PCM samples to float32, scaling integer PCM to [-1, 1).

        Signed integers are divided by their full-scale value (32768 for
        int16), unsigned integers are centred first, and floating-point
        input is only cast, never rescaled.
        """
        dtype = audio_data.dtype
        if dtype == np.float32:
            return audio_data
        if np.issubdtype(dtype, np.signedinteger):
            full_scale = float(np.iinfo(dtype).max) + 1.0
            return audio_data.astype(np.float32) / full_scale
        if np.issubdtype(dtype, np.unsignedinteger):
            midpoint = (float(np.iinfo(dtype).max) + 1.0) / 2.0
            return (audio_data.astype(np.float32) - midpoint) / midpoint
        return audio_data.astype(np.float32)

    def transcribe(self, audio_data: np.ndarray) -> str:
        """Transcribe audio to text.

        Args:
            audio_data: Audio samples; integer PCM is normalised to float32
                before being handed to the model.

        Returns:
            Recognised text, or an empty string when the model is not loaded
            or transcription fails.
        """
        if self.model is None:
            logger.error("Whisper-Modell nicht initialisiert")
            return ""

        try:
            samples = self._to_float32(audio_data)
            segments, info = self.model.transcribe(
                samples,
                language=self.language,
                beam_size=5,
                vad_filter=True,
            )
            text = " ".join(segment.text.strip() for segment in segments)
            logger.info(
                "STT: '%s' (Sprache: %s, Dauer: %.1fs)",
                text,
                info.language,
                info.duration,
            )
            return text
        except Exception:
            logger.exception("STT-Fehler")
            return ""
# ── Wake-Word Erkennung ──────────────────────────────────────
class WakeWordDetector:
    """Detect the wake word in the audio stream.

    The built-in openwakeword models are loaded and 'hey_jarvis' is
    preferred as the active wake word. ``CUSTOM_MODEL_PATH`` names the
    intended location of a custom 'aria' model, but this class does not
    load it.
    """

    CUSTOM_MODEL_PATH = "/voices/wake_aria.onnx"
    FALLBACK_MODEL = "hey_jarvis"
    THRESHOLD = 0.5

    def __init__(self) -> None:
        """Create an uninitialised detector; call ``initialize`` before use."""
        self.model: Optional[WakeWordModel] = None
        self.wake_word_key: str = ""

    def initialize(self) -> None:
        """Load the wake-word model and choose the active wake word.

        WakeWordModel() is deliberately called WITHOUT arguments: older
        openwakeword versions forward unknown kwargs to AudioFeatures,
        which crashes.
        """
        logger.info("Lade Wake-Word-Modell...")
        self.model = WakeWordModel()

        available = list(self.model.models.keys()) if hasattr(self.model, "models") else []
        logger.info(
            "Verfuegbare Wake-Words: %s",
            ", ".join(available) if available else "(keine)",
        )

        # Prefer the fallback model; otherwise take the first available one.
        if self.FALLBACK_MODEL in available or not available:
            self.wake_word_key = self.FALLBACK_MODEL
        else:
            self.wake_word_key = available[0]

        logger.info("Wake-Word aktiv: '%s'", self.wake_word_key)
        logger.info(
            "Tipp: Custom 'aria' Wake-Word trainieren → "
            "https://github.com/dscripka/openWakeWord#training-new-models"
        )

    def detect(self, audio_chunk: np.ndarray) -> bool:
        """Check whether the active wake word occurs in ``audio_chunk``.

        Only the score of ``wake_word_key`` counts, so other loaded models
        cannot trigger a recording. If the prediction has no entry for the
        active key, every score is checked as a fallback.

        Args:
            audio_chunk: Audio samples handed to the model's ``predict``.

        Returns:
            True when a relevant score exceeds ``THRESHOLD``.
        """
        if self.model is None:
            return False

        prediction = self.model.predict(audio_chunk)
        if self.wake_word_key in prediction:
            candidates = {self.wake_word_key: prediction[self.wake_word_key]}
        else:
            candidates = prediction

        for key, score in candidates.items():
            if score > self.THRESHOLD:
                logger.info("Wake-Word '%s' erkannt! (Score: %.2f)", key, score)
                return True
        return False
# ── Audio-Aufnahme ───────────────────────────────────────────
def record_audio(duration: float = RECORD_SECONDS) -> np.ndarray:
    """Capture microphone audio for a fixed time span.

    Blocks until the recording has finished.

    Args:
        duration: Recording length in seconds.

    Returns:
        One-dimensional int16 array recorded at ``SAMPLE_RATE`` with
        ``CHANNELS`` channel(s).
    """
    logger.info("Aufnahme laeuft... (%d Sekunden)", duration)
    frame_count = int(duration * SAMPLE_RATE)
    recording = sd.rec(
        frame_count,
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
    )
    sd.wait()  # sd.rec returns immediately; wait for the capture to finish
    logger.info("Aufnahme beendet")
    return recording.flatten()
# ── Bridge Hauptklasse ───────────────────────────────────────
class ARIABridge:
    """ARIA Voice Bridge — connects the app (via RVS) and speech to ARIA-Core.

    Three concurrent jobs:
      1. connect_to_core() — WebSocket to aria-core
      2. connect_to_rvs()  — WebSocket to the RVS (relay for the app)
      3. audio_loop()      — wake word + STT on the local microphone
    """

    def __init__(self) -> None:
        """Read the configuration and construct (not initialise) all components."""
        self.config = load_config()
        self.ws_url = self.config.get("ARIA_CORE_WS", CORE_WS_URL)
        self.core_auth_token = self.config.get("ARIA_AUTH_TOKEN", CORE_AUTH_TOKEN)
        self._req_id_counter = 0
        self._session_key = "aria-bridge"  # fixed session for the bridge

        # RVS connection info: config file first, environment default second.
        rvs_host = self.config.get("RVS_HOST", RVS_HOST)
        rvs_port = self.config.get("RVS_PORT", RVS_PORT)
        rvs_tls = self.config.get("RVS_TLS", RVS_TLS).lower() == "true"
        self.rvs_tls_fallback = (
            self.config.get("RVS_TLS_FALLBACK", RVS_TLS_FALLBACK).lower() == "true"
        )
        self.rvs_token = self.config.get("RVS_TOKEN", RVS_TOKEN)

        # Build primary and fallback URLs.
        if rvs_host:
            proto = "wss" if rvs_tls else "ws"
            self.rvs_url = f"{proto}://{rvs_host}:{rvs_port}"
            # A plaintext fallback exists only when TLS is on and fallback is allowed.
            if rvs_tls and self.rvs_tls_fallback:
                self.rvs_url_fallback = f"ws://{rvs_host}:{rvs_port}"
            else:
                self.rvs_url_fallback = ""
        else:
            self.rvs_url = ""
            self.rvs_url_fallback = ""

        self.current_mode = Mode.NORMAL
        self.running = False
        # Defined here so run() is safe even if initialize() was never called.
        self.audio_available = False

        # Components
        self.voice_engine = VoiceEngine(VOICES_DIR)
        self.stt_engine = STTEngine(
            model_size=self.config.get("WHISPER_MODEL", WHISPER_MODEL),
            language=self.config.get("WHISPER_LANGUAGE", WHISPER_LANGUAGE),
        )
        self.wake_word = WakeWordDetector()

        # WebSocket connections (None while disconnected)
        self.ws_core: Optional[websockets.WebSocketClientProtocol] = None
        self.ws_rvs: Optional[websockets.WebSocketClientProtocol] = None

    # ── Small shared helpers ─────────────────────────────────

    @staticmethod
    def _now_ms() -> int:
        """Return wall-clock Unix time in milliseconds.

        The event loop's ``time()`` is a monotonic clock with an arbitrary
        reference point, so it cannot serve as a message timestamp.
        """
        return int(time.time() * 1000)

    @staticmethod
    def _ws_is_open(ws: object) -> bool:
        """Tell whether ``ws`` looks usable for sending.

        Connections without an ``open`` attribute are treated as open; a
        failed send is then reported by the caller's own error handling.
        NOTE(review): newer websockets releases appear to drop ``.open`` —
        confirm against the installed version.
        """
        if ws is None:
            return False
        return bool(getattr(ws, "open", True))

    def initialize(self) -> None:
        """Initialise all components.

        The voice engine is always loaded. STT and wake word are loaded only
        when the audio device query succeeds; if any of these steps fails,
        the bridge continues without the local audio loop.
        """
        logger.info("=" * 50)
        logger.info("ARIA Voice Bridge startet...")
        logger.info("=" * 50)

        # Always load the voice engine — it renders audio for the app.
        self.voice_engine.initialize()

        # Local audio counts as available only after every step succeeded.
        self.audio_available = False
        try:
            sd.query_devices()
            logger.info("Audio-Geraet gefunden — Wake-Word und lokale TTS aktiv")
            self.stt_engine.initialize()
            self.wake_word.initialize()
            self.audio_available = True
        except Exception as exc:
            logger.warning(
                "Kein Audio-Geraet — Wake-Word und lokale TTS deaktiviert (%s)", exc
            )
            logger.info("Piper TTS rendert Audio fuer die App (via RVS)")

        logger.info("Alle Komponenten initialisiert")
        logger.info("aria-core: %s", self.ws_url)
        if self.rvs_url and self.rvs_token:
            logger.info("RVS: %s (Token: %s...)", self.rvs_url, self.rvs_token[:8])
        else:
            logger.warning("RVS nicht konfiguriert — App-Verbindung deaktiviert")
            logger.warning(" Setze RVS_HOST, RVS_PORT, RVS_TOKEN in /config/aria.env")
        logger.info(
            "Modus: %s %s",
            self.current_mode.config.emoji,
            self.current_mode.config.name,
        )

    # ── aria-core connection (OpenClaw gateway protocol) ─────

    def _next_req_id(self) -> str:
        """Return the next request id (``bridge-<counter>``)."""
        self._req_id_counter += 1
        return f"bridge-{self._req_id_counter}"

    async def _openclaw_handshake(self, ws: "websockets.WebSocketClientProtocol") -> bool:
        """Perform the OpenClaw gateway handshake.

        1. Wait for the ``connect.challenge`` event.
        2. Send the ``connect`` request (with the auth token, if set).
        3. Wait for a successful ``res`` frame.

        Returns:
            True on success, False on protocol error, timeout or exception.
        """
        try:
            # Step 1: wait for the challenge (max 10 s)
            raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
            challenge = json.loads(raw)
            if challenge.get("type") != "event" or challenge.get("event") != "connect.challenge":
                logger.error("[core] Unerwartete erste Nachricht: %s", raw[:200])
                return False
            nonce = challenge.get("payload", {}).get("nonce", "")
            logger.info("[core] Challenge empfangen (nonce: %s...)", nonce[:8] if nonce else "?")

            # Step 2: send the connect request
            connect_req = {
                "type": "req",
                "id": self._next_req_id(),
                "method": "connect",
                "params": {
                    "minProtocol": 3,
                    "maxProtocol": 3,
                    "client": {
                        "id": "gateway-client",
                        "version": "0.0.3",
                        "platform": "linux",
                        "mode": "backend",
                    },
                    "role": "operator",
                    "scopes": ["operator.read", "operator.write"],
                    "caps": ["voice"],
                    "commands": [],
                    "permissions": {},
                    "auth": {"token": self.core_auth_token} if self.core_auth_token else {},
                    "locale": "de-DE",
                    "userAgent": "aria-bridge/0.0.3",
                },
            }
            await ws.send(json.dumps(connect_req))
            logger.info("[core] Connect-Request gesendet")

            # Step 3: wait for hello-ok (max 10 s)
            raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
            response = json.loads(raw)
            if response.get("type") == "res" and response.get("ok"):
                logger.info("[core] Handshake erfolgreich — hello-ok empfangen")
                return True
            error = response.get("error", response)
            logger.error("[core] Handshake fehlgeschlagen: %s", json.dumps(error)[:200])
            return False
        except asyncio.TimeoutError:
            logger.error("[core] Handshake-Timeout (10s)")
            return False
        except Exception:
            logger.exception("[core] Handshake-Fehler")
            return False

    async def connect_to_core(self) -> None:
        """Keep a WebSocket to aria-core open, reconnecting with backoff.

        After ``max_conn_failures`` consecutive connection failures the
        process exits with status 1 so that the supervisor can restart it.
        """
        retry_delay = 2
        max_conn_failures = 6
        conn_fail_count = 0

        while self.running:
            try:
                logger.info("[core] Verbinde: %s", self.ws_url)
                async with websockets.connect(self.ws_url) as ws:
                    if await self._openclaw_handshake(ws):
                        self.ws_core = ws
                        retry_delay = 2
                        conn_fail_count = 0
                        logger.info("[core] Verbunden und authentifiziert")
                        async for message in ws:
                            try:
                                await self._handle_core_message(message)
                            except Exception:
                                # A bad frame or TTS error must not drop the link.
                                logger.exception("[core] Fehler bei der Nachrichtenverarbeitung")
                    else:
                        # Leave the context (closing the socket) and retry below.
                        logger.error("[core] Handshake fehlgeschlagen — Reconnect")
            except websockets.ConnectionClosed:
                logger.warning("[core] Verbindung verloren")
                conn_fail_count += 1
            except ConnectionRefusedError:
                logger.warning("[core] Nicht erreichbar (%s)", self.ws_url)
                conn_fail_count += 1
            except Exception:
                logger.exception("[core] WebSocket-Fehler")
                conn_fail_count += 1
            finally:
                self.ws_core = None

            # After N consecutive failures: exit so Docker restarts the container.
            if conn_fail_count >= max_conn_failures:
                logger.error(
                    "[core] %dx nicht erreichbar — Exit fuer Docker-Restart",
                    conn_fail_count,
                )
                sys.exit(1)

            if self.running:
                logger.info("[core] Reconnect in %ds...", retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)

    async def _send_error_to_app(self, error: object) -> None:
        """Forward a core-side chat error to the app as a chat message."""
        await self._send_to_rvs({
            "type": "chat",
            "payload": {
                "text": f"[Fehler] {error}",
                "sender": "aria",
            },
            "timestamp": self._now_ms(),
        })

    async def _handle_core_message(self, raw_message: str) -> None:
        """Process one frame from aria-core.

        Handled frames:
          - ``res``: acknowledgement or failure of one of our requests
          - ``event`` ``agent``: streaming deltas (debug-logged only)
          - ``event`` ``chat``: ``state`` final → response, error → app notice
          - legacy ``chat:delta`` / ``chat:final`` / ``chat:error`` events
          - ``tick`` / ``health``: ignored
        """
        try:
            message = json.loads(raw_message)
        except ValueError:  # includes json.JSONDecodeError and bad UTF-8
            logger.error("[core] Ungueltige JSON: %s", raw_message[:100])
            return
        if not isinstance(message, dict):
            logger.error("[core] Ungueltige JSON: %s", raw_message[:100])
            return

        frame_type = message.get("type", "")
        logger.info(
            "[core] <<< Frame: type=%s event=%s method=%s | %s",
            frame_type,
            message.get("event", "-"),
            message.get("method", "-"),
            raw_message[:200],
        )

        # ── Response to one of our requests (e.g. chat.send ack) ──
        if frame_type == "res":
            req_id = message.get("id", "")
            if message.get("ok"):
                logger.debug("[core] Request %s bestätigt", req_id)
            else:
                error = message.get("error", "Unbekannt")
                logger.error("[core] Request %s fehlgeschlagen: %s", req_id, error)
            return

        if frame_type != "event":
            logger.debug("[core] Unbekannter Frame-Typ: %s", frame_type)
            return

        event_name = message.get("event", "")
        payload = message.get("payload")
        if not isinstance(payload, dict):
            payload = {}  # tolerate a missing or null payload

        # ── agent events: streaming deltas from the LLM ──
        if event_name == "agent":
            data = payload.get("data")
            delta = data.get("delta", "") if isinstance(data, dict) else ""
            if delta and payload.get("stream") == "assistant":
                logger.debug("[core] Delta: '%s'", str(delta)[:40])
            return

        # ── chat events: snapshots with state=delta|final|error ──
        if event_name == "chat":
            state = payload.get("state", "")
            if state == "final":
                text = self._extract_chat_text(payload)
                if not text:
                    logger.warning("[core] chat final ohne Text: %s", json.dumps(payload)[:200])
                    return
                logger.info("[core] Antwort: '%s'", text[:80])
                await self._process_core_response(text, payload)
            elif state == "error":
                error = payload.get("error", "Unbekannt")
                logger.error("[core] Chat-Fehler: %s", error)
                await self._send_error_to_app(error)
            # state=delta — periodic snapshot, ignored
            return

        # ── Legacy event names ──
        if event_name == "chat:delta":
            delta = payload.get("delta", payload.get("text", ""))
            if delta:
                logger.debug("[core] Delta (legacy): '%s'", str(delta)[:40])
            return

        if event_name == "chat:final":
            text = payload.get("text") or payload.get("message")
            if not isinstance(text, str) or not text:
                # "message" may be a structured object rather than a string.
                text = self._extract_chat_text(payload)
            if not text:
                logger.warning("[core] chat:final ohne Text: %s", json.dumps(payload)[:200])
                return
            logger.info("[core] Antwort (legacy): '%s'", text[:80])
            await self._process_core_response(text, payload)
            return

        if event_name == "chat:error":
            error = payload.get("error", payload.get("message", "Unbekannt"))
            logger.error("[core] Chat-Fehler (legacy): %s", error)
            await self._send_error_to_app(error)
            return

        if event_name in ("tick", "health"):
            return
        logger.debug("[core] Event: %s", event_name)

    @staticmethod
    def _extract_chat_text(payload: dict) -> str:
        """Extract the text from a chat payload.

        Reads ``payload["message"]["content"]``: a string is returned as is,
        a list has the ``text`` of its ``type == "text"`` parts joined.
        If that yields nothing, a string ``payload["text"]`` is used.
        """
        try:
            content = payload.get("message", {}).get("content", [])
            if isinstance(content, str):
                if content:
                    return content
            elif isinstance(content, list):
                joined = "".join(
                    part.get("text", "") for part in content if part.get("type") == "text"
                )
                if joined:
                    return joined
        except (AttributeError, TypeError):
            pass  # malformed message structure — fall back to payload text
        text = payload.get("text", "")
        return text if isinstance(text, str) else ""

    async def _process_core_response(self, text: str, payload: dict) -> None:
        """Handle a finished response from aria-core.

        - applies a mode switch detected in the text and tells the app
        - forwards the response to the app as a chat message
        - if the mode allows speaking: renders TTS audio for the app and,
          when local audio is available, plays it locally
        """
        metadata = payload.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        is_critical = metadata.get("critical", False)
        requested_voice = metadata.get("voice")
        if not isinstance(requested_voice, str):
            requested_voice = None

        new_mode = detect_mode_switch(text)
        if new_mode is not None:
            self.current_mode = new_mode
            logger.info(
                "[core] Modus → %s %s",
                self.current_mode.config.emoji,
                self.current_mode.config.name,
            )
            await self._send_to_rvs({
                "type": "mode",
                "payload": {"mode": self.current_mode.name},
                "timestamp": self._now_ms(),
            })

        # select_voice ignores a requested voice that is not loaded.
        voice_name = self.voice_engine.select_voice(text, requested_voice)

        await self._send_to_rvs({
            "type": "chat",
            "payload": {
                "text": text,
                "sender": "aria",
                "voice": voice_name,
            },
            "timestamp": self._now_ms(),
        })

        if not should_speak(self.current_mode, is_critical):
            logger.info("[core] TTS unterdrueckt (Modus: %s)", self.current_mode.config.name)
            return

        # Synthesis and playback block; run them in the default executor so
        # heartbeats and the RVS connection keep being serviced.
        loop = asyncio.get_running_loop()
        audio_data = await loop.run_in_executor(
            None, self.voice_engine.synthesize, text, voice_name
        )
        if audio_data:
            audio_b64 = base64.b64encode(audio_data).decode("ascii")
            await self._send_to_rvs({
                "type": "audio",
                "payload": {
                    "base64": audio_b64,
                    "mimeType": "audio/wav",
                    "voice": voice_name,
                },
                "timestamp": self._now_ms(),
            })
            logger.info("[core] TTS-Audio gesendet: %d bytes (%s)", len(audio_data), voice_name)

        # Play locally only when local audio was initialised successfully.
        if self.audio_available:
            await loop.run_in_executor(None, self.voice_engine.speak, text, voice_name)

    async def send_to_core(self, text: str, source: str = "bridge") -> None:
        """Send ``text`` to aria-core as a ``chat.send`` request.

        The message is dropped (and logged) when no core connection exists.
        """
        if self.ws_core is None:
            logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
            return

        req_id = self._next_req_id()
        message = json.dumps({
            "type": "req",
            "id": req_id,
            "method": "chat.send",
            "params": {
                "sessionKey": self._session_key,
                "message": text,
                "idempotencyKey": str(uuid.uuid4()),
            },
        })
        try:
            await self.ws_core.send(message)
            logger.info("[core] chat.send (%s, id=%s): '%s'", source, req_id, text[:80])
        except Exception:
            logger.exception("[core] Sendefehler")

    async def _forward_raw_to_core(self, raw_message: str, label: str) -> None:
        """Pass an app frame through to aria-core unchanged, logging send errors."""
        if self.ws_core is None:
            return
        try:
            await self.ws_core.send(raw_message)
        except Exception:
            logger.exception("[core] Weiterleitung fehlgeschlagen (%s)", label)

    # ── RVS connection (app relay) ───────────────────────────

    async def connect_to_rvs(self) -> None:
        """Keep a WebSocket to the RVS open, reconnecting with backoff.

        On a TLS failure (or a connection reset during connect) the
        plaintext fallback URL is used once, if one is configured. Other
        network errors do not trigger the downgrade, because the token is
        part of the URL.
        """
        if not self.rvs_url or not self.rvs_token:
            logger.info("[rvs] Nicht konfiguriert — ueberspringe")
            return

        retry_delay = 2
        current_url = self.rvs_url
        using_fallback = False

        while self.running:
            try:
                url = f"{current_url}?token={self.rvs_token}"
                logger.info("[rvs] Verbinde: %s", current_url)  # URL without token
                async with websockets.connect(url) as ws:
                    self.ws_rvs = ws
                    retry_delay = 2
                    logger.info("[rvs] Verbunden — warte auf App-Nachrichten")
                    heartbeat_task = asyncio.create_task(self._rvs_heartbeat())
                    try:
                        async for raw_message in ws:
                            try:
                                await self._handle_rvs_message(raw_message)
                            except Exception:
                                # One bad app message must not drop the relay link.
                                logger.exception("[rvs] Fehler bei der Nachrichtenverarbeitung")
                    finally:
                        heartbeat_task.cancel()
            except websockets.ConnectionClosed:
                logger.warning("[rvs] Verbindung verloren")
            except ConnectionRefusedError:
                logger.warning("[rvs] Nicht erreichbar")
            except (ssl.SSLError, ConnectionResetError) as e:
                # TLS failure — try the ws:// fallback once.
                if not using_fallback and self.rvs_url_fallback:
                    logger.warning("[rvs] TLS-Fehler: %s", e)
                    logger.warning("[rvs] TLS gewollt aber nicht verfuegbar — Fallback auf ws://")
                    current_url = self.rvs_url_fallback
                    using_fallback = True
                    retry_delay = 1  # retry almost immediately
                else:
                    logger.error("[rvs] SSL-Fehler (kein Fallback): %s", e)
            except OSError as e:
                # DNS failure, timeout, unreachable network: retry the same URL.
                logger.warning("[rvs] Netzwerkfehler: %s", e)
            except Exception:
                logger.exception("[rvs] WebSocket-Fehler")
            finally:
                self.ws_rvs = None

            if self.running:
                logger.info("[rvs] Reconnect in %ds...", retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)

    async def _rvs_heartbeat(self) -> None:
        """Send a heartbeat frame every 25 s; stop on the first send error."""
        while True:
            await asyncio.sleep(25)
            ws = self.ws_rvs
            if not self._ws_is_open(ws):
                continue
            try:
                await ws.send(json.dumps({
                    "type": "heartbeat",
                    "timestamp": self._now_ms(),
                }))
            except Exception:
                break

    async def _handle_rvs_message(self, raw_message: str) -> None:
        """Process one message from the app (via RVS).

        Supported types:
          - chat: text → sent to aria-core
          - mode: mode switch
          - location: GPS data, logged and forwarded raw to aria-core
          - file: logged and forwarded raw to aria-core
          - audio: logged only (STT not implemented yet)
        """
        try:
            message = json.loads(raw_message)
        except ValueError:  # includes json.JSONDecodeError and bad UTF-8
            logger.error("[rvs] Ungueltige JSON: %s", raw_message[:100])
            return
        if not isinstance(message, dict):
            logger.error("[rvs] Ungueltige JSON: %s", raw_message[:100])
            return

        msg_type = message.get("type", "")
        payload = message.get("payload")
        if not isinstance(payload, dict):
            payload = {}  # tolerate a missing or null payload

        if msg_type == "chat":
            text = payload.get("text", "")
            if text:
                logger.info("[rvs] App-Chat: '%s'", text[:80])
                await self.send_to_core(text, source="app")
        elif msg_type == "mode":
            mode_name = payload.get("mode", "")
            new_mode = detect_mode_switch(mode_name)
            if new_mode is not None:
                self.current_mode = new_mode
                logger.info(
                    "[rvs] Modus → %s %s (von App)",
                    self.current_mode.config.emoji,
                    self.current_mode.config.name,
                )
        elif msg_type == "location":
            lat = payload.get("lat")
            lng = payload.get("lng")
            speed = payload.get("speed")
            logger.info("[rvs] GPS: lat=%.4f lng=%.4f speed=%s", lat or 0, lng or 0, speed)
            await self._forward_raw_to_core(raw_message, "location")
        elif msg_type == "file":
            logger.info("[rvs] Datei empfangen: %s", payload.get("name", "?"))
            await self._forward_raw_to_core(raw_message, "file")
        elif msg_type == "audio":
            # TODO: decode the audio, run it through Whisper, send the text to core.
            logger.info("[rvs] Audio empfangen — TODO: STT")
        else:
            logger.debug("[rvs] Unbekannter Typ: %s", msg_type)

    async def _send_to_rvs(self, message: dict) -> None:
        """Send ``message`` as JSON to the app (via RVS); no-op when disconnected."""
        ws = self.ws_rvs
        if not self._ws_is_open(ws):
            return
        try:
            await ws.send(json.dumps(message))
        except Exception:
            logger.exception("[rvs] Sendefehler")

    # ── Log streaming to the app ─────────────────────────────

    async def send_log_to_app(self, source: str, message: str, level: str = "info") -> None:
        """Send a ``log`` frame to the app."""
        await self._send_to_rvs({
            "type": "log",
            "payload": {
                "source": source,
                "message": message,
                "level": level,
            },
            "timestamp": self._now_ms(),
        })

    async def send_event_to_app(self, title: str, description: str) -> None:
        """Send an ``event`` frame to the app."""
        await self._send_to_rvs({
            "type": "event",
            "payload": {
                "title": title,
                "description": description,
            },
            "timestamp": self._now_ms(),
        })

    # ── Audio loop (local microphone) ────────────────────────

    def _listen_for_wake_word(self) -> bool:
        """Record one block from the microphone and test it for the wake word.

        Blocking; meant to be run in an executor.
        """
        audio_chunk = sd.rec(
            BLOCK_SIZE,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
        )
        sd.wait()
        return self.wake_word.detect(audio_chunk.flatten())

    async def audio_loop(self) -> None:
        """Detect the wake word, record, transcribe and send the text to aria-core."""
        logger.info(
            "Audio-Schleife gestartet — warte auf Wake-Word '%s'...",
            self.wake_word.wake_word_key,
        )
        loop = asyncio.get_running_loop()

        while self.running:
            try:
                # Capture and detection block, so they run off the event loop.
                detected = await loop.run_in_executor(None, self._listen_for_wake_word)
                if not detected:
                    continue

                logger.info("Wake-Word erkannt — starte Aufnahme")
                await self.send_event_to_app(
                    "Wake-Word erkannt",
                    "ARIA hoert zu...",
                )
                audio_data = await loop.run_in_executor(None, record_audio)
                text = await loop.run_in_executor(
                    None, self.stt_engine.transcribe, audio_data
                )
                if text.strip():
                    new_mode = detect_mode_switch(text)
                    if new_mode is not None:
                        self.current_mode = new_mode
                    await self.send_to_core(text, source="bridge")
                else:
                    logger.info("Keine Sprache erkannt — ignoriert")
            except sd.PortAudioError:
                logger.error("Audio-Geraet nicht verfuegbar — warte 5 Sekunden")
                await asyncio.sleep(5)
            except Exception:
                logger.exception("Fehler in der Audio-Schleife")
                await asyncio.sleep(1)

    # ── Run & shutdown ───────────────────────────────────────

    async def run(self) -> None:
        """Run all connections concurrently until shutdown.

        Without local audio only the core and RVS tasks run. A small watcher
        task cancels the workers once ``running`` turns False, so that
        ``shutdown()`` also stops tasks waiting on a socket.
        """
        self.running = True
        tasks = [
            asyncio.create_task(self.connect_to_core()),
            asyncio.create_task(self.connect_to_rvs()),
        ]
        if self.audio_available:
            tasks.append(asyncio.create_task(self.audio_loop()))
        else:
            logger.info("Audio-Loop deaktiviert — kein Audio-Geraet")

        async def _cancel_on_shutdown() -> None:
            # Poll the flag: shutdown() may be called from a signal handler.
            while self.running:
                await asyncio.sleep(0.5)
            for task in tasks:
                task.cancel()

        watcher = asyncio.create_task(_cancel_on_shutdown())
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.info("Bridge-Tasks abgebrochen")
        finally:
            watcher.cancel()
            for task in tasks:
                task.cancel()
            # Let every task finish its cleanup before returning.
            await asyncio.gather(*tasks, watcher, return_exceptions=True)

    def shutdown(self) -> None:
        """Request a clean shutdown by clearing the ``running`` flag."""
        logger.info("Bridge wird heruntergefahren...")
        self.running = False
# ── Hauptprogramm ────────────────────────────────────────────
def main() -> None:
    """Entry point: build the bridge, install signal handlers, run it."""
    bridge = ARIABridge()

    def handle_signal(signum: int, _frame: object) -> None:
        # Log which signal arrived, then ask the bridge to stop.
        logger.info(
            "Signal %s empfangen — fahre herunter", signal.Signals(signum).name
        )
        bridge.shutdown()

    # The same handler serves SIGTERM and SIGINT.
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, handle_signal)

    # Synchronous initialisation, before the event loop starts.
    try:
        bridge.initialize()
    except Exception:
        logger.exception("Initialisierung fehlgeschlagen")
        sys.exit(1)

    # Run the event loop until the bridge finishes.
    try:
        asyncio.run(bridge.run())
    except KeyboardInterrupt:
        logger.info("Keyboard Interrupt — Bridge beendet")
    finally:
        logger.info("ARIA Voice Bridge beendet")


if __name__ == "__main__":
    main()