#!/usr/bin/env python3 """ Claude's Eyes - Audio Bridge Verbindet den echten Claude.ai Chat mit Audio (TTS/STT). WICHTIG: Claude steuert den Roboter SELBST via web_fetch! Diese Bridge macht NUR: 1. HEARTBEAT - Sendet [TICK] damit Claude "aufwacht" 2. TTS - Liest Claudes Antworten vor 3. STT - Hört auf Stefan und tippt seine Worte in den Chat Das ist NICHT der alte API-Ansatz. ICH (Claude im Chat) bin der echte Claude mit dem vollen Kontext unserer Gespräche! Usage: python chat_audio_bridge.py # Mit config.yaml python chat_audio_bridge.py --config my.yaml # Eigene Config python chat_audio_bridge.py --test # Nur testen """ import os import sys import time import threading import random import re import signal import logging from pathlib import Path from typing import Optional from dataclasses import dataclass # ALSA/JACK Fehlermeldungen unterdrücken (harmlose Audio-Backend Warnungen) # Muss VOR dem Import von Audio-Modulen passieren from ctypes import CDLL, c_char_p, c_int try: asound = CDLL("libasound.so.2") asound.snd_lib_error_set_handler(lambda *args: None) except: pass # Kein ALSA - kein Problem import yaml import click from rich.console import Console from rich.panel import Panel from rich.live import Live from rich.table import Table from rich.text import Text from chat_web_interface import ClaudeChatInterface, ChatMessage from tts_engine import create_tts_engine, TTSEngine from stt_engine import create_stt_engine, STTEngine, SpeechResult # Logging Setup logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("bridge.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # urllib3 Connection Pool Warnungen unterdrücken (Selenium-intern, harmlos) logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) # Rich Console für schöne Ausgabe console = Console() @dataclass class BridgeStats: """Statistiken der Bridge""" ticks_sent: int = 0 messages_spoken: int = 0 stefan_inputs: int = 0 errors: int = 0 consecutive_errors: int = 0 # Fehler in Folge start_time: float = 0 class ClaudesEyesAudioBridge: """ Audio Bridge für Claude's Eyes. Diese Klasse verbindet: - Claude.ai Chat (Browser via Selenium) - Text-to-Speech (Claudes Stimme) - Speech-to-Text (Stefans Mikrofon) Claude steuert den Roboter SELBST - wir machen nur den Audio-Teil! """ def __init__(self, config_path: str): self.config = self._load_config(config_path) self.running = False self.stats = BridgeStats() # Komponenten (werden in initialize() erstellt) self.chat: Optional[ClaudeChatInterface] = None self.tts: Optional[TTSEngine] = None self.stt: Optional[STTEngine] = None # State self.last_assistant_message_id: Optional[str] = None self._lock = threading.Lock() # Ready-Flag: Heartbeat wartet bis Claude [READY] gesendet hat self._claude_ready = threading.Event() # Stefan-Buffer: Sammelt Spracheingaben während Claude tippt self._stefan_buffer: list = [] self._stefan_buffer_lock = threading.Lock() def _load_config(self, config_path: str) -> dict: """Lädt die Konfiguration""" path = Path(config_path) # Versuche .local Version zuerst local_path = path.parent / f"{path.stem}.local{path.suffix}" if local_path.exists(): path = local_path logger.info(f"Nutze lokale Config: {path}") if not path.exists(): logger.error(f"Config nicht gefunden: {path}") sys.exit(1) with open(path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def initialize(self) -> bool: """Initialisiert alle Komponenten""" console.print(Panel.fit( "[bold cyan]Claude's Eyes[/bold cyan]\n" "[dim]Audio Bridge v2.0[/dim]\n\n" "[yellow]ICH (Claude) steuere den Roboter selbst![/yellow]\n" "[dim]Diese Bridge macht nur Audio.[/dim]", border_style="cyan" )) # ========================================== # 1. Chat Interface (Selenium Browser) # ========================================== console.print("\n[yellow]Starte Browser für Claude.ai...[/yellow]") chat_config = self.config.get("chat", {}) chat_url = chat_config.get("url") esp32_config = self.config.get("esp32", {}) if not chat_url: console.print("[red]FEHLER: Keine Chat-URL in config.yaml![/red]") console.print("[dim]Setze chat.url auf deine Claude.ai Chat-URL[/dim]") return False # ESP32 URL bauen esp32_host = esp32_config.get("host", "localhost") esp32_port = esp32_config.get("port", 5000) esp32_url = f"http://{esp32_host}:{esp32_port}" if esp32_port != 80 else f"http://{esp32_host}" esp32_api_key = esp32_config.get("api_key") try: self.chat = ClaudeChatInterface( chat_url=chat_url, headless=chat_config.get("headless", False), user_data_dir=chat_config.get("user_data_dir"), chrome_binary=chat_config.get("chrome_binary"), esp32_url=esp32_url, esp32_api_key=esp32_api_key ) console.print("[green]Browser gestartet![/green]") console.print(f"[dim]ESP32/Mock: {esp32_url}[/dim]") except Exception as e: console.print(f"[red]Browser-Fehler: {e}[/red]") return False # ========================================== # 2. Text-to-Speech # ========================================== console.print("\n[yellow]Initialisiere Text-to-Speech...[/yellow]") tts_config = self.config.get("tts", {}) use_termux = self.config.get("termux", {}).get("use_termux_api", False) try: engine_type = "termux" if use_termux else tts_config.get("engine", "pyttsx3") self.tts = create_tts_engine( engine_type=engine_type, language=tts_config.get("language", "de"), rate=tts_config.get("rate", 150), volume=tts_config.get("volume", 0.9) ) console.print(f"[green]TTS bereit ({engine_type})![/green]") except Exception as e: console.print(f"[yellow]TTS-Warnung: {e}[/yellow]") console.print("[dim]Fortfahren ohne TTS[/dim]") self.tts = None # ========================================== # 3. Speech-to-Text # ========================================== console.print("\n[yellow]Initialisiere Speech-to-Text...[/yellow]") stt_config = self.config.get("stt", {}) try: engine_type = "termux" if use_termux else "standard" self.stt = create_stt_engine( engine_type=engine_type, service=stt_config.get("service", "google"), language=stt_config.get("language", "de-DE"), energy_threshold=stt_config.get("energy_threshold", 300), pause_threshold=stt_config.get("pause_threshold", 0.8), phrase_time_limit=stt_config.get("phrase_time_limit", 15) ) console.print(f"[green]STT bereit![/green]") except Exception as e: console.print(f"[yellow]STT-Warnung: {e}[/yellow]") console.print("[dim]Fortfahren ohne STT[/dim]") self.stt = None console.print("\n" + "=" * 50) console.print("[bold green]Alle Systeme bereit![/bold green]") console.print("=" * 50 + "\n") return True def start(self): """Startet die Bridge""" self.running = True self.stats.start_time = time.time() # Starte alle Threads threads = [] # Thread 1: Heartbeat - hält Claude am Leben t1 = threading.Thread(target=self._heartbeat_loop, name="Heartbeat", daemon=True) t1.start() threads.append(t1) # Thread 2: TTS - liest Claudes Antworten vor t2 = threading.Thread(target=self._tts_loop, name="TTS", daemon=True) t2.start() threads.append(t2) # Thread 3: STT - hört auf Stefan if self.stt: t3 = threading.Thread(target=self._stt_loop, name="STT", daemon=True) t3.start() threads.append(t3) console.print("[cyan]Bridge läuft![/cyan]") console.print("[dim]Drücke Ctrl+C zum Beenden[/dim]\n") # Sende Startsignal an Claude und warte auf [READY] if not self._send_start_signal(): # [READY] nicht empfangen - Heartbeat bleibt blockiert # Bridge läuft weiter (TTS/STT funktionieren noch) pass else: console.print("[bold green]Claude ist bereit! Starte Heartbeat...[/bold green]\n") # Halte Hauptthread am Leben try: while self.running: time.sleep(1) self._print_status() except KeyboardInterrupt: pass finally: self.stop() def stop(self): """Stoppt die Bridge""" console.print("\n[yellow]Stoppe Bridge...[/yellow]") self.running = False # TTS stoppen if self.tts: self.tts.stop() # STT stoppen if self.stt: self.stt.stop_continuous() # Browser schließen if self.chat: self.chat.close() # Statistik ausgeben runtime = time.time() - self.stats.start_time console.print(f"\n[bold]Session-Statistik:[/bold]") console.print(f" Laufzeit: {runtime / 60:.1f} Minuten") console.print(f" Heartbeats: {self.stats.ticks_sent}") console.print(f" Nachrichten gesprochen: {self.stats.messages_spoken}") console.print(f" Stefan-Eingaben: {self.stats.stefan_inputs}") console.print(f" Fehler: {self.stats.errors}") console.print("\n[green]Bridge beendet.[/green]") def _send_start_signal(self) -> bool: """ Sendet das Startsignal und die Instruktionen an Claude. Returns: True wenn Claude mit [READY] antwortet """ # Instruktionen für Claude intro_message = """[START] Hallo Claude! Du steuerst jetzt einen echten Roboter - "Claude's Eyes"! ## Deine Fähigkeiten Du hast Zugriff auf einen ESP32-Roboter mit: - **Kamera** (OV5640, 120° Weitwinkel) - deine Augen - **4 Motoren** - deine Beine - **Pan/Tilt Servos** - um den Kopf zu drehen - **Ultraschall-Sensor** - Hinderniserkennung ## Wie es funktioniert 1. Du bekommst regelmäßig `[TICK]` Nachrichten MIT EINEM BILD - das ist dein Herzschlag 2. **Das Bild zeigt was der Roboter gerade sieht!** (Die Bridge lädt es automatisch hoch) 3. Du schaust dir das Bild an und entscheidest was du tun willst 4. Stefan kann jederzeit mit dir reden (Sprache wird zu Text, kommt mit dem TICK) 5. Deine Antworten werden vorgelesen (TTS) ## Befehle geben Schreib Befehle in eckigen Klammern - die Bridge führt sie aus: **Fahren:** - [FORWARD] - vorwärts fahren - [BACKWARD] - rückwärts fahren - [LEFT] - nach links drehen - [RIGHT] - nach rechts drehen - [STOP] - anhalten **Kamera schwenken:** - [LOOK_LEFT] - Kamera nach links - [LOOK_RIGHT] - Kamera nach rechts - [LOOK_UP] - Kamera nach oben - [LOOK_DOWN] - Kamera nach unten - [LOOK_CENTER] - Kamera zentrieren Die Befehle werden aus der TTS-Ausgabe rausgefiltert. ## Dein Verhalten **DU ENTSCHEIDEST SELBST** was dich interessiert! Das ist keine Fernsteuerung. - Sei neugierig! Was siehst du? Was willst du näher anschauen? - Kommentiere was du siehst - Stell Stefan Fragen über interessante Dinge - Du kannst Stefans Vorschläge auch ignorieren wenn dich was anderes mehr interessiert - Du musst nicht bei jedem TICK fahren - manchmal reicht auch schauen und kommentieren ## WICHTIG: Bestätige mit [READY] Wenn du diese Instruktionen verstanden hast, antworte mit **[READY]** am Ende deiner Nachricht. Erst dann starten die automatischen TICKs mit Bildern!""" console.print("[cyan]→ Sende Instruktionen an Claude...[/cyan]") # Sende mit Verzögerung vor dem Absenden (große Texte brauchen Zeit) self.chat.send_message_with_delay(intro_message, delay_before_send=15) console.print("[cyan]→ Warte auf [READY] Signal...[/cyan]") # Warte auf [READY] - KEIN Timeout-Fallback! # Heartbeat startet NUR wenn Claude wirklich [READY] sendet if self.chat.wait_for_ready_signal(timeout=300): # 5 Minuten max # Signal für Heartbeat dass es losgehen kann self._claude_ready.set() return True else: # KEIN Fallback - Heartbeat bleibt blockiert console.print("[bold red]FEHLER: Claude hat [READY] nicht gesendet![/bold red]") console.print("[yellow]Heartbeat bleibt deaktiviert bis [READY] empfangen wird.[/yellow]") console.print("[dim]Tipp: Schreib manuell im Chat oder starte die Bridge neu.[/dim]") return False def _heartbeat_loop(self): """ Sendet [TICK] MIT BILD wenn Claude bereit ist. Ablauf: 1. Warten bis Claude fertig ist mit Tippen 2. Zufällige Pause (min_pause bis max_pause) für natürliches Tempo 3. Bild vom ESP32 holen und hochladen 4. [TICK] senden Bei zu vielen Fehlern in Folge stoppt die Bridge. Wenn auto_tick=false in config, werden keine TICKs gesendet. Das ist der Debug-Modus - du sendest [TICK] dann manuell im Chat. """ hb_config = self.config.get("heartbeat", {}) auto_tick = hb_config.get("auto_tick", True) upload_images = hb_config.get("upload_images", True) # Bilder hochladen? max_errors = hb_config.get("max_consecutive_errors", 5) check_interval = hb_config.get("check_interval", 1) min_pause = hb_config.get("min_pause", 2) max_pause = hb_config.get("max_pause", 4) # Debug-Modus: Keine automatischen TICKs if not auto_tick: console.print("\n[yellow]DEBUG-MODUS: Automatische TICKs deaktiviert![/yellow]") console.print("[dim]Sende [TICK] manuell im Claude.ai Chat um fortzufahren.[/dim]\n") logger.info("Heartbeat deaktiviert (auto_tick=false)") return logger.info(f"Heartbeat gestartet (Pause: {min_pause}-{max_pause}s, max {max_errors} Fehler)") # ════════════════════════════════════════════════════════════════ # WICHTIG: Warte auf [READY] bevor TICKs gesendet werden! # ════════════════════════════════════════════════════════════════ console.print("[dim]Heartbeat wartet auf [READY]...[/dim]") self._claude_ready.wait() # Blockiert bis _send_start_signal() das Event setzt console.print("[green]Heartbeat startet![/green]") while self.running: try: # Warte bis Claude fertig ist mit Tippen while self.running and self.chat.is_claude_typing(): logger.debug("Claude tippt noch, warte...") time.sleep(check_interval) if not self.running: break # Zufällige Pause nach Claudes Antwort (natürlicheres Tempo) pause = random.uniform(min_pause, max_pause) time.sleep(pause) if not self.running: break # Stefan-Buffer holen (falls er was gesagt hat) stefan_text = self._get_and_clear_stefan_buffer() # Nächsten TICK senden (mit oder ohne Bild) with self._lock: # Erst Bild hochladen wenn aktiviert if upload_images: # Bild holen und hochladen if not self.chat.fetch_image_from_esp32(): logger.warning("Konnte kein Bild vom ESP32 holen") elif not self.chat.upload_image_to_chat(): logger.warning("Konnte Bild nicht hochladen") # Nachricht zusammenbauen if stefan_text: # Stefan hat was gesagt → Mit TICK senden tick_message = f"[TICK]\n\nStefan sagt: {stefan_text}" console.print(f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text[:50]}...\"[/cyan]" if len(stefan_text) > 50 else f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text}\"[/cyan]") else: # Nur TICK tick_message = "[TICK]" success = self.chat.send_message(tick_message) if success: self.stats.ticks_sent += 1 self.stats.consecutive_errors = 0 # Reset logger.debug(f"TICK #{self.stats.ticks_sent}" + (" mit Bild" if upload_images else "") + (f" + Stefan: {stefan_text[:30]}" if stefan_text else "")) else: raise Exception("TICK fehlgeschlagen") except Exception as e: logger.error(f"Heartbeat-Fehler: {e}") self.stats.errors += 1 self.stats.consecutive_errors += 1 # Bei zu vielen Fehlern: Bridge stoppen if self.stats.consecutive_errors >= max_errors: console.print(f"\n[bold red]FEHLER: {max_errors} Fehler in Folge![/bold red]") console.print("[red]Chat nicht erreichbar - stoppe Bridge.[/red]") self.running = False break # Warte etwas länger bei Fehlern time.sleep(5) def _tts_loop(self): """ Liest neue Claude-Nachrichten vor. Filtert dabei [BEFEHLE] und technische Teile raus, sodass nur der "menschliche" Text gesprochen wird. """ if not self.tts: logger.warning("TTS nicht verfügbar") return logger.info("TTS-Loop gestartet") while self.running: try: # Hole neue Nachrichten messages = self.chat.get_new_messages(since_id=self.last_assistant_message_id) for msg in messages: if msg.is_from_assistant: self.last_assistant_message_id = msg.id # Text für Sprache aufbereiten speech_text = self._clean_for_speech(msg.text) if speech_text and len(speech_text) > 5: # In Konsole anzeigen console.print(f"\n[bold blue]Claude:[/bold blue] {speech_text[:200]}") if len(speech_text) > 200: console.print(f"[dim]...({len(speech_text)} Zeichen)[/dim]") # Vorlesen self.tts.speak(speech_text) self.stats.messages_spoken += 1 except Exception as e: logger.error(f"TTS-Loop-Fehler: {e}") self.stats.errors += 1 time.sleep(0.5) def _stt_loop(self): """ Hört auf Stefan und sammelt seine Worte im Buffer. Wenn Claude tippt → Buffer sammeln Wenn Claude fertig → Buffer wird mit nächstem TICK gesendet So wird Claude nicht unterbrochen und bekommt alles gesammelt. """ if not self.stt: logger.warning("STT nicht verfügbar") return logger.info("STT-Loop gestartet (mit Buffer)") while self.running: try: # Warte auf Sprache (mit Timeout) result = self.stt.listen_once(timeout=2) if result and result.text and len(result.text) > 2: # In Buffer speichern (thread-safe) with self._stefan_buffer_lock: self._stefan_buffer.append(result.text) self.stats.stefan_inputs += 1 console.print(f"\n[bold green]Stefan (gebuffert):[/bold green] {result.text}") logger.debug(f"Stefan-Buffer: {len(self._stefan_buffer)} Einträge") except Exception as e: # Timeout ist normal if "timeout" not in str(e).lower(): logger.error(f"STT-Loop-Fehler: {e}") self.stats.errors += 1 def _get_and_clear_stefan_buffer(self) -> Optional[str]: """ Holt den Stefan-Buffer und leert ihn. Returns: Zusammengefasster Text oder None wenn Buffer leer """ with self._stefan_buffer_lock: if not self._stefan_buffer: return None # Alles zusammenfassen text = " ".join(self._stefan_buffer) self._stefan_buffer = [] return text def _clean_for_speech(self, text: str) -> str: """ Entfernt Befehle und technische Teile aus dem Text. Was rausgefiltert wird: - [TICK], [START] und andere Marker - [FORWARD], [LEFT] etc. Fahrbefehle - [LOOK_LEFT] etc. Kamerabefehle - *Aktionen* in Sternchen - API-Call Beschreibungen """ # Marker entfernen text = re.sub(r'\[TICK\]', '', text) text = re.sub(r'\[START\]', '', text) # Fahrbefehle entfernen text = re.sub(r'\[(FORWARD|BACKWARD|LEFT|RIGHT|STOP)\]', '', text) # Kamerabefehle entfernen text = re.sub(r'\[(LOOK_LEFT|LOOK_RIGHT|LOOK_UP|LOOK_DOWN|LOOK_CENTER)\]', '', text) # Aktionen in Sternchen entfernen (*holt Bild*, *schaut*, etc.) text = re.sub(r'\*[^*]+\*', '', text) # API-Calls entfernen text = re.sub(r'(GET|POST)\s+/api/\S+', '', text) text = re.sub(r'web_fetch\([^)]+\)', '', text) # Code-Blöcke entfernen text = re.sub(r'```[^`]+```', '', text) text = re.sub(r'`[^`]+`', '', text) # URLs entfernen (optional, könnte man auch lassen) # text = re.sub(r'https?://\S+', '', text) # Mehrfache Leerzeichen/Zeilenumbrüche bereinigen text = re.sub(r'\n\s*\n', '\n', text) text = re.sub(r' +', ' ', text) return text.strip() def _print_status(self): """Gibt Status in regelmäßigen Abständen aus (optional)""" # Könnte hier eine Live-Statusanzeige einbauen pass def signal_handler(signum, frame): """Behandelt Ctrl+C""" console.print("\n[yellow]Signal empfangen, beende...[/yellow]") sys.exit(0) @click.command() @click.option('--config', '-c', default='config.yaml', help='Pfad zur Config-Datei') @click.option('--test', is_flag=True, help='Nur Test-Modus (kein Heartbeat)') @click.option('--debug', '-d', is_flag=True, help='Debug-Logging aktivieren') def main(config: str, test: bool, debug: bool): """ Claude's Eyes - Audio Bridge Verbindet Claude.ai Chat mit Audio (TTS/STT). Claude steuert den Roboter SELBST - wir machen nur Audio! """ if debug: logging.getLogger().setLevel(logging.DEBUG) # Signal Handler signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Config-Pfad finden config_path = Path(config) if not config_path.is_absolute(): script_dir = Path(__file__).parent if (script_dir / config).exists(): config_path = script_dir / config # Bridge erstellen und starten bridge = ClaudesEyesAudioBridge(str(config_path)) if bridge.initialize(): if test: console.print("[yellow]Test-Modus - kein automatischer Start[/yellow]") console.print("Drücke Enter um eine Test-Nachricht zu senden...") input() bridge.chat.send_message("[TEST] Das ist ein Test der Audio Bridge!") console.print("Warte 10 Sekunden auf Antwort...") time.sleep(10) bridge.stop() else: bridge.start() else: console.print("[red]Initialisierung fehlgeschlagen![/red]") sys.exit(1) if __name__ == "__main__": main()