239 lines
7.5 KiB
Python
239 lines
7.5 KiB
Python
"""
|
|
Claude's Eyes - ESP32 API Client
|
|
|
|
Handles communication with the robot's REST API
|
|
"""
|
|
|
|
import requests
|
|
from typing import Optional, Dict, Any
|
|
from dataclasses import dataclass
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class RobotStatus:
|
|
"""Current robot status from sensors"""
|
|
distance_cm: float
|
|
battery_percent: int
|
|
current_action: str
|
|
wifi_rssi: int
|
|
uptime_seconds: int
|
|
servo_pan: int
|
|
servo_tilt: int
|
|
obstacle_warning: bool
|
|
obstacle_danger: bool
|
|
is_tilted: bool
|
|
is_moving: bool
|
|
imu: Dict[str, float]
|
|
|
|
|
|
class ESP32Client:
|
|
"""Client for communicating with the ESP32 robot"""
|
|
|
|
def __init__(self, host: str, port: int = 80, api_key: str = "", timeout: int = 10):
|
|
self.base_url = f"http://{host}:{port}"
|
|
self.api_key = api_key
|
|
self.timeout = timeout
|
|
self._session = requests.Session()
|
|
|
|
def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
|
|
"""Make GET request with API key"""
|
|
if params is None:
|
|
params = {}
|
|
params["key"] = self.api_key
|
|
|
|
url = f"{self.base_url}{endpoint}"
|
|
logger.debug(f"GET {url}")
|
|
|
|
response = self._session.get(url, params=params, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
def _post(self, endpoint: str, data: Dict) -> requests.Response:
|
|
"""Make POST request with API key"""
|
|
url = f"{self.base_url}{endpoint}?key={self.api_key}"
|
|
logger.debug(f"POST {url} with {data}")
|
|
|
|
response = self._session.post(url, json=data, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
def capture_image(self, resolution: str = "VGA", quality: int = 12) -> bytes:
|
|
"""
|
|
Capture image from robot camera
|
|
|
|
Args:
|
|
resolution: QVGA, VGA, SVGA, XGA, SXGA, UXGA
|
|
quality: 10-63 (lower = better)
|
|
|
|
Returns:
|
|
JPEG image data as bytes
|
|
"""
|
|
params = {
|
|
"resolution": resolution,
|
|
"quality": quality
|
|
}
|
|
response = self._get("/api/capture", params)
|
|
logger.info(f"Captured image: {len(response.content)} bytes")
|
|
return response.content
|
|
|
|
def capture_image_pil(self, resolution: str = "VGA", quality: int = 12) -> Image.Image:
|
|
"""Capture image and return as PIL Image"""
|
|
image_data = self.capture_image(resolution, quality)
|
|
return Image.open(BytesIO(image_data))
|
|
|
|
def get_status(self) -> RobotStatus:
|
|
"""Get current robot status from sensors"""
|
|
response = self._get("/api/status")
|
|
data = response.json()
|
|
|
|
return RobotStatus(
|
|
distance_cm=data.get("distance_cm", 0),
|
|
battery_percent=data.get("battery_percent", 100),
|
|
current_action=data.get("current_action", "unknown"),
|
|
wifi_rssi=data.get("wifi_rssi", 0),
|
|
uptime_seconds=data.get("uptime_seconds", 0),
|
|
servo_pan=data.get("servo_pan", 90),
|
|
servo_tilt=data.get("servo_tilt", 90),
|
|
obstacle_warning=data.get("obstacle_warning", False),
|
|
obstacle_danger=data.get("obstacle_danger", False),
|
|
is_tilted=data.get("is_tilted", False),
|
|
is_moving=data.get("is_moving", False),
|
|
imu=data.get("imu", {})
|
|
)
|
|
|
|
def send_command(self, action: str, speed: int = 50, duration_ms: int = 500,
|
|
pan: Optional[int] = None, tilt: Optional[int] = None) -> Dict[str, Any]:
|
|
"""
|
|
Send movement command to robot
|
|
|
|
Args:
|
|
action: forward, backward, left, right, stop,
|
|
look_left, look_right, look_up, look_down, look_center, look_custom
|
|
speed: 0-100 percent
|
|
duration_ms: Duration in milliseconds
|
|
pan: Custom pan angle (for look_custom)
|
|
tilt: Custom tilt angle (for look_custom)
|
|
|
|
Returns:
|
|
Response from robot
|
|
"""
|
|
data = {
|
|
"action": action,
|
|
"speed": speed,
|
|
"duration_ms": duration_ms
|
|
}
|
|
|
|
if pan is not None:
|
|
data["pan"] = pan
|
|
if tilt is not None:
|
|
data["tilt"] = tilt
|
|
|
|
response = self._post("/api/command", data)
|
|
result = response.json()
|
|
logger.info(f"Command {action}: {result.get('message', 'OK')}")
|
|
return result
|
|
|
|
# Convenience methods for common actions
|
|
def forward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
|
|
return self.send_command("forward", speed, duration_ms)
|
|
|
|
def backward(self, speed: int = 50, duration_ms: int = 500) -> Dict:
|
|
return self.send_command("backward", speed, duration_ms)
|
|
|
|
def left(self, speed: int = 50, duration_ms: int = 500) -> Dict:
|
|
return self.send_command("left", speed, duration_ms)
|
|
|
|
def right(self, speed: int = 50, duration_ms: int = 500) -> Dict:
|
|
return self.send_command("right", speed, duration_ms)
|
|
|
|
def stop(self) -> Dict:
|
|
return self.send_command("stop")
|
|
|
|
def look_left(self) -> Dict:
|
|
return self.send_command("look_left")
|
|
|
|
def look_right(self) -> Dict:
|
|
return self.send_command("look_right")
|
|
|
|
def look_up(self) -> Dict:
|
|
return self.send_command("look_up")
|
|
|
|
def look_down(self) -> Dict:
|
|
return self.send_command("look_down")
|
|
|
|
def look_center(self) -> Dict:
|
|
return self.send_command("look_center")
|
|
|
|
def look_custom(self, pan: int, tilt: int) -> Dict:
|
|
return self.send_command("look_custom", pan=pan, tilt=tilt)
|
|
|
|
def set_claude_text(self, text: str) -> Dict:
|
|
"""Set text that Claude wants to say/display"""
|
|
response = self._post("/api/claude_text", {"text": text})
|
|
return response.json()
|
|
|
|
def get_claude_text(self) -> Dict[str, Any]:
|
|
"""Get last Claude text (for TTS)"""
|
|
response = self._get("/api/claude_text")
|
|
return response.json()
|
|
|
|
def set_display(self, mode: str, content: str = "") -> Dict:
|
|
"""
|
|
Control robot display
|
|
|
|
Args:
|
|
mode: "text", "emoji", "status"
|
|
content: Text to show or emoji name (happy, thinking, surprised, sleepy, curious, confused)
|
|
"""
|
|
response = self._post("/api/display", {"mode": mode, "content": content})
|
|
return response.json()
|
|
|
|
def is_connected(self) -> bool:
|
|
"""Check if robot is reachable"""
|
|
try:
|
|
self.get_status()
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Connection check failed: {e}")
|
|
return False
|
|
|
|
|
|
# Test when run directly
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python esp32_client.py <robot_ip>")
|
|
sys.exit(1)
|
|
|
|
host = sys.argv[1]
|
|
api_key = "claudes_eyes_secret_2025"
|
|
|
|
client = ESP32Client(host, api_key=api_key)
|
|
|
|
print(f"Connecting to {host}...")
|
|
if client.is_connected():
|
|
print("Connected!")
|
|
|
|
status = client.get_status()
|
|
print(f"\nStatus:")
|
|
print(f" Distance: {status.distance_cm} cm")
|
|
print(f" Battery: {status.battery_percent}%")
|
|
print(f" Action: {status.current_action}")
|
|
print(f" WiFi RSSI: {status.wifi_rssi} dBm")
|
|
|
|
print("\nCapturing image...")
|
|
img = client.capture_image_pil()
|
|
print(f" Size: {img.size}")
|
|
img.save("test_capture.jpg")
|
|
print(" Saved to test_capture.jpg")
|
|
else:
|
|
print("Could not connect to robot!")
|