1781 lines
74 KiB
Python
1781 lines
74 KiB
Python
"""
|
||
ARIA Voice Bridge — Hauptmodul.
|
||
|
||
Verbindet die Android App (via RVS) mit ARIA-Core. Sprachausgabe laeuft ueber
XTTS remote (Piper wurde entfernt); Spracheingabe per lokalem Wake-Word, STT
primaer ueber die whisper-bridge, lokales Whisper nur als Fallback.
|
||
|
||
Nachrichtenfluss:
|
||
App → RVS → Bridge → aria-core
|
||
aria-core → Bridge → RVS → App
|
||
→ Lautsprecher (TTS)
|
||
|
||
Stimmen:
|
||
- Ramona (de_DE-ramona-low) — Alltag, Gespraeche
|
||
- Thorsten (de_DE-thorsten-high) — epische Momente, Alarme
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import base64
|
||
import json
|
||
import logging
|
||
import os
|
||
import signal
|
||
import ssl
|
||
import sys
|
||
import tempfile
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import subprocess
|
||
import urllib.request
|
||
import numpy as np
|
||
import sounddevice as sd
|
||
import websockets
|
||
from faster_whisper import WhisperModel
|
||
from openwakeword.model import Model as WakeWordModel
|
||
|
||
from modes import Mode, canonical_id, detect_mode_switch, mode_from_id, should_speak
|
||
|
||
# ── Logging ──────────────────────────────────────────────────
# Everything at INFO and above goes to stdout, one line per record, so the
# container runtime can collect it.

logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("aria-bridge")  # module-wide logger
|
||
|
||
# ── Configuration ───────────────────────────────────────────
# Module-level defaults; most can be overridden via environment variables.

CONFIG_PATH = Path("/config/aria.env")
VOICES_DIR = Path("/voices")
CORE_WS_URL = os.environ.get("ARIA_CORE_WS", "ws://127.0.0.1:18789")
CORE_AUTH_TOKEN = os.environ.get("ARIA_AUTH_TOKEN", "")  # OpenClaw gateway token
RVS_HOST = os.environ.get("RVS_HOST", "")  # e.g. rvs.hackersoft.de
RVS_PORT = os.environ.get("RVS_PORT", "443")  # RVS port
RVS_TLS = os.environ.get("RVS_TLS", "true")  # "true" -> wss://, "false" -> ws://
RVS_TLS_FALLBACK = os.environ.get("RVS_TLS_FALLBACK", "true")  # on TLS failure, try ws://
RVS_TOKEN = os.environ.get("RVS_TOKEN", "")  # pairing token (same as in the app)
DIAGNOSTIC_URL = os.environ.get("DIAGNOSTIC_URL", "http://127.0.0.1:3001")  # Diagnostic API
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "medium")
WHISPER_LANGUAGE = os.environ.get("WHISPER_LANGUAGE", "de")

# Audio parameters
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCK_SIZE = SAMPLE_RATE * 80 // 1000  # 1280 samples = 80 ms at 16 kHz, suits wake-word detection
RECORD_SECONDS = 8  # maximum recording length after the wake word
|
||
|
||
def load_config() -> dict[str, str]:
    """Load the bridge configuration as a flat ``str -> str`` mapping.

    Sources, lowest priority first:

    1. ``CONFIG_PATH`` (/config/aria.env, bind mount): ``KEY=VALUE`` lines.
       Blank lines and lines starting with ``#`` are skipped; keys and values
       are whitespace-stripped.
    2. ``/shared/config/runtime.json`` (maintained via the Diagnostic UI):
       a JSON object. Entries whose value is ``None`` or ``""`` are skipped;
       all other values are converted with ``str()``.

    Values from runtime.json override the env file. An unreadable, invalid
    or non-object runtime.json only logs a warning.

    Returns:
        The merged configuration dict.
    """
    config: dict[str, str] = {}
    if CONFIG_PATH.exists():
        for raw_line in CONFIG_PATH.read_text(encoding="utf-8").splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                key, _, value = line.partition("=")
                config[key.strip()] = value.strip()
        logger.info("Konfiguration geladen aus %s", CONFIG_PATH)
    else:
        logger.warning("Keine Konfiguration gefunden: %s", CONFIG_PATH)

    # Runtime overrides from the shared volume (Diagnostic UI)
    runtime_path = Path("/shared/config/runtime.json")
    if not runtime_path.exists():
        return config

    try:
        runtime = json.loads(runtime_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("runtime.json konnte nicht gelesen werden: %s", e)
        return config

    if not isinstance(runtime, dict):
        logger.warning("runtime.json ist kein JSON-Objekt — ignoriert")
        return config

    overrides = {k: str(v) for k, v in runtime.items() if v not in (None, "")}
    if overrides:
        config.update(overrides)
        logger.info("Runtime-Overrides geladen: %s", sorted(overrides.keys()))
    return config
|
||
|
||
|
||
# ── Voice Engine ─────────────────────────────────────────────
|
||
|
||
|
||
import re as _re_tts
|
||
|
||
_NUM_WORDS_DE = {
|
||
0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fuenf",
|
||
6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn",
|
||
11: "elf", 12: "zwoelf", 13: "dreizehn", 14: "vierzehn", 15: "fuenfzehn",
|
||
16: "sechzehn", 17: "siebzehn", 18: "achtzehn", 19: "neunzehn", 20: "zwanzig",
|
||
}
|
||
_TENS_DE = {30: "dreissig", 40: "vierzig", 50: "fuenfzig"}
|
||
|
||
|
||
def _num_to_words_de(n: int) -> str:
|
||
"""Zahlen 0-59 als deutsches Wort — fuer Uhrzeiten und kleine Bereiche."""
|
||
if n in _NUM_WORDS_DE:
|
||
return _NUM_WORDS_DE[n]
|
||
if 21 <= n <= 29:
|
||
return f"{_NUM_WORDS_DE[n - 20]}undzwanzig"
|
||
if 30 <= n <= 59:
|
||
tens = (n // 10) * 10
|
||
ones = n % 10
|
||
tens_word = _TENS_DE.get(tens, str(tens))
|
||
if ones == 0:
|
||
return tens_word
|
||
return f"{_NUM_WORDS_DE.get(ones, str(ones))}und{tens_word}"
|
||
return str(n)
|
||
|
||
|
||
def _time_range_to_words(m):
    """Spell out a matched clock-time range.

    Expects match groups (start hour, ":MM" or None, end hour, ":MM" or None),
    as produced by the time-range pattern in ``clean_text_for_tts``.

    '8-9 Uhr' / '8:00-9:00 Uhr' -> 'acht bis neun Uhr'
    '8:30-9:45 Uhr' -> 'acht Uhr dreissig bis neun Uhr fuenfundvierzig'
    Non-zero minutes are spoken rather than silently dropped.
    """
    h1, h2 = int(m.group(1)), int(m.group(3))
    # Minute groups include the leading ':' (e.g. ':30').
    m1 = int(m.group(2)[1:]) if m.group(2) else 0
    m2 = int(m.group(4)[1:]) if m.group(4) else 0

    if m1 == 0 and m2 == 0:
        return f"{_num_to_words_de(h1)} bis {_num_to_words_de(h2)} Uhr"

    def _clock(hour: int, minute: int) -> str:
        # "acht Uhr" or "acht Uhr dreissig"
        words = f"{_num_to_words_de(hour)} Uhr"
        return f"{words} {_num_to_words_de(minute)}" if minute else words

    return f"{_clock(h1, m1)} bis {_clock(h2, m2)}"
|
||
|
||
|
||
def _small_range_to_words(m):
|
||
"""'5-6' → 'fuenf bis sechs' (nur wenn beide Zahlen ≤ 24)."""
|
||
a, b = int(m.group(1)), int(m.group(2))
|
||
if a > 24 or b > 24 or a >= b:
|
||
return m.group(0)
|
||
return f"{_num_to_words_de(a)} bis {_num_to_words_de(b)}"
|
||
|
||
|
||
def _decimal_to_words(m):
    """Read a matched decimal number, fractional digits one by one.

    '0.1' / '0,1' -> 'null komma eins'; '1,25' -> 'eins komma zwei fuenf'.
    The integer part is spelled out for 0-59 and kept as digits otherwise.
    """
    whole, fraction = m.group(1), m.group(2)
    whole_value = int(whole)
    if 0 <= whole_value <= 59:
        lead = _num_to_words_de(whole_value)
    else:
        lead = str(whole_value)
    digit_words = [_num_to_words_de(int(ch)) for ch in fraction]
    return lead + " komma " + " ".join(digit_words)
|
||
|
||
|
||
_UNIT_WORDS = [
|
||
(r'\bTB\b', 'Terabyte'),
|
||
(r'\bGB\b', 'Gigabyte'),
|
||
(r'\bMB\b', 'Megabyte'),
|
||
(r'\bKB\b', 'Kilobyte'),
|
||
(r'\bkB\b', 'Kilobyte'),
|
||
(r'\bms\b', 'Millisekunden'),
|
||
(r'\bkm/h\b', 'Kilometer pro Stunde'),
|
||
(r'\bkm\b', 'Kilometer'),
|
||
(r'\bm/s\b', 'Meter pro Sekunde'),
|
||
(r'\bkg\b', 'Kilogramm'),
|
||
(r'\b°C\b', 'Grad Celsius'),
|
||
(r'°C', ' Grad Celsius'),
|
||
(r'\bMbps\b', 'Megabit pro Sekunde'),
|
||
(r'\bGbps\b', 'Gigabit pro Sekunde'),
|
||
(r'\bMhz\b|\bMHz\b', 'Megahertz'),
|
||
(r'\bGhz\b|\bGHz\b', 'Gigahertz'),
|
||
(r'%', ' Prozent'),
|
||
(r'\bCPU\b', 'C P U'),
|
||
(r'\bGPU\b', 'G P U'),
|
||
(r'\bRAM\b', 'R A M'),
|
||
(r'\bSSD\b', 'S S D'),
|
||
(r'\bHDD\b', 'H D D'),
|
||
(r'\bURL\b', 'U R L'),
|
||
(r'\bAPI\b', 'A P I'),
|
||
(r'\bRVS\b', 'R V S'),
|
||
(r'\bSSH\b', 'S S H'),
|
||
(r'\bVM\b', 'V M'),
|
||
(r'\bUI\b', 'U I'),
|
||
(r'\bTTS\b', 'T T S'),
|
||
(r'\bSTT\b', 'S T T'),
|
||
(r'\bTLS\b', 'T L S'),
|
||
]
|
||
|
||
|
||
def clean_text_for_tts(text: str) -> str:
    """Prepare chat text for speech output.

    - ``<voice>...</voice>`` tag: if present, ONLY its content is read
    - code blocks (```...``` and `...`) are removed entirely
    - Markdown (bold, italics, links, headings, lists, quotes) is stripped
    - clock times, small ranges and decimal numbers are spelled out in German
    - units and common abbreviations are expanded (22GB -> 22 Gigabyte);
      remaining 2-5 letter all-caps words are spelled letter by letter
    - URLs are replaced by "ein Link"
    - quotation marks are dropped; repeated whitespace/line breaks normalised

    Args:
        text: raw chat message, possibly containing Markdown.

    Returns:
        The cleaned text, or "" for empty input.
    """
    if not text:
        return ""

    # <voice>...</voice> present -> speak only that part
    voice_match = _re_tts.search(r'<voice>([\s\S]*?)</voice>', text, _re_tts.IGNORECASE)
    if voice_match:
        text = voice_match.group(1)

    t = text

    # Drop code entirely. A fenced block becomes a sentence break ('. ') so
    # the sentences around it do not run together.
    t = _re_tts.sub(r'```[\s\S]*?```', '. ', t)
    t = _re_tts.sub(r'`[^`]+`', '', t)

    # Markdown
    t = _re_tts.sub(r'\*\*([^*]+)\*\*', r'\1', t)
    t = _re_tts.sub(r'\*([^*]+)\*', r'\1', t)
    t = _re_tts.sub(r'__([^_]+)__', r'\1', t)
    t = _re_tts.sub(r'\[([^\]]+)\]\((https?://[^)]+)\)', r'\1, ein Link', t)
    t = _re_tts.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', t)
    t = _re_tts.sub(r'https?://\S+', 'ein Link', t)
    t = _re_tts.sub(r'^#{1,6}\s*', '', t, flags=_re_tts.MULTILINE)
    t = _re_tts.sub(r'^>\s*', '', t, flags=_re_tts.MULTILINE)
    t = _re_tts.sub(r'^[\-\*]\s+', '', t, flags=_re_tts.MULTILINE)

    # Time ranges: "8:00-9:00 Uhr" / "8-9 Uhr" -> "acht bis neun Uhr"
    t = _re_tts.sub(r'\b(\d{1,2})(:\d{2})?\s*[-–]\s*(\d{1,2})(:\d{2})?\s*Uhr\b', _time_range_to_words, t)

    def _hour_word(h: int) -> str:
        # Before "Uhr", 1 is spoken "ein" ("ein Uhr"), not "eins".
        return "ein" if h == 1 else _num_to_words_de(h)

    # Times with minutes: "8:30 Uhr" -> "acht Uhr dreissig", "8:00 Uhr" -> "acht Uhr"
    def _single_time(m):
        h = int(m.group(1))
        mn = int(m.group(2)) if m.group(2) else 0
        words = _hour_word(h) + " Uhr"
        if mn > 0:
            words += " " + _num_to_words_de(mn)
        return words

    t = _re_tts.sub(r'\b(\d{1,2}):(\d{2})\s*Uhr\b', _single_time, t)
    # Full hours without ":" — "15 Uhr" -> "fuenfzehn Uhr"
    t = _re_tts.sub(r'\b(\d{1,2})\s+Uhr\b', lambda m: f"{_hour_word(int(m.group(1)))} Uhr", t)
    # Small numeric ranges without "Uhr": "5-6" -> "fuenf bis sechs"
    t = _re_tts.sub(r'\b(\d{1,2})\s*[-–]\s*(\d{1,2})\b', _small_range_to_words, t)

    # Decimal numbers: "0.1" / "0,5" / "1,25" -> "null komma eins" / ...
    # Must run before "number+unit", otherwise the unit rule splits the
    # fractional part. The lookbehind and lookahead reject numbers that are
    # part of a longer dotted/comma chain on EITHER side (IPs like
    # 192.168.1.1, dates like 06.05.2024, lists like 1,2,3). A plain
    # trailing period is still allowed ("Version 2.5.").
    t = _re_tts.sub(r'(?<!\d[.,])\b(\d+)[.,](\d+)(?![.,]?\d)', _decimal_to_words, t)

    # Number + unit: "22GB" -> "22 GB" (insert a space)
    t = _re_tts.sub(r'(\d+)([A-Za-z]{1,4})\b', r'\1 \2', t)

    # Expand units/abbreviations
    for pat, repl in _UNIT_WORDS:
        t = _re_tts.sub(pat, repl, t)

    # Generic spelling: any remaining 2-5 letter all-caps word (XTTS, USB,
    # DNS, JSON, ...) -> "X T T S". Runs AFTER the explicit list, so entries
    # there (or overrides for words spoken as words) take precedence.
    t = _re_tts.sub(r'\b([A-Z]{2,5})\b', lambda m: " ".join(m.group(1)), t)

    # Quotation marks: straight, German „ “, English “ ”, and backticks
    t = _re_tts.sub(r'["“”„`]', '', t)

    # Normalise paragraphs/line breaks and whitespace
    t = _re_tts.sub(r'\n{2,}', '. ', t)
    t = _re_tts.sub(r'\n', ', ', t)
    t = _re_tts.sub(r'\s{2,}', ' ', t)
    t = _re_tts.sub(r'\s*\.\s*\.\s*', '. ', t)

    return t.strip()
|
||
|
||
|
||
# ── STT Engine ───────────────────────────────────────────────
|
||
|
||
|
||
class STTEngine:
    """Whisper speech-to-text running locally on the CPU (int8).

    The model is not loaded in __init__. It is loaded by initialize(),
    reload(), or lazily on the first transcribe() call.
    """

    def __init__(self, model_size: str = "small", language: str = "de") -> None:
        self.model_size = model_size
        self.language = language
        self.model: Optional[WhisperModel] = None

    def initialize(self) -> None:
        """Load the Whisper model named by ``self.model_size``."""
        logger.info(
            "Lade Whisper-Modell '%s' (Sprache: %s)...",
            self.model_size,
            self.language,
        )
        self.model = WhisperModel(self.model_size, device="cpu", compute_type="int8")
        logger.info("Whisper-Modell geladen")

    def reload(self, model_size: str) -> bool:
        """Switch to another Whisper model (e.g. after a config change).

        Args:
            model_size: one of tiny, base, small, medium, large-v3.

        Returns:
            True if the new model was loaded. False if it is already active,
            the name is not allowed, or loading failed. On failure
            ``model_size`` is reverted so a later lazy load in transcribe()
            uses the previous model instead of retrying the one that failed.
        """
        if model_size == self.model_size and self.model is not None:
            return False
        allowed = {"tiny", "base", "small", "medium", "large-v3"}
        if model_size not in allowed:
            logger.warning("Ungueltiges Whisper-Modell: %s (erlaubt: %s)", model_size, allowed)
            return False
        logger.info("Lade Whisper-Modell neu: %s -> %s", self.model_size, model_size)
        previous_size = self.model_size
        self.model_size = model_size
        # Drop the old model before loading the new one (presumably so two
        # models are not held in RAM at the same time).
        self.model = None
        try:
            self.model = WhisperModel(model_size, device="cpu", compute_type="int8")
        except Exception:
            logger.exception("Whisper-Modell '%s' konnte nicht geladen werden", model_size)
            self.model_size = previous_size
            return False
        logger.info("Whisper-Modell '%s' geladen", model_size)
        return True

    def transcribe(self, audio_data: np.ndarray) -> str:
        """Transcribe mono 16 kHz audio to text.

        Args:
            audio_data: float32 samples in [-1, 1], or integer PCM samples
                (assumed 16-bit, e.g. from record_audio), which are scaled.

        Returns:
            Recognised text, or "" if the model cannot be loaded or
            transcription fails.
        """
        if self.model is None:
            # Lazy load: STT normally runs remotely; only this local
            # fallback needs the model.
            logger.info("Lokales Whisper-Fallback — Modell wird nachgeladen...")
            try:
                self.initialize()
            except Exception:
                logger.exception("Lokales Whisper konnte nicht geladen werden")
                return ""
        if self.model is None:
            return ""

        try:
            # Integer PCM is scaled into [-1, 1]. Other float dtypes are only
            # cast: dividing them by 32768 would make them nearly silent.
            if np.issubdtype(audio_data.dtype, np.integer):
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype != np.float32:
                audio_data = audio_data.astype(np.float32)

            segments, info = self.model.transcribe(
                audio_data,
                language=self.language,
                beam_size=5,
                vad_filter=True,
            )
            # Skip empty segments so they do not leave double spaces.
            text = " ".join(part for part in (seg.text.strip() for seg in segments) if part)
            logger.info("STT: '%s' (Sprache: %s, Dauer: %.1fs)", text, info.language, info.duration)
            return text
        except Exception:
            logger.exception("STT-Fehler")
            return ""
|
||
|
||
|
||
# ── Wake-Word Erkennung ──────────────────────────────────────
|
||
|
||
|
||
class WakeWordDetector:
    """Detects the wake word in the microphone audio stream.

    Loads openwakeword's built-in models and selects one key:
    ``FALLBACK_MODEL`` ('hey_jarvis') if available, otherwise the first
    available model.

    NOTE(review): ``CUSTOM_MODEL_PATH`` is declared but initialize() does not
    load it, so a custom 'aria' model is currently not used.
    """

    CUSTOM_MODEL_PATH = "/voices/wake_aria.onnx"
    FALLBACK_MODEL = "hey_jarvis"
    THRESHOLD = 0.5

    def __init__(self) -> None:
        self.model: Optional[WakeWordModel] = None
        self.wake_word_key: str = ""  # selected model key; set by initialize()

    def initialize(self) -> None:
        """Load the wake-word models and pick the active key.

        WakeWordModel() is called WITHOUT arguments: older openwakeword
        versions forward unknown kwargs to AudioFeatures, which crashes.
        """
        logger.info("Lade Wake-Word-Modell...")

        # Load all built-in models (no kwargs — compatibility!)
        self.model = WakeWordModel()

        available = list(self.model.models.keys()) if hasattr(self.model, 'models') else []
        logger.info("Verfuegbare Wake-Words: %s", ", ".join(available) if available else "(keine)")

        if self.FALLBACK_MODEL in available:
            self.wake_word_key = self.FALLBACK_MODEL
        elif available:
            self.wake_word_key = available[0]
        else:
            self.wake_word_key = self.FALLBACK_MODEL

        logger.info("Wake-Word aktiv: '%s'", self.wake_word_key)
        logger.info(
            "Tipp: Custom 'aria' Wake-Word trainieren → "
            "https://github.com/dscripka/openWakeWord#training-new-models"
        )

    def detect(self, audio_chunk: np.ndarray) -> bool:
        """Check whether the active wake word occurs in an audio chunk.

        Only the score of the selected model (``wake_word_key``) is compared
        with ``THRESHOLD``. Because WakeWordModel() loads every built-in
        model, checking all scores would let unrelated models trigger.
        If the prediction has no entry for the selected key, all scores are
        checked instead.

        Args:
            audio_chunk: audio samples (int16, 16 kHz).

        Returns:
            True if the wake word was detected.
        """
        if self.model is None:
            return False

        prediction = self.model.predict(audio_chunk)
        if self.wake_word_key in prediction:
            candidates = {self.wake_word_key: prediction[self.wake_word_key]}
        else:
            candidates = prediction

        for key, score in candidates.items():
            if score > self.THRESHOLD:
                logger.info("Wake-Word '%s' erkannt! (Score: %.2f)", key, score)
                return True
        return False
|
||
|
||
|
||
# ── Audio-Aufnahme ───────────────────────────────────────────
|
||
|
||
|
||
def record_audio(duration: float = RECORD_SECONDS) -> np.ndarray:
    """Record a clip from the default input device, blocking until done.

    Args:
        duration: length of the recording in seconds.

    Returns:
        The recorded int16 samples (SAMPLE_RATE Hz, CHANNELS channels),
        flattened to a 1-D array.
    """
    logger.info("Aufnahme laeuft... (%d Sekunden)", duration)
    frame_count = int(duration * SAMPLE_RATE)
    recording = sd.rec(frame_count, samplerate=SAMPLE_RATE, channels=CHANNELS, dtype="int16")
    sd.wait()  # block until sd.rec has filled the buffer
    logger.info("Aufnahme beendet")
    return recording.flatten()
|
||
|
||
|
||
# ── Bridge Hauptklasse ───────────────────────────────────────
|
||
|
||
|
||
class ARIABridge:
|
||
"""ARIA Voice Bridge — verbindet App (via RVS) und Sprache mit ARIA-Core.
|
||
|
||
Drei parallele Aufgaben:
|
||
1. connect_to_core() — WebSocket zu aria-core (lokal)
|
||
2. connect_to_rvs() — WebSocket zum RVS (oeffentlich, fuer die App)
|
||
3. audio_loop() — Wake-Word + STT (lokales Mikrofon)
|
||
"""
|
||
|
||
def __init__(self) -> None:
    """Read configuration and create components; opens no connections.

    Values from load_config() take precedence over the module-level
    env defaults. Voice settings (TTS on/off, XTTS voice, F5-TTS fields,
    Whisper model) come from /shared/config/voice_config.json if it
    exists; a missing, unreadable or non-object file falls back to defaults.
    """
    self.config = load_config()
    self.ws_url = self.config.get("ARIA_CORE_WS", CORE_WS_URL)
    self.core_auth_token = self.config.get("ARIA_AUTH_TOKEN", CORE_AUTH_TOKEN)
    self._req_id_counter = 0
    self._session_key = "main"  # fallback; updated via the Diagnostic API
    self._diagnostic_url = self.config.get("DIAGNOSTIC_URL", DIAGNOSTIC_URL)

    # RVS connection info from config or env
    rvs_host = self.config.get("RVS_HOST", RVS_HOST)
    rvs_port = self.config.get("RVS_PORT", RVS_PORT)
    rvs_tls = self.config.get("RVS_TLS", RVS_TLS).lower() == "true"
    self.rvs_tls_fallback = self.config.get("RVS_TLS_FALLBACK", RVS_TLS_FALLBACK).lower() == "true"
    self.rvs_token = self.config.get("RVS_TOKEN", RVS_TOKEN)

    # Build URLs (primary + optional plain-ws fallback)
    if rvs_host:
        proto = "wss" if rvs_tls else "ws"
        self.rvs_url = f"{proto}://{rvs_host}:{rvs_port}"
        # Fallback URL (no TLS) only when TLS is on and fallback is allowed
        if rvs_tls and self.rvs_tls_fallback:
            self.rvs_url_fallback = f"ws://{rvs_host}:{rvs_port}"
        else:
            self.rvs_url_fallback = ""
    else:
        self.rvs_url = ""
        self.rvs_url_fallback = ""

    # Mode from shared config (persisted across container restarts)
    self.current_mode = self._load_persisted_mode()
    self.running = False

    # Components (TTS is always remote XTTS; Piper was removed)
    self.tts_enabled = True
    self.xtts_voice = ""
    self._f5tts_config: dict = {}
    vc: dict = {}
    # Load the saved voice config (best effort)
    try:
        vc_path = "/shared/config/voice_config.json"
        if os.path.exists(vc_path):
            with open(vc_path, encoding="utf-8") as f:
                loaded = json.load(f)
            # A non-object document (list, string, null) would break every
            # vc.get() — including the Whisper lookup outside this try.
            if not isinstance(loaded, dict):
                raise ValueError("kein JSON-Objekt")
            vc = loaded
            self.tts_enabled = vc.get("ttsEnabled", True)
            self.xtts_voice = vc.get("xttsVoice", "")
            # Collect F5-TTS fields. They are re-broadcast via RVS later so
            # the f5tts-bridge on the Gamebox gets its settings back after a
            # restart instead of falling back to hard-coded defaults.
            for k in ("f5ttsModel", "f5ttsCkptFile", "f5ttsVocabFile",
                      "f5ttsCfgStrength", "f5ttsNfeStep"):
                if k in vc:
                    self._f5tts_config[k] = vc[k]
            logger.info("Voice-Config geladen: tts=%s voice=%s f5tts=%s",
                        self.tts_enabled, self.xtts_voice or "default",
                        self._f5tts_config or "defaults")
    except Exception as e:
        logger.warning("Voice-Config laden fehlgeschlagen: %s", e)

    # Whisper model: voice config wins, then env/default (medium)
    whisper_model = vc.get("whisperModel") or self.config.get("WHISPER_MODEL", WHISPER_MODEL)
    self.stt_engine = STTEngine(
        model_size=whisper_model,
        language=self.config.get("WHISPER_LANGUAGE", WHISPER_LANGUAGE),
    )
    self.wake_word = WakeWordDetector()

    # WebSocket connections
    self.ws_core: Optional[websockets.WebSocketClientProtocol] = None
    self.ws_rvs: Optional[websockets.WebSocketClientProtocol] = None

    # Last agent_activity state sent (for de-duplication)
    self._last_activity_state: Optional[tuple] = None
    # Event-loop time of the last chat:final. Trailing agent events within
    # ~3 s afterwards are meant to be suppressed (core sometimes sends
    # clean-up events).
    self._last_chat_final_at: float = 0.0
    # requestId -> messageId map for the XTTS audio cache (app-side mapping)
    self._xtts_request_to_message: dict[str, str] = {}
    # Voice override from an app's latest chat message. Used for the
    # immediately following ARIA reply, then reset, so each device can get
    # its preferred voice per request.
    self._next_voice_override: Optional[str] = None
    # STT requests waiting for a reply from the whisper-bridge (Gamebox):
    # requestId -> Future with the text (or None on error).
    self._pending_stt: dict[str, asyncio.Future] = {}
|
||
|
||
def initialize(self) -> None:
    """Initialise optional components and log the effective setup.

    Audio (local microphone/speaker, wake word) is optional. Without an
    audio device (e.g. a VM without a sound card) the bridge still runs as
    a pure RVS relay. Sets ``self.audio_available``.
    """
    logger.info("=" * 50)
    logger.info("ARIA Voice Bridge startet...")
    logger.info("=" * 50)

    # STT is normally handled by the whisper-bridge (Gamebox). Local Whisper
    # is only a fallback and is loaded lazily when the remote side does not
    # answer, which saves RAM on the VM and startup time.

    # Check audio hardware (local mic/speaker)
    self.audio_available = False
    try:
        sd.query_devices()
        sd.query_devices(kind='output')
    except Exception:
        logger.warning("Kein Audio-Geraet — Wake-Word und lokale Wiedergabe deaktiviert")
        logger.info("TTS rendert fuer App (via RVS), STT verarbeitet App-Audio")
    else:
        self.audio_available = True
        logger.info("Audio-Geraet gefunden — Wake-Word und lokale TTS aktiv")
        try:
            self.wake_word.initialize()
        except Exception:
            # A failing wake-word model is not a missing audio device: the
            # audio flag stays set and the real cause is logged.
            logger.exception("Wake-Word-Modell konnte nicht geladen werden — Wake-Word deaktiviert")

    logger.info("Alle Komponenten initialisiert")
    logger.info("aria-core: %s", self.ws_url)
    if self.rvs_url and self.rvs_token:
        logger.info("RVS: %s (Token: %s...)", self.rvs_url, self.rvs_token[:8])
    else:
        logger.warning("RVS nicht konfiguriert — App-Verbindung deaktiviert")
        logger.warning(" Setze RVS_HOST, RVS_PORT, RVS_TOKEN in /config/aria.env")
    logger.info("Modus: %s %s", self.current_mode.config.emoji, self.current_mode.config.name)
|
||
|
||
# ── aria-core Verbindung (OpenClaw Gateway Protokoll) ───
|
||
|
||
def _next_req_id(self) -> str:
|
||
"""Erzeugt eine eindeutige Request-ID fuer das OpenClaw-Protokoll."""
|
||
self._req_id_counter += 1
|
||
return f"bridge-{self._req_id_counter}"
|
||
|
||
async def _openclaw_handshake(self, ws: websockets.WebSocketClientProtocol) -> bool:
    """Run the OpenClaw gateway handshake on a freshly opened socket.

    Steps:
        1. wait (max 10 s) for the ``connect.challenge`` event,
        2. send a ``connect`` request (protocol 3, operator role, auth token
           if configured),
        3. wait (max 10 s) for a successful ``res`` frame (hello-ok).

    Returns:
        True on success, otherwise False. Ordinary exceptions are logged and
        turned into False.
    """
    timeout_s = 10.0
    try:
        # 1) challenge
        first_raw = await asyncio.wait_for(ws.recv(), timeout=timeout_s)
        first = json.loads(first_raw)
        is_challenge = first.get("type") == "event" and first.get("event") == "connect.challenge"
        if not is_challenge:
            logger.error("[core] Unerwartete erste Nachricht: %s", first_raw[:200])
            return False

        nonce = first.get("payload", {}).get("nonce", "")
        logger.info("[core] Challenge empfangen (nonce: %s...)", nonce[:8] if nonce else "?")

        # 2) connect request
        client_info = {
            "id": "gateway-client",
            "version": "0.0.3",
            "platform": "linux",
            "mode": "backend",
        }
        params = {
            "minProtocol": 3,
            "maxProtocol": 3,
            "client": client_info,
            "role": "operator",
            "scopes": ["operator.read", "operator.write"],
            "caps": ["voice"],
            "commands": [],
            "permissions": {},
            "auth": {"token": self.core_auth_token} if self.core_auth_token else {},
            "locale": "de-DE",
            "userAgent": "aria-bridge/0.0.3",
        }
        request = {
            "type": "req",
            "id": self._next_req_id(),
            "method": "connect",
            "params": params,
        }
        await ws.send(json.dumps(request))
        logger.info("[core] Connect-Request gesendet")

        # 3) hello-ok
        reply_raw = await asyncio.wait_for(ws.recv(), timeout=timeout_s)
        reply = json.loads(reply_raw)
        if reply.get("type") != "res" or not reply.get("ok"):
            failure = reply.get("error", reply)
            logger.error("[core] Handshake fehlgeschlagen: %s", json.dumps(failure)[:200])
            return False

        logger.info("[core] Handshake erfolgreich — hello-ok empfangen")
        return True

    except asyncio.TimeoutError:
        logger.error("[core] Handshake-Timeout (10s)")
        return False
    except Exception:
        logger.exception("[core] Handshake-Fehler")
        return False
|
||
|
||
async def connect_to_core(self) -> None:
    """Keep a persistent WebSocket connection to aria-core (OpenClaw gateway).

    Loops while ``self.running``:
      - connects to ``self.ws_url`` and runs ``_openclaw_handshake``,
      - on success publishes the socket as ``self.ws_core``, resets the
        backoff and failure counter, and passes every incoming frame to
        ``_handle_core_message``,
      - reconnects with exponential backoff (starting at 2 s, doubling,
        capped at 30 s).

    After ``max_conn_failures`` consecutive connection errors the process
    calls ``sys.exit(1)`` so the container gets restarted.

    NOTE(review): a failed handshake sleeps and retries but does not
    increment ``conn_fail_count``, so it can never trigger the exit. A clean
    close of the socket (``async for`` ending normally) does not count
    either. Confirm this is intended.
    """
    retry_delay = 2  # seconds; doubled after each failure, capped at 30
    max_conn_failures = 6  # consecutive failures before exiting the process
    conn_fail_count = 0

    while self.running:
        try:
            logger.info("[core] Verbinde: %s", self.ws_url)
            async with websockets.connect(self.ws_url) as ws:
                # Run the OpenClaw handshake before using the socket
                if not await self._openclaw_handshake(ws):
                    logger.error("[core] Handshake fehlgeschlagen — Reconnect")
                    # Back off here; `continue` skips the reconnect sleep
                    # at the bottom of the loop (the `finally` still runs).
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)
                    continue

                self.ws_core = ws
                retry_delay = 2
                conn_fail_count = 0
                logger.info("[core] Verbunden und authentifiziert")

                async for message in ws:
                    await self._handle_core_message(message)

        except websockets.ConnectionClosed:
            logger.warning("[core] Verbindung verloren")
            conn_fail_count += 1
        except ConnectionRefusedError:
            logger.warning("[core] Nicht erreichbar (%s)", self.ws_url)
            conn_fail_count += 1
        except Exception:
            logger.exception("[core] WebSocket-Fehler")
            conn_fail_count += 1
        finally:
            # No other task may use a socket that is no longer live.
            self.ws_core = None

        # After N consecutive failures, exit so Docker restarts the container
        # (which gets a new network namespace if aria-core was restarted).
        if conn_fail_count >= max_conn_failures:
            logger.error(
                "[core] %dx nicht erreichbar — Exit fuer Docker-Restart",
                conn_fail_count,
            )
            sys.exit(1)

        if self.running:
            logger.info("[core] Reconnect in %ds...", retry_delay)
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 30)
|
||
|
||
async def _handle_core_message(self, raw_message: str) -> None:
    """Dispatch one frame received from aria-core (OpenClaw gateway protocol).

    Handled frames:
      - ``res``: replies to our requests (e.g. chat.send ack); logged only.
      - event ``agent``: streaming LLM activity, forwarded to the app as an
        activity signal via ``_emit_activity``.
      - event ``chat`` with ``state`` final/error/delta:
          - final: answers go to ``_process_core_response``,
          - error: forwarded to the app as "[Fehler] ..." chat messages,
          - delta: snapshots are ignored.
      - legacy events ``chat:delta`` / ``chat:final`` / ``chat:error``:
        handled like their ``chat`` counterparts, including the same
        end-of-turn bookkeeping (idle signal, ``_last_chat_final_at``).
      - ``tick`` / ``health`` and unknown events: ignored.

    Malformed frames (invalid JSON, or JSON that is not an object) are
    logged and dropped. Raising here would abort the receive loop in
    connect_to_core and force a reconnect.
    """
    try:
        message = json.loads(raw_message)
    except json.JSONDecodeError:
        logger.error("[core] Ungueltige JSON: %s", raw_message[:100])
        return
    if not isinstance(message, dict):
        logger.error("[core] Unerwarteter Frame (kein Objekt): %s", raw_message[:100])
        return

    frame_type = message.get("type", "")
    logger.info("[core] <<< Frame: type=%s event=%s method=%s | %s",
                frame_type, message.get("event", "-"), message.get("method", "-"),
                raw_message[:200])

    loop = asyncio.get_running_loop()

    async def _mark_turn_finished() -> None:
        # End of an answer turn: remember when (event-loop time) and tell
        # the app the agent is idle.
        self._last_chat_final_at = loop.time()
        await self._emit_activity("idle", "")

    async def _forward_error(error) -> None:
        await self._send_to_rvs({
            "type": "chat",
            "payload": {
                "text": f"[Fehler] {error}",
                "sender": "aria",
            },
            # NOTE(review): event-loop (monotonic) time, not Unix epoch ms.
            # Kept consistent with the other chat messages sent to RVS;
            # confirm what the app expects.
            "timestamp": int(loop.time() * 1000),
        })

    # ── Replies to our requests (e.g. chat.send ack) ──
    if frame_type == "res":
        req_id = message.get("id", "")
        if message.get("ok"):
            logger.debug("[core] Request %s bestätigt", req_id)
        else:
            error = message.get("error", "Unbekannt")
            logger.error("[core] Request %s fehlgeschlagen: %s", req_id, error)
        return

    # ── Gateway events ──
    if frame_type != "event":
        logger.debug("[core] Unbekannter Frame-Typ: %s", frame_type)
        return

    event_name = message.get("event", "")
    payload = message.get("payload")
    if not isinstance(payload, dict):
        payload = {}

    # ── agent events: streaming deltas from the LLM ──
    if event_name == "agent":
        data = payload.get("data")
        if not isinstance(data, dict):
            data = {}
        delta = data.get("delta", "")
        stream = payload.get("stream", "")
        if delta and stream == "assistant":
            logger.debug("[core] Delta: '%s'", delta[:40])
        # Activity signal to the app (de-duplicated by _emit_activity)
        tool_name = data.get("name") or data.get("tool") or payload.get("tool") or ""
        if stream == "tool_use" or data.get("type") == "tool_use":
            activity = "tool"
        elif stream == "assistant":
            activity = "assistant"
        else:
            activity = "thinking"
        await self._emit_activity(activity, tool_name)
        return

    # ── chat events: snapshots with state=delta|final|error ──
    if event_name == "chat":
        state = payload.get("state", "")
        if state == "final":
            text = self._extract_chat_text(payload)
            await _mark_turn_finished()
            if not text:
                logger.warning("[core] chat final ohne Text: %s", json.dumps(payload)[:200])
                return
            logger.info("[core] Antwort: '%s'", text[:80])
            await self._process_core_response(text, payload)
            return
        if state == "error":
            error = payload.get("error", "Unbekannt")
            logger.error("[core] Chat-Fehler: %s", error)
            await _mark_turn_finished()
            await _forward_error(error)
            return
        # state=delta: periodic snapshot, ignored
        return

    # ── Legacy event names (chat:delta, chat:final, chat:error) ──
    if event_name == "chat:delta":
        delta = payload.get("delta", payload.get("text", ""))
        if delta:
            logger.debug("[core] Delta (legacy): '%s'", delta[:40])
        return

    if event_name == "chat:final":
        text = payload.get("text", payload.get("message", ""))
        # "message" may be a structured object (content array), not a string.
        if not isinstance(text, str) or not text:
            text = self._extract_chat_text(payload)
        await _mark_turn_finished()
        if not text:
            logger.warning("[core] chat:final ohne Text: %s", json.dumps(payload)[:200])
            return
        logger.info("[core] Antwort (legacy): '%s'", text[:80])
        await self._process_core_response(text, payload)
        return

    if event_name == "chat:error":
        error = payload.get("error", payload.get("message", "Unbekannt"))
        logger.error("[core] Chat-Fehler (legacy): %s", error)
        await _mark_turn_finished()
        await _forward_error(error)
        return

    # tick, health, etc. — ignored
    if event_name in ("tick", "health"):
        return

    logger.debug("[core] Event: %s", event_name)
|
||
|
||
@staticmethod
|
||
def _extract_chat_text(payload: dict) -> str:
|
||
"""Extrahiert Text aus OpenClaw chat-Event message.content Array."""
|
||
try:
|
||
content = payload.get("message", {}).get("content", [])
|
||
if isinstance(content, list):
|
||
return "".join(
|
||
c.get("text", "") for c in content if c.get("type") == "text"
|
||
)
|
||
if isinstance(content, str):
|
||
return content
|
||
except (AttributeError, TypeError):
|
||
pass
|
||
return payload.get("text", "")
|
||
|
||
async def _process_core_response(self, text: str, payload: dict) -> None:
    """Handle a finished reply from aria-core.

    Steps:
      - Drop the reply silently if it is a NO_REPLY token.
      - Apply, persist and broadcast a mode switch if the text contains one.
      - Forward the reply to the app (via RVS) as a chat message with a fresh
        ``messageId``.
      - Consume the app's one-shot voice override.
      - Send an ``xtts_request`` if ``tts_enabled`` and
        ``should_speak(current_mode, critical)`` allow it.

    Args:
        text: Reply text from aria-core.
        payload: Raw event payload. ``metadata.critical`` is passed to
            ``should_speak`` together with the current mode.
    """
    # NO_REPLY token: ARIA explicitly signals "do not answer".
    # Drop everything (no chat message, no TTS). Tolerates variations such
    # as "no_reply" and surrounding dots, quotes, backticks or asterisks.
    stripped = text.strip().strip('."\'`*').upper()
    if stripped.startswith("NO_REPLY"):
        logger.info("[core] NO_REPLY empfangen — Antwort still verworfen")
        return

    # "metadata" may be present but null in the event payload.
    metadata = payload.get("metadata") or {}
    is_critical = metadata.get("critical", False)

    # Mode switch requested in the reply text.
    new_mode = detect_mode_switch(text)
    if new_mode is not None:
        self.current_mode = new_mode
        self._persist_mode()
        logger.info(
            "[core] Modus → %s %s",
            self.current_mode.config.emoji,
            self.current_mode.config.name,
        )
        await self._broadcast_current_mode()

    # Unique message id that links this reply to its audio later on.
    message_id = str(uuid.uuid4())

    # TTS-cleaned variant of the text, shown optionally by Diagnostic.
    tts_text_preview = clean_text_for_tts(text)

    # Forward the reply to the app as a chat message.
    await self._send_to_rvs({
        "type": "chat",
        "payload": {
            "text": text,
            "sender": "aria",
            "messageId": message_id,
            # Debug: text prepared for TTS (ignored by the app, optional in Diagnostic)
            "ttsText": tts_text_preview if tts_text_preview != text else "",
        },
        "timestamp": int(asyncio.get_event_loop().time() * 1000),
    })

    # The app's voice override applies to exactly this reply. Consume it
    # before any early return, so a muted reply cannot pass it on to a later,
    # unrelated reply.
    voice_override = self._next_voice_override
    self._next_voice_override = None

    # TTS via XTTS (XTTS bridge on the gaming PC)
    if not (getattr(self, 'tts_enabled', True) and should_speak(self.current_mode, is_critical)):
        logger.info("[core] TTS unterdrueckt (Modus: %s)", self.current_mode.config.name)
        return

    # Voice priority: app override for this request > global default voice.
    xtts_voice = voice_override or getattr(self, 'xtts_voice', '')
    if voice_override:
        logger.info("[core] Nutze Voice-Override: %s", voice_override)

    tts_text = tts_text_preview or text
    if not tts_text:
        logger.info("[core] TTS-Text leer nach Cleanup — uebersprungen")
        return

    try:
        xtts_request_id = str(uuid.uuid4())
        self._xtts_request_to_message[xtts_request_id] = message_id
        # Keep the mapping bounded: evict the oldest entry (dicts keep insertion order).
        if len(self._xtts_request_to_message) > 100:
            oldest = next(iter(self._xtts_request_to_message))
            self._xtts_request_to_message.pop(oldest, None)
        await self._send_to_rvs({
            "type": "xtts_request",
            "payload": {
                "text": tts_text,
                "voice": xtts_voice,
                "language": "de",
                "requestId": xtts_request_id,
                "messageId": message_id,
            },
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        })
        logger.info("[core] XTTS-Request gesendet (%s): '%s'", xtts_voice or "default", tts_text[:60])
    except Exception as e:
        logger.error("[core] XTTS-Request fehlgeschlagen: %s — kein Audio", e)
|
||
|
||
# ── Mode persistence (global, not per device) ──────

_MODE_FILE = "/shared/config/mode.json"

def _load_persisted_mode(self) -> Mode:
    """Return the mode stored in the shared config file, or ``Mode.NORMAL``.

    The file holds ``{"mode": "<Mode member name>"}``. A missing file, a read
    or JSON error, or an unknown name all yield ``Mode.NORMAL``. Errors are
    logged as warnings.
    """
    try:
        if not os.path.exists(self._MODE_FILE):
            return Mode.NORMAL
        stored_name = json.loads(Path(self._MODE_FILE).read_text()).get("mode", "NORMAL")
        found = next((member for member in Mode if member.name == stored_name), None)
        if found is not None:
            logger.info("[mode] Persistierter Modus geladen: %s", found.config.name)
            return found
    except Exception as e:
        logger.warning("[mode] Laden fehlgeschlagen: %s", e)
    return Mode.NORMAL
|
||
|
||
def _persist_mode(self) -> None:
    """Store the current mode name in ``self._MODE_FILE`` as ``{"mode": ...}``.

    The parent directory is derived from ``self._MODE_FILE`` instead of a
    hard-coded path, so both always agree. The JSON is first written to a
    temp file in the same directory and then moved into place with
    ``os.replace``. A concurrent reader therefore never sees a half-written
    file, which ``_load_persisted_mode`` would otherwise treat as "reset to
    NORMAL". Failures are logged and never raised.
    """
    target = Path(self._MODE_FILE)
    tmp_path = None
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(dir=target.parent, prefix=".mode-", suffix=".tmp")
        with os.fdopen(fd, "w") as fh:
            fh.write(json.dumps({"mode": self.current_mode.name}))
        os.replace(tmp_path, target)
        tmp_path = None  # moved into place; nothing left to clean up
    except Exception as e:
        logger.warning("[mode] Speichern fehlgeschlagen: %s", e)
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
||
|
||
async def _broadcast_current_mode(self) -> None:
    """Announce the current mode to every RVS client (app + Diagnostic).

    The payload carries ``sender="bridge"`` so the bridge's own "mode"
    handler can recognise and drop the echo. Any failure is logged at debug
    level only.
    """
    try:
        mode = self.current_mode
        mode_payload = {
            "mode": canonical_id(mode),
            "name": mode.config.name,
            "emoji": mode.config.emoji,
            "sender": "bridge",  # lets the mode handler filter its own echo (loop guard)
        }
        now_ms = int(asyncio.get_event_loop().time() * 1000)
        await self._send_to_rvs({"type": "mode", "payload": mode_payload, "timestamp": now_ms})
    except Exception as e:
        logger.debug("[mode] Broadcast fehlgeschlagen: %s", e)
|
||
|
||
async def _broadcast_persisted_config(self) -> None:
    """Send the current voice configuration to RVS once after connecting.

    This lets freshly connected bridges (notably the f5tts-bridge on the
    Gamebox after a container restart) pick up the settings last chosen in
    Diagnostic, without the user having to click anything.

    The payload contains ttsEnabled, xttsVoice and whisperModel, plus any
    stored f5tts* fields. Failures are logged at debug level only.
    """
    try:
        config = dict(
            ttsEnabled=getattr(self, "tts_enabled", True),
            xttsVoice=getattr(self, "xtts_voice", ""),
            whisperModel=self.stt_engine.model_size,
        )
        stored_f5tts = getattr(self, "_f5tts_config", {}) or {}
        config.update(stored_f5tts)
        envelope = {
            "type": "config",
            "payload": config,
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        }
        await self._send_to_rvs(envelope)
        logger.info("[rvs] Persistierte Config broadcastet: %s", config)
    except Exception as e:
        logger.debug("[rvs] Config-Broadcast fehlgeschlagen: %s", e)
|
||
|
||
def _fetch_active_session(self) -> None:
    """Refresh ``self._session_key`` from Diagnostic's ``/api/session``.

    Performs a blocking HTTP GET with a 2 s timeout. The key is replaced only
    when the JSON response holds a non-empty ``sessionKey`` that differs from
    the current one. On any failure the current key is kept and the error is
    logged at debug level.
    """
    try:
        request = urllib.request.Request(f"{self._diagnostic_url}/api/session", method="GET")
        with urllib.request.urlopen(request, timeout=2) as response:
            body = response.read().decode()
        fetched_key = json.loads(body).get("sessionKey", "")
        if not fetched_key or fetched_key == self._session_key:
            return
        logger.info("[session] Aktive Session gewechselt: %s -> %s", self._session_key, fetched_key)
        self._session_key = fetched_key
    except Exception as e:
        logger.debug("[session] Diagnostic nicht erreichbar (%s) — nutze '%s'", e, self._session_key)
|
||
|
||
async def send_to_core(self, text: str, source: str = "bridge") -> None:
    """Send text to aria-core as an OpenClaw ``chat.send`` request.

    The active session key is refreshed from Diagnostic first. That lookup
    is a blocking HTTP call with a 2 s timeout, so it runs in the default
    executor. Running it inline would freeze every websocket on this event
    loop whenever Diagnostic is slow or down.

    Args:
        text: Message text for ARIA.
        source: Origin tag, used only for logging (e.g. "app", "app-voice").
    """
    if self.ws_core is None:
        logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
        return

    # Fetch the active session from Diagnostic without blocking the loop.
    await asyncio.get_event_loop().run_in_executor(None, self._fetch_active_session)

    # The core connection may have dropped while we were waiting.
    ws = self.ws_core
    if ws is None:
        logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
        return

    req_id = self._next_req_id()
    message = json.dumps({
        "type": "req",
        "id": req_id,
        "method": "chat.send",
        "params": {
            "sessionKey": self._session_key,
            "message": text,
            "idempotencyKey": str(uuid.uuid4()),
        },
    })

    try:
        await ws.send(message)
        logger.info("[core] chat.send (%s, id=%s): '%s'", source, req_id, text[:80])
    except Exception:
        logger.exception("[core] Sendefehler")
|
||
|
||
# ── RVS Verbindung (App-Relay) ──────────────────────────
|
||
|
||
async def connect_to_rvs(self) -> None:
    """Keep a persistent WebSocket connection to the RVS, reconnecting as needed.

    The token is passed as a ``?token=`` query parameter and is never logged.
    On an ``ssl.SSLError``/``OSError``, if a fallback URL is configured
    (RVS_TLS_FALLBACK=true), the loop switches once to the plain ws:// URL
    and stays there. The reconnect delay starts at 2 s, doubles up to 30 s,
    and is reset after a successful connect.

    Each connection gets two helper tasks next to the receive loop: the
    heartbeat and a one-shot broadcast of the persisted config. Both are
    referenced here and cancelled when the connection ends. An unreferenced
    task may be garbage-collected before it finishes, because the event loop
    holds only weak references.
    """
    if not self.rvs_url or not self.rvs_token:
        logger.info("[rvs] Nicht konfiguriert — ueberspringe")
        return

    retry_delay = 2
    current_url = self.rvs_url
    using_fallback = False

    while self.running:
        try:
            url = f"{current_url}?token={self.rvs_token}"
            logger.info("[rvs] Verbinde: %s", current_url)
            async with websockets.connect(url) as ws:
                self.ws_rvs = ws
                retry_delay = 2
                logger.info("[rvs] Verbunden — warte auf App-Nachrichten")

                # Broadcast the current mode so freshly connected apps and
                # Diagnostic can sync their UI state right away.
                await self._broadcast_current_mode()

                helper_tasks = [
                    # Re-send the persisted voice config. After a restart the
                    # f5tts-bridge on the Gamebox otherwise sits on its
                    # hard-coded defaults instead of the Diagnostic settings.
                    asyncio.create_task(self._broadcast_persisted_config()),
                    # Heartbeat (RVS expects a ping every 30 s)
                    asyncio.create_task(self._rvs_heartbeat()),
                ]
                try:
                    async for raw_message in ws:
                        await self._handle_rvs_message(raw_message)
                finally:
                    for task in helper_tasks:
                        task.cancel()

        except websockets.ConnectionClosed:
            logger.warning("[rvs] Verbindung verloren")
        except ConnectionRefusedError:
            logger.warning("[rvs] Nicht erreichbar")
        except (ssl.SSLError, OSError) as e:
            # TLS error: try falling back to ws://.
            # NOTE(review): any OSError (e.g. a DNS failure) also triggers this
            # plaintext fallback, after which the token travels unencrypted.
            if not using_fallback and self.rvs_url_fallback:
                logger.warning("[rvs] TLS-Fehler: %s", e)
                logger.warning("[rvs] TLS gewollt aber nicht verfuegbar — Fallback auf ws://")
                current_url = self.rvs_url_fallback
                using_fallback = True
                retry_delay = 1  # retry right away
            else:
                logger.error("[rvs] SSL-Fehler (kein Fallback): %s", e)
        except Exception:
            logger.exception("[rvs] WebSocket-Fehler")
        finally:
            self.ws_rvs = None

        if self.running:
            logger.info("[rvs] Reconnect in %ds...", retry_delay)
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 30)
|
||
|
||
async def _rvs_heartbeat(self) -> None:
    """Keep the RVS link alive. Runs until cancelled or the link is found dead.

    Every 15 s, while ``self.ws_rvs`` is set, it first sends a protocol-level
    WebSocket ping and waits up to 10 s for the pong. It then sends an
    application-level ``heartbeat`` message.

    A missing pong closes the socket and clears ``self.ws_rvs`` so the
    connect loop reconnects. A failed heartbeat send simply ends the task.
    """
    while True:
        await asyncio.sleep(15)
        if not self.ws_rvs:
            continue

        # Protocol-level ping: proves the TCP connection is still alive.
        try:
            pong_waiter = await self.ws_rvs.ping()
            await asyncio.wait_for(pong_waiter, timeout=10)
        except Exception:
            logger.warning("[rvs] Ping fehlgeschlagen — Verbindung tot, erzwinge Reconnect")
            try:
                await self.ws_rvs.close()
            except Exception:
                pass
            self.ws_rvs = None
            return

        try:
            heartbeat = {
                "type": "heartbeat",
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            }
            await self.ws_rvs.send(json.dumps(heartbeat))
        except Exception:
            return
|
||
|
||
async def _handle_rvs_message(self, raw_message: str) -> None:
    """Dispatch one message received via RVS (app, Diagnostic, helper bridges).

    Supported ``type`` values:
      - chat:           user text → aria-core. Messages with sender "aria" or
                        "stt" are dropped because they would loop. An optional
                        ``voice`` becomes the one-shot override for the next reply.
      - cancel_request: abort via Diagnostic /api/cancel, then report "idle".
      - audio_pcm:      ignored; the XTTS bridge streams it to the app itself.
      - xtts_response:  legacy WAV answer → re-sent to the app as type "audio".
      - tts_request:    play button in the app → xtts_request for that text.
      - config:         voice/STT settings → applied and persisted.
      - mode:           mode switch by id or activation phrase.
      - location:       GPS data → logged and forwarded to aria-core.
      - file:           upload → saved under /shared/uploads, aria-core notified.
      - file_request:   re-download of a file that resolves to a path inside /shared.
      - audio:          voice recording → STT (background task) → aria-core.
      - stt_response:   resolves a pending remote STT request.

    Messages that are not JSON objects are logged and dropped. Exceptions
    raised while handling a message are logged here rather than propagated.
    Otherwise they would end the ``async for`` receive loop in
    ``connect_to_rvs`` and force a reconnect on every malformed message.
    """
    try:
        message = json.loads(raw_message)
    except json.JSONDecodeError:
        message = None
    if not isinstance(message, dict):
        logger.error("[rvs] Ungueltige JSON: %s", raw_message[:100])
        return

    msg_type = message.get("type", "")
    payload = message.get("payload")
    if not isinstance(payload, dict):
        payload = {}

    try:
        if msg_type == "chat":
            # Only forward user messages; ARIA/STT echoes would loop back.
            if payload.get("sender", "") in ("aria", "stt"):
                return
            text = payload.get("text", "")
            # Remember the voice override for the next ARIA reply.
            voice_override = payload.get("voice", "")
            if voice_override:
                self._next_voice_override = voice_override
                logger.info("[rvs] Voice-Override fuer naechste Antwort: %s", voice_override)
            if text:
                logger.info("[rvs] App-Chat: '%s'", text[:80])
                await self.send_to_core(text, source="app")

        elif msg_type == "cancel_request":
            logger.info("[rvs] Cancel-Request von App — rufe Diagnostic /api/cancel auf")
            await self._cancel_via_diagnostic()
            await self._emit_activity("idle", "")

        elif msg_type == "audio_pcm":
            # PCM audio goes straight from the XTTS bridge to the app. The
            # aria-bridge must NOT rebroadcast it, otherwise the app receives
            # every chunk twice. The XTTS bridge includes the messageId itself.
            return

        elif msg_type == "xtts_response":
            # Legacy path (old XTTS bridge answering with a whole WAV). Forward
            # as type "audio" so the app's existing WAV queue player handles it.
            error = payload.get("error", "")
            if error:
                logger.warning("[rvs] XTTS Fehler: %s", error)
                return
            audio_b64 = payload.get("base64", "")
            if not audio_b64:
                return
            # The responder may append "_<suffix>" to our request id; map back
            # via the part before the last underscore.
            req_id_full = payload.get("requestId") or ""
            req_id_base = req_id_full.rsplit("_", 1)[0]
            linked_message_id = self._xtts_request_to_message.get(req_id_base, "")
            logger.info("[rvs] XTTS-Audio legacy empfangen: %dKB", len(audio_b64) // 1365)
            await self._send_to_rvs({
                "type": "audio",
                "payload": {
                    "base64": audio_b64,
                    "mimeType": payload.get("mimeType", "audio/wav"),
                    "voice": payload.get("voice", "xtts"),
                    "messageId": linked_message_id,
                },
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })

        elif msg_type == "tts_request":
            # The app asks for TTS audio of a text (play button); always via XTTS.
            text = payload.get("text", "")
            message_id = payload.get("messageId", "")
            if not text:
                return
            tts_text = clean_text_for_tts(text) or text
            # A voice from the app payload wins, otherwise the global default.
            xtts_voice = payload.get("voice", "") or getattr(self, 'xtts_voice', '')
            try:
                xtts_request_id = str(uuid.uuid4())
                if message_id:
                    self._xtts_request_to_message[xtts_request_id] = message_id
                    # Same bound as for core replies: evict the oldest entry.
                    if len(self._xtts_request_to_message) > 100:
                        oldest = next(iter(self._xtts_request_to_message))
                        self._xtts_request_to_message.pop(oldest, None)
                await self._send_to_rvs({
                    "type": "xtts_request",
                    "payload": {
                        "text": tts_text,
                        "voice": xtts_voice,
                        "language": "de",
                        "requestId": xtts_request_id,
                        "messageId": message_id,
                    },
                    "timestamp": int(asyncio.get_event_loop().time() * 1000),
                })
                logger.info("[rvs] TTS on-demand via XTTS: '%s'", tts_text[:60])
            except Exception as e:
                logger.warning("[rvs] TTS on-demand fehlgeschlagen: %s", e)

        elif msg_type == "config":
            # Settings from app/Diagnostic: apply and persist. Fields that do
            # not belong to the aria-bridge (f5tts*) are only persisted. The
            # f5tts-bridge on the Gamebox receives the same RVS broadcast and
            # applies them itself.
            changed = False
            if "ttsEnabled" in payload:
                self.tts_enabled = bool(payload["ttsEnabled"])
                logger.info("[rvs] TTS %s", "aktiviert" if self.tts_enabled else "deaktiviert")
                changed = True
            if "xttsVoice" in payload:
                self.xtts_voice = payload["xttsVoice"]
                logger.info("[rvs] XTTS-Stimme: %s", self.xtts_voice or "default")
                changed = True
            if "whisperModel" in payload:
                new_model = payload["whisperModel"]
                allowed = {"tiny", "base", "small", "medium", "large-v3"}
                if isinstance(new_model, str) and new_model in allowed \
                        and new_model != self.stt_engine.model_size:
                    logger.info("[rvs] Whisper-Modell → %s (nur Config; Modell laedt Gamebox)",
                                new_model)
                    self.stt_engine.model_size = new_model
                    # Drop the loaded local model; presumably reloaded lazily on next use — TODO confirm
                    self.stt_engine.model = None
                    changed = True
            # F5-TTS fields: persist only; the f5tts-bridge applies them itself.
            for key in ("f5ttsModel", "f5ttsCkptFile", "f5ttsVocabFile",
                        "f5ttsCfgStrength", "f5ttsNfeStep"):
                if key in payload:
                    if not hasattr(self, "_f5tts_config"):
                        self._f5tts_config = {}
                    self._f5tts_config[key] = payload[key]
                    changed = True
            # Persist to the shared volume.
            if changed:
                try:
                    os.makedirs("/shared/config", exist_ok=True)
                    config_data = {
                        "ttsEnabled": getattr(self, "tts_enabled", True),
                        "xttsVoice": getattr(self, "xtts_voice", ""),
                        "whisperModel": self.stt_engine.model_size,
                    }
                    config_data.update(getattr(self, "_f5tts_config", {}))
                    with open("/shared/config/voice_config.json", "w") as f:
                        json.dump(config_data, f, indent=2)
                    logger.info("[rvs] Voice-Config gespeichert: %s", config_data)
                except Exception as e:
                    logger.warning("[rvs] Config speichern fehlgeschlagen: %s", e)

        elif msg_type == "mode":
            # Mode switch from the app: an id ('normal', 'dnd', ...) OR an activation phrase.
            # Ignore the bridge's own broadcast so other apps cannot end up in a loop.
            if payload.get("sender") == "bridge":
                return
            mode_name = payload.get("mode", "")
            new_mode = mode_from_id(mode_name) or detect_mode_switch(mode_name)
            if new_mode is not None and new_mode != self.current_mode:
                self.current_mode = new_mode
                self._persist_mode()
                logger.info(
                    "[rvs] Modus → %s %s (von App)",
                    self.current_mode.config.emoji,
                    self.current_mode.config.name,
                )
                # Broadcast to ALL clients (app + Diagnostic) so every UI stays in sync.
                await self._broadcast_current_mode()
            elif new_mode is None:
                logger.warning("[rvs] Unbekannter Modus: '%s'", mode_name)

        elif msg_type == "location":
            # GPS data from the app
            lat = payload.get("lat")
            lng = payload.get("lng")
            speed = payload.get("speed")
            logger.info("[rvs] GPS: lat=%.4f lng=%.4f speed=%s", lat or 0, lng or 0, speed)
            # Forward to aria-core (context for location-aware skills). A dead
            # core socket must not affect RVS message handling.
            ws_core = self.ws_core
            if ws_core:
                try:
                    await ws_core.send(raw_message)
                except Exception as e:
                    logger.warning("[rvs] GPS-Weiterleitung an aria-core fehlgeschlagen: %s", e)

        elif msg_type == "file":
            # File from the app → saved to the shared volume, aria-core notified.
            file_name = payload.get("name", "unbekannt")
            file_type = payload.get("type") or ""
            file_b64 = payload.get("base64", "")
            width = payload.get("width", 0)
            height = payload.get("height", 0)
            logger.info("[rvs] Datei empfangen: %s (%s, %dKB)",
                        file_name, file_type, len(file_b64) // 1365 if file_b64 else 0)

            if not file_b64:
                text = f"Stefan hat eine Datei gesendet ({file_name}, {file_type}) aber die Daten sind leer angekommen."
                await self.send_to_core(text, source="app-file")
                return

            # Shared volume: /shared/ is mounted in the bridge AND in aria-core.
            shared_dir = "/shared/uploads"
            is_image = file_type.startswith("image/")
            # Replace path separators so the name cannot leave shared_dir.
            clean_name = str(file_name).replace("/", "_")
            stamp = int(asyncio.get_event_loop().time())
            if is_image:
                ext = ".jpg" if "jpeg" in file_type or "jpg" in file_type else ".png"
                safe_name = f"img_{stamp}_{clean_name}"
                if not safe_name.endswith(ext):
                    safe_name += ext
            else:
                safe_name = f"file_{stamp}_{clean_name}"
            file_path = os.path.join(shared_dir, safe_name)

            # Invalid base64 raises binascii.Error (a ValueError); so does a
            # NUL byte in the name when opening. Disk problems raise OSError.
            try:
                data = base64.b64decode(file_b64)
                os.makedirs(shared_dir, exist_ok=True)
                with open(file_path, "wb") as f:
                    f.write(data)
            except (ValueError, OSError) as e:
                logger.error("[rvs] Datei konnte nicht gespeichert werden (%s): %s", file_name, e)
                text = f"Stefan hat eine Datei gesendet ({file_name}, {file_type}) aber sie konnte nicht gespeichert werden."
                await self.send_to_core(text, source="app-file")
                return

            size_kb = len(file_b64) // 1365
            if is_image:
                logger.info("[rvs] Bild gespeichert: %s (%dKB)", file_path, size_kb)
                text = (f"Stefan hat dir ein Bild geschickt: {file_name}"
                        f"{f' ({width}x{height}px)' if width else ''}"
                        f", {size_kb}KB."
                        f" Das Bild liegt unter: {file_path}"
                        f" Warte auf Stefans Anweisung was du damit tun sollst.")
            else:
                logger.info("[rvs] Datei gespeichert: %s (%dKB)", file_path, size_kb)
                text = (f"Stefan hat dir eine Datei geschickt: {file_name}"
                        f" ({file_type}, {size_kb}KB)."
                        f" Die Datei liegt unter: {file_path}"
                        f" Warte auf Stefans Anweisung was du damit tun sollst.")
            # aria-core first (the most important step) ...
            await self.send_to_core(text, source="app-file")
            # ... then tell the app where the file lives (best effort).
            try:
                await self._send_to_rvs({
                    "type": "file_saved",
                    "payload": {"name": file_name, "serverPath": file_path, "mimeType": file_type},
                    "timestamp": int(asyncio.get_event_loop().time() * 1000),
                })
            except Exception as e:
                logger.warning("[rvs] file_saved konnte nicht an App gesendet werden: %s", e)

        elif msg_type == "file_request":
            # The app requests a file again (re-download after clearing its cache).
            server_path = payload.get("serverPath", "")
            req_id = payload.get("requestId", "")
            if not isinstance(server_path, str) or not server_path.startswith("/shared/"):
                logger.warning("[rvs] Ungueltiger file_request: %s", server_path)
                return
            # Resolve ".." and symlinks before the containment check. A plain
            # prefix test would accept e.g. "/shared/../etc/passwd".
            shared_root = os.path.realpath("/shared")
            resolved = os.path.realpath(server_path)
            if os.path.commonpath([resolved, shared_root]) != shared_root:
                logger.warning("[rvs] Ungueltiger file_request: %s", server_path)
                return
            if not os.path.isfile(resolved):
                logger.warning("[rvs] Datei nicht gefunden: %s", server_path)
                await self._send_to_rvs({
                    "type": "file_response",
                    "payload": {"requestId": req_id, "error": "Datei nicht gefunden"},
                    "timestamp": int(asyncio.get_event_loop().time() * 1000),
                })
                return
            try:
                with open(resolved, "rb") as f:
                    file_b64 = base64.b64encode(f.read()).decode("ascii")
            except OSError as e:
                logger.warning("[rvs] Datei nicht lesbar: %s (%s)", server_path, e)
                await self._send_to_rvs({
                    "type": "file_response",
                    "payload": {"requestId": req_id, "error": "Datei nicht lesbar"},
                    "timestamp": int(asyncio.get_event_loop().time() * 1000),
                })
                return
            logger.info("[rvs] Re-Download: %s (%dKB)", server_path, len(file_b64) // 1365)
            await self._send_to_rvs({
                "type": "file_response",
                "payload": {
                    "requestId": req_id,
                    "serverPath": server_path,
                    "base64": file_b64,
                    "name": os.path.basename(server_path),
                },
                "timestamp": int(asyncio.get_event_loop().time() * 1000),
            })

        elif msg_type == "audio":
            # Audio from the app → STT → aria-core
            audio_b64 = payload.get("base64", "")
            mime_type = payload.get("mimeType", "audio/mp4")
            duration_ms = payload.get("durationMs", 0)
            if not audio_b64:
                logger.warning("[rvs] Audio ohne Daten empfangen")
                return
            # Voice override for the upcoming ARIA reply (chosen locally in the app).
            voice_override = payload.get("voice", "")
            if voice_override:
                self._next_voice_override = voice_override
                logger.info("[rvs] Voice-Override (via Audio): %s", voice_override)
            logger.info("[rvs] Audio empfangen: %s, %dms, %dKB",
                        mime_type, duration_ms, len(audio_b64) // 1365)
            # Keep a strong reference until the task is done; the event loop
            # alone holds only a weak one.
            audio_tasks = getattr(self, "_app_audio_tasks", None)
            if audio_tasks is None:
                audio_tasks = self._app_audio_tasks = set()
            task = asyncio.create_task(self._process_app_audio(audio_b64, mime_type))
            audio_tasks.add(task)
            task.add_done_callback(audio_tasks.discard)

        elif msg_type == "stt_response":
            # whisper-bridge answer to one of our stt_requests
            request_id = payload.get("requestId", "")
            future = self._pending_stt.get(request_id)
            if future is None or future.done():
                return
            error = payload.get("error", "")
            if error:
                logger.warning("[rvs] stt_response Fehler: %s", error)
                future.set_result(None)
            else:
                text = payload.get("text", "")
                stt_ms = payload.get("sttMs", 0)
                model = payload.get("model", "?")
                logger.info("[rvs] Remote-STT OK (%s, %dms): '%s'", model, stt_ms, (text or "")[:80])
                future.set_result(text)

        else:
            logger.debug("[rvs] Unbekannter Typ: %s", msg_type)

    except Exception:
        logger.exception("[rvs] Fehler bei Nachricht vom Typ '%s'", msg_type)
|
||
|
||
# STT orchestration: remote (Gamebox) first, local fallback second.
# The timeout is generous so that a first-time model load on the Gamebox
# (up to ~30 s for large-v3) still completes.
_STT_REMOTE_TIMEOUT_S = 45.0

async def _process_app_audio(self, audio_b64: str, mime_type: str) -> None:
    """Transcribe a recording from the app and send the text to aria-core.

    Tries the whisper-bridge via RVS first and falls back to local Whisper
    when the remote side yields no answer. Non-empty text goes to aria-core
    first. It is then echoed to RVS as a chat message with ``sender="stt"``,
    which App and Diagnostic display and the bridge's own chat handler
    ignores. Empty text is only logged.
    """
    transcript = await self._stt_remote(audio_b64, mime_type)
    if transcript is None:
        # No remote answer → local Whisper
        logger.warning("[rvs] Remote-STT nicht verfuegbar — Fallback auf lokales Whisper")
        transcript = await self._stt_local(audio_b64, mime_type)
        if transcript is None:
            return

    if not transcript.strip():
        logger.info("[rvs] Keine Sprache erkannt — ignoriert")
        return

    logger.info("[rvs] STT Ergebnis: '%s'", transcript[:80])
    # aria-core first (the most important step)
    await self.send_to_core(transcript, source="app-voice")

    # Echo the recognised text for display; sender="stt" prevents a loop.
    try:
        echo = {
            "type": "chat",
            "payload": {"text": transcript, "sender": "stt"},
            "timestamp": int(asyncio.get_event_loop().time() * 1000),
        }
        await self._send_to_rvs(echo)
    except Exception as e:
        logger.warning("[rvs] STT-Text konnte nicht an RVS gesendet werden: %s", e)
|
||
|
||
async def _stt_remote(self, audio_b64: str, mime_type: str) -> Optional[str]:
    """Send audio to the whisper-bridge via RVS and await its stt_response.

    A future is registered in ``self._pending_stt`` under a fresh request id.
    The RVS message handler resolves it when the matching "stt_response"
    arrives. The entry is always removed again when this coroutine ends.

    Returns:
        str:  the recognised text (may be empty).
        None: RVS not connected, request not sent, error reply, an exception,
              or no answer within ``_STT_REMOTE_TIMEOUT_S``. The caller then
              falls back to local STT.
    """
    if self.ws_rvs is None:
        return None

    loop = asyncio.get_event_loop()
    request_id = str(uuid.uuid4())
    reply: asyncio.Future = loop.create_future()
    self._pending_stt[request_id] = reply

    try:
        model = getattr(self.stt_engine, "model_size", "small")
        logger.info("[rvs] stt_request → whisper-bridge (id=%s, model=%s, %dKB)",
                    request_id[:8], model, len(audio_b64) // 1365)
        request = {
            "type": "stt_request",
            "payload": {
                "requestId": request_id,
                "audio": audio_b64,
                "mimeType": mime_type,
                "model": model,
                "language": getattr(self.stt_engine, "language", "de"),
            },
            "timestamp": int(loop.time() * 1000),
        }
        delivered = await self._send_to_rvs(request)
        if not delivered:
            logger.warning("[rvs] stt_request konnte nicht gesendet werden — skip Remote")
            return None
        return await asyncio.wait_for(reply, timeout=self._STT_REMOTE_TIMEOUT_S)
    except asyncio.TimeoutError:
        logger.warning("[rvs] Remote-STT Timeout (%.0fs)", self._STT_REMOTE_TIMEOUT_S)
        return None
    except Exception as e:
        logger.warning("[rvs] Remote-STT Fehler: %s", e)
        return None
    finally:
        self._pending_stt.pop(request_id, None)
|
||
|
||
async def _stt_local(self, audio_b64: str, mime_type: str) -> Optional[str]:
    """Local Whisper fallback: base64 audio → FFmpeg → float32 → transcribe.

    The decoded audio is written to a temp file whose suffix is picked from
    ``mime_type``. FFmpeg converts it to 16 kHz mono float32 raw samples,
    which NumPy loads and ``self.stt_engine.transcribe`` receives in the
    default executor. Both temp files are always removed.

    Returns:
        The transcription result, or None if decoding or conversion failed or
        the converted audio is empty.
    """
    loop = asyncio.get_event_loop()
    in_path = None
    out_path = None
    try:
        # Decode before touching the filesystem, so invalid input cannot leave
        # an open temp-file handle behind.
        audio_bytes = base64.b64decode(audio_b64)
        ext = ".mp4" if "mp4" in mime_type else ".wav" if "wav" in mime_type else ".ogg"
        with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as fh:
            in_path = fh.name
            fh.write(audio_bytes)
        with tempfile.NamedTemporaryFile(suffix=".raw", delete=False) as fh:
            out_path = fh.name

        # With -hide_banner / -loglevel error, stderr holds only the actual
        # error instead of FFmpeg's multi-line version banner, which would
        # otherwise fill the logged excerpt.
        cmd = [
            "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
            "-i", in_path,
            "-ar", "16000", "-ac", "1", "-f", "f32le",
            out_path,
        ]
        result = await loop.run_in_executor(
            None,
            lambda: subprocess.run(cmd, capture_output=True, timeout=30),
        )
        if result.returncode != 0:
            stderr = result.stderr.decode("utf-8", errors="replace").strip()
            logger.error("[rvs] FFmpeg Fehler: %s", stderr[-300:])
            return None

        audio_data = np.fromfile(out_path, dtype=np.float32)
        if audio_data.size == 0:
            logger.warning("[rvs] Leere Audio-Daten nach Konvertierung")
            return None

        duration_s = audio_data.size / 16000.0
        logger.info("[rvs] Lokal-STT: %.1fs Audio, model=%s", duration_s, self.stt_engine.model_size)
        return await loop.run_in_executor(None, self.stt_engine.transcribe, audio_data)
    except Exception:
        logger.exception("[rvs] Lokales STT fehlgeschlagen")
        return None
    finally:
        for path in (in_path, out_path):
            if path:
                try:
                    os.unlink(path)
                except OSError:
                    pass
|
||
|
||
async def _send_to_rvs(self, message: dict) -> bool:
    """Send ``message`` as JSON to RVS after verifying the link with a ping.

    A protocol-level ping (5 s pong timeout) precedes every send, and the
    send goes to the same socket that was pinged. If the ping fails, that
    socket is closed. ``self.ws_rvs`` is cleared, so that ``connect_to_rvs``
    reconnects, only if it still points to that socket: the ping can take
    seconds, and the connect loop may meanwhile have installed a new
    connection that must not be torn down.

    Returns:
        True if the message was sent; False if there is no connection or the
        ping or send failed.
    """
    ws = self.ws_rvs
    if ws is None:
        return False

    # Ping check: is the connection really alive?
    try:
        pong = await ws.ping()
        await asyncio.wait_for(pong, timeout=5)
    except Exception:
        logger.warning("[rvs] Ping fehlgeschlagen — Verbindung tot, erzwinge Reconnect")
        try:
            await ws.close()
        except Exception:
            pass
        if self.ws_rvs is ws:
            # Reconnect is handled by the connect_to_rvs loop.
            self.ws_rvs = None
        return False

    try:
        await ws.send(json.dumps(message))
        return True
    except Exception:
        logger.warning("[rvs] Sendefehler — RVS nicht erreichbar")
        return False
|
||
|
||
# ── Log-Streaming an die App ─────────────────────────────
|
||
|
||
async def _cancel_via_diagnostic(self) -> None:
    """Ask Diagnostic to abort the running request via ``POST /api/cancel``.

    Diagnostic holds the full abort logic (openclaw doctor --fix using the
    Docker socket). The blocking HTTP call (5 s timeout) runs in the default
    executor. Its HTTP status, or an ``"error: ..."`` string, is logged;
    request errors are never raised.
    """

    def _post_cancel():
        try:
            request = urllib.request.Request(
                f"{self._diagnostic_url}/api/cancel",
                method="POST",
                data=b"",
            )
            with urllib.request.urlopen(request, timeout=5) as response:
                return response.status
        except Exception as exc:
            return f"error: {exc}"

    outcome = await asyncio.get_event_loop().run_in_executor(None, _post_cancel)
    logger.info("[cancel] Diagnostic /api/cancel: %s", outcome)
|
||
|
||
async def _emit_activity(self, activity: str, tool: str = "") -> None:
    """Send ``agent_activity`` to the app, but only when the state changed.

    For 3 s after the last chat:final (``self._last_chat_final_at``, in
    event-loop time), trailing non-"idle" events are dropped without touching
    the remembered state. "idle" always passes this check.
    """
    loop = asyncio.get_event_loop()
    in_grace_period = (
        activity != "idle"
        and self._last_chat_final_at > 0
        and loop.time() - self._last_chat_final_at < 3.0
    )
    if in_grace_period:
        return

    new_state = (activity, tool)
    if new_state == self._last_activity_state:
        return
    self._last_activity_state = new_state

    await self._send_to_rvs({
        "type": "agent_activity",
        "payload": {"activity": activity, "tool": tool},
        "timestamp": int(loop.time() * 1000),
    })
|
||
|
||
async def send_log_to_app(self, source: str, message: str, level: str = "info") -> None:
    """Push one entry to the app's log viewer via RVS; the send result is discarded."""
    entry = {"source": source, "message": message, "level": level}
    now_ms = int(asyncio.get_event_loop().time() * 1000)
    await self._send_to_rvs({"type": "log", "payload": entry, "timestamp": now_ms})
