esp32-claude-robbie/python_bridge/chat_web_interface.py

1379 lines
54 KiB
Python

"""
Claude's Eyes - Chat Web Interface
Steuert den echten Claude.ai Chat im Browser via Selenium.
Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio!
HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden,
wenn Claude.ai sein UI ändert.
"""
import time
import logging
import tempfile
import hashlib
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dataclasses import dataclass
from typing import List, Optional
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException
logger = logging.getLogger(__name__)
@dataclass
class ChatMessage:
"""Eine Chat-Nachricht"""
id: str
text: str
is_from_assistant: bool
timestamp: float = 0
class ClaudeChatInterface:
"""
Steuert Claude.ai Chat via Selenium Browser Automation.
Diese Klasse:
- Öffnet einen Browser mit dem Claude.ai Chat
- Kann Nachrichten senden (für Heartbeat und Stefan's Sprache)
- Kann neue Nachrichten lesen (für TTS)
WICHTIG: Du musst beim ersten Start manuell einloggen!
"""
@staticmethod
def extract_chat_id(url: str) -> Optional[str]:
"""
Extrahiert die Chat-ID aus einer Claude.ai URL.
Args:
url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d"
Returns:
Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None
"""
if not url or '/chat/' not in url:
return None
try:
# Extrahiere alles nach /chat/
chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0]
return chat_id if chat_id else None
except:
return None
# CSS Selektoren für Claude.ai (Stand: Dezember 2025)
# Diese müssen angepasst werden wenn sich das UI ändert!
SELECTORS = {
# Eingabefeld für neue Nachrichten
"input_field": "div.ProseMirror[contenteditable='true']",
# Alternativ: Textarea
"input_textarea": "textarea[placeholder*='Message']",
# Senden-Button (falls Enter nicht funktioniert)
"send_button": "button[aria-label*='Send']",
# Alle Nachrichten-Container
"messages_container": "div[class*='conversation']",
# Einzelne Nachrichten
"message_human": "div[data-is-streaming='false'][class*='human']",
"message_assistant": "div[data-is-streaming='false'][class*='assistant']",
# Generischer Nachrichten-Selektor (Fallback)
"message_any": "div[class*='message']",
# Streaming-Indikator (Claude tippt noch)
"streaming": "div[data-is-streaming='true']",
# File Upload Input (versteckt, aber funktioniert mit send_keys)
"file_input": "input[type='file']",
}
def __init__(
self,
chat_url: Optional[str] = None,
headless: bool = False,
user_data_dir: Optional[str] = None,
chrome_binary: Optional[str] = None,
esp32_url: Optional[str] = None,
esp32_api_key: Optional[str] = None
):
"""
Initialisiert das Chat-Interface.
Args:
chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123)
headless: Browser im Hintergrund? (False = sichtbar)
user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins)
chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux)
esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture)
esp32_api_key: API-Key für ESP32 Authentifizierung
"""
self.chat_url = chat_url
self.esp32_url = esp32_url
self.esp32_api_key = esp32_api_key
self._message_cache: List[ChatMessage] = []
self._last_message_id = 0
self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg"
# Für Bild-Deduplizierung (100 Bilder pro Chat Limit!)
self._last_image_hash: Optional[str] = None
self._images_uploaded = 0
# HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen)
self._http_session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
self._http_session.mount('http://', adapter)
self._http_session.mount('https://', adapter)
# Chrome Optionen
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--window-size=1280,800")
# Für persistente Sessions (Login bleibt gespeichert)
if user_data_dir:
options.add_argument(f"--user-data-dir={user_data_dir}")
# Für Termux/Android
if chrome_binary:
options.binary_location = chrome_binary
# Anti-Detection (manche Seiten blocken Selenium)
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
logger.info("Starte Chrome Browser...")
try:
self.driver = webdriver.Chrome(options=options)
self.driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except Exception as e:
logger.error(f"Chrome konnte nicht gestartet werden: {e}")
logger.info("Versuche mit webdriver-manager...")
from webdriver_manager.chrome import ChromeDriverManager
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=options)
self.wait = WebDriverWait(self.driver, 30)
# Navigiere zur Chat-URL
if chat_url:
self.navigate_to_chat(chat_url)
def navigate_to_chat(self, url: str):
"""Navigiert zur Chat-URL"""
logger.info(f"Navigiere zu: {url}")
self.driver.get(url)
self.chat_url = url
# Reset Bild-Counter bei neuem Chat
self._last_image_hash = None
self._images_uploaded = 0
# Warte auf Seitenladung
time.sleep(3)
# Prüfe ob Login nötig
if "login" in self.driver.current_url.lower():
logger.warning("Login erforderlich! Bitte im Browser einloggen...")
print("\n" + "=" * 50)
print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!")
print("Das Fenster bleibt offen. Nach dem Login geht's weiter.")
print("=" * 50 + "\n")
# Warte bis wieder auf der Chat-Seite
while "login" in self.driver.current_url.lower():
time.sleep(2)
logger.info("Login erfolgreich!")
time.sleep(2)
def start_new_chat(self) -> tuple[Optional[str], Optional[str]]:
"""
Startet einen neuen Chat auf Claude.ai.
WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist!
Returns:
Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler
"""
try:
# Alte Chat-ID merken bevor wir wechseln
old_chat_id = self.extract_chat_id(self.chat_url)
logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})")
# Methode 1: "New Chat" Button suchen
new_chat_selectors = [
"button[aria-label*='New']",
"button[aria-label*='new']",
"a[href='/new']",
"a[href*='/chat/new']",
"[data-testid*='new-chat']",
"button:has-text('New chat')",
]
for selector in new_chat_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
elem.click()
time.sleep(2)
new_url = self.driver.current_url
logger.info(f"Neuer Chat gestartet: {new_url}")
# Reset Counter
self._last_image_hash = None
self._images_uploaded = 0
self.chat_url = new_url
return new_url, old_chat_id
except:
continue
# Methode 2: Direkt zu /new navigieren
base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai"
new_url = f"{base_url}/new"
logger.info(f"Versuche direkte Navigation zu: {new_url}")
self.driver.get(new_url)
time.sleep(3)
# Prüfe ob es geklappt hat
current = self.driver.current_url
if current != self.chat_url:
logger.info(f"Neuer Chat: {current}")
self._last_image_hash = None
self._images_uploaded = 0
self.chat_url = current
return current, old_chat_id
logger.warning("Konnte keinen neuen Chat starten")
return None, None
except Exception as e:
logger.error(f"Fehler beim Starten eines neuen Chats: {e}")
return None, None
def get_current_chat_url(self) -> str:
"""Gibt die aktuelle Chat-URL zurück"""
return self.driver.current_url
def send_message(self, text: str, wait_for_response: bool = False) -> bool:
"""
Sendet eine Nachricht in den Chat.
Args:
text: Die zu sendende Nachricht
wait_for_response: Warten bis Claude antwortet?
Returns:
True wenn erfolgreich gesendet
"""
# Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut)
result = self.send_message_with_delay(text, delay_before_send=0)
if result and wait_for_response:
self._wait_for_response()
return result
def wait_for_input_field(self, timeout: int = 60) -> bool:
"""
Wartet bis das Chat-Eingabefeld verfügbar ist.
Nützlich nach CAPTCHA/Login wenn das UI noch lädt.
Args:
timeout: Maximale Wartezeit in Sekunden
Returns:
True wenn Eingabefeld gefunden, False bei Timeout
"""
logger.info(f"Warte auf Chat-Eingabefeld (max {timeout}s)...")
start_time = time.time()
while time.time() - start_time < timeout:
input_field = self._find_input_field()
if input_field:
try:
if input_field.is_displayed() and input_field.is_enabled():
logger.info("Chat-Eingabefeld bereit!")
return True
except:
pass
time.sleep(1)
logger.warning(f"Timeout: Eingabefeld nach {timeout}s nicht gefunden")
return False
def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool:
"""
Sendet eine Nachricht mit Verzögerung vor dem Absenden.
Nützlich für große Texte (wie Instruktionen), bei denen die
Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten.
Ablauf:
0. Warte bis Eingabefeld verfügbar (für CAPTCHA/Login-Fälle)
1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!)
2. Warte delay_before_send Sekunden
3. Send-Button klicken (mit Retry)
Args:
text: Die zu sendende Nachricht
delay_before_send: Sekunden warten nach Einfügen, vor dem Senden
Returns:
True wenn erfolgreich gesendet
"""
try:
# WICHTIG: Warte auf Eingabefeld (CAPTCHA/Login kann dauern!)
if not self.wait_for_input_field(timeout=120):
logger.error("Eingabefeld nicht verfügbar nach 120s!")
return False
# Finde Eingabefeld
input_field = self._find_input_field()
if not input_field:
logger.error("Eingabefeld nicht gefunden!")
return False
# Feld fokussieren
input_field.click()
time.sleep(0.2)
# Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!)
logger.info(f"Füge Text ein ({len(text)} Zeichen)...")
self._insert_text_via_js(input_field, text)
# WARTEN - große Texte brauchen Zeit!
if delay_before_send > 0:
logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...")
time.sleep(delay_before_send)
else:
time.sleep(1.0) # Mindestens 1s warten
# Jetzt absenden (mit Retry-Logik)
send_success = False
for attempt in range(3):
send_button = self._find_send_button()
if send_button:
try:
if send_button.is_enabled() and send_button.is_displayed():
send_button.click()
logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})")
send_success = True
break
except Exception as e:
logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}")
time.sleep(0.5)
# Fallback: Enter-Taste
if not send_success:
logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter")
input_field.send_keys(Keys.RETURN)
time.sleep(0.3)
logger.debug(f"Nachricht gesendet: {text[:50]}...")
return True
except Exception as e:
logger.error(f"Fehler beim Senden mit Verzögerung: {e}")
return False
def _find_send_button(self):
"""Findet den Send-Button"""
selectors = [
"button[aria-label*='Send']",
"button[aria-label*='send']",
"button[data-testid*='send']",
"button[type='submit']",
# Claude.ai spezifisch - Button mit Pfeil-Icon
"button svg[class*='send']",
"button[class*='send']",
]
for selector in selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed() and elem.is_enabled():
return elem
except:
continue
# Fallback: JavaScript-Suche
try:
return self.driver.execute_script("""
// Suche nach Send-Button
const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]');
if (btn && !btn.disabled) return btn;
// Alternative: Letzter Button im Input-Bereich
const buttons = document.querySelectorAll('button');
for (const b of buttons) {
if (b.offsetParent && !b.disabled) {
const text = b.textContent.toLowerCase();
const label = (b.getAttribute('aria-label') || '').toLowerCase();
if (text.includes('send') || label.includes('send')) return b;
}
}
return null;
""")
except:
return None
def _find_input_field(self):
"""Findet das Eingabefeld"""
selectors = [
self.SELECTORS["input_field"],
self.SELECTORS["input_textarea"],
"div[contenteditable='true']",
"textarea",
]
for selector in selectors:
try:
element = self.driver.find_element(By.CSS_SELECTOR, selector)
if element.is_displayed() and element.is_enabled():
return element
except NoSuchElementException:
continue
return None
def _insert_text_via_js(self, element, text: str):
"""
Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme).
Bei send_keys() werden physische Tasten gedrückt, was bei
unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu
falschen Zeichen führt (z.B. y↔z vertauscht).
Diese Methode fügt den Text direkt als String ein.
"""
# Escape für JavaScript
escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${')
# Für contenteditable divs (ProseMirror)
self.driver.execute_script("""
const element = arguments[0];
const text = arguments[1];
// Fokussieren
element.focus();
// Methode 1: Für contenteditable (ProseMirror)
if (element.contentEditable === 'true') {
// Text als HTML einfügen (respektiert Zeilenumbrüche)
const htmlText = text.replace(/\\n/g, '<br>');
element.innerHTML = htmlText;
// Input-Event triggern damit React/Vue es mitbekommt
element.dispatchEvent(new Event('input', { bubbles: true }));
// Cursor ans Ende setzen
const range = document.createRange();
range.selectNodeContents(element);
range.collapse(false);
const sel = window.getSelection();
sel.removeAllRanges();
sel.addRange(range);
}
// Methode 2: Für textarea/input
else {
element.value = text;
element.dispatchEvent(new Event('input', { bubbles: true }));
element.dispatchEvent(new Event('change', { bubbles: true }));
}
""", element, text)
logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)")
def _wait_for_response(self, timeout: int = 60):
"""Wartet bis Claude fertig getippt hat"""
logger.debug("Warte auf Claudes Antwort...")
# Warte kurz damit Streaming startet
time.sleep(1)
# Warte bis Streaming endet
try:
WebDriverWait(self.driver, timeout).until_not(
EC.presence_of_element_located(
(By.CSS_SELECTOR, self.SELECTORS["streaming"])
)
)
except TimeoutException:
logger.warning("Timeout beim Warten auf Antwort")
time.sleep(0.5) # Kurz warten bis DOM aktualisiert
def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]:
"""
Holt neue Nachrichten aus dem Chat.
Args:
since_id: Nur Nachrichten nach dieser ID zurückgeben
Returns:
Liste neuer ChatMessage Objekte
"""
all_messages = self._get_all_messages()
if since_id is None:
return all_messages
# ════════════════════════════════════════════════════════════════
# STRATEGIE: Nutze immer den Index aus claude_msg_X
# Die IDs werden bei jedem Aufruf neu generiert, daher können wir
# nicht auf exakte ID-Übereinstimmung vertrauen.
# Stattdessen extrahieren wir den Index und geben alle Nachrichten
# NACH diesem Index zurück.
# ════════════════════════════════════════════════════════════════
if since_id and since_id.startswith("claude_msg_"):
try:
last_index = int(since_id.split("_")[-1])
# Zähle Claude-Nachrichten und gib nur die nach dem Index zurück
claude_msgs = [m for m in all_messages if m.is_from_assistant]
total_claude = len(claude_msgs)
if last_index < total_claude - 1:
# Es gibt neue Nachrichten!
new_claude_msgs = claude_msgs[last_index + 1:]
logger.debug(f"Index-basiert: {len(new_claude_msgs)} neue Claude-Nachrichten (Index {last_index + 1} bis {total_claude - 1})")
return new_claude_msgs
else:
# Keine neuen Nachrichten (last_index ist der letzte oder darüber)
logger.debug(f"Index-basiert: Keine neuen Nachrichten (last={last_index}, total={total_claude})")
return []
except (ValueError, IndexError) as e:
logger.warning(f"Fehler beim Parsen von since_id '{since_id}': {e}")
# Alte Hash-basierte IDs oder ungültiges Format
if since_id and since_id.startswith("claude_"):
logger.debug(f"since_id '{since_id[:30]}...' hat altes Format - überspringe alle")
return []
# Fallback: Exakte ID-Suche (für Nicht-Claude-IDs)
new_messages = []
found_marker = False
for msg in all_messages:
if found_marker:
new_messages.append(msg)
elif msg.id == since_id:
found_marker = True
return new_messages
def _get_all_messages(self) -> List[ChatMessage]:
"""
Holt alle Nachrichten aus dem Chat.
Claude.ai verwendet unterschiedliche data-testid für User und Assistant:
- data-testid="user-message" für User
- Für Claude: Wir suchen nach Nachrichten die NICHT user-message sind
"""
messages = []
try:
# ═══════════════════════════════════════════════════════════════
# STRATEGIE: Hole User UND Assistant Nachrichten SEPARAT
# ═══════════════════════════════════════════════════════════════
# 1. User-Nachrichten (haben data-testid="user-message")
user_elements = []
try:
user_elements = self.driver.find_elements(
By.CSS_SELECTOR,
"[data-testid='user-message']"
)
logger.debug(f"User-Nachrichten gefunden: {len(user_elements)}")
except Exception as e:
logger.debug(f"User-Nachrichten Suche fehlgeschlagen: {e}")
# 2. Claude-Nachrichten suchen
# Claude.ai nutzt verschiedene Klassen, aber NICHT user-message
claude_elements = []
# Methode A: font-claude-message Klasse
try:
found = self.driver.find_elements(
By.CSS_SELECTOR,
"[class*='font-claude-message']"
)
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via font-claude-message: {len(found)}")
except:
pass
# Methode B: data-testid="assistant-message" (neue Claude.ai Version)
if not claude_elements:
try:
found = self.driver.find_elements(
By.CSS_SELECTOR,
"[data-testid='assistant-message']"
)
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via assistant-message: {len(found)}")
except:
pass
# Methode C: Prose-Container die NICHT in user-message sind
if not claude_elements:
try:
# Alle prose Elemente die nicht innerhalb von user-message sind
found = self.driver.execute_script("""
const msgs = [];
document.querySelectorAll('.prose').forEach(e => {
// Prüfe ob dieses Element NICHT in einem user-message Container ist
let parent = e;
let isUserMessage = false;
while (parent && parent !== document.body) {
if (parent.getAttribute('data-testid') === 'user-message') {
isUserMessage = true;
break;
}
parent = parent.parentElement;
}
if (!isUserMessage) {
msgs.push(e);
}
});
return msgs;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via prose (nicht-user): {len(found)}")
except Exception as e:
logger.debug(f"Prose-Suche fehlgeschlagen: {e}")
# Methode D: Suche nach conversation-turn Struktur (neue Claude.ai Version)
if not claude_elements:
try:
# Claude.ai 2024/2025 nutzt oft conversation-turn Divs
# WICHTIG: Wir suchen die INNERSTEN Elemente die mit "Claude sagt:" BEGINNEN
# und NICHT in der Sidebar/Navigation sind
found = self.driver.execute_script("""
const msgs = [];
// Suche nach Elementen die mit "Claude sagt:" BEGINNEN
const allDivs = document.querySelectorAll('div, p, span');
for (const elem of allDivs) {
// Überspringe wenn es ein user-message ist
if (elem.getAttribute('data-testid') === 'user-message') continue;
if (elem.closest('[data-testid="user-message"]')) continue;
// Überspringe Sidebar/Navigation Elemente
if (elem.closest('nav')) continue;
if (elem.closest('[role="navigation"]')) continue;
if (elem.closest('[class*="sidebar"]')) continue;
if (elem.closest('[class*="Sidebar"]')) continue;
// Hole den direkten Text-Inhalt (nicht von Kindern)
const text = (elem.innerText || '').trim();
// Text muss mit "Claude sagt:" BEGINNEN (nicht irgendwo enthalten)
// Das filtert Container-Elemente raus die die ganze Seite enthalten
if (text.startsWith('Claude sagt:') && text.length > 20 && text.length < 10000) {
msgs.push(elem);
}
}
// Dedupliziere: Behalte nur die INNERSTEN Elemente
// (die den Text direkt enthalten, nicht Container)
const filtered = msgs.filter(div => {
// Behalte dieses Element nur wenn es KEIN anderes Element enthält
for (const other of msgs) {
if (other !== div && div.contains(other)) {
// Dieses Element enthält ein anderes -> verwerfen (zu groß)
return false;
}
}
return true;
});
return filtered;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via Text-Suche: {len(found)}")
except Exception as e:
logger.debug(f"Text-Suche fehlgeschlagen: {e}")
# Methode E: Letzte Fallback - Klasse enthält "assistant" (UI-Filter)
if not claude_elements:
try:
found = self.driver.execute_script("""
const msgs = [];
document.querySelectorAll('[class*="assistant"]').forEach(e => {
// Filtere Buttons, kleine Elemente und UI-Komponenten
if (e.tagName === 'BUTTON') return;
if (e.querySelector('button')) return;
const text = (e.innerText || '').trim();
// Nur Elemente mit substantiellem Text
if (text.length > 20 && !text.startsWith('[TICK]')) {
msgs.push(e);
}
});
return msgs;
""") or []
if found:
claude_elements = found
logger.debug(f"Claude-Nachrichten via class-assistant: {len(found)}")
except Exception as e:
logger.debug(f"class-assistant-Suche fehlgeschlagen: {e}")
# ═══════════════════════════════════════════════════════════════
# DEBUG: Falls keine Claude-Nachrichten gefunden, DOM analysieren
# ═══════════════════════════════════════════════════════════════
if not claude_elements and user_elements:
try:
# Hole DOM-Info für Debugging
dom_info = self.driver.execute_script("""
const info = {
'data-testids': [],
'classes_with_message': [],
'classes_with_assistant': []
};
// Sammle alle data-testid Werte
document.querySelectorAll('[data-testid]').forEach(e => {
const testid = e.getAttribute('data-testid');
if (!info['data-testids'].includes(testid)) {
info['data-testids'].push(testid);
}
});
// Sammle Klassen die 'message' enthalten
document.querySelectorAll('[class*="message"]').forEach(e => {
const cls = e.className;
if (typeof cls === 'string' && !info['classes_with_message'].includes(cls.substring(0, 100))) {
info['classes_with_message'].push(cls.substring(0, 100));
}
});
// Sammle Klassen die 'assistant' enthalten
document.querySelectorAll('[class*="assistant"]').forEach(e => {
const cls = e.className;
if (typeof cls === 'string' && !info['classes_with_assistant'].includes(cls.substring(0, 100))) {
info['classes_with_assistant'].push(cls.substring(0, 100));
}
});
return info;
""")
logger.warning(f"KEINE Claude-Nachrichten gefunden! DOM-Info: data-testids={dom_info.get('data-testids', [])[:10]}")
logger.debug(f"Klassen mit 'message': {dom_info.get('classes_with_message', [])[:5]}")
logger.debug(f"Klassen mit 'assistant': {dom_info.get('classes_with_assistant', [])[:5]}")
except Exception as e:
logger.debug(f"DOM-Analyse fehlgeschlagen: {e}")
# ═══════════════════════════════════════════════════════════════
# Nachrichten verarbeiten
# ═══════════════════════════════════════════════════════════════
# User-Nachrichten hinzufügen
for i, elem in enumerate(user_elements):
try:
text = elem.text.strip()
if not text or len(text) < 3:
continue
msg_id = elem.get_attribute("data-message-id")
if not msg_id:
msg_id = f"user_{i}_{hash(text[:100])}"
messages.append(ChatMessage(
id=msg_id,
text=text,
is_from_assistant=False,
timestamp=time.time()
))
except Exception as e:
logger.debug(f"Fehler bei User-Nachricht {i}: {e}")
# Claude-Nachrichten hinzufügen
# WICHTIG: Filtere User-Befehle die fälschlicherweise erkannt werden
user_commands = [
'[TICK]', '[START]', '[FORWARD]', '[BACKWARD]', '[LEFT]', '[RIGHT]', '[STOP]',
'[LOOK_LEFT]', '[LOOK_RIGHT]', '[LOOK_UP]', '[LOOK_DOWN]', '[LOOK_CENTER]',
'[READY]'
]
for i, elem in enumerate(claude_elements):
try:
# Versuche zuerst innerText via JavaScript zu bekommen (zuverlässiger)
try:
text = self.driver.execute_script("return arguments[0].innerText || '';", elem).strip()
except:
text = elem.text.strip()
if not text or len(text) < 5:
logger.debug(f"Claude-Element {i} übersprungen: Text zu kurz ({len(text) if text else 0} Zeichen)")
continue
# Filtere User-Befehle (diese sind KEINE Claude-Nachrichten)
if text in user_commands:
logger.debug(f"Überspringe User-Befehl: {text}")
continue
# Filtere auch wenn der Text nur aus einem Befehl besteht (evtl. mit Whitespace)
text_upper = text.upper().strip()
if any(text_upper == cmd for cmd in user_commands):
logger.debug(f"Überspringe User-Befehl (normalized): {text}")
continue
# Prüfe ob es wirklich eine Claude-Nachricht ist (muss mit "Claude sagt:" BEGINNEN)
if not text.startswith("Claude sagt:"):
# Fallback: Prüfe ob "Claude sagt:" irgendwo am Anfang einer Zeile steht
lines = text.split('\n')
found_claude_line = False
for line in lines[:5]: # Nur erste 5 Zeilen prüfen
if line.strip().startswith("Claude sagt:"):
found_claude_line = True
break
if not found_claude_line:
logger.debug(f"Claude-Element {i} übersprungen: Beginnt nicht mit 'Claude sagt:'")
continue
# Filtere Sidebar/Navigation Elemente (beginnen oft mit "Neuer Chat")
if text.startswith("Neuer Chat") or text.startswith("Chats") or text.startswith("Projekte"):
logger.debug(f"Claude-Element {i} übersprungen: Ist Sidebar/Navigation")
continue
msg_id = elem.get_attribute("data-message-id")
if not msg_id:
# WICHTIG: Nutze einen ZÄHLER statt Hash!
# Der Zähler ist die Anzahl der bisherigen Claude-Nachrichten.
# Das ist stabil weil neue Nachrichten nur HINZUGEFÜGT werden.
# Der Hash war instabil weil der Text sich leicht ändern kann.
claude_msg_count = sum(1 for m in messages if m.is_from_assistant)
msg_id = f"claude_msg_{claude_msg_count}"
messages.append(ChatMessage(
id=msg_id,
text=text,
is_from_assistant=True,
timestamp=time.time()
))
logger.debug(f"Claude-Nachricht {claude_msg_count if 'claude_msg_count' in dir() else i}: '{text[:80]}...' ({len(text)} Zeichen)")
except Exception as e:
logger.debug(f"Fehler bei Claude-Nachricht {i}: {e}")
logger.debug(f"Gesamt: {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude")
except Exception as e:
logger.error(f"Fehler beim Lesen der Nachrichten: {e}")
return messages
def get_last_assistant_message(self) -> Optional[ChatMessage]:
"""Holt die letzte Nachricht von Claude"""
messages = self._get_all_messages()
for msg in reversed(messages):
if msg.is_from_assistant:
return msg
return None
def is_claude_typing(self) -> bool:
"""
Prüft ob Claude gerade tippt (streaming).
Erkennt mehrere Indikatoren:
1. Stop-Button ist sichtbar (während Claude schreibt)
2. data-is-streaming='true' Attribut
3. Animiertes Logo / Thinking-Indikator
"""
try:
# Methode 1: Stop-Button prüfen (zuverlässigster Indikator)
# Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button
stop_indicators = [
"button[aria-label*='Stop']",
"button[aria-label*='stop']",
"button[class*='stop']",
"button[data-testid*='stop']",
# Alternativer Indikator: Button mit Stop-Icon
"button svg[class*='stop']",
]
for selector in stop_indicators:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})")
return True
except:
continue
# Methode 2: Streaming-Attribut (original)
streaming = self.driver.find_elements(
By.CSS_SELECTOR,
self.SELECTORS["streaming"]
)
if len(streaming) > 0:
logger.debug("Claude tippt (streaming=true)")
return True
# Methode 3: Animiertes/Thinking Indikator suchen
thinking_indicators = [
"[class*='thinking']",
"[class*='loading']",
"[class*='typing']",
"[class*='streaming']",
"[data-state='loading']",
# Pulsierendes Logo
"[class*='pulse']",
"[class*='animate']",
]
for selector in thinking_indicators:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
if elem.is_displayed():
logger.debug(f"Claude tippt (Indikator: {selector})")
return True
except:
continue
# Methode 4: JavaScript-basierte Prüfung
# Prüft ob irgendwo noch Text gestreamt wird
try:
is_streaming = self.driver.execute_script("""
// Prüfe ob Stop-Button existiert und sichtbar ist
const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]');
if (stopBtn && stopBtn.offsetParent !== null) return true;
// Prüfe auf streaming-Attribut
const streaming = document.querySelector('[data-is-streaming="true"]');
if (streaming) return true;
// Prüfe auf disabled Send-Button (während Claude tippt)
const sendBtn = document.querySelector('button[aria-label*="Send"]');
if (sendBtn && sendBtn.disabled) return true;
return false;
""")
if is_streaming:
logger.debug("Claude tippt (JavaScript-Check)")
return True
except:
pass
return False
except Exception as e:
logger.debug(f"Fehler bei typing-check: {e}")
return False
def wait_for_ready_signal(self, timeout: int = 120) -> bool:
"""
Wartet bis Claude [READY] sendet.
WICHTIG: Prüft nur die LETZTE Claude-Nachricht!
So werden alte [READY] im Chat-Verlauf ignoriert.
Args:
timeout: Maximale Wartezeit in Sekunden
Returns:
True wenn [READY] empfangen, False bei Timeout
"""
logger.info(f"Warte auf [READY] Signal in letzter Claude-Nachricht (max {timeout}s)...")
start_time = time.time()
# Merke die Anzahl der Claude-Nachrichten VOR dem Senden
initial_count = self._count_claude_messages()
logger.debug(f"Initiale Claude-Nachrichten: {initial_count}")
while time.time() - start_time < timeout:
# Warte bis Claude fertig ist mit Tippen
typing_wait_start = time.time()
while self.is_claude_typing():
time.sleep(0.5)
# Timeout für typing-wait (max 60s)
if time.time() - typing_wait_start > 60:
logger.debug("Typing-Wait Timeout, prüfe trotzdem...")
break
# Prüfe ob es eine NEUE Claude-Nachricht gibt
current_count = self._count_claude_messages()
if current_count > initial_count:
# Hole die letzte Claude-Nachricht
last_msg = self.get_last_assistant_message()
if last_msg and '[READY]' in last_msg.text.upper():
logger.info(f"[READY] in letzter Claude-Nachricht gefunden!")
return True
else:
logger.debug(f"Neue Nachricht aber kein [READY]: {last_msg.text[:50] if last_msg else 'None'}...")
# Kurz warten bevor nächster Check
time.sleep(1)
logger.warning(f"Timeout: Kein [READY] nach {timeout}s")
return False
def _count_claude_messages(self) -> int:
"""Zählt die Anzahl der Claude-Nachrichten im Chat"""
try:
messages = self._get_all_messages()
return sum(1 for m in messages if m.is_from_assistant)
except Exception as e:
logger.debug(f"Fehler beim Zählen: {e}")
return 0
def take_screenshot(self, path: str = "screenshot.png"):
"""Macht einen Screenshot (für Debugging)"""
self.driver.save_screenshot(path)
logger.info(f"Screenshot gespeichert: {path}")
def close(self):
"""Schließt den Browser"""
logger.info("Schließe Browser...")
try:
self.driver.quit()
except:
pass
# ════════════════════════════════════════════════════════════════════════
# BILD-UPLOAD FUNKTIONEN (für Robot Vision)
# ════════════════════════════════════════════════════════════════════════
def fetch_image_from_esp32(self) -> bool:
"""
Holt ein Bild vom ESP32/Mock-Server und speichert es lokal.
Returns:
True wenn erfolgreich, False bei Fehler
"""
if not self.esp32_url:
logger.warning("Keine ESP32 URL konfiguriert")
return False
try:
# Capture-Endpoint aufrufen (macht Foto und gibt es zurück)
url = f"{self.esp32_url}/api/capture"
if self.esp32_api_key:
url += f"?key={self.esp32_api_key}"
response = self._http_session.get(url, timeout=10)
response.raise_for_status()
# Prüfe ob wir ein Bild bekommen haben
content_type = response.headers.get("Content-Type", "")
if "image" in content_type:
# Direktes Bild
with open(self._temp_image_path, "wb") as f:
f.write(response.content)
logger.info(f"Bild gespeichert: {len(response.content)} bytes")
return True
else:
# JSON Response (Mock-Server neuer Stil)
# Dann müssen wir /foto.jpg separat holen
foto_url = f"{self.esp32_url}/foto.jpg"
foto_response = self._http_session.get(foto_url, timeout=10)
foto_response.raise_for_status()
with open(self._temp_image_path, "wb") as f:
f.write(foto_response.content)
logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes")
return True
except requests.exceptions.RequestException as e:
logger.error(f"ESP32 Verbindungsfehler: {e}")
return False
except Exception as e:
logger.error(f"Fehler beim Bild holen: {e}")
return False
def upload_image_to_chat(self) -> bool:
"""
Lädt das gespeicherte Bild in den Claude.ai Chat hoch.
Returns:
True wenn erfolgreich, False bei Fehler
"""
if not self._temp_image_path.exists():
logger.error("Kein Bild zum Hochladen vorhanden")
return False
try:
# Finde das versteckte file input Element
file_input = self._find_file_input()
if not file_input:
logger.error("File-Upload Input nicht gefunden!")
return False
# Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs)
file_input.send_keys(str(self._temp_image_path.absolute()))
logger.info("Bild hochgeladen!")
self._images_uploaded += 1
# Kurz warten bis Upload verarbeitet ist
time.sleep(1.5)
return True
except Exception as e:
logger.error(f"Fehler beim Bild-Upload: {e}")
return False
def _calculate_image_hash(self) -> Optional[str]:
"""
Berechnet einen Hash des aktuellen Bildes.
Returns:
MD5 Hash als Hex-String, oder None bei Fehler
"""
if not self._temp_image_path.exists():
return None
try:
with open(self._temp_image_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
except Exception as e:
logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}")
return None
def has_image_changed(self) -> bool:
"""
Prüft ob sich das Bild seit dem letzten Upload geändert hat.
WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat!
Diese Funktion hilft, unnötige Uploads zu vermeiden.
Returns:
True wenn Bild neu/geändert, False wenn identisch zum letzten
"""
current_hash = self._calculate_image_hash()
if current_hash is None:
return True # Kein Bild = als "geändert" behandeln
if self._last_image_hash is None:
return True # Erster Upload
return current_hash != self._last_image_hash
def upload_image_if_changed(self) -> bool:
"""
Lädt Bild nur hoch wenn es sich geändert hat.
Returns:
True wenn hochgeladen, False wenn übersprungen oder Fehler
"""
if not self.has_image_changed():
logger.debug("Bild unverändert - überspringe Upload")
return False
# Hash vor dem Upload speichern
new_hash = self._calculate_image_hash()
if self.upload_image_to_chat():
self._last_image_hash = new_hash
return True
return False
def delete_uploaded_images(self) -> int:
"""
Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen.
HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden!
Bereits gesendete Bilder können nicht gelöscht werden.
Returns:
Anzahl gelöschter Bilder
"""
deleted = 0
try:
# Suche nach Bild-Vorschau-Elementen im Eingabebereich
# Diese haben meist einen X/Close Button
preview_selectors = [
"button[aria-label*='Remove']",
"button[aria-label*='remove']",
"button[aria-label*='Delete']",
"button[aria-label*='delete']",
"[class*='preview'] button[class*='close']",
"[class*='attachment'] button[class*='remove']",
"[class*='file'] button[class*='delete']",
]
for selector in preview_selectors:
try:
buttons = self.driver.find_elements(By.CSS_SELECTOR, selector)
for btn in buttons:
if btn.is_displayed():
btn.click()
deleted += 1
time.sleep(0.3)
except:
continue
# JavaScript-Fallback
if deleted == 0:
try:
deleted = self.driver.execute_script("""
let count = 0;
// Suche nach Remove-Buttons in Attachment-Previews
const buttons = document.querySelectorAll(
'[class*="preview"] button, [class*="attachment"] button'
);
buttons.forEach(btn => {
if (btn.offsetParent) {
btn.click();
count++;
}
});
return count;
""") or 0
except:
pass
if deleted > 0:
logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht")
except Exception as e:
logger.debug(f"Bild-Löschung fehlgeschlagen: {e}")
return deleted
def get_images_uploaded_count(self) -> int:
"""Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück"""
return self._images_uploaded
def _find_file_input(self):
"""Findet das File-Upload Input Element"""
selectors = [
self.SELECTORS["file_input"],
"input[accept*='image']",
"input[type='file'][accept]",
"input[type='file']",
]
for selector in selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for elem in elements:
# Auch versteckte Inputs funktionieren mit send_keys
return elem
except:
continue
# Fallback: Via JavaScript suchen
try:
return self.driver.execute_script("""
return document.querySelector('input[type="file"]') ||
document.querySelector('[accept*="image"]');
""")
except:
return None
def send_tick_with_image(self) -> bool:
"""
Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK].
Das ist der Haupt-Heartbeat mit Bild!
Returns:
True wenn alles geklappt hat
"""
# Schritt 1: Bild vom ESP32 holen
if not self.fetch_image_from_esp32():
# Kein Bild? Trotzdem TICK senden
self.send_message("[TICK - KEIN BILD]")
return False
# Schritt 2: Bild in Chat hochladen
if not self.upload_image_to_chat():
self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]")
return False
# Schritt 3: TICK senden
self.send_message("[TICK]")
return True
# Hilfsfunktion für einfaches Testing
def test_interface(chat_url: str):
"""Testet das Interface"""
import sys
logging.basicConfig(level=logging.DEBUG)
print("Starte Chat Interface Test...")
print(f"URL: {chat_url}")
interface = ClaudeChatInterface(
chat_url=chat_url,
headless=False
)
print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...")
input()
interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!")
print("Nachricht gesendet!")
print("\nWarte 5 Sekunden auf Antwort...")
time.sleep(5)
messages = interface.get_new_messages()
print(f"\nGefundene Nachrichten: {len(messages)}")
for msg in messages[-3:]:
role = "Claude" if msg.is_from_assistant else "Human"
print(f" [{role}] {msg.text[:100]}...")
print("\nDrücke Enter zum Beenden...")
input()
interface.close()
print("Fertig!")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python chat_web_interface.py <claude-chat-url>")
print("Example: python chat_web_interface.py https://claude.ai/chat/abc123")
sys.exit(1)
test_interface(sys.argv[1])