#!/usr/bin/env python3 """ Claude's Eyes - Main Bridge Script Connects the ESP32 robot with Claude AI for autonomous exploration. Usage: python bridge.py # Use config.yaml python bridge.py --config my.yaml # Use custom config python bridge.py --simulate # Simulate without hardware """ import os import sys import time import logging import threading import signal from pathlib import Path from typing import Optional from dataclasses import dataclass import yaml import click from rich.console import Console from rich.panel import Panel from rich.live import Live from rich.table import Table from rich.text import Text from esp32_client import ESP32Client, RobotStatus from tts_engine import create_tts_engine, TTSEngine from stt_engine import create_stt_engine, STTEngine, SpeechResult from chat_interface import create_chat_interface, ChatInterface, ChatResponse # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Rich console for pretty output console = Console() @dataclass class BridgeState: """Current state of the bridge""" connected: bool = False exploring: bool = False last_image_time: float = 0 last_status: Optional[RobotStatus] = None last_claude_response: str = "" stefan_input: str = "" error_message: str = "" class ClaudesEyesBridge: """Main bridge class connecting robot and Claude""" def __init__(self, config_path: str, simulate: bool = False): self.config = self._load_config(config_path) self.simulate = simulate self.state = BridgeState() self.running = False # Components self.robot: Optional[ESP32Client] = None self.chat: Optional[ChatInterface] = None self.tts: Optional[TTSEngine] = None self.stt: Optional[STTEngine] = None # Threading self.speech_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() def _load_config(self, config_path: str) -> dict: """Load configuration from YAML file""" path = Path(config_path) # Try local config first local_path = path.parent / f"{path.stem}.local{path.suffix}" if local_path.exists(): path = local_path logger.info(f"Using local config: {path}") if not path.exists(): logger.error(f"Config file not found: {path}") sys.exit(1) with open(path) as f: config = yaml.safe_load(f) return config def initialize(self) -> bool: """Initialize all components""" console.print(Panel.fit( "[bold cyan]Claude's Eyes[/bold cyan]\n" "[dim]Autonomous Exploration Robot[/dim]", border_style="cyan" )) # Initialize robot client if not self.simulate: console.print("\n[yellow]Connecting to robot...[/yellow]") esp_config = self.config.get("esp32", {}) self.robot = ESP32Client( host=esp_config.get("host", "192.168.178.100"), port=esp_config.get("port", 80), api_key=esp_config.get("api_key", ""), timeout=esp_config.get("timeout", 10) ) if not self.robot.is_connected(): console.print("[red]Could not connect to robot![/red]") self.state.error_message = "Robot connection failed" return False self.state.connected = True console.print("[green]Robot connected![/green]") else: console.print("[yellow]Simulation mode - no robot connection[/yellow]") self.state.connected = True # Initialize Claude interface console.print("\n[yellow]Initializing Claude interface...[/yellow]") claude_config = self.config.get("claude", {}) api_key = claude_config.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "") self.chat = create_chat_interface( use_api=claude_config.get("use_api", True) and bool(api_key), api_key=api_key, model=claude_config.get("model", "claude-sonnet-4-20250514"), system_prompt=claude_config.get("system_prompt", ""), max_tokens=claude_config.get("max_tokens", 1024) ) console.print(f"[green]Chat interface ready ({type(self.chat).__name__})[/green]") # Initialize TTS console.print("\n[yellow]Initializing Text-to-Speech...[/yellow]") tts_config = self.config.get("tts", {}) try: self.tts = create_tts_engine( engine_type=tts_config.get("engine", "pyttsx3"), voice=tts_config.get("voice"), rate=tts_config.get("rate", 150), volume=tts_config.get("volume", 0.9), language=tts_config.get("language", "de") ) console.print("[green]TTS ready![/green]") except Exception as e: console.print(f"[red]TTS init failed: {e}[/red]") self.tts = None # Initialize STT console.print("\n[yellow]Initializing Speech-to-Text...[/yellow]") stt_config = self.config.get("stt", {}) try: self.stt = create_stt_engine( energy_threshold=stt_config.get("energy_threshold", 300), pause_threshold=stt_config.get("pause_threshold", 0.8), phrase_time_limit=stt_config.get("phrase_time_limit", 15), service=stt_config.get("service", "google"), language=stt_config.get("language", "de-DE") ) console.print("[green]STT ready![/green]") except Exception as e: console.print(f"[red]STT init failed: {e}[/red]") self.stt = None console.print("\n[bold green]All systems initialized![/bold green]\n") return True def start(self): """Start the main exploration loop""" self.running = True self.state.exploring = True # Start speech recognition in background if self.stt: self.stt.start_continuous(self._on_speech_detected) # Welcome message welcome = "Hallo Stefan! Ich bin online und bereit zum Erkunden. Was soll ich mir anschauen?" self._speak(welcome) self.state.last_claude_response = welcome try: self._main_loop() except KeyboardInterrupt: console.print("\n[yellow]Stopping...[/yellow]") finally: self.stop() def stop(self): """Stop the bridge""" self.running = False self.state.exploring = False self._stop_event.set() if self.stt: self.stt.stop_continuous() if self.tts: self.tts.stop() if self.robot and not self.simulate: self.robot.stop() console.print("[yellow]Bridge stopped[/yellow]") def _main_loop(self): """Main exploration loop""" camera_config = self.config.get("camera", {}) capture_interval = camera_config.get("capture_interval", 5) while self.running: try: current_time = time.time() # Capture and process image periodically if current_time - self.state.last_image_time >= capture_interval: self._exploration_step() self.state.last_image_time = current_time # Update status display self._update_display() # Small delay time.sleep(0.1) except Exception as e: logger.error(f"Loop error: {e}") self.state.error_message = str(e) time.sleep(1) def _exploration_step(self): """Single exploration step: capture, analyze, act""" # Get robot status if self.robot and not self.simulate: try: self.state.last_status = self.robot.get_status() except Exception as e: logger.error(f"Status error: {e}") # Capture image image_data = None if self.robot and not self.simulate: try: camera_config = self.config.get("camera", {}) image_data = self.robot.capture_image( resolution=camera_config.get("resolution", "VGA"), quality=camera_config.get("quality", 12) ) except Exception as e: logger.error(f"Capture error: {e}") # Build context message context = self._build_context_message() # Add Stefan's input if any if self.state.stefan_input: context += f"\n\nStefan sagt: {self.state.stefan_input}" self.state.stefan_input = "" # Send to Claude try: response = self.chat.send_message(context, image=image_data) self.state.last_claude_response = response.text # Speak response self._speak(response.text) # Execute commands self._execute_commands(response.commands) # Update robot display if self.robot and not self.simulate: # Send short version to robot display short_text = response.text[:100] + "..." if len(response.text) > 100 else response.text self.robot.set_claude_text(short_text) except Exception as e: logger.error(f"Chat error: {e}") self.state.error_message = str(e) def _build_context_message(self) -> str: """Build context message with sensor data""" parts = ["Hier ist was ich gerade sehe und meine Sensordaten:"] if self.state.last_status: status = self.state.last_status parts.append(f"\n- Abstand zum nächsten Hindernis: {status.distance_cm:.0f} cm") parts.append(f"- Aktuelle Aktion: {status.current_action}") parts.append(f"- Batterie: {status.battery_percent}%") if status.obstacle_danger: parts.append("- WARNUNG: Hindernis sehr nah!") elif status.obstacle_warning: parts.append("- Hinweis: Hindernis in der Nähe") if status.is_tilted: parts.append("- WARNUNG: Ich bin schief!") parts.append("\nWas siehst du auf dem Bild? Was möchtest du als nächstes tun?") return "\n".join(parts) def _execute_commands(self, commands: list): """Execute movement commands from Claude""" if not commands: return if self.simulate: console.print(f"[dim]Simulated commands: {commands}[/dim]") return if not self.robot: return safety = self.config.get("safety", {}) max_speed = safety.get("max_speed", 70) min_distance = safety.get("min_obstacle_distance", 20) for cmd in commands: # Safety check if self.state.last_status and self.state.last_status.distance_cm < min_distance: if cmd == "FORWARD": console.print("[red]Blocked: Obstacle too close![/red]") continue try: if cmd == "FORWARD": self.robot.forward(speed=max_speed, duration_ms=800) elif cmd == "BACKWARD": self.robot.backward(speed=max_speed, duration_ms=800) elif cmd == "LEFT": self.robot.left(speed=max_speed, duration_ms=400) elif cmd == "RIGHT": self.robot.right(speed=max_speed, duration_ms=400) elif cmd == "STOP": self.robot.stop() elif cmd == "LOOK_LEFT": self.robot.look_left() elif cmd == "LOOK_RIGHT": self.robot.look_right() elif cmd == "LOOK_UP": self.robot.look_up() elif cmd == "LOOK_DOWN": self.robot.look_down() elif cmd == "LOOK_CENTER": self.robot.look_center() # Small delay between commands time.sleep(0.3) except Exception as e: logger.error(f"Command error ({cmd}): {e}") def _speak(self, text: str): """Speak text using TTS""" if self.tts: # Remove command brackets from speech import re clean_text = re.sub(r'\[[A-Z_]+\]', '', text).strip() if clean_text: self.tts.speak_async(clean_text) def _on_speech_detected(self, result: SpeechResult): """Callback when Stefan says something""" console.print(f"\n[bold blue]Stefan:[/bold blue] {result.text}") self.state.stefan_input = result.text def _update_display(self): """Update console display""" # This could be enhanced with rich.live for real-time updates pass def signal_handler(signum, frame): """Handle Ctrl+C gracefully""" console.print("\n[yellow]Received stop signal...[/yellow]") sys.exit(0) @click.command() @click.option('--config', '-c', default='config.yaml', help='Path to config file') @click.option('--simulate', '-s', is_flag=True, help='Simulate without hardware') @click.option('--debug', '-d', is_flag=True, help='Enable debug logging') def main(config: str, simulate: bool, debug: bool): """Claude's Eyes - Autonomous Exploration Robot Bridge""" if debug: logging.getLogger().setLevel(logging.DEBUG) # Handle signals signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Find config file config_path = Path(config) if not config_path.is_absolute(): # Look in script directory first script_dir = Path(__file__).parent if (script_dir / config).exists(): config_path = script_dir / config # Create and run bridge bridge = ClaudesEyesBridge(str(config_path), simulate=simulate) if bridge.initialize(): console.print("\n[bold cyan]Starting exploration...[/bold cyan]") console.print("[dim]Press Ctrl+C to stop[/dim]\n") bridge.start() else: console.print("[red]Initialization failed![/red]") sys.exit(1) if __name__ == "__main__": main()