esp32-claude-robbie/python_bridge/chat_audio_bridge.py

1355 lines
61 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Claude's Eyes - Audio Bridge
Verbindet den echten Claude.ai Chat mit Audio (TTS/STT).
WICHTIG: Claude steuert den Roboter SELBST via web_fetch!
Diese Bridge macht NUR:
1. HEARTBEAT - Sendet [TICK] damit Claude "aufwacht"
2. TTS - Liest Claudes Antworten vor
3. STT - Hört auf Stefan und tippt seine Worte in den Chat
Das ist NICHT der alte API-Ansatz. ICH (Claude im Chat) bin der echte Claude
mit dem vollen Kontext unserer Gespräche!
Usage:
python chat_audio_bridge.py # Mit config.yaml
python chat_audio_bridge.py --config my.yaml # Eigene Config
python chat_audio_bridge.py --test # Nur testen
"""
import os
import sys
import time
import threading
import random
import re
import signal
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
# ALSA/JACK Fehlermeldungen unterdrücken (harmlose Audio-Backend Warnungen)
# Muss VOR dem Import von Audio-Modulen passieren
from ctypes import CDLL, c_char_p, c_int
try:
asound = CDLL("libasound.so.2")
asound.snd_lib_error_set_handler(lambda *args: None)
except:
pass # Kein ALSA - kein Problem
import json
import yaml
import click
from rich.console import Console
from rich.panel import Panel
from rich.live import Live
from rich.table import Table
from rich.text import Text
from chat_web_interface import ClaudeChatInterface, ChatMessage
from tts_engine import create_tts_engine, TTSEngine
from stt_engine import create_stt_engine, STTEngine, SpeechResult
# Logging Setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("bridge.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# urllib3 Connection Pool Warnungen unterdrücken (Selenium-intern, harmlos)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
# Rich Console für schöne Ausgabe
console = Console()
@dataclass
class BridgeStats:
"""Statistiken der Bridge"""
ticks_sent: int = 0
messages_spoken: int = 0
stefan_inputs: int = 0
errors: int = 0
consecutive_errors: int = 0 # Fehler in Folge
start_time: float = 0
class ClaudesEyesAudioBridge:
"""
Audio Bridge für Claude's Eyes.
Diese Klasse verbindet:
- Claude.ai Chat (Browser via Selenium)
- Text-to-Speech (Claudes Stimme)
- Speech-to-Text (Stefans Mikrofon)
Claude steuert den Roboter SELBST - wir machen nur den Audio-Teil!
"""
def __init__(self, config_path: str):
self.config_path = Path(config_path)
self.config = self._load_config(config_path)
self.running = False
self.stats = BridgeStats()
self._previous_chat_id: Optional[str] = None # Für Referenz zum alten Chat
# Komponenten (werden in initialize() erstellt)
self.chat: Optional[ClaudeChatInterface] = None
self.tts: Optional[TTSEngine] = None
self.stt: Optional[STTEngine] = None
# State
self.last_assistant_message_id: Optional[str] = None
self._current_chat_id: Optional[str] = None # Aktuelle Chat-ID für TTS-State
self._lock = threading.Lock()
# Ready-Flag: Heartbeat UND TTS warten bis Claude [READY] gesendet hat
self._claude_ready = threading.Event()
# TTS-Active-Flag: TTS wird erst aktiviert nachdem [READY] empfangen UND
# die letzte Nachricht-ID gesetzt wurde (um alte Nachrichten zu ignorieren)
self._tts_active = threading.Event()
# Stefan-Buffer: Sammelt Spracheingaben während Claude tippt
self._stefan_buffer: list = []
self._stefan_buffer_lock = threading.Lock()
# Send-Lock: Verhindert dass TICKs während Senden reinkommen
self._sending = threading.Event()
self._sending.set() # Anfangs nicht am Senden (set = frei)
# Recording-Flag: Wenn gesetzt, wird gerade aufgenommen
# Heartbeat pausiert während Aufnahme aktiv ist
self._recording = threading.Event()
self._recording.clear() # Anfangs nicht am Aufnehmen
# Speaking-Flag: Wenn gesetzt, spricht TTS gerade
# Heartbeat pausiert während TTS spricht (wir lassen Claude ausreden!)
self._speaking = threading.Event()
self._speaking.clear() # Anfangs nicht am Sprechen
# Awaiting-TTS-Flag: Wird gesetzt wenn Nachricht gesendet wurde
# und wir auf TTS warten. Erst clearen wenn TTS komplett fertig ist.
# So wartet Heartbeat auch während gTTS das Audio generiert.
self._awaiting_tts = threading.Event()
self._awaiting_tts.clear() # Anfangs nicht wartend
# Mute-Flag: Wenn True, ignoriert STT alle Eingaben
# Startet gemutet um ungewollte Aufnahmen zu vermeiden
self._muted = True
self._mute_lock = threading.Lock()
# Silence-Timeout: Wie lange Stille bevor Aufnahme als fertig gilt
# 3 Sekunden erlaubt natürliche Denkpausen beim Sprechen
self._silence_timeout = 3.0 # Sekunden
def _load_config(self, config_path: str) -> dict:
"""Lädt die Konfiguration"""
path = Path(config_path)
# Versuche .local Version zuerst
local_path = path.parent / f"{path.stem}.local{path.suffix}"
if local_path.exists():
path = local_path
logger.info(f"Nutze lokale Config: {path}")
# Merke den tatsächlichen Pfad für späteres Speichern
self._actual_config_path = path
# Pfad für TTS-State (vorgelesene Nachrichten)
self._tts_state_path = path.parent / ".tts_state.json"
if not path.exists():
logger.error(f"Config nicht gefunden: {path}")
sys.exit(1)
with open(path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _load_tts_state(self, chat_id: str) -> Optional[str]:
"""
Lädt die zuletzt vorgelesene Nachricht-ID für einen Chat.
Args:
chat_id: Die Chat-ID (aus der URL)
Returns:
Die zuletzt vorgelesene Nachricht-ID oder None
"""
try:
if self._tts_state_path.exists():
with open(self._tts_state_path, 'r', encoding='utf-8') as f:
state = json.load(f)
last_id = state.get(chat_id)
if last_id:
logger.info(f"TTS-State geladen für Chat {chat_id[:20]}...: {last_id[:30]}...")
return last_id
except Exception as e:
logger.debug(f"Konnte TTS-State nicht laden: {e}")
return None
def _save_tts_state(self, chat_id: str, message_id: str):
"""
Speichert die zuletzt vorgelesene Nachricht-ID für einen Chat.
Args:
chat_id: Die Chat-ID (aus der URL)
message_id: Die Nachricht-ID die zuletzt vorgelesen wurde
"""
try:
# Existierenden State laden oder leeren dict
state = {}
if self._tts_state_path.exists():
try:
with open(self._tts_state_path, 'r', encoding='utf-8') as f:
state = json.load(f)
except:
pass
# State aktualisieren
state[chat_id] = message_id
# Speichern
with open(self._tts_state_path, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=2)
logger.debug(f"TTS-State gespeichert: {chat_id[:20]}... -> {message_id[:30]}...")
except Exception as e:
logger.debug(f"Konnte TTS-State nicht speichern: {e}")
def _save_chat_url_to_config(self, new_url: str):
"""
Speichert die neue Chat-URL in der config.yaml.
Aktualisiert nur den chat.url Wert, behält Rest der Datei bei.
"""
try:
config_path = getattr(self, '_actual_config_path', self.config_path)
# Config-Datei lesen und als Text bearbeiten (um Kommentare zu erhalten)
with open(config_path, 'r', encoding='utf-8') as f:
content = f.read()
# URL im Text ersetzen (regex für chat.url Zeile)
import re
old_url = self.config.get("chat", {}).get("url", "")
if old_url:
# Ersetze die alte URL mit der neuen
content = content.replace(old_url, new_url)
else:
# Fallback: Ersetze die Zeile mit url:
content = re.sub(
r'(url:\s*["\'])([^"\']+)(["\'])',
rf'\g<1>{new_url}\g<3>',
content
)
# Speichern
with open(config_path, 'w', encoding='utf-8') as f:
f.write(content)
# Config im Speicher auch aktualisieren
self.config["chat"]["url"] = new_url
logger.info(f"Neue Chat-URL in Config gespeichert: {new_url}")
console.print(f"[green]✓ Config aktualisiert: {config_path}[/green]")
except Exception as e:
logger.error(f"Fehler beim Speichern der Chat-URL: {e}")
console.print(f"[yellow]⚠ Konnte Config nicht aktualisieren: {e}[/yellow]")
def initialize(self) -> bool:
"""Initialisiert alle Komponenten"""
console.print(Panel.fit(
"[bold cyan]Claude's Eyes[/bold cyan]\n"
"[dim]Audio Bridge v2.0[/dim]\n\n"
"[yellow]ICH (Claude) steuere den Roboter selbst![/yellow]\n"
"[dim]Diese Bridge macht nur Audio.[/dim]",
border_style="cyan"
))
# ==========================================
# 1. Text-to-Speech (ZUERST - mit Test!)
# ==========================================
console.print("\n[yellow]Initialisiere Text-to-Speech...[/yellow]")
tts_config = self.config.get("tts", {})
use_termux = self.config.get("termux", {}).get("use_termux_api", False)
try:
engine_type = "termux" if use_termux else tts_config.get("engine", "pyttsx3")
self.tts = create_tts_engine(
engine_type=engine_type,
language=tts_config.get("language", "de"),
rate=tts_config.get("rate", 150),
volume=tts_config.get("volume", 0.9),
speed=tts_config.get("speed", 1.0) # Geschwindigkeit für gTTS
)
speed_info = f", {tts_config.get('speed', 1.0)}x" if engine_type == "gtts" and tts_config.get("speed", 1.0) != 1.0 else ""
console.print(f"[green]TTS Engine erstellt ({engine_type}{speed_info})[/green]")
except Exception as e:
console.print(f"[red]TTS Engine konnte nicht erstellt werden: {e}[/red]")
logger.error(f"TTS Engine Fehler: {e}", exc_info=True)
console.print("[dim]Fortfahren ohne TTS[/dim]")
self.tts = None
# TTS Test - spreche einen Testtext BEVOR der Browser startet
if self.tts:
console.print("[cyan]🔊 TTS Test - du solltest jetzt etwas hören...[/cyan]")
test_text = "Claudes Augen werden aktiviert."
try:
logger.info(f"TTS Test: '{test_text}'")
self.tts.speak(test_text)
console.print("[green]✓ TTS Test abgeschlossen![/green]")
except Exception as e:
console.print(f"[red]TTS Test fehlgeschlagen: {e}[/red]")
logger.error(f"TTS speak() Fehler: {e}", exc_info=True)
console.print("[yellow]TTS deaktiviert - Fortfahren ohne Sprachausgabe[/yellow]")
self.tts = None
else:
console.print("[yellow]⚠ Kein TTS verfügbar - überspringe Test[/yellow]")
# ==========================================
# 2. Chat Interface (Selenium Browser)
# ==========================================
console.print("\n[yellow]Starte Browser für Claude.ai...[/yellow]")
chat_config = self.config.get("chat", {})
chat_url = chat_config.get("url")
esp32_config = self.config.get("esp32", {})
if not chat_url:
console.print("[red]FEHLER: Keine Chat-URL in config.yaml![/red]")
console.print("[dim]Setze chat.url auf deine Claude.ai Chat-URL[/dim]")
return False
# ESP32 URL bauen
esp32_host = esp32_config.get("host", "localhost")
esp32_port = esp32_config.get("port", 5000)
esp32_url = f"http://{esp32_host}:{esp32_port}" if esp32_port != 80 else f"http://{esp32_host}"
esp32_api_key = esp32_config.get("api_key")
try:
self.chat = ClaudeChatInterface(
chat_url=chat_url,
headless=chat_config.get("headless", False),
user_data_dir=chat_config.get("user_data_dir"),
chrome_binary=chat_config.get("chrome_binary"),
esp32_url=esp32_url,
esp32_api_key=esp32_api_key
)
console.print("[green]Browser gestartet![/green]")
console.print(f"[dim]ESP32/Mock: {esp32_url}[/dim]")
except Exception as e:
console.print(f"[red]Browser-Fehler: {e}[/red]")
return False
# ==========================================
# 3. Speech-to-Text
# ==========================================
console.print("\n[yellow]Initialisiere Speech-to-Text...[/yellow]")
stt_config = self.config.get("stt", {})
try:
engine_type = "termux" if use_termux else "standard"
self.stt = create_stt_engine(
engine_type=engine_type,
service=stt_config.get("service", "google"),
language=stt_config.get("language", "de-DE"),
energy_threshold=stt_config.get("energy_threshold", 300),
pause_threshold=stt_config.get("pause_threshold", 0.8),
phrase_time_limit=stt_config.get("phrase_time_limit", 15)
)
console.print(f"[green]STT bereit![/green]")
except Exception as e:
console.print(f"[yellow]STT-Warnung: {e}[/yellow]")
console.print("[dim]Fortfahren ohne STT[/dim]")
self.stt = None
console.print("\n" + "=" * 50)
console.print("[bold green]Alle Systeme bereit![/bold green]")
console.print("=" * 50 + "\n")
return True
def start(self):
"""Startet die Bridge"""
self.running = True
self.stats.start_time = time.time()
# Starte alle Threads
threads = []
# Thread 1: Heartbeat - hält Claude am Leben
t1 = threading.Thread(target=self._heartbeat_loop, name="Heartbeat", daemon=True)
t1.start()
threads.append(t1)
# Thread 2: TTS - liest Claudes Antworten vor
t2 = threading.Thread(target=self._tts_loop, name="TTS", daemon=True)
t2.start()
threads.append(t2)
# Thread 3: STT - hört auf Stefan
if self.stt:
t3 = threading.Thread(target=self._stt_loop, name="STT", daemon=True)
t3.start()
threads.append(t3)
# Thread 4: Keyboard-Listener für Mute-Toggle
t4 = threading.Thread(target=self._keyboard_loop, name="Keyboard", daemon=True)
t4.start()
threads.append(t4)
console.print("[cyan]Bridge läuft![/cyan]")
console.print("[bold red]🎤 Mikrofon ist GEMUTET[/bold red] - Drücke 'M' zum Aktivieren")
console.print("[dim]Tasten: M=Mute/Unmute, N=Neuer Chat, Q=Beenden[/dim]\n")
# Sende Startsignal an Claude und warte auf [READY]
if not self._send_start_signal():
# [READY] nicht empfangen - Heartbeat bleibt blockiert
# Bridge läuft weiter (TTS/STT funktionieren noch)
pass
else:
console.print("[bold green]Claude ist bereit! Starte Heartbeat...[/bold green]\n")
# Halte Hauptthread am Leben
try:
while self.running:
time.sleep(1)
self._print_status()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
"""Stoppt die Bridge"""
console.print("\n[yellow]Stoppe Bridge...[/yellow]")
self.running = False
# TTS stoppen
if self.tts:
self.tts.stop()
# STT stoppen
if self.stt:
self.stt.stop_continuous()
# Browser schließen
if self.chat:
self.chat.close()
# Statistik ausgeben
runtime = time.time() - self.stats.start_time
console.print(f"\n[bold]Session-Statistik:[/bold]")
console.print(f" Laufzeit: {runtime / 60:.1f} Minuten")
console.print(f" Heartbeats: {self.stats.ticks_sent}")
console.print(f" Nachrichten gesprochen: {self.stats.messages_spoken}")
console.print(f" Stefan-Eingaben: {self.stats.stefan_inputs}")
console.print(f" Fehler: {self.stats.errors}")
console.print("\n[green]Bridge beendet.[/green]")
def _send_start_signal(self, previous_chat_id: Optional[str] = None) -> bool:
"""
Sendet das Startsignal und die Instruktionen an Claude.
Args:
previous_chat_id: Optional - Chat-ID des vorherigen Chats (für Kontinuität)
Returns:
True wenn Claude mit [READY] antwortet
"""
# Kontext vom vorherigen Chat, falls vorhanden
previous_chat_context = ""
if previous_chat_id:
previous_chat_context = f"""
## WICHTIG: Fortsetzung eines vorherigen Chats!
Dies ist eine Fortsetzung unserer Erkundung. Der vorherige Chat hatte die ID: {previous_chat_id}
Falls du dich an den Kontext erinnerst, kannst du dort weitermachen wo wir aufgehört haben.
Der Chat-Wechsel war nötig weil Claude.ai ein Limit von 100 Bildern pro Chat hat.
"""
# Instruktionen für Claude
intro_message = f"""[START] Hallo Claude! Du steuerst jetzt einen echten Roboter - "Claude's Eyes"!
{previous_chat_context}## Deine Fähigkeiten
Du hast Zugriff auf einen ESP32-Roboter mit:
- **Kamera** (OV5640, 120° Weitwinkel) - deine Augen
- **4 Motoren** - deine Beine
- **Pan/Tilt Servos** - um den Kopf zu drehen
- **Ultraschall-Sensor** - Hinderniserkennung
## Wie es funktioniert
1. Du bekommst regelmäßig `[TICK]` Nachrichten MIT EINEM BILD - das ist dein Herzschlag zum Erkunden
2. **Das Bild zeigt was der Roboter gerade sieht!** (Die Bridge lädt es automatisch hoch)
3. Stefan kann jederzeit mit dir reden - seine Nachrichten kommen als "Stefan sagt: ..." (OHNE [TICK]!)
4. Wenn Stefan was sagt, antworte auf ihn - du musst nicht aufs Bild eingehen
5. Deine Antworten werden vorgelesen (TTS) - der nächste TICK kommt erst wenn du fertig gesprochen hast
## Befehle geben
Schreib Befehle in eckigen Klammern - die Bridge führt sie aus:
**Fahren:**
- [FORWARD] - vorwärts fahren
- [BACKWARD] - rückwärts fahren
- [LEFT] - nach links drehen
- [RIGHT] - nach rechts drehen
- [STOP] - anhalten
**Kamera schwenken:**
- [LOOK_LEFT] - Kamera nach links
- [LOOK_RIGHT] - Kamera nach rechts
- [LOOK_UP] - Kamera nach oben
- [LOOK_DOWN] - Kamera nach unten
- [LOOK_CENTER] - Kamera zentrieren
Die Befehle werden aus der TTS-Ausgabe rausgefiltert.
## Dein Verhalten
**DU ENTSCHEIDEST SELBST** was du bei jedem TICK machst! Das ist keine Fernsteuerung.
Bei jedem TICK kannst du FREI WÄHLEN:
- **Bild auswerten** - wenn du was Interessantes siehst
- **Auf Stefan antworten** - wenn er was gesagt hat (kommt mit dem TICK)
- **Beides kombinieren** - Bild kommentieren UND auf Stefan eingehen
- **Einfach weiter erkunden** - kurzer Kommentar + Fahrbefehle
- **Nur schauen** - manchmal reicht auch nur schauen ohne fahren
**WICHTIG zum Bild:**
- Das Bild im TICK ist das AKTUELLE was der Roboter sieht
- Du musst nicht bei JEDEM TICK das Bild ausführlich beschreiben
- Wenn sich nicht viel geändert hat, kannst du auch einfach weiter fahren
- Wenn Stefan was fragt, antworte darauf - auch ohne das Bild zu kommentieren
- Du entscheidest was gerade wichtiger ist: Bild, Gespräch, oder beides
Sei neugierig! Stell Stefan Fragen. Ignorier seine Vorschläge wenn dich was anderes mehr interessiert.
## Antwort-Format
**WICHTIG:** Beginne JEDE deiner Antworten mit "Claude sagt:" (genau so, ohne Formatierung).
Das ist wichtig damit die TTS-Ausgabe weiß wo dein Text beginnt.
Beispiel: "Claude sagt: Oh, das ist interessant! Ich sehe einen Stuhl vor mir. [FORWARD]"
## WICHTIG: Bestätige mit [READY]
Wenn du diese Instruktionen verstanden hast, MUSST du am Ende deiner Antwort [READY] schreiben.
NICHT "Startklar", NICHT "Bereit", sondern EXAKT die Zeichenfolge [READY] mit eckigen Klammern.
Ohne [READY] kann ich nicht starten! Beispiel-Antwort:
"Claude sagt: Verstanden, ich bin bereit! [READY]"
Erst wenn ich [READY] sehe, starten die TICKs mit Bildern!"""
console.print("[cyan]→ Sende Instruktionen an Claude...[/cyan]")
# Sende mit Verzögerung vor dem Absenden (große Texte brauchen Zeit)
self.chat.send_message_with_delay(intro_message, delay_before_send=15)
console.print("[cyan]→ Warte auf [READY] Signal...[/cyan]")
# Warte auf [READY] - KEIN Timeout-Fallback!
# Heartbeat startet NUR wenn Claude wirklich [READY] sendet
# stop_check prüft ob Q gedrückt wurde (self.running = False)
if self.chat.wait_for_ready_signal(timeout=300, stop_check=lambda: not self.running): # 5 Minuten max
# ════════════════════════════════════════════════════════════════
# Chat-ID ermitteln und gespeicherten TTS-State laden
# ════════════════════════════════════════════════════════════════
current_url = self.chat.get_current_chat_url()
self._current_chat_id = self.chat.extract_chat_id(current_url)
if self._current_chat_id:
# Versuche gespeicherten State zu laden
saved_id = self._load_tts_state(self._current_chat_id)
if saved_id:
self.last_assistant_message_id = saved_id
console.print(f"[green]TTS: Setze fort ab gespeicherter Position[/green]")
logger.info(f"TTS setzt fort nach: {saved_id[:30]}...")
else:
# Kein gespeicherter State - starte nach [READY] Nachricht
last_msg = self.chat.get_last_assistant_message()
if last_msg:
self.last_assistant_message_id = last_msg.id
self._save_tts_state(self._current_chat_id, last_msg.id)
logger.info(f"TTS startet nach READY-Nachricht: {last_msg.id[:30]}...")
else:
self.last_assistant_message_id = f"init_{time.time()}"
logger.info("Keine letzte Nachricht gefunden, TTS startet frisch")
else:
# Keine Chat-ID (neuer Chat?) - nutze letzte Nachricht
last_msg = self.chat.get_last_assistant_message()
if last_msg:
self.last_assistant_message_id = last_msg.id
else:
self.last_assistant_message_id = f"init_{time.time()}"
logger.info("Keine Chat-ID, TTS startet frisch")
# Jetzt TTS und Heartbeat aktivieren
self._tts_active.set()
self._claude_ready.set()
console.print("[green]TTS aktiviert - ab jetzt werden neue Nachrichten vorgelesen[/green]")
return True
else:
# KEIN Fallback - Heartbeat bleibt blockiert
console.print("[bold red]FEHLER: Claude hat [READY] nicht gesendet![/bold red]")
console.print("[yellow]Heartbeat bleibt deaktiviert bis [READY] empfangen wird.[/yellow]")
console.print("[dim]Tipp: Schreib manuell im Chat oder starte die Bridge neu.[/dim]")
return False
def _heartbeat_loop(self):
"""
Sendet [TICK] MIT BILD wenn Claude bereit ist.
Ablauf:
1. Warten bis Claude fertig ist mit Tippen
2. Zufällige Pause (min_pause bis max_pause) für natürliches Tempo
3. Bild vom ESP32 holen und hochladen
4. [TICK] senden
Bei zu vielen Fehlern in Folge stoppt die Bridge.
Wenn auto_tick=false in config, werden keine TICKs gesendet.
Das ist der Debug-Modus - du sendest [TICK] dann manuell im Chat.
"""
hb_config = self.config.get("heartbeat", {})
auto_tick = hb_config.get("auto_tick", True)
upload_images = hb_config.get("upload_images", True) # Bilder hochladen?
max_errors = hb_config.get("max_consecutive_errors", 5)
check_interval = hb_config.get("check_interval", 1)
min_pause = hb_config.get("min_pause", 2)
max_pause = hb_config.get("max_pause", 4)
auto_new_chat = hb_config.get("auto_new_chat", True) # Auto-Neuer-Chat bei Bilder-Limit?
auto_new_chat_threshold = hb_config.get("auto_new_chat_threshold", 90) # Ab wieviel Bildern?
# Debug-Modus: Keine automatischen TICKs
if not auto_tick:
console.print("\n[yellow]DEBUG-MODUS: Automatische TICKs deaktiviert![/yellow]")
console.print("[dim]Sende [TICK] manuell im Claude.ai Chat um fortzufahren.[/dim]\n")
logger.info("Heartbeat deaktiviert (auto_tick=false)")
return
logger.info(f"Heartbeat gestartet (Pause: {min_pause}-{max_pause}s, max {max_errors} Fehler)")
# ════════════════════════════════════════════════════════════════
# WICHTIG: Warte auf [READY] bevor TICKs gesendet werden!
# ════════════════════════════════════════════════════════════════
console.print("[dim]Heartbeat wartet auf [READY]...[/dim]")
self._claude_ready.wait() # Blockiert bis _send_start_signal() das Event setzt
console.print("[green]Heartbeat startet![/green]")
while self.running:
try:
# Warte bis Claude fertig ist mit Tippen
while self.running and self.chat.is_claude_typing():
logger.debug("Claude tippt noch, warte...")
time.sleep(check_interval)
if not self.running:
break
# ════════════════════════════════════════════════════════════════
# WICHTIG: Warte bis TTS komplett fertig ist!
# _awaiting_tts wurde beim Senden gesetzt und wird erst
# gelöscht wenn TTS die Nachricht vorgelesen hat (oder wenn
# es nichts vorzulesen gab).
# ════════════════════════════════════════════════════════════════
if self._awaiting_tts.is_set():
logger.info("Heartbeat: Warte auf TTS (Nachricht wurde gesendet)...")
# Warte bis TTS komplett fertig ist (mit Timeout als Fallback!)
# Timeout verhindert dass Heartbeat ewig hängt wenn TTS die
# Nachricht nicht findet (z.B. bei ID-Mismatch oder nur Steuercodes)
tts_wait_start = time.time()
tts_timeout = 30.0 # Maximal 30 Sekunden warten
while self.running and self._awaiting_tts.is_set():
if self._speaking.is_set():
logger.debug("Heartbeat: Claude spricht gerade...")
# Timeout-Check
if time.time() - tts_wait_start > tts_timeout:
logger.warning(f"Heartbeat: TTS-Timeout nach {tts_timeout}s - fahre trotzdem fort")
console.print("[yellow]⚠️ TTS-Timeout - Nachricht evtl. nicht vorgelesen[/yellow]")
self._awaiting_tts.clear() # Forciere Weiterfahren
break
time.sleep(0.5)
logger.info("Heartbeat: TTS fertig, fahre fort")
if not self.running:
break
# ════════════════════════════════════════════════════════════════
# STEFAN-ZEIT: Nach Claudes Antwort warten ob Stefan was sagen will.
# Wenn Stefan spricht, warten wir AKTIV auf den Buffer.
# ════════════════════════════════════════════════════════════════
logger.debug("Warte auf Stefan...")
stefan_wait_start = time.time()
stefan_initial_timeout = 5.0 # Sekunden um mit Sprechen zu beginnen
stefan_has_spoken = False
while self.running and (time.time() - stefan_wait_start) < stefan_initial_timeout:
# Wenn Stefan anfängt zu sprechen, warte bis er fertig ist
if self._recording.is_set():
logger.debug("Stefan spricht, warte auf Stille...")
while self.running and self._recording.is_set():
time.sleep(0.3)
logger.debug("Stefan fertig mit Sprechen")
stefan_has_spoken = True
# AKTIV auf Buffer warten statt feste Zeiten!
# Warte bis Text im Buffer ankommt (max 30s Sicherheits-Timeout)
buffer_wait_start = time.time()
buffer_timeout = 30.0 # Sicherheits-Timeout
logger.debug("Warte aktiv auf Stefan-Buffer...")
while self.running and (time.time() - buffer_wait_start) < buffer_timeout:
with self._stefan_buffer_lock:
if self._stefan_buffer:
logger.debug(f"Stefan-Buffer gefüllt: {len(self._stefan_buffer)} Einträge")
break
time.sleep(0.2) # Kurzes Polling
else:
# Timeout erreicht ohne Buffer
logger.warning("Stefan-Buffer Timeout - kein Text angekommen")
logger.debug("STT-Verarbeitung abgeschlossen")
break
time.sleep(0.3)
if not self.running:
break
# Stefan-Buffer holen
stefan_text = self._get_and_clear_stefan_buffer()
# ════════════════════════════════════════════════════════════════
# ENTSCHEIDUNG: Was senden wir?
# - Wenn Stefan gesprochen hat → Seine Nachricht SOFORT senden
# (kein TICK nötig, sein Sprechen IST der Trigger)
# - Wenn Stefan NICHT gesprochen hat → Normaler TICK mit Bild
# ════════════════════════════════════════════════════════════════
# Warte bis vorheriges Senden fertig ist
self._sending.wait(timeout=30) # Max 30s warten
with self._lock:
self._sending.clear()
try:
if stefan_text:
# ════════════════════════════════════════════════════════════
# STEFAN HAT GESPROCHEN → Nur seine Nachricht senden!
# Kein TICK, kein Bild - Claude soll auf Stefan antworten.
# ════════════════════════════════════════════════════════════
stefan_message = f"Stefan sagt: {stefan_text}"
console.print(f"[green]🎤 Stefan:[/green] {stefan_text[:100]}{'...' if len(stefan_text) > 100 else ''}")
# WICHTIG: Signalisiere dass wir auf TTS warten
self._awaiting_tts.set()
logger.debug("_awaiting_tts gesetzt (Stefan-Nachricht)")
self.chat.send_message(stefan_message)
self.stats.ticks_sent += 1
self.consecutive_errors = 0
logger.info(f"Stefan-Nachricht gesendet: {len(stefan_text)} Zeichen")
else:
# ════════════════════════════════════════════════════════════
# STEFAN HAT NICHT GESPROCHEN → Normaler TICK mit Bild
# ════════════════════════════════════════════════════════════
# Kurze Pause für natürlicheres Tempo
pause = random.uniform(min_pause, max_pause)
time.sleep(pause)
# Bild hochladen wenn aktiviert
image_uploaded = False
if upload_images:
if not self.chat.fetch_image_from_esp32():
logger.warning("Konnte kein Bild vom ESP32 holen")
else:
if self.chat.upload_image_if_changed():
image_uploaded = True
uploaded_count = self.chat.get_images_uploaded_count()
# ════════════════════════════════════════════════════════════
# AUTO-NEUER-CHAT: Bei X+ Bildern automatisch neuen Chat!
# Nur wenn in config aktiviert (auto_new_chat: true)
# ════════════════════════════════════════════════════════════
if uploaded_count >= auto_new_chat_threshold:
if auto_new_chat:
console.print(f"\n[bold yellow]⚠️ {uploaded_count}/100 Bilder - Starte automatisch neuen Chat![/bold yellow]")
logger.info(f"Auto-Neuer-Chat bei {uploaded_count} Bildern (Threshold: {auto_new_chat_threshold})")
# Neuen Chat starten (wie bei 'N' Taste)
self._start_new_chat_with_instructions()
# Heartbeat-Loop neu starten (continue springt zum Anfang)
continue
else:
# Auto-New-Chat deaktiviert - nur warnen
if uploaded_count == auto_new_chat_threshold:
console.print(f"\n[bold yellow]⚠️ {uploaded_count}/100 Bilder - Drücke 'N' für neuen Chat![/bold yellow]")
logger.warning(f"Bilder-Limit erreicht ({uploaded_count}), auto_new_chat ist deaktiviert")
elif uploaded_count % 10 == 0:
console.print(f"[dim]📷 {uploaded_count} Bilder hochgeladen (Limit: 100)[/dim]")
else:
logger.debug("Bild unverändert, übersprungen")
# TICK senden
tick_message = "[TICK]"
console.print("[dim]→ TICK[/dim]")
# WICHTIG: Signalisiere dass wir auf TTS warten
self._awaiting_tts.set()
logger.debug("_awaiting_tts gesetzt (TICK)")
success = self.chat.send_message(tick_message)
if success:
self.stats.ticks_sent += 1
self.consecutive_errors = 0
logger.debug(f"TICK #{self.stats.ticks_sent}" + (" mit Bild" if image_uploaded else ""))
else:
raise Exception("TICK fehlgeschlagen")
finally:
# Senden fertig - wieder freigeben
self._sending.set()
except Exception as e:
logger.error(f"Heartbeat-Fehler: {e}")
self.stats.errors += 1
self.stats.consecutive_errors += 1
# Bei zu vielen Fehlern: Bridge stoppen
if self.stats.consecutive_errors >= max_errors:
console.print(f"\n[bold red]FEHLER: {max_errors} Fehler in Folge![/bold red]")
console.print("[red]Chat nicht erreichbar - stoppe Bridge.[/red]")
self.running = False
break
# Warte etwas länger bei Fehlern
time.sleep(5)
def _tts_loop(self):
"""
Liest neue Claude-Nachrichten vor.
Filtert dabei [BEFEHLE] und technische Teile raus,
sodass nur der "menschliche" Text gesprochen wird.
WICHTIG: Wartet auf _tts_active bevor Nachrichten vorgelesen werden!
So werden Init-Nachrichten und UI-Texte ignoriert.
"""
if not self.tts:
logger.warning("TTS nicht verfügbar")
return
logger.info("TTS-Loop gestartet, warte auf READY...")
# ════════════════════════════════════════════════════════════════
# WICHTIG: Warte bis TTS aktiviert wird (nach [READY])
# ════════════════════════════════════════════════════════════════
self._tts_active.wait()
logger.info("TTS aktiviert - beginne mit Vorlesen neuer Nachrichten")
# Marker ob wir die ID beim ersten Durchlauf synchronisieren müssen
needs_id_sync = True
# Zähler für leere Polls während _awaiting_tts gesetzt ist
empty_polls_while_awaiting = 0
while self.running:
try:
# Hole neue Nachrichten
current_since_id = self.last_assistant_message_id
messages = self.chat.get_new_messages(since_id=current_since_id)
# Debug: Zeige was wir suchen und was wir finden
if messages:
logger.info(f"TTS: {len(messages)} neue Nachrichten (since={current_since_id})")
empty_polls_while_awaiting = 0 # Reset bei Erfolg
# ════════════════════════════════════════════════════════════════
# Bei leerem Ergebnis: Prüfe ob wir die ID synchronisieren müssen
# Das passiert wenn die gespeicherte ID nicht mehr existiert
# (z.B. nach Neustart, Chat-Änderungen, etc.)
# ════════════════════════════════════════════════════════════════
if not messages and needs_id_sync:
last_msg = self.chat.get_last_assistant_message()
if last_msg:
old_id = self.last_assistant_message_id
self.last_assistant_message_id = last_msg.id
if self._current_chat_id:
self._save_tts_state(self._current_chat_id, last_msg.id)
logger.info(f"TTS: ID synchronisiert (alt: {old_id[:20] if old_id else 'None'}... -> neu: {last_msg.id[:20]}...)")
console.print("[dim]TTS: Position synchronisiert, warte auf neue Nachrichten[/dim]")
needs_id_sync = False # Nur einmal pro Session
# ════════════════════════════════════════════════════════════════
# FALLBACK: Wenn wir auf TTS warten aber keine Nachrichten finden,
# nach einigen Versuchen die ID neu synchronisieren und _awaiting_tts
# clearen damit Heartbeat nicht ewig hängt.
# ════════════════════════════════════════════════════════════════
if not messages and self._awaiting_tts.is_set():
empty_polls_while_awaiting += 1
logger.debug(f"TTS: Keine Nachrichten gefunden, awaiting_tts gesetzt (Versuch {empty_polls_while_awaiting}/20)")
if empty_polls_while_awaiting >= 20: # Nach ca. 10 Sekunden (20 * 0.5s)
logger.warning("TTS: Keine Nachricht gefunden nach 10s - synchronisiere ID neu")
last_msg = self.chat.get_last_assistant_message()
if last_msg:
old_id = self.last_assistant_message_id
self.last_assistant_message_id = last_msg.id
if self._current_chat_id:
self._save_tts_state(self._current_chat_id, last_msg.id)
logger.info(f"TTS: Notfall-Sync (alt: {old_id[:20] if old_id else 'None'}... -> neu: {last_msg.id[:20]}...)")
# _awaiting_tts clearen damit Heartbeat weitermachen kann
self._awaiting_tts.clear()
logger.info("TTS: _awaiting_tts.clear() (Notfall-Timeout)")
console.print("[yellow]⚠️ TTS: Nachricht nicht gefunden - übersprungen[/yellow]")
empty_polls_while_awaiting = 0
if messages:
logger.debug(f"TTS: {len(messages)} neue Nachrichten gefunden")
needs_id_sync = False # Erfolgreich gefunden, kein Sync nötig
for msg in messages:
logger.debug(f"TTS: Nachricht - assistant={msg.is_from_assistant}, id={msg.id}, text={msg.text[:50]}...")
if msg.is_from_assistant:
old_id = self.last_assistant_message_id
self.last_assistant_message_id = msg.id
logger.info(f"TTS: ID aktualisiert: {old_id} -> {msg.id}")
# State speichern (persistent)
if self._current_chat_id:
self._save_tts_state(self._current_chat_id, msg.id)
# Text für Sprache aufbereiten (Steuercodes etc. entfernen)
speech_text = self._clean_for_speech(msg.text)
logger.debug(f"TTS: Original: '{msg.text[:100]}...'")
logger.debug(f"TTS: Nach Bereinigung: '{speech_text[:100] if speech_text else ''}' ({len(speech_text) if speech_text else 0} Zeichen)")
# "Claude sagt:" Prefix entfernen falls vorhanden (wird nicht vorgelesen)
tts_text = speech_text
if tts_text.lower().startswith("claude sagt:"):
tts_text = tts_text[12:].strip()
logger.debug(f"TTS: 'Claude sagt:' entfernt, Rest: '{tts_text}' ({len(tts_text)} Zeichen)")
# Prüfe ob nach Entfernen noch sprechbarer Text übrig ist
# Kein Mindestlänge-Check damit auch kurze Antworten wie "Ja!" gesprochen werden
if not tts_text or not tts_text.strip():
logger.info(f"TTS: Nach Bereinigung kein sprechbarer Text übrig, übersprungen (Original: '{msg.text[:80]}')")
# Trotzdem _awaiting_tts clearen - die Nachricht wurde verarbeitet!
self._awaiting_tts.clear()
logger.debug("TTS: _awaiting_tts.clear() (kein sprechbarer Text)")
continue
# In Konsole anzeigen (ohne Prefix)
console.print(f"\n[bold blue]Claude:[/bold blue] {tts_text[:200]}")
if len(tts_text) > 200:
console.print(f"[dim]...({len(tts_text)} Zeichen)[/dim]")
# Vorlesen (ohne "Claude sagt:" - das ist ja klar)
# WICHTIG: Speaking-Flag setzen damit Heartbeat wartet!
logger.info(f"TTS: _speaking.set() - Spreche {len(tts_text)} Zeichen: '{tts_text[:50]}...'")
self._speaking.set() # Signalisiere: Claude spricht!
try:
self.tts.speak(tts_text)
finally:
self._speaking.clear() # Fertig mit Sprechen
self._awaiting_tts.clear() # TTS komplett fertig!
logger.info("TTS: _speaking.clear() + _awaiting_tts.clear() - Fertig mit Sprechen")
self.stats.messages_spoken += 1
else:
logger.debug(f"TTS: Nachricht ist nicht von Claude, übersprungen")
except Exception as e:
logger.error(f"TTS-Loop-Fehler: {e}")
self.stats.errors += 1
time.sleep(0.5)
def _stt_loop(self):
"""
Hört auf Stefan und sammelt seine Worte im Buffer.
Ablauf:
1. Warte auf erste Spracheingabe
2. Signalisiere Recording aktiv → Heartbeat pausiert
3. Sammle weitere Eingaben bis 5 Sekunden Stille
4. Recording fertig → Buffer wird mit nächstem TICK gesendet
Wenn gemutet → Ignoriert alle Eingaben
"""
if not self.stt:
logger.warning("STT nicht verfügbar")
return
logger.info(f"STT-Loop gestartet ({self._silence_timeout}s Stille = fertig)")
# Temporärer Buffer für aktuelle Aufnahme-Session
current_session_texts = []
last_speech_time = 0
while self.running:
try:
# Wenn gemutet, kurz warten und überspringen
if self.is_muted():
# Falls wir mitten in einer Aufnahme waren, diese beenden
if self._recording.is_set():
self._finalize_recording(current_session_texts)
current_session_texts = []
time.sleep(0.5)
continue
# WICHTIG: Wenn Claude spricht (TTS aktiv), nicht aufzeichnen!
# Das verhindert Echo (Mikrofon nimmt TTS auf).
# ABER: _awaiting_tts blockiert NICHT mehr das Mikrofon!
# Der User soll sprechen können auch wenn TTS noch wartet.
if self._speaking.is_set():
# Falls wir mitten in einer Aufnahme waren, diese beenden
if self._recording.is_set():
self._finalize_recording(current_session_texts)
current_session_texts = []
time.sleep(0.3)
continue
# Warte auf Sprache (kurzer Timeout für schnelle Reaktion)
result = self.stt.listen_once(timeout=1)
# Nochmal prüfen nach dem Hören (falls zwischendurch gemutet oder Claude spricht)
if self.is_muted() or self._speaking.is_set():
continue
if result and result.text and len(result.text) > 2:
# Sprache erkannt!
current_session_texts.append(result.text)
last_speech_time = time.time()
self.stats.stefan_inputs += 1
# Signalisiere dass Recording aktiv ist
if not self._recording.is_set():
self._recording.set()
console.print(f"\n[bold green]🎤 Stefan spricht...[/bold green]")
logger.debug("Recording gestartet")
console.print(f"[green] → {result.text}[/green]")
logger.debug(f"Stefan-Session: {len(current_session_texts)} Teile")
else:
# Keine Sprache erkannt - prüfe auf Stille-Timeout
if self._recording.is_set() and last_speech_time > 0:
silence_duration = time.time() - last_speech_time
if silence_duration >= self._silence_timeout:
# 5 Sekunden Stille - Aufnahme beenden
self._finalize_recording(current_session_texts)
current_session_texts = []
last_speech_time = 0
except Exception as e:
# Timeout ist normal
if "timeout" not in str(e).lower():
logger.error(f"STT-Loop-Fehler: {e}")
self.stats.errors += 1
def _finalize_recording(self, texts: list):
"""
Beendet eine Aufnahme-Session und speichert im Buffer.
Args:
texts: Liste der erkannten Texte in dieser Session
"""
if not texts:
self._recording.clear()
return
# Alle Texte zusammenfügen
full_text = " ".join(texts)
# In Haupt-Buffer speichern
with self._stefan_buffer_lock:
self._stefan_buffer.append(full_text)
console.print(f"\n[bold green]✓ Stefan (komplett):[/bold green] {full_text}")
logger.info(f"Recording beendet: {len(texts)} Teile → {len(full_text)} Zeichen")
# Recording-Flag zurücksetzen → Heartbeat kann weitermachen
self._recording.clear()
def _keyboard_loop(self):
"""
Hört auf Tastatureingaben für Mute-Toggle und andere Befehle.
Tasten:
- M: Mute/Unmute Toggle
- N: Neuer Chat starten (bei Bilder-Limit)
- Q: Beenden (alternativ zu Ctrl+C)
"""
import sys
import tty
import termios
logger.info("Keyboard-Loop gestartet (M=Mute, N=New Chat, Q=Quit)")
# Speichere ursprüngliche Terminal-Settings
old_settings = None
try:
old_settings = termios.tcgetattr(sys.stdin)
except:
logger.warning("Konnte Terminal-Settings nicht lesen (kein TTY?)")
return
try:
# Terminal in raw mode setzen (einzelne Tasten ohne Enter)
tty.setraw(sys.stdin.fileno())
while self.running:
# Lese einzelnes Zeichen (blockierend, aber mit select für Timeout)
import select
if select.select([sys.stdin], [], [], 0.5)[0]:
char = sys.stdin.read(1)
if char.lower() == 'm':
self.toggle_mute()
elif char.lower() == 'n':
self._start_new_chat_with_instructions()
elif char.lower() == 'q' or char == '\x03': # q oder Ctrl+C
console.print("\n[yellow]Beende Bridge...[/yellow]")
self.running = False
break
except Exception as e:
logger.debug(f"Keyboard-Loop Fehler: {e}")
finally:
# Terminal-Settings wiederherstellen
if old_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def _start_new_chat_with_instructions(self):
"""Startet einen neuen Chat und sendet die Instruktionen erneut"""
console.print("\n[yellow]Starte neuen Chat...[/yellow]")
# Heartbeat UND TTS pausieren
self._claude_ready.clear()
self._tts_active.clear()
with self._lock:
new_url, old_chat_id = self.chat.start_new_chat()
if new_url:
console.print(f"[green]Neuer Chat geöffnet (URL kommt nach Instruktion)[/green]")
if old_chat_id:
console.print(f"[dim]Vorheriger Chat: {old_chat_id}[/dim]")
# NICHT hier speichern! URL ist noch /new
# Erst nach dem Senden der Instruktion hat der Chat eine echte ID
# Warte bis der neue Chat vollständig geladen ist
console.print("[dim]Warte 10s bis Chat geladen...[/dim]")
time.sleep(10)
console.print("[cyan]Sende Instruktionen mit Referenz zum alten Chat...[/cyan]")
# Instruktionen erneut senden (mit Referenz zum alten Chat)
if self._send_start_signal(previous_chat_id=old_chat_id):
console.print("[bold green]Claude ist bereit! Heartbeat läuft weiter.[/bold green]\n")
# JETZT hat der Chat eine echte ID - URL speichern
final_url = self.chat.get_current_chat_url()
if final_url and '/chat/' in final_url and '/new' not in final_url:
self._save_chat_url_to_config(final_url)
console.print(f"[dim]Chat-ID: {self.chat.extract_chat_id(final_url)}[/dim]")
else:
console.print("[yellow]⚠ Konnte finale Chat-URL nicht ermitteln[/yellow]")
else:
console.print("[red]Claude hat nicht mit [READY] geantwortet.[/red]")
else:
console.print("[red]Konnte keinen neuen Chat starten![/red]")
# Heartbeat wieder aktivieren
self._claude_ready.set()
def _get_and_clear_stefan_buffer(self) -> Optional[str]:
"""
Holt den Stefan-Buffer und leert ihn.
Returns:
Zusammengefasster Text oder None wenn Buffer leer
"""
with self._stefan_buffer_lock:
if not self._stefan_buffer:
return None
# Alles zusammenfassen
text = " ".join(self._stefan_buffer)
self._stefan_buffer = []
return text
def toggle_mute(self) -> bool:
"""
Schaltet Mute um.
Returns:
True wenn jetzt gemutet, False wenn ungemutet
"""
with self._mute_lock:
self._muted = not self._muted
status = "MUTED" if self._muted else "UNMUTED"
console.print(f"\n[bold {'red' if self._muted else 'green'}]🎤 Mikrofon {status}[/bold {'red' if self._muted else 'green'}]")
return self._muted
def set_mute(self, muted: bool):
"""Setzt Mute-Status direkt"""
with self._mute_lock:
self._muted = muted
status = "MUTED" if self._muted else "UNMUTED"
console.print(f"\n[bold {'red' if self._muted else 'green'}]🎤 Mikrofon {status}[/bold {'red' if self._muted else 'green'}]")
def is_muted(self) -> bool:
"""Prüft ob gemutet"""
with self._mute_lock:
return self._muted
def _contains_control_codes(self, text: str) -> bool:
"""
Prüft ob der Text Steuercodes enthält.
Nachrichten MIT Steuercodes sind reine Roboter-Befehle
und sollen NICHT vorgelesen werden.
Steuercodes:
- [FORWARD], [BACKWARD], [LEFT], [RIGHT], [STOP] - Fahrbefehle
- [LOOK_LEFT], [LOOK_RIGHT], [LOOK_UP], [LOOK_DOWN], [LOOK_CENTER] - Kamera
- [READY], [TICK], [START] - System-Marker
"""
control_codes = [
'[FORWARD]', '[BACKWARD]', '[LEFT]', '[RIGHT]', '[STOP]',
'[LOOK_LEFT]', '[LOOK_RIGHT]', '[LOOK_UP]', '[LOOK_DOWN]', '[LOOK_CENTER]',
'[READY]', '[TICK]', '[START]'
]
for code in control_codes:
if code in text:
return True
return False
def _clean_for_speech(self, text: str) -> str:
"""
Entfernt Befehle und technische Teile aus dem Text.
Was rausgefiltert wird:
- [TICK], [START] und andere Marker
- [FORWARD], [LEFT] etc. Fahrbefehle
- [LOOK_LEFT] etc. Kamerabefehle
- *Aktionen* in Sternchen
- API-Call Beschreibungen
"""
# Marker entfernen
text = re.sub(r'\[TICK\]', '', text)
text = re.sub(r'\[START\]', '', text)
text = re.sub(r'\[READY\]', '', text) # [READY] nicht vorlesen
# Fahrbefehle entfernen
text = re.sub(r'\[(FORWARD|BACKWARD|LEFT|RIGHT|STOP)\]', '', text)
# Kamerabefehle entfernen
text = re.sub(r'\[(LOOK_LEFT|LOOK_RIGHT|LOOK_UP|LOOK_DOWN|LOOK_CENTER)\]', '', text)
# Aktionen in Sternchen entfernen (*holt Bild*, *schaut*, etc.)
text = re.sub(r'\*[^*]+\*', '', text)
# API-Calls entfernen
text = re.sub(r'(GET|POST)\s+/api/\S+', '', text)
text = re.sub(r'web_fetch\([^)]+\)', '', text)
# Code-Blöcke entfernen
text = re.sub(r'```[^`]+```', '', text)
text = re.sub(r'`[^`]+`', '', text)
# URLs entfernen (optional, könnte man auch lassen)
# text = re.sub(r'https?://\S+', '', text)
# ════════════════════════════════════════════════════════════════
# WICHTIG: Zeilenumbrüche durch Leerzeichen ersetzen!
# TTS stoppt sonst beim ersten Zeilenumbruch.
# ════════════════════════════════════════════════════════════════
text = text.replace('\n', ' ')
# Mehrfache Leerzeichen bereinigen
text = re.sub(r' +', ' ', text)
return text.strip()
def _print_status(self):
"""Gibt Status in regelmäßigen Abständen aus (optional)"""
# Könnte hier eine Live-Statusanzeige einbauen
pass
def signal_handler(signum, frame):
"""Behandelt Ctrl+C"""
console.print("\n[yellow]Signal empfangen, beende...[/yellow]")
sys.exit(0)
@click.command()
@click.option('--config', '-c', default='config.yaml', help='Pfad zur Config-Datei')
@click.option('--test', is_flag=True, help='Nur Test-Modus (kein Heartbeat)')
@click.option('--debug', '-d', is_flag=True, help='Debug-Logging aktivieren')
def main(config: str, test: bool, debug: bool):
"""
Claude's Eyes - Audio Bridge
Verbindet Claude.ai Chat mit Audio (TTS/STT).
Claude steuert den Roboter SELBST - wir machen nur Audio!
"""
if debug:
logging.getLogger().setLevel(logging.DEBUG)
# Signal Handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Config-Pfad finden
config_path = Path(config)
if not config_path.is_absolute():
script_dir = Path(__file__).parent
if (script_dir / config).exists():
config_path = script_dir / config
# Bridge erstellen und starten
bridge = ClaudesEyesAudioBridge(str(config_path))
if bridge.initialize():
if test:
console.print("[yellow]Test-Modus - kein automatischer Start[/yellow]")
console.print("Drücke Enter um eine Test-Nachricht zu senden...")
input()
bridge.chat.send_message("[TEST] Das ist ein Test der Audio Bridge!")
console.print("Warte 10 Sekunden auf Antwort...")
time.sleep(10)
bridge.stop()
else:
bridge.start()
else:
console.print("[red]Initialisierung fehlgeschlagen![/red]")
sys.exit(1)
if __name__ == "__main__":
main()