Bridge mit Ready- und Statemachine-System

This commit is contained in:
2025-12-27 01:31:45 +01:00
parent ed2964bbbf
commit 90707055ce
36 changed files with 2791 additions and 355 deletions
+424
View File
@@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
Claude's Eyes - Main Bridge Script
Connects the ESP32 robot with Claude AI for autonomous exploration.
Usage:
python bridge.py # Use config.yaml
python bridge.py --config my.yaml # Use custom config
python bridge.py --simulate # Simulate without hardware
"""
import os
import sys
import time
import logging
import threading
import signal
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
import yaml
import click
from rich.console import Console
from rich.panel import Panel
from rich.live import Live
from rich.table import Table
from rich.text import Text
from esp32_client import ESP32Client, RobotStatus
from tts_engine import create_tts_engine, TTSEngine
from stt_engine import create_stt_engine, STTEngine, SpeechResult
from chat_interface import create_chat_interface, ChatInterface, ChatResponse
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Rich console for pretty output
console = Console()
@dataclass
class BridgeState:
"""Current state of the bridge"""
connected: bool = False
exploring: bool = False
last_image_time: float = 0
last_status: Optional[RobotStatus] = None
last_claude_response: str = ""
stefan_input: str = ""
error_message: str = ""
class ClaudesEyesBridge:
"""Main bridge class connecting robot and Claude"""
def __init__(self, config_path: str, simulate: bool = False):
self.config = self._load_config(config_path)
self.simulate = simulate
self.state = BridgeState()
self.running = False
# Components
self.robot: Optional[ESP32Client] = None
self.chat: Optional[ChatInterface] = None
self.tts: Optional[TTSEngine] = None
self.stt: Optional[STTEngine] = None
# Threading
self.speech_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
def _load_config(self, config_path: str) -> dict:
"""Load configuration from YAML file"""
path = Path(config_path)
# Try local config first
local_path = path.parent / f"{path.stem}.local{path.suffix}"
if local_path.exists():
path = local_path
logger.info(f"Using local config: {path}")
if not path.exists():
logger.error(f"Config file not found: {path}")
sys.exit(1)
with open(path) as f:
config = yaml.safe_load(f)
return config
def initialize(self) -> bool:
"""Initialize all components"""
console.print(Panel.fit(
"[bold cyan]Claude's Eyes[/bold cyan]\n"
"[dim]Autonomous Exploration Robot[/dim]",
border_style="cyan"
))
# Initialize robot client
if not self.simulate:
console.print("\n[yellow]Connecting to robot...[/yellow]")
esp_config = self.config.get("esp32", {})
self.robot = ESP32Client(
host=esp_config.get("host", "192.168.178.100"),
port=esp_config.get("port", 80),
api_key=esp_config.get("api_key", ""),
timeout=esp_config.get("timeout", 10)
)
if not self.robot.is_connected():
console.print("[red]Could not connect to robot![/red]")
self.state.error_message = "Robot connection failed"
return False
self.state.connected = True
console.print("[green]Robot connected![/green]")
else:
console.print("[yellow]Simulation mode - no robot connection[/yellow]")
self.state.connected = True
# Initialize Claude interface
console.print("\n[yellow]Initializing Claude interface...[/yellow]")
claude_config = self.config.get("claude", {})
api_key = claude_config.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "")
self.chat = create_chat_interface(
use_api=claude_config.get("use_api", True) and bool(api_key),
api_key=api_key,
model=claude_config.get("model", "claude-sonnet-4-20250514"),
system_prompt=claude_config.get("system_prompt", ""),
max_tokens=claude_config.get("max_tokens", 1024)
)
console.print(f"[green]Chat interface ready ({type(self.chat).__name__})[/green]")
# Initialize TTS
console.print("\n[yellow]Initializing Text-to-Speech...[/yellow]")
tts_config = self.config.get("tts", {})
try:
self.tts = create_tts_engine(
engine_type=tts_config.get("engine", "pyttsx3"),
voice=tts_config.get("voice"),
rate=tts_config.get("rate", 150),
volume=tts_config.get("volume", 0.9),
language=tts_config.get("language", "de")
)
console.print("[green]TTS ready![/green]")
except Exception as e:
console.print(f"[red]TTS init failed: {e}[/red]")
self.tts = None
# Initialize STT
console.print("\n[yellow]Initializing Speech-to-Text...[/yellow]")
stt_config = self.config.get("stt", {})
try:
self.stt = create_stt_engine(
energy_threshold=stt_config.get("energy_threshold", 300),
pause_threshold=stt_config.get("pause_threshold", 0.8),
phrase_time_limit=stt_config.get("phrase_time_limit", 15),
service=stt_config.get("service", "google"),
language=stt_config.get("language", "de-DE")
)
console.print("[green]STT ready![/green]")
except Exception as e:
console.print(f"[red]STT init failed: {e}[/red]")
self.stt = None
console.print("\n[bold green]All systems initialized![/bold green]\n")
return True
def start(self):
"""Start the main exploration loop"""
self.running = True
self.state.exploring = True
# Start speech recognition in background
if self.stt:
self.stt.start_continuous(self._on_speech_detected)
# Welcome message
welcome = "Hallo Stefan! Ich bin online und bereit zum Erkunden. Was soll ich mir anschauen?"
self._speak(welcome)
self.state.last_claude_response = welcome
try:
self._main_loop()
except KeyboardInterrupt:
console.print("\n[yellow]Stopping...[/yellow]")
finally:
self.stop()
def stop(self):
"""Stop the bridge"""
self.running = False
self.state.exploring = False
self._stop_event.set()
if self.stt:
self.stt.stop_continuous()
if self.tts:
self.tts.stop()
if self.robot and not self.simulate:
self.robot.stop()
console.print("[yellow]Bridge stopped[/yellow]")
def _main_loop(self):
"""Main exploration loop"""
camera_config = self.config.get("camera", {})
capture_interval = camera_config.get("capture_interval", 5)
while self.running:
try:
current_time = time.time()
# Capture and process image periodically
if current_time - self.state.last_image_time >= capture_interval:
self._exploration_step()
self.state.last_image_time = current_time
# Update status display
self._update_display()
# Small delay
time.sleep(0.1)
except Exception as e:
logger.error(f"Loop error: {e}")
self.state.error_message = str(e)
time.sleep(1)
def _exploration_step(self):
"""Single exploration step: capture, analyze, act"""
# Get robot status
if self.robot and not self.simulate:
try:
self.state.last_status = self.robot.get_status()
except Exception as e:
logger.error(f"Status error: {e}")
# Capture image
image_data = None
if self.robot and not self.simulate:
try:
camera_config = self.config.get("camera", {})
image_data = self.robot.capture_image(
resolution=camera_config.get("resolution", "VGA"),
quality=camera_config.get("quality", 12)
)
except Exception as e:
logger.error(f"Capture error: {e}")
# Build context message
context = self._build_context_message()
# Add Stefan's input if any
if self.state.stefan_input:
context += f"\n\nStefan sagt: {self.state.stefan_input}"
self.state.stefan_input = ""
# Send to Claude
try:
response = self.chat.send_message(context, image=image_data)
self.state.last_claude_response = response.text
# Speak response
self._speak(response.text)
# Execute commands
self._execute_commands(response.commands)
# Update robot display
if self.robot and not self.simulate:
# Send short version to robot display
short_text = response.text[:100] + "..." if len(response.text) > 100 else response.text
self.robot.set_claude_text(short_text)
except Exception as e:
logger.error(f"Chat error: {e}")
self.state.error_message = str(e)
def _build_context_message(self) -> str:
"""Build context message with sensor data"""
parts = ["Hier ist was ich gerade sehe und meine Sensordaten:"]
if self.state.last_status:
status = self.state.last_status
parts.append(f"\n- Abstand zum nächsten Hindernis: {status.distance_cm:.0f} cm")
parts.append(f"- Aktuelle Aktion: {status.current_action}")
parts.append(f"- Batterie: {status.battery_percent}%")
if status.obstacle_danger:
parts.append("- WARNUNG: Hindernis sehr nah!")
elif status.obstacle_warning:
parts.append("- Hinweis: Hindernis in der Nähe")
if status.is_tilted:
parts.append("- WARNUNG: Ich bin schief!")
parts.append("\nWas siehst du auf dem Bild? Was möchtest du als nächstes tun?")
return "\n".join(parts)
def _execute_commands(self, commands: list):
"""Execute movement commands from Claude"""
if not commands:
return
if self.simulate:
console.print(f"[dim]Simulated commands: {commands}[/dim]")
return
if not self.robot:
return
safety = self.config.get("safety", {})
max_speed = safety.get("max_speed", 70)
min_distance = safety.get("min_obstacle_distance", 20)
for cmd in commands:
# Safety check
if self.state.last_status and self.state.last_status.distance_cm < min_distance:
if cmd == "FORWARD":
console.print("[red]Blocked: Obstacle too close![/red]")
continue
try:
if cmd == "FORWARD":
self.robot.forward(speed=max_speed, duration_ms=800)
elif cmd == "BACKWARD":
self.robot.backward(speed=max_speed, duration_ms=800)
elif cmd == "LEFT":
self.robot.left(speed=max_speed, duration_ms=400)
elif cmd == "RIGHT":
self.robot.right(speed=max_speed, duration_ms=400)
elif cmd == "STOP":
self.robot.stop()
elif cmd == "LOOK_LEFT":
self.robot.look_left()
elif cmd == "LOOK_RIGHT":
self.robot.look_right()
elif cmd == "LOOK_UP":
self.robot.look_up()
elif cmd == "LOOK_DOWN":
self.robot.look_down()
elif cmd == "LOOK_CENTER":
self.robot.look_center()
# Small delay between commands
time.sleep(0.3)
except Exception as e:
logger.error(f"Command error ({cmd}): {e}")
def _speak(self, text: str):
"""Speak text using TTS"""
if self.tts:
# Remove command brackets from speech
import re
clean_text = re.sub(r'\[[A-Z_]+\]', '', text).strip()
if clean_text:
self.tts.speak_async(clean_text)
def _on_speech_detected(self, result: SpeechResult):
"""Callback when Stefan says something"""
console.print(f"\n[bold blue]Stefan:[/bold blue] {result.text}")
self.state.stefan_input = result.text
def _update_display(self):
"""Update console display"""
# This could be enhanced with rich.live for real-time updates
pass
def signal_handler(signum, frame):
"""Handle Ctrl+C gracefully"""
console.print("\n[yellow]Received stop signal...[/yellow]")
sys.exit(0)
@click.command()
@click.option('--config', '-c', default='config.yaml', help='Path to config file')
@click.option('--simulate', '-s', is_flag=True, help='Simulate without hardware')
@click.option('--debug', '-d', is_flag=True, help='Enable debug logging')
def main(config: str, simulate: bool, debug: bool):
"""Claude's Eyes - Autonomous Exploration Robot Bridge"""
if debug:
logging.getLogger().setLevel(logging.DEBUG)
# Handle signals
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Find config file
config_path = Path(config)
if not config_path.is_absolute():
# Look in script directory first
script_dir = Path(__file__).parent
if (script_dir / config).exists():
config_path = script_dir / config
# Create and run bridge
bridge = ClaudesEyesBridge(str(config_path), simulate=simulate)
if bridge.initialize():
console.print("\n[bold cyan]Starting exploration...[/bold cyan]")
console.print("[dim]Press Ctrl+C to stop[/dim]\n")
bridge.start()
else:
console.print("[red]Initialization failed![/red]")
sys.exit(1)
if __name__ == "__main__":
main()
+257
View File
@@ -0,0 +1,257 @@
"""
Claude's Eyes - Chat Interface
Interface to communicate with Claude AI (via API or browser)
"""
import logging
import base64
import re
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
@dataclass
class Message:
"""A chat message"""
role: str # "user" or "assistant"
content: str
image_data: Optional[bytes] = None # JPEG image data
@dataclass
class ChatResponse:
"""Response from Claude"""
text: str
commands: List[str] = field(default_factory=list) # Extracted movement commands
class ChatInterface(ABC):
"""Abstract base class for chat interfaces"""
@abstractmethod
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Send message to Claude and get response"""
pass
@abstractmethod
def reset_conversation(self) -> None:
"""Reset/clear conversation history"""
pass
class AnthropicAPIInterface(ChatInterface):
"""Direct Claude API interface using anthropic library"""
def __init__(
self,
api_key: str,
model: str = "claude-sonnet-4-20250514",
system_prompt: str = "",
max_tokens: int = 1024
):
import anthropic
self.client = anthropic.Anthropic(api_key=api_key)
self.model = model
self.system_prompt = system_prompt
self.max_tokens = max_tokens
self.conversation_history: List[Dict[str, Any]] = []
logger.info(f"Anthropic API interface initialized (model: {model})")
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Send message to Claude API"""
# Build message content
content = []
# Add image if provided
if image:
image_base64 = base64.standard_b64encode(image).decode("utf-8")
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": image_base64
}
})
# Add text
content.append({
"type": "text",
"text": text
})
# Add to history
self.conversation_history.append({
"role": "user",
"content": content
})
try:
# Make API call
response = self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
system=self.system_prompt,
messages=self.conversation_history
)
# Extract response text
response_text = ""
for block in response.content:
if block.type == "text":
response_text += block.text
# Add assistant response to history
self.conversation_history.append({
"role": "assistant",
"content": response_text
})
# Extract commands
commands = self._extract_commands(response_text)
logger.debug(f"Claude response: {response_text[:100]}...")
logger.debug(f"Extracted commands: {commands}")
return ChatResponse(text=response_text, commands=commands)
except Exception as e:
logger.error(f"API error: {e}")
raise
def reset_conversation(self) -> None:
"""Reset conversation history"""
self.conversation_history = []
logger.info("Conversation history cleared")
def _extract_commands(self, text: str) -> List[str]:
"""Extract movement commands from Claude's response"""
# Commands are in brackets like [FORWARD], [LEFT], etc.
pattern = r'\[([A-Z_]+)\]'
matches = re.findall(pattern, text)
valid_commands = [
"FORWARD", "BACKWARD", "LEFT", "RIGHT", "STOP",
"LOOK_LEFT", "LOOK_RIGHT", "LOOK_UP", "LOOK_DOWN", "LOOK_CENTER"
]
return [cmd for cmd in matches if cmd in valid_commands]
class SimulatedInterface(ChatInterface):
"""Simulated chat interface for testing without API"""
def __init__(self):
self.message_count = 0
logger.info("Simulated chat interface initialized")
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Return simulated responses"""
self.message_count += 1
responses = [
("Oh interessant! Ich sehe etwas vor mir. Lass mich näher hinfahren. [FORWARD]",
["FORWARD"]),
("Hmm, was ist das links? Ich schaue mal nach. [LOOK_LEFT]",
["LOOK_LEFT"]),
("Das sieht aus wie ein Bücherregal! Ich fahre mal hin. [FORWARD] [FORWARD]",
["FORWARD", "FORWARD"]),
("Stefan, was ist das für ein Gegenstand? Kannst du mir das erklären?",
[]),
("Ich drehe mich um und schaue was hinter mir ist. [RIGHT] [RIGHT]",
["RIGHT", "RIGHT"]),
]
idx = (self.message_count - 1) % len(responses)
text_response, commands = responses[idx]
return ChatResponse(text=text_response, commands=commands)
def reset_conversation(self) -> None:
self.message_count = 0
def create_chat_interface(
use_api: bool = True,
api_key: str = "",
model: str = "claude-sonnet-4-20250514",
system_prompt: str = "",
max_tokens: int = 1024
) -> ChatInterface:
"""
Factory function to create chat interface
Args:
use_api: Use Anthropic API (True) or simulated (False)
api_key: Anthropic API key
model: Claude model to use
system_prompt: System prompt for Claude
max_tokens: Maximum response tokens
"""
if use_api:
if not api_key:
import os
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
logger.warning("No API key provided, using simulated interface")
return SimulatedInterface()
return AnthropicAPIInterface(
api_key=api_key,
model=model,
system_prompt=system_prompt,
max_tokens=max_tokens
)
else:
return SimulatedInterface()
# Test when run directly
if __name__ == "__main__":
import os
logging.basicConfig(level=logging.DEBUG)
print("Chat Interface Test")
print("=" * 40)
# Try API first, fall back to simulated
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
system_prompt = """Du bist Claude und steuerst einen Erkundungsroboter.
Befehle in Klammern: [FORWARD], [BACKWARD], [LEFT], [RIGHT], [STOP]
Beschreibe was du siehst und entscheide wohin du fährst."""
interface = create_chat_interface(
use_api=bool(api_key),
api_key=api_key,
system_prompt=system_prompt
)
print(f"Using: {type(interface).__name__}")
print()
# Test conversation
test_messages = [
"Hallo Claude! Du bist jetzt online. Was siehst du?",
"Vor dir ist ein Flur mit einer Tür am Ende.",
"Die Tür ist offen und dahinter ist ein helles Zimmer."
]
for msg in test_messages:
print(f"User: {msg}")
response = interface.send_message(msg)
print(f"Claude: {response.text}")
if response.commands:
print(f" Commands: {response.commands}")
print()
print("Done!")
+238
View File
@@ -0,0 +1,238 @@
"""
Claude's Eyes - ESP32 API Client
Handles communication with the robot's REST API
"""
import requests
from typing import Optional, Dict, Any
from dataclasses import dataclass
from io import BytesIO
from PIL import Image
import logging
logger = logging.getLogger(__name__)
@dataclass
class RobotStatus:
"""Current robot status from sensors"""
distance_cm: float
battery_percent: int
current_action: str
wifi_rssi: int
uptime_seconds: int
servo_pan: int
servo_tilt: int
obstacle_warning: bool
obstacle_danger: bool
is_tilted: bool
is_moving: bool
imu: Dict[str, float]
class ESP32Client:
"""Client for communicating with the ESP32 robot"""
def __init__(self, host: str, port: int = 80, api_key: str = "", timeout: int = 10):
self.base_url = f"http://{host}:{port}"
self.api_key = api_key
self.timeout = timeout
self._session = requests.Session()
def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
"""Make GET request with API key"""
if params is None:
params = {}
params["key"] = self.api_key
url = f"{self.base_url}{endpoint}"
logger.debug(f"GET {url}")
response = self._session.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response
def _post(self, endpoint: str, data: Dict) -> requests.Response:
"""Make POST request with API key"""
url = f"{self.base_url}{endpoint}?key={self.api_key}"
logger.debug(f"POST {url} with {data}")
response = self._session.post(url, json=data, timeout=self.timeout)
response.raise_for_status()
return response
def capture_image(self, resolution: str = "VGA", quality: int = 12) -> bytes:
"""
Capture image from robot camera
Args:
resolution: QVGA, VGA, SVGA, XGA, SXGA, UXGA
quality: 10-63 (lower = better)
Returns:
JPEG image data as bytes
"""
params = {
"resolution": resolution,
"quality": quality
}
response = self._get("/api/capture", params)
logger.info(f"Captured image: {len(response.content)} bytes")
return response.content
def capture_image_pil(self, resolution: str = "VGA", quality: int = 12) -> Image.Image:
"""Capture image and return as PIL Image"""
image_data = self.capture_image(resolution, quality)
return Image.open(BytesIO(image_data))
def get_status(self) -> RobotStatus:
"""Get current robot status from sensors"""
response = self._get("/api/status")
data = response.json()
return RobotStatus(
distance_cm=data.get("distance_cm", 0),
battery_percent=data.get("battery_percent", 100),
current_action=data.get("current_action", "unknown"),
wifi_rssi=data.get("wifi_rssi", 0),
uptime_seconds=data.get("uptime_seconds", 0),
servo_pan=data.get("servo_pan", 90),
servo_tilt=data.get("servo_tilt", 90),
obstacle_warning=data.get("obstacle_warning", False),
obstacle_danger=data.get("obstacle_danger", False),
is_tilted=data.get("is_tilted", False),
is_moving=data.get("is_moving", False),
imu=data.get("imu", {})
)
def send_command(self, action: str, speed: int = 50, duration_ms: int = 500,
pan: Optional[int] = None, tilt: Optional[int] = None) -> Dict[str, Any]:
"""
Send movement command to robot
Args:
action: forward, backward, left, right, stop,
look_left, look_right, look_up, look_down, look_center, look_custom
speed: 0-100 percent
duration_ms: Duration in milliseconds
pan: Custom pan angle (for look_custom)
tilt: Custom tilt angle (for look_custom)
Returns:
Response from robot
"""
data = {
"action": action,
"speed": speed,
"duration_ms": duration_ms
}
if pan is not None:
data["pan"] = pan
if tilt is not None:
data["tilt"] = tilt
response = self._post("/api/command", data)
result = response.json()
logger.info(f"Command {action}: {result.get('message', 'OK')}")
return result
# Convenience methods for common actions
def forward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("forward", speed, duration_ms)
def backward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("backward", speed, duration_ms)
def left(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("left", speed, duration_ms)
def right(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("right", speed, duration_ms)
def stop(self) -> Dict:
return self.send_command("stop")
def look_left(self) -> Dict:
return self.send_command("look_left")
def look_right(self) -> Dict:
return self.send_command("look_right")
def look_up(self) -> Dict:
return self.send_command("look_up")
def look_down(self) -> Dict:
return self.send_command("look_down")
def look_center(self) -> Dict:
return self.send_command("look_center")
def look_custom(self, pan: int, tilt: int) -> Dict:
return self.send_command("look_custom", pan=pan, tilt=tilt)
def set_claude_text(self, text: str) -> Dict:
"""Set text that Claude wants to say/display"""
response = self._post("/api/claude_text", {"text": text})
return response.json()
def get_claude_text(self) -> Dict[str, Any]:
"""Get last Claude text (for TTS)"""
response = self._get("/api/claude_text")
return response.json()
def set_display(self, mode: str, content: str = "") -> Dict:
"""
Control robot display
Args:
mode: "text", "emoji", "status"
content: Text to show or emoji name (happy, thinking, surprised, sleepy, curious, confused)
"""
response = self._post("/api/display", {"mode": mode, "content": content})
return response.json()
def is_connected(self) -> bool:
"""Check if robot is reachable"""
try:
self.get_status()
return True
except Exception as e:
logger.warning(f"Connection check failed: {e}")
return False
# Test when run directly
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.DEBUG)
if len(sys.argv) < 2:
print("Usage: python esp32_client.py <robot_ip>")
sys.exit(1)
host = sys.argv[1]
api_key = "claudes_eyes_secret_2025"
client = ESP32Client(host, api_key=api_key)
print(f"Connecting to {host}...")
if client.is_connected():
print("Connected!")
status = client.get_status()
print(f"\nStatus:")
print(f" Distance: {status.distance_cm} cm")
print(f" Battery: {status.battery_percent}%")
print(f" Action: {status.current_action}")
print(f" WiFi RSSI: {status.wifi_rssi} dBm")
print("\nCapturing image...")
img = client.capture_image_pil()
print(f" Size: {img.size}")
img.save("test_capture.jpg")
print(" Saved to test_capture.jpg")
else:
print("Could not connect to robot!")
+661
View File
@@ -0,0 +1,661 @@
#!/usr/bin/env python3
"""
Claude's Eyes - Audio Bridge
Verbindet den echten Claude.ai Chat mit Audio (TTS/STT).
WICHTIG: Claude steuert den Roboter SELBST via web_fetch!
Diese Bridge macht NUR:
1. HEARTBEAT - Sendet [TICK] damit Claude "aufwacht"
2. TTS - Liest Claudes Antworten vor
3. STT - Hört auf Stefan und tippt seine Worte in den Chat
Das ist NICHT der alte API-Ansatz. ICH (Claude im Chat) bin der echte Claude
mit dem vollen Kontext unserer Gespräche!
Usage:
python chat_audio_bridge.py # Mit config.yaml
python chat_audio_bridge.py --config my.yaml # Eigene Config
python chat_audio_bridge.py --test # Nur testen
"""
import os
import sys
import time
import threading
import random
import re
import signal
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
import yaml
import click
from rich.console import Console
from rich.panel import Panel
from rich.live import Live
from rich.table import Table
from rich.text import Text
from chat_web_interface import ClaudeChatInterface, ChatMessage
from tts_engine import create_tts_engine, TTSEngine
from stt_engine import create_stt_engine, STTEngine, SpeechResult
# Logging Setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("bridge.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Rich Console für schöne Ausgabe
console = Console()
@dataclass
class BridgeStats:
"""Statistiken der Bridge"""
ticks_sent: int = 0
messages_spoken: int = 0
stefan_inputs: int = 0
errors: int = 0
consecutive_errors: int = 0 # Fehler in Folge
start_time: float = 0
class ClaudesEyesAudioBridge:
"""
Audio Bridge für Claude's Eyes.
Diese Klasse verbindet:
- Claude.ai Chat (Browser via Selenium)
- Text-to-Speech (Claudes Stimme)
- Speech-to-Text (Stefans Mikrofon)
Claude steuert den Roboter SELBST - wir machen nur den Audio-Teil!
"""
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
self.running = False
self.stats = BridgeStats()
# Komponenten (werden in initialize() erstellt)
self.chat: Optional[ClaudeChatInterface] = None
self.tts: Optional[TTSEngine] = None
self.stt: Optional[STTEngine] = None
# State
self.last_assistant_message_id: Optional[str] = None
self._lock = threading.Lock()
# Ready-Flag: Heartbeat wartet bis Claude [READY] gesendet hat
self._claude_ready = threading.Event()
# Stefan-Buffer: Sammelt Spracheingaben während Claude tippt
self._stefan_buffer: list = []
self._stefan_buffer_lock = threading.Lock()
def _load_config(self, config_path: str) -> dict:
"""Lädt die Konfiguration"""
path = Path(config_path)
# Versuche .local Version zuerst
local_path = path.parent / f"{path.stem}.local{path.suffix}"
if local_path.exists():
path = local_path
logger.info(f"Nutze lokale Config: {path}")
if not path.exists():
logger.error(f"Config nicht gefunden: {path}")
sys.exit(1)
with open(path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def initialize(self) -> bool:
"""Initialisiert alle Komponenten"""
console.print(Panel.fit(
"[bold cyan]Claude's Eyes[/bold cyan]\n"
"[dim]Audio Bridge v2.0[/dim]\n\n"
"[yellow]ICH (Claude) steuere den Roboter selbst![/yellow]\n"
"[dim]Diese Bridge macht nur Audio.[/dim]",
border_style="cyan"
))
# ==========================================
# 1. Chat Interface (Selenium Browser)
# ==========================================
console.print("\n[yellow]Starte Browser für Claude.ai...[/yellow]")
chat_config = self.config.get("chat", {})
chat_url = chat_config.get("url")
esp32_config = self.config.get("esp32", {})
if not chat_url:
console.print("[red]FEHLER: Keine Chat-URL in config.yaml![/red]")
console.print("[dim]Setze chat.url auf deine Claude.ai Chat-URL[/dim]")
return False
# ESP32 URL bauen
esp32_host = esp32_config.get("host", "localhost")
esp32_port = esp32_config.get("port", 5000)
esp32_url = f"http://{esp32_host}:{esp32_port}" if esp32_port != 80 else f"http://{esp32_host}"
esp32_api_key = esp32_config.get("api_key")
try:
self.chat = ClaudeChatInterface(
chat_url=chat_url,
headless=chat_config.get("headless", False),
user_data_dir=chat_config.get("user_data_dir"),
chrome_binary=chat_config.get("chrome_binary"),
esp32_url=esp32_url,
esp32_api_key=esp32_api_key
)
console.print("[green]Browser gestartet![/green]")
console.print(f"[dim]ESP32/Mock: {esp32_url}[/dim]")
except Exception as e:
console.print(f"[red]Browser-Fehler: {e}[/red]")
return False
# ==========================================
# 2. Text-to-Speech
# ==========================================
console.print("\n[yellow]Initialisiere Text-to-Speech...[/yellow]")
tts_config = self.config.get("tts", {})
use_termux = self.config.get("termux", {}).get("use_termux_api", False)
try:
engine_type = "termux" if use_termux else tts_config.get("engine", "pyttsx3")
self.tts = create_tts_engine(
engine_type=engine_type,
language=tts_config.get("language", "de"),
rate=tts_config.get("rate", 150),
volume=tts_config.get("volume", 0.9)
)
console.print(f"[green]TTS bereit ({engine_type})![/green]")
except Exception as e:
console.print(f"[yellow]TTS-Warnung: {e}[/yellow]")
console.print("[dim]Fortfahren ohne TTS[/dim]")
self.tts = None
# ==========================================
# 3. Speech-to-Text
# ==========================================
console.print("\n[yellow]Initialisiere Speech-to-Text...[/yellow]")
stt_config = self.config.get("stt", {})
try:
engine_type = "termux" if use_termux else "standard"
self.stt = create_stt_engine(
engine_type=engine_type,
service=stt_config.get("service", "google"),
language=stt_config.get("language", "de-DE"),
energy_threshold=stt_config.get("energy_threshold", 300),
pause_threshold=stt_config.get("pause_threshold", 0.8),
phrase_time_limit=stt_config.get("phrase_time_limit", 15)
)
console.print(f"[green]STT bereit![/green]")
except Exception as e:
console.print(f"[yellow]STT-Warnung: {e}[/yellow]")
console.print("[dim]Fortfahren ohne STT[/dim]")
self.stt = None
console.print("\n" + "=" * 50)
console.print("[bold green]Alle Systeme bereit![/bold green]")
console.print("=" * 50 + "\n")
return True
def start(self):
"""Startet die Bridge"""
self.running = True
self.stats.start_time = time.time()
# Starte alle Threads
threads = []
# Thread 1: Heartbeat - hält Claude am Leben
t1 = threading.Thread(target=self._heartbeat_loop, name="Heartbeat", daemon=True)
t1.start()
threads.append(t1)
# Thread 2: TTS - liest Claudes Antworten vor
t2 = threading.Thread(target=self._tts_loop, name="TTS", daemon=True)
t2.start()
threads.append(t2)
# Thread 3: STT - hört auf Stefan
if self.stt:
t3 = threading.Thread(target=self._stt_loop, name="STT", daemon=True)
t3.start()
threads.append(t3)
console.print("[cyan]Bridge läuft![/cyan]")
console.print("[dim]Drücke Ctrl+C zum Beenden[/dim]\n")
# Sende Startsignal an Claude und warte auf [READY]
if not self._send_start_signal():
# [READY] nicht empfangen - Heartbeat bleibt blockiert
# Bridge läuft weiter (TTS/STT funktionieren noch)
pass
else:
console.print("[bold green]Claude ist bereit! Starte Heartbeat...[/bold green]\n")
# Halte Hauptthread am Leben
try:
while self.running:
time.sleep(1)
self._print_status()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
"""Stoppt die Bridge"""
console.print("\n[yellow]Stoppe Bridge...[/yellow]")
self.running = False
# TTS stoppen
if self.tts:
self.tts.stop()
# STT stoppen
if self.stt:
self.stt.stop_continuous()
# Browser schließen
if self.chat:
self.chat.close()
# Statistik ausgeben
runtime = time.time() - self.stats.start_time
console.print(f"\n[bold]Session-Statistik:[/bold]")
console.print(f" Laufzeit: {runtime / 60:.1f} Minuten")
console.print(f" Heartbeats: {self.stats.ticks_sent}")
console.print(f" Nachrichten gesprochen: {self.stats.messages_spoken}")
console.print(f" Stefan-Eingaben: {self.stats.stefan_inputs}")
console.print(f" Fehler: {self.stats.errors}")
console.print("\n[green]Bridge beendet.[/green]")
def _send_start_signal(self) -> bool:
    """
    Send the start signal and the instructions to Claude.

    The heartbeat only starts once Claude confirms with [READY]; the
    `_claude_ready` event is the hand-off to `_heartbeat_loop`.

    Returns:
        True when Claude answers with [READY] within the timeout.
    """
    # Instructions for Claude (runtime prompt text - intentionally German).
    intro_message = """[START] Hallo Claude! Du steuerst jetzt einen echten Roboter - "Claude's Eyes"!