|
||
|
||
async def send_event_to_app(self, title: str, description: str) -> None:
    """Push one entry to the app's event feed via RVS; the send result is discarded."""
    event = {"title": title, "description": description}
    now_ms = int(asyncio.get_event_loop().time() * 1000)
    await self._send_to_rvs({"type": "event", "payload": event, "timestamp": now_ms})
|
||
|
||
# ── Audio-Schleife (lokales Mikrofon) ────────────────────
|
||
|
||
async def audio_loop(self) -> None:
    """Detect the wake word, record, transcribe, and send the text to aria-core.

    Loops while ``self.running`` is true. Every blocking audio step runs in
    the default executor, so the event loop keeps serving the other tasks
    started by ``run``. The blocking steps are chunk capture via sounddevice,
    wake-word detection, the full recording via ``record_audio``, and
    transcription.

    Error handling:
        - ``sd.PortAudioError``: warn once, then retry every 60 seconds.
        - Any other exception: log with traceback, retry after 1 second.
    """
    logger.info("Audio-Schleife gestartet — warte auf Wake-Word '%s'...", self.wake_word.wake_word_key)

    loop = asyncio.get_running_loop()

    def _capture_chunk():
        # Blocking: sd.wait() returns only after BLOCK_SIZE frames are recorded.
        chunk = sd.rec(
            BLOCK_SIZE,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
        )
        sd.wait()
        return chunk.flatten()

    while self.running:
        try:
            # Running sd.rec()/sd.wait() directly in this coroutine would block
            # the event loop for the whole chunk duration on every iteration
            # and starve the websocket tasks, so capture in the executor.
            samples = await loop.run_in_executor(None, _capture_chunk)

            detected = await loop.run_in_executor(
                None, self.wake_word.detect, samples
            )

            if detected:
                logger.info("Wake-Word erkannt — starte Aufnahme")
                await self.send_event_to_app(
                    "Wake-Word erkannt",
                    "ARIA hoert zu...",
                )

                audio_data = await loop.run_in_executor(None, record_audio)

                text = await loop.run_in_executor(
                    None, self.stt_engine.transcribe, audio_data
                )

                if text.strip():
                    # A detected mode switch updates the local mode; the text
                    # is forwarded to aria-core either way.
                    new_mode = detect_mode_switch(text)
                    if new_mode is not None:
                        self.current_mode = new_mode

                    await self.send_to_core(text, source="bridge")
                else:
                    logger.info("Keine Sprache erkannt — ignoriert")

        except sd.PortAudioError:
            # Warn only once; the flag attribute is created on first failure.
            if not hasattr(self, '_audio_warned'):
                logger.warning("Audio-Geraet nicht verfuegbar — lokales Mikrofon deaktiviert (kein Spam mehr)")
                self._audio_warned = True
            await asyncio.sleep(60)  # long back-off to avoid log spam
        except Exception:
            logger.exception("Fehler in der Audio-Schleife")
            await asyncio.sleep(1)
