Claude API removed, Termux support removed, mock-server webcam support added (optional)

This commit is contained in:
duffyduck 2025-12-27 19:44:56 +01:00
parent d9b1e1bed4
commit 20943541a5
9 changed files with 317 additions and 1224 deletions

View File

@@ -128,6 +128,33 @@ nano config.yaml  # set chat.url to your Claude.ai chat URL
The bridge opens Chrome with Claude.ai. The first time you have to log in. After that you're good to go!
### 4. Mock server (testing without hardware)
For tests without a real robot there is a mock server:
```bash
cd python_bridge

# With test images from ./test_images/:
python mock_esp32.py

# OR with a USB webcam (adjust config.yaml):
# mock:
#   use_real_webcam: true
python mock_esp32.py
```
**Enable webcam mode** in `config.yaml`:
```yaml
mock:
  use_real_webcam: true   # use the USB webcam
  webcam_device: 0        # 0 = first webcam
  webcam_width: 640
  webcam_height: 480
```
Webcam support requires `pip install opencv-python`.
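The decision between webcam and test images boils down to a single check. A minimal sketch, assuming a config dict as shown above (the function name is illustrative, not the actual `mock_esp32.py` internals):

```python
def pick_image_source(mock_cfg: dict, opencv_available: bool) -> str:
    """Choose the mock server's image source: the webcam is used only when
    it is enabled in the config AND opencv-python is importable."""
    if mock_cfg.get("use_real_webcam", False) and opencv_available:
        return "webcam"
    return "test_images"

# Without opencv-python the mock falls back to ./test_images/.
print(pick_image_source({"use_real_webcam": True}, opencv_available=False))  # test_images
```

This fallback is why the OpenCV dependency can stay optional: a missing `cv2` import degrades to test images instead of crashing the server.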
---
## API Endpoints (ESP32)
@@ -166,12 +193,19 @@ Claude uses these commands in square brackets:
- **True autonomy** - Claude decides for himself what interests him
- **Parallel conversation** - explore AND chat at the same time
- **Speech output** - Claude talks to you (TTS)
  - "Claude sagt:" prefix on every message
  - Waits for [READY] before TTS starts (no init messages)
  - Persistent position - old messages are not repeated after a restart
- **Speech input** - you talk to Claude (STT, 5 s of silence = done)
  - Up to 2 minutes per phrase (phrase_time_limit: 120)
  - "Stefan sagt:" prefix
- **Mute/unmute** - mute the microphone with a keypress
- **Smart recording** - the heartbeat pauses automatically while you speak
- **Mock server** - testing without hardware
  - Test images from the `./test_images/` folder
  - OR a real USB webcam (`use_real_webcam: true`)
- **Obstacle detection** - ultrasonic & IMU
- **Touch display** - emergency stop & status
- **Termux support** - also runs on Android!
## Keyboard shortcuts (bridge)
@@ -184,6 +218,24 @@ Claude uses these commands in square brackets:
**Note:** Claude.ai allows a maximum of 100 images per chat. The bridge warns at 90/95 images. Press **N** to start a new chat.
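The 90/95 warning amounts to a counter check against the 100-image limit; a sketch under that assumption (names and wording are illustrative, not the bridge's actual messages):

```python
from typing import Optional

def image_limit_warning(count: int, limit: int = 100,
                        thresholds=(90, 95)) -> Optional[str]:
    """Return a warning once the per-chat image count hits a threshold,
    or the hard limit itself; otherwise None."""
    if count >= limit:
        return f"Image limit reached ({limit}) - press N for a new chat"
    if count in thresholds:
        return f"{count}/{limit} images used - consider a new chat (N)"
    return None

print(image_limit_warning(90))  # 90/100 images used - consider a new chat (N)
print(image_limit_warning(42))  # None
```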
## Bridge arguments
```bash
./start_venv.sh --run [OPTIONS]
```
| Option | Description |
|--------|-------------|
| `-d, --debug` | Debug logging (shows message detection) |
| `-c FILE` | Use a custom config file |
| `--test` | Test mode (no heartbeat) |
**Examples:**
```bash
./start_venv.sh --run -d          # with debug logging
./start_venv.sh --run -c my.yaml  # custom config
```
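The option table maps directly onto a standard argument parser; an illustrative `argparse` sketch (the real bridge may parse its flags differently, e.g. with click):

```python
import argparse

# Mirrors the option table above; flag names are taken from the table,
# everything else is an assumption for illustration.
parser = argparse.ArgumentParser(prog="chat_audio_bridge")
parser.add_argument("-d", "--debug", action="store_true",
                    help="debug logging (shows message detection)")
parser.add_argument("-c", dest="config", default="config.yaml",
                    help="use a custom config file")
parser.add_argument("--test", action="store_true",
                    help="test mode (no heartbeat)")

args = parser.parse_args(["-d", "-c", "my.yaml"])
print(args.debug, args.config, args.test)  # True my.yaml False
```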
---
## Security

View File

@@ -187,23 +187,13 @@ esp32:
  host: "192.168.178.XXX"  # the robot's IP
  api_key: "dein_api_key"  # must match config.h!
claude:
  api_key: ""  # or set the ANTHROPIC_API_KEY environment variable
chat:
  url: "https://claude.ai/chat/..."  # the URL of your Claude chat
```
**Important:** This bridge uses Claude.ai directly in the browser (via Selenium), NOT the Anthropic API. You need a Claude.ai account, but no API key!
### 2.4 Anthropic API Key
Create an API key at https://console.anthropic.com/
```bash
# Linux/Mac:
export ANTHROPIC_API_KEY="sk-ant-..."
# Windows (PowerShell):
$env:ANTHROPIC_API_KEY="sk-ant-..."
```
### 2.4 Starting the bridge
**With start_venv.sh (recommended):**
```bash
@@ -229,7 +219,7 @@ python chat_audio_bridge.py -d
python chat_audio_bridge.py -c config.local.yaml
```
### 2.5 Keyboard shortcuts while the bridge is running
| Key | Function |
|-----|----------|
@@ -245,7 +235,7 @@ python chat_audio_bridge.py -c config.local.yaml
- Press **N** to start a new chat; the instructions are then sent again
- Images are only uploaded when they have changed (saves the limit!)
### 2.6 Speech input (STT) - how it works
The speech input collects your words intelligently:
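The "collect until 5 s of silence" behaviour can be sketched as a small aggregator (illustrative only, not the actual stt_engine code):

```python
class PhraseCollector:
    """Collects recognized speech fragments until a silence gap ends the phrase."""

    def __init__(self, silence_s: float = 5.0):
        self.silence_s = silence_s  # gap that terminates a phrase
        self.parts = []
        self.last_time = None

    def add(self, text: str, t: float):
        """Add a fragment heard at time t (seconds). Returns the finished
        phrase if the gap since the previous fragment exceeded the window."""
        finished = None
        if self.last_time is not None and t - self.last_time > self.silence_s:
            finished = " ".join(self.parts)
            self.parts = []
        self.parts.append(text)
        self.last_time = t
        return finished

c = PhraseCollector()
assert c.add("hallo", 0.0) is None
assert c.add("Claude", 1.0) is None               # 1 s gap: same phrase
assert c.add("fahr los", 10.0) == "hallo Claude"  # 9 s gap: previous phrase done
```

The real engine additionally caps each phrase via `phrase_time_limit`; this sketch only shows the silence-gap part.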

View File

@@ -1,424 +0,0 @@
#!/usr/bin/env python3
"""
Claude's Eyes - Main Bridge Script

Connects the ESP32 robot with Claude AI for autonomous exploration.

Usage:
    python bridge.py                   # Use config.yaml
    python bridge.py --config my.yaml  # Use custom config
    python bridge.py --simulate        # Simulate without hardware
"""

import os
import sys
import time
import logging
import threading
import signal
from pathlib import Path
from typing import Optional
from dataclasses import dataclass

import yaml
import click
from rich.console import Console
from rich.panel import Panel
from rich.live import Live
from rich.table import Table
from rich.text import Text

from esp32_client import ESP32Client, RobotStatus
from tts_engine import create_tts_engine, TTSEngine
from stt_engine import create_stt_engine, STTEngine, SpeechResult
from chat_interface import create_chat_interface, ChatInterface, ChatResponse

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Rich console for pretty output
console = Console()


@dataclass
class BridgeState:
    """Current state of the bridge"""
    connected: bool = False
    exploring: bool = False
    last_image_time: float = 0
    last_status: Optional[RobotStatus] = None
    last_claude_response: str = ""
    stefan_input: str = ""
    error_message: str = ""


class ClaudesEyesBridge:
    """Main bridge class connecting robot and Claude"""

    def __init__(self, config_path: str, simulate: bool = False):
        self.config = self._load_config(config_path)
        self.simulate = simulate
        self.state = BridgeState()
        self.running = False

        # Components
        self.robot: Optional[ESP32Client] = None
        self.chat: Optional[ChatInterface] = None
        self.tts: Optional[TTSEngine] = None
        self.stt: Optional[STTEngine] = None

        # Threading
        self.speech_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file"""
        path = Path(config_path)

        # Try local config first
        local_path = path.parent / f"{path.stem}.local{path.suffix}"
        if local_path.exists():
            path = local_path
            logger.info(f"Using local config: {path}")

        if not path.exists():
            logger.error(f"Config file not found: {path}")
            sys.exit(1)

        with open(path) as f:
            config = yaml.safe_load(f)
        return config

    def initialize(self) -> bool:
        """Initialize all components"""
        console.print(Panel.fit(
            "[bold cyan]Claude's Eyes[/bold cyan]\n"
            "[dim]Autonomous Exploration Robot[/dim]",
            border_style="cyan"
        ))

        # Initialize robot client
        if not self.simulate:
            console.print("\n[yellow]Connecting to robot...[/yellow]")
            esp_config = self.config.get("esp32", {})
            self.robot = ESP32Client(
                host=esp_config.get("host", "192.168.178.100"),
                port=esp_config.get("port", 80),
                api_key=esp_config.get("api_key", ""),
                timeout=esp_config.get("timeout", 10)
            )
            if not self.robot.is_connected():
                console.print("[red]Could not connect to robot![/red]")
                self.state.error_message = "Robot connection failed"
                return False
            self.state.connected = True
            console.print("[green]Robot connected![/green]")
        else:
            console.print("[yellow]Simulation mode - no robot connection[/yellow]")
            self.state.connected = True

        # Initialize Claude interface
        console.print("\n[yellow]Initializing Claude interface...[/yellow]")
        claude_config = self.config.get("claude", {})
        api_key = claude_config.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "")
        self.chat = create_chat_interface(
            use_api=claude_config.get("use_api", True) and bool(api_key),
            api_key=api_key,
            model=claude_config.get("model", "claude-sonnet-4-20250514"),
            system_prompt=claude_config.get("system_prompt", ""),
            max_tokens=claude_config.get("max_tokens", 1024)
        )
        console.print(f"[green]Chat interface ready ({type(self.chat).__name__})[/green]")

        # Initialize TTS
        console.print("\n[yellow]Initializing Text-to-Speech...[/yellow]")
        tts_config = self.config.get("tts", {})
        try:
            self.tts = create_tts_engine(
                engine_type=tts_config.get("engine", "pyttsx3"),
                voice=tts_config.get("voice"),
                rate=tts_config.get("rate", 150),
                volume=tts_config.get("volume", 0.9),
                language=tts_config.get("language", "de")
            )
            console.print("[green]TTS ready![/green]")
        except Exception as e:
            console.print(f"[red]TTS init failed: {e}[/red]")
            self.tts = None

        # Initialize STT
        console.print("\n[yellow]Initializing Speech-to-Text...[/yellow]")
        stt_config = self.config.get("stt", {})
        try:
            self.stt = create_stt_engine(
                energy_threshold=stt_config.get("energy_threshold", 300),
                pause_threshold=stt_config.get("pause_threshold", 0.8),
                phrase_time_limit=stt_config.get("phrase_time_limit", 15),
                service=stt_config.get("service", "google"),
                language=stt_config.get("language", "de-DE")
            )
            console.print("[green]STT ready![/green]")
        except Exception as e:
            console.print(f"[red]STT init failed: {e}[/red]")
            self.stt = None

        console.print("\n[bold green]All systems initialized![/bold green]\n")
        return True

    def start(self):
        """Start the main exploration loop"""
        self.running = True
        self.state.exploring = True

        # Start speech recognition in background
        if self.stt:
            self.stt.start_continuous(self._on_speech_detected)

        # Welcome message
        welcome = "Hallo Stefan! Ich bin online und bereit zum Erkunden. Was soll ich mir anschauen?"
        self._speak(welcome)
        self.state.last_claude_response = welcome

        try:
            self._main_loop()
        except KeyboardInterrupt:
            console.print("\n[yellow]Stopping...[/yellow]")
        finally:
            self.stop()

    def stop(self):
        """Stop the bridge"""
        self.running = False
        self.state.exploring = False
        self._stop_event.set()
        if self.stt:
            self.stt.stop_continuous()
        if self.tts:
            self.tts.stop()
        if self.robot and not self.simulate:
            self.robot.stop()
        console.print("[yellow]Bridge stopped[/yellow]")

    def _main_loop(self):
        """Main exploration loop"""
        camera_config = self.config.get("camera", {})
        capture_interval = camera_config.get("capture_interval", 5)

        while self.running:
            try:
                current_time = time.time()

                # Capture and process image periodically
                if current_time - self.state.last_image_time >= capture_interval:
                    self._exploration_step()
                    self.state.last_image_time = current_time

                # Update status display
                self._update_display()

                # Small delay
                time.sleep(0.1)
            except Exception as e:
                logger.error(f"Loop error: {e}")
                self.state.error_message = str(e)
                time.sleep(1)

    def _exploration_step(self):
        """Single exploration step: capture, analyze, act"""
        # Get robot status
        if self.robot and not self.simulate:
            try:
                self.state.last_status = self.robot.get_status()
            except Exception as e:
                logger.error(f"Status error: {e}")

        # Capture image
        image_data = None
        if self.robot and not self.simulate:
            try:
                camera_config = self.config.get("camera", {})
                image_data = self.robot.capture_image(
                    resolution=camera_config.get("resolution", "VGA"),
                    quality=camera_config.get("quality", 12)
                )
            except Exception as e:
                logger.error(f"Capture error: {e}")

        # Build context message
        context = self._build_context_message()

        # Add Stefan's input if any
        if self.state.stefan_input:
            context += f"\n\nStefan sagt: {self.state.stefan_input}"
            self.state.stefan_input = ""

        # Send to Claude
        try:
            response = self.chat.send_message(context, image=image_data)
            self.state.last_claude_response = response.text

            # Speak response
            self._speak(response.text)

            # Execute commands
            self._execute_commands(response.commands)

            # Update robot display
            if self.robot and not self.simulate:
                # Send short version to robot display
                short_text = response.text[:100] + "..." if len(response.text) > 100 else response.text
                self.robot.set_claude_text(short_text)
        except Exception as e:
            logger.error(f"Chat error: {e}")
            self.state.error_message = str(e)

    def _build_context_message(self) -> str:
        """Build context message with sensor data"""
        parts = ["Hier ist was ich gerade sehe und meine Sensordaten:"]

        if self.state.last_status:
            status = self.state.last_status
            parts.append(f"\n- Abstand zum nächsten Hindernis: {status.distance_cm:.0f} cm")
            parts.append(f"- Aktuelle Aktion: {status.current_action}")
            parts.append(f"- Batterie: {status.battery_percent}%")
            if status.obstacle_danger:
                parts.append("- WARNUNG: Hindernis sehr nah!")
            elif status.obstacle_warning:
                parts.append("- Hinweis: Hindernis in der Nähe")
            if status.is_tilted:
                parts.append("- WARNUNG: Ich bin schief!")

        parts.append("\nWas siehst du auf dem Bild? Was möchtest du als nächstes tun?")
        return "\n".join(parts)

    def _execute_commands(self, commands: list):
        """Execute movement commands from Claude"""
        if not commands:
            return
        if self.simulate:
            console.print(f"[dim]Simulated commands: {commands}[/dim]")
            return
        if not self.robot:
            return

        safety = self.config.get("safety", {})
        max_speed = safety.get("max_speed", 70)
        min_distance = safety.get("min_obstacle_distance", 20)

        for cmd in commands:
            # Safety check
            if self.state.last_status and self.state.last_status.distance_cm < min_distance:
                if cmd == "FORWARD":
                    console.print("[red]Blocked: Obstacle too close![/red]")
                    continue
            try:
                if cmd == "FORWARD":
                    self.robot.forward(speed=max_speed, duration_ms=800)
                elif cmd == "BACKWARD":
                    self.robot.backward(speed=max_speed, duration_ms=800)
                elif cmd == "LEFT":
                    self.robot.left(speed=max_speed, duration_ms=400)
                elif cmd == "RIGHT":
                    self.robot.right(speed=max_speed, duration_ms=400)
                elif cmd == "STOP":
                    self.robot.stop()
                elif cmd == "LOOK_LEFT":
                    self.robot.look_left()
                elif cmd == "LOOK_RIGHT":
                    self.robot.look_right()
                elif cmd == "LOOK_UP":
                    self.robot.look_up()
                elif cmd == "LOOK_DOWN":
                    self.robot.look_down()
                elif cmd == "LOOK_CENTER":
                    self.robot.look_center()

                # Small delay between commands
                time.sleep(0.3)
            except Exception as e:
                logger.error(f"Command error ({cmd}): {e}")

    def _speak(self, text: str):
        """Speak text using TTS"""
        if self.tts:
            # Remove command brackets from speech
            import re
            clean_text = re.sub(r'\[[A-Z_]+\]', '', text).strip()
            if clean_text:
                self.tts.speak_async(clean_text)

    def _on_speech_detected(self, result: SpeechResult):
        """Callback when Stefan says something"""
        console.print(f"\n[bold blue]Stefan:[/bold blue] {result.text}")
        self.state.stefan_input = result.text

    def _update_display(self):
        """Update console display"""
        # This could be enhanced with rich.live for real-time updates
        pass


def signal_handler(signum, frame):
    """Handle Ctrl+C gracefully"""
    console.print("\n[yellow]Received stop signal...[/yellow]")
    sys.exit(0)


@click.command()
@click.option('--config', '-c', default='config.yaml', help='Path to config file')
@click.option('--simulate', '-s', is_flag=True, help='Simulate without hardware')
@click.option('--debug', '-d', is_flag=True, help='Enable debug logging')
def main(config: str, simulate: bool, debug: bool):
    """Claude's Eyes - Autonomous Exploration Robot Bridge"""
    if debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Handle signals
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Find config file
    config_path = Path(config)
    if not config_path.is_absolute():
        # Look in script directory first
        script_dir = Path(__file__).parent
        if (script_dir / config).exists():
            config_path = script_dir / config

    # Create and run bridge
    bridge = ClaudesEyesBridge(str(config_path), simulate=simulate)
    if bridge.initialize():
        console.print("\n[bold cyan]Starting exploration...[/bold cyan]")
        console.print("[dim]Press Ctrl+C to stop[/dim]\n")
        bridge.start()
    else:
        console.print("[red]Initialization failed![/red]")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -1,257 +0,0 @@
"""
Claude's Eyes - Chat Interface
Interface to communicate with Claude AI (via API or browser)
"""
import logging
import base64
import re
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
@dataclass
class Message:
"""A chat message"""
role: str # "user" or "assistant"
content: str
image_data: Optional[bytes] = None # JPEG image data
@dataclass
class ChatResponse:
"""Response from Claude"""
text: str
commands: List[str] = field(default_factory=list) # Extracted movement commands
class ChatInterface(ABC):
"""Abstract base class for chat interfaces"""
@abstractmethod
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Send message to Claude and get response"""
pass
@abstractmethod
def reset_conversation(self) -> None:
"""Reset/clear conversation history"""
pass
class AnthropicAPIInterface(ChatInterface):
"""Direct Claude API interface using anthropic library"""
def __init__(
self,
api_key: str,
model: str = "claude-sonnet-4-20250514",
system_prompt: str = "",
max_tokens: int = 1024
):
import anthropic
self.client = anthropic.Anthropic(api_key=api_key)
self.model = model
self.system_prompt = system_prompt
self.max_tokens = max_tokens
self.conversation_history: List[Dict[str, Any]] = []
logger.info(f"Anthropic API interface initialized (model: {model})")
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Send message to Claude API"""
# Build message content
content = []
# Add image if provided
if image:
image_base64 = base64.standard_b64encode(image).decode("utf-8")
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": image_base64
}
})
# Add text
content.append({
"type": "text",
"text": text
})
# Add to history
self.conversation_history.append({
"role": "user",
"content": content
})
try:
# Make API call
response = self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
system=self.system_prompt,
messages=self.conversation_history
)
# Extract response text
response_text = ""
for block in response.content:
if block.type == "text":
response_text += block.text
# Add assistant response to history
self.conversation_history.append({
"role": "assistant",
"content": response_text
})
# Extract commands
commands = self._extract_commands(response_text)
logger.debug(f"Claude response: {response_text[:100]}...")
logger.debug(f"Extracted commands: {commands}")
return ChatResponse(text=response_text, commands=commands)
except Exception as e:
logger.error(f"API error: {e}")
raise
def reset_conversation(self) -> None:
"""Reset conversation history"""
self.conversation_history = []
logger.info("Conversation history cleared")
def _extract_commands(self, text: str) -> List[str]:
"""Extract movement commands from Claude's response"""
# Commands are in brackets like [FORWARD], [LEFT], etc.
pattern = r'\[([A-Z_]+)\]'
matches = re.findall(pattern, text)
valid_commands = [
"FORWARD", "BACKWARD", "LEFT", "RIGHT", "STOP",
"LOOK_LEFT", "LOOK_RIGHT", "LOOK_UP", "LOOK_DOWN", "LOOK_CENTER"
]
return [cmd for cmd in matches if cmd in valid_commands]
class SimulatedInterface(ChatInterface):
"""Simulated chat interface for testing without API"""
def __init__(self):
self.message_count = 0
logger.info("Simulated chat interface initialized")
def send_message(self, text: str, image: Optional[bytes] = None) -> ChatResponse:
"""Return simulated responses"""
self.message_count += 1
responses = [
("Oh interessant! Ich sehe etwas vor mir. Lass mich näher hinfahren. [FORWARD]",
["FORWARD"]),
("Hmm, was ist das links? Ich schaue mal nach. [LOOK_LEFT]",
["LOOK_LEFT"]),
("Das sieht aus wie ein Bücherregal! Ich fahre mal hin. [FORWARD] [FORWARD]",
["FORWARD", "FORWARD"]),
("Stefan, was ist das für ein Gegenstand? Kannst du mir das erklären?",
[]),
("Ich drehe mich um und schaue was hinter mir ist. [RIGHT] [RIGHT]",
["RIGHT", "RIGHT"]),
]
idx = (self.message_count - 1) % len(responses)
text_response, commands = responses[idx]
return ChatResponse(text=text_response, commands=commands)
def reset_conversation(self) -> None:
self.message_count = 0
def create_chat_interface(
use_api: bool = True,
api_key: str = "",
model: str = "claude-sonnet-4-20250514",
system_prompt: str = "",
max_tokens: int = 1024
) -> ChatInterface:
"""
Factory function to create chat interface
Args:
use_api: Use Anthropic API (True) or simulated (False)
api_key: Anthropic API key
model: Claude model to use
system_prompt: System prompt for Claude
max_tokens: Maximum response tokens
"""
if use_api:
if not api_key:
import os
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
logger.warning("No API key provided, using simulated interface")
return SimulatedInterface()
return AnthropicAPIInterface(
api_key=api_key,
model=model,
system_prompt=system_prompt,
max_tokens=max_tokens
)
else:
return SimulatedInterface()
# Test when run directly
if __name__ == "__main__":
import os
logging.basicConfig(level=logging.DEBUG)
print("Chat Interface Test")
print("=" * 40)
# Try API first, fall back to simulated
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
system_prompt = """Du bist Claude und steuerst einen Erkundungsroboter.
Befehle in Klammern: [FORWARD], [BACKWARD], [LEFT], [RIGHT], [STOP]
Beschreibe was du siehst und entscheide wohin du fährst."""
interface = create_chat_interface(
use_api=bool(api_key),
api_key=api_key,
system_prompt=system_prompt
)
print(f"Using: {type(interface).__name__}")
print()
# Test conversation
test_messages = [
"Hallo Claude! Du bist jetzt online. Was siehst du?",
"Vor dir ist ein Flur mit einer Tür am Ende.",
"Die Tür ist offen und dahinter ist ein helles Zimmer."
]
for msg in test_messages:
print(f"User: {msg}")
response = interface.send_message(msg)
print(f"Claude: {response.text}")
if response.commands:
print(f" Commands: {response.commands}")
print()
print("Done!")

View File

@@ -1,238 +0,0 @@
"""
Claude's Eyes - ESP32 API Client
Handles communication with the robot's REST API
"""
import requests
from typing import Optional, Dict, Any
from dataclasses import dataclass
from io import BytesIO
from PIL import Image
import logging
logger = logging.getLogger(__name__)
@dataclass
class RobotStatus:
"""Current robot status from sensors"""
distance_cm: float
battery_percent: int
current_action: str
wifi_rssi: int
uptime_seconds: int
servo_pan: int
servo_tilt: int
obstacle_warning: bool
obstacle_danger: bool
is_tilted: bool
is_moving: bool
imu: Dict[str, float]
class ESP32Client:
"""Client for communicating with the ESP32 robot"""
def __init__(self, host: str, port: int = 80, api_key: str = "", timeout: int = 10):
self.base_url = f"http://{host}:{port}"
self.api_key = api_key
self.timeout = timeout
self._session = requests.Session()
def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
"""Make GET request with API key"""
if params is None:
params = {}
params["key"] = self.api_key
url = f"{self.base_url}{endpoint}"
logger.debug(f"GET {url}")
response = self._session.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response
def _post(self, endpoint: str, data: Dict) -> requests.Response:
"""Make POST request with API key"""
url = f"{self.base_url}{endpoint}?key={self.api_key}"
logger.debug(f"POST {url} with {data}")
response = self._session.post(url, json=data, timeout=self.timeout)
response.raise_for_status()
return response
def capture_image(self, resolution: str = "VGA", quality: int = 12) -> bytes:
"""
Capture image from robot camera
Args:
resolution: QVGA, VGA, SVGA, XGA, SXGA, UXGA
quality: 10-63 (lower = better)
Returns:
JPEG image data as bytes
"""
params = {
"resolution": resolution,
"quality": quality
}
response = self._get("/api/capture", params)
logger.info(f"Captured image: {len(response.content)} bytes")
return response.content
def capture_image_pil(self, resolution: str = "VGA", quality: int = 12) -> Image.Image:
"""Capture image and return as PIL Image"""
image_data = self.capture_image(resolution, quality)
return Image.open(BytesIO(image_data))
def get_status(self) -> RobotStatus:
"""Get current robot status from sensors"""
response = self._get("/api/status")
data = response.json()
return RobotStatus(
distance_cm=data.get("distance_cm", 0),
battery_percent=data.get("battery_percent", 100),
current_action=data.get("current_action", "unknown"),
wifi_rssi=data.get("wifi_rssi", 0),
uptime_seconds=data.get("uptime_seconds", 0),
servo_pan=data.get("servo_pan", 90),
servo_tilt=data.get("servo_tilt", 90),
obstacle_warning=data.get("obstacle_warning", False),
obstacle_danger=data.get("obstacle_danger", False),
is_tilted=data.get("is_tilted", False),
is_moving=data.get("is_moving", False),
imu=data.get("imu", {})
)
def send_command(self, action: str, speed: int = 50, duration_ms: int = 500,
pan: Optional[int] = None, tilt: Optional[int] = None) -> Dict[str, Any]:
"""
Send movement command to robot
Args:
action: forward, backward, left, right, stop,
look_left, look_right, look_up, look_down, look_center, look_custom
speed: 0-100 percent
duration_ms: Duration in milliseconds
pan: Custom pan angle (for look_custom)
tilt: Custom tilt angle (for look_custom)
Returns:
Response from robot
"""
data = {
"action": action,
"speed": speed,
"duration_ms": duration_ms
}
if pan is not None:
data["pan"] = pan
if tilt is not None:
data["tilt"] = tilt
response = self._post("/api/command", data)
result = response.json()
logger.info(f"Command {action}: {result.get('message', 'OK')}")
return result
# Convenience methods for common actions
def forward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("forward", speed, duration_ms)
def backward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("backward", speed, duration_ms)
def left(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("left", speed, duration_ms)
def right(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("right", speed, duration_ms)
def stop(self) -> Dict:
return self.send_command("stop")
def look_left(self) -> Dict:
return self.send_command("look_left")
def look_right(self) -> Dict:
return self.send_command("look_right")
def look_up(self) -> Dict:
return self.send_command("look_up")
def look_down(self) -> Dict:
return self.send_command("look_down")
def look_center(self) -> Dict:
return self.send_command("look_center")
def look_custom(self, pan: int, tilt: int) -> Dict:
return self.send_command("look_custom", pan=pan, tilt=tilt)
def set_claude_text(self, text: str) -> Dict:
"""Set text that Claude wants to say/display"""
response = self._post("/api/claude_text", {"text": text})
return response.json()
def get_claude_text(self) -> Dict[str, Any]:
"""Get last Claude text (for TTS)"""
response = self._get("/api/claude_text")
return response.json()
def set_display(self, mode: str, content: str = "") -> Dict:
"""
Control robot display
Args:
mode: "text", "emoji", "status"
content: Text to show or emoji name (happy, thinking, surprised, sleepy, curious, confused)
"""
response = self._post("/api/display", {"mode": mode, "content": content})
return response.json()
def is_connected(self) -> bool:
"""Check if robot is reachable"""
try:
self.get_status()
return True
except Exception as e:
logger.warning(f"Connection check failed: {e}")
return False
# Test when run directly
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.DEBUG)
if len(sys.argv) < 2:
print("Usage: python esp32_client.py <robot_ip>")
sys.exit(1)
host = sys.argv[1]
api_key = "claudes_eyes_secret_2025"
client = ESP32Client(host, api_key=api_key)
print(f"Connecting to {host}...")
if client.is_connected():
print("Connected!")
status = client.get_status()
print(f"\nStatus:")
print(f" Distance: {status.distance_cm} cm")
print(f" Battery: {status.battery_percent}%")
print(f" Action: {status.current_action}")
print(f" WiFi RSSI: {status.wifi_rssi} dBm")
print("\nCapturing image...")
img = client.capture_image_pil()
print(f" Size: {img.size}")
img.save("test_capture.jpg")
print(" Saved to test_capture.jpg")
else:
print("Could not connect to robot!")

View File

@@ -56,16 +56,13 @@ heartbeat:
# Text-to-Speech (Claude's voice)
# ============================================================================
tts:
  # Engine: "pyttsx3" (offline) or "gtts" (Google, online)
  engine: "gtts"
  # Language
  language: "de"
  # Speaking rate (pyttsx3 only: words per minute, 100-200)
  # pyttsx3: words per minute (100-200)
  # gtts: not supported
  # termux: 0.5-2.0 (1.0 = normal)
  rate: 150
  # Volume (pyttsx3 only)
@@ -79,7 +76,7 @@ tts:
# Speech-to-Text (Stefan's microphone)
# ============================================================================
stt:
  # Engine: "standard" (SpeechRecognition)
  engine: "standard"
  # Recognition service (standard engine only)
@@ -101,14 +98,6 @@ stt:
  # Increase for long sentences (max 2 minutes = 120 s)
  phrase_time_limit: 120
# ============================================================================
# Termux (Android) settings
# ============================================================================
termux:
  # Use Termux:API for TTS/STT instead of the Python libraries
  # Automatically sets the engine in tts/stt to "termux"
  use_termux_api: false
# ============================================================================
# ESP32 robot (reference for Claude's web_fetch calls)
# ============================================================================
@@ -125,6 +114,22 @@ esp32:
  # For access from outside: DynDNS, Tailscale, or a port forward is required
  # external_url: "https://mein-roboter.dyndns.org"
# ============================================================================
# Mock ESP32 server (for tests without real hardware)
# ============================================================================
mock:
  # Use a real USB webcam instead of test images?
  # true  = images from an attached webcam (requires opencv-python)
  # false = images from the ./test_images/ folder
  use_real_webcam: false
  # Webcam device number (0 = first webcam, 1 = second, etc.)
  webcam_device: 0
  # Webcam resolution
  webcam_width: 640
  webcam_height: 480
# ============================================================================
# Logging
# ============================================================================
@@ -6,11 +6,13 @@
Simuliert den ESP32-Roboter für Tests ohne echte Hardware.
Features:
- Liefert Testbilder aus ./test_images/
- ODER nutzt eine echte USB-Webcam (use_real_webcam: true in config)
- Simuliert Fahrbefehle (loggt sie)
- Liefert Fake-Sensordaten
Usage:
1. Leg JPG-Bilder in ./test_images/ (z.B. Fotos aus deiner Wohnung)
ODER aktiviere use_real_webcam in config.yaml
2. python mock_esp32.py
3. In config.yaml: host: "localhost", port: 5000
4. Starte die Bridge - Claude "fährt" durch deine Testbilder!
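Once the server is running, every endpoint expects the API key as a `key` query parameter (see `check_api_key` further down in the diff). A sketch of how a client might build the capture URL — `base` is an assumption matching the usage notes above:

```python
from urllib.parse import urlencode

# Key as defined in mock_esp32.py; base assumes the server's default port
base = "http://localhost:5000"
api_key = "claudes_eyes_secret_2025"

capture_url = f"{base}/api/capture?{urlencode({'key': api_key})}"
# fetch with e.g. urllib.request.urlopen(capture_url) once the server runs
```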
@@ -20,11 +22,19 @@ import os
import random
import logging
import base64
import yaml
from pathlib import Path
from datetime import datetime
from flask import Flask, jsonify, send_file, request, Response
# OpenCV für Webcam (optional)
try:
import cv2
OPENCV_AVAILABLE = True
except ImportError:
OPENCV_AVAILABLE = False
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -32,15 +42,124 @@ logger = logging.getLogger(__name__)
app = Flask(__name__)
# Konfiguration
IMAGES_DIR = Path(__file__).parent / "test_images"
SCRIPT_DIR = Path(__file__).parent
IMAGES_DIR = SCRIPT_DIR / "test_images"
FOTO_PATH = SCRIPT_DIR / "foto.jpg" # Hier wird das aktuelle Foto gespeichert
API_KEY = "claudes_eyes_secret_2025"
# Mock-Konfiguration (wird aus config.yaml geladen)
mock_config = {
"use_real_webcam": False,
"webcam_device": 0,
"webcam_width": 640,
"webcam_height": 480,
}
# Webcam-Objekt (wird bei Bedarf initialisiert)
webcam = None
# State
current_image_index = 0
position = {"x": 0, "y": 0, "rotation": 0}
camera_angle = {"pan": 90, "tilt": 90}
def load_mock_config():
"""Lädt die Mock-Konfiguration aus config.yaml"""
global mock_config
# Versuche config.local.yaml zuerst, dann config.yaml
for config_name in ["config.local.yaml", "config.yaml"]:
config_path = SCRIPT_DIR / config_name
if config_path.exists():
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
if config and "mock" in config:
mock_config.update(config["mock"])
logger.info(f"Mock-Config geladen aus {config_name}")
return
except Exception as e:
logger.warning(f"Fehler beim Laden von {config_name}: {e}")
logger.info("Keine Mock-Config gefunden, nutze Defaults")
def init_webcam():
"""Initialisiert die Webcam"""
global webcam
if not OPENCV_AVAILABLE:
logger.error("OpenCV nicht installiert! Installiere mit: pip install opencv-python")
return False
if webcam is not None:
return True
device = mock_config.get("webcam_device", 0)
width = mock_config.get("webcam_width", 640)
height = mock_config.get("webcam_height", 480)
try:
webcam = cv2.VideoCapture(device)
if not webcam.isOpened():
logger.error(f"Konnte Webcam {device} nicht öffnen!")
webcam = None
return False
# Auflösung setzen
webcam.set(cv2.CAP_PROP_FRAME_WIDTH, width)
webcam.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
actual_w = int(webcam.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_h = int(webcam.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f"Webcam {device} initialisiert: {actual_w}x{actual_h}")
return True
except Exception as e:
logger.error(f"Webcam-Fehler: {e}")
webcam = None
return False
def capture_from_webcam() -> bool:
"""
Nimmt ein Bild von der Webcam auf und speichert es als foto.jpg
Returns:
True wenn erfolgreich, False bei Fehler
"""
global webcam
if not init_webcam():
return False
try:
# Bild aufnehmen
ret, frame = webcam.read()
if not ret or frame is None:
logger.error("Konnte kein Bild von Webcam lesen!")
return False
# Als JPEG speichern
cv2.imwrite(str(FOTO_PATH), frame)
logger.info(f"📷 Webcam-Bild aufgenommen: {FOTO_PATH.name}")
return True
except Exception as e:
logger.error(f"Webcam-Capture-Fehler: {e}")
return False
def release_webcam():
"""Gibt die Webcam frei"""
global webcam
if webcam is not None:
webcam.release()
webcam = None
logger.info("Webcam freigegeben")
def check_api_key():
"""Prüft den API-Key"""
key = request.args.get("key", "")
@@ -95,17 +214,44 @@ def capture():
Das ist wie beim echten ESP32 - Bild wird direkt gestreamt.
Kein JSON, sondern das Bild selbst!
Je nach Konfiguration:
- use_real_webcam: true Bild von USB-Webcam
- use_real_webcam: false Bild aus test_images/ Ordner
"""
global current_image_index
if not check_api_key():
return jsonify({"error": "Invalid API key"}), 401
# Finde Testbilder
# ════════════════════════════════════════════════════════════════
# WEBCAM-MODUS: Echtes Bild von USB-Webcam
# ════════════════════════════════════════════════════════════════
if mock_config.get("use_real_webcam", False):
if capture_from_webcam():
# Foto wurde in foto.jpg gespeichert, das zurückgeben
if FOTO_PATH.exists():
return send_file(FOTO_PATH, mimetype="image/jpeg")
else:
return jsonify({"error": "Webcam-Capture fehlgeschlagen"}), 500
else:
# Fallback: Bestehendes foto.jpg nutzen falls vorhanden
if FOTO_PATH.exists():
logger.warning("Webcam-Fehler, nutze bestehendes foto.jpg")
return send_file(FOTO_PATH, mimetype="image/jpeg")
return jsonify({"error": "Webcam nicht verfügbar und kein foto.jpg vorhanden"}), 500
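The capture path above tries image sources in a fixed priority order. A pure-Python sketch of that decision chain — `pick_image_source` is a hypothetical helper for illustration, not part of the diff:

```python
def pick_image_source(use_real_webcam: bool, webcam_ok: bool,
                      foto_exists: bool, have_test_images: bool) -> str:
    """Sketch of /api/capture's source selection in the mock server."""
    if use_real_webcam:
        if webcam_ok:
            return "webcam"        # fresh frame, saved as foto.jpg
        if foto_exists:
            return "stale_foto"    # warn, reuse the last foto.jpg
        return "error_500"         # no webcam, no cached photo
    if have_test_images:
        return "test_image"        # next image from test_images/
    if foto_exists:
        return "foto_fallback"     # no test images, reuse foto.jpg
    return "error_404"             # nothing to serve
```

The webcam branch degrades to the last saved `foto.jpg` rather than failing outright, so a transient camera error does not break the bridge.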
# ════════════════════════════════════════════════════════════════
# TESTBILD-MODUS: Bild aus test_images/ Ordner
# ════════════════════════════════════════════════════════════════
if not IMAGES_DIR.exists():
IMAGES_DIR.mkdir(parents=True)
# Fallback: foto.jpg nutzen falls vorhanden
if FOTO_PATH.exists():
logger.info("📷 Kein test_images/, nutze foto.jpg")
return send_file(FOTO_PATH, mimetype="image/jpeg")
return jsonify({
"error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab."
"error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab oder aktiviere use_real_webcam."
}), 404
images = sorted(IMAGES_DIR.glob("*.jpg"))
@@ -113,8 +259,12 @@ def capture():
images = sorted(IMAGES_DIR.glob("*.png"))
if not images:
# Fallback: foto.jpg nutzen falls vorhanden
if FOTO_PATH.exists():
logger.info("📷 Keine Testbilder, nutze foto.jpg")
return send_file(FOTO_PATH, mimetype="image/jpeg")
return jsonify({
"error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab."
"error": f"Keine Bilder gefunden! Leg JPGs in {IMAGES_DIR} ab oder aktiviere use_real_webcam."
}), 404
# Aktuelles Testbild holen
@@ -132,15 +282,18 @@ def get_foto():
Liefert das aktuelle Foto - immer dieselbe URL!
Das ist der Hauptendpoint für Claude.ai Chat.
Nach /api/capture liegt das neue Bild hier.
Bei Webcam-Modus wird hier immer das letzte Webcam-Bild geliefert.
"""
foto_path = IMAGES_DIR.parent / "foto.jpg"
if not FOTO_PATH.exists():
# Bei Webcam-Modus: Mach ein Foto falls noch keins da ist
if mock_config.get("use_real_webcam", False):
if capture_from_webcam():
return send_file(FOTO_PATH, mimetype="image/jpeg")
if not foto_path.exists():
return jsonify({"error": "Noch kein Foto aufgenommen! Erst /api/capture aufrufen."}), 404
logger.info(f"📷 Foto abgerufen: foto.jpg")
return send_file(foto_path, mimetype="image/jpeg")
return send_file(FOTO_PATH, mimetype="image/jpeg")
@app.route("/api/status", methods=["GET"])
@@ -266,6 +419,13 @@ def display():
def main():
"""Startet den Mock-Server"""
# Lade Konfiguration
load_mock_config()
use_webcam = mock_config.get("use_real_webcam", False)
webcam_device = mock_config.get("webcam_device", 0)
print("""
@@ -273,18 +433,31 @@ def main():
Simuliert den Roboter für Tests ohne Hardware.
""")
if use_webcam:
print("""║ ║
📷 WEBCAM-MODUS AKTIV
Bilder kommen von deiner USB-Webcam (Device {device})
""".format(device=webcam_device))
else:
print("""║ ║
📁 TESTBILD-MODUS
Leg Testbilder in ./test_images/ ab (JPG oder PNG)
Tipp: Mach 10-20 Fotos aus deiner Wohnung!
1. Leg Testbilder in ./test_images/ ab (JPG oder PNG)
ODER aktiviere Webcam in config.yaml:
Tipp: Mach 10-20 Fotos aus deiner Wohnung!
mock:
use_real_webcam: true
""")
print("""╠══════════════════════════════════════════════════════════════╣
2. Passe config.yaml an:
Für die Bridge - config.yaml:
esp32:
host: "localhost"
port: 5000
3. Starte die Bridge in einem anderen Terminal
Server: http://localhost:5000
@@ -293,26 +466,52 @@ def main():
""".format(api_key=API_KEY))
# Erstelle Bilder-Ordner falls nicht existiert
if not IMAGES_DIR.exists():
IMAGES_DIR.mkdir(parents=True)
print(f"\n⚠️ Ordner {IMAGES_DIR} erstellt - leg dort Testbilder ab!\n")
# Zähle Bilder
images = list(IMAGES_DIR.glob("*.jpg")) + list(IMAGES_DIR.glob("*.png"))
if images:
print(f"📁 Gefunden: {len(images)} Testbilder")
for img in images[:5]:
print(f" - {img.name}")
if len(images) > 5:
print(f" ... und {len(images) - 5} weitere")
else:
print(f"⚠️ Keine Bilder in {IMAGES_DIR} gefunden!")
print(" Leg dort JPG/PNG-Dateien ab für den Test.\n")
# Webcam testen falls aktiviert
if use_webcam:
if not OPENCV_AVAILABLE:
print("❌ OpenCV nicht installiert!")
print(" Installiere mit: pip install opencv-python")
print(" Oder deaktiviere Webcam in config.yaml\n")
else:
print(f"📷 Teste Webcam {webcam_device}...")
if init_webcam():
print(f"✅ Webcam bereit!")
# Test-Capture
if capture_from_webcam():
print(f"✅ Test-Bild aufgenommen: {FOTO_PATH}")
else:
print(f"❌ Webcam {webcam_device} konnte nicht geöffnet werden!")
print(" Prüfe ob eine Webcam angeschlossen ist.\n")
else:
# Erstelle Bilder-Ordner falls nicht existiert
if not IMAGES_DIR.exists():
IMAGES_DIR.mkdir(parents=True)
print(f"\n⚠️ Ordner {IMAGES_DIR} erstellt - leg dort Testbilder ab!\n")
# Zähle Bilder
images = list(IMAGES_DIR.glob("*.jpg")) + list(IMAGES_DIR.glob("*.png"))
if images:
print(f"📁 Gefunden: {len(images)} Testbilder")
for img in images[:5]:
print(f" - {img.name}")
if len(images) > 5:
print(f" ... und {len(images) - 5} weitere")
else:
# Prüfe ob foto.jpg existiert
if FOTO_PATH.exists():
print(f"📷 Nutze bestehendes {FOTO_PATH.name}")
else:
print(f"⚠️ Keine Bilder in {IMAGES_DIR} gefunden!")
print(" Leg dort JPG/PNG-Dateien ab für den Test.")
print(" Oder aktiviere use_real_webcam in config.yaml\n")
print("\n🚀 Starte Server...\n")
app.run(host="0.0.0.0", port=5000, debug=True)
try:
app.run(host="0.0.0.0", port=5000, debug=True)
finally:
# Webcam freigeben beim Beenden
release_webcam()
if __name__ == "__main__":
@@ -173,148 +173,21 @@ class STTEngine:
return None
class TermuxSTTEngine:
def create_stt_engine(engine_type: str = "standard", **kwargs) -> STTEngine:
"""
STT via Termux:API für Android
Benötigt:
- Termux App
- Termux:API App
- pkg install termux-api
"""
def __init__(self, language: str = "de-DE", timeout: int = 10):
self.language = language
self.timeout = timeout
self._listening = False
self._stop_flag = False
self._thread: Optional[threading.Thread] = None
self._callback: Optional[Callable[[SpeechResult], None]] = None
# Teste ob termux-speech-to-text verfügbar ist
import shutil
if not shutil.which("termux-speech-to-text"):
raise RuntimeError(
"termux-speech-to-text nicht gefunden! "
"Installiere mit: pkg install termux-api"
)
logger.info(f"Termux STT engine initialized (language: {language})")
def listen_once(self, timeout: Optional[float] = None) -> Optional[SpeechResult]:
"""
Listen for a single phrase via Termux API
Args:
timeout: Maximum time to wait (uses class timeout if None)
Returns:
SpeechResult or None if nothing recognized
"""
import subprocess
import json
actual_timeout = timeout if timeout else self.timeout
try:
# termux-speech-to-text gibt JSON zurück
result = subprocess.run(
["termux-speech-to-text"],
capture_output=True,
text=True,
timeout=actual_timeout + 5 # Extra Zeit für API
)
if result.returncode != 0:
logger.error(f"Termux STT error: {result.stderr}")
return None
# Output ist ein String (kein JSON bei Termux)
text = result.stdout.strip()
if text:
return SpeechResult(
text=text,
confidence=0.8, # Termux gibt keine Konfidenz
is_final=True
)
return None
except subprocess.TimeoutExpired:
logger.debug("Termux STT timeout")
return None
except Exception as e:
logger.error(f"Termux STT error: {e}")
return None
def start_continuous(self, callback: Callable[[SpeechResult], None]) -> None:
"""Start continuous listening in background"""
if self._listening:
logger.warning("Already listening")
return
self._callback = callback
self._stop_flag = False
self._listening = True
self._thread = threading.Thread(target=self._listen_loop, daemon=True)
self._thread.start()
logger.info("Termux continuous listening started")
def stop_continuous(self) -> None:
"""Stop continuous listening"""
self._stop_flag = True
self._listening = False
if self._thread:
self._thread.join(timeout=2)
self._thread = None
logger.info("Termux continuous listening stopped")
def _listen_loop(self):
"""Background thread for continuous listening"""
while not self._stop_flag:
try:
result = self.listen_once(timeout=5)
if result and self._callback:
self._callback(result)
except Exception as e:
if not self._stop_flag:
logger.error(f"Termux listen loop error: {e}")
# Kleine Pause zwischen Aufnahmen
import time
time.sleep(0.5)
def is_listening(self) -> bool:
return self._listening
def create_stt_engine(engine_type: str = "standard", **kwargs):
"""
Factory function to create STT engine
Args:
engine_type: "standard" or "termux"
engine_type: "standard"
**kwargs: Engine-specific options
"""
if engine_type == "termux":
return TermuxSTTEngine(
language=kwargs.get("language", "de-DE"),
timeout=kwargs.get("phrase_time_limit", 15)
)
else:
# Standard SpeechRecognition engine
return STTEngine(
energy_threshold=kwargs.get("energy_threshold", 300),
pause_threshold=kwargs.get("pause_threshold", 0.8),
phrase_time_limit=kwargs.get("phrase_time_limit", 15),
service=kwargs.get("service", "google"),
language=kwargs.get("language", "de-DE")
)
return STTEngine(
energy_threshold=kwargs.get("energy_threshold", 300),
pause_threshold=kwargs.get("pause_threshold", 0.8),
phrase_time_limit=kwargs.get("phrase_time_limit", 15),
service=kwargs.get("service", "google"),
language=kwargs.get("language", "de-DE")
)
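After this change `create_stt_engine` always returns an `STTEngine` and silently ignores `engine_type` — a config still set to `engine: "termux"` no longer errors, it just gets the standard engine. A sketch of the default handling; `stt_defaults` is a hypothetical stand-in mirroring the `kwargs.get` calls in the diff:

```python
def stt_defaults(**kwargs):
    # Mirrors the kwargs.get defaults inside create_stt_engine (sketch only)
    return {
        "energy_threshold": kwargs.get("energy_threshold", 300),
        "pause_threshold": kwargs.get("pause_threshold", 0.8),
        "phrase_time_limit": kwargs.get("phrase_time_limit", 15),
        "service": kwargs.get("service", "google"),
        "language": kwargs.get("language", "de-DE"),
    }

# The bridge config sets phrase_time_limit: 120; everything else stays default
cfg = stt_defaults(phrase_time_limit=120)
```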
# Test when run directly
@@ -260,114 +260,12 @@ class GTTSEngine(TTSEngine):
return self._speaking
class TermuxTTSEngine(TTSEngine):
"""
TTS via Termux:API für Android
Benötigt:
- Termux App
- Termux:API App
- pkg install termux-api
"""
def __init__(self, language: str = "de", rate: float = 1.0):
self.language = language
self.rate = rate
self._speaking = False
self._queue = queue.Queue()
self._thread: Optional[threading.Thread] = None
self._stop_flag = False
self._process = None
# Teste ob termux-tts-speak verfügbar ist
import shutil
if not shutil.which("termux-tts-speak"):
raise RuntimeError(
"termux-tts-speak nicht gefunden! "
"Installiere mit: pkg install termux-api"
)
logger.info(f"Termux TTS engine initialized (language: {language})")
def speak(self, text: str) -> None:
"""Speak text via Termux API (blocking)"""
import subprocess
self._speaking = True
try:
# termux-tts-speak Optionen:
# -l <language> - Sprache (z.B. "de" oder "de-DE")
# -r <rate> - Geschwindigkeit (0.5 bis 2.0, default 1.0)
# -p <pitch> - Tonhöhe (0.5 bis 2.0, default 1.0)
# -s <stream> - Audio Stream (ALARM, MUSIC, NOTIFICATION, RING, SYSTEM, VOICE_CALL)
cmd = [
"termux-tts-speak",
"-l", self.language,
"-r", str(self.rate),
text
]
self._process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self._process.wait() # Warte bis fertig
self._process = None
except Exception as e:
logger.error(f"Termux TTS error: {e}")
finally:
self._speaking = False
def speak_async(self, text: str) -> None:
"""Speak text (non-blocking)"""
self._queue.put(text)
if self._thread is None or not self._thread.is_alive():
self._stop_flag = False
self._thread = threading.Thread(target=self._speech_worker, daemon=True)
self._thread.start()
def _speech_worker(self):
"""Worker thread for async speech"""
while not self._stop_flag:
try:
text = self._queue.get(timeout=0.5)
self.speak(text)
self._queue.task_done()
except queue.Empty:
continue
def stop(self) -> None:
"""Stop current speech"""
self._stop_flag = True
# Beende laufenden Prozess
if self._process:
try:
self._process.terminate()
except:
pass
# Clear queue
while not self._queue.empty():
try:
self._queue.get_nowait()
except queue.Empty:
break
def is_speaking(self) -> bool:
return self._speaking
def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine:
"""
Factory function to create TTS engine
Args:
engine_type: "pyttsx3", "gtts", or "termux"
engine_type: "pyttsx3" or "gtts"
**kwargs: Engine-specific options
"""
if engine_type == "pyttsx3":
@@ -380,11 +278,6 @@ def create_tts_engine(engine_type: str = "pyttsx3", **kwargs) -> TTSEngine:
return GTTSEngine(
language=kwargs.get("language", "de")
)
elif engine_type == "termux":
return TermuxTTSEngine(
language=kwargs.get("language", "de"),
rate=kwargs.get("rate", 1.0)
)
else:
raise ValueError(f"Unknown TTS engine: {engine_type}")
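Unlike the STT factory, `create_tts_engine` still validates `engine_type`, so a config left on `engine: "termux"` now fails fast instead of silently falling back. A sketch of the remaining dispatch — `select_tts_engine` is a hypothetical stand-in that returns the branch taken rather than a real engine object:

```python
def select_tts_engine(engine_type: str) -> str:
    # Mirrors create_tts_engine()'s dispatch after the Termux removal (sketch)
    if engine_type == "pyttsx3":
        return "pyttsx3"
    elif engine_type == "gtts":
        return "gtts"
    raise ValueError(f"Unknown TTS engine: {engine_type}")

# A config still set to "termux" now raises instead of dispatching:
try:
    select_tts_engine("termux")
    termux_accepted = True
except ValueError:
    termux_accepted = False
```

Users upgrading past this commit should switch `tts.engine` to `pyttsx3` or `gtts` before starting the bridge.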