# esp32-claude-robbie/python_bridge/tts_engine.py

"""
Claude's Eyes - Text-to-Speech Engine
Converts Claude's text responses to spoken audio
"""
import logging
import threading
import queue
from typing import Optional
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class TTSEngine(ABC):
    """Interface shared by all text-to-speech backends in this module."""

    @abstractmethod
    def speak(self, text: str) -> None:
        """Speak ``text``, blocking the caller."""

    @abstractmethod
    def speak_async(self, text: str) -> None:
        """Speak ``text`` without blocking the caller."""

    @abstractmethod
    def stop(self) -> None:
        """Stop the speech that is currently in progress."""

    @abstractmethod
    def is_speaking(self) -> bool:
        """Report whether the engine is currently speaking."""
class Pyttsx3Engine(TTSEngine):
    """TTS using pyttsx3 (offline, system voices).

    ``speak`` talks synchronously. ``speak_async`` queues text for a daemon
    worker thread that speaks the queued items one after another and retires
    itself once the queue has been idle for a short while.
    """

    def __init__(self, voice: Optional[str] = None, rate: int = 150, volume: float = 0.9):
        """
        Args:
            voice: Case-insensitive substring of the wanted voice name. The
                first installed voice whose name contains it is selected;
                ``None`` (or no match) keeps the pyttsx3 default voice.
            rate: Value assigned to the pyttsx3 ``rate`` property.
            volume: Value assigned to the pyttsx3 ``volume`` property.
        """
        import pyttsx3

        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', rate)
        self.engine.setProperty('volume', volume)

        # Select the first voice whose name contains the requested substring
        if voice:
            wanted = voice.lower()
            for candidate in self.engine.getProperty('voices'):
                # Guard against drivers that report a voice without a name
                if wanted in (candidate.name or "").lower():
                    self.engine.setProperty('voice', candidate.id)
                    break
            else:
                logger.warning(f"No pyttsx3 voice matching '{voice}' found - using default voice")

        self._speaking = False
        # Items are (stop generation at enqueue time, text) tuples
        self._queue = queue.Queue()
        self._thread: Optional[threading.Thread] = None
        # Guards worker start-up/retirement and the stop generation
        self._lock = threading.Lock()
        # Incremented by stop(); queued text from an older generation is skipped
        self._stop_generation = 0

        logger.info("Pyttsx3 TTS engine initialized")

    def speak(self, text: str) -> None:
        """Speak text (blocking).

        Exceptions raised by pyttsx3 propagate to the caller; the speaking
        state is reset either way.
        """
        self._speaking = True
        try:
            self.engine.say(text)
            self.engine.runAndWait()
        finally:
            self._speaking = False

    def speak_async(self, text: str) -> None:
        """Speak text (non-blocking) by queueing it for the worker thread."""
        # Enqueue and worker check happen under the same lock the worker takes
        # before retiring, so the text can never be left behind in the queue
        # by a worker that is just about to exit.
        with self._lock:
            self._queue.put((self._stop_generation, text))
            if self._thread is None or not self._thread.is_alive():
                self._thread = threading.Thread(target=self._speech_worker, daemon=True)
                self._thread.start()

    def _speech_worker(self) -> None:
        """Worker thread for async speech.

        Speaks queued items in order, skips items that were queued before the
        most recent stop(), and exits once the queue stayed empty for 0.5 s.
        """
        while True:
            try:
                generation, text = self._queue.get(timeout=0.5)
            except queue.Empty:
                with self._lock:
                    if self._queue.empty():
                        # Deregister under the lock so the next speak_async()
                        # starts a fresh worker.
                        if self._thread is threading.current_thread():
                            self._thread = None
                        return
                continue

            try:
                # Dequeued just before a stop(): the request is stale
                if generation == self._stop_generation:
                    self.speak(text)
            except Exception:
                # One failed utterance must not kill the worker
                logger.exception("pyttsx3 speech failed")
            finally:
                self._queue.task_done()

    def stop(self) -> None:
        """Stop current speech and discard everything still queued."""
        with self._lock:
            self._stop_generation += 1
            # Clear queue; mark each discarded item done so Queue.join() works
            while True:
                try:
                    self._queue.get_nowait()
                except queue.Empty:
                    break
                self._queue.task_done()
        self.engine.stop()

    def is_speaking(self) -> bool:
        """Return True while a speak() call (direct or from the worker) is running."""
        return self._speaking