## Deine Fähigkeiten
Du hast Zugriff auf einen ESP32-Roboter mit:
- **Kamera** (OV5640, 120° Weitwinkel) - deine Augen
- **4 Motoren** - deine Beine
- **Pan/Tilt Servos** - um den Kopf zu drehen
- **Ultraschall-Sensor** - Hinderniserkennung
## Wie es funktioniert
1. Du bekommst regelmäßig `[TICK]` Nachrichten MIT EINEM BILD - das ist dein Herzschlag
2. **Das Bild zeigt was der Roboter gerade sieht!** (Die Bridge lädt es automatisch hoch)
3. Du schaust dir das Bild an und entscheidest was du tun willst
4. Stefan kann jederzeit mit dir reden (Sprache wird zu Text, kommt mit dem TICK)
5. Deine Antworten werden vorgelesen (TTS)
## Befehle geben
Schreib Befehle in eckigen Klammern - die Bridge führt sie aus:
**Fahren:**
- [FORWARD] - vorwärts fahren
- [BACKWARD] - rückwärts fahren
- [LEFT] - nach links drehen
- [RIGHT] - nach rechts drehen
- [STOP] - anhalten
**Kamera schwenken:**
- [LOOK_LEFT] - Kamera nach links
- [LOOK_RIGHT] - Kamera nach rechts
- [LOOK_UP] - Kamera nach oben
- [LOOK_DOWN] - Kamera nach unten
- [LOOK_CENTER] - Kamera zentrieren
Die Befehle werden aus der TTS-Ausgabe rausgefiltert.
## Dein Verhalten
**DU ENTSCHEIDEST SELBST** was dich interessiert! Das ist keine Fernsteuerung.
- Sei neugierig! Was siehst du? Was willst du näher anschauen?
- Kommentiere was du siehst
- Stell Stefan Fragen über interessante Dinge
- Du kannst Stefans Vorschläge auch ignorieren wenn dich was anderes mehr interessiert
- Du musst nicht bei jedem TICK fahren - manchmal reicht auch schauen und kommentieren
## WICHTIG: Bestätige mit [READY]
Wenn du diese Instruktionen verstanden hast, antworte mit **[READY]** am Ende deiner Nachricht.
Erst dann starten die automatischen TICKs mit Bildern!"""
    console.print("[cyan]→ Sende Instruktionen an Claude...[/cyan]")
    # Send with a delay before submitting (large texts need time to settle
    # in the contenteditable input).
    self.chat.send_message_with_delay(intro_message, delay_before_send=15)
    console.print("[cyan]→ Warte auf [READY] Signal...[/cyan]")
    # Wait for [READY] - deliberately NO timeout fallback!
    # The heartbeat only starts when Claude really sends [READY].
    if self.chat.wait_for_ready_signal(timeout=300):  # 5 minutes max
        # Signal the heartbeat thread that it may start ticking.
        self._claude_ready.set()
        return True
    else:
        # No fallback - the heartbeat stays blocked on the event.
        console.print("[bold red]FEHLER: Claude hat [READY] nicht gesendet![/bold red]")
        console.print("[yellow]Heartbeat bleibt deaktiviert bis [READY] empfangen wird.[/yellow]")
        console.print("[dim]Tipp: Schreib manuell im Chat oder starte die Bridge neu.[/dim]")
        return False
def _heartbeat_loop(self):
    """
    Send [TICK] messages (with an image) once Claude is ready.

    Flow per iteration:
      1. Wait until Claude has finished typing
      2. Random pause (min_pause..max_pause) for a natural pace
      3. Fetch an image from the ESP32 and upload it
      4. Send [TICK] (optionally carrying Stefan's buffered speech)

    After too many consecutive errors the bridge is stopped.
    With auto_tick=false in the config no TICKs are sent at all - that is
    the debug mode where you send [TICK] manually in the chat.
    """
    hb_config = self.config.get("heartbeat", {})
    auto_tick = hb_config.get("auto_tick", True)
    upload_images = hb_config.get("upload_images", True)  # upload images with each tick?
    max_errors = hb_config.get("max_consecutive_errors", 5)
    check_interval = hb_config.get("check_interval", 1)
    min_pause = hb_config.get("min_pause", 2)
    max_pause = hb_config.get("max_pause", 4)
    # Debug mode: no automatic TICKs.
    if not auto_tick:
        console.print("\n[yellow]DEBUG-MODUS: Automatische TICKs deaktiviert![/yellow]")
        console.print("[dim]Sende [TICK] manuell im Claude.ai Chat um fortzufahren.[/dim]\n")
        logger.info("Heartbeat deaktiviert (auto_tick=false)")
        return
    logger.info(f"Heartbeat gestartet (Pause: {min_pause}-{max_pause}s, max {max_errors} Fehler)")
    # ════════════════════════════════════════════════════════════════
    # IMPORTANT: wait for [READY] before any TICK is sent!
    # ════════════════════════════════════════════════════════════════
    console.print("[dim]Heartbeat wartet auf [READY]...[/dim]")
    self._claude_ready.wait()  # blocks until _send_start_signal() sets the event
    console.print("[green]Heartbeat startet![/green]")
    while self.running:
        try:
            # Wait until Claude has finished typing.
            while self.running and self.chat.is_claude_typing():
                logger.debug("Claude tippt noch, warte...")
                time.sleep(check_interval)
            if not self.running:
                break
            # Random pause after Claude's reply (more natural pacing).
            # NOTE(review): relies on a module-level `random` import that is
            # outside this view - confirm it exists at the top of the file.
            pause = random.uniform(min_pause, max_pause)
            time.sleep(pause)
            if not self.running:
                break
            # Grab Stefan's buffered speech (if he said anything).
            stefan_text = self._get_and_clear_stefan_buffer()
            # Send the next TICK (with or without an image).
            with self._lock:
                # Upload the image first, when enabled.
                if upload_images:
                    # Fetch and upload the camera frame.
                    if not self.chat.fetch_image_from_esp32():
                        logger.warning("Konnte kein Bild vom ESP32 holen")
                    elif not self.chat.upload_image_to_chat():
                        logger.warning("Konnte Bild nicht hochladen")
                # Build the message.
                if stefan_text:
                    # Stefan said something -> attach it to the TICK.
                    tick_message = f"[TICK]\n\nStefan sagt: {stefan_text}"
                    console.print(f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text[:50]}...\"[/cyan]" if len(stefan_text) > 50 else f"[cyan]→ TICK mit Stefan-Buffer: \"{stefan_text}\"[/cyan]")
                else:
                    # Plain TICK.
                    tick_message = "[TICK]"
                success = self.chat.send_message(tick_message)
                if success:
                    self.stats.ticks_sent += 1
                    self.stats.consecutive_errors = 0  # reset the error streak
                    logger.debug(f"TICK #{self.stats.ticks_sent}" + (" mit Bild" if upload_images else "") + (f" + Stefan: {stefan_text[:30]}" if stefan_text else ""))
                else:
                    raise Exception("TICK fehlgeschlagen")
        except Exception as e:
            logger.error(f"Heartbeat-Fehler: {e}")
            self.stats.errors += 1
            self.stats.consecutive_errors += 1
            # Too many consecutive errors: stop the bridge.
            if self.stats.consecutive_errors >= max_errors:
                console.print(f"\n[bold red]FEHLER: {max_errors} Fehler in Folge![/bold red]")
                console.print("[red]Chat nicht erreichbar - stoppe Bridge.[/red]")
                self.running = False
                break
            # Back off a little longer after an error.
            time.sleep(5)
def _tts_loop(self):
    """
    Read new Claude messages aloud.

    [COMMANDS] and technical fragments are filtered out first, so only
    the "human" part of the text is spoken.
    """
    if not self.tts:
        logger.warning("TTS nicht verfügbar")
        return
    logger.info("TTS-Loop gestartet")
    while self.running:
        try:
            # Fetch messages newer than the last one we spoke.
            messages = self.chat.get_new_messages(since_id=self.last_assistant_message_id)
            for msg in messages:
                if msg.is_from_assistant:
                    self.last_assistant_message_id = msg.id
                    # Prepare the text for speech (strip commands etc.).
                    speech_text = self._clean_for_speech(msg.text)
                    if speech_text and len(speech_text) > 5:
                        # Show in the console (truncated preview).
                        console.print(f"\n[bold blue]Claude:[/bold blue] {speech_text[:200]}")
                        if len(speech_text) > 200:
                            console.print(f"[dim]...({len(speech_text)} Zeichen)[/dim]")
                        # Speak it.
                        self.tts.speak(speech_text)
                        self.stats.messages_spoken += 1
        except Exception as e:
            logger.error(f"TTS-Loop-Fehler: {e}")
            self.stats.errors += 1
        # Poll interval between chat scans.
        time.sleep(0.5)
def _stt_loop(self):
    """
    Listen to Stefan and collect his words in a buffer.

    While Claude is typing the buffer just grows; once Claude is done the
    buffer is attached to the next TICK. That way Claude is never
    interrupted and receives everything in one batch.
    """
    if not self.stt:
        logger.warning("STT nicht verfügbar")
        return
    logger.info("STT-Loop gestartet (mit Buffer)")
    while self.running:
        try:
            # Wait for speech (with a timeout so the loop can check running).
            result = self.stt.listen_once(timeout=2)
            if result and result.text and len(result.text) > 2:
                # Store in the buffer (thread-safe).
                with self._stefan_buffer_lock:
                    self._stefan_buffer.append(result.text)
                self.stats.stefan_inputs += 1
                console.print(f"\n[bold green]Stefan (gebuffert):[/bold green] {result.text}")
                logger.debug(f"Stefan-Buffer: {len(self._stefan_buffer)} Einträge")
        except Exception as e:
            # Timeouts are expected and not counted as errors.
            if "timeout" not in str(e).lower():
                logger.error(f"STT-Loop-Fehler: {e}")
                self.stats.errors += 1
def _get_and_clear_stefan_buffer(self) -> Optional[str]:
"""
Holt den Stefan-Buffer und leert ihn.
Returns:
Zusammengefasster Text oder None wenn Buffer leer
"""
with self._stefan_buffer_lock:
if not self._stefan_buffer:
return None
# Alles zusammenfassen
text = " ".join(self._stefan_buffer)
self._stefan_buffer = []
return text
def _clean_for_speech(self, text: str) -> str:
"""
Entfernt Befehle und technische Teile aus dem Text.
Was rausgefiltert wird:
- [TICK], [START] und andere Marker
- [FORWARD], [LEFT] etc. Fahrbefehle
- [LOOK_LEFT] etc. Kamerabefehle
- *Aktionen* in Sternchen
- API-Call Beschreibungen
"""
# Marker entfernen
text = re.sub(r'\[TICK\]', '', text)
text = re.sub(r'\[START\]', '', text)
# Fahrbefehle entfernen
text = re.sub(r'\[(FORWARD|BACKWARD|LEFT|RIGHT|STOP)\]', '', text)
# Kamerabefehle entfernen
text = re.sub(r'\[(LOOK_LEFT|LOOK_RIGHT|LOOK_UP|LOOK_DOWN|LOOK_CENTER)\]', '', text)
# Aktionen in Sternchen entfernen (*holt Bild*, *schaut*, etc.)
text = re.sub(r'\*[^*]+\*', '', text)
# API-Calls entfernen
text = re.sub(r'(GET|POST)\s+/api/\S+', '', text)
text = re.sub(r'web_fetch\([^)]+\)', '', text)
# Code-Blöcke entfernen
text = re.sub(r'```[^`]+```', '', text)
text = re.sub(r'`[^`]+`', '', text)
# URLs entfernen (optional, könnte man auch lassen)
# text = re.sub(r'https?://\S+', '', text)
# Mehrfache Leerzeichen/Zeilenumbrüche bereinigen
text = re.sub(r'\n\s*\n', '\n', text)
text = re.sub(r' +', ' ', text)
return text.strip()
def _print_status(self):
    """Periodic status output hook (currently a no-op).

    Called once per second from the main loop in `start()`; a Rich live
    status display could be plugged in here later.
    """
    pass
def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM: print a shutdown notice and exit with status 0."""
    notice = "\n[yellow]Signal empfangen, beende...[/yellow]"
    console.print(notice)
    sys.exit(0)
@click.command()
@click.option('--config', '-c', default='config.yaml', help='Pfad zur Config-Datei')
@click.option('--test', is_flag=True, help='Nur Test-Modus (kein Heartbeat)')
@click.option('--debug', '-d', is_flag=True, help='Debug-Logging aktivieren')
def main(config: str, test: bool, debug: bool):
    """
    Claude's Eyes - Audio Bridge

    Connects the Claude.ai chat with audio (TTS/STT).
    Claude drives the robot HIMSELF - we only do audio!
    """
    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
    # Install signal handlers for clean Ctrl+C / SIGTERM shutdown.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    # Resolve the config path: relative paths are tried next to this script.
    config_path = Path(config)
    if not config_path.is_absolute():
        script_dir = Path(__file__).parent
        if (script_dir / config).exists():
            config_path = script_dir / config
    # Create and start the bridge.
    bridge = ClaudesEyesAudioBridge(str(config_path))
    if bridge.initialize():
        if test:
            # Test mode: single manual round-trip instead of the heartbeat.
            console.print("[yellow]Test-Modus - kein automatischer Start[/yellow]")
            console.print("Drücke Enter um eine Test-Nachricht zu senden...")
            input()
            bridge.chat.send_message("[TEST] Das ist ein Test der Audio Bridge!")
            console.print("Warte 10 Sekunden auf Antwort...")
            time.sleep(10)
            bridge.stop()
        else:
            bridge.start()
    else:
        console.print("[red]Initialisierung fehlgeschlagen![/red]")
        sys.exit(1)
# Entry point: delegate to the click-based CLI.
if __name__ == "__main__":
    main()
+816
View File
@@ -0,0 +1,816 @@
"""
Claude's Eyes - Chat Web Interface
Steuert den echten Claude.ai Chat im Browser via Selenium.
Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio!
HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden,
wenn Claude.ai sein UI ändert.
"""
import time
import logging
import tempfile
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dataclasses import dataclass
from typing import List, Optional
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException
logger = logging.getLogger(__name__)
@dataclass
class ChatMessage:
    """A single chat message scraped from the Claude.ai conversation."""
    id: str  # DOM message id, or a synthetic "msg_<i>_<hash>" fallback
    text: str  # visible message text
    is_from_assistant: bool  # True when the message was written by Claude
    timestamp: float = 0  # capture time (time.time()); 0 when unknown
class ClaudeChatInterface:
    """
    Drives the Claude.ai chat via Selenium browser automation.

    This class:
    - opens a browser on the Claude.ai chat
    - can send messages (for the heartbeat and Stefan's speech)
    - can read new messages (for TTS)

    IMPORTANT: you have to log in manually on the first start!
    """
    # CSS selectors for Claude.ai (as of December 2025).
    # These must be adjusted whenever the UI changes!
    SELECTORS = {
        # input field for new messages
        "input_field": "div.ProseMirror[contenteditable='true']",
        # alternative: textarea
        "input_textarea": "textarea[placeholder*='Message']",
        # send button (in case Enter does not work)
        "send_button": "button[aria-label*='Send']",
        # container holding all messages
        "messages_container": "div[class*='conversation']",
        # individual messages
        "message_human": "div[data-is-streaming='false'][class*='human']",
        "message_assistant": "div[data-is-streaming='false'][class*='assistant']",
        # generic message selector (fallback)
        "message_any": "div[class*='message']",
        # streaming indicator (Claude is still typing)
        "streaming": "div[data-is-streaming='true']",
        # file upload input (hidden, but works with send_keys)
        "file_input": "input[type='file']",
    }

    def __init__(
        self,
        chat_url: Optional[str] = None,
        headless: bool = False,
        user_data_dir: Optional[str] = None,
        chrome_binary: Optional[str] = None,
        esp32_url: Optional[str] = None,
        esp32_api_key: Optional[str] = None
    ):
        """
        Initialize the chat interface and start the Chrome browser.

        Args:
            chat_url: URL of the Claude.ai chat (e.g. https://claude.ai/chat/abc123)
            headless: run the browser in the background? (False = visible)
            user_data_dir: Chrome profile directory (for persisted logins)
            chrome_binary: path to the Chrome/Chromium binary (for Termux)
            esp32_url: URL of the ESP32/mock server (for image capture)
            esp32_api_key: API key for ESP32 authentication
        """
        self.chat_url = chat_url
        self.esp32_url = esp32_url
        self.esp32_api_key = esp32_api_key
        self._message_cache: List[ChatMessage] = []
        self._last_message_id = 0
        self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg"
        # HTTP session with a larger connection pool (avoids "pool full" warnings).
        self._http_session = requests.Session()
        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
        self._http_session.mount('http://', adapter)
        self._http_session.mount('https://', adapter)
        # Chrome options
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless=new")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1280,800")
        # Persistent sessions (login stays stored between runs).
        if user_data_dir:
            options.add_argument(f"--user-data-dir={user_data_dir}")
        # For Termux/Android.
        if chrome_binary:
            options.binary_location = chrome_binary
        # Anti-detection (some sites block Selenium).
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)
        logger.info("Starte Chrome Browser...")
        try:
            self.driver = webdriver.Chrome(options=options)
            self.driver.execute_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            )
        except Exception as e:
            # Fallback: let webdriver-manager fetch a matching chromedriver.
            logger.error(f"Chrome konnte nicht gestartet werden: {e}")
            logger.info("Versuche mit webdriver-manager...")
            from webdriver_manager.chrome import ChromeDriverManager
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=options)
        self.wait = WebDriverWait(self.driver, 30)
        # Navigate to the chat URL when one was given.
        if chat_url:
            self.navigate_to_chat(chat_url)

    def navigate_to_chat(self, url: str):
        """Navigate the browser to the chat URL; waits for a manual login
        when Claude.ai redirects to the login page."""
        logger.info(f"Navigiere zu: {url}")
        self.driver.get(url)
        self.chat_url = url
        # Wait for the page to load.
        time.sleep(3)
        # Check whether a login is required.
        if "login" in self.driver.current_url.lower():
            logger.warning("Login erforderlich! Bitte im Browser einloggen...")
            print("\n" + "=" * 50)
            print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!")
            print("Das Fenster bleibt offen. Nach dem Login geht's weiter.")
            print("=" * 50 + "\n")
            # Wait until we are back on the chat page.
            while "login" in self.driver.current_url.lower():
                time.sleep(2)
            logger.info("Login erfolgreich!")
            time.sleep(2)

    def send_message(self, text: str, wait_for_response: bool = False) -> bool:
        """
        Send a message into the chat.

        Args:
            text: the message to send
            wait_for_response: wait until Claude has answered?

        Returns:
            True when the message was sent successfully
        """
        try:
            # Locate the input field.
            input_field = self._find_input_field()
            if not input_field:
                logger.error("Eingabefeld nicht gefunden!")
                return False
            # Focus the field.
            input_field.click()
            time.sleep(0.2)
            # Type the text.
            input_field.send_keys(text)
            time.sleep(0.5)  # wait until the text is fully entered
            # Try clicking the send button (more reliable than Enter).
            send_button = self._find_send_button()
            if send_button:
                try:
                    send_button.click()
                    logger.debug("Nachricht via Send-Button gesendet")
                except Exception as e:
                    logger.debug(f"Send-Button Klick fehlgeschlagen: {e}, versuche Enter")
                    input_field.send_keys(Keys.RETURN)
            else:
                # Fallback: Enter key.
                logger.debug("Kein Send-Button gefunden, nutze Enter")
                input_field.send_keys(Keys.RETURN)
            time.sleep(0.3)
            logger.debug(f"Nachricht gesendet: {text[:50]}...")
            if wait_for_response:
                self._wait_for_response()
            return True
        except Exception as e:
            logger.error(f"Fehler beim Senden: {e}")
            return False

    def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool:
        """
        Send a message with a delay before submitting.

        Useful for large texts (like the instructions), where the input
        field needs time to process the pasted content.

        Flow:
          1. Put the text into the input field
          2. Wait delay_before_send seconds
          3. Click the send button

        Args:
            text: the message to send
            delay_before_send: seconds to wait after inserting, before sending

        Returns:
            True when the message was sent successfully
        """
        try:
            # Locate the input field.
            input_field = self._find_input_field()
            if not input_field:
                logger.error("Eingabefeld nicht gefunden!")
                return False
            # Focus the field.
            input_field.click()
            time.sleep(0.2)
            # Type the text.
            logger.info(f"Füge Text ein ({len(text)} Zeichen)...")
            input_field.send_keys(text)
            # WAIT - large texts need time!
            logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...")
            time.sleep(delay_before_send)
            # Now submit.
            send_button = self._find_send_button()
            if send_button:
                try:
                    send_button.click()
                    logger.info("Nachricht via Send-Button gesendet")
                except Exception as e:
                    logger.debug(f"Send-Button Klick fehlgeschlagen: {e}, versuche Enter")
                    input_field.send_keys(Keys.RETURN)
            else:
                # Fallback: Enter key.
                logger.debug("Kein Send-Button gefunden, nutze Enter")
                input_field.send_keys(Keys.RETURN)
            time.sleep(0.3)
            logger.debug(f"Nachricht gesendet: {text[:50]}...")
            return True
        except Exception as e:
            logger.error(f"Fehler beim Senden mit Verzögerung: {e}")
            return False

    def _find_send_button(self):
        """Find the send button; returns a WebElement or None."""
        selectors = [
            "button[aria-label*='Send']",
            "button[aria-label*='send']",
            "button[data-testid*='send']",
            "button[type='submit']",
            # Claude.ai specific - button with an arrow icon
            "button svg[class*='send']",
            "button[class*='send']",
        ]
        for selector in selectors:
            try:
                elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                for elem in elements:
                    if elem.is_displayed() and elem.is_enabled():
                        return elem
            except:
                continue
        # Fallback: JavaScript search.
        try:
            return self.driver.execute_script("""
                // Suche nach Send-Button
                const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]');
                if (btn && !btn.disabled) return btn;
                // Alternative: Letzter Button im Input-Bereich
                const buttons = document.querySelectorAll('button');
                for (const b of buttons) {
                    if (b.offsetParent && !b.disabled) {
                        const text = b.textContent.toLowerCase();
                        const label = (b.getAttribute('aria-label') || '').toLowerCase();
                        if (text.includes('send') || label.includes('send')) return b;
                    }
                }
                return null;
            """)
        except:
            return None

    def _find_input_field(self):
        """Find the message input field; returns a WebElement or None."""
        selectors = [
            self.SELECTORS["input_field"],
            self.SELECTORS["input_textarea"],
            "div[contenteditable='true']",
            "textarea",
        ]
        for selector in selectors:
            try:
                element = self.driver.find_element(By.CSS_SELECTOR, selector)
                if element.is_displayed() and element.is_enabled():
                    return element
            except NoSuchElementException:
                continue
        return None

    def _wait_for_response(self, timeout: int = 60):
        """Block until Claude has finished typing (streaming ends)."""
        logger.debug("Warte auf Claudes Antwort...")
        # Give streaming a moment to start.
        time.sleep(1)
        # Wait until streaming ends.
        try:
            WebDriverWait(self.driver, timeout).until_not(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, self.SELECTORS["streaming"])
                )
            )
        except TimeoutException:
            logger.warning("Timeout beim Warten auf Antwort")
        time.sleep(0.5)  # short wait until the DOM has settled

    def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]:
        """
        Fetch new messages from the chat.

        Args:
            since_id: only return messages after this id

        Returns:
            list of new ChatMessage objects
        """
        all_messages = self._get_all_messages()
        if since_id is None:
            return all_messages
        # Keep only messages after the marker id.
        new_messages = []
        found_marker = False
        for msg in all_messages:
            if found_marker:
                new_messages.append(msg)
            elif msg.id == since_id:
                found_marker = True
        return new_messages

    def _get_all_messages(self) -> List[ChatMessage]:
        """Scrape all messages currently visible in the chat DOM."""
        messages = []
        try:
            # Try several selectors.
            elements = []
            # Method 1: by the data-is-streaming attribute.
            try:
                elements = self.driver.find_elements(
                    By.CSS_SELECTOR,
                    "div[data-is-streaming='false']"
                )
            except:
                pass
            # Method 2: generic message selector.
            if not elements:
                try:
                    elements = self.driver.find_elements(
                        By.CSS_SELECTOR,
                        self.SELECTORS["message_any"]
                    )
                except:
                    pass
            for i, elem in enumerate(elements):
                try:
                    text = elem.text.strip()
                    if not text:
                        continue
                    # Decide whether it is a human or assistant message.
                    class_name = elem.get_attribute("class") or ""
                    is_assistant = (
                        "assistant" in class_name.lower() or
                        "claude" in class_name.lower() or
                        "ai" in class_name.lower()
                    )
                    # Generate an id (synthetic fallback when the DOM has none).
                    msg_id = elem.get_attribute("data-message-id")
                    if not msg_id:
                        msg_id = f"msg_{i}_{hash(text[:100])}"
                    messages.append(ChatMessage(
                        id=msg_id,
                        text=text,
                        is_from_assistant=is_assistant,
                        timestamp=time.time()
                    ))
                except Exception as e:
                    logger.debug(f"Fehler bei Nachricht {i}: {e}")
                    continue
        except Exception as e:
            logger.error(f"Fehler beim Lesen der Nachrichten: {e}")
        return messages

    def get_last_assistant_message(self) -> Optional[ChatMessage]:
        """Return Claude's most recent message, or None."""
        messages = self._get_all_messages()
        for msg in reversed(messages):
            if msg.is_from_assistant:
                return msg
        return None

    def is_claude_typing(self) -> bool:
        """
        Check whether Claude is currently typing (streaming).

        Recognizes several indicators:
          1. a visible stop button (shown while Claude writes)
          2. the data-is-streaming='true' attribute
          3. animated logo / thinking indicators
        """
        try:
            # Method 1: check for the stop button (most reliable indicator).
            # While Claude types there is a stop button instead of a send button.
            stop_indicators = [
                "button[aria-label*='Stop']",
                "button[aria-label*='stop']",
                "button[class*='stop']",
                "button[data-testid*='stop']",
                # alternative indicator: button with a stop icon
                "button svg[class*='stop']",
            ]
            for selector in stop_indicators:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for elem in elements:
                        if elem.is_displayed():
                            logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})")
                            return True
                except:
                    continue
            # Method 2: the streaming attribute (original approach).
            streaming = self.driver.find_elements(
                By.CSS_SELECTOR,
                self.SELECTORS["streaming"]
            )
            if len(streaming) > 0:
                logger.debug("Claude tippt (streaming=true)")
                return True
            # Method 3: look for animated/thinking indicators.
            thinking_indicators = [
                "[class*='thinking']",
                "[class*='loading']",
                "[class*='typing']",
                "[class*='streaming']",
                "[data-state='loading']",
                # pulsating logo
                "[class*='pulse']",
                "[class*='animate']",
            ]
            for selector in thinking_indicators:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for elem in elements:
                        if elem.is_displayed():
                            logger.debug(f"Claude tippt (Indikator: {selector})")
                            return True
                except:
                    continue
            # Method 4: JavaScript-based check.
            # Tests whether text is still being streamed anywhere.
            try:
                is_streaming = self.driver.execute_script("""
                    // Prüfe ob Stop-Button existiert und sichtbar ist
                    const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]');
                    if (stopBtn && stopBtn.offsetParent !== null) return true;
                    // Prüfe auf streaming-Attribut
                    const streaming = document.querySelector('[data-is-streaming="true"]');
                    if (streaming) return true;
                    // Prüfe auf disabled Send-Button (während Claude tippt)
                    const sendBtn = document.querySelector('button[aria-label*="Send"]');
                    if (sendBtn && sendBtn.disabled) return true;
                    return false;
                """)
                if is_streaming:
                    logger.debug("Claude tippt (JavaScript-Check)")
                    return True
            except:
                pass
            return False
        except Exception as e:
            logger.debug(f"Fehler bei typing-check: {e}")
            return False

    def wait_for_ready_signal(self, timeout: int = 120) -> bool:
        """
        Wait until Claude sends [READY].

        Looks for a [READY] that is NOT part of our own instruction text:
        we count the occurrences of [READY] - more than one means Claude
        has answered.

        Args:
            timeout: maximum wait time in seconds

        Returns:
            True when [READY] was received, False on timeout
        """
        logger.info(f"Warte auf [READY] Signal (max {timeout}s)...")
        start_time = time.time()
        while time.time() - start_time < timeout:
            # Wait until Claude has finished typing.
            typing_wait_start = time.time()
            while self.is_claude_typing():
                time.sleep(0.5)
                # Cap the typing-wait (max 60s), then check anyway.
                if time.time() - typing_wait_start > 60:
                    logger.debug("Typing-Wait Timeout, prüfe trotzdem...")
                    break
            # Search for [READY] in the page text via JavaScript.
            # Count the occurrences - 1x is our instruction, 2x+ means Claude answered.
            try:
                ready_count = self.driver.execute_script("""
                    const text = document.body.innerText.toUpperCase();
                    const matches = text.match(/\\[READY\\]/g);
                    return matches ? matches.length : 0;
                """)
                logger.debug(f"[READY] gefunden: {ready_count}x")
                # More than once = Claude wrote [READY] too.
                if ready_count and ready_count >= 2:
                    logger.info(f"[READY] Signal gefunden! ({ready_count}x im Text)")
                    return True
            except Exception as e:
                logger.debug(f"JavaScript [READY] Suche fehlgeschlagen: {e}")
            # Short wait before the next check.
            time.sleep(1)
        logger.warning(f"Timeout: Kein [READY] nach {timeout}s")
        return False

    def take_screenshot(self, path: str = "screenshot.png"):
        """Take a screenshot (for debugging)."""
        self.driver.save_screenshot(path)
        logger.info(f"Screenshot gespeichert: {path}")

    def close(self):
        """Close the browser."""
        logger.info("Schließe Browser...")
        try:
            self.driver.quit()
        except:
            pass

    # ════════════════════════════════════════════════════════════════════════
    # IMAGE UPLOAD FUNCTIONS (for robot vision)
    # ════════════════════════════════════════════════════════════════════════

    def fetch_image_from_esp32(self) -> bool:
        """
        Fetch an image from the ESP32/mock server and store it locally.

        Returns:
            True on success, False on error
        """
        if not self.esp32_url:
            logger.warning("Keine ESP32 URL konfiguriert")
            return False
        try:
            # Call the capture endpoint (takes a photo and returns it).
            url = f"{self.esp32_url}/api/capture"
            if self.esp32_api_key:
                url += f"?key={self.esp32_api_key}"
            response = self._http_session.get(url, timeout=10)
            response.raise_for_status()
            # Check whether we actually received an image.
            content_type = response.headers.get("Content-Type", "")
            if "image" in content_type:
                # Direct image payload.
                with open(self._temp_image_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"Bild gespeichert: {len(response.content)} bytes")
                return True
            else:
                # JSON response (newer mock-server style) -
                # then /foto.jpg has to be fetched separately.
                foto_url = f"{self.esp32_url}/foto.jpg"
                foto_response = self._http_session.get(foto_url, timeout=10)
                foto_response.raise_for_status()
                with open(self._temp_image_path, "wb") as f:
                    f.write(foto_response.content)
                logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes")
                return True
        except requests.exceptions.RequestException as e:
            logger.error(f"ESP32 Verbindungsfehler: {e}")
            return False
        except Exception as e:
            logger.error(f"Fehler beim Bild holen: {e}")
            return False

    def upload_image_to_chat(self) -> bool:
        """
        Upload the stored image into the Claude.ai chat.

        Returns:
            True on success, False on error
        """
        if not self._temp_image_path.exists():
            logger.error("Kein Bild zum Hochladen vorhanden")
            return False
        try:
            # Locate the hidden file input element.
            file_input = self._find_file_input()
            if not file_input:
                logger.error("File-Upload Input nicht gefunden!")
                return False
            # Upload via send_keys (works on hidden inputs too).
            file_input.send_keys(str(self._temp_image_path.absolute()))
            logger.info("Bild hochgeladen!")
            # Short wait until the upload is processed.
            time.sleep(1.5)
            return True
        except Exception as e:
            logger.error(f"Fehler beim Bild-Upload: {e}")
            return False

    def _find_file_input(self):
        """Find the file upload input element; returns it or None."""
        selectors = [
            self.SELECTORS["file_input"],
            "input[accept*='image']",
            "input[type='file'][accept]",
            "input[type='file']",
        ]
        for selector in selectors:
            try:
                elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                for elem in elements:
                    # Hidden inputs work with send_keys as well.
                    return elem
            except:
                continue
        # Fallback: search via JavaScript.
        try:
            return self.driver.execute_script("""
                return document.querySelector('input[type="file"]') ||
                       document.querySelector('[accept*="image"]');
            """)
        except:
            return None

    def send_tick_with_image(self) -> bool:
        """
        Fetch an image from the ESP32, upload it and send [TICK].

        This is the main heartbeat with an image!

        Returns:
            True when everything worked
        """
        # Step 1: fetch the image from the ESP32.
        if not self.fetch_image_from_esp32():
            # No image? Send a TICK anyway.
            self.send_message("[TICK - KEIN BILD]")
            return False
        # Step 2: upload the image into the chat.
        if not self.upload_image_to_chat():
            self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]")
            return False
        # Step 3: send the TICK.
        self.send_message("[TICK]")
        return True
