""" Claude's Eyes - Text-to-Speech Engine Converts Claude's text responses to spoken audio """ import logging import threading import queue from typing import Optional from abc import ABC, abstractmethod logger = logging.getLogger(__name__) class TTSEngine(ABC): """Abstract base class for TTS engines""" @abstractmethod def speak(self, text: str) -> None: """Speak the given text (blocking)""" pass @abstractmethod def speak_async(self, text: str) -> None: """Speak the given text (non-blocking)""" pass @abstractmethod def stop(self) -> None: """Stop current speech""" pass @abstractmethod def is_speaking(self) -> bool: """Check if currently speaking""" pass class Pyttsx3Engine(TTSEngine): """TTS using pyttsx3 (offline, system voices)""" def __init__(self, voice: Optional[str] = None, rate: int = 150, volume: float = 0.9): import pyttsx3 self.engine = pyttsx3.init() self.engine.setProperty('rate', rate) self.engine.setProperty('volume', volume) # Set voice if specified if voice: voices = self.engine.getProperty('voices') for v in voices: if voice.lower() in v.name.lower(): self.engine.setProperty('voice', v.id) break self._speaking = False self._queue = queue.Queue() self._thread: Optional[threading.Thread] = None self._stop_flag = False logger.info("Pyttsx3 TTS engine initialized") def speak(self, text: str) -> None: """Speak text (blocking)""" self._speaking = True try: self.engine.say(text) self.engine.runAndWait() finally: self._speaking = False def speak_async(self, text: str) -> None: """Speak text (non-blocking)""" self._queue.put(text) if self._thread is None or not self._thread.is_alive(): self._stop_flag = False self._thread = threading.Thread(target=self._speech_worker, daemon=True) self._thread.start() def _speech_worker(self): """Worker thread for async speech""" while not self._stop_flag: try: text = self._queue.get(timeout=0.5) self.speak(text) self._queue.task_done() except queue.Empty: continue def stop(self) -> None: """Stop current speech""" self._stop_flag = True self.engine.stop() # Clear queue while not self._queue.empty(): try: self._queue.get_nowait() except queue.Empty: break def is_speaking(self) -> bool: return self._speaking class GTTSEngine(TTSEngine): """TTS using Google Text-to-Speech (online, better quality)""" # Maximale Chunk-Größe für gTTS (Zeichen) # Google hat ein Limit von ~5000 Zeichen, wir nehmen weniger für Sicherheit MAX_CHUNK_SIZE = 500 def __init__(self, language: str = "de", speed: float = 1.0): """ Args: language: Sprache (z.B. "de", "en") speed: Geschwindigkeit (1.0 = normal, 1.5 = 50% schneller, 2.0 = doppelt so schnell) """ from gtts import gTTS import pygame pygame.mixer.init() self.language = language self.speed = speed # Geschwindigkeitsfaktor self._speaking = False self._queue = queue.Queue() self._thread: Optional[threading.Thread] = None self._stop_flag = False speed_info = f", speed: {speed}x" if speed != 1.0 else "" logger.info(f"gTTS engine initialized (language: {language}{speed_info})") def _split_text_into_chunks(self, text: str) -> list: """ Teilt langen Text in Chunks auf. Versucht an Satzenden zu splitten (. ! ?) für natürlichere Pausen. """ if len(text) <= self.MAX_CHUNK_SIZE: return [text] chunks = [] current_chunk = "" # Teile nach Sätzen (., !, ?) import re sentences = re.split(r'(?<=[.!?])\s+', text) for sentence in sentences: # Wenn Satz selbst zu lang ist, teile nach Kommas oder Wörtern if len(sentence) > self.MAX_CHUNK_SIZE: # Teile nach Kommas parts = re.split(r'(?<=,)\s+', sentence) for part in parts: if len(part) > self.MAX_CHUNK_SIZE: # Letzter Ausweg: Teile nach Wörtern words = part.split() for word in words: if len(current_chunk) + len(word) + 1 > self.MAX_CHUNK_SIZE: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = word else: current_chunk += " " + word if current_chunk else word elif len(current_chunk) + len(part) + 1 > self.MAX_CHUNK_SIZE: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = part else: current_chunk += " " + part if current_chunk else part elif len(current_chunk) + len(sentence) + 1 > self.MAX_CHUNK_SIZE: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence else: current_chunk += " " + sentence if current_chunk else sentence # Letzten Chunk hinzufügen if current_chunk: chunks.append(current_chunk.strip()) logger.debug(f"Text in {len(chunks)} Chunks aufgeteilt ({len(text)} Zeichen)") return chunks def _speed_up_audio(self, input_path: str, output_path: str) -> bool: """ Beschleunigt eine Audio-Datei mit pydub. Args: input_path: Pfad zur Original-MP3 output_path: Pfad für beschleunigte MP3 Returns: True wenn erfolgreich, False bei Fehler """ try: from pydub import AudioSegment # Audio laden audio = AudioSegment.from_mp3(input_path) # Geschwindigkeit ändern (ohne Tonhöhe zu verändern ist komplexer, # aber einfaches Speedup durch frame_rate Änderung klingt akzeptabel) # Methode: Frame-Rate erhöhen → schnellere Wiedergabe new_frame_rate = int(audio.frame_rate * self.speed) speedup_audio = audio._spawn(audio.raw_data, overrides={ "frame_rate": new_frame_rate }).set_frame_rate(audio.frame_rate) # Speichern speedup_audio.export(output_path, format="mp3") return True except ImportError: logger.warning("pydub nicht installiert - Geschwindigkeit nicht änderbar. Installiere mit: pip install pydub") return False except Exception as e: logger.error(f"Audio-Speedup-Fehler: {e}") return False def speak(self, text: str) -> None: """Speak text (blocking) - teilt lange Texte automatisch auf""" from gtts import gTTS import pygame import tempfile import os self._speaking = True try: # Teile langen Text in Chunks chunks = self._split_text_into_chunks(text) for i, chunk in enumerate(chunks): if self._stop_flag: break if not chunk.strip(): continue logger.debug(f"Spreche Chunk {i+1}/{len(chunks)}: {chunk[:50]}...") # Generate audio file tts = gTTS(text=chunk, lang=self.language) with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f: temp_path = f.name tts.save(temp_path) # Geschwindigkeit anpassen wenn != 1.0 play_path = temp_path speedup_path = None if self.speed != 1.0: speedup_path = temp_path.replace('.mp3', '_fast.mp3') if self._speed_up_audio(temp_path, speedup_path): play_path = speedup_path else: # Fallback: Original abspielen logger.debug("Speedup fehlgeschlagen, spiele Original") # Play audio pygame.mixer.music.load(play_path) pygame.mixer.music.play() # Wait for playback to finish while pygame.mixer.music.get_busy() and not self._stop_flag: pygame.time.Clock().tick(10) # Cleanup try: os.unlink(temp_path) if speedup_path and os.path.exists(speedup_path): os.unlink(speedup_path) except: pass except Exception as e: logger.error(f"gTTS error: {e}") finally: self._speaking = False def speak_async(self, text: str) -> None: """Speak text (non-blocking)""" self._queue.put(text) if self._thread is None or not self._thread.is_alive(): self._stop_flag = False self._thread = threading.Thread(target=self._speech_worker, daemon=True) self._thread.start() def _speech_worker(self): """Worker thread for async speech""" while not self._stop_flag: try: text = self._queue.get(timeout=0.5) self.speak(text) self._queue.task_done() except queue.Empty: continue def stop(self) -> None: """Stop current speech""" import pygame self._stop_flag = True pygame.mixer.music.stop() # Clear queue while not self._queue.empty(): try: self._queue.get_nowait() except queue.Empty: break def is_speaking(self) -> bool: return self._speaking def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine: """ Factory function to create TTS engine Args: engine_type: "pyttsx3" or "gtts" **kwargs: Engine-specific options """ if engine_type == "pyttsx3": return Pyttsx3Engine( voice=kwargs.get("voice"), rate=kwargs.get("rate", 150), volume=kwargs.get("volume", 0.9) ) elif engine_type == "gtts": return GTTSEngine( language=kwargs.get("language", "de"), speed=kwargs.get("speed", 1.0) ) else: raise ValueError(f"Unknown TTS engine: {engine_type}") # Test when run directly if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) print("Testing pyttsx3...") engine = create_tts_engine("pyttsx3", rate=150) engine.speak("Hallo! Ich bin Claude und erkunde gerade deine Wohnung.") print("\nTesting gTTS...") try: engine2 = create_tts_engine("gtts", language="de") engine2.speak("Das hier klingt noch besser!") except Exception as e: print(f"gTTS not available: {e}") print("\nDone!")