diff --git a/README.md b/README.md index b8d0d6a..8be2baa 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,33 @@ nano config.yaml # chat.url auf deine Claude.ai Chat-URL setzen Die Bridge öffnet Chrome mit Claude.ai. Beim ersten Mal musst du dich einloggen. Danach kann's losgehen! +### 4. Mock-Server (Tests ohne Hardware) + +Für Tests ohne echten Roboter gibt es einen Mock-Server: + +```bash +cd python_bridge + +# Mit Testbildern aus ./test_images/ +python mock_esp32.py + +# ODER mit USB-Webcam (config.yaml anpassen): +# mock: +# use_real_webcam: true +python mock_esp32.py +``` + +**Webcam-Modus aktivieren** in `config.yaml`: +```yaml +mock: + use_real_webcam: true # USB-Webcam nutzen + webcam_device: 0 # 0 = erste Webcam + webcam_width: 640 + webcam_height: 480 +``` + +Benötigt `pip install opencv-python` für Webcam-Support. + --- ## API Endpoints (ESP32) @@ -166,12 +193,19 @@ Claude verwendet diese Befehle in eckigen Klammern: - **Echte Autonomie** - Claude entscheidet selbst was ihn interessiert - **Paralelle Konversation** - Erkunden UND quatschen gleichzeitig - **Sprachausgabe** - Claude redet mit dir (TTS) + - "Claude sagt:" Prefix bei jeder Nachricht + - Wartet auf [READY] bevor TTS startet (keine Init-Nachrichten) + - Persistente Position - bei Neustart werden alte Nachrichten nicht wiederholt - **Spracheingabe** - Du redest mit Claude (STT, 5s Stille = fertig) + - Bis zu 2 Minuten pro Phrase (phrase_time_limit: 120) + - "Stefan sagt:" Prefix - **Mute/Unmute** - Mikrofon per Tastendruck stummschalten - **Smart Recording** - Heartbeat pausiert automatisch während du sprichst +- **Mock-Server** - Testen ohne Hardware + - Testbilder aus `./test_images/` Ordner + - ODER echte USB-Webcam (`use_real_webcam: true`) - **Hinderniserkennung** - Ultraschall & IMU - **Touch-Display** - Notfall-Stopp & Status -- **Termux Support** - Läuft auch auf Android! 
## Keyboard-Shortcuts (Bridge) @@ -184,6 +218,24 @@ Claude verwendet diese Befehle in eckigen Klammern: **Hinweis:** Claude.ai erlaubt max. 100 Bilder pro Chat. Die Bridge warnt bei 90/95 Bildern. Mit **N** startest du einen neuen Chat. +## Bridge Argumente + +```bash +./start_venv.sh --run [OPTIONEN] +``` + +| Option | Beschreibung | +|--------|--------------| +| `-d, --debug` | Debug-Logging (zeigt Nachrichten-Erkennung) | +| `-c FILE` | Eigene Config-Datei nutzen | +| `--test` | Test-Modus (kein Heartbeat) | + +**Beispiele:** +```bash +./start_venv.sh --run -d # Mit Debug-Logging +./start_venv.sh --run -c my.yaml # Eigene Config +``` + --- ## Sicherheit diff --git a/docs/setup_guide.md b/docs/setup_guide.md index fcbd78c..7edb34f 100644 --- a/docs/setup_guide.md +++ b/docs/setup_guide.md @@ -187,23 +187,13 @@ esp32: host: "192.168.178.XXX" # IP des Roboters api_key: "dein_api_key" # Muss mit config.h übereinstimmen! -claude: - api_key: "" # Oder setze ANTHROPIC_API_KEY Environment Variable +chat: + url: "https://claude.ai/chat/..." # URL deines Claude-Chats ``` -### 2.4 Anthropic API Key +**Wichtig:** Diese Bridge nutzt Claude.ai direkt im Browser (via Selenium), NICHT die Anthropic API. Du brauchst einen Claude.ai Account, aber keinen API Key! -Erstelle einen API Key auf https://console.anthropic.com/ - -```bash -# Linux/Mac: -export ANTHROPIC_API_KEY="sk-ant-..." - -# Windows (PowerShell): -$env:ANTHROPIC_API_KEY="sk-ant-..." 
-``` - -### 2.5 Bridge starten +### 2.4 Bridge starten **Mit start_venv.sh (empfohlen):** ```bash @@ -229,7 +219,7 @@ python chat_audio_bridge.py -d python chat_audio_bridge.py -c config.local.yaml ``` -### 2.6 Keyboard-Shortcuts während der Bridge läuft +### 2.5 Keyboard-Shortcuts während der Bridge läuft | Taste | Funktion | |-------|----------| @@ -245,7 +235,7 @@ python chat_audio_bridge.py -c config.local.yaml - Mit **N** startest du einen neuen Chat und die Instruktionen werden erneut gesendet - Bilder werden nur hochgeladen wenn sie sich geändert haben (spart Limit!) -### 2.7 Spracheingabe (STT) - Wie es funktioniert +### 2.6 Spracheingabe (STT) - Wie es funktioniert Die Spracheingabe sammelt deine Worte intelligent: diff --git a/python_bridge/ARCHIV/bridge.py b/python_bridge/ARCHIV/bridge.py deleted file mode 100644 index 89fd9aa..0000000 --- a/python_bridge/ARCHIV/bridge.py +++ /dev/null @@ -1,424 +0,0 @@ -#!/usr/bin/env python3 -""" -Claude's Eyes - Main Bridge Script - -Connects the ESP32 robot with Claude AI for autonomous exploration. 
- -Usage: - python bridge.py # Use config.yaml - python bridge.py --config my.yaml # Use custom config - python bridge.py --simulate # Simulate without hardware -""" - -import os -import sys -import time -import logging -import threading -import signal -from pathlib import Path -from typing import Optional -from dataclasses import dataclass - -import yaml -import click -from rich.console import Console -from rich.panel import Panel -from rich.live import Live -from rich.table import Table -from rich.text import Text - -from esp32_client import ESP32Client, RobotStatus -from tts_engine import create_tts_engine, TTSEngine -from stt_engine import create_stt_engine, STTEngine, SpeechResult -from chat_interface import create_chat_interface, ChatInterface, ChatResponse - -# Setup logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -# Rich console for pretty output -console = Console() - - -@dataclass -class BridgeState: - """Current state of the bridge""" - connected: bool = False - exploring: bool = False - last_image_time: float = 0 - last_status: Optional[RobotStatus] = None - last_claude_response: str = "" - stefan_input: str = "" - error_message: str = "" - - -class ClaudesEyesBridge: - """Main bridge class connecting robot and Claude""" - - def __init__(self, config_path: str, simulate: bool = False): - self.config = self._load_config(config_path) - self.simulate = simulate - self.state = BridgeState() - self.running = False - - # Components - self.robot: Optional[ESP32Client] = None - self.chat: Optional[ChatInterface] = None - self.tts: Optional[TTSEngine] = None - self.stt: Optional[STTEngine] = None - - # Threading - self.speech_thread: Optional[threading.Thread] = None - self._stop_event = threading.Event() - - def _load_config(self, config_path: str) -> dict: - """Load configuration from YAML file""" - path = Path(config_path) - - # Try local config 
first - local_path = path.parent / f"{path.stem}.local{path.suffix}" - if local_path.exists(): - path = local_path - logger.info(f"Using local config: {path}") - - if not path.exists(): - logger.error(f"Config file not found: {path}") - sys.exit(1) - - with open(path) as f: - config = yaml.safe_load(f) - - return config - - def initialize(self) -> bool: - """Initialize all components""" - console.print(Panel.fit( - "[bold cyan]Claude's Eyes[/bold cyan]\n" - "[dim]Autonomous Exploration Robot[/dim]", - border_style="cyan" - )) - - # Initialize robot client - if not self.simulate: - console.print("\n[yellow]Connecting to robot...[/yellow]") - esp_config = self.config.get("esp32", {}) - self.robot = ESP32Client( - host=esp_config.get("host", "192.168.178.100"), - port=esp_config.get("port", 80), - api_key=esp_config.get("api_key", ""), - timeout=esp_config.get("timeout", 10) - ) - - if not self.robot.is_connected(): - console.print("[red]Could not connect to robot![/red]") - self.state.error_message = "Robot connection failed" - return False - - self.state.connected = True - console.print("[green]Robot connected![/green]") - else: - console.print("[yellow]Simulation mode - no robot connection[/yellow]") - self.state.connected = True - - # Initialize Claude interface - console.print("\n[yellow]Initializing Claude interface...[/yellow]") - claude_config = self.config.get("claude", {}) - - api_key = claude_config.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "") - - self.chat = create_chat_interface( - use_api=claude_config.get("use_api", True) and bool(api_key), - api_key=api_key, - model=claude_config.get("model", "claude-sonnet-4-20250514"), - system_prompt=claude_config.get("system_prompt", ""), - max_tokens=claude_config.get("max_tokens", 1024) - ) - console.print(f"[green]Chat interface ready ({type(self.chat).__name__})[/green]") - - # Initialize TTS - console.print("\n[yellow]Initializing Text-to-Speech...[/yellow]") - tts_config = self.config.get("tts", 
{}) - try: - self.tts = create_tts_engine( - engine_type=tts_config.get("engine", "pyttsx3"), - voice=tts_config.get("voice"), - rate=tts_config.get("rate", 150), - volume=tts_config.get("volume", 0.9), - language=tts_config.get("language", "de") - ) - console.print("[green]TTS ready![/green]") - except Exception as e: - console.print(f"[red]TTS init failed: {e}[/red]") - self.tts = None - - # Initialize STT - console.print("\n[yellow]Initializing Speech-to-Text...[/yellow]") - stt_config = self.config.get("stt", {}) - try: - self.stt = create_stt_engine( - energy_threshold=stt_config.get("energy_threshold", 300), - pause_threshold=stt_config.get("pause_threshold", 0.8), - phrase_time_limit=stt_config.get("phrase_time_limit", 15), - service=stt_config.get("service", "google"), - language=stt_config.get("language", "de-DE") - ) - console.print("[green]STT ready![/green]") - except Exception as e: - console.print(f"[red]STT init failed: {e}[/red]") - self.stt = None - - console.print("\n[bold green]All systems initialized![/bold green]\n") - return True - - def start(self): - """Start the main exploration loop""" - self.running = True - self.state.exploring = True - - # Start speech recognition in background - if self.stt: - self.stt.start_continuous(self._on_speech_detected) - - # Welcome message - welcome = "Hallo Stefan! Ich bin online und bereit zum Erkunden. Was soll ich mir anschauen?" 
- self._speak(welcome) - self.state.last_claude_response = welcome - - try: - self._main_loop() - except KeyboardInterrupt: - console.print("\n[yellow]Stopping...[/yellow]") - finally: - self.stop() - - def stop(self): - """Stop the bridge""" - self.running = False - self.state.exploring = False - self._stop_event.set() - - if self.stt: - self.stt.stop_continuous() - - if self.tts: - self.tts.stop() - - if self.robot and not self.simulate: - self.robot.stop() - - console.print("[yellow]Bridge stopped[/yellow]") - - def _main_loop(self): - """Main exploration loop""" - camera_config = self.config.get("camera", {}) - capture_interval = camera_config.get("capture_interval", 5) - - while self.running: - try: - current_time = time.time() - - # Capture and process image periodically - if current_time - self.state.last_image_time >= capture_interval: - self._exploration_step() - self.state.last_image_time = current_time - - # Update status display - self._update_display() - - # Small delay - time.sleep(0.1) - - except Exception as e: - logger.error(f"Loop error: {e}") - self.state.error_message = str(e) - time.sleep(1) - - def _exploration_step(self): - """Single exploration step: capture, analyze, act""" - # Get robot status - if self.robot and not self.simulate: - try: - self.state.last_status = self.robot.get_status() - except Exception as e: - logger.error(f"Status error: {e}") - - # Capture image - image_data = None - if self.robot and not self.simulate: - try: - camera_config = self.config.get("camera", {}) - image_data = self.robot.capture_image( - resolution=camera_config.get("resolution", "VGA"), - quality=camera_config.get("quality", 12) - ) - except Exception as e: - logger.error(f"Capture error: {e}") - - # Build context message - context = self._build_context_message() - - # Add Stefan's input if any - if self.state.stefan_input: - context += f"\n\nStefan sagt: {self.state.stefan_input}" - self.state.stefan_input = "" - - # Send to Claude - try: - response = 
self.chat.send_message(context, image=image_data) - self.state.last_claude_response = response.text - - # Speak response - self._speak(response.text) - - # Execute commands - self._execute_commands(response.commands) - - # Update robot display - if self.robot and not self.simulate: - # Send short version to robot display - short_text = response.text[:100] + "..." if len(response.text) > 100 else response.text - self.robot.set_claude_text(short_text) - - except Exception as e: - logger.error(f"Chat error: {e}") - self.state.error_message = str(e) - - def _build_context_message(self) -> str: - """Build context message with sensor data""" - parts = ["Hier ist was ich gerade sehe und meine Sensordaten:"] - - if self.state.last_status: - status = self.state.last_status - parts.append(f"\n- Abstand zum nächsten Hindernis: {status.distance_cm:.0f} cm") - parts.append(f"- Aktuelle Aktion: {status.current_action}") - parts.append(f"- Batterie: {status.battery_percent}%") - - if status.obstacle_danger: - parts.append("- WARNUNG: Hindernis sehr nah!") - elif status.obstacle_warning: - parts.append("- Hinweis: Hindernis in der Nähe") - - if status.is_tilted: - parts.append("- WARNUNG: Ich bin schief!") - - parts.append("\nWas siehst du auf dem Bild? 
Was möchtest du als nächstes tun?") - - return "\n".join(parts) - - def _execute_commands(self, commands: list): - """Execute movement commands from Claude""" - if not commands: - return - - if self.simulate: - console.print(f"[dim]Simulated commands: {commands}[/dim]") - return - - if not self.robot: - return - - safety = self.config.get("safety", {}) - max_speed = safety.get("max_speed", 70) - min_distance = safety.get("min_obstacle_distance", 20) - - for cmd in commands: - # Safety check - if self.state.last_status and self.state.last_status.distance_cm < min_distance: - if cmd == "FORWARD": - console.print("[red]Blocked: Obstacle too close![/red]") - continue - - try: - if cmd == "FORWARD": - self.robot.forward(speed=max_speed, duration_ms=800) - elif cmd == "BACKWARD": - self.robot.backward(speed=max_speed, duration_ms=800) - elif cmd == "LEFT": - self.robot.left(speed=max_speed, duration_ms=400) - elif cmd == "RIGHT": - self.robot.right(speed=max_speed, duration_ms=400) - elif cmd == "STOP": - self.robot.stop() - elif cmd == "LOOK_LEFT": - self.robot.look_left() - elif cmd == "LOOK_RIGHT": - self.robot.look_right() - elif cmd == "LOOK_UP": - self.robot.look_up() - elif cmd == "LOOK_DOWN": - self.robot.look_down() - elif cmd == "LOOK_CENTER": - self.robot.look_center() - - # Small delay between commands - time.sleep(0.3) - - except Exception as e: - logger.error(f"Command error ({cmd}): {e}") - - def _speak(self, text: str): - """Speak text using TTS""" - if self.tts: - # Remove command brackets from speech - import re - clean_text = re.sub(r'\[[A-Z_]+\]', '', text).strip() - if clean_text: - self.tts.speak_async(clean_text) - - def _on_speech_detected(self, result: SpeechResult): - """Callback when Stefan says something""" - console.print(f"\n[bold blue]Stefan:[/bold blue] {result.text}") - self.state.stefan_input = result.text - - def _update_display(self): - """Update console display""" - # This could be enhanced with rich.live for real-time updates - pass 
- - -def signal_handler(signum, frame): - """Handle Ctrl+C gracefully""" - console.print("\n[yellow]Received stop signal...[/yellow]") - sys.exit(0) - - -@click.command() -@click.option('--config', '-c', default='config.yaml', help='Path to config file') -@click.option('--simulate', '-s', is_flag=True, help='Simulate without hardware') -@click.option('--debug', '-d', is_flag=True, help='Enable debug logging') -def main(config: str, simulate: bool, debug: bool): - """Claude's Eyes - Autonomous Exploration Robot Bridge""" - - if debug: - logging.getLogger().setLevel(logging.DEBUG) - - # Handle signals - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Find config file - config_path = Path(config) - if not config_path.is_absolute(): - # Look in script directory first - script_dir = Path(__file__).parent - if (script_dir / config).exists(): - config_path = script_dir / config - - # Create and run bridge - bridge = ClaudesEyesBridge(str(config_path), simulate=simulate) - - if bridge.initialize(): - console.print("\n[bold cyan]Starting exploration...[/bold cyan]") - console.print("[dim]Press Ctrl+C to stop[/dim]\n") - bridge.start() - else: - console.print("[red]Initialization failed![/red]") - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/python_bridge/ARCHIV/chat_interface.py b/python_bridge/ARCHIV/chat_interface.py deleted file mode 100644 index bec2400..0000000 --- a/python_bridge/ARCHIV/chat_interface.py +++ /dev/null @@ -1,257 +0,0 @@ -""" -Claude's Eyes - Chat Interface - -Interface to communicate with Claude AI (via API or browser) -""" - -import logging -import base64 -import re -from typing import Optional, List, Dict, Any, Tuple -from dataclasses import dataclass, field -from abc import ABC, abstractmethod - -logger = logging.getLogger(__name__) - - -@dataclass -class Message: - """A chat message""" - role: str # "user" or "assistant" - content: str - image_data: Optional[bytes] = None # 
JPEG image data - - -@dataclass -class ChatResponse: - """Response from Claude""" - text: str - commands: List[str] = field(default_factory=list) # Extracted movement commands - - -class ChatInterface(ABC): - """Abstract base class for chat interfaces""" - - @abstractmethod - def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse: - """Send message to Claude and get response""" - pass - - @abstractmethod - def reset_conversation(self) -> None: - """Reset/clear conversation history""" - pass - - -class AnthropicAPIInterface(ChatInterface): - """Direct Claude API interface using anthropic library""" - - def __init__( - self, - api_key: str, - model: str = "claude-sonnet-4-20250514", - system_prompt: str = "", - max_tokens: int = 1024 - ): - import anthropic - - self.client = anthropic.Anthropic(api_key=api_key) - self.model = model - self.system_prompt = system_prompt - self.max_tokens = max_tokens - self.conversation_history: List[Dict[str, Any]] = [] - - logger.info(f"Anthropic API interface initialized (model: {model})") - - def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse: - """Send message to Claude API""" - - # Build message content - content = [] - - # Add image if provided - if image: - image_base64 = base64.standard_b64encode(image).decode("utf-8") - content.append({ - "type": "image", - "source": { - "type": "base64", - "media_type": "image/jpeg", - "data": image_base64 - } - }) - - # Add text - content.append({ - "type": "text", - "text": text - }) - - # Add to history - self.conversation_history.append({ - "role": "user", - "content": content - }) - - try: - # Make API call - response = self.client.messages.create( - model=self.model, - max_tokens=self.max_tokens, - system=self.system_prompt, - messages=self.conversation_history - ) - - # Extract response text - response_text = "" - for block in response.content: - if block.type == "text": - response_text += block.text - - # Add assistant response 
to history - self.conversation_history.append({ - "role": "assistant", - "content": response_text - }) - - # Extract commands - commands = self._extract_commands(response_text) - - logger.debug(f"Claude response: {response_text[:100]}...") - logger.debug(f"Extracted commands: {commands}") - - return ChatResponse(text=response_text, commands=commands) - - except Exception as e: - logger.error(f"API error: {e}") - raise - - def reset_conversation(self) -> None: - """Reset conversation history""" - self.conversation_history = [] - logger.info("Conversation history cleared") - - def _extract_commands(self, text: str) -> List[str]: - """Extract movement commands from Claude's response""" - # Commands are in brackets like [FORWARD], [LEFT], etc. - pattern = r'\[([A-Z_]+)\]' - matches = re.findall(pattern, text) - - valid_commands = [ - "FORWARD", "BACKWARD", "LEFT", "RIGHT", "STOP", - "LOOK_LEFT", "LOOK_RIGHT", "LOOK_UP", "LOOK_DOWN", "LOOK_CENTER" - ] - - return [cmd for cmd in matches if cmd in valid_commands] - - -class SimulatedInterface(ChatInterface): - """Simulated chat interface for testing without API""" - - def __init__(self): - self.message_count = 0 - logger.info("Simulated chat interface initialized") - - def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse: - """Return simulated responses""" - self.message_count += 1 - - responses = [ - ("Oh interessant! Ich sehe etwas vor mir. Lass mich näher hinfahren. [FORWARD]", - ["FORWARD"]), - ("Hmm, was ist das links? Ich schaue mal nach. [LOOK_LEFT]", - ["LOOK_LEFT"]), - ("Das sieht aus wie ein Bücherregal! Ich fahre mal hin. [FORWARD] [FORWARD]", - ["FORWARD", "FORWARD"]), - ("Stefan, was ist das für ein Gegenstand? Kannst du mir das erklären?", - []), - ("Ich drehe mich um und schaue was hinter mir ist. 
[RIGHT] [RIGHT]", - ["RIGHT", "RIGHT"]), - ] - - idx = (self.message_count - 1) % len(responses) - text_response, commands = responses[idx] - - return ChatResponse(text=text_response, commands=commands) - - def reset_conversation(self) -> None: - self.message_count = 0 - - -def create_chat_interface( - use_api: bool = True, - api_key: str = "", - model: str = "claude-sonnet-4-20250514", - system_prompt: str = "", - max_tokens: int = 1024 -) -> ChatInterface: - """ - Factory function to create chat interface - - Args: - use_api: Use Anthropic API (True) or simulated (False) - api_key: Anthropic API key - model: Claude model to use - system_prompt: System prompt for Claude - max_tokens: Maximum response tokens - """ - if use_api: - if not api_key: - import os - api_key = os.environ.get("ANTHROPIC_API_KEY", "") - - if not api_key: - logger.warning("No API key provided, using simulated interface") - return SimulatedInterface() - - return AnthropicAPIInterface( - api_key=api_key, - model=model, - system_prompt=system_prompt, - max_tokens=max_tokens - ) - else: - return SimulatedInterface() - - -# Test when run directly -if __name__ == "__main__": - import os - - logging.basicConfig(level=logging.DEBUG) - - print("Chat Interface Test") - print("=" * 40) - - # Try API first, fall back to simulated - api_key = os.environ.get("ANTHROPIC_API_KEY", "") - - system_prompt = """Du bist Claude und steuerst einen Erkundungsroboter. - Befehle in Klammern: [FORWARD], [BACKWARD], [LEFT], [RIGHT], [STOP] - Beschreibe was du siehst und entscheide wohin du fährst.""" - - interface = create_chat_interface( - use_api=bool(api_key), - api_key=api_key, - system_prompt=system_prompt - ) - - print(f"Using: {type(interface).__name__}") - print() - - # Test conversation - test_messages = [ - "Hallo Claude! Du bist jetzt online. Was siehst du?", - "Vor dir ist ein Flur mit einer Tür am Ende.", - "Die Tür ist offen und dahinter ist ein helles Zimmer." 
- ] - - for msg in test_messages: - print(f"User: {msg}") - response = interface.send_message(msg) - print(f"Claude: {response.text}") - if response.commands: - print(f" Commands: {response.commands}") - print() - - print("Done!") diff --git a/python_bridge/ARCHIV/esp32_client.py b/python_bridge/ARCHIV/esp32_client.py deleted file mode 100644 index a383f26..0000000 --- a/python_bridge/ARCHIV/esp32_client.py +++ /dev/null @@ -1,238 +0,0 @@ -""" -Claude's Eyes - ESP32 API Client - -Handles communication with the robot's REST API -""" - -import requests -from typing import Optional, Dict, Any -from dataclasses import dataclass -from io import BytesIO -from PIL import Image -import logging - -logger = logging.getLogger(__name__) - - -@dataclass -class RobotStatus: - """Current robot status from sensors""" - distance_cm: float - battery_percent: int - current_action: str - wifi_rssi: int - uptime_seconds: int - servo_pan: int - servo_tilt: int - obstacle_warning: bool - obstacle_danger: bool - is_tilted: bool - is_moving: bool - imu: Dict[str, float] - - -class ESP32Client: - """Client for communicating with the ESP32 robot""" - - def __init__(self, host: str, port: int = 80, api_key: str = "", timeout: int = 10): - self.base_url = f"http://{host}:{port}" - self.api_key = api_key - self.timeout = timeout - self._session = requests.Session() - - def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response: - """Make GET request with API key""" - if params is None: - params = {} - params["key"] = self.api_key - - url = f"{self.base_url}{endpoint}" - logger.debug(f"GET {url}") - - response = self._session.get(url, params=params, timeout=self.timeout) - response.raise_for_status() - return response - - def _post(self, endpoint: str, data: Dict) -> requests.Response: - """Make POST request with API key""" - url = f"{self.base_url}{endpoint}?key={self.api_key}" - logger.debug(f"POST {url} with {data}") - - response = self._session.post(url, json=data, 
timeout=self.timeout) - response.raise_for_status() - return response - - def capture_image(self, resolution: str = "VGA", quality: int = 12) -> bytes: - """ - Capture image from robot camera - - Args: - resolution: QVGA, VGA, SVGA, XGA, SXGA, UXGA - quality: 10-63 (lower = better) - - Returns: - JPEG image data as bytes - """ - params = { - "resolution": resolution, - "quality": quality - } - response = self._get("/api/capture", params) - logger.info(f"Captured image: {len(response.content)} bytes") - return response.content - - def capture_image_pil(self, resolution: str = "VGA", quality: int = 12) -> Image.Image: - """Capture image and return as PIL Image""" - image_data = self.capture_image(resolution, quality) - return Image.open(BytesIO(image_data)) - - def get_status(self) -> RobotStatus: - """Get current robot status from sensors""" - response = self._get("/api/status") - data = response.json() - - return RobotStatus( - distance_cm=data.get("distance_cm", 0), - battery_percent=data.get("battery_percent", 100), - current_action=data.get("current_action", "unknown"), - wifi_rssi=data.get("wifi_rssi", 0), - uptime_seconds=data.get("uptime_seconds", 0), - servo_pan=data.get("servo_pan", 90), - servo_tilt=data.get("servo_tilt", 90), - obstacle_warning=data.get("obstacle_warning", False), - obstacle_danger=data.get("obstacle_danger", False), - is_tilted=data.get("is_tilted", False), - is_moving=data.get("is_moving", False), - imu=data.get("imu", {}) - ) - - def send_command(self, action: str, speed: int = 50, duration_ms: int = 500, - pan: Optional[int] = None, tilt: Optional[int] = None) -> Dict[str, Any]: - """ - Send movement command to robot - - Args: - action: forward, backward, left, right, stop, - look_left, look_right, look_up, look_down, look_center, look_custom - speed: 0-100 percent - duration_ms: Duration in milliseconds - pan: Custom pan angle (for look_custom) - tilt: Custom tilt angle (for look_custom) - - Returns: - Response from robot - """ - 
data = { - "action": action, - "speed": speed, - "duration_ms": duration_ms - } - - if pan is not None: - data["pan"] = pan - if tilt is not None: - data["tilt"] = tilt - - response = self._post("/api/command", data) - result = response.json() - logger.info(f"Command {action}: {result.get('message', 'OK')}") - return result - - # Convenience methods for common actions - def forward(self, speed: int = 50, duration_ms: int = 500) -> Dict: - return self.send_command("forward", speed, duration_ms) - - def backward(self, speed: int = 50, duration_ms: int = 500) -> Dict: - return self.send_command("backward", speed, duration_ms) - - def left(self, speed: int = 50, duration_ms: int = 500) -> Dict: - return self.send_command("left", speed, duration_ms) - - def right(self, speed: int = 50, duration_ms: int = 500) -> Dict: - return self.send_command("right", speed, duration_ms) - - def stop(self) -> Dict: - return self.send_command("stop") - - def look_left(self) -> Dict: - return self.send_command("look_left") - - def look_right(self) -> Dict: - return self.send_command("look_right") - - def look_up(self) -> Dict: - return self.send_command("look_up") - - def look_down(self) -> Dict: - return self.send_command("look_down") - - def look_center(self) -> Dict: - return self.send_command("look_center") - - def look_custom(self, pan: int, tilt: int) -> Dict: - return self.send_command("look_custom", pan=pan, tilt=tilt) - - def set_claude_text(self, text: str) -> Dict: - """Set text that Claude wants to say/display""" - response = self._post("/api/claude_text", {"text": text}) - return response.json() - - def get_claude_text(self) -> Dict[str, Any]: - """Get last Claude text (for TTS)""" - response = self._get("/api/claude_text") - return response.json() - - def set_display(self, mode: str, content: str = "") -> Dict: - """ - Control robot display - - Args: - mode: "text", "emoji", "status" - content: Text to show or emoji name (happy, thinking, surprised, sleepy, curious, 
confused) - """ - response = self._post("/api/display", {"mode": mode, "content": content}) - return response.json() - - def is_connected(self) -> bool: - """Check if robot is reachable""" - try: - self.get_status() - return True - except Exception as e: - logger.warning(f"Connection check failed: {e}") - return False - - -# Test when run directly -if __name__ == "__main__": - import sys - - logging.basicConfig(level=logging.DEBUG) - - if len(sys.argv) < 2: - print("Usage: python esp32_client.py ") - sys.exit(1) - - host = sys.argv[1] - api_key = "claudes_eyes_secret_2025" - - client = ESP32Client(host, api_key=api_key) - - print(f"Connecting to {host}...") - if client.is_connected(): - print("Connected!") - - status = client.get_status() - print(f"\nStatus:") - print(f" Distance: {status.distance_cm} cm") - print(f" Battery: {status.battery_percent}%") - print(f" Action: {status.current_action}") - print(f" WiFi RSSI: {status.wifi_rssi} dBm") - - print("\nCapturing image...") - img = client.capture_image_pil() - print(f" Size: {img.size}") - img.save("test_capture.jpg") - print(" Saved to test_capture.jpg") - else: - print("Could not connect to robot!") diff --git a/python_bridge/config.yaml b/python_bridge/config.yaml index 3dad378..bbc3401 100644 --- a/python_bridge/config.yaml +++ b/python_bridge/config.yaml @@ -56,16 +56,13 @@ heartbeat: # Text-to-Speech (Claudes Stimme) # ============================================================================ tts: - # Engine: "pyttsx3" (offline), "gtts" (Google, online), "termux" (Android) + # Engine: "pyttsx3" (offline) oder "gtts" (Google, online) engine: "gtts" # Sprache language: "de" - # Sprechgeschwindigkeit - # pyttsx3: Wörter pro Minute (100-200) - # gtts: nicht unterstützt - # termux: 0.5-2.0 (1.0 = normal) + # Sprechgeschwindigkeit (nur pyttsx3: Wörter pro Minute, 100-200) rate: 150 # Lautstärke (nur pyttsx3) @@ -79,7 +76,7 @@ tts: # Speech-to-Text (Stefans Mikrofon) # 
============================================================================ stt: - # Engine: "standard" (SpeechRecognition) oder "termux" (Android) + # Engine: "standard" (SpeechRecognition) engine: "standard" # Erkennungsdienst (nur für standard engine) @@ -101,14 +98,6 @@ stt: # Bei langen Sätzen höher setzen (max 2 Minuten = 120s) phrase_time_limit: 120 -# ============================================================================ -# Termux (Android) Einstellungen -# ============================================================================ -termux: - # Nutze Termux:API für TTS/STT statt Python-Libraries - # Setzt engine in tts/stt automatisch auf "termux" - use_termux_api: false - # ============================================================================ # ESP32 Roboter (Referenz für Claude's web_fetch Aufrufe) # ============================================================================ @@ -125,6 +114,22 @@ esp32: # Für Zugriff von außen: DynDNS, Tailscale, oder Port-Forward nötig # external_url: "https://mein-roboter.dyndns.org" +# ============================================================================ +# Mock ESP32 Server (für Tests ohne echte Hardware) +# ============================================================================ +mock: + # Echte USB-Webcam nutzen statt Testbilder? + # true = Bilder von angeschlossener Webcam (benötigt opencv-python) + # false = Bilder aus ./test_images/ Ordner + use_real_webcam: false + + # Webcam-Gerätenummer (0 = erste Webcam, 1 = zweite, etc.) 
+ webcam_device: 0 + + # Webcam-Auflösung + webcam_width: 640 + webcam_height: 480 + # ============================================================================ # Logging # ============================================================================ diff --git a/python_bridge/mock_esp32.py b/python_bridge/mock_esp32.py index ab2b55f..8539f93 100644 --- a/python_bridge/mock_esp32.py +++ b/python_bridge/mock_esp32.py @@ -6,11 +6,13 @@ Simuliert den ESP32-Roboter für Tests ohne echte Hardware. Features: - Liefert Testbilder aus ./test_images/ +- ODER nutzt eine echte USB-Webcam (use_real_webcam: true in config) - Simuliert Fahrbefehle (loggt sie) - Liefert Fake-Sensordaten Usage: 1. Leg JPG-Bilder in ./test_images/ (z.B. Fotos aus deiner Wohnung) + ODER aktiviere use_real_webcam in config.yaml 2. python mock_esp32.py 3. In config.yaml: host: "localhost", port: 5000 4. Starte die Bridge - Claude "fährt" durch deine Testbilder! @@ -20,11 +22,19 @@ import os import random import logging import base64 +import yaml from pathlib import Path from datetime import datetime from flask import Flask, jsonify, send_file, request, Response +# OpenCV für Webcam (optional) +try: + import cv2 + OPENCV_AVAILABLE = True +except ImportError: + OPENCV_AVAILABLE = False + # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -32,15 +42,124 @@ logger = logging.getLogger(__name__) app = Flask(__name__) # Konfiguration -IMAGES_DIR = Path(__file__).parent / "test_images" +SCRIPT_DIR = Path(__file__).parent +IMAGES_DIR = SCRIPT_DIR / "test_images" +FOTO_PATH = SCRIPT_DIR / "foto.jpg" # Hier wird das aktuelle Foto gespeichert API_KEY = "claudes_eyes_secret_2025" +# Mock-Konfiguration (wird aus config.yaml geladen) +mock_config = { + "use_real_webcam": False, + "webcam_device": 0, + "webcam_width": 640, + "webcam_height": 480, +} + +# Webcam-Objekt (wird bei Bedarf initialisiert) +webcam = None + # State current_image_index = 0 position = {"x": 0, "y": 0, 
"rotation": 0} camera_angle = {"pan": 90, "tilt": 90} +def load_mock_config(): + """Lädt die Mock-Konfiguration aus config.yaml""" + global mock_config + + # Versuche config.local.yaml zuerst, dann config.yaml + for config_name in ["config.local.yaml", "config.yaml"]: + config_path = SCRIPT_DIR / config_name + if config_path.exists(): + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + if config and "mock" in config: + mock_config.update(config["mock"]) + logger.info(f"Mock-Config geladen aus {config_name}") + return + except Exception as e: + logger.warning(f"Fehler beim Laden von {config_name}: {e}") + + logger.info("Keine Mock-Config gefunden, nutze Defaults") + + +def init_webcam(): + """Initialisiert die Webcam""" + global webcam + + if not OPENCV_AVAILABLE: + logger.error("OpenCV nicht installiert! Installiere mit: pip install opencv-python") + return False + + if webcam is not None: + return True + + device = mock_config.get("webcam_device", 0) + width = mock_config.get("webcam_width", 640) + height = mock_config.get("webcam_height", 480) + + try: + webcam = cv2.VideoCapture(device) + if not webcam.isOpened(): + logger.error(f"Konnte Webcam {device} nicht öffnen!") + webcam = None + return False + + # Auflösung setzen + webcam.set(cv2.CAP_PROP_FRAME_WIDTH, width) + webcam.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + + actual_w = int(webcam.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_h = int(webcam.get(cv2.CAP_PROP_FRAME_HEIGHT)) + logger.info(f"Webcam {device} initialisiert: {actual_w}x{actual_h}") + return True + + except Exception as e: + logger.error(f"Webcam-Fehler: {e}") + webcam = None + return False + + +def capture_from_webcam() -> bool: + """ + Nimmt ein Bild von der Webcam auf und speichert es als foto.jpg + + Returns: + True wenn erfolgreich, False bei Fehler + """ + global webcam + + if not init_webcam(): + return False + + try: + # Bild aufnehmen + ret, frame = webcam.read() + if not ret or frame is None: + 
logger.error("Konnte kein Bild von Webcam lesen!") + return False + + # Als JPEG speichern + cv2.imwrite(str(FOTO_PATH), frame) + logger.info(f"📷 Webcam-Bild aufgenommen: {FOTO_PATH.name}") + return True + + except Exception as e: + logger.error(f"Webcam-Capture-Fehler: {e}") + return False + + +def release_webcam(): + """Gibt die Webcam frei""" + global webcam + if webcam is not None: + webcam.release() + webcam = None + logger.info("Webcam freigegeben") + + def check_api_key(): """Prüft den API-Key""" key = request.args.get("key", "") @@ -95,17 +214,44 @@ def capture(): Das ist wie beim echten ESP32 - Bild wird direkt gestreamt. Kein JSON, sondern das Bild selbst! + + Je nach Konfiguration: + - use_real_webcam: true → Bild von USB-Webcam + - use_real_webcam: false → Bild aus test_images/ Ordner """ global current_image_index if not check_api_key(): return jsonify({"error": "Invalid API key"}), 401 - # Finde Testbilder + # ════════════════════════════════════════════════════════════════ + # WEBCAM-MODUS: Echtes Bild von USB-Webcam + # ════════════════════════════════════════════════════════════════ + if mock_config.get("use_real_webcam", False): + if capture_from_webcam(): + # Foto wurde in foto.jpg gespeichert, das zurückgeben + if FOTO_PATH.exists(): + return send_file(FOTO_PATH, mimetype="image/jpeg") + else: + return jsonify({"error": "Webcam-Capture fehlgeschlagen"}), 500 + else: + # Fallback: Bestehendes foto.jpg nutzen falls vorhanden + if FOTO_PATH.exists(): + logger.warning("Webcam-Fehler, nutze bestehendes foto.jpg") + return send_file(FOTO_PATH, mimetype="image/jpeg") + return jsonify({"error": "Webcam nicht verfügbar und kein foto.jpg vorhanden"}), 500 + + # ════════════════════════════════════════════════════════════════ + # TESTBILD-MODUS: Bild aus test_images/ Ordner + # ════════════════════════════════════════════════════════════════ if not IMAGES_DIR.exists(): IMAGES_DIR.mkdir(parents=True) + # Fallback: foto.jpg nutzen falls vorhanden + if 
FOTO_PATH.exists(): + logger.info("📷 Kein test_images/, nutze foto.jpg") + return send_file(FOTO_PATH, mimetype="image/jpeg") return jsonify({ - "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab." + "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab oder aktiviere use_real_webcam." }), 404 images = sorted(IMAGES_DIR.glob("*.jpg")) @@ -113,8 +259,12 @@ def capture(): images = sorted(IMAGES_DIR.glob("*.png")) if not images: + # Fallback: foto.jpg nutzen falls vorhanden + if FOTO_PATH.exists(): + logger.info("📷 Keine Testbilder, nutze foto.jpg") + return send_file(FOTO_PATH, mimetype="image/jpeg") return jsonify({ - "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab." + "error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab oder aktiviere use_real_webcam." }), 404 # Aktuelles Testbild holen @@ -132,15 +282,18 @@ def get_foto(): Liefert das aktuelle Foto - immer dieselbe URL! Das ist der Hauptendpoint für Claude.ai Chat. - Nach /api/capture liegt das neue Bild hier. + Bei Webcam-Modus wird hier immer das letzte Webcam-Bild geliefert. """ - foto_path = IMAGES_DIR.parent / "foto.jpg" + if not FOTO_PATH.exists(): + # Bei Webcam-Modus: Mach ein Foto falls noch keins da ist + if mock_config.get("use_real_webcam", False): + if capture_from_webcam(): + return send_file(FOTO_PATH, mimetype="image/jpeg") - if not foto_path.exists(): return jsonify({"error": "Noch kein Foto aufgenommen! 
Erst /api/capture aufrufen."}), 404 logger.info(f"📷 Foto abgerufen: foto.jpg") - return send_file(foto_path, mimetype="image/jpeg") + return send_file(FOTO_PATH, mimetype="image/jpeg") @app.route("/api/status", methods=["GET"]) @@ -266,6 +419,13 @@ def display(): def main(): """Startet den Mock-Server""" + + # Lade Konfiguration + load_mock_config() + + use_webcam = mock_config.get("use_real_webcam", False) + webcam_device = mock_config.get("webcam_device", 0) + print(""" ╔══════════════════════════════════════════════════════════════╗ ║ ║ @@ -273,18 +433,31 @@ def main(): ║ ║ ║ Simuliert den Roboter für Tests ohne Hardware. ║ ║ ║ -╠══════════════════════════════════════════════════════════════╣ +╠══════════════════════════════════════════════════════════════╣""") + + if use_webcam: + print("""║ ║ +║ 📷 WEBCAM-MODUS AKTIV ║ +║ Bilder kommen von deiner USB-Webcam (Device {device}) ║ +║ ║""".format(device=webcam_device)) + else: + print("""║ ║ +║ 📁 TESTBILD-MODUS ║ +║ Leg Testbilder in ./test_images/ ab (JPG oder PNG) ║ +║ Tipp: Mach 10-20 Fotos aus deiner Wohnung! ║ ║ ║ -║ 1. Leg Testbilder in ./test_images/ ab (JPG oder PNG) ║ -║ Tipp: Mach 10-20 Fotos aus deiner Wohnung! ║ +║ ODER aktiviere Webcam in config.yaml: ║ +║ mock: ║ +║ use_real_webcam: true ║ +║ ║""") + + print("""╠══════════════════════════════════════════════════════════════╣ ║ ║ -║ 2. Passe config.yaml an: ║ +║ Für die Bridge - config.yaml: ║ ║ esp32: ║ ║ host: "localhost" ║ ║ port: 5000 ║ ║ ║ -║ 3. 
Starte die Bridge in einem anderen Terminal ║ -║ ║ ╠══════════════════════════════════════════════════════════════╣ ║ ║ ║ Server: http://localhost:5000 ║ @@ -293,26 +466,52 @@ def main(): ╚══════════════════════════════════════════════════════════════╝ """.format(api_key=API_KEY)) - # Erstelle Bilder-Ordner falls nicht existiert - if not IMAGES_DIR.exists(): - IMAGES_DIR.mkdir(parents=True) - print(f"\n⚠️ Ordner {IMAGES_DIR} erstellt - leg dort Testbilder ab!\n") - - # Zähle Bilder - images = list(IMAGES_DIR.glob("*.jpg")) + list(IMAGES_DIR.glob("*.png")) - if images: - print(f"📁 Gefunden: {len(images)} Testbilder") - for img in images[:5]: - print(f" - {img.name}") - if len(images) > 5: - print(f" ... und {len(images) - 5} weitere") + # Webcam testen falls aktiviert + if use_webcam: + if not OPENCV_AVAILABLE: + print("❌ OpenCV nicht installiert!") + print(" Installiere mit: pip install opencv-python") + print(" Oder deaktiviere Webcam in config.yaml\n") + else: + print(f"📷 Teste Webcam {webcam_device}...") + if init_webcam(): + print(f"✅ Webcam bereit!") + # Test-Capture + if capture_from_webcam(): + print(f"✅ Test-Bild aufgenommen: {FOTO_PATH}") + else: + print(f"❌ Webcam {webcam_device} konnte nicht geöffnet werden!") + print(" Prüfe ob eine Webcam angeschlossen ist.\n") else: - print(f"⚠️ Keine Bilder in {IMAGES_DIR} gefunden!") - print(" Leg dort JPG/PNG-Dateien ab für den Test.\n") + # Erstelle Bilder-Ordner falls nicht existiert + if not IMAGES_DIR.exists(): + IMAGES_DIR.mkdir(parents=True) + print(f"\n⚠️ Ordner {IMAGES_DIR} erstellt - leg dort Testbilder ab!\n") + + # Zähle Bilder + images = list(IMAGES_DIR.glob("*.jpg")) + list(IMAGES_DIR.glob("*.png")) + if images: + print(f"📁 Gefunden: {len(images)} Testbilder") + for img in images[:5]: + print(f" - {img.name}") + if len(images) > 5: + print(f" ... 
und {len(images) - 5} weitere") + else: + # Prüfe ob foto.jpg existiert + if FOTO_PATH.exists(): + print(f"📷 Nutze bestehendes {FOTO_PATH.name}") + else: + print(f"⚠️ Keine Bilder in {IMAGES_DIR} gefunden!") + print(" Leg dort JPG/PNG-Dateien ab für den Test.") + print(" Oder aktiviere use_real_webcam in config.yaml\n") print("\n🚀 Starte Server...\n") - app.run(host="0.0.0.0", port=5000, debug=True) + try: + app.run(host="0.0.0.0", port=5000, debug=True) + finally: + # Webcam freigeben beim Beenden + release_webcam() if __name__ == "__main__": diff --git a/python_bridge/stt_engine.py b/python_bridge/stt_engine.py index a789039..4d38eaa 100644 --- a/python_bridge/stt_engine.py +++ b/python_bridge/stt_engine.py @@ -173,148 +173,21 @@ class STTEngine: return None -class TermuxSTTEngine: - """ - STT via Termux:API für Android - - Benötigt: - - Termux App - - Termux:API App - - pkg install termux-api - """ - - def __init__(self, language: str = "de-DE", timeout: int = 10): - self.language = language - self.timeout = timeout - self._listening = False - self._stop_flag = False - self._thread: Optional[threading.Thread] = None - self._callback: Optional[Callable[[SpeechResult], None]] = None - - # Teste ob termux-speech-to-text verfügbar ist - import shutil - if not shutil.which("termux-speech-to-text"): - raise RuntimeError( - "termux-speech-to-text nicht gefunden! 
" - "Installiere mit: pkg install termux-api" - ) - - logger.info(f"Termux STT engine initialized (language: {language})") - - def listen_once(self, timeout: Optional[float] = None) -> Optional[SpeechResult]: - """ - Listen for a single phrase via Termux API - - Args: - timeout: Maximum time to wait (uses class timeout if None) - - Returns: - SpeechResult or None if nothing recognized - """ - import subprocess - import json - - actual_timeout = timeout if timeout else self.timeout - - try: - # termux-speech-to-text gibt JSON zurück - result = subprocess.run( - ["termux-speech-to-text"], - capture_output=True, - text=True, - timeout=actual_timeout + 5 # Extra Zeit für API - ) - - if result.returncode != 0: - logger.error(f"Termux STT error: {result.stderr}") - return None - - # Output ist ein String (kein JSON bei Termux) - text = result.stdout.strip() - - if text: - return SpeechResult( - text=text, - confidence=0.8, # Termux gibt keine Konfidenz - is_final=True - ) - - return None - - except subprocess.TimeoutExpired: - logger.debug("Termux STT timeout") - return None - except Exception as e: - logger.error(f"Termux STT error: {e}") - return None - - def start_continuous(self, callback: Callable[[SpeechResult], None]) -> None: - """Start continuous listening in background""" - if self._listening: - logger.warning("Already listening") - return - - self._callback = callback - self._stop_flag = False - self._listening = True - - self._thread = threading.Thread(target=self._listen_loop, daemon=True) - self._thread.start() - - logger.info("Termux continuous listening started") - - def stop_continuous(self) -> None: - """Stop continuous listening""" - self._stop_flag = True - self._listening = False - - if self._thread: - self._thread.join(timeout=2) - self._thread = None - - logger.info("Termux continuous listening stopped") - - def _listen_loop(self): - """Background thread for continuous listening""" - while not self._stop_flag: - try: - result = 
self.listen_once(timeout=5) - if result and self._callback: - self._callback(result) - except Exception as e: - if not self._stop_flag: - logger.error(f"Termux listen loop error: {e}") - - # Kleine Pause zwischen Aufnahmen - import time - time.sleep(0.5) - - def is_listening(self) -> bool: - return self._listening - - -def create_stt_engine(engine_type: str = "standard", **kwargs): +def create_stt_engine(engine_type: str = "standard", **kwargs) -> STTEngine: """ Factory function to create STT engine Args: - engine_type: "standard" or "termux" + engine_type: "standard" **kwargs: Engine-specific options """ - if engine_type == "termux": - return TermuxSTTEngine( - language=kwargs.get("language", "de-DE"), - timeout=kwargs.get("phrase_time_limit", 15) - ) - else: - # Standard SpeechRecognition engine - return STTEngine( - energy_threshold=kwargs.get("energy_threshold", 300), - pause_threshold=kwargs.get("pause_threshold", 0.8), - phrase_time_limit=kwargs.get("phrase_time_limit", 15), - service=kwargs.get("service", "google"), - language=kwargs.get("language", "de-DE") - ) + return STTEngine( + energy_threshold=kwargs.get("energy_threshold", 300), + pause_threshold=kwargs.get("pause_threshold", 0.8), + phrase_time_limit=kwargs.get("phrase_time_limit", 15), + service=kwargs.get("service", "google"), + language=kwargs.get("language", "de-DE") + ) # Test when run directly diff --git a/python_bridge/tts_engine.py b/python_bridge/tts_engine.py index ba410a0..599fa09 100644 --- a/python_bridge/tts_engine.py +++ b/python_bridge/tts_engine.py @@ -260,114 +260,12 @@ class GTTSEngine(TTSEngine): return self._speaking -class TermuxTTSEngine(TTSEngine): - """ - TTS via Termux:API für Android - - Benötigt: - - Termux App - - Termux:API App - - pkg install termux-api - """ - - def __init__(self, language: str = "de", rate: float = 1.0): - self.language = language - self.rate = rate - self._speaking = False - self._queue = queue.Queue() - self._thread: Optional[threading.Thread] = 
None - self._stop_flag = False - self._process = None - - # Teste ob termux-tts-speak verfügbar ist - import shutil - if not shutil.which("termux-tts-speak"): - raise RuntimeError( - "termux-tts-speak nicht gefunden! " - "Installiere mit: pkg install termux-api" - ) - - logger.info(f"Termux TTS engine initialized (language: {language})") - - def speak(self, text: str) -> None: - """Speak text via Termux API (blocking)""" - import subprocess - - self._speaking = True - try: - # termux-tts-speak Optionen: - # -l - Sprache (z.B. "de" oder "de-DE") - # -r - Geschwindigkeit (0.5 bis 2.0, default 1.0) - # -p - Tonhöhe (0.5 bis 2.0, default 1.0) - # -s - Audio Stream (ALARM, MUSIC, NOTIFICATION, RING, SYSTEM, VOICE_CALL) - - cmd = [ - "termux-tts-speak", - "-l", self.language, - "-r", str(self.rate), - text - ] - - self._process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - self._process.wait() # Warte bis fertig - self._process = None - - except Exception as e: - logger.error(f"Termux TTS error: {e}") - finally: - self._speaking = False - - def speak_async(self, text: str) -> None: - """Speak text (non-blocking)""" - self._queue.put(text) - - if self._thread is None or not self._thread.is_alive(): - self._stop_flag = False - self._thread = threading.Thread(target=self._speech_worker, daemon=True) - self._thread.start() - - def _speech_worker(self): - """Worker thread for async speech""" - while not self._stop_flag: - try: - text = self._queue.get(timeout=0.5) - self.speak(text) - self._queue.task_done() - except queue.Empty: - continue - - def stop(self) -> None: - """Stop current speech""" - self._stop_flag = True - - # Beende laufenden Prozess - if self._process: - try: - self._process.terminate() - except: - pass - - # Clear queue - while not self._queue.empty(): - try: - self._queue.get_nowait() - except queue.Empty: - break - - def is_speaking(self) -> bool: - return self._speaking - - def create_tts_engine(engine_type: str = 
"pyttsx3", **kwargs) -> TTSEngine: """ Factory function to create TTS engine Args: - engine_type: "pyttsx3", "gtts", or "termux" + engine_type: "pyttsx3" or "gtts" **kwargs: Engine-specific options """ if engine_type == "pyttsx3": @@ -380,11 +278,6 @@ def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine: return GTTSEngine( language=kwargs.get("language", "de") ) - elif engine_type == "termux": - return TermuxTTSEngine( - language=kwargs.get("language", "de"), - rate=kwargs.get("rate", 1.0) - ) else: raise ValueError(f"Unknown TTS engine: {engine_type}")