esp32-claude-robbie/python_bridge/chat_web_interface.py

1284 lines
49 KiB
Python

"""
Claude's Eyes - Chat Web Interface
Steuert den echten Claude.ai Chat im Browser via Selenium.
Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio!
HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden,
wenn Claude.ai sein UI ändert.
"""
import time
import logging
import tempfile
import hashlib
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dataclasses import dataclass
from typing import List, Optional
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException
logger = logging.getLogger(__name__)
@dataclass
class ChatMessage:
"""Eine Chat-Nachricht"""
id: str
text: str
is_from_assistant: bool
timestamp: float = 0
class ClaudeChatInterface:
"""
Steuert Claude.ai Chat via Selenium Browser Automation.
Diese Klasse:
- Öffnet einen Browser mit dem Claude.ai Chat
- Kann Nachrichten senden (für Heartbeat und Stefan's Sprache)
- Kann neue Nachrichten lesen (für TTS)
WICHTIG: Du musst beim ersten Start manuell einloggen!
"""
@staticmethod
def extract_chat_id(url: str) -> Optional[str]:
"""
Extrahiert die Chat-ID aus einer Claude.ai URL.
Args:
url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d"
Returns:
Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None
"""
if not url or '/chat/' not in url:
return None
try:
# Extrahiere alles nach /chat/
chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0]
return chat_id if chat_id else None
except:
return None
# CSS Selektoren für Claude.ai (Stand: Dezember 2025)
# Diese müssen angepasst werden wenn sich das UI ändert!
SELECTORS = {
# Eingabefeld für neue Nachrichten
"input_field": "div.ProseMirror[contenteditable='true']",
# Alternativ: Textarea
"input_textarea": "textarea[placeholder*='Message']",
# Senden-Button (falls Enter nicht funktioniert)
"send_button": "button[aria-label*='Send']",
# Alle Nachrichten-Container
"messages_container": "div[class*='conversation']",
# Einzelne Nachrichten
"message_human": "div[data-is-streaming='false'][class*='human']",
"message_assistant": "div[data-is-streaming='false'][class*='assistant']",
# Generischer Nachrichten-Selektor (Fallback)
"message_any": "div[class*='message']",
# Streaming-Indikator (Claude tippt noch)
"streaming": "div[data-is-streaming='true']",
# File Upload Input (versteckt, aber funktioniert mit send_keys)
"file_input": "input[type='file']",
}
def __init__(
self,
chat_url: Optional[str] = None,
headless: bool = False,
user_data_dir: Optional[str] = None,
chrome_binary: Optional[str] = None,
esp32_url: Optional[str] = None,
esp32_api_key: Optional[str] = None
):
"""
Initialisiert das Chat-Interface.
Args:
chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123)
headless: Browser im Hintergrund? (False = sichtbar)
user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins)
chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux)
esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture)
esp32_api_key: API-Key für ESP32 Authentifizierung
"""
self.chat_url = chat_url
self.esp32_url = esp32_url
self.esp32_api_key = esp32_api_key
self._message_cache: List[ChatMessage] = []
self._last_message_id = 0
self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg"
# Für Bild-Deduplizierung (100 Bilder pro Chat Limit!)
self._last_image_hash: Optional[str] = None
self._images_uploaded = 0
# HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen)
self._http_session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
self._http_session.mount('http://', adapter)
self._http_session.mount('https://', adapter)
# Chrome Optionen
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--window-size=1280,800")
# Für persistente Sessions (Login bleibt gespeichert)
if user_data_dir:
options.add_argument(f"--user-data-dir={user_data_dir}")
# Für Termux/Android
if chrome_binary:
options.binary_location = chrome_binary
# Anti-Detection (manche Seiten blocken Selenium)
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
logger.info("Starte Chrome Browser...")
try:
self.driver = webdriver.Chrome(options=options)
self.driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except Exception as e:
logger.error(f"Chrome konnte nicht gestartet werden: {e}")
logger.info("Versuche mit webdriver-manager...")
from webdriver_manager.chrome import ChromeDriverManager
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=options)
self.wait = WebDriverWait(self.driver, 30)
# Navigiere zur Chat-URL
if chat_url:
self.navigate_to_chat(chat_url)
def navigate_to_chat(self, url: str):
"""Navigiert zur Chat-URL"""
logger.info(f"Navigiere zu: {url}")
self.driver.get(url)
self.chat_url = url
# Reset Bild-Counter bei neuem Chat
self._last_image_hash = None
self._images_uploaded = 0
# Warte auf Seitenladung
time.sleep(3)
# Prüfe ob Login nötig
if "login" in self.driver.current_url.lower():
logger.warning("Login erforderlich! Bitte im Browser einloggen...")
print("\n" + "=" * 50)
print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!")
print("Das Fenster bleibt offen. Nach dem Login geht's weiter.")
print("=" * 50 + "\n")
# Warte bis wieder auf der Chat-Seite
while "login" in self.driver.current_url.lower():
time.sleep(2)
logger.info("Login erfolgreich!")
time.sleep(2)
def start_new_chat(self) -> tuple[Optional[str], Optional[str]]:
"""
Startet einen neuen Chat auf Claude.ai.
WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist!
Returns:
Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler
"""
try:
# Alte Chat-ID merken bevor wir wechseln
old_chat_id = self.extract_chat_id(self.chat_url)
logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})")
# Methode 1: "New Chat" Button suchen
new_chat_selectors = [
"button[aria-label*='New']",
"button[aria-label*='new']",
"a[href='/new']",
"a[href*='/chat/new']",
"[data-testid*='new-chat']",
"button:has-text('New chat')",
]
for selector in new_chat_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
elem.click()
time.sleep(2)
new_url = self.driver.current_url
logger.info(f"Neuer Chat gestartet: {new_url}")
# Reset Counter
self._last_image_hash = None
self._images_uploaded = 0
self.chat_url = new_url
return new_url, old_chat_id
except:
continue
# Methode 2: Direkt zu /new navigieren
base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai"
new_url = f"{base_url}/new"
logger.info(f"Versuche direkte Navigation zu: {new_url}")
self.driver.get(new_url)
time.sleep(3)
# Prüfe ob es geklappt hat
current = self.driver.current_url
if current != self.chat_url:
logger.info(f"Neuer Chat: {current}")
self._last_image_hash = None
self._images_uploaded = 0
self.chat_url = current
return current, old_chat_id
logger.warning("Konnte keinen neuen Chat starten")
return None, None
except Exception as e:
logger.error(f"Fehler beim Starten eines neuen Chats: {e}")
return None, None
def get_current_chat_url(self) -> str:
"""Gibt die aktuelle Chat-URL zurück"""
return self.driver.current_url
def send_message(self, text: str, wait_for_response: bool = False) -> bool:
"""
Sendet eine Nachricht in den Chat.
Args:
text: Die zu sendende Nachricht
wait_for_response: Warten bis Claude antwortet?
Returns:
True wenn erfolgreich gesendet
"""
# Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut)
result = self.send_message_with_delay(text, delay_before_send=0)
if result and wait_for_response:
self._wait_for_response()
return result
def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool:
"""
Sendet eine Nachricht mit Verzögerung vor dem Absenden.
Nützlich für große Texte (wie Instruktionen), bei denen die
Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten.
Ablauf:
1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!)
2. Warte delay_before_send Sekunden
3. Send-Button klicken (mit Retry)
Args:
text: Die zu sendende Nachricht
delay_before_send: Sekunden warten nach Einfügen, vor dem Senden
Returns:
True wenn erfolgreich gesendet
"""
try:
# Finde Eingabefeld
input_field = self._find_input_field()
if not input_field:
logger.error("Eingabefeld nicht gefunden!")
return False
# Feld fokussieren
input_field.click()
time.sleep(0.2)
# Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!)
logger.info(f"Füge Text ein ({len(text)} Zeichen)...")
self._insert_text_via_js(input_field, text)
# WARTEN - große Texte brauchen Zeit!
if delay_before_send > 0:
logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...")
time.sleep(delay_before_send)
else:
time.sleep(1.0) # Mindestens 1s warten
# Jetzt absenden (mit Retry-Logik)
send_success = False
for attempt in range(3):
send_button = self._find_send_button()
if send_button:
try:
if send_button.is_enabled() and send_button.is_displayed():
send_button.click()
logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})")
send_success = True
break
except Exception as e:
logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}")
time.sleep(0.5)
# Fallback: Enter-Taste
if not send_success:
logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter")
input_field.send_keys(Keys.RETURN)
time.sleep(0.3)
logger.debug(f"Nachricht gesendet: {text[:50]}...")
return True
except Exception as e:
logger.error(f"Fehler beim Senden mit Verzögerung: {e}")
return False
def _find_send_button(self):
"""Findet den Send-Button"""
selectors = [
"button[aria-label*='Send']",
"button[aria-label*='send']",
"button[data-testid*='send']",
"button[type='submit']",
# Claude.ai spezifisch - Button mit Pfeil-Icon
"button svg[class*='send']",
"button[class*='send']",
]
for selector in selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed() and elem.is_enabled():
return elem
except:
continue
# Fallback: JavaScript-Suche
try:
return self.driver.execute_script("""
// Suche nach Send-Button
const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]');
if (btn && !btn.disabled) return btn;
// Alternative: Letzter Button im Input-Bereich
const buttons = document.querySelectorAll('button');
for (const b of buttons) {
if (b.offsetParent && !b.disabled) {
const text = b.textContent.toLowerCase();
const label = (b.getAttribute('aria-label') || '').toLowerCase();
if (text.includes('send') || label.includes('send')) return b;
}
}
return null;
""")
except:
return None
def _find_input_field(self):
"""Findet das Eingabefeld"""
selectors = [
self.SELECTORS["input_field"],
self.SELECTORS["input_textarea"],
"div[contenteditable='true']",
"textarea",
]
for selector in selectors:
try:
element = self.driver.find_element(By.CSS_SELECTOR, selector)
if element.is_displayed() and element.is_enabled():
return element
except NoSuchElementException:
continue
return None
def _insert_text_via_js(self, element, text: str):
"""
Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme).
Bei send_keys() werden physische Tasten gedrückt, was bei
unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu
falschen Zeichen führt (z.B. y↔z vertauscht).
Diese Methode fügt den Text direkt als String ein.
"""
# Escape für JavaScript
escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${')
# Für contenteditable divs (ProseMirror)
self.driver.execute_script("""
const element = arguments[0];
const text = arguments[1];
// Fokussieren
element.focus();
// Methode 1: Für contenteditable (ProseMirror)
if (element.contentEditable === 'true') {
// Text als HTML einfügen (respektiert Zeilenumbrüche)
const htmlText = text.replace(/\\n/g, '<br>');
element.innerHTML = htmlText;
// Input-Event triggern damit React/Vue es mitbekommt
element.dispatchEvent(new Event('input', { bubbles: true }));
// Cursor ans Ende setzen
const range = document.createRange();
range.selectNodeContents(element);
range.collapse(false);
const sel = window.getSelection();
sel.removeAllRanges();
sel.addRange(range);
}
// Methode 2: Für textarea/input
else {
element.value = text;
element.dispatchEvent(new Event('input', { bubbles: true }));
element.dispatchEvent(new Event('change', { bubbles: true }));
}
""", element, text)
logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)")
def _wait_for_response(self, timeout: int = 60):
"""Wartet bis Claude fertig getippt hat"""
logger.debug("Warte auf Claudes Antwort...")
# Warte kurz damit Streaming startet
time.sleep(1)
# Warte bis Streaming endet
try:
WebDriverWait(self.driver, timeout).until_not(
EC.presence_of_element_located(
(By.CSS_SELECTOR, self.SELECTORS["streaming"])
)
)
except TimeoutException:
logger.warning("Timeout beim Warten auf Antwort")
time.sleep(0.5) # Kurz warten bis DOM aktualisiert
def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]:
"""
Holt neue Nachrichten aus dem Chat.
Args:
since_id: Nur Nachrichten nach dieser ID zurückgeben
Returns:
Liste neuer ChatMessage Objekte
"""
all_messages = self._get_all_messages()
if since_id is None:
return all_messages
# Filtere nur neue
new_messages = []
found_marker = False
for msg in all_messages:
if found_marker:
new_messages.append(msg)
elif msg.id == since_id:
found_marker = True
return new_messages
def _get_all_messages(self) -> List[ChatMessage]:
"""
Holt alle Nachrichten aus dem Chat.
Claude.ai verwendet unterschiedliche data-testid für User und Assistant:
- data-testid="user-message" für User
- Für Claude: Wir suchen nach Nachrichten die NICHT user-message sind
"""
messages = []
try:
# ═══════════════════════════════════════════════════════════════
# STRATEGIE: Hole User UND Assistant Nachrichten SEPARAT
# ═══════════════════════════════════════════════════════════════
# 1. User-Nachrichten (haben data-testid="user-message")
user_elements = []
try:
user_elements = self.driver.find_elements(
By.CSS_SELECTOR,
"[data-testid='user-message']"
)
logger.debug(f"User-Nachrichten gefunden: {len(user_elements)}")
except Exception as e:
logger.debug(f"User-Nachrichten Suche fehlgeschlagen: {e}")
# 2. Claude-Nachrichten suchen
# Claude.ai nutzt verschiedene Klassen, aber NICHT user-message
claude_elements = []
# Methode A: font-claude-message Klasse
try:
found = self.driver.find_elements(
By.CSS_SELECTOR,
"[class*='font-claude-message']"
)
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via font-claude-message: {len(found)}")
except:
pass
# Methode B: data-testid="assistant-message" (neue Claude.ai Version)
if not claude_elements:
try:
found = self.driver.find_elements(
By.CSS_SELECTOR,
"[data-testid='assistant-message']"
)
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via assistant-message: {len(found)}")
except:
pass
# Methode C: Prose-Container die NICHT in user-message sind
if not claude_elements:
try:
# Alle prose Elemente die nicht innerhalb von user-message sind
found = self.driver.execute_script("""
const msgs = [];
document.querySelectorAll('.prose').forEach(e => {
// Prüfe ob dieses Element NICHT in einem user-message Container ist
let parent = e;
let isUserMessage = false;
while (parent && parent !== document.body) {
if (parent.getAttribute('data-testid') === 'user-message') {
isUserMessage = true;
break;
}
parent = parent.parentElement;
}
if (!isUserMessage) {
msgs.push(e);
}
});
return msgs;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via prose (nicht-user): {len(found)}")
except Exception as e:
logger.debug(f"Prose-Suche fehlgeschlagen: {e}")
# Methode D: Suche nach conversation-turn Struktur (neue Claude.ai Version)
if not claude_elements:
try:
# Claude.ai 2024/2025 nutzt oft conversation-turn Divs
# WICHTIG: Wir suchen die ÄUSSERSTEN Elemente die "Claude sagt:" enthalten
found = self.driver.execute_script("""
const msgs = [];
// Suche nach Nachrichten die KEINE user-message sind
// und einen substantiellen Text-Inhalt haben
const allDivs = document.querySelectorAll('div');
for (const div of allDivs) {
// Überspringe wenn es ein user-message ist
if (div.getAttribute('data-testid') === 'user-message') continue;
if (div.closest('[data-testid="user-message"]')) continue;
// Suche nach Elementen die typische Claude-Antwort-Merkmale haben
const text = div.innerText || '';
// Claude-Antworten müssen "Claude sagt:" enthalten
// UND der Text muss lang genug sein (mindestens 20 Zeichen)
if (text.includes('Claude sagt:') && text.length > 20) {
msgs.push(div);
}
}
// Dedupliziere: Behalte nur die ÄUSSERSTEN Elemente (Container)
// Nicht die innersten - wir wollen den vollständigen Text!
const filtered = msgs.filter(div => {
// Behalte dieses Element nur wenn kein anderes Element es enthält
for (const other of msgs) {
if (other !== div && other.contains(div)) {
// Ein anderes Element enthält dieses -> verwerfen
return false;
}
}
return true;
});
return filtered;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via Text-Suche: {len(found)}")
except Exception as e:
logger.debug(f"Text-Suche fehlgeschlagen: {e}")
# Methode E: Letzte Fallback - Klasse enthält "assistant" (UI-Filter)
if not claude_elements:
try:
found = self.driver.execute_script("""
const msgs = [];
document.querySelectorAll('[class*="assistant"]').forEach(e => {
// Filtere Buttons, kleine Elemente und UI-Komponenten
if (e.tagName === 'BUTTON') return;
if (e.querySelector('button')) return;
const text = (e.innerText || '').trim();
// Nur Elemente mit substantiellem Text
if (text.length > 20 && !text.startsWith('[TICK]')) {
msgs.push(e);
}
});
return msgs;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via class-assistant: {len(found)}")
except Exception as e:
logger.debug(f"class-assistant-Suche fehlgeschlagen: {e}")
# ═══════════════════════════════════════════════════════════════
# DEBUG: Falls keine Claude-Nachrichten gefunden, DOM analysieren
# ═══════════════════════════════════════════════════════════════
if not claude_elements and user_elements:
try:
# Hole DOM-Info für Debugging
dom_info = self.driver.execute_script("""
const info = {
'data-testids': [],
'classes_with_message': [],
'classes_with_assistant': []
};
// Sammle alle data-testid Werte
document.querySelectorAll('[data-testid]').forEach(e => {
const testid = e.getAttribute('data-testid');
if (!info['data-testids'].includes(testid)) {
info['data-testids'].push(testid);
}
});
// Sammle Klassen die 'message' enthalten
document.querySelectorAll('[class*="message"]').forEach(e => {
const cls = e.className;
if (typeof cls === 'string' && !info['classes_with_message'].includes(cls.substring(0, 100))) {
info['classes_with_message'].push(cls.substring(0, 100));
}
});
// Sammle Klassen die 'assistant' enthalten
document.querySelectorAll('[class*="assistant"]').forEach(e => {
const cls = e.className;
if (typeof cls === 'string' && !info['classes_with_assistant'].includes(cls.substring(0, 100))) {
info['classes_with_assistant'].push(cls.substring(0, 100));
}
});
return info;
""")
logger.warning(f"KEINE Claude-Nachrichten gefunden! DOM-Info: data-testids={dom_info.get('data-testids', [])[:10]}")
logger.debug(f"Klassen mit 'message': {dom_info.get('classes_with_message', [])[:5]}")
logger.debug(f"Klassen mit 'assistant': {dom_info.get('classes_with_assistant', [])[:5]}")
except Exception as e:
logger.debug(f"DOM-Analyse fehlgeschlagen: {e}")
# ═══════════════════════════════════════════════════════════════
# Nachrichten verarbeiten
# ═══════════════════════════════════════════════════════════════
# User-Nachrichten hinzufügen
for i, elem in enumerate(user_elements):
try:
text = elem.text.strip()
if not text or len(text) < 3:
continue
msg_id = elem.get_attribute("data-message-id")
if not msg_id:
msg_id = f"user_{i}_{hash(text[:100])}"
messages.append(ChatMessage(
id=msg_id,
text=text,
is_from_assistant=False,
timestamp=time.time()
))
except Exception as e:
logger.debug(f"Fehler bei User-Nachricht {i}: {e}")
# Claude-Nachrichten hinzufügen
# WICHTIG: Filtere User-Befehle die fälschlicherweise erkannt werden
user_commands = [
'[TICK]', '[START]', '[FORWARD]', '[BACKWARD]', '[LEFT]', '[RIGHT]', '[STOP]',
'[LOOK_LEFT]', '[LOOK_RIGHT]', '[LOOK_UP]', '[LOOK_DOWN]', '[LOOK_CENTER]',
'[READY]'
]
for i, elem in enumerate(claude_elements):
try:
# Versuche zuerst innerText via JavaScript zu bekommen (zuverlässiger)
try:
text = self.driver.execute_script("return arguments[0].innerText || '';", elem).strip()
except:
text = elem.text.strip()
if not text or len(text) < 5:
logger.debug(f"Claude-Element {i} übersprungen: Text zu kurz ({len(text) if text else 0} Zeichen)")
continue
# Filtere User-Befehle (diese sind KEINE Claude-Nachrichten)
if text in user_commands:
logger.debug(f"Überspringe User-Befehl: {text}")
continue
# Filtere auch wenn der Text nur aus einem Befehl besteht (evtl. mit Whitespace)
text_upper = text.upper().strip()
if any(text_upper == cmd for cmd in user_commands):
logger.debug(f"Überspringe User-Befehl (normalized): {text}")
continue
# Prüfe ob es wirklich eine Claude-Nachricht ist (muss "Claude sagt:" enthalten)
if "Claude sagt:" not in text:
logger.debug(f"Claude-Element {i} übersprungen: Kein 'Claude sagt:' gefunden")
continue
msg_id = elem.get_attribute("data-message-id")
if not msg_id:
msg_id = f"claude_{i}_{hash(text[:100])}"
messages.append(ChatMessage(
id=msg_id,
text=text,
is_from_assistant=True,
timestamp=time.time()
))
logger.debug(f"Claude-Nachricht {i}: '{text[:80]}...' ({len(text)} Zeichen)")
except Exception as e:
logger.debug(f"Fehler bei Claude-Nachricht {i}: {e}")
logger.debug(f"Gesamt: {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude")
except Exception as e:
logger.error(f"Fehler beim Lesen der Nachrichten: {e}")
return messages
def get_last_assistant_message(self) -> Optional[ChatMessage]:
"""Holt die letzte Nachricht von Claude"""
messages = self._get_all_messages()
for msg in reversed(messages):
if msg.is_from_assistant:
return msg
return None
def is_claude_typing(self) -> bool:
"""
Prüft ob Claude gerade tippt (streaming).
Erkennt mehrere Indikatoren:
1. Stop-Button ist sichtbar (während Claude schreibt)
2. data-is-streaming='true' Attribut
3. Animiertes Logo / Thinking-Indikator
"""
try:
# Methode 1: Stop-Button prüfen (zuverlässigster Indikator)
# Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button
stop_indicators = [
"button[aria-label*='Stop']",
"button[aria-label*='stop']",
"button[class*='stop']",
"button[data-testid*='stop']",
# Alternativer Indikator: Button mit Stop-Icon
"button svg[class*='stop']",
]
for selector in stop_indicators:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})")
return True
except:
continue
# Methode 2: Streaming-Attribut (original)
streaming = self.driver.find_elements(
By.CSS_SELECTOR,
self.SELECTORS["streaming"]
)
if len(streaming) > 0:
logger.debug("Claude tippt (streaming=true)")
return True
# Methode 3: Animiertes/Thinking Indikator suchen
thinking_indicators = [
"[class*='thinking']",
"[class*='loading']",
"[class*='typing']",
"[class*='streaming']",
"[data-state='loading']",
# Pulsierendes Logo
"[class*='pulse']",
"[class*='animate']",
]
for selector in thinking_indicators:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
logger.debug(f"Claude tippt (Indikator: {selector})")
return True
except:
continue
# Methode 4: JavaScript-basierte Prüfung
# Prüft ob irgendwo noch Text gestreamt wird
try:
is_streaming = self.driver.execute_script("""
// Prüfe ob Stop-Button existiert und sichtbar ist
const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]');
if (stopBtn && stopBtn.offsetParent !== null) return true;
// Prüfe auf streaming-Attribut
const streaming = document.querySelector('[data-is-streaming="true"]');
if (streaming) return true;
// Prüfe auf disabled Send-Button (während Claude tippt)
const sendBtn = document.querySelector('button[aria-label*="Send"]');
if (sendBtn && sendBtn.disabled) return true;
return false;
""")
if is_streaming:
logger.debug("Claude tippt (JavaScript-Check)")
return True
except:
pass
return False
except Exception as e:
logger.debug(f"Fehler bei typing-check: {e}")
return False
def wait_for_ready_signal(self, timeout: int = 120) -> bool:
"""
Wartet bis Claude [READY] sendet.
Sucht nach [READY] das NICHT Teil des Instruktions-Textes ist.
Wir zählen wie oft [READY] vorkommt - wenn mehr als 1x, hat Claude geantwortet.
Args:
timeout: Maximale Wartezeit in Sekunden
Returns:
True wenn [READY] empfangen, False bei Timeout
"""
logger.info(f"Warte auf [READY] Signal (max {timeout}s)...")
start_time = time.time()
while time.time() - start_time < timeout:
# Warte bis Claude fertig ist mit Tippen
typing_wait_start = time.time()
while self.is_claude_typing():
time.sleep(0.5)
# Timeout für typing-wait (max 60s)
if time.time() - typing_wait_start > 60:
logger.debug("Typing-Wait Timeout, prüfe trotzdem...")
break
# Suche [READY] im Seitentext via JavaScript
# Zähle wie oft [READY] vorkommt - 1x ist unsere Instruktion, 2x+ bedeutet Claude hat geantwortet
try:
ready_count = self.driver.execute_script("""
const text = document.body.innerText.toUpperCase();
const matches = text.match(/\\[READY\\]/g);
return matches ? matches.length : 0;
""")
logger.debug(f"[READY] gefunden: {ready_count}x")
# Mehr als 1x = Claude hat auch [READY] geschrieben
if ready_count and ready_count >= 2:
logger.info(f"[READY] Signal gefunden! ({ready_count}x im Text)")
return True
except Exception as e:
logger.debug(f"JavaScript [READY] Suche fehlgeschlagen: {e}")
# Kurz warten bevor nächster Check
time.sleep(1)
logger.warning(f"Timeout: Kein [READY] nach {timeout}s")
return False
def take_screenshot(self, path: str = "screenshot.png"):
"""Macht einen Screenshot (für Debugging)"""
self.driver.save_screenshot(path)
logger.info(f"Screenshot gespeichert: {path}")
def close(self):
"""Schließt den Browser"""
logger.info("Schließe Browser...")
try:
self.driver.quit()
except:
pass
# ════════════════════════════════════════════════════════════════════════
# BILD-UPLOAD FUNKTIONEN (für Robot Vision)
# ════════════════════════════════════════════════════════════════════════
def fetch_image_from_esp32(self) -> bool:
"""
Holt ein Bild vom ESP32/Mock-Server und speichert es lokal.
Returns:
True wenn erfolgreich, False bei Fehler
"""
if not self.esp32_url:
logger.warning("Keine ESP32 URL konfiguriert")
return False
try:
# Capture-Endpoint aufrufen (macht Foto und gibt es zurück)
url = f"{self.esp32_url}/api/capture"
if self.esp32_api_key:
url += f"?key={self.esp32_api_key}"
response = self._http_session.get(url, timeout=10)
response.raise_for_status()
# Prüfe ob wir ein Bild bekommen haben
content_type = response.headers.get("Content-Type", "")
if "image" in content_type:
# Direktes Bild
with open(self._temp_image_path, "wb") as f:
f.write(response.content)
logger.info(f"Bild gespeichert: {len(response.content)} bytes")
return True
else:
# JSON Response (Mock-Server neuer Stil)
# Dann müssen wir /foto.jpg separat holen
foto_url = f"{self.esp32_url}/foto.jpg"
foto_response = self._http_session.get(foto_url, timeout=10)
foto_response.raise_for_status()
with open(self._temp_image_path, "wb") as f:
f.write(foto_response.content)
logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes")
return True
except requests.exceptions.RequestException as e:
logger.error(f"ESP32 Verbindungsfehler: {e}")
return False
except Exception as e:
logger.error(f"Fehler beim Bild holen: {e}")
return False
def upload_image_to_chat(self) -> bool:
"""
Lädt das gespeicherte Bild in den Claude.ai Chat hoch.
Returns:
True wenn erfolgreich, False bei Fehler
"""
if not self._temp_image_path.exists():
logger.error("Kein Bild zum Hochladen vorhanden")
return False
try:
# Finde das versteckte file input Element
file_input = self._find_file_input()
if not file_input:
logger.error("File-Upload Input nicht gefunden!")
return False
# Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs)
file_input.send_keys(str(self._temp_image_path.absolute()))
logger.info("Bild hochgeladen!")
self._images_uploaded += 1
# Kurz warten bis Upload verarbeitet ist
time.sleep(1.5)
return True
except Exception as e:
logger.error(f"Fehler beim Bild-Upload: {e}")
return False
def _calculate_image_hash(self) -> Optional[str]:
"""
Berechnet einen Hash des aktuellen Bildes.
Returns:
MD5 Hash als Hex-String, oder None bei Fehler
"""
if not self._temp_image_path.exists():
return None
try:
with open(self._temp_image_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
except Exception as e:
logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}")
return None
def has_image_changed(self) -> bool:
"""
Prüft ob sich das Bild seit dem letzten Upload geändert hat.
WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat!
Diese Funktion hilft, unnötige Uploads zu vermeiden.
Returns:
True wenn Bild neu/geändert, False wenn identisch zum letzten
"""
current_hash = self._calculate_image_hash()
if current_hash is None:
return True # Kein Bild = als "geändert" behandeln
if self._last_image_hash is None:
return True # Erster Upload
return current_hash != self._last_image_hash
def upload_image_if_changed(self) -> bool:
"""
Lädt Bild nur hoch wenn es sich geändert hat.
Returns:
True wenn hochgeladen, False wenn übersprungen oder Fehler
"""
if not self.has_image_changed():
logger.debug("Bild unverändert - überspringe Upload")
return False
# Hash vor dem Upload speichern
new_hash = self._calculate_image_hash()
if self.upload_image_to_chat():
self._last_image_hash = new_hash
return True
return False
def delete_uploaded_images(self) -> int:
"""
Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen.
HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden!
Bereits gesendete Bilder können nicht gelöscht werden.
Returns:
Anzahl gelöschter Bilder
"""
deleted = 0
try:
# Suche nach Bild-Vorschau-Elementen im Eingabebereich
# Diese haben meist einen X/Close Button
preview_selectors = [
"button[aria-label*='Remove']",
"button[aria-label*='remove']",
"button[aria-label*='Delete']",
"button[aria-label*='delete']",
"[class*='preview'] button[class*='close']",
"[class*='attachment'] button[class*='remove']",
"[class*='file'] button[class*='delete']",
]
for selector in preview_selectors:
try:
buttons = self.driver.find_elements(By.CSS_SELECTOR, selector)
for btn in buttons:
if btn.is_displayed():
btn.click()
deleted += 1
time.sleep(0.3)
except:
continue
# JavaScript-Fallback
if deleted == 0:
try:
deleted = self.driver.execute_script("""
let count = 0;
// Suche nach Remove-Buttons in Attachment-Previews
const buttons = document.querySelectorAll(
'[class*="preview"] button, [class*="attachment"] button'
);
buttons.forEach(btn => {
if (btn.offsetParent) {
btn.click();
count++;
}
});
return count;
""") or 0
except:
pass
if deleted > 0:
logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht")
except Exception as e:
logger.debug(f"Bild-Löschung fehlgeschlagen: {e}")
return deleted
def get_images_uploaded_count(self) -> int:
"""Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück"""
return self._images_uploaded
def _find_file_input(self):
"""Findet das File-Upload Input Element"""
selectors = [
self.SELECTORS["file_input"],
"input[accept*='image']",
"input[type='file'][accept]",
"input[type='file']",
]
for selector in selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
# Auch versteckte Inputs funktionieren mit send_keys
return elem
except:
continue
# Fallback: Via JavaScript suchen
try:
return self.driver.execute_script("""
return document.querySelector('input[type="file"]') ||
document.querySelector('[accept*="image"]');
""")
except:
return None
def send_tick_with_image(self) -> bool:
"""
Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK].
Das ist der Haupt-Heartbeat mit Bild!
Returns:
True wenn alles geklappt hat
"""
# Schritt 1: Bild vom ESP32 holen
if not self.fetch_image_from_esp32():
# Kein Bild? Trotzdem TICK senden
self.send_message("[TICK - KEIN BILD]")
return False
# Schritt 2: Bild in Chat hochladen
if not self.upload_image_to_chat():
self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]")
return False
# Schritt 3: TICK senden
self.send_message("[TICK]")
return True
# Hilfsfunktion für einfaches Testing
def test_interface(chat_url: str):
"""Testet das Interface"""
import sys
logging.basicConfig(level=logging.DEBUG)
print("Starte Chat Interface Test...")
print(f"URL: {chat_url}")
interface = ClaudeChatInterface(
chat_url=chat_url,
headless=False
)
print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...")
input()
interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!")
print("Nachricht gesendet!")
print("\nWarte 5 Sekunden auf Antwort...")
time.sleep(5)
messages = interface.get_new_messages()
print(f"\nGefundene Nachrichten: {len(messages)}")
for msg in messages[-3:]:
role = "Claude" if msg.is_from_assistant else "Human"
print(f" [{role}] {msg.text[:100]}...")
print("\nDrücke Enter zum Beenden...")
input()
interface.close()
print("Fertig!")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python chat_web_interface.py <claude-chat-url>")
print("Example: python chat_web_interface.py https://claude.ai/chat/abc123")
sys.exit(1)
test_interface(sys.argv[1])