|
||
|
||
# ── Run & Shutdown ───────────────────────────────────────
|
||
|
||
async def run(self) -> None:
    """Launch all bridge connections concurrently and wait for them.

    The core and RVS connections are always started. The local audio loop is
    added only when an audio device is available; otherwise the bridge works
    as a pure relay between core and RVS.
    """
    self.running = True

    tasks = [
        asyncio.create_task(coro)
        for coro in (self.connect_to_core(), self.connect_to_rvs())
    ]

    if not self.audio_available:
        logger.info("Audio-Loop deaktiviert — kein Audio-Geraet")
    else:
        tasks.append(asyncio.create_task(self.audio_loop()))

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Cancellation ends the bridge normally; it is logged, not re-raised.
        logger.info("Bridge-Tasks abgebrochen")
|
||
|
||
def shutdown(self) -> None:
    """Request a cooperative shutdown of the bridge.

    This only logs and clears ``self.running``. Loops that poll the flag
    (such as ``audio_loop``) stop on their next iteration; nothing is
    cancelled or closed here directly.
    """
    logger.info("Bridge wird heruntergefahren...")
    # Cooperative stop flag read by the bridge's loops.
    self.running = False
|
||
|
||
|
||
# ── Hauptprogramm ────────────────────────────────────────────
|
||
|
||
|
||
def main() -> None:
    """Start the ARIA Voice Bridge.

    Creates the bridge and installs SIGTERM/SIGINT handlers. It then runs the
    synchronous ``bridge.initialize()`` and drives ``bridge.run()`` with
    ``asyncio.run``. Exits with status 1 if initialization raises.

    Signal handling:
        - First signal: request a cooperative shutdown via ``bridge.shutdown()``.
        - If that signal arrives while ``initialize()`` is still running, the
          event loop is never started. ``run()`` sets ``bridge.running = True``
          on entry, which would otherwise discard the shutdown request.
        - Any further signal raises ``KeyboardInterrupt`` to force an exit if
          the cooperative shutdown does not complete.
    """
    bridge = ARIABridge()
    stop_requested = False

    # Signal handlers for a clean shutdown
    def handle_signal(signum: int, _frame: object) -> None:
        nonlocal stop_requested
        sig_name = signal.Signals(signum).name
        if stop_requested:
            # Repeated signal: stop waiting for the cooperative shutdown.
            logger.warning("Signal %s erneut empfangen — erzwinge Beenden", sig_name)
            raise KeyboardInterrupt
        stop_requested = True
        logger.info("Signal %s empfangen — fahre herunter", sig_name)
        bridge.shutdown()

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    # Initialization (synchronous — before the event loop starts)
    try:
        bridge.initialize()
    except KeyboardInterrupt:
        # Only reachable via a forced second signal during initialization.
        logger.info("Initialisierung abgebrochen — ARIA Voice Bridge beendet")
        return
    except Exception:
        logger.exception("Initialisierung fehlgeschlagen")
        sys.exit(1)

    if stop_requested:
        logger.info("Shutdown waehrend der Initialisierung angefordert — Bridge wird nicht gestartet")
        logger.info("ARIA Voice Bridge beendet")
        return

    # Start the event loop
    try:
        asyncio.run(bridge.run())
    except KeyboardInterrupt:
        logger.info("Keyboard Interrupt — Bridge beendet")
    finally:
        logger.info("ARIA Voice Bridge beendet")
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: start the bridge only when executed directly, not on import.
    main()
|