class GTTSEngine(TTSEngine):
    """TTS using Google Text-to-Speech (online, better quality).

    Text is synthesised to temporary MP3 files with gTTS and played through
    ``pygame.mixer.music``. ``speak_async`` queues text for a daemon worker
    thread that retires itself once the queue has been idle for a short while.
    """

    # Maximum chunk size for gTTS (characters).
    # Google has a limit of ~5000 characters; we stay well below it to be safe.
    MAX_CHUNK_SIZE = 500

    def __init__(self, language: str = "de", speed: float = 1.0):
        """
        Args:
            language: Language code handed to gTTS (e.g. "de", "en").
            speed: Speed factor (1.0 = normal, 1.5 = 50% faster, 2.0 = twice as fast).
        """
        # Imported only so that a missing gTTS installation fails here already
        from gtts import gTTS  # noqa: F401
        import pygame

        pygame.mixer.init()

        self.language = language
        self.speed = speed  # speed factor
        self._speaking = False
        # Items are (stop generation at enqueue time, text) tuples
        self._queue = queue.Queue()
        self._thread: Optional[threading.Thread] = None
        # Guards worker start-up/retirement and the stop generation
        self._lock = threading.Lock()
        # Incremented by stop(); speech belonging to an older generation is cancelled
        self._stop_generation = 0

        speed_info = f", speed: {speed}x" if speed != 1.0 else ""
        logger.info(f"gTTS engine initialized (language: {language}{speed_info})")

    def _split_text_into_chunks(self, text: str) -> list:
        """Split long text into chunks of at most ``MAX_CHUNK_SIZE`` characters.

        Splits preferably at sentence ends (. ! ?) for more natural pauses,
        then after commas, then between words. A single word longer than the
        limit is cut into fixed-size slices so no chunk exceeds the limit.

        Args:
            text: Text to split.

        Returns:
            List of chunk strings; ``[text]`` when the text already fits.
        """
        import re

        limit = self.MAX_CHUNK_SIZE
        if len(text) <= limit:
            return [text]

        # Step 1: break the text into pieces that each fit into one chunk
        pieces = []
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            if len(sentence) <= limit:
                pieces.append(sentence)
                continue
            # Sentence itself is too long: split after commas
            for part in re.split(r'(?<=,)\s+', sentence):
                if len(part) <= limit:
                    pieces.append(part)
                    continue
                # Last resort: split into words, slicing overlong words
                for word in part.split():
                    pieces.extend(
                        word[start:start + limit]
                        for start in range(0, len(word), limit)
                    )

        # Step 2: greedily pack consecutive pieces into chunks
        chunks = []
        current_chunk = ""
        for piece in pieces:
            if not current_chunk:
                current_chunk = piece
            elif len(current_chunk) + 1 + len(piece) > limit:
                chunks.append(current_chunk.strip())
                current_chunk = piece
            else:
                current_chunk = f"{current_chunk} {piece}"

        # Add the final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())

        logger.debug(f"Text in {len(chunks)} Chunks aufgeteilt ({len(text)} Zeichen)")
        return chunks

    def _speed_up_audio(self, input_path: str, output_path: str) -> bool:
        """Write a sped-up copy of an MP3 file using pydub.

        Args:
            input_path: Path of the original MP3.
            output_path: Path for the sped-up MP3.

        Returns:
            True on success, False on any error (including pydub not being
            installed); errors are logged, never raised.
        """
        try:
            from pydub import AudioSegment

            audio = AudioSegment.from_mp3(input_path)

            # Changing speed without changing pitch is more involved; simply
            # reinterpreting the samples at a higher frame rate sounds
            # acceptable. The result is resampled to the original frame rate.
            new_frame_rate = int(audio.frame_rate * self.speed)
            speedup_audio = audio._spawn(audio.raw_data, overrides={
                "frame_rate": new_frame_rate
            }).set_frame_rate(audio.frame_rate)

            speedup_audio.export(output_path, format="mp3")
            return True
        except ImportError:
            logger.warning("pydub nicht installiert - Geschwindigkeit nicht änderbar. Installiere mit: pip install pydub")
            return False
        except Exception as e:
            logger.error(f"Audio-Speedup-Fehler: {e}")
            return False

    def _discard_audio_files(self, *paths: Optional[str]) -> None:
        """Release pygame's handle on the loaded music and delete temp files.

        Best effort: failures are logged at debug level and never raised.
        """
        import os
        import pygame

        # pygame 2.0+ offers unload() to close the file behind the loaded
        # music; without it the file may still be open when it is deleted.
        unload = getattr(pygame.mixer.music, "unload", None)
        if unload is not None:
            try:
                unload()
            except pygame.error as e:
                logger.debug(f"Could not unload music: {e}")

        for path in paths:
            if not path:
                continue
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.debug(f"Could not delete temp file {path}: {e}")

    def _speak_text(self, text: str, generation: int) -> None:
        """Synthesise and play ``text`` chunk by chunk until done or cancelled.

        Args:
            text: Text to speak.
            generation: Value of ``_stop_generation`` this request belongs to;
                the remaining chunks are dropped once stop() has advanced it.
        """
        from gtts import gTTS
        import pygame
        import tempfile
        import os

        self._speaking = True
        try:
            chunks = self._split_text_into_chunks(text)
            clock = pygame.time.Clock()

            for i, chunk in enumerate(chunks):
                if self._stop_generation != generation:
                    break
                if not chunk.strip():
                    continue

                logger.debug(f"Spreche Chunk {i+1}/{len(chunks)}: {chunk[:50]}...")

                temp_path = None
                speedup_path = None
                try:
                    # Generate audio file
                    tts = gTTS(text=chunk, lang=self.language)
                    with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
                        temp_path = f.name
                    tts.save(temp_path)

                    # Adjust speed when != 1.0
                    play_path = temp_path
                    if self.speed != 1.0:
                        speedup_path = os.path.splitext(temp_path)[0] + '_fast.mp3'
                        if self._speed_up_audio(temp_path, speedup_path):
                            play_path = speedup_path
                        else:
                            # Fallback: play the original
                            logger.debug("Speedup fehlgeschlagen, spiele Original")

                    pygame.mixer.music.load(play_path)
                    pygame.mixer.music.play()

                    # Poll at 10 Hz until playback ends or stop() is called
                    while (pygame.mixer.music.get_busy()
                           and self._stop_generation == generation):
                        clock.tick(10)
                finally:
                    # Runs on errors too, so temp files are not left behind
                    self._discard_audio_files(temp_path, speedup_path)
        except Exception as e:
            logger.error(f"gTTS error: {e}")
        finally:
            self._speaking = False

    def speak(self, text: str) -> None:
        """Speak text (blocking); long text is split into chunks automatically.

        Synthesis and playback errors are logged, not raised. A stop() issued
        while this call is running cancels the remaining chunks; a stop()
        issued earlier has no effect on this call.
        """
        self._speak_text(text, self._stop_generation)

    def speak_async(self, text: str) -> None:
        """Speak text (non-blocking) by queueing it for the worker thread."""
        # Enqueue and worker check happen under the same lock the worker takes
        # before retiring, so the text can never be left behind in the queue
        # by a worker that is just about to exit.
        with self._lock:
            self._queue.put((self._stop_generation, text))
            if self._thread is None or not self._thread.is_alive():
                self._thread = threading.Thread(target=self._speech_worker, daemon=True)
                self._thread.start()

    def _speech_worker(self) -> None:
        """Worker thread for async speech.

        Speaks queued items in order, skips items that were queued before the
        most recent stop(), and exits once the queue stayed empty for 0.5 s.
        """
        while True:
            try:
                generation, text = self._queue.get(timeout=0.5)
            except queue.Empty:
                with self._lock:
                    if self._queue.empty():
                        # Deregister under the lock so the next speak_async()
                        # starts a fresh worker.
                        if self._thread is threading.current_thread():
                            self._thread = None
                        return
                continue

            try:
                # Dequeued just before a stop(): the request is stale
                if generation == self._stop_generation:
                    self._speak_text(text, generation)
            except Exception:
                # One failed utterance must not kill the worker
                logger.exception("gTTS speech failed")
            finally:
                self._queue.task_done()

    def stop(self) -> None:
        """Stop current speech and discard everything still queued."""
        import pygame

        with self._lock:
            self._stop_generation += 1
            # Clear queue; mark each discarded item done so Queue.join() works
            while True:
                try:
                    self._queue.get_nowait()
                except queue.Empty:
                    break
                self._queue.task_done()
        pygame.mixer.music.stop()

    def is_speaking(self) -> bool:
        """Return True while text is being synthesised or played."""
        return self._speaking
