425 lines
14 KiB
Python
425 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Claude's Eyes - Main Bridge Script
|
|
|
|
Connects the ESP32 robot with Claude AI for autonomous exploration.
|
|
|
|
Usage:
|
|
python bridge.py # Use config.yaml
|
|
python bridge.py --config my.yaml # Use custom config
|
|
python bridge.py --simulate # Simulate without hardware
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import threading
|
|
import signal
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from dataclasses import dataclass
|
|
|
|
import yaml
|
|
import click
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.live import Live
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
|
|
from esp32_client import ESP32Client, RobotStatus
|
|
from tts_engine import create_tts_engine, TTSEngine
|
|
from stt_engine import create_stt_engine, STTEngine, SpeechResult
|
|
from chat_interface import create_chat_interface, ChatInterface, ChatResponse
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Rich console for pretty output
|
|
console = Console()
|
|
|
|
|
|
@dataclass
|
|
class BridgeState:
|
|
"""Current state of the bridge"""
|
|
connected: bool = False
|
|
exploring: bool = False
|
|
last_image_time: float = 0
|
|
last_status: Optional[RobotStatus] = None
|
|
last_claude_response: str = ""
|
|
stefan_input: str = ""
|
|
error_message: str = ""
|
|
|
|
|
|
class ClaudesEyesBridge:
|
|
"""Main bridge class connecting robot and Claude"""
|
|
|
|
def __init__(self, config_path: str, simulate: bool = False):
|
|
self.config = self._load_config(config_path)
|
|
self.simulate = simulate
|
|
self.state = BridgeState()
|
|
self.running = False
|
|
|
|
# Components
|
|
self.robot: Optional[ESP32Client] = None
|
|
self.chat: Optional[ChatInterface] = None
|
|
self.tts: Optional[TTSEngine] = None
|
|
self.stt: Optional[STTEngine] = None
|
|
|
|
# Threading
|
|
self.speech_thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
|
|
def _load_config(self, config_path: str) -> dict:
|
|
"""Load configuration from YAML file"""
|
|
path = Path(config_path)
|
|
|
|
# Try local config first
|
|
local_path = path.parent / f"{path.stem}.local{path.suffix}"
|
|
if local_path.exists():
|
|
path = local_path
|
|
logger.info(f"Using local config: {path}")
|
|
|
|
if not path.exists():
|
|
logger.error(f"Config file not found: {path}")
|
|
sys.exit(1)
|
|
|
|
with open(path) as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
return config
|
|
|
|
def initialize(self) -> bool:
|
|
"""Initialize all components"""
|
|
console.print(Panel.fit(
|
|
"[bold cyan]Claude's Eyes[/bold cyan]\n"
|
|
"[dim]Autonomous Exploration Robot[/dim]",
|
|
border_style="cyan"
|
|
))
|
|
|
|
# Initialize robot client
|
|
if not self.simulate:
|
|
console.print("\n[yellow]Connecting to robot...[/yellow]")
|
|
esp_config = self.config.get("esp32", {})
|
|
self.robot = ESP32Client(
|
|
host=esp_config.get("host", "192.168.178.100"),
|
|
port=esp_config.get("port", 80),
|
|
api_key=esp_config.get("api_key", ""),
|
|
timeout=esp_config.get("timeout", 10)
|
|
)
|
|
|
|
if not self.robot.is_connected():
|
|
console.print("[red]Could not connect to robot![/red]")
|
|
self.state.error_message = "Robot connection failed"
|
|
return False
|
|
|
|
self.state.connected = True
|
|
console.print("[green]Robot connected![/green]")
|
|
else:
|
|
console.print("[yellow]Simulation mode - no robot connection[/yellow]")
|
|
self.state.connected = True
|
|
|
|
# Initialize Claude interface
|
|
console.print("\n[yellow]Initializing Claude interface...[/yellow]")
|
|
claude_config = self.config.get("claude", {})
|
|
|
|
api_key = claude_config.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "")
|
|
|
|
self.chat = create_chat_interface(
|
|
use_api=claude_config.get("use_api", True) and bool(api_key),
|
|
api_key=api_key,
|
|
model=claude_config.get("model", "claude-sonnet-4-20250514"),
|
|
system_prompt=claude_config.get("system_prompt", ""),
|
|
max_tokens=claude_config.get("max_tokens", 1024)
|
|
)
|
|
console.print(f"[green]Chat interface ready ({type(self.chat).__name__})[/green]")
|
|
|
|
# Initialize TTS
|
|
console.print("\n[yellow]Initializing Text-to-Speech...[/yellow]")
|
|
tts_config = self.config.get("tts", {})
|
|
try:
|
|
self.tts = create_tts_engine(
|
|
engine_type=tts_config.get("engine", "pyttsx3"),
|
|
voice=tts_config.get("voice"),
|
|
rate=tts_config.get("rate", 150),
|
|
volume=tts_config.get("volume", 0.9),
|
|
language=tts_config.get("language", "de")
|
|
)
|
|
console.print("[green]TTS ready![/green]")
|
|
except Exception as e:
|
|
console.print(f"[red]TTS init failed: {e}[/red]")
|
|
self.tts = None
|
|
|
|
# Initialize STT
|
|
console.print("\n[yellow]Initializing Speech-to-Text...[/yellow]")
|
|
stt_config = self.config.get("stt", {})
|
|
try:
|
|
self.stt = create_stt_engine(
|
|
energy_threshold=stt_config.get("energy_threshold", 300),
|
|
pause_threshold=stt_config.get("pause_threshold", 0.8),
|
|
phrase_time_limit=stt_config.get("phrase_time_limit", 15),
|
|
service=stt_config.get("service", "google"),
|
|
language=stt_config.get("language", "de-DE")
|
|
)
|
|
console.print("[green]STT ready![/green]")
|
|
except Exception as e:
|
|
console.print(f"[red]STT init failed: {e}[/red]")
|
|
self.stt = None
|
|
|
|
console.print("\n[bold green]All systems initialized![/bold green]\n")
|
|
return True
|
|
|
|
def start(self):
|
|
"""Start the main exploration loop"""
|
|
self.running = True
|
|
self.state.exploring = True
|
|
|
|
# Start speech recognition in background
|
|
if self.stt:
|
|
self.stt.start_continuous(self._on_speech_detected)
|
|
|
|
# Welcome message
|
|
welcome = "Hallo Stefan! Ich bin online und bereit zum Erkunden. Was soll ich mir anschauen?"
|
|
self._speak(welcome)
|
|
self.state.last_claude_response = welcome
|
|
|
|
try:
|
|
self._main_loop()
|
|
except KeyboardInterrupt:
|
|
console.print("\n[yellow]Stopping...[/yellow]")
|
|
finally:
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""Stop the bridge"""
|
|
self.running = False
|
|
self.state.exploring = False
|
|
self._stop_event.set()
|
|
|
|
if self.stt:
|
|
self.stt.stop_continuous()
|
|
|
|
if self.tts:
|
|
self.tts.stop()
|
|
|
|
if self.robot and not self.simulate:
|
|
self.robot.stop()
|
|
|
|
console.print("[yellow]Bridge stopped[/yellow]")
|
|
|
|
def _main_loop(self):
|
|
"""Main exploration loop"""
|
|
camera_config = self.config.get("camera", {})
|
|
capture_interval = camera_config.get("capture_interval", 5)
|
|
|
|
while self.running:
|
|
try:
|
|
current_time = time.time()
|
|
|
|
# Capture and process image periodically
|
|
if current_time - self.state.last_image_time >= capture_interval:
|
|
self._exploration_step()
|
|
self.state.last_image_time = current_time
|
|
|
|
# Update status display
|
|
self._update_display()
|
|
|
|
# Small delay
|
|
time.sleep(0.1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Loop error: {e}")
|
|
self.state.error_message = str(e)
|
|
time.sleep(1)
|
|
|
|
def _exploration_step(self):
|
|
"""Single exploration step: capture, analyze, act"""
|
|
# Get robot status
|
|
if self.robot and not self.simulate:
|
|
try:
|
|
self.state.last_status = self.robot.get_status()
|
|
except Exception as e:
|
|
logger.error(f"Status error: {e}")
|
|
|
|
# Capture image
|
|
image_data = None
|
|
if self.robot and not self.simulate:
|
|
try:
|
|
camera_config = self.config.get("camera", {})
|
|
image_data = self.robot.capture_image(
|
|
resolution=camera_config.get("resolution", "VGA"),
|
|
quality=camera_config.get("quality", 12)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Capture error: {e}")
|
|
|
|
# Build context message
|
|
context = self._build_context_message()
|
|
|
|
# Add Stefan's input if any
|
|
if self.state.stefan_input:
|
|
context += f"\n\nStefan sagt: {self.state.stefan_input}"
|
|
self.state.stefan_input = ""
|
|
|
|
# Send to Claude
|
|
try:
|
|
response = self.chat.send_message(context, image=image_data)
|
|
self.state.last_claude_response = response.text
|
|
|
|
# Speak response
|
|
self._speak(response.text)
|
|
|
|
# Execute commands
|
|
self._execute_commands(response.commands)
|
|
|
|
# Update robot display
|
|
if self.robot and not self.simulate:
|
|
# Send short version to robot display
|
|
short_text = response.text[:100] + "..." if len(response.text) > 100 else response.text
|
|
self.robot.set_claude_text(short_text)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Chat error: {e}")
|
|
self.state.error_message = str(e)
|
|
|
|
def _build_context_message(self) -> str:
|
|
"""Build context message with sensor data"""
|
|
parts = ["Hier ist was ich gerade sehe und meine Sensordaten:"]
|
|
|
|
if self.state.last_status:
|
|
status = self.state.last_status
|
|
parts.append(f"\n- Abstand zum nächsten Hindernis: {status.distance_cm:.0f} cm")
|
|
parts.append(f"- Aktuelle Aktion: {status.current_action}")
|
|
parts.append(f"- Batterie: {status.battery_percent}%")
|
|
|
|
if status.obstacle_danger:
|
|
parts.append("- WARNUNG: Hindernis sehr nah!")
|
|
elif status.obstacle_warning:
|
|
parts.append("- Hinweis: Hindernis in der Nähe")
|
|
|
|
if status.is_tilted:
|
|
parts.append("- WARNUNG: Ich bin schief!")
|
|
|
|
parts.append("\nWas siehst du auf dem Bild? Was möchtest du als nächstes tun?")
|
|
|
|
return "\n".join(parts)
|
|
|
|
def _execute_commands(self, commands: list):
|
|
"""Execute movement commands from Claude"""
|
|
if not commands:
|
|
return
|
|
|
|
if self.simulate:
|
|
console.print(f"[dim]Simulated commands: {commands}[/dim]")
|
|
return
|
|
|
|
if not self.robot:
|
|
return
|
|
|
|
safety = self.config.get("safety", {})
|
|
max_speed = safety.get("max_speed", 70)
|
|
min_distance = safety.get("min_obstacle_distance", 20)
|
|
|
|
for cmd in commands:
|
|
# Safety check
|
|
if self.state.last_status and self.state.last_status.distance_cm < min_distance:
|
|
if cmd == "FORWARD":
|
|
console.print("[red]Blocked: Obstacle too close![/red]")
|
|
continue
|
|
|
|
try:
|
|
if cmd == "FORWARD":
|
|
self.robot.forward(speed=max_speed, duration_ms=800)
|
|
elif cmd == "BACKWARD":
|
|
self.robot.backward(speed=max_speed, duration_ms=800)
|
|
elif cmd == "LEFT":
|
|
self.robot.left(speed=max_speed, duration_ms=400)
|
|
elif cmd == "RIGHT":
|
|
self.robot.right(speed=max_speed, duration_ms=400)
|
|
elif cmd == "STOP":
|
|
self.robot.stop()
|
|
elif cmd == "LOOK_LEFT":
|
|
self.robot.look_left()
|
|
elif cmd == "LOOK_RIGHT":
|
|
self.robot.look_right()
|
|
elif cmd == "LOOK_UP":
|
|
self.robot.look_up()
|
|
elif cmd == "LOOK_DOWN":
|
|
self.robot.look_down()
|
|
elif cmd == "LOOK_CENTER":
|
|
self.robot.look_center()
|
|
|
|
# Small delay between commands
|
|
time.sleep(0.3)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Command error ({cmd}): {e}")
|
|
|
|
def _speak(self, text: str):
|
|
"""Speak text using TTS"""
|
|
if self.tts:
|
|
# Remove command brackets from speech
|
|
import re
|
|
clean_text = re.sub(r'\[[A-Z_]+\]', '', text).strip()
|
|
if clean_text:
|
|
self.tts.speak_async(clean_text)
|
|
|
|
def _on_speech_detected(self, result: SpeechResult):
|
|
"""Callback when Stefan says something"""
|
|
console.print(f"\n[bold blue]Stefan:[/bold blue] {result.text}")
|
|
self.state.stefan_input = result.text
|
|
|
|
def _update_display(self):
|
|
"""Update console display"""
|
|
# This could be enhanced with rich.live for real-time updates
|
|
pass
|
|
|
|
|
|
def signal_handler(signum, frame):
|
|
"""Handle Ctrl+C gracefully"""
|
|
console.print("\n[yellow]Received stop signal...[/yellow]")
|
|
sys.exit(0)
|
|
|
|
|
|
@click.command()
|
|
@click.option('--config', '-c', default='config.yaml', help='Path to config file')
|
|
@click.option('--simulate', '-s', is_flag=True, help='Simulate without hardware')
|
|
@click.option('--debug', '-d', is_flag=True, help='Enable debug logging')
|
|
def main(config: str, simulate: bool, debug: bool):
|
|
"""Claude's Eyes - Autonomous Exploration Robot Bridge"""
|
|
|
|
if debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
# Handle signals
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
# Find config file
|
|
config_path = Path(config)
|
|
if not config_path.is_absolute():
|
|
# Look in script directory first
|
|
script_dir = Path(__file__).parent
|
|
if (script_dir / config).exists():
|
|
config_path = script_dir / config
|
|
|
|
# Create and run bridge
|
|
bridge = ClaudesEyesBridge(str(config_path), simulate=simulate)
|
|
|
|
if bridge.initialize():
|
|
console.print("\n[bold cyan]Starting exploration...[/bold cyan]")
|
|
console.print("[dim]Press Ctrl+C to stop[/dim]\n")
|
|
bridge.start()
|
|
else:
|
|
console.print("[red]Initialization failed![/red]")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|