ARIA-AGENT/bridge/aria_bridge.py

601 lines
20 KiB
Python

"""
ARIA Voice Bridge — Hauptmodul.
Verbindet Wake-Word-Erkennung, Whisper STT und Piper TTS
mit dem ARIA-Core Container ueber WebSocket.
Stimmen:
- Ramona (de_DE-ramona-low) — Alltag, Gespraeche
- Thorsten (de_DE-thorsten-high) — epische Momente, Alarme
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import signal
import sys
import tempfile
import wave
from pathlib import Path
from typing import Optional
import numpy as np
import sounddevice as sd
import websockets
from faster_whisper import WhisperModel
from openwakeword.model import Model as WakeWordModel
from piper import PiperVoice
from modes import Mode, detect_mode_switch, should_speak
# ── Logging ──────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger("aria-bridge")
# ── Konfiguration ───────────────────────────────────────────
CONFIG_PATH = Path("/config/aria.env")
VOICES_DIR = Path("/voices")
CORE_WS_URL = os.getenv("ARIA_CORE_WS", "ws://aria:8080")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small")
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "de")
# Audio-Parameter
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCK_SIZE = 1280 # 80ms bei 16kHz — gut fuer Wake-Word-Erkennung
RECORD_SECONDS = 8 # Max. Aufnahmedauer nach Wake-Word
# Epische Trigger — bei diesen Woertern spricht Thorsten
EPIC_TRIGGERS = [
"deploy",
"erfolgreich",
"alarm",
"so soll es sein",
"kritisch",
"server down",
"sicherheitswarnung",
"ticket geloest",
"aufgabe abgeschlossen",
]
def load_config() -> dict[str, str]:
"""Laedt Konfiguration aus /config/aria.env."""
config: dict[str, str] = {}
if CONFIG_PATH.exists():
for line in CONFIG_PATH.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, value = line.partition("=")
config[key.strip()] = value.strip()
logger.info("Konfiguration geladen aus %s", CONFIG_PATH)
else:
logger.warning("Keine Konfiguration gefunden: %s", CONFIG_PATH)
return config
# ── Voice Engine ─────────────────────────────────────────────
class VoiceEngine:
"""Verwaltet Piper TTS mit zwei Stimmen: Ramona und Thorsten."""
def __init__(self, voices_dir: Path) -> None:
self.voices_dir = voices_dir
self.voices: dict[str, PiperVoice] = {}
def initialize(self) -> None:
"""Laedt die Piper-Stimmen aus dem Voices-Verzeichnis."""
voice_configs = {
"ramona": "de_DE-ramona-low",
"thorsten": "de_DE-thorsten-high",
}
for name, model_name in voice_configs.items():
model_path = self.voices_dir / f"{model_name}.onnx"
config_path = self.voices_dir / f"{model_name}.onnx.json"
if not model_path.exists():
logger.error("Stimme nicht gefunden: %s", model_path)
continue
self.voices[name] = PiperVoice.load(
str(model_path),
config_path=str(config_path) if config_path.exists() else None,
)
logger.info("Stimme geladen: %s (%s)", name, model_name)
if not self.voices:
logger.error("Keine Stimmen geladen — TTS deaktiviert")
def select_voice(
self, text: str, requested_voice: Optional[str] = None
) -> str:
"""Waehlt die passende Stimme basierend auf Text oder Anfrage.
Thorsten wird bei epischen Triggern verwendet,
sonst Ramona als Standardstimme.
Args:
text: Der zu sprechende Text (fuer Epic-Trigger-Erkennung).
requested_voice: Explizit angeforderte Stimme ("ramona" | "thorsten").
Returns:
Name der gewaehlten Stimme.
"""
if requested_voice and requested_voice in self.voices:
return requested_voice
# Epische Trigger pruefen
text_lower = text.lower()
for trigger in EPIC_TRIGGERS:
if trigger in text_lower:
logger.info("Epischer Trigger erkannt: '%s' — Thorsten spricht", trigger)
return "thorsten"
return "ramona"
def synthesize(self, text: str, voice_name: str = "ramona") -> Optional[bytes]:
"""Erzeugt Audio-Daten aus Text mit der gewaehlten Stimme.
Args:
text: Der zu sprechende Text.
voice_name: Name der Stimme ("ramona" oder "thorsten").
Returns:
WAV-Audiodaten als bytes oder None bei Fehler.
"""
voice = self.voices.get(voice_name)
if voice is None:
logger.error("Stimme '%s' nicht verfuegbar", voice_name)
return None
try:
# Piper gibt PCM-Samples zurueck, wir schreiben sie als WAV
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
with wave.open(tmp_path, "wb") as wav_file:
voice.synthesize(text, wav_file)
audio_data = Path(tmp_path).read_bytes()
Path(tmp_path).unlink(missing_ok=True)
logger.info(
"TTS: %d bytes erzeugt mit %s'%s'",
len(audio_data),
voice_name,
text[:60],
)
return audio_data
except Exception:
logger.exception("TTS-Fehler bei Stimme '%s'", voice_name)
return None
def speak(self, text: str, requested_voice: Optional[str] = None) -> None:
"""Spricht den Text ueber das Audio-Geraet.
Waehlt automatisch die passende Stimme und gibt das Audio aus.
Args:
text: Der zu sprechende Text.
requested_voice: Optionale explizite Stimmenwahl.
"""
voice_name = self.select_voice(text, requested_voice)
audio_data = self.synthesize(text, voice_name)
if audio_data is None:
return
try:
# WAV-Daten lesen und ueber sounddevice abspielen
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp.write(audio_data)
tmp_path = tmp.name
with wave.open(tmp_path, "rb") as wf:
frames = wf.readframes(wf.getnframes())
sample_width = wf.getsampwidth()
rate = wf.getframerate()
channels = wf.getnchannels()
Path(tmp_path).unlink(missing_ok=True)
# Numpy-Array aus PCM-Daten
dtype_map = {1: np.int8, 2: np.int16, 4: np.int32}
dtype = dtype_map.get(sample_width, np.int16)
audio_array = np.frombuffer(frames, dtype=dtype)
if channels > 1:
audio_array = audio_array.reshape(-1, channels)
sd.play(audio_array, samplerate=rate)
sd.wait() # Warten bis Wiedergabe fertig
except Exception:
logger.exception("Audio-Wiedergabe fehlgeschlagen")
# ── STT Engine ───────────────────────────────────────────────
class STTEngine:
"""Whisper Speech-to-Text — laeuft komplett lokal."""
def __init__(self, model_size: str = "small", language: str = "de") -> None:
self.model_size = model_size
self.language = language
self.model: Optional[WhisperModel] = None
def initialize(self) -> None:
"""Laedt das Whisper-Modell."""
logger.info(
"Lade Whisper-Modell '%s' (Sprache: %s)...",
self.model_size,
self.language,
)
self.model = WhisperModel(self.model_size, device="cpu", compute_type="int8")
logger.info("Whisper-Modell geladen")
def transcribe(self, audio_data: np.ndarray) -> str:
"""Transkribiert Audio-Daten zu Text.
Args:
audio_data: NumPy-Array mit Audio (float32, 16kHz, mono).
Returns:
Erkannter Text oder leerer String.
"""
if self.model is None:
logger.error("Whisper-Modell nicht initialisiert")
return ""
try:
# Audio als float32 normalisieren
if audio_data.dtype != np.float32:
audio_data = audio_data.astype(np.float32) / 32768.0
segments, info = self.model.transcribe(
audio_data,
language=self.language,
beam_size=5,
vad_filter=True,
)
text = " ".join(segment.text.strip() for segment in segments)
logger.info("STT: '%s' (Sprache: %s, Dauer: %.1fs)", text, info.language, info.duration)
return text
except Exception:
logger.exception("STT-Fehler")
return ""
# ── Wake-Word Erkennung ──────────────────────────────────────
class WakeWordDetector:
"""Erkennt das Wake-Word 'aria' im Audio-Stream."""
WAKE_WORD = "aria"
THRESHOLD = 0.5
def __init__(self) -> None:
self.model: Optional[WakeWordModel] = None
def initialize(self) -> None:
"""Laedt das Wake-Word-Modell."""
logger.info("Lade Wake-Word-Modell...")
self.model = WakeWordModel(
wakeword_models=[self.WAKE_WORD],
inference_framework="onnx",
)
logger.info("Wake-Word-Modell geladen (Trigger: '%s')", self.WAKE_WORD)
def detect(self, audio_chunk: np.ndarray) -> bool:
"""Prueft ob das Wake-Word im Audio-Chunk enthalten ist.
Args:
audio_chunk: Audio-Daten (int16, 16kHz).
Returns:
True wenn Wake-Word erkannt wurde.
"""
if self.model is None:
return False
prediction = self.model.predict(audio_chunk)
# openwakeword gibt Scores pro Modell zurueck
for key, score in prediction.items():
if score > self.THRESHOLD:
logger.info("Wake-Word erkannt! (Score: %.2f)", score)
return True
return False
# ── Audio-Aufnahme ───────────────────────────────────────────
def record_audio(duration: float = RECORD_SECONDS) -> np.ndarray:
"""Nimmt Audio vom Mikrofon auf.
Args:
duration: Aufnahmedauer in Sekunden.
Returns:
NumPy-Array mit Audio-Daten (int16, 16kHz, mono).
"""
logger.info("Aufnahme laeuft... (%d Sekunden)", duration)
audio = sd.rec(
int(duration * SAMPLE_RATE),
samplerate=SAMPLE_RATE,
channels=CHANNELS,
dtype="int16",
)
sd.wait()
logger.info("Aufnahme beendet")
return audio.flatten()
# ── Bridge Hauptklasse ───────────────────────────────────────
class ARIABridge:
"""ARIA Voice Bridge — verbindet Sprache mit dem ARIA-Core."""
def __init__(self) -> None:
self.config = load_config()
self.ws_url = self.config.get("ARIA_CORE_WS", CORE_WS_URL)
self.current_mode = Mode.NORMAL
self.running = False
# Komponenten
self.voice_engine = VoiceEngine(VOICES_DIR)
self.stt_engine = STTEngine(
model_size=self.config.get("WHISPER_MODEL", WHISPER_MODEL),
language=self.config.get("WHISPER_LANGUAGE", WHISPER_LANGUAGE),
)
self.wake_word = WakeWordDetector()
# WebSocket-Verbindung
self.ws: Optional[websockets.WebSocketClientProtocol] = None
def initialize(self) -> None:
"""Initialisiert alle Komponenten."""
logger.info("=" * 50)
logger.info("ARIA Voice Bridge startet...")
logger.info("=" * 50)
# PulseAudio-Server pruefen
pulse_server = os.getenv("PULSE_SERVER")
if pulse_server:
logger.info("PulseAudio Server: %s", pulse_server)
else:
logger.warning("Kein PULSE_SERVER gesetzt — verwende Standard-Audio")
self.voice_engine.initialize()
self.stt_engine.initialize()
self.wake_word.initialize()
logger.info("Alle Komponenten initialisiert")
logger.info("WebSocket-Ziel: %s", self.ws_url)
logger.info("Aktueller Modus: %s %s", self.current_mode.config.emoji, self.current_mode.config.name)
async def connect_to_core(self) -> None:
"""Stellt die WebSocket-Verbindung zu aria-core her.
Versucht bei Verbindungsverlust automatisch erneut zu verbinden.
"""
retry_delay = 2
while self.running:
try:
logger.info("Verbinde mit aria-core: %s", self.ws_url)
async with websockets.connect(self.ws_url) as ws:
self.ws = ws
retry_delay = 2 # Reset bei erfolgreicher Verbindung
logger.info("Verbunden mit aria-core")
# Nachrichten empfangen
async for message in ws:
await self._handle_core_message(message)
except websockets.ConnectionClosed:
logger.warning("Verbindung zu aria-core verloren")
except ConnectionRefusedError:
logger.warning("aria-core nicht erreichbar")
except Exception:
logger.exception("WebSocket-Fehler")
finally:
self.ws = None
if self.running:
logger.info("Neuverbindung in %d Sekunden...", retry_delay)
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 30)
async def _handle_core_message(self, raw_message: str) -> None:
"""Verarbeitet eingehende Nachrichten von aria-core.
Args:
raw_message: JSON-Nachricht als String.
"""
try:
message = json.loads(raw_message)
except json.JSONDecodeError:
logger.error("Ungueltige JSON-Nachricht: %s", raw_message[:100])
return
text = message.get("text", "")
metadata = message.get("metadata", {})
is_critical = metadata.get("critical", False)
requested_voice = metadata.get("voice")
logger.info("Nachricht von aria-core: '%s'", text[:80])
# Modus-Wechsel pruefen
new_mode = detect_mode_switch(text)
if new_mode is not None:
self.current_mode = new_mode
logger.info(
"Modus gewechselt: %s %s",
self.current_mode.config.emoji,
self.current_mode.config.name,
)
# Sprachausgabe nur wenn der Modus es erlaubt
if should_speak(self.current_mode, is_critical):
self.voice_engine.speak(text, requested_voice)
else:
logger.info(
"Sprachausgabe unterdrueckt (Modus: %s)",
self.current_mode.config.name,
)
async def send_to_core(self, text: str) -> None:
"""Sendet Text an aria-core ueber WebSocket.
Args:
text: Der erkannte Text vom Benutzer.
"""
if self.ws is None:
logger.error("Keine Verbindung zu aria-core — Nachricht verworfen")
return
message = json.dumps({
"type": "voice_input",
"text": text,
"mode": self.current_mode.name,
"source": "bridge",
})
try:
await self.ws.send(message)
logger.info("An aria-core gesendet: '%s'", text[:80])
except Exception:
logger.exception("Fehler beim Senden an aria-core")
async def audio_loop(self) -> None:
"""Haupt-Audio-Schleife: Wake-Word erkennen, aufnehmen, transkribieren.
Laeuft kontinuierlich und hoert auf das Wake-Word.
Bei Erkennung wird aufgenommen, transkribiert und an aria-core gesendet.
"""
logger.info("Audio-Schleife gestartet — warte auf Wake-Word '%s'...", WakeWordDetector.WAKE_WORD)
# Audio-Stream fuer Wake-Word-Erkennung
loop = asyncio.get_event_loop()
while self.running:
try:
# Einen Block Audio aufnehmen und auf Wake-Word pruefen
audio_chunk = sd.rec(
BLOCK_SIZE,
samplerate=SAMPLE_RATE,
channels=CHANNELS,
dtype="int16",
)
sd.wait()
detected = await loop.run_in_executor(
None, self.wake_word.detect, audio_chunk.flatten()
)
if detected:
logger.info("Wake-Word erkannt — starte Aufnahme")
# Aufnahme im Thread-Pool (blockiert)
audio_data = await loop.run_in_executor(None, record_audio)
# Transkription im Thread-Pool
text = await loop.run_in_executor(
None, self.stt_engine.transcribe, audio_data
)
if text.strip():
# Modus-Wechsel lokal pruefen
new_mode = detect_mode_switch(text)
if new_mode is not None:
self.current_mode = new_mode
await self.send_to_core(text)
else:
logger.info("Keine Sprache erkannt — ignoriert")
except sd.PortAudioError:
logger.error("Audio-Geraet nicht verfuegbar — warte 5 Sekunden")
await asyncio.sleep(5)
except Exception:
logger.exception("Fehler in der Audio-Schleife")
await asyncio.sleep(1)
async def run(self) -> None:
"""Startet die Bridge mit allen Komponenten."""
self.running = True
# WebSocket-Verbindung und Audio-Schleife parallel ausfuehren
tasks = [
asyncio.create_task(self.connect_to_core()),
asyncio.create_task(self.audio_loop()),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
logger.info("Bridge-Tasks abgebrochen")
def shutdown(self) -> None:
"""Faehrt die Bridge sauber herunter."""
logger.info("Bridge wird heruntergefahren...")
self.running = False
# ── Hauptprogramm ────────────────────────────────────────────
def main() -> None:
"""Startet die ARIA Voice Bridge."""
bridge = ARIABridge()
# Signal-Handler fuer sauberes Herunterfahren
def handle_signal(signum: int, _frame: object) -> None:
sig_name = signal.Signals(signum).name
logger.info("Signal %s empfangen — fahre herunter", sig_name)
bridge.shutdown()
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
# Initialisierung (synchron — bevor die Event-Loop startet)
try:
bridge.initialize()
except Exception:
logger.exception("Initialisierung fehlgeschlagen")
sys.exit(1)
# Event-Loop starten
try:
asyncio.run(bridge.run())
except KeyboardInterrupt:
logger.info("Keyboard Interrupt — Bridge beendet")
finally:
logger.info("ARIA Voice Bridge beendet")
if __name__ == "__main__":
main()