def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine:
    """
    Build the TTS backend selected by ``engine_type``.

    Args:
        engine_type: "pyttsx3" or "gtts"
        **kwargs: Backend options. "pyttsx3" reads ``voice``, ``rate`` and
            ``volume``; "gtts" reads ``language`` and ``speed``. Options that
            are not given fall back to the defaults used below.

    Raises:
        ValueError: If ``engine_type`` is neither "pyttsx3" nor "gtts".
    """
    option = kwargs.get

    if engine_type == "pyttsx3":
        voice = option("voice")
        rate = option("rate", 150)
        volume = option("volume", 0.9)
        return Pyttsx3Engine(voice=voice, rate=rate, volume=volume)

    if engine_type == "gtts":
        language = option("language", "de")
        speed = option("speed", 1.0)
        return GTTSEngine(language=language, speed=speed)

    raise ValueError(f"Unknown TTS engine: {engine_type}")
# Manual smoke test when run directly: tries each backend in turn.
# Each backend is guarded separately so that one missing dependency or
# audio driver does not prevent the other backend from being tried.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    print("Testing pyttsx3...")
    try:
        engine = create_tts_engine("pyttsx3", rate=150)
        engine.speak("Hallo! Ich bin Claude und erkunde gerade deine Wohnung.")
    except Exception as e:
        print(f"pyttsx3 not available: {e}")

    print("\nTesting gTTS...")
    try:
        engine2 = create_tts_engine("gtts", language="de")
        engine2.speak("Das hier klingt noch besser!")
    except Exception as e:
        print(f"gTTS not available: {e}")

    print("\nDone!")