esp32-claude-robbie/python_bridge/chat_audio_bridge.py

981 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Claude's Eyes - Audio Bridge
Verbindet den echten Claude.ai Chat mit Audio (TTS/STT).
WICHTIG: Claude steuert den Roboter SELBST via web_fetch!
Diese Bridge macht NUR:
1. HEARTBEAT - Sendet [TICK] damit Claude "aufwacht"
2. TTS - Liest Claudes Antworten vor
3. STT - Hört auf Stefan und tippt seine Worte in den Chat
Das ist NICHT der alte API-Ansatz. ICH (Claude im Chat) bin der echte Claude
mit dem vollen Kontext unserer Gespräche!
Usage:
python chat_audio_bridge.py # Mit config.yaml
python chat_audio_bridge.py --config my.yaml # Eigene Config
python chat_audio_bridge.py --test # Nur testen
"""
import os
import sys
import time
import threading
import random
import re
import signal
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
# ALSA/JACK Fehlermeldungen unterdrücken (harmlose Audio-Backend Warnungen)
# Muss VOR dem Import von Audio-Modulen passieren
from ctypes import CDLL, c_char_p, c_int
try:
asound = CDLL("libasound.so.2")
asound.snd_lib_error_set_handler(lambda *args: None)
except:
pass # Kein ALSA - kein Problem
import yaml
import click
from rich.console import Console
from rich.panel import Panel
from rich.live import Live
from rich.table import Table
from rich.text import Text
from chat_web_interface import ClaudeChatInterface, ChatMessage
from tts_engine import create_tts_engine, TTSEngine
from stt_engine import create_stt_engine, STTEngine, SpeechResult
# Logging Setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("bridge.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# urllib3 Connection Pool Warnungen unterdrücken (Selenium-intern, harmlos)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
# Rich Console für schöne Ausgabe
console = Console()
@dataclass
class BridgeStats:
"""Statistiken der Bridge"""
ticks_sent: int = 0
messages_spoken: int = 0
stefan_inputs: int = 0
errors: int = 0
consecutive_errors: int = 0 # Fehler in Folge
start_time: float = 0
class ClaudesEyesAudioBridge:
"""
Audio Bridge für Claude's Eyes.
Diese Klasse verbindet:
- Claude.ai Chat (Browser via Selenium)
- Text-to-Speech (Claudes Stimme)
- Speech-to-Text (Stefans Mikrofon)
Claude steuert den Roboter SELBST - wir machen nur den Audio-Teil!
"""
def __init__(self, config_path: str):
self.config_path = Path(config_path)
self.config = self._load_config(config_path)
self.running = False
self.stats = BridgeStats()
self._previous_chat_id: Optional[str] = None # Für Referenz zum alten Chat
# Komponenten (werden in initialize() erstellt)
self.chat: Optional[ClaudeChatInterface] = None
self.tts: Optional[TTSEngine] = None
self.stt: Optional[STTEngine] = None
# State
self.last_assistant_message_id: Optional[str] = None
self._lock = threading.Lock()
# Ready-Flag: Heartbeat wartet bis Claude [READY] gesendet hat
self._claude_ready = threading.Event()
# Stefan-Buffer: Sammelt Spracheingaben während Claude tippt
self._stefan_buffer: list = []
self._stefan_buffer_lock = threading.Lock()
# Send-Lock: Verhindert dass TICKs während Senden reinkommen
self._sending = threading.Event()
self._sending.set() # Anfangs nicht am Senden (set = frei)
# Recording-Flag: Wenn gesetzt, wird gerade aufgenommen
# Heartbeat pausiert während Aufnahme aktiv ist
self._recording = threading.Event()
self._recording.clear() # Anfangs nicht am Aufnehmen
# Mute-Flag: Wenn True, ignoriert STT alle Eingaben
# Startet gemutet um ungewollte Aufnahmen zu vermeiden
self._muted = True
self._mute_lock = threading.Lock()
# Silence-Timeout: Wie lange Stille bevor Aufnahme als fertig gilt
self._silence_timeout = 5.0 # Sekunden
def _load_config(self, config_path: str) -> dict:
"""Lädt die Konfiguration"""
path = Path(config_path)
# Versuche .local Version zuerst
local_path = path.parent / f"{path.stem}.local{path.suffix}"
if local_path.exists():
path = local_path
logger.info(f"Nutze lokale Config: {path}")
# Merke den tatsächlichen Pfad für späteres Speichern
self._actual_config_path = path
if not path.exists():
logger.error(f"Config nicht gefunden: {path}")
sys.exit(1)
with open(path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _save_chat_url_to_config(self, new_url: str):
"""
Speichert die neue Chat-URL in der config.yaml.
Aktualisiert nur den chat.url Wert, behält Rest der Datei bei.
"""
try:
config_path = getattr(self, '_actual_config_path', self.config_path)
# Config-Datei lesen und als Text bearbeiten (um Kommentare zu erhalten)
with open(config_path, 'r', encoding='utf-8') as f:
content = f.read()
# URL im Text ersetzen (regex für chat.url Zeile)
import re
old_url = self.config.get("chat", {}).get("url", "")
if old_url:
# Ersetze die alte URL mit der neuen
content = content.replace(old_url, new_url)
else:
# Fallback: Ersetze die Zeile mit url:
content = re.sub(
r'(url:\s*["\'])([^"\']+)(["\'])',
rf'\g<1>{new_url}\g<3>',
content
)
# Speichern
with open(config_path, 'w', encoding='utf-8') as f:
f.write(content)
# Config im Speicher auch aktualisieren
self.config["chat"]["url"] = new_url
logger.info(f"Neue Chat-URL in Config gespeichert: {new_url}")
console.print(f"[green]✓ Config aktualisiert: {config_path}[/green]")
except Exception as e:
logger.error(f"Fehler beim Speichern der Chat-URL: {e}")
console.print(f"[yellow]⚠ Konnte Config nicht aktualisieren: {e}[/yellow]")
def initialize(self) -> bool:
"""Initialisiert alle Komponenten"""
console.print(Panel.fit(
"[bold cyan]Claude's Eyes[/bold cyan]\n"
"[dim]Audio Bridge v2.0[/dim]\n\n"
"[yellow]ICH (Claude) steuere den Roboter selbst![/yellow]\n"
"[dim]Diese Bridge macht nur Audio.[/dim]",
border_style="cyan"
))
# ==========================================
# 1. Chat Interface (Selenium Browser)
# ==========================================
console.print("\n[yellow]Starte Browser für Claude.ai...[/yellow]")
chat_config = self.config.get("chat", {})
chat_url = chat_config.get("url")
esp32_config = self.config.get("esp32", {})
if not chat_url:
console.print("[red]FEHLER: Keine Chat-URL in config.yaml![/red]")
console.print("[dim]Setze chat.url auf deine Claude.ai Chat-URL[/dim]")
return False
# ESP32 URL bauen
esp32_host = esp32_config.get("host", "localhost")
esp32_port = esp32_config.get("port", 5000)
esp32_url = f"http://{esp32_host}:{esp32_port}" if esp32_port != 80 else f"http://{esp32_host}"
esp32_api_key = esp32_config.get("api_key")
try:
self.chat = ClaudeChatInterface(
chat_url=chat_url,
headless=chat_config.get("headless", False),
user_data_dir=chat_config.get("user_data_dir"),
chrome_binary=chat_config.get("chrome_binary"),
esp32_url=esp32_url,
esp32_api_key=esp32_api_key
)
console.print("[green]Browser gestartet![/green]")
console.print(f"[dim]ESP32/Mock: {esp32_url}[/dim]")
except Exception as e:
console.print(f"[red]Browser-Fehler: {e}[/red]")
return False
# ==========================================
# 2. Text-to-Speech
# ==========================================
console.print("\n[yellow]Initialisiere Text-to-Speech...[/yellow]")
tts_config = self.config.get("tts", {})
use_termux = self.config.get("termux", {}).get("use_termux_api", False)
try:
engine_type = "termux" if use_termux else tts_config.get("engine", "pyttsx3")
self.tts = create_tts_engine(
engine_type=engine_type,
language=tts_config.get("language", "de"),
rate=tts_config.get("rate", 150),
volume=tts_config.get("volume", 0.9)
)
console.print(f"[green]TTS bereit ({engine_type})![/green]")
except Exception as e:
console.print(f"[yellow]TTS-Warnung: {e}[/yellow]")
console.print("[dim]Fortfahren ohne TTS[/dim]")
self.tts = None
# ==========================================
# 3. Speech-to-Text
# ==========================================
console.print("\n[yellow]Initialisiere Speech-to-Text...[/yellow]")
stt_config = self.config.get("stt", {})
try:
engine_type = "termux" if use_termux else "standard"
self.stt = create_stt_engine(
engine_type=engine_type,
service=stt_config.get("service", "google"),
language=stt_config.get("language", "de-DE"),
energy_threshold=stt_config.get("energy_threshold", 300),
pause_threshold=stt_config.get("pause_threshold", 0.8),
phrase_time_limit=stt_config.get("phrase_time_limit", 15)
)
console.print(f"[green]STT bereit![/green]")
except Exception as e:
console.print(f"[yellow]STT-Warnung: {e}[/yellow]")
console.print("[dim]Fortfahren ohne STT[/dim]")
self.stt = None
console.print("\n" + "=" * 50)
console.print("[bold green]Alle Systeme bereit![/bold green]")
console.print("=" * 50 + "\n")
return True
def start(self):
"""Startet die Bridge"""
self.running = True
self.stats.start_time = time.time()
# Starte alle Threads
threads = []
# Thread 1: Heartbeat - hält Claude am Leben
t1 = threading.Thread(target=self._heartbeat_loop, name="Heartbeat", daemon=True)
t1.start()
threads.append(t1)
# Thread 2: TTS - liest Claudes Antworten vor
t2 = threading.Thread(target=self._tts_loop, name="TTS", daemon=True)
t2.start()
threads.append(t2)
# Thread 3: STT - hört auf Stefan
if self.stt:
t3 = threading.Thread(target=self._stt_loop, name="STT", daemon=True)
t3.start()
threads.append(t3)
# Thread 4: Keyboard-Listener für Mute-Toggle
t4 = threading.Thread(target=self._keyboard_loop, name="Keyboard", daemon=True)
t4.start()
threads.append(t4)
console.print("[cyan]Bridge läuft![/cyan]")
console.print("[bold red]🎤 Mikrofon ist GEMUTET[/bold red] - Drücke 'M' zum Aktivieren")
console.print("[dim]Tasten: M=Mute/Unmute, N=Neuer Chat, Q=Beenden[/dim]\n")
# Sende Startsignal an Claude und warte auf [READY]
if not self._send_start_signal():
# [READY] nicht empfangen - Heartbeat bleibt blockiert
# Bridge läuft weiter (TTS/STT funktionieren noch)
pass
else:
console.print("[bold green]Claude ist bereit! Starte Heartbeat...[/bold green]\n")
# Halte Hauptthread am Leben
try:
while self.running:
time.sleep(1)
self._print_status()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
"""Stoppt die Bridge"""
console.print("\n[yellow]Stoppe Bridge...[/yellow]")
self.running = False
# TTS stoppen
if self.tts:
self.tts.stop()
# STT stoppen
if self.stt:
self.stt.stop_continuous()
# Browser schließen
if self.chat:
self.chat.close()
# Statistik ausgeben
runtime = time.time() - self.stats.start_time
console.print(f"\n[bold]Session-Statistik:[/bold]")
console.print(f" Laufzeit: {runtime / 60:.1f} Minuten")
console.print(f" Heartbeats: {self.stats.ticks_sent}")
console.print(f" Nachrichten gesprochen: {self.stats.messages_spoken}")
console.print(f" Stefan-Eingaben: {self.stats.stefan_inputs}")
console.print(f" Fehler: {self.stats.errors}")
console.print("\n[green]Bridge beendet.[/green]")
def _send_start_signal(self, previous_chat_id: Optional[str] = None) -> bool:
"""
Sendet das Startsignal und die Instruktionen an Claude.
Args:
previous_chat_id: Optional - Chat-ID des vorherigen Chats (für Kontinuität)
Returns:
True wenn Claude mit [READY] antwortet
"""
# Kontext vom vorherigen Chat, falls vorhanden
previous_chat_context = ""
if previous_chat_id:
previous_chat_context = f"""
## WICHTIG: Fortsetzung eines vorherigen Chats!
Dies ist eine Fortsetzung unserer Erkundung. Der vorherige Chat hatte die ID: {previous_chat_id}
Falls du dich an den Kontext erinnerst, kannst du dort weitermachen wo wir aufgehört haben.
Der Chat-Wechsel war nötig weil Claude.ai ein Limit von 100 Bildern pro Chat hat.
"""
# Instruktionen für Claude
intro_message = f"""[START] Hallo Claude! Du steuerst jetzt einen echten Roboter - "Claude's Eyes"!
{previous_chat_context}## Deine Fähigkeiten
Du hast Zugriff auf einen ESP32-Roboter mit:
- **Kamera** (OV5640, 120° Weitwinkel) - deine Augen
- **4 Motoren** - deine Beine
- **Pan/Tilt Servos** - um den Kopf zu drehen
- **Ultraschall-Sensor** - Hinderniserkennung
## Wie es funktioniert
1. Du bekommst regelmäßig `[TICK]` Nachrichten MIT EINEM BILD - das ist dein Herzschlag
2. **Das Bild zeigt was der Roboter gerade sieht!** (Die Bridge lädt es automatisch hoch)
3. Du schaust dir das Bild an und entscheidest was du tun willst
4. Stefan kann jederzeit mit dir reden (Sprache wird zu Text, kommt mit dem TICK)
5. Deine Antworten werden vorgelesen (TTS)
## Befehle geben
Schreib Befehle in eckigen Klammern - die Bridge führt sie aus:
**Fahren:**
- [FORWARD] - vorwärts fahren
- [BACKWARD] - rückwärts fahren
- [LEFT] - nach links drehen
- [RIGHT] - nach rechts drehen
- [STOP] - anhalten
**Kamera schwenken:**
- [LOOK_LEFT] - Kamera nach links
- [LOOK_RIGHT] - Kamera nach rechts
- [LOOK_UP] - Kamera nach oben
- [LOOK_DOWN] - Kamera nach unten
- [LOOK_CENTER] - Kamera zentrieren
Die Befehle werden aus der TTS-Ausgabe rausgefiltert.
## Dein Verhalten
**DU ENTSCHEIDEST SELBST** was dich interessiert! Das ist keine Fernsteuerung.
- Sei neugierig! Was siehst du? Was willst du näher anschauen?
- Kommentiere was du siehst
- Stell Stefan Fragen über interessante Dinge
- Du kannst Stefans Vorschläge auch ignorieren wenn dich was anderes mehr interessiert
- Du musst nicht bei jedem TICK fahren - manchmal reicht auch schauen und kommentieren
## WICHTIG: Bestätige mit [READY]
Wenn du diese Instruktionen verstanden hast, antworte mit dem Tag `[READY]` (exakt so, in eckigen Klammern!) am Ende deiner Nachricht.
**Das Format muss EXAKT `[READY]` sein** - nicht fett, nicht anders formatiert, sondern genau so: [READY]
Erst dann starten die automatischen TICKs mit Bildern!"""
console.print("[cyan]→ Sende Instruktionen an Claude...[/cyan]")
# Sende mit Verzögerung vor dem Absenden (große Texte brauchen Zeit)
self.chat.send_message_with_delay(intro_message, delay_before_send=15)
console.print("[cyan]→ Warte auf [READY] Signal...[/cyan]")
# Warte auf [READY] - KEIN Timeout-Fallback!
# Heartbeat startet NUR wenn Claude wirklich [READY] sendet
if self.chat.wait_for_ready_signal(timeout=300): # 5 Minuten max
# Signal für Heartbeat dass es losgehen kann
self._claude_ready.set()
return True
else:
# KEIN Fallback - Heartbeat bleibt blockiert
console.print("[bold red]FEHLER: Claude hat [READY] nicht gesendet![/bold red]")
console.print("[yellow]Heartbeat bleibt deaktiviert bis [READY] empfangen wird.[/yellow]")
console.print("[dim]Tipp: Schreib manuell im Chat oder starte die Bridge neu.[/dim]")
return False
def _heartbeat_loop(self):
"""
Sendet [TICK] MIT BILD wenn Claude bereit ist.
Ablauf:
1. Warten bis Claude fertig ist mit Tippen
2. Zufällige Pause (min_pause bis max_pause) für natürliches Tempo
3. Bild vom ESP32 holen und hochladen
4. [TICK] senden
Bei zu vielen Fehlern in Folge stoppt die Bridge.
Wenn auto_tick=false in config, werden keine TICKs gesendet.
Das ist der Debug-Modus - du sendest [TICK] dann manuell im Chat.
"""
hb_config = self.config.get("heartbeat", {})
auto_tick = hb_config.get("auto_tick", True)
upload_images = hb_config.get("upload_images", True) # Bilder hochladen?
max_errors = hb_config.get("max_consecutive_errors", 5)
check_interval = hb_config.get("check_interval", 1)
min_pause = hb_config.get("min_pause", 2)
max_pause = hb_config.get("max_pause", 4)
# Debug-Modus: Keine automatischen TICKs
if not auto_tick:
console.print("\n[yellow]DEBUG-MODUS: Automatische TICKs deaktiviert![/yellow]")
console.print("[dim]Sende [TICK] manuell im Claude.ai Chat um fortzufahren.[/dim]\n")
logger.info("Heartbeat deaktiviert (auto_tick=false)")
return
logger.info(f"Heartbeat gestartet (Pause: {min_pause}-{max_pause}s, max {max_errors} Fehler)")
# ════════════════════════════════════════════════════════════════
# WICHTIG: Warte auf [READY] bevor TICKs gesendet werden!
# ════════════════════════════════════════════════════════════════
console.print("[dim]Heartbeat wartet auf [READY]...[/dim]")
self._claude_ready.wait() # Blockiert bis _send_start_signal() das Event setzt
console.print("[green]Heartbeat startet![/green]")
while self.running:
try:
# Warte bis Claude fertig ist mit Tippen
while self.running and self.chat.is_claude_typing():
logger.debug("Claude tippt noch, warte...")
time.sleep(check_interval)
if not self.running:
break
# Warte bis Recording fertig ist (5s Stille)
if self._recording.is_set():
logger.debug("Stefan spricht noch, warte auf Stille...")
while self.running and self._recording.is_set():
time.sleep(0.5)
logger.debug("Stefan fertig, fahre fort")
if not self.running:
break
# Zufällige Pause nach Claudes Antwort (natürlicheres Tempo)
pause = random.uniform(min_pause, max_pause)
time.sleep(pause)
if not self.running:
break
# Stefan-Buffer holen (falls er was gesagt hat)
stefan_text = self._get_and_clear_stefan_buffer()
# Warte bis vorheriges Senden fertig ist
self._sending.wait(timeout=30) # Max 30s warten
# Nächsten TICK senden (mit oder ohne Bild)
with self._lock:
# Signalisiere dass wir senden
self._sending.clear()
try:
# Erst Bild hochladen wenn aktiviert
image_uploaded = False
if upload_images:
# Bild holen
if not self.chat.fetch_image_from_esp32():
logger.warning("Konnte kein Bild vom ESP32 holen")
else:
# Nur hochladen wenn sich das Bild geändert hat (100 Bilder Limit!)
if self.chat.upload_image_if_changed():
image_uploaded = True
uploaded_count = self.chat.get_images_uploaded_count()
# Warnungen bei Annäherung ans Limit
if uploaded_count >= 95:
console.print(f"[bold red]⚠️ {uploaded_count}/100 Bilder! Drücke 'N' für neuen Chat![/bold red]")
elif uploaded_count >= 90:
console.print(f"[yellow]⚠️ {uploaded_count}/100 Bilder - Limit fast erreicht![/yellow]")
elif uploaded_count % 10 == 0:
console.print(f"[dim]📷 {uploaded_count} Bilder hochgeladen (Limit: 100)[/dim]")
else:
logger.debug("Bild unverändert, übersprungen")
# Nachricht zusammenbauen
if stefan_text:
# Stefan hat was gesagt → Mit TICK senden
tick_message = f"[TICK]\n\nStefan sagt: {stefan_text}"
console.print(f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text[:50]}...\"[/cyan]" if len(stefan_text) > 50 else f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text}\"[/cyan]")
else:
# Nur TICK
tick_message = "[TICK]"
success = self.chat.send_message(tick_message)
if success:
self.stats.ticks_sent += 1
self.stats.consecutive_errors = 0 # Reset
logger.debug(f"TICK #{self.stats.ticks_sent}" + (" mit Bild" if upload_images else "") + (f" + Stefan: {stefan_text[:30]}" if stefan_text else ""))
else:
raise Exception("TICK fehlgeschlagen")
finally:
# Senden fertig - wieder freigeben
self._sending.set()
except Exception as e:
logger.error(f"Heartbeat-Fehler: {e}")
self.stats.errors += 1
self.stats.consecutive_errors += 1
# Bei zu vielen Fehlern: Bridge stoppen
if self.stats.consecutive_errors >= max_errors:
console.print(f"\n[bold red]FEHLER: {max_errors} Fehler in Folge![/bold red]")
console.print("[red]Chat nicht erreichbar - stoppe Bridge.[/red]")
self.running = False
break
# Warte etwas länger bei Fehlern
time.sleep(5)
def _tts_loop(self):
"""
Liest neue Claude-Nachrichten vor.
Filtert dabei [BEFEHLE] und technische Teile raus,
sodass nur der "menschliche" Text gesprochen wird.
"""
if not self.tts:
logger.warning("TTS nicht verfügbar")
return
logger.info("TTS-Loop gestartet")
while self.running:
try:
# Hole neue Nachrichten
messages = self.chat.get_new_messages(since_id=self.last_assistant_message_id)
if messages:
logger.debug(f"TTS: {len(messages)} neue Nachrichten gefunden")
for msg in messages:
logger.debug(f"TTS: Nachricht - assistant={msg.is_from_assistant}, id={msg.id[:20]}..., text={msg.text[:50]}...")
if msg.is_from_assistant:
self.last_assistant_message_id = msg.id
# Text für Sprache aufbereiten
speech_text = self._clean_for_speech(msg.text)
logger.debug(f"TTS: Nach Bereinigung: {len(speech_text) if speech_text else 0} Zeichen")
if speech_text and len(speech_text) > 5:
# In Konsole anzeigen
console.print(f"\n[bold blue]Claude:[/bold blue] {speech_text[:200]}")
if len(speech_text) > 200:
console.print(f"[dim]...({len(speech_text)} Zeichen)[/dim]")
# Vorlesen
logger.info(f"TTS: Spreche {len(speech_text)} Zeichen...")
self.tts.speak(speech_text)
self.stats.messages_spoken += 1
logger.debug("TTS: Sprechen beendet")
else:
logger.debug(f"TTS: Text zu kurz oder leer, übersprungen")
else:
logger.debug(f"TTS: Nachricht ist nicht von Claude, übersprungen")
except Exception as e:
logger.error(f"TTS-Loop-Fehler: {e}")
self.stats.errors += 1
time.sleep(0.5)
def _stt_loop(self):
"""
Hört auf Stefan und sammelt seine Worte im Buffer.
Ablauf:
1. Warte auf erste Spracheingabe
2. Signalisiere Recording aktiv → Heartbeat pausiert
3. Sammle weitere Eingaben bis 5 Sekunden Stille
4. Recording fertig → Buffer wird mit nächstem TICK gesendet
Wenn gemutet → Ignoriert alle Eingaben
"""
if not self.stt:
logger.warning("STT nicht verfügbar")
return
logger.info(f"STT-Loop gestartet (5s Stille = fertig)")
# Temporärer Buffer für aktuelle Aufnahme-Session
current_session_texts = []
last_speech_time = 0
while self.running:
try:
# Wenn gemutet, kurz warten und überspringen
if self.is_muted():
# Falls wir mitten in einer Aufnahme waren, diese beenden
if self._recording.is_set():
self._finalize_recording(current_session_texts)
current_session_texts = []
time.sleep(0.5)
continue
# Warte auf Sprache (kurzer Timeout für schnelle Reaktion)
result = self.stt.listen_once(timeout=1)
# Nochmal prüfen nach dem Hören (falls zwischendurch gemutet wurde)
if self.is_muted():
continue
if result and result.text and len(result.text) > 2:
# Sprache erkannt!
current_session_texts.append(result.text)
last_speech_time = time.time()
self.stats.stefan_inputs += 1
# Signalisiere dass Recording aktiv ist
if not self._recording.is_set():
self._recording.set()
console.print(f"\n[bold green]🎤 Stefan spricht...[/bold green]")
logger.debug("Recording gestartet")
console.print(f"[green] → {result.text}[/green]")
logger.debug(f"Stefan-Session: {len(current_session_texts)} Teile")
else:
# Keine Sprache erkannt - prüfe auf Stille-Timeout
if self._recording.is_set() and last_speech_time > 0:
silence_duration = time.time() - last_speech_time
if silence_duration >= self._silence_timeout:
# 5 Sekunden Stille - Aufnahme beenden
self._finalize_recording(current_session_texts)
current_session_texts = []
last_speech_time = 0
except Exception as e:
# Timeout ist normal
if "timeout" not in str(e).lower():
logger.error(f"STT-Loop-Fehler: {e}")
self.stats.errors += 1
def _finalize_recording(self, texts: list):
"""
Beendet eine Aufnahme-Session und speichert im Buffer.
Args:
texts: Liste der erkannten Texte in dieser Session
"""
if not texts:
self._recording.clear()
return
# Alle Texte zusammenfügen
full_text = " ".join(texts)
# In Haupt-Buffer speichern
with self._stefan_buffer_lock:
self._stefan_buffer.append(full_text)
console.print(f"\n[bold green]✓ Stefan (komplett):[/bold green] {full_text}")
logger.info(f"Recording beendet: {len(texts)} Teile → {len(full_text)} Zeichen")
# Recording-Flag zurücksetzen → Heartbeat kann weitermachen
self._recording.clear()
def _keyboard_loop(self):
"""
Hört auf Tastatureingaben für Mute-Toggle und andere Befehle.
Tasten:
- M: Mute/Unmute Toggle
- N: Neuer Chat starten (bei Bilder-Limit)
- Q: Beenden (alternativ zu Ctrl+C)
"""
import sys
import tty
import termios
logger.info("Keyboard-Loop gestartet (M=Mute, N=New Chat, Q=Quit)")
# Speichere ursprüngliche Terminal-Settings
old_settings = None
try:
old_settings = termios.tcgetattr(sys.stdin)
except:
logger.warning("Konnte Terminal-Settings nicht lesen (kein TTY?)")
return
try:
# Terminal in raw mode setzen (einzelne Tasten ohne Enter)
tty.setraw(sys.stdin.fileno())
while self.running:
# Lese einzelnes Zeichen (blockierend, aber mit select für Timeout)
import select
if select.select([sys.stdin], [], [], 0.5)[0]:
char = sys.stdin.read(1)
if char.lower() == 'm':
self.toggle_mute()
elif char.lower() == 'n':
self._start_new_chat_with_instructions()
elif char.lower() == 'q' or char == '\x03': # q oder Ctrl+C
console.print("\n[yellow]Beende Bridge...[/yellow]")
self.running = False
break
except Exception as e:
logger.debug(f"Keyboard-Loop Fehler: {e}")
finally:
# Terminal-Settings wiederherstellen
if old_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def _start_new_chat_with_instructions(self):
"""Startet einen neuen Chat und sendet die Instruktionen erneut"""
console.print("\n[yellow]Starte neuen Chat...[/yellow]")
# Heartbeat pausieren
self._claude_ready.clear()
with self._lock:
new_url, old_chat_id = self.chat.start_new_chat()
if new_url:
console.print(f"[green]Neuer Chat geöffnet (URL kommt nach Instruktion)[/green]")
if old_chat_id:
console.print(f"[dim]Vorheriger Chat: {old_chat_id}[/dim]")
# NICHT hier speichern! URL ist noch /new
# Erst nach dem Senden der Instruktion hat der Chat eine echte ID
# Warte bis der neue Chat vollständig geladen ist
console.print("[dim]Warte 10s bis Chat geladen...[/dim]")
time.sleep(10)
console.print("[cyan]Sende Instruktionen mit Referenz zum alten Chat...[/cyan]")
# Instruktionen erneut senden (mit Referenz zum alten Chat)
if self._send_start_signal(previous_chat_id=old_chat_id):
console.print("[bold green]Claude ist bereit! Heartbeat läuft weiter.[/bold green]\n")
# JETZT hat der Chat eine echte ID - URL speichern
final_url = self.chat.get_current_chat_url()
if final_url and '/chat/' in final_url and '/new' not in final_url:
self._save_chat_url_to_config(final_url)
console.print(f"[dim]Chat-ID: {self.chat.extract_chat_id(final_url)}[/dim]")
else:
console.print("[yellow]⚠ Konnte finale Chat-URL nicht ermitteln[/yellow]")
else:
console.print("[red]Claude hat nicht mit [READY] geantwortet.[/red]")
else:
console.print("[red]Konnte keinen neuen Chat starten![/red]")
# Heartbeat wieder aktivieren
self._claude_ready.set()
def _get_and_clear_stefan_buffer(self) -> Optional[str]:
"""
Holt den Stefan-Buffer und leert ihn.
Returns:
Zusammengefasster Text oder None wenn Buffer leer
"""
with self._stefan_buffer_lock:
if not self._stefan_buffer:
return None
# Alles zusammenfassen
text = " ".join(self._stefan_buffer)
self._stefan_buffer = []
return text
def toggle_mute(self) -> bool:
"""
Schaltet Mute um.
Returns:
True wenn jetzt gemutet, False wenn ungemutet
"""
with self._mute_lock:
self._muted = not self._muted
status = "MUTED" if self._muted else "UNMUTED"
console.print(f"\n[bold {'red' if self._muted else 'green'}]🎤 Mikrofon {status}[/bold {'red' if self._muted else 'green'}]")
return self._muted
def set_mute(self, muted: bool):
"""Setzt Mute-Status direkt"""
with self._mute_lock:
self._muted = muted
status = "MUTED" if self._muted else "UNMUTED"
console.print(f"\n[bold {'red' if self._muted else 'green'}]🎤 Mikrofon {status}[/bold {'red' if self._muted else 'green'}]")
def is_muted(self) -> bool:
"""Prüft ob gemutet"""
with self._mute_lock:
return self._muted
def _clean_for_speech(self, text: str) -> str:
"""
Entfernt Befehle und technische Teile aus dem Text.
Was rausgefiltert wird:
- [TICK], [START] und andere Marker
- [FORWARD], [LEFT] etc. Fahrbefehle
- [LOOK_LEFT] etc. Kamerabefehle
- *Aktionen* in Sternchen
- API-Call Beschreibungen
"""
# Marker entfernen
text = re.sub(r'\[TICK\]', '', text)
text = re.sub(r'\[START\]', '', text)
# Fahrbefehle entfernen
text = re.sub(r'\[(FORWARD|BACKWARD|LEFT|RIGHT|STOP)\]', '', text)
# Kamerabefehle entfernen
text = re.sub(r'\[(LOOK_LEFT|LOOK_RIGHT|LOOK_UP|LOOK_DOWN|LOOK_CENTER)\]', '', text)
# Aktionen in Sternchen entfernen (*holt Bild*, *schaut*, etc.)
text = re.sub(r'\*[^*]+\*', '', text)
# API-Calls entfernen
text = re.sub(r'(GET|POST)\s+/api/\S+', '', text)
text = re.sub(r'web_fetch\([^)]+\)', '', text)
# Code-Blöcke entfernen
text = re.sub(r'```[^`]+```', '', text)
text = re.sub(r'`[^`]+`', '', text)
# URLs entfernen (optional, könnte man auch lassen)
# text = re.sub(r'https?://\S+', '', text)
# Mehrfache Leerzeichen/Zeilenumbrüche bereinigen
text = re.sub(r'\n\s*\n', '\n', text)
text = re.sub(r' +', ' ', text)
return text.strip()
def _print_status(self):
"""Gibt Status in regelmäßigen Abständen aus (optional)"""
# Könnte hier eine Live-Statusanzeige einbauen
pass
def signal_handler(signum, frame):
"""Behandelt Ctrl+C"""
console.print("\n[yellow]Signal empfangen, beende...[/yellow]")
sys.exit(0)
@click.command()
@click.option('--config', '-c', default='config.yaml', help='Pfad zur Config-Datei')
@click.option('--test', is_flag=True, help='Nur Test-Modus (kein Heartbeat)')
@click.option('--debug', '-d', is_flag=True, help='Debug-Logging aktivieren')
def main(config: str, test: bool, debug: bool):
"""
Claude's Eyes - Audio Bridge
Verbindet Claude.ai Chat mit Audio (TTS/STT).
Claude steuert den Roboter SELBST - wir machen nur Audio!
"""
if debug:
logging.getLogger().setLevel(logging.DEBUG)
# Signal Handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Config-Pfad finden
config_path = Path(config)
if not config_path.is_absolute():
script_dir = Path(__file__).parent
if (script_dir / config).exists():
config_path = script_dir / config
# Bridge erstellen und starten
bridge = ClaudesEyesAudioBridge(str(config_path))
if bridge.initialize():
if test:
console.print("[yellow]Test-Modus - kein automatischer Start[/yellow]")
console.print("Drücke Enter um eine Test-Nachricht zu senden...")
input()
bridge.chat.send_message("[TEST] Das ist ein Test der Audio Bridge!")
console.print("Warte 10 Sekunden auf Antwort...")
time.sleep(10)
bridge.stop()
else:
bridge.start()
else:
console.print("[red]Initialisierung fehlgeschlagen![/red]")
sys.exit(1)
if __name__ == "__main__":
main()