359 lines
11 KiB
Python
359 lines
11 KiB
Python
"""
|
|
Claude's Eyes - Text-to-Speech Engine
|
|
|
|
Converts Claude's text responses to spoken audio
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
import queue
|
|
from typing import Optional
|
|
from abc import ABC, abstractmethod
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TTSEngine(ABC):
|
|
"""Abstract base class for TTS engines"""
|
|
|
|
@abstractmethod
|
|
def speak(self, text: str) -> None:
|
|
"""Speak the given text (blocking)"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def speak_async(self, text: str) -> None:
|
|
"""Speak the given text (non-blocking)"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def stop(self) -> None:
|
|
"""Stop current speech"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def is_speaking(self) -> bool:
|
|
"""Check if currently speaking"""
|
|
pass
|
|
|
|
|
|
class Pyttsx3Engine(TTSEngine):
|
|
"""TTS using pyttsx3 (offline, system voices)"""
|
|
|
|
def __init__(self, voice: Optional[str] = None, rate: int = 150, volume: float = 0.9):
|
|
import pyttsx3
|
|
|
|
self.engine = pyttsx3.init()
|
|
self.engine.setProperty('rate', rate)
|
|
self.engine.setProperty('volume', volume)
|
|
|
|
# Set voice if specified
|
|
if voice:
|
|
voices = self.engine.getProperty('voices')
|
|
for v in voices:
|
|
if voice.lower() in v.name.lower():
|
|
self.engine.setProperty('voice', v.id)
|
|
break
|
|
|
|
self._speaking = False
|
|
self._queue = queue.Queue()
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_flag = False
|
|
|
|
logger.info("Pyttsx3 TTS engine initialized")
|
|
|
|
def speak(self, text: str) -> None:
|
|
"""Speak text (blocking)"""
|
|
self._speaking = True
|
|
try:
|
|
self.engine.say(text)
|
|
self.engine.runAndWait()
|
|
finally:
|
|
self._speaking = False
|
|
|
|
def speak_async(self, text: str) -> None:
|
|
"""Speak text (non-blocking)"""
|
|
self._queue.put(text)
|
|
|
|
if self._thread is None or not self._thread.is_alive():
|
|
self._stop_flag = False
|
|
self._thread = threading.Thread(target=self._speech_worker, daemon=True)
|
|
self._thread.start()
|
|
|
|
def _speech_worker(self):
|
|
"""Worker thread for async speech"""
|
|
while not self._stop_flag:
|
|
try:
|
|
text = self._queue.get(timeout=0.5)
|
|
self.speak(text)
|
|
self._queue.task_done()
|
|
except queue.Empty:
|
|
continue
|
|
|
|
def stop(self) -> None:
|
|
"""Stop current speech"""
|
|
self._stop_flag = True
|
|
self.engine.stop()
|
|
# Clear queue
|
|
while not self._queue.empty():
|
|
try:
|
|
self._queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
def is_speaking(self) -> bool:
|
|
return self._speaking
|
|
|
|
|
|
class GTTSEngine(TTSEngine):
|
|
"""TTS using Google Text-to-Speech (online, better quality)"""
|
|
|
|
# Maximale Chunk-Größe für gTTS (Zeichen)
|
|
# Google hat ein Limit von ~5000 Zeichen, wir nehmen weniger für Sicherheit
|
|
MAX_CHUNK_SIZE = 500
|
|
|
|
def __init__(self, language: str = "de", speed: float = 1.0):
|
|
"""
|
|
Args:
|
|
language: Sprache (z.B. "de", "en")
|
|
speed: Geschwindigkeit (1.0 = normal, 1.5 = 50% schneller, 2.0 = doppelt so schnell)
|
|
"""
|
|
from gtts import gTTS
|
|
import pygame
|
|
|
|
pygame.mixer.init()
|
|
|
|
self.language = language
|
|
self.speed = speed # Geschwindigkeitsfaktor
|
|
self._speaking = False
|
|
self._queue = queue.Queue()
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_flag = False
|
|
|
|
speed_info = f", speed: {speed}x" if speed != 1.0 else ""
|
|
logger.info(f"gTTS engine initialized (language: {language}{speed_info})")
|
|
|
|
def _split_text_into_chunks(self, text: str) -> list:
|
|
"""
|
|
Teilt langen Text in Chunks auf.
|
|
|
|
Versucht an Satzenden zu splitten (. ! ?) für natürlichere Pausen.
|
|
"""
|
|
if len(text) <= self.MAX_CHUNK_SIZE:
|
|
return [text]
|
|
|
|
chunks = []
|
|
current_chunk = ""
|
|
|
|
# Teile nach Sätzen (., !, ?)
|
|
import re
|
|
sentences = re.split(r'(?<=[.!?])\s+', text)
|
|
|
|
for sentence in sentences:
|
|
# Wenn Satz selbst zu lang ist, teile nach Kommas oder Wörtern
|
|
if len(sentence) > self.MAX_CHUNK_SIZE:
|
|
# Teile nach Kommas
|
|
parts = re.split(r'(?<=,)\s+', sentence)
|
|
for part in parts:
|
|
if len(part) > self.MAX_CHUNK_SIZE:
|
|
# Letzter Ausweg: Teile nach Wörtern
|
|
words = part.split()
|
|
for word in words:
|
|
if len(current_chunk) + len(word) + 1 > self.MAX_CHUNK_SIZE:
|
|
if current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
current_chunk = word
|
|
else:
|
|
current_chunk += " " + word if current_chunk else word
|
|
elif len(current_chunk) + len(part) + 1 > self.MAX_CHUNK_SIZE:
|
|
if current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
current_chunk = part
|
|
else:
|
|
current_chunk += " " + part if current_chunk else part
|
|
elif len(current_chunk) + len(sentence) + 1 > self.MAX_CHUNK_SIZE:
|
|
if current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
current_chunk = sentence
|
|
else:
|
|
current_chunk += " " + sentence if current_chunk else sentence
|
|
|
|
# Letzten Chunk hinzufügen
|
|
if current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
|
|
logger.debug(f"Text in {len(chunks)} Chunks aufgeteilt ({len(text)} Zeichen)")
|
|
return chunks
|
|
|
|
def _speed_up_audio(self, input_path: str, output_path: str) -> bool:
|
|
"""
|
|
Beschleunigt eine Audio-Datei mit pydub.
|
|
|
|
Args:
|
|
input_path: Pfad zur Original-MP3
|
|
output_path: Pfad für beschleunigte MP3
|
|
|
|
Returns:
|
|
True wenn erfolgreich, False bei Fehler
|
|
"""
|
|
try:
|
|
from pydub import AudioSegment
|
|
|
|
# Audio laden
|
|
audio = AudioSegment.from_mp3(input_path)
|
|
|
|
# Geschwindigkeit ändern (ohne Tonhöhe zu verändern ist komplexer,
|
|
# aber einfaches Speedup durch frame_rate Änderung klingt akzeptabel)
|
|
# Methode: Frame-Rate erhöhen → schnellere Wiedergabe
|
|
new_frame_rate = int(audio.frame_rate * self.speed)
|
|
speedup_audio = audio._spawn(audio.raw_data, overrides={
|
|
"frame_rate": new_frame_rate
|
|
}).set_frame_rate(audio.frame_rate)
|
|
|
|
# Speichern
|
|
speedup_audio.export(output_path, format="mp3")
|
|
return True
|
|
|
|
except ImportError:
|
|
logger.warning("pydub nicht installiert - Geschwindigkeit nicht änderbar. Installiere mit: pip install pydub")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Audio-Speedup-Fehler: {e}")
|
|
return False
|
|
|
|
def speak(self, text: str) -> None:
|
|
"""Speak text (blocking) - teilt lange Texte automatisch auf"""
|
|
from gtts import gTTS
|
|
import pygame
|
|
import tempfile
|
|
import os
|
|
|
|
self._speaking = True
|
|
try:
|
|
# Teile langen Text in Chunks
|
|
chunks = self._split_text_into_chunks(text)
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
if self._stop_flag:
|
|
break
|
|
|
|
if not chunk.strip():
|
|
continue
|
|
|
|
logger.debug(f"Spreche Chunk {i+1}/{len(chunks)}: {chunk[:50]}...")
|
|
|
|
# Generate audio file
|
|
tts = gTTS(text=chunk, lang=self.language)
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
|
|
temp_path = f.name
|
|
tts.save(temp_path)
|
|
|
|
# Geschwindigkeit anpassen wenn != 1.0
|
|
play_path = temp_path
|
|
speedup_path = None
|
|
|
|
if self.speed != 1.0:
|
|
speedup_path = temp_path.replace('.mp3', '_fast.mp3')
|
|
if self._speed_up_audio(temp_path, speedup_path):
|
|
play_path = speedup_path
|
|
else:
|
|
# Fallback: Original abspielen
|
|
logger.debug("Speedup fehlgeschlagen, spiele Original")
|
|
|
|
# Play audio
|
|
pygame.mixer.music.load(play_path)
|
|
pygame.mixer.music.play()
|
|
|
|
# Wait for playback to finish
|
|
while pygame.mixer.music.get_busy() and not self._stop_flag:
|
|
pygame.time.Clock().tick(10)
|
|
|
|
# Cleanup
|
|
try:
|
|
os.unlink(temp_path)
|
|
if speedup_path and os.path.exists(speedup_path):
|
|
os.unlink(speedup_path)
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.error(f"gTTS error: {e}")
|
|
finally:
|
|
self._speaking = False
|
|
|
|
def speak_async(self, text: str) -> None:
|
|
"""Speak text (non-blocking)"""
|
|
self._queue.put(text)
|
|
|
|
if self._thread is None or not self._thread.is_alive():
|
|
self._stop_flag = False
|
|
self._thread = threading.Thread(target=self._speech_worker, daemon=True)
|
|
self._thread.start()
|
|
|
|
def _speech_worker(self):
|
|
"""Worker thread for async speech"""
|
|
while not self._stop_flag:
|
|
try:
|
|
text = self._queue.get(timeout=0.5)
|
|
self.speak(text)
|
|
self._queue.task_done()
|
|
except queue.Empty:
|
|
continue
|
|
|
|
def stop(self) -> None:
|
|
"""Stop current speech"""
|
|
import pygame
|
|
self._stop_flag = True
|
|
pygame.mixer.music.stop()
|
|
# Clear queue
|
|
while not self._queue.empty():
|
|
try:
|
|
self._queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
def is_speaking(self) -> bool:
|
|
return self._speaking
|
|
|
|
|
|
def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine:
|
|
"""
|
|
Factory function to create TTS engine
|
|
|
|
Args:
|
|
engine_type: "pyttsx3" or "gtts"
|
|
**kwargs: Engine-specific options
|
|
"""
|
|
if engine_type == "pyttsx3":
|
|
return Pyttsx3Engine(
|
|
voice=kwargs.get("voice"),
|
|
rate=kwargs.get("rate", 150),
|
|
volume=kwargs.get("volume", 0.9)
|
|
)
|
|
elif engine_type == "gtts":
|
|
return GTTSEngine(
|
|
language=kwargs.get("language", "de"),
|
|
speed=kwargs.get("speed", 1.0)
|
|
)
|
|
else:
|
|
raise ValueError(f"Unknown TTS engine: {engine_type}")
|
|
|
|
|
|
# Test when run directly
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
print("Testing pyttsx3...")
|
|
engine = create_tts_engine("pyttsx3", rate=150)
|
|
engine.speak("Hallo! Ich bin Claude und erkunde gerade deine Wohnung.")
|
|
|
|
print("\nTesting gTTS...")
|
|
try:
|
|
engine2 = create_tts_engine("gtts", language="de")
|
|
engine2.speak("Das hier klingt noch besser!")
|
|
except Exception as e:
|
|
print(f"gTTS not available: {e}")
|
|
|
|
print("\nDone!")
|