esp32-claude-robbie/python_bridge/ARCHIV/esp32_client.py

239 lines
7.5 KiB
Python

"""
Claude's Eyes - ESP32 API Client
Handles communication with the robot's REST API
"""
import requests
from typing import Optional, Dict, Any
from dataclasses import dataclass
from io import BytesIO
from PIL import Image
import logging
logger = logging.getLogger(__name__)
@dataclass
class RobotStatus:
"""Current robot status from sensors"""
distance_cm: float
battery_percent: int
current_action: str
wifi_rssi: int
uptime_seconds: int
servo_pan: int
servo_tilt: int
obstacle_warning: bool
obstacle_danger: bool
is_tilted: bool
is_moving: bool
imu: Dict[str, float]
class ESP32Client:
"""Client for communicating with the ESP32 robot"""
def __init__(self, host: str, port: int = 80, api_key: str = "", timeout: int = 10):
self.base_url = f"http://{host}:{port}"
self.api_key = api_key
self.timeout = timeout
self._session = requests.Session()
def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
"""Make GET request with API key"""
if params is None:
params = {}
params["key"] = self.api_key
url = f"{self.base_url}{endpoint}"
logger.debug(f"GET {url}")
response = self._session.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response
def _post(self, endpoint: str, data: Dict) -> requests.Response:
"""Make POST request with API key"""
url = f"{self.base_url}{endpoint}?key={self.api_key}"
logger.debug(f"POST {url} with {data}")
response = self._session.post(url, json=data, timeout=self.timeout)
response.raise_for_status()
return response
def capture_image(self, resolution: str = "VGA", quality: int = 12) -> bytes:
"""
Capture image from robot camera
Args:
resolution: QVGA, VGA, SVGA, XGA, SXGA, UXGA
quality: 10-63 (lower = better)
Returns:
JPEG image data as bytes
"""
params = {
"resolution": resolution,
"quality": quality
}
response = self._get("/api/capture", params)
logger.info(f"Captured image: {len(response.content)} bytes")
return response.content
def capture_image_pil(self, resolution: str = "VGA", quality: int = 12) -> Image.Image:
"""Capture image and return as PIL Image"""
image_data = self.capture_image(resolution, quality)
return Image.open(BytesIO(image_data))
def get_status(self) -> RobotStatus:
"""Get current robot status from sensors"""
response = self._get("/api/status")
data = response.json()
return RobotStatus(
distance_cm=data.get("distance_cm", 0),
battery_percent=data.get("battery_percent", 100),
current_action=data.get("current_action", "unknown"),
wifi_rssi=data.get("wifi_rssi", 0),
uptime_seconds=data.get("uptime_seconds", 0),
servo_pan=data.get("servo_pan", 90),
servo_tilt=data.get("servo_tilt", 90),
obstacle_warning=data.get("obstacle_warning", False),
obstacle_danger=data.get("obstacle_danger", False),
is_tilted=data.get("is_tilted", False),
is_moving=data.get("is_moving", False),
imu=data.get("imu", {})
)
def send_command(self, action: str, speed: int = 50, duration_ms: int = 500,
pan: Optional[int] = None, tilt: Optional[int] = None) -> Dict[str, Any]:
"""
Send movement command to robot
Args:
action: forward, backward, left, right, stop,
look_left, look_right, look_up, look_down, look_center, look_custom
speed: 0-100 percent
duration_ms: Duration in milliseconds
pan: Custom pan angle (for look_custom)
tilt: Custom tilt angle (for look_custom)
Returns:
Response from robot
"""
data = {
"action": action,
"speed": speed,
"duration_ms": duration_ms
}
if pan is not None:
data["pan"] = pan
if tilt is not None:
data["tilt"] = tilt
response = self._post("/api/command", data)
result = response.json()
logger.info(f"Command {action}: {result.get('message', 'OK')}")
return result
# Convenience methods for common actions
def forward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("forward", speed, duration_ms)
def backward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("backward", speed, duration_ms)
def left(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("left", speed, duration_ms)
def right(self, speed: int = 50, duration_ms: int = 500) -> Dict:
return self.send_command("right", speed, duration_ms)
def stop(self) -> Dict:
return self.send_command("stop")
def look_left(self) -> Dict:
return self.send_command("look_left")
def look_right(self) -> Dict:
return self.send_command("look_right")
def look_up(self) -> Dict:
return self.send_command("look_up")
def look_down(self) -> Dict:
return self.send_command("look_down")
def look_center(self) -> Dict:
return self.send_command("look_center")
def look_custom(self, pan: int, tilt: int) -> Dict:
return self.send_command("look_custom", pan=pan, tilt=tilt)
def set_claude_text(self, text: str) -> Dict:
"""Set text that Claude wants to say/display"""
response = self._post("/api/claude_text", {"text": text})
return response.json()
def get_claude_text(self) -> Dict[str, Any]:
"""Get last Claude text (for TTS)"""
response = self._get("/api/claude_text")
return response.json()
def set_display(self, mode: str, content: str = "") -> Dict:
"""
Control robot display
Args:
mode: "text", "emoji", "status"
content: Text to show or emoji name (happy, thinking, surprised, sleepy, curious, confused)
"""
response = self._post("/api/display", {"mode": mode, "content": content})
return response.json()
def is_connected(self) -> bool:
"""Check if robot is reachable"""
try:
self.get_status()
return True
except Exception as e:
logger.warning(f"Connection check failed: {e}")
return False
# Test when run directly
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.DEBUG)
if len(sys.argv) < 2:
print("Usage: python esp32_client.py <robot_ip>")
sys.exit(1)
host = sys.argv[1]
api_key = "claudes_eyes_secret_2025"
client = ESP32Client(host, api_key=api_key)
print(f"Connecting to {host}...")
if client.is_connected():
print("Connected!")
status = client.get_status()
print(f"\nStatus:")
print(f" Distance: {status.distance_cm} cm")
print(f" Battery: {status.battery_percent}%")
print(f" Action: {status.current_action}")
print(f" WiFi RSSI: {status.wifi_rssi} dBm")
print("\nCapturing image...")
img = client.capture_image_pil()
print(f" Size: {img.size}")
img.save("test_capture.jpg")
print(" Saved to test_capture.jpg")
else:
print("Could not connect to robot!")