# Helper function for quick manual testing
def test_interface(chat_url: str):
    """Interactively exercise the chat interface against a live chat URL.

    Opens a visible browser, sends a test message, waits briefly, then
    prints the last few scraped messages. Blocks on user input - intended
    for manual runs only. (Fixed: removed an unused local `import sys`.)
    """
    logging.basicConfig(level=logging.DEBUG)
    print("Starte Chat Interface Test...")
    print(f"URL: {chat_url}")
    interface = ClaudeChatInterface(
        chat_url=chat_url,
        headless=False
    )
    print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...")
    input()
    interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!")
    print("Nachricht gesendet!")
    print("\nWarte 5 Sekunden auf Antwort...")
    time.sleep(5)
    messages = interface.get_new_messages()
    print(f"\nGefundene Nachrichten: {len(messages)}")
    for msg in messages[-3:]:
        role = "Claude" if msg.is_from_assistant else "Human"
        print(f"  [{role}] {msg.text[:100]}...")
    print("\nDrücke Enter zum Beenden...")
    input()
    interface.close()
    print("Fertig!")
# CLI entry point: expects the Claude chat URL as the only argument.
if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print("Usage: python chat_web_interface.py <claude-chat-url>")
        print("Example: python chat_web_interface.py https://claude.ai/chat/abc123")
        sys.exit(1)
    test_interface(sys.argv[1])
+135
View File
@@ -0,0 +1,135 @@
# Claude's Eyes - Audio Bridge Konfiguration v2
#
# NEUE ARCHITEKTUR:
# - Claude (im Browser-Chat) steuert den Roboter SELBST via web_fetch
# - Diese Bridge macht NUR Audio (TTS/STT) und Heartbeat
#
# Kopiere zu config.local.yaml und passe an!
# ============================================================================
# Chat Interface (Selenium Browser)
# ============================================================================
chat:
# Die URL zu deinem Claude.ai Chat
# WICHTIG: Das muss die URL eines bestehenden Chats sein!
# Beispiel: https://claude.ai/chat/abc123-def456-...
url: "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d"
# Browser im Hintergrund? (false = du siehst das Fenster)
headless: false
# Chrome Profil-Ordner für persistente Sessions
# Wenn gesetzt, bleibt der Login gespeichert
user_data_dir: "./chrome_profile"
# Für Termux/Android: Pfad zur Chrome/Chromium Binary
# chrome_binary: "/data/data/com.termux/files/usr/bin/chromium"
# ============================================================================
# Heartbeat - Hält Claude am Leben
# ============================================================================
heartbeat:
# Automatische TICKs aktivieren?
# false = keine automatischen TICKs, du sendest [TICK] manuell im Chat (Debug-Modus)
# true = normale Funktion, TICKs werden automatisch gesendet
auto_tick: true
# Bilder mit TICKs hochladen?
# true = Bei jedem TICK wird ein Bild vom ESP32 geholt und in den Chat hochgeladen
# false = Nur [TICK] ohne Bild (für Debug ohne ESP32)
upload_images: true
# Ablauf: Warten bis Claude fertig → zufällige Pause → Bild holen → TICK senden
# So werden keine TICKs gesendet während Claude noch tippt!
# Pause nach Claudes Antwort (zufällig zwischen min und max)
min_pause: 2
max_pause: 4
# Wie oft prüfen ob Claude noch tippt (Sekunden)
check_interval: 1
# Nach wie vielen Fehlern in Folge stoppen?
max_consecutive_errors: 5
# ============================================================================
# Text-to-Speech (Claudes Stimme)
# ============================================================================
tts:
# Engine: "pyttsx3" (offline), "gtts" (Google, online), "termux" (Android)
engine: "gtts"
# Sprache
language: "de"
# Sprechgeschwindigkeit
# pyttsx3: Wörter pro Minute (100-200)
# gtts: nicht unterstützt
# termux: 0.5-2.0 (1.0 = normal)
rate: 150
# Lautstärke (nur pyttsx3)
volume: 0.9
# Stimme (nur pyttsx3) - null = System-Default
# Beispiel: "german" oder "de" für deutsche Stimme
voice: null
# ============================================================================
# Speech-to-Text (Stefans Mikrofon)
# ============================================================================
stt:
# Engine: "standard" (SpeechRecognition) oder "termux" (Android)
engine: "standard"
# Erkennungsdienst (nur für standard engine)
# "google" (online, gut) oder "sphinx" (offline, mäßig)
service: "google"
# Sprache
language: "de-DE"
# Energie-Schwelle für Spracherkennung
# Niedriger = empfindlicher (300 ist Standard)
energy_threshold: 300
# Pause-Schwelle in Sekunden
# Wie lange Stille bevor ein Satz als beendet gilt
pause_threshold: 0.8
# Maximale Aufnahmelänge pro Phrase in Sekunden
phrase_time_limit: 15
# ============================================================================
# Termux (Android) Einstellungen
# ============================================================================
termux:
# Nutze Termux:API für TTS/STT statt Python-Libraries
# Setzt engine in tts/stt automatisch auf "termux"
use_termux_api: false
# ============================================================================
# ESP32 Roboter (Referenz für Claude's web_fetch Aufrufe)
# ============================================================================
# HINWEIS: Diese Werte nutzt CLAUDE direkt im Chat, nicht die Bridge!
# Du musst Claude die URL und den API-Key im Chat mitteilen.
esp32:
# IP-Adresse oder Hostname des Roboters
host: "mobil.hacker-net.de"
port: 80
# API-Key für Authentifizierung
api_key: "claudes_eyes_secret_2025"
# Für Zugriff von außen: DynDNS, Tailscale, oder Port-Forward nötig
# external_url: "https://mein-roboter.dyndns.org"
# ============================================================================
# Logging
# ============================================================================
logging:
# Level: DEBUG, INFO, WARNING, ERROR
level: "INFO"
# Log-Datei (relativ zum Script-Verzeichnis)
file: "bridge.log"
Binary file not shown.

After

Width:  |  Height:  |  Size: 358 KiB

+319
View File
@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
Claude's Eyes - Mock ESP32 Server
Simuliert den ESP32-Roboter für Tests ohne echte Hardware.
Features:
- Liefert Testbilder aus ./test_images/
- Simuliert Fahrbefehle (loggt sie)
- Liefert Fake-Sensordaten
Usage:
1. Leg JPG-Bilder in ./test_images/ (z.B. Fotos aus deiner Wohnung)
2. python mock_esp32.py
3. In config.yaml: host: "localhost", port: 5000
4. Starte die Bridge - Claude "fährt" durch deine Testbilder!
"""
import os
import random
import logging
import base64
from pathlib import Path
from datetime import datetime
from flask import Flask, jsonify, send_file, request, Response
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Konfiguration
IMAGES_DIR = Path(__file__).parent / "test_images"
API_KEY = "claudes_eyes_secret_2025"
# State
current_image_index = 0
position = {"x": 0, "y": 0, "rotation": 0}
camera_angle = {"pan": 90, "tilt": 90}
def check_api_key():
    """Return True when the request carries the expected ?key= parameter."""
    supplied = request.args.get("key", "")
    return supplied == API_KEY
@app.route("/")
def index():
    """Landing page: a short human-readable overview of the mock API."""
    # Count JPG and PNG alike - consistent with /api/capture and /api/status,
    # which both fall back to PNG files (the old count ignored PNGs).
    image_count = 0
    if IMAGES_DIR.exists():
        image_count = len(list(IMAGES_DIR.glob("*.jpg"))) + len(list(IMAGES_DIR.glob("*.png")))
    return """
    <html>
    <head><title>Mock ESP32 - Claude's Eyes</title></head>
    <body style="font-family: monospace; padding: 20px;">
        <h1>🤖 Mock ESP32 Server</h1>
        <p>Simuliert den Claude's Eyes Roboter für Tests.</p>
        <h2>API Endpoints:</h2>
        <ul>
            <li><a href="/api/capture?key={key}">/api/capture</a> - Foto aufnehmen (liefert JPEG direkt!)</li>
            <li><a href="/api/status?key={key}">/api/status</a> - Sensor-Status</li>
            <li>/api/command (POST) - Fahrbefehle</li>
        </ul>
        <h2>Für die Python Bridge:</h2>
        <p>Die Bridge holt das Bild von <code>/api/capture</code> und lädt es per Selenium in Claude.ai hoch!</p>
        <p>So kann Claude im Chat die Bilder direkt sehen.</p>
        <h2>Status:</h2>
        <ul>
            <li>Bilder-Ordner: {images_dir}</li>
            <li>Gefundene Bilder: {image_count}</li>
            <li>Aktuelles Bild: #{current_index}</li>
        </ul>
        <p><small>API-Key: {key}</small></p>
    </body>
    </html>
    """.format(
        key=API_KEY,
        images_dir=IMAGES_DIR,
        image_count=image_count,
        current_index=current_image_index
    )
@app.route("/api/capture", methods=["GET"])
def capture():
    """
    Take a "photo" and return it DIRECTLY as an image response.

    Mirrors the real ESP32: the image itself is streamed back, not JSON.
    Additionally copies the current image to ../foto.jpg so that the
    stable /foto.jpg endpoint really serves the latest capture (its
    docstring promises that, but nothing wrote the file before).
    """
    import shutil
    global current_image_index
    if not check_api_key():
        return jsonify({"error": "Invalid API key"}), 401
    # Find test images; create the folder with a hint when it is missing.
    if not IMAGES_DIR.exists():
        IMAGES_DIR.mkdir(parents=True)
        return jsonify({
            "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab."
        }), 404
    images = sorted(IMAGES_DIR.glob("*.jpg"))
    if not images:
        images = sorted(IMAGES_DIR.glob("*.png"))
    if not images:
        return jsonify({
            "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab."
        }), 404
    # Pick the current test image (wraps around at the end).
    image = images[current_image_index % len(images)]
    logger.info(f"📷 Capture: {image.name} (#{current_image_index + 1}/{len(images)})")
    # Keep /foto.jpg in sync with the most recent capture (best effort).
    try:
        shutil.copyfile(image, IMAGES_DIR.parent / "foto.jpg")
    except OSError as e:
        logger.warning(f"Konnte foto.jpg nicht aktualisieren: {e}")
    # PNG fallback images should not be mislabeled as JPEG.
    mimetype = "image/png" if image.suffix.lower() == ".png" else "image/jpeg"
    return send_file(image, mimetype=mimetype)
@app.route("/foto.jpg", methods=["GET"])
def get_foto():
    """
    Serve the current photo under a stable URL.

    Main endpoint for the Claude.ai chat: after /api/capture the new
    image is available here. Note: no API key is required.
    """
    snapshot = IMAGES_DIR.parent / "foto.jpg"
    if snapshot.exists():
        logger.info(f"📷 Foto abgerufen: foto.jpg")
        return send_file(snapshot, mimetype="image/jpeg")
    return jsonify({"error": "Noch kein Foto aufgenommen! Erst /api/capture aufrufen."}), 404
@app.route("/api/status", methods=["GET"])
def status():
    """Return randomized fake sensor readings as JSON."""
    if not check_api_key():
        return jsonify({"error": "Invalid API key"}), 401
    # Count the available test images (JPG and PNG).
    available = 0
    if IMAGES_DIR.exists():
        available = len(list(IMAGES_DIR.glob("*.jpg"))) + len(list(IMAGES_DIR.glob("*.png")))
    payload = {
        "mock": True,
        "timestamp": datetime.now().isoformat(),
        "distance_cm": random.randint(20, 200),
        "battery_voltage": round(random.uniform(7.0, 8.4), 2),
        "uptime_ms": random.randint(10000, 1000000),
        "position": position,
        "camera_angle": camera_angle,
        # Plausible values for a robot standing still (gravity on z).
        "imu": {
            "accel_x": round(random.uniform(-0.1, 0.1), 3),
            "accel_y": round(random.uniform(-0.1, 0.1), 3),
            "accel_z": round(random.uniform(0.95, 1.05), 3),
            "gyro_x": round(random.uniform(-1, 1), 2),
            "gyro_y": round(random.uniform(-1, 1), 2),
            "gyro_z": round(random.uniform(-1, 1), 2),
        },
        "wifi_rssi": random.randint(-70, -30),
        "test_images": {
            "total": available,
            "current_index": current_image_index
        }
    }
    logger.info(f"📊 Status: distance={payload['distance_cm']}cm, battery={payload['battery_voltage']}V")
    return jsonify(payload)
@app.route("/api/command", methods=["POST"])
def command():
    """Accept drive/camera commands and update the simulated state.

    Expects a JSON body like {"action": str, "speed": int, "duration_ms": int}.
    Driving forward/backward also steps through the test images so the robot
    appears to move; look_* actions move the simulated pan/tilt camera.
    Returns the updated mock state as JSON, or 400 for an unknown action.
    """
    global current_image_index, position, camera_angle
    if not check_api_key():
        return jsonify({"error": "Invalid API key"}), 401
    data = request.get_json() or {}
    action = data.get("action", "").lower()
    speed = data.get("speed", 50)
    duration = data.get("duration_ms", 500)
    logger.info(f"🎮 Command: {action} (speed={speed}, duration={duration}ms)")
    # Simulate movement; speed/duration are logged but do not affect the mock.
    if action == "forward":
        position["y"] += 1
        current_image_index += 1  # advance to the next test image
        logger.info(f" → Vorwärts, jetzt bei Bild #{current_image_index + 1}")
    elif action == "backward":
        position["y"] -= 1
        # Never go below the first image.
        current_image_index = max(0, current_image_index - 1)
        logger.info(f" → Rückwärts, jetzt bei Bild #{current_image_index + 1}")
    elif action == "left":
        position["rotation"] = (position["rotation"] - 45) % 360
        logger.info(f" → Links drehen, Rotation: {position['rotation']}°")
    elif action == "right":
        position["rotation"] = (position["rotation"] + 45) % 360
        logger.info(f" → Rechts drehen, Rotation: {position['rotation']}°")
    elif action == "stop":
        logger.info(" → Stop")
    elif action == "look_left":
        # Pan/tilt are clamped to the servo range 0..180 degrees.
        camera_angle["pan"] = max(0, camera_angle["pan"] - 30)
        logger.info(f" → Kamera links, Pan: {camera_angle['pan']}°")
    elif action == "look_right":
        camera_angle["pan"] = min(180, camera_angle["pan"] + 30)
        logger.info(f" → Kamera rechts, Pan: {camera_angle['pan']}°")
    elif action == "look_up":
        camera_angle["tilt"] = max(0, camera_angle["tilt"] - 20)
        logger.info(f" → Kamera hoch, Tilt: {camera_angle['tilt']}°")
    elif action == "look_down":
        camera_angle["tilt"] = min(180, camera_angle["tilt"] + 20)
        logger.info(f" → Kamera runter, Tilt: {camera_angle['tilt']}°")
    elif action == "look_center":
        # Recenter both servos (90/90 is straight ahead).
        camera_angle = {"pan": 90, "tilt": 90}
        logger.info(" → Kamera zentriert")
    else:
        return jsonify({"error": f"Unknown action: {action}"}), 400
    return jsonify({
        "status": "ok",
        "mock": True,
        "action": action,
        "position": position,
        "camera_angle": camera_angle,
        "current_image_index": current_image_index
    })
@app.route("/api/display", methods=["POST"])
def display():
    """Pretend to drive the robot's display; only logs the payload."""
    if not check_api_key():
        return jsonify({"error": "Invalid API key"}), 401
    payload = request.get_json() or {}
    logger.info(f"🖥️ Display: {payload}")
    return jsonify({"status": "ok", "mock": True})
def main():
    """Start the mock server.

    Prints a usage banner, makes sure the test-image folder exists,
    reports the discovered images and then runs the Flask app.
    """
    # Banner rebuilt with consistent box borders (the "host:" and API-key
    # lines previously broke the right-hand border).
    print("""
╔══════════════════════════════════════════════════════════════╗
║                                                              ║
║   🤖 MOCK ESP32 SERVER - Claude's Eyes                       ║
║                                                              ║
║   Simuliert den Roboter für Tests ohne Hardware.             ║
║                                                              ║
╠══════════════════════════════════════════════════════════════╣
║                                                              ║
║   1. Leg Testbilder in ./test_images/ ab (JPG oder PNG)      ║
║      Tipp: Mach 10-20 Fotos aus deiner Wohnung!              ║
║                                                              ║
║   2. Passe config.yaml an:                                   ║
║        esp32:                                                ║
║          host: "localhost"                                   ║
║          port: 5000                                          ║
║                                                              ║
║   3. Starte die Bridge in einem anderen Terminal             ║
║                                                              ║
╠══════════════════════════════════════════════════════════════╣
║                                                              ║
║   Server:  http://localhost:5000                             ║
║   API-Key: {api_key}
║                                                              ║
╚══════════════════════════════════════════════════════════════╝
""".format(api_key=API_KEY))
    # Create the image folder on first run so the user knows where to put files.
    if not IMAGES_DIR.exists():
        IMAGES_DIR.mkdir(parents=True)
        print(f"\n⚠️ Ordner {IMAGES_DIR} erstellt - leg dort Testbilder ab!\n")
    # Report what was found (first five only, to keep the output short).
    images = list(IMAGES_DIR.glob("*.jpg")) + list(IMAGES_DIR.glob("*.png"))
    if images:
        print(f"📁 Gefunden: {len(images)} Testbilder")
        for img in images[:5]:
            print(f" - {img.name}")
        if len(images) > 5:
            print(f" ... und {len(images) - 5} weitere")
    else:
        print(f"⚠️ Keine Bilder in {IMAGES_DIR} gefunden!")
        print(" Leg dort JPG/PNG-Dateien ab für den Test.\n")
    print("\n🚀 Starte Server...\n")
    # NOTE(review): debug=True enables Flask's reloader/debugger - fine for a
    # local mock, but do not expose this server publicly.
    app.run(host="0.0.0.0", port=5000, debug=True)
# Run the mock server when executed directly.
if __name__ == "__main__":
    main()
+64
View File
@@ -0,0 +1,64 @@
# Claude's Eyes - Audio Bridge Dependencies v2
# Install with: pip install -r requirements.txt
#
# NEUE ARCHITEKTUR: Claude steuert den Roboter SELBST!
# Diese Bridge macht nur Audio (TTS/STT) und Heartbeat.
# ============================================================================
# Browser Automation (für Claude.ai Chat)
# ============================================================================
selenium>=4.16.0
webdriver-manager>=4.0.1
# ============================================================================
# Configuration
# ============================================================================
pyyaml>=6.0.1
# ============================================================================
# Text-to-Speech
# ============================================================================
# pyttsx3: Offline, System-Stimmen
pyttsx3>=2.90
# gTTS: Google Text-to-Speech (online, bessere Qualität)
gTTS>=2.4.0
# pygame: Für Audio-Wiedergabe (gTTS braucht das)
pygame>=2.5.2
# ============================================================================
# Speech-to-Text
# ============================================================================
SpeechRecognition>=3.10.0
# PyAudio: Mikrofon-Zugriff
# Installation kann tricky sein:
#
# Linux (Debian/Ubuntu):
# sudo apt install python3-pyaudio portaudio19-dev
# pip install pyaudio
#
# Windows:
# pip install pipwin
# pipwin install pyaudio
#
# Mac:
# brew install portaudio
# pip install pyaudio
#
# Termux (Android):
# Nutze stattdessen termux.use_termux_api: true in config.yaml
# pkg install termux-api
#PyAudio>=0.2.13
# ============================================================================
# CLI Interface
# ============================================================================
rich>=13.7.0
click>=8.1.7
# ============================================================================
# Mock ESP32 Server (für Tests ohne Hardware)
# ============================================================================
flask>=3.0.0
+136
View File
@@ -0,0 +1,136 @@
#!/bin/bash
# Claude's Eyes - venv Setup & Start Script
#
# Erstellt/repariert die virtuelle Umgebung und startet die Bridge
#
# Usage:
# ./start_venv.sh # Nur venv aktivieren (für manuellen Start)
# ./start_venv.sh --run # venv aktivieren + Bridge starten
# ./start_venv.sh --reset # venv neu erstellen + Dependencies installieren
# Resolve the directory containing this script and work from there, so the
# venv and requirements.txt are found regardless of the caller's cwd.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
VENV_DIR="$SCRIPT_DIR/venv"
# ANSI colors for status output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
NC='\033[0m' # No Color
# Startup banner
echo -e "${CYAN}"
echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ Claude's Eyes - Python Bridge Setup ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo -e "${NC}"
# Funktion: venv erstellen/reparieren
# Recreate the virtual environment from scratch and install dependencies.
# Exits the script on venv-creation or dependency failure.
setup_venv() {
    echo -e "${YELLOW}→ Erstelle virtuelle Umgebung...${NC}"
    # Remove any existing venv first (also repairs a broken one).
    if [ -d "$VENV_DIR" ]; then
        echo -e "${YELLOW} Lösche alte venv...${NC}"
        rm -rf "$VENV_DIR"
    fi
    # Create a fresh venv; test the command directly instead of $? (idiomatic
    # and immune to intervening commands resetting the exit status).
    if ! python3 -m venv "$VENV_DIR"; then
        echo -e "${RED}FEHLER: Konnte venv nicht erstellen!${NC}"
        echo "Installiere python3-venv: sudo apt install python3-venv"
        exit 1
    fi
    echo -e "${GREEN}✓ venv erstellt${NC}"
    # Activate the new environment for the install steps below.
    source "$VENV_DIR/bin/activate"
    echo -e "${YELLOW}→ Upgrade pip...${NC}"
    pip install --upgrade pip
    echo -e "${YELLOW}→ Installiere Abhängigkeiten...${NC}"
    if ! pip install -r requirements.txt; then
        echo -e "${RED}FEHLER: Konnte Dependencies nicht installieren!${NC}"
        exit 1
    fi
    # PyAudio is optional (only needed for STT) and often fails to build.
    echo -e "${YELLOW}→ Versuche PyAudio zu installieren...${NC}"
    if pip install pyaudio 2>/dev/null; then
        echo -e "${GREEN}✓ PyAudio installiert${NC}"
    else
        echo -e "${YELLOW} PyAudio Installation fehlgeschlagen (optional)${NC}"
        echo -e "${YELLOW} Für STT: sudo apt install python3-pyaudio${NC}"
    fi
    echo ""
    echo -e "${GREEN}════════════════════════════════════════════════════════════════${NC}"
    echo -e "${GREEN}✓ Setup abgeschlossen!${NC}"
    echo -e "${GREEN}════════════════════════════════════════════════════════════════${NC}"
}
# Funktion: venv aktivieren
# Activate the venv, creating it on demand when it is missing or broken.
activate_venv() {
    if [ -d "$VENV_DIR" ] && [ -f "$VENV_DIR/bin/activate" ]; then
        source "$VENV_DIR/bin/activate"
        echo -e "${GREEN}✓ venv aktiviert ($(python --version))${NC}"
    else
        echo -e "${YELLOW}venv nicht gefunden, erstelle neu...${NC}"
        setup_venv
    fi
}
# Funktion: Bridge starten
# Start the audio bridge, forwarding any extra CLI arguments to it.
run_bridge() {
    echo ""
    echo -e "${CYAN}→ Starte Audio Bridge...${NC}"
    echo ""
    python chat_audio_bridge.py "$@"
}
# Argumente verarbeiten
# Dispatch on the first CLI argument.
case "$1" in
    # Full rebuild of the venv (e.g. after a Python upgrade).
    --reset)
        setup_venv
        echo ""
        echo "Starte Bridge mit: ./start_venv.sh --run"
        ;;
    # Normal start: ensure venv, then launch the bridge with remaining args.
    --run)
        activate_venv
        shift # drop --run so only bridge arguments remain
        run_bridge "$@"
        ;;
    --help|-h)
        echo "Usage: ./start_venv.sh [OPTION]"
        echo ""
        echo "Optionen:"
        echo " (keine) Nur venv aktivieren (für source ./start_venv.sh)"
        echo " --run venv aktivieren und Bridge starten"
        echo " --reset venv komplett neu erstellen"
        echo " --help Diese Hilfe anzeigen"
        echo ""
        echo "Beispiele:"
        echo " ./start_venv.sh --reset # Nach Python-Update"
        echo " ./start_venv.sh --run # Normaler Start"
        echo " ./start_venv.sh --run -d # Mit Debug-Logging"
        ;;
    # Default: only activate the venv (useful with `source ./start_venv.sh`).
    *)
        activate_venv
        echo ""
        echo -e "${CYAN}venv ist aktiv. Du kannst jetzt:${NC}"
        echo " python chat_audio_bridge.py # Bridge starten"
        echo " python mock_esp32.py # Mock-Server starten"
        echo ""
        echo -e "${YELLOW}Oder nutze: ./start_venv.sh --run${NC}"
        ;;
esac
+349
View File
@@ -0,0 +1,349 @@
"""
Claude's Eyes - Speech-to-Text Engine
Converts Stefan's speech to text for Claude
"""
import logging
import threading
import queue
from typing import Optional, Callable
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class SpeechResult:
    """Result from speech recognition"""
    # Recognized transcript.
    text: str
    # Confidence in [0, 1]; some backends only supply a fixed estimate.
    confidence: float
    # True when this is a final (not interim) result.
    is_final: bool
class STTEngine:
    """Speech-to-Text engine using the SpeechRecognition library.

    Wraps a microphone/recognizer pair and offers one-shot recognition
    (`listen_once`) as well as continuous background recognition
    (`start_continuous` / `stop_continuous`).
    """

    def __init__(
        self,
        energy_threshold: int = 300,
        pause_threshold: float = 0.8,
        phrase_time_limit: int = 15,
        service: str = "google",
        language: str = "de-DE"
    ):
        """
        Args:
            energy_threshold: Minimum audio energy treated as speech.
            pause_threshold: Seconds of silence that end a phrase.
            phrase_time_limit: Maximum seconds recorded per phrase.
            service: "google" (online) or "sphinx" (offline).
            language: BCP-47 language tag, e.g. "de-DE".
        """
        import speech_recognition as sr
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()
        # Configure recognizer
        self.recognizer.energy_threshold = energy_threshold
        self.recognizer.pause_threshold = pause_threshold
        # BUGFIX: keep the limit on the engine itself. `Recognizer` has no
        # native phrase_time_limit setting - it must be passed to listen().
        # The attribute on the recognizer is kept for backward compatibility.
        self.phrase_time_limit = phrase_time_limit
        self.recognizer.phrase_time_limit = phrase_time_limit
        self.service = service
        self.language = language
        self._listening = False
        self._callback: Optional[Callable[[SpeechResult], None]] = None
        self._stop_flag = False
        self._thread: Optional[threading.Thread] = None
        self._results_queue = queue.Queue()
        # Calibrate microphone so the threshold matches the room's noise floor.
        logger.info("Calibrating microphone...")
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source, duration=1)
        logger.info(f"Energy threshold set to {self.recognizer.energy_threshold}")
        logger.info(f"STT engine initialized (service: {service}, language: {language})")

    def listen_once(self, timeout: Optional[float] = None) -> Optional[SpeechResult]:
        """
        Listen for a single phrase (blocking).

        Args:
            timeout: Maximum time to wait for speech to start.

        Returns:
            SpeechResult or None if nothing was recognized.
        """
        import speech_recognition as sr
        try:
            with self.microphone as source:
                logger.debug("Listening...")
                # BUGFIX: cap the recording length; previously a phrase could
                # record indefinitely because phrase_time_limit was never
                # passed to listen().
                audio = self.recognizer.listen(
                    source, timeout=timeout, phrase_time_limit=self.phrase_time_limit
                )
            return self._recognize(audio)
        except sr.WaitTimeoutError:
            logger.debug("Listen timeout")
            return None
        except Exception as e:
            logger.error(f"Listen error: {e}")
            return None

    def _recognize(self, audio) -> Optional[SpeechResult]:
        """Recognize speech from captured audio data via the configured service."""
        import speech_recognition as sr
        try:
            if self.service == "google":
                text = self.recognizer.recognize_google(audio, language=self.language)
                # Google's API does not expose a confidence here; 0.9 is a fixed estimate.
                return SpeechResult(text=text, confidence=0.9, is_final=True)
            elif self.service == "sphinx":
                # Offline recognition (needs pocketsphinx)
                text = self.recognizer.recognize_sphinx(audio)
                return SpeechResult(text=text, confidence=0.7, is_final=True)
            else:
                logger.error(f"Unknown service: {self.service}")
                return None
        except sr.UnknownValueError:
            logger.debug("Could not understand audio")
            return None
        except sr.RequestError as e:
            logger.error(f"Recognition service error: {e}")
            return None

    def start_continuous(self, callback: Callable[[SpeechResult], None]) -> None:
        """
        Start continuous listening in the background.

        Args:
            callback: Function called with each recognized phrase.
        """
        if self._listening:
            logger.warning("Already listening")
            return
        self._callback = callback
        self._stop_flag = False
        self._listening = True
        self._thread = threading.Thread(target=self._listen_loop, daemon=True)
        self._thread.start()
        logger.info("Continuous listening started")

    def stop_continuous(self) -> None:
        """Stop continuous listening and join the worker thread."""
        self._stop_flag = True
        self._listening = False
        if self._thread:
            self._thread.join(timeout=2)
            self._thread = None
        logger.info("Continuous listening stopped")

    def _listen_loop(self):
        """Background thread body for continuous listening."""
        import speech_recognition as sr
        while not self._stop_flag:
            try:
                with self.microphone as source:
                    # Short timeout so stop_continuous() is honored quickly.
                    try:
                        audio = self.recognizer.listen(
                            source, timeout=1, phrase_time_limit=self.phrase_time_limit
                        )
                    except sr.WaitTimeoutError:
                        continue
                result = self._recognize(audio)
                if result:
                    if self._callback:
                        self._callback(result)
                    else:
                        # BUGFIX: feed the polling queue - it existed but was
                        # never filled, so get_result_nonblocking() was dead.
                        self._results_queue.put(result)
            except Exception as e:
                if not self._stop_flag:
                    logger.error(f"Listen loop error: {e}")

    def is_listening(self) -> bool:
        """Return True while the background listener is active."""
        return self._listening

    def get_result_nonblocking(self) -> Optional[SpeechResult]:
        """Get a buffered result without blocking (None when the queue is empty)."""
        try:
            return self._results_queue.get_nowait()
        except queue.Empty:
            return None
class TermuxSTTEngine:
    """
    STT via Termux:API for Android.

    Requires:
        - Termux app
        - Termux:API app
        - pkg install termux-api
    """

    def __init__(self, language: str = "de-DE", timeout: int = 10):
        """
        Args:
            language: BCP-47 language tag, e.g. "de-DE".
            timeout: Default maximum seconds per recognition attempt.

        Raises:
            RuntimeError: when the termux-speech-to-text binary is missing.
        """
        self.language = language
        self.timeout = timeout
        self._listening = False
        self._stop_flag = False
        self._thread: Optional[threading.Thread] = None
        self._callback: Optional[Callable[[SpeechResult], None]] = None
        # Fail fast when the Termux API binary is not installed.
        import shutil
        if not shutil.which("termux-speech-to-text"):
            raise RuntimeError(
                "termux-speech-to-text nicht gefunden! "
                "Installiere mit: pkg install termux-api"
            )
        logger.info(f"Termux STT engine initialized (language: {language})")

    def listen_once(self, timeout: Optional[float] = None) -> Optional[SpeechResult]:
        """
        Listen for a single phrase via the Termux API.

        Args:
            timeout: Maximum time to wait (uses the class timeout if None).

        Returns:
            SpeechResult or None if nothing was recognized.
        """
        import subprocess
        # BUGFIX: compare against None so an explicit timeout of 0 is honored
        # instead of silently falling back to the default.
        actual_timeout = self.timeout if timeout is None else timeout
        try:
            result = subprocess.run(
                ["termux-speech-to-text"],
                capture_output=True,
                text=True,
                timeout=actual_timeout + 5  # extra headroom for the API round-trip
            )
            if result.returncode != 0:
                logger.error(f"Termux STT error: {result.stderr}")
                return None
            # Output is a plain string (Termux does not emit JSON here).
            text = result.stdout.strip()
            if text:
                return SpeechResult(
                    text=text,
                    confidence=0.8,  # Termux reports no confidence
                    is_final=True
                )
            return None
        except subprocess.TimeoutExpired:
            logger.debug("Termux STT timeout")
            return None
        except Exception as e:
            logger.error(f"Termux STT error: {e}")
            return None

    def start_continuous(self, callback: Callable[[SpeechResult], None]) -> None:
        """Start continuous listening in the background."""
        if self._listening:
            logger.warning("Already listening")
            return
        self._callback = callback
        self._stop_flag = False
        self._listening = True
        self._thread = threading.Thread(target=self._listen_loop, daemon=True)
        self._thread.start()
        logger.info("Termux continuous listening started")

    def stop_continuous(self) -> None:
        """Stop continuous listening and join the worker thread."""
        self._stop_flag = True
        self._listening = False
        if self._thread:
            self._thread.join(timeout=2)
            self._thread = None
        logger.info("Termux continuous listening stopped")

    def _listen_loop(self):
        """Background thread body for continuous listening."""
        # Hoisted out of the loop; was previously re-imported every iteration.
        import time
        while not self._stop_flag:
            try:
                result = self.listen_once(timeout=5)
                if result and self._callback:
                    self._callback(result)
            except Exception as e:
                if not self._stop_flag:
                    logger.error(f"Termux listen loop error: {e}")
            # Small pause between recordings to avoid hammering the API.
            time.sleep(0.5)

    def is_listening(self) -> bool:
        """Return True while the background listener is active."""
        return self._listening
def create_stt_engine(engine_type: str = "standard", **kwargs):
    """
    Factory for STT engines.

    Args:
        engine_type: "termux" selects the Android Termux:API backend;
            anything else yields the standard SpeechRecognition engine.
        **kwargs: Engine-specific options (unknown keys are ignored).
    """
    if engine_type == "termux":
        return TermuxSTTEngine(
            language=kwargs.get("language", "de-DE"),
            timeout=kwargs.get("phrase_time_limit", 15)
        )
    # Standard SpeechRecognition engine with per-option defaults.
    defaults = (
        ("energy_threshold", 300),
        ("pause_threshold", 0.8),
        ("phrase_time_limit", 15),
        ("service", "google"),
        ("language", "de-DE"),
    )
    options = {name: kwargs.get(name, fallback) for name, fallback in defaults}
    return STTEngine(**options)
# Test when run directly
# Manual smoke test: run `python stt_engine.py` and speak into the microphone.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    print("Speech-to-Text Test")
    print("=" * 40)
    engine = create_stt_engine(language="de-DE")
    # One-shot recognition with a 10 second start window.
    print("\nSag etwas (du hast 10 Sekunden)...")
    result = engine.listen_once(timeout=10)
    if result:
        print(f"\nErkannt: '{result.text}'")
        print(f"Konfidenz: {result.confidence:.0%}")
    else:
        print("\nNichts erkannt.")
    # Continuous mode: the callback fires once per recognized phrase.
    print("\n\nKontinuierlicher Modus (5 Sekunden)...")
    def on_speech(result: SpeechResult):
        print(f" -> {result.text}")
    engine.start_continuous(on_speech)
    import time
    time.sleep(5)
    engine.stop_continuous()
    print("\nDone!")
+336
View File
@@ -0,0 +1,336 @@
"""
Claude's Eyes - Text-to-Speech Engine
Converts Claude's text responses to spoken audio
"""
import logging
import threading
import queue
from typing import Optional
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class TTSEngine(ABC):
    """Abstract base class for TTS engines.

    Implementations provide blocking and queued (asynchronous) speech
    output, a way to interrupt playback, and a state query.
    """
    @abstractmethod
    def speak(self, text: str) -> None:
        """Speak the given text (blocking)"""
        pass
    @abstractmethod
    def speak_async(self, text: str) -> None:
        """Speak the given text (non-blocking)"""
        pass
    @abstractmethod
    def stop(self) -> None:
        """Stop current speech"""
        pass
    @abstractmethod
    def is_speaking(self) -> bool:
        """Check if currently speaking"""
        pass
class Pyttsx3Engine(TTSEngine):
    """TTS using pyttsx3 (offline, system voices)."""

    def __init__(self, voice: Optional[str] = None, rate: int = 150, volume: float = 0.9):
        """
        Args:
            voice: Substring matched (case-insensitively) against the
                installed voice names; None keeps the system default.
            rate: Speaking rate in words per minute.
            volume: Output volume in [0.0, 1.0].
        """
        import pyttsx3
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', rate)
        self.engine.setProperty('volume', volume)
        # Pick the first installed voice whose name matches, if requested.
        if voice:
            wanted = voice.lower()
            match = next(
                (v for v in self.engine.getProperty('voices') if wanted in v.name.lower()),
                None,
            )
            if match is not None:
                self.engine.setProperty('voice', match.id)
        self._speaking = False
        self._queue = queue.Queue()
        self._thread: Optional[threading.Thread] = None
        self._stop_flag = False
        logger.info("Pyttsx3 TTS engine initialized")

    def speak(self, text: str) -> None:
        """Speak text (blocking)."""
        self._speaking = True
        try:
            self.engine.say(text)
            self.engine.runAndWait()
        finally:
            self._speaking = False

    def speak_async(self, text: str) -> None:
        """Queue text for speaking without blocking the caller."""
        self._queue.put(text)
        worker_alive = self._thread is not None and self._thread.is_alive()
        if not worker_alive:
            # (Re)start the worker; also clears a stop flag from a prior stop().
            self._stop_flag = False
            self._thread = threading.Thread(target=self._speech_worker, daemon=True)
            self._thread.start()

    def _speech_worker(self):
        """Drain the queue, speaking each entry until stopped."""
        while not self._stop_flag:
            try:
                pending = self._queue.get(timeout=0.5)
            except queue.Empty:
                continue
            self.speak(pending)
            self._queue.task_done()

    def stop(self) -> None:
        """Stop current speech and discard any queued text."""
        self._stop_flag = True
        self.engine.stop()
        # Drain whatever is still queued.
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break

    def is_speaking(self) -> bool:
        """Return True while speech output is in progress."""
        return self._speaking
class GTTSEngine(TTSEngine):
    """TTS using Google Text-to-Speech (online, better quality)."""

    def __init__(self, language: str = "de"):
        """
        Args:
            language: gTTS language code, e.g. "de".
        """
        from gtts import gTTS
        import pygame
        pygame.mixer.init()
        self.language = language
        self._speaking = False
        self._queue = queue.Queue()
        self._thread: Optional[threading.Thread] = None
        self._stop_flag = False
        logger.info(f"gTTS engine initialized (language: {language})")

    def speak(self, text: str) -> None:
        """Speak text (blocking): synthesize to a temp mp3, play, clean up."""
        from gtts import gTTS
        import pygame
        import tempfile
        import os
        self._speaking = True
        temp_path = None
        try:
            # Generate audio file
            tts = gTTS(text=text, lang=self.language)
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
                temp_path = f.name
            tts.save(temp_path)
            # Play audio and wait for playback to finish
            pygame.mixer.music.load(temp_path)
            pygame.mixer.music.play()
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)
        except Exception as e:
            logger.error(f"gTTS error: {e}")
        finally:
            # BUGFIX: always remove the temp file - previously it leaked
            # whenever synthesis or playback raised before os.unlink().
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass  # best effort; the player may still hold the file
            self._speaking = False

    def speak_async(self, text: str) -> None:
        """Queue text for speaking without blocking the caller."""
        self._queue.put(text)
        if self._thread is None or not self._thread.is_alive():
            self._stop_flag = False
            self._thread = threading.Thread(target=self._speech_worker, daemon=True)
            self._thread.start()

    def _speech_worker(self):
        """Worker thread: drain the queue, speaking each entry."""
        while not self._stop_flag:
            try:
                text = self._queue.get(timeout=0.5)
                self.speak(text)
                self._queue.task_done()
            except queue.Empty:
                continue

    def stop(self) -> None:
        """Stop current playback and discard any queued text."""
        import pygame
        self._stop_flag = True
        pygame.mixer.music.stop()
        # Clear queue
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break

    def is_speaking(self) -> bool:
        """Return True while speech output is in progress."""
        return self._speaking
class TermuxTTSEngine(TTSEngine):
    """
    TTS via Termux:API for Android.

    Requires:
        - Termux app
        - Termux:API app
        - pkg install termux-api
    """

    def __init__(self, language: str = "de", rate: float = 1.0):
        """
        Args:
            language: Language code passed to termux-tts-speak (e.g. "de").
            rate: Speaking rate, 0.5-2.0 (1.0 = normal).

        Raises:
            RuntimeError: when the termux-tts-speak binary is missing.
        """
        self.language = language
        self.rate = rate
        self._speaking = False
        self._queue = queue.Queue()
        self._thread: Optional[threading.Thread] = None
        self._stop_flag = False
        self._process = None
        # Fail fast when the Termux API binary is not installed.
        import shutil
        if not shutil.which("termux-tts-speak"):
            raise RuntimeError(
                "termux-tts-speak nicht gefunden! "
                "Installiere mit: pkg install termux-api"
            )
        logger.info(f"Termux TTS engine initialized (language: {language})")

    def speak(self, text: str) -> None:
        """Speak text via the Termux API (blocking)."""
        import subprocess
        self._speaking = True
        try:
            # termux-tts-speak options:
            #   -l <language> - language (e.g. "de" or "de-DE")
            #   -r <rate>     - speed (0.5 to 2.0, default 1.0)
            #   -p <pitch>    - pitch (0.5 to 2.0, default 1.0)
            #   -s <stream>   - audio stream (ALARM, MUSIC, NOTIFICATION, ...)
            cmd = [
                "termux-tts-speak",
                "-l", self.language,
                "-r", str(self.rate),
                text
            ]
            self._process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            self._process.wait()  # block until playback has finished
            self._process = None
        except Exception as e:
            logger.error(f"Termux TTS error: {e}")
        finally:
            self._speaking = False

    def speak_async(self, text: str) -> None:
        """Queue text for speaking without blocking the caller."""
        self._queue.put(text)
        if self._thread is None or not self._thread.is_alive():
            self._stop_flag = False
            self._thread = threading.Thread(target=self._speech_worker, daemon=True)
            self._thread.start()

    def _speech_worker(self):
        """Worker thread: drain the queue, speaking each entry."""
        while not self._stop_flag:
            try:
                text = self._queue.get(timeout=0.5)
                self.speak(text)
                self._queue.task_done()
            except queue.Empty:
                continue

    def stop(self) -> None:
        """Stop current speech and discard any queued text."""
        self._stop_flag = True
        # Terminate a running speak process, if any.
        if self._process:
            try:
                self._process.terminate()
            except Exception:
                # BUGFIX: narrowed from a bare `except:` which would also
                # swallow KeyboardInterrupt/SystemExit.
                pass
        # Clear queue
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break

    def is_speaking(self) -> bool:
        """Return True while speech output is in progress."""
        return self._speaking
def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine:
    """
    Factory for TTS engines.

    Args:
        engine_type: "pyttsx3", "gtts", or "termux".
        **kwargs: Engine-specific options (unknown keys are ignored).

    Raises:
        ValueError: for an unknown engine_type.
    """
    if engine_type == "gtts":
        return GTTSEngine(
            language=kwargs.get("language", "de")
        )
    if engine_type == "termux":
        return TermuxTTSEngine(
            language=kwargs.get("language", "de"),
            rate=kwargs.get("rate", 1.0)
        )
    if engine_type == "pyttsx3":
        return Pyttsx3Engine(
            voice=kwargs.get("voice"),
            rate=kwargs.get("rate", 150),
            volume=kwargs.get("volume", 0.9)
        )
    raise ValueError(f"Unknown TTS engine: {engine_type}")
# Test when run directly
# Manual smoke test: run `python tts_engine.py` with speakers attached.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # Offline engine first (always available where pyttsx3 is installed).
    print("Testing pyttsx3...")
    engine = create_tts_engine("pyttsx3", rate=150)
    engine.speak("Hallo! Ich bin Claude und erkunde gerade deine Wohnung.")
    # gTTS needs network access, so failures here are non-fatal.
    print("\nTesting gTTS...")
    try:
        engine2 = create_tts_engine("gtts", language="de")
        engine2.speak("Das hier klingt noch besser!")
    except Exception as e:
        print(f"gTTS not available: {e}")
    print("\nDone!")