""" Claude's Eyes - Chat Web Interface Steuert den echten Claude.ai Chat im Browser via Selenium. Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio! HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden, wenn Claude.ai sein UI ändert. """ import time import logging import tempfile import hashlib import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from dataclasses import dataclass from typing import List, Optional from pathlib import Path from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.service import Service from selenium.common.exceptions import TimeoutException, NoSuchElementException logger = logging.getLogger(__name__) @dataclass class ChatMessage: """Eine Chat-Nachricht""" id: str text: str is_from_assistant: bool timestamp: float = 0 class ClaudeChatInterface: """ Steuert Claude.ai Chat via Selenium Browser Automation. Diese Klasse: - Öffnet einen Browser mit dem Claude.ai Chat - Kann Nachrichten senden (für Heartbeat und Stefan's Sprache) - Kann neue Nachrichten lesen (für TTS) WICHTIG: Du musst beim ersten Start manuell einloggen! """ @staticmethod def extract_chat_id(url: str) -> Optional[str]: """ Extrahiert die Chat-ID aus einer Claude.ai URL. Args: url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d" Returns: Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None """ if not url or '/chat/' not in url: return None try: # Extrahiere alles nach /chat/ chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0] return chat_id if chat_id else None except: return None # CSS Selektoren für Claude.ai (Stand: Dezember 2025) # Diese müssen angepasst werden wenn sich das UI ändert! SELECTORS = { # Eingabefeld für neue Nachrichten "input_field": "div.ProseMirror[contenteditable='true']", # Alternativ: Textarea "input_textarea": "textarea[placeholder*='Message']", # Senden-Button (falls Enter nicht funktioniert) "send_button": "button[aria-label*='Send']", # Alle Nachrichten-Container "messages_container": "div[class*='conversation']", # Einzelne Nachrichten "message_human": "div[data-is-streaming='false'][class*='human']", "message_assistant": "div[data-is-streaming='false'][class*='assistant']", # Generischer Nachrichten-Selektor (Fallback) "message_any": "div[class*='message']", # Streaming-Indikator (Claude tippt noch) "streaming": "div[data-is-streaming='true']", # File Upload Input (versteckt, aber funktioniert mit send_keys) "file_input": "input[type='file']", } def __init__( self, chat_url: Optional[str] = None, headless: bool = False, user_data_dir: Optional[str] = None, chrome_binary: Optional[str] = None, esp32_url: Optional[str] = None, esp32_api_key: Optional[str] = None ): """ Initialisiert das Chat-Interface. Args: chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123) headless: Browser im Hintergrund? (False = sichtbar) user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins) chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux) esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture) esp32_api_key: API-Key für ESP32 Authentifizierung """ self.chat_url = chat_url self.esp32_url = esp32_url self.esp32_api_key = esp32_api_key self._message_cache: List[ChatMessage] = [] self._last_message_id = 0 self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg" # Für Bild-Deduplizierung (100 Bilder pro Chat Limit!) self._last_image_hash: Optional[str] = None self._images_uploaded = 0 # HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen) self._http_session = requests.Session() adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10) self._http_session.mount('http://', adapter) self._http_session.mount('https://', adapter) # Chrome Optionen options = webdriver.ChromeOptions() if headless: options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("--window-size=1280,800") # Für persistente Sessions (Login bleibt gespeichert) if user_data_dir: options.add_argument(f"--user-data-dir={user_data_dir}") # Für Termux/Android if chrome_binary: options.binary_location = chrome_binary # Anti-Detection (manche Seiten blocken Selenium) options.add_argument("--disable-blink-features=AutomationControlled") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option("useAutomationExtension", False) logger.info("Starte Chrome Browser...") try: self.driver = webdriver.Chrome(options=options) self.driver.execute_script( "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" ) except Exception as e: logger.error(f"Chrome konnte nicht gestartet werden: {e}") logger.info("Versuche mit webdriver-manager...") from webdriver_manager.chrome import ChromeDriverManager service = Service(ChromeDriverManager().install()) self.driver = webdriver.Chrome(service=service, options=options) self.wait = WebDriverWait(self.driver, 30) # Navigiere zur Chat-URL if chat_url: self.navigate_to_chat(chat_url) def navigate_to_chat(self, url: str): """Navigiert zur Chat-URL""" logger.info(f"Navigiere zu: {url}") self.driver.get(url) self.chat_url = url # Reset Bild-Counter bei neuem Chat self._last_image_hash = None self._images_uploaded = 0 # Warte auf Seitenladung time.sleep(3) # Prüfe ob Login nötig if "login" in self.driver.current_url.lower(): logger.warning("Login erforderlich! Bitte im Browser einloggen...") print("\n" + "=" * 50) print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!") print("Das Fenster bleibt offen. Nach dem Login geht's weiter.") print("=" * 50 + "\n") # Warte bis wieder auf der Chat-Seite while "login" in self.driver.current_url.lower(): time.sleep(2) logger.info("Login erfolgreich!") time.sleep(2) def start_new_chat(self) -> tuple[Optional[str], Optional[str]]: """ Startet einen neuen Chat auf Claude.ai. WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist! Returns: Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler """ try: # Alte Chat-ID merken bevor wir wechseln old_chat_id = self.extract_chat_id(self.chat_url) logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})") # Methode 1: "New Chat" Button suchen new_chat_selectors = [ "button[aria-label*='New']", "button[aria-label*='new']", "a[href='/new']", "a[href*='/chat/new']", "[data-testid*='new-chat']", "button:has-text('New chat')", ] for selector in new_chat_selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): elem.click() time.sleep(2) new_url = self.driver.current_url logger.info(f"Neuer Chat gestartet: {new_url}") # Reset Counter self._last_image_hash = None self._images_uploaded = 0 self.chat_url = new_url return new_url, old_chat_id except: continue # Methode 2: Direkt zu /new navigieren base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai" new_url = f"{base_url}/new" logger.info(f"Versuche direkte Navigation zu: {new_url}") self.driver.get(new_url) time.sleep(3) # Prüfe ob es geklappt hat current = self.driver.current_url if current != self.chat_url: logger.info(f"Neuer Chat: {current}") self._last_image_hash = None self._images_uploaded = 0 self.chat_url = current return current, old_chat_id logger.warning("Konnte keinen neuen Chat starten") return None, None except Exception as e: logger.error(f"Fehler beim Starten eines neuen Chats: {e}") return None, None def get_current_chat_url(self) -> str: """Gibt die aktuelle Chat-URL zurück""" return self.driver.current_url def send_message(self, text: str, wait_for_response: bool = False) -> bool: """ Sendet eine Nachricht in den Chat. Args: text: Die zu sendende Nachricht wait_for_response: Warten bis Claude antwortet? Returns: True wenn erfolgreich gesendet """ # Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut) result = self.send_message_with_delay(text, delay_before_send=0) if result and wait_for_response: self._wait_for_response() return result def wait_for_input_field(self, timeout: int = 60) -> bool: """ Wartet bis das Chat-Eingabefeld verfügbar ist. Nützlich nach CAPTCHA/Login wenn das UI noch lädt. Args: timeout: Maximale Wartezeit in Sekunden Returns: True wenn Eingabefeld gefunden, False bei Timeout """ logger.info(f"Warte auf Chat-Eingabefeld (max {timeout}s)...") start_time = time.time() while time.time() - start_time < timeout: input_field = self._find_input_field() if input_field: try: if input_field.is_displayed() and input_field.is_enabled(): logger.info("Chat-Eingabefeld bereit!") return True except: pass time.sleep(1) logger.warning(f"Timeout: Eingabefeld nach {timeout}s nicht gefunden") return False def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool: """ Sendet eine Nachricht mit Verzögerung vor dem Absenden. Nützlich für große Texte (wie Instruktionen), bei denen die Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten. Ablauf: 0. Warte bis Eingabefeld verfügbar (für CAPTCHA/Login-Fälle) 1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!) 2. Warte delay_before_send Sekunden 3. Send-Button klicken (mit Retry) Args: text: Die zu sendende Nachricht delay_before_send: Sekunden warten nach Einfügen, vor dem Senden Returns: True wenn erfolgreich gesendet """ try: # WICHTIG: Warte auf Eingabefeld (CAPTCHA/Login kann dauern!) if not self.wait_for_input_field(timeout=120): logger.error("Eingabefeld nicht verfügbar nach 120s!") return False # Finde Eingabefeld input_field = self._find_input_field() if not input_field: logger.error("Eingabefeld nicht gefunden!") return False # Feld fokussieren input_field.click() time.sleep(0.2) # Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!) logger.info(f"Füge Text ein ({len(text)} Zeichen)...") self._insert_text_via_js(input_field, text) # WARTEN - große Texte brauchen Zeit! if delay_before_send > 0: logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...") time.sleep(delay_before_send) else: time.sleep(1.0) # Mindestens 1s warten # Jetzt absenden (mit Retry-Logik) send_success = False for attempt in range(3): send_button = self._find_send_button() if send_button: try: if send_button.is_enabled() and send_button.is_displayed(): send_button.click() logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})") send_success = True break except Exception as e: logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}") time.sleep(0.5) # Fallback: Enter-Taste if not send_success: logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter") input_field.send_keys(Keys.RETURN) time.sleep(0.3) logger.debug(f"Nachricht gesendet: {text[:50]}...") return True except Exception as e: logger.error(f"Fehler beim Senden mit Verzögerung: {e}") return False def _find_send_button(self): """Findet den Send-Button""" selectors = [ "button[aria-label*='Send']", "button[aria-label*='send']", "button[data-testid*='send']", "button[type='submit']", # Claude.ai spezifisch - Button mit Pfeil-Icon "button svg[class*='send']", "button[class*='send']", ] for selector in selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed() and elem.is_enabled(): return elem except: continue # Fallback: JavaScript-Suche try: return self.driver.execute_script(""" // Suche nach Send-Button const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]'); if (btn && !btn.disabled) return btn; // Alternative: Letzter Button im Input-Bereich const buttons = document.querySelectorAll('button'); for (const b of buttons) { if (b.offsetParent && !b.disabled) { const text = b.textContent.toLowerCase(); const label = (b.getAttribute('aria-label') || '').toLowerCase(); if (text.includes('send') || label.includes('send')) return b; } } return null; """) except: return None def _find_input_field(self): """Findet das Eingabefeld""" selectors = [ self.SELECTORS["input_field"], self.SELECTORS["input_textarea"], "div[contenteditable='true']", "textarea", ] for selector in selectors: try: element = self.driver.find_element(By.CSS_SELECTOR, selector) if element.is_displayed() and element.is_enabled(): return element except NoSuchElementException: continue return None def _insert_text_via_js(self, element, text: str): """ Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme). Bei send_keys() werden physische Tasten gedrückt, was bei unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu falschen Zeichen führt (z.B. y↔z vertauscht). Diese Methode fügt den Text direkt als String ein. """ # Escape für JavaScript escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${') # Für contenteditable divs (ProseMirror) self.driver.execute_script(""" const element = arguments[0]; const text = arguments[1]; // Fokussieren element.focus(); // Methode 1: Für contenteditable (ProseMirror) if (element.contentEditable === 'true') { // Text als HTML einfügen (respektiert Zeilenumbrüche) const htmlText = text.replace(/\\n/g, '
'); element.innerHTML = htmlText; // Input-Event triggern damit React/Vue es mitbekommt element.dispatchEvent(new Event('input', { bubbles: true })); // Cursor ans Ende setzen const range = document.createRange(); range.selectNodeContents(element); range.collapse(false); const sel = window.getSelection(); sel.removeAllRanges(); sel.addRange(range); } // Methode 2: Für textarea/input else { element.value = text; element.dispatchEvent(new Event('input', { bubbles: true })); element.dispatchEvent(new Event('change', { bubbles: true })); } """, element, text) logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)") def _wait_for_response(self, timeout: int = 60): """Wartet bis Claude fertig getippt hat""" logger.debug("Warte auf Claudes Antwort...") # Warte kurz damit Streaming startet time.sleep(1) # Warte bis Streaming endet try: WebDriverWait(self.driver, timeout).until_not( EC.presence_of_element_located( (By.CSS_SELECTOR, self.SELECTORS["streaming"]) ) ) except TimeoutException: logger.warning("Timeout beim Warten auf Antwort") time.sleep(0.5) # Kurz warten bis DOM aktualisiert def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]: """ Holt neue Nachrichten aus dem Chat. Args: since_id: Nur Nachrichten nach dieser ID zurückgeben Returns: Liste neuer ChatMessage Objekte """ all_messages = self._get_all_messages() if since_id is None: return all_messages # ════════════════════════════════════════════════════════════════ # STRATEGIE: Nutze immer den Index aus claude_msg_X # Die IDs werden bei jedem Aufruf neu generiert, daher können wir # nicht auf exakte ID-Übereinstimmung vertrauen. # Stattdessen extrahieren wir den Index und geben alle Nachrichten # NACH diesem Index zurück. # ════════════════════════════════════════════════════════════════ if since_id and since_id.startswith("claude_msg_"): try: last_index = int(since_id.split("_")[-1]) # Zähle Claude-Nachrichten und gib nur die nach dem Index zurück claude_msgs = [m for m in all_messages if m.is_from_assistant] total_claude = len(claude_msgs) if last_index < total_claude - 1: # Es gibt neue Nachrichten! new_claude_msgs = claude_msgs[last_index + 1:] logger.debug(f"Index-basiert: {len(new_claude_msgs)} neue Claude-Nachrichten (Index {last_index + 1} bis {total_claude - 1})") return new_claude_msgs else: # Keine neuen Nachrichten (last_index ist der letzte oder darüber) logger.debug(f"Index-basiert: Keine neuen Nachrichten (last={last_index}, total={total_claude})") return [] except (ValueError, IndexError) as e: logger.warning(f"Fehler beim Parsen von since_id '{since_id}': {e}") # Alte Hash-basierte IDs oder ungültiges Format if since_id and since_id.startswith("claude_"): logger.debug(f"since_id '{since_id[:30]}...' hat altes Format - überspringe alle") return [] # Fallback: Exakte ID-Suche (für Nicht-Claude-IDs) new_messages = [] found_marker = False for msg in all_messages: if found_marker: new_messages.append(msg) elif msg.id == since_id: found_marker = True return new_messages def _get_all_messages(self) -> List[ChatMessage]: """ Holt alle Nachrichten aus dem Chat. Claude.ai verwendet unterschiedliche data-testid für User und Assistant: - data-testid="user-message" für User - Für Claude: Wir suchen nach Nachrichten die NICHT user-message sind """ messages = [] try: # ═══════════════════════════════════════════════════════════════ # STRATEGIE: Hole User UND Assistant Nachrichten SEPARAT # ═══════════════════════════════════════════════════════════════ # 1. User-Nachrichten (haben data-testid="user-message") user_elements = [] try: user_elements = self.driver.find_elements( By.CSS_SELECTOR, "[data-testid='user-message']" ) logger.debug(f"User-Nachrichten gefunden: {len(user_elements)}") except Exception as e: logger.debug(f"User-Nachrichten Suche fehlgeschlagen: {e}") # 2. Claude-Nachrichten suchen # Claude.ai nutzt verschiedene Klassen, aber NICHT user-message claude_elements = [] # Methode A: font-claude-message Klasse try: found = self.driver.find_elements( By.CSS_SELECTOR, "[class*='font-claude-message']" ) if found: claude_elements = found logger.debug(f"Claude-Nachrichten via font-claude-message: {len(found)}") except: pass # Methode B: data-testid="assistant-message" (neue Claude.ai Version) if not claude_elements: try: found = self.driver.find_elements( By.CSS_SELECTOR, "[data-testid='assistant-message']" ) if found: claude_elements = found logger.debug(f"Claude-Nachrichten via assistant-message: {len(found)}") except: pass # Methode C: Prose-Container die NICHT in user-message sind if not claude_elements: try: # Alle prose Elemente die nicht innerhalb von user-message sind found = self.driver.execute_script(""" const msgs = []; document.querySelectorAll('.prose').forEach(e => { // Prüfe ob dieses Element NICHT in einem user-message Container ist let parent = e; let isUserMessage = false; while (parent && parent !== document.body) { if (parent.getAttribute('data-testid') === 'user-message') { isUserMessage = true; break; } parent = parent.parentElement; } if (!isUserMessage) { msgs.push(e); } }); return msgs; """) or [] if found: claude_elements = found logger.debug(f"Claude-Nachrichten via prose (nicht-user): {len(found)}") except Exception as e: logger.debug(f"Prose-Suche fehlgeschlagen: {e}") # Methode D: Suche nach conversation-turn Struktur (neue Claude.ai Version) if not claude_elements: try: # Claude.ai 2024/2025 nutzt oft conversation-turn Divs # WICHTIG: Wir suchen die INNERSTEN Elemente die mit "Claude sagt:" BEGINNEN # und NICHT in der Sidebar/Navigation sind found = self.driver.execute_script(""" const msgs = []; // Suche nach Elementen die mit "Claude sagt:" BEGINNEN const allDivs = document.querySelectorAll('div, p, span'); for (const elem of allDivs) { // Überspringe wenn es ein user-message ist if (elem.getAttribute('data-testid') === 'user-message') continue; if (elem.closest('[data-testid="user-message"]')) continue; // Überspringe Sidebar/Navigation Elemente if (elem.closest('nav')) continue; if (elem.closest('[role="navigation"]')) continue; if (elem.closest('[class*="sidebar"]')) continue; if (elem.closest('[class*="Sidebar"]')) continue; // Hole den direkten Text-Inhalt (nicht von Kindern) const text = (elem.innerText || '').trim(); // Text muss mit "Claude sagt:" BEGINNEN (nicht irgendwo enthalten) // Das filtert Container-Elemente raus die die ganze Seite enthalten if (text.startsWith('Claude sagt:') && text.length > 20 && text.length < 10000) { msgs.push(elem); } } // Dedupliziere: Behalte nur die INNERSTEN Elemente // (die den Text direkt enthalten, nicht Container) const filtered = msgs.filter(div => { // Behalte dieses Element nur wenn es KEIN anderes Element enthält for (const other of msgs) { if (other !== div && div.contains(other)) { // Dieses Element enthält ein anderes -> verwerfen (zu groß) return false; } } return true; }); return filtered; """) or [] if found: claude_elements = found logger.debug(f"Claude-Nachrichten via Text-Suche: {len(found)}") except Exception as e: logger.debug(f"Text-Suche fehlgeschlagen: {e}") # Methode E: Letzte Fallback - Klasse enthält "assistant" (UI-Filter) if not claude_elements: try: found = self.driver.execute_script(""" const msgs = []; document.querySelectorAll('[class*="assistant"]').forEach(e => { // Filtere Buttons, kleine Elemente und UI-Komponenten if (e.tagName === 'BUTTON') return; if (e.querySelector('button')) return; const text = (e.innerText || '').trim(); // Nur Elemente mit substantiellem Text if (text.length > 20 && !text.startsWith('[TICK]')) { msgs.push(e); } }); return msgs; """) or [] if found: claude_elements = found logger.debug(f"Claude-Nachrichten via class-assistant: {len(found)}") except Exception as e: logger.debug(f"class-assistant-Suche fehlgeschlagen: {e}") # ═══════════════════════════════════════════════════════════════ # DEBUG: Falls keine Claude-Nachrichten gefunden, DOM analysieren # ═══════════════════════════════════════════════════════════════ if not claude_elements and user_elements: try: # Hole DOM-Info für Debugging dom_info = self.driver.execute_script(""" const info = { 'data-testids': [], 'classes_with_message': [], 'classes_with_assistant': [] }; // Sammle alle data-testid Werte document.querySelectorAll('[data-testid]').forEach(e => { const testid = e.getAttribute('data-testid'); if (!info['data-testids'].includes(testid)) { info['data-testids'].push(testid); } }); // Sammle Klassen die 'message' enthalten document.querySelectorAll('[class*="message"]').forEach(e => { const cls = e.className; if (typeof cls === 'string' && !info['classes_with_message'].includes(cls.substring(0, 100))) { info['classes_with_message'].push(cls.substring(0, 100)); } }); // Sammle Klassen die 'assistant' enthalten document.querySelectorAll('[class*="assistant"]').forEach(e => { const cls = e.className; if (typeof cls === 'string' && !info['classes_with_assistant'].includes(cls.substring(0, 100))) { info['classes_with_assistant'].push(cls.substring(0, 100)); } }); return info; """) logger.warning(f"KEINE Claude-Nachrichten gefunden! DOM-Info: data-testids={dom_info.get('data-testids', [])[:10]}") logger.debug(f"Klassen mit 'message': {dom_info.get('classes_with_message', [])[:5]}") logger.debug(f"Klassen mit 'assistant': {dom_info.get('classes_with_assistant', [])[:5]}") except Exception as e: logger.debug(f"DOM-Analyse fehlgeschlagen: {e}") # ═══════════════════════════════════════════════════════════════ # Nachrichten verarbeiten # ═══════════════════════════════════════════════════════════════ # User-Nachrichten hinzufügen for i, elem in enumerate(user_elements): try: text = elem.text.strip() if not text or len(text) < 3: continue msg_id = elem.get_attribute("data-message-id") if not msg_id: msg_id = f"user_{i}_{hash(text[:100])}" messages.append(ChatMessage( id=msg_id, text=text, is_from_assistant=False, timestamp=time.time() )) except Exception as e: logger.debug(f"Fehler bei User-Nachricht {i}: {e}") # Claude-Nachrichten hinzufügen # WICHTIG: Filtere User-Befehle die fälschlicherweise erkannt werden user_commands = [ '[TICK]', '[START]', '[FORWARD]', '[BACKWARD]', '[LEFT]', '[RIGHT]', '[STOP]', '[LOOK_LEFT]', '[LOOK_RIGHT]', '[LOOK_UP]', '[LOOK_DOWN]', '[LOOK_CENTER]', '[READY]' ] for i, elem in enumerate(claude_elements): try: # Versuche zuerst innerText via JavaScript zu bekommen (zuverlässiger) try: text = self.driver.execute_script("return arguments[0].innerText || '';", elem).strip() except: text = elem.text.strip() if not text or len(text) < 5: logger.debug(f"Claude-Element {i} übersprungen: Text zu kurz ({len(text) if text else 0} Zeichen)") continue # Filtere User-Befehle (diese sind KEINE Claude-Nachrichten) if text in user_commands: logger.debug(f"Überspringe User-Befehl: {text}") continue # Filtere auch wenn der Text nur aus einem Befehl besteht (evtl. mit Whitespace) text_upper = text.upper().strip() if any(text_upper == cmd for cmd in user_commands): logger.debug(f"Überspringe User-Befehl (normalized): {text}") continue # Prüfe ob es wirklich eine Claude-Nachricht ist (muss mit "Claude sagt:" BEGINNEN) if not text.startswith("Claude sagt:"): # Fallback: Prüfe ob "Claude sagt:" irgendwo am Anfang einer Zeile steht lines = text.split('\n') found_claude_line = False for line in lines[:5]: # Nur erste 5 Zeilen prüfen if line.strip().startswith("Claude sagt:"): found_claude_line = True break if not found_claude_line: logger.debug(f"Claude-Element {i} übersprungen: Beginnt nicht mit 'Claude sagt:'") continue # Filtere Sidebar/Navigation Elemente (beginnen oft mit "Neuer Chat") if text.startswith("Neuer Chat") or text.startswith("Chats") or text.startswith("Projekte"): logger.debug(f"Claude-Element {i} übersprungen: Ist Sidebar/Navigation") continue msg_id = elem.get_attribute("data-message-id") if not msg_id: # WICHTIG: Nutze einen ZÄHLER statt Hash! # Der Zähler ist die Anzahl der bisherigen Claude-Nachrichten. # Das ist stabil weil neue Nachrichten nur HINZUGEFÜGT werden. # Der Hash war instabil weil der Text sich leicht ändern kann. claude_msg_count = sum(1 for m in messages if m.is_from_assistant) msg_id = f"claude_msg_{claude_msg_count}" messages.append(ChatMessage( id=msg_id, text=text, is_from_assistant=True, timestamp=time.time() )) logger.debug(f"Claude-Nachricht {claude_msg_count if 'claude_msg_count' in dir() else i}: '{text[:80]}...' ({len(text)} Zeichen)") except Exception as e: logger.debug(f"Fehler bei Claude-Nachricht {i}: {e}") logger.debug(f"Gesamt: {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude") except Exception as e: logger.error(f"Fehler beim Lesen der Nachrichten: {e}") return messages def get_last_assistant_message(self) -> Optional[ChatMessage]: """Holt die letzte Nachricht von Claude""" messages = self._get_all_messages() for msg in reversed(messages): if msg.is_from_assistant: return msg return None def is_claude_typing(self) -> bool: """ Prüft ob Claude gerade tippt (streaming). Erkennt mehrere Indikatoren: 1. Stop-Button ist sichtbar (während Claude schreibt) 2. data-is-streaming='true' Attribut 3. Animiertes Logo / Thinking-Indikator """ try: # Methode 1: Stop-Button prüfen (zuverlässigster Indikator) # Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button stop_indicators = [ "button[aria-label*='Stop']", "button[aria-label*='stop']", "button[class*='stop']", "button[data-testid*='stop']", # Alternativer Indikator: Button mit Stop-Icon "button svg[class*='stop']", ] for selector in stop_indicators: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})") return True except: continue # Methode 2: Streaming-Attribut (original) streaming = self.driver.find_elements( By.CSS_SELECTOR, self.SELECTORS["streaming"] ) if len(streaming) > 0: logger.debug("Claude tippt (streaming=true)") return True # Methode 3: Animiertes/Thinking Indikator suchen thinking_indicators = [ "[class*='thinking']", "[class*='loading']", "[class*='typing']", "[class*='streaming']", "[data-state='loading']", # Pulsierendes Logo "[class*='pulse']", "[class*='animate']", ] for selector in thinking_indicators: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): logger.debug(f"Claude tippt (Indikator: {selector})") return True except: continue # Methode 4: JavaScript-basierte Prüfung # Prüft ob irgendwo noch Text gestreamt wird try: is_streaming = self.driver.execute_script(""" // Prüfe ob Stop-Button existiert und sichtbar ist const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]'); if (stopBtn && stopBtn.offsetParent !== null) return true; // Prüfe auf streaming-Attribut const streaming = document.querySelector('[data-is-streaming="true"]'); if (streaming) return true; // Prüfe auf disabled Send-Button (während Claude tippt) const sendBtn = document.querySelector('button[aria-label*="Send"]'); if (sendBtn && sendBtn.disabled) return true; return false; """) if is_streaming: logger.debug("Claude tippt (JavaScript-Check)") return True except: pass return False except Exception as e: logger.debug(f"Fehler bei typing-check: {e}") return False def wait_for_ready_signal(self, timeout: int = 120, stop_check: callable = None) -> bool: """ Wartet bis Claude [READY] sendet. WICHTIG: Prüft nur die LETZTE Claude-Nachricht! So werden alte [READY] im Chat-Verlauf ignoriert. Args: timeout: Maximale Wartezeit in Sekunden stop_check: Optionale Funktion die True zurückgibt wenn abgebrochen werden soll (z.B. lambda: not bridge.running) Returns: True wenn [READY] empfangen, False bei Timeout oder Abbruch """ logger.info(f"Warte auf [READY] Signal in letzter Claude-Nachricht (max {timeout}s)...") start_time = time.time() # Merke die Anzahl der Claude-Nachrichten VOR dem Senden initial_count = self._count_claude_messages() logger.debug(f"Initiale Claude-Nachrichten: {initial_count}") while time.time() - start_time < timeout: # Prüfe ob abgebrochen werden soll (z.B. Q gedrückt) if stop_check and stop_check(): logger.info("wait_for_ready abgebrochen (stop_check)") return False # Warte bis Claude fertig ist mit Tippen typing_wait_start = time.time() while self.is_claude_typing(): time.sleep(0.5) # Prüfe auch hier auf Abbruch if stop_check and stop_check(): logger.info("wait_for_ready abgebrochen während typing-wait") return False # Timeout für typing-wait (max 60s) if time.time() - typing_wait_start > 60: logger.debug("Typing-Wait Timeout, prüfe trotzdem...") break # Prüfe ob es eine NEUE Claude-Nachricht gibt current_count = self._count_claude_messages() if current_count > initial_count: # Hole die letzte Claude-Nachricht last_msg = self.get_last_assistant_message() if last_msg and '[READY]' in last_msg.text.upper(): logger.info(f"[READY] in letzter Claude-Nachricht gefunden!") return True else: logger.debug(f"Neue Nachricht aber kein [READY]: {last_msg.text[:50] if last_msg else 'None'}...") # Kurz warten bevor nächster Check time.sleep(1) logger.warning(f"Timeout: Kein [READY] nach {timeout}s") return False def _count_claude_messages(self) -> int: """Zählt die Anzahl der Claude-Nachrichten im Chat""" try: messages = self._get_all_messages() return sum(1 for m in messages if m.is_from_assistant) except Exception as e: logger.debug(f"Fehler beim Zählen: {e}") return 0 def take_screenshot(self, path: str = "screenshot.png"): """Macht einen Screenshot (für Debugging)""" self.driver.save_screenshot(path) logger.info(f"Screenshot gespeichert: {path}") def close(self): """Schließt den Browser""" logger.info("Schließe Browser...") try: self.driver.quit() except: pass # ════════════════════════════════════════════════════════════════════════ # BILD-UPLOAD FUNKTIONEN (für Robot Vision) # ════════════════════════════════════════════════════════════════════════ def fetch_image_from_esp32(self) -> bool: """ Holt ein Bild vom ESP32/Mock-Server und speichert es lokal. Returns: True wenn erfolgreich, False bei Fehler """ if not self.esp32_url: logger.warning("Keine ESP32 URL konfiguriert") return False try: # Capture-Endpoint aufrufen (macht Foto und gibt es zurück) url = f"{self.esp32_url}/api/capture" if self.esp32_api_key: url += f"?key={self.esp32_api_key}" response = self._http_session.get(url, timeout=10) response.raise_for_status() # Prüfe ob wir ein Bild bekommen haben content_type = response.headers.get("Content-Type", "") if "image" in content_type: # Direktes Bild with open(self._temp_image_path, "wb") as f: f.write(response.content) logger.info(f"Bild gespeichert: {len(response.content)} bytes") return True else: # JSON Response (Mock-Server neuer Stil) # Dann müssen wir /foto.jpg separat holen foto_url = f"{self.esp32_url}/foto.jpg" foto_response = self._http_session.get(foto_url, timeout=10) foto_response.raise_for_status() with open(self._temp_image_path, "wb") as f: f.write(foto_response.content) logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes") return True except requests.exceptions.RequestException as e: logger.error(f"ESP32 Verbindungsfehler: {e}") return False except Exception as e: logger.error(f"Fehler beim Bild holen: {e}") return False def upload_image_to_chat(self) -> bool: """ Lädt das gespeicherte Bild in den Claude.ai Chat hoch. Returns: True wenn erfolgreich, False bei Fehler """ if not self._temp_image_path.exists(): logger.error("Kein Bild zum Hochladen vorhanden") return False try: # Finde das versteckte file input Element file_input = self._find_file_input() if not file_input: logger.error("File-Upload Input nicht gefunden!") return False # Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs) file_input.send_keys(str(self._temp_image_path.absolute())) logger.info("Bild hochgeladen!") self._images_uploaded += 1 # Kurz warten bis Upload verarbeitet ist time.sleep(1.5) return True except Exception as e: logger.error(f"Fehler beim Bild-Upload: {e}") return False def _calculate_image_hash(self) -> Optional[str]: """ Berechnet einen Hash des aktuellen Bildes. Returns: MD5 Hash als Hex-String, oder None bei Fehler """ if not self._temp_image_path.exists(): return None try: with open(self._temp_image_path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception as e: logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}") return None def has_image_changed(self) -> bool: """ Prüft ob sich das Bild seit dem letzten Upload geändert hat. WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat! Diese Funktion hilft, unnötige Uploads zu vermeiden. Returns: True wenn Bild neu/geändert, False wenn identisch zum letzten """ current_hash = self._calculate_image_hash() if current_hash is None: return True # Kein Bild = als "geändert" behandeln if self._last_image_hash is None: return True # Erster Upload return current_hash != self._last_image_hash def upload_image_if_changed(self) -> bool: """ Lädt Bild nur hoch wenn es sich geändert hat. Returns: True wenn hochgeladen, False wenn übersprungen oder Fehler """ if not self.has_image_changed(): logger.debug("Bild unverändert - überspringe Upload") return False # Hash vor dem Upload speichern new_hash = self._calculate_image_hash() if self.upload_image_to_chat(): self._last_image_hash = new_hash return True return False def delete_uploaded_images(self) -> int: """ Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen. HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden! Bereits gesendete Bilder können nicht gelöscht werden. Returns: Anzahl gelöschter Bilder """ deleted = 0 try: # Suche nach Bild-Vorschau-Elementen im Eingabebereich # Diese haben meist einen X/Close Button preview_selectors = [ "button[aria-label*='Remove']", "button[aria-label*='remove']", "button[aria-label*='Delete']", "button[aria-label*='delete']", "[class*='preview'] button[class*='close']", "[class*='attachment'] button[class*='remove']", "[class*='file'] button[class*='delete']", ] for selector in preview_selectors: try: buttons = self.driver.find_elements(By.CSS_SELECTOR, selector) for btn in buttons: if btn.is_displayed(): btn.click() deleted += 1 time.sleep(0.3) except: continue # JavaScript-Fallback if deleted == 0: try: deleted = self.driver.execute_script(""" let count = 0; // Suche nach Remove-Buttons in Attachment-Previews const buttons = document.querySelectorAll( '[class*="preview"] button, [class*="attachment"] button' ); buttons.forEach(btn => { if (btn.offsetParent) { btn.click(); count++; } }); return count; """) or 0 except: pass if deleted > 0: logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht") except Exception as e: logger.debug(f"Bild-Löschung fehlgeschlagen: {e}") return deleted def get_images_uploaded_count(self) -> int: """Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück""" return self._images_uploaded def _find_file_input(self): """Findet das File-Upload Input Element""" selectors = [ self.SELECTORS["file_input"], "input[accept*='image']", "input[type='file'][accept]", "input[type='file']", ] for selector in selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: # Auch versteckte Inputs funktionieren mit send_keys return elem except: continue # Fallback: Via JavaScript suchen try: return self.driver.execute_script(""" return document.querySelector('input[type="file"]') || document.querySelector('[accept*="image"]'); """) except: return None def send_tick_with_image(self) -> bool: """ Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK]. Das ist der Haupt-Heartbeat mit Bild! Returns: True wenn alles geklappt hat """ # Schritt 1: Bild vom ESP32 holen if not self.fetch_image_from_esp32(): # Kein Bild? Trotzdem TICK senden self.send_message("[TICK - KEIN BILD]") return False # Schritt 2: Bild in Chat hochladen if not self.upload_image_to_chat(): self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]") return False # Schritt 3: TICK senden self.send_message("[TICK]") return True # Hilfsfunktion für einfaches Testing def test_interface(chat_url: str): """Testet das Interface""" import sys logging.basicConfig(level=logging.DEBUG) print("Starte Chat Interface Test...") print(f"URL: {chat_url}") interface = ClaudeChatInterface( chat_url=chat_url, headless=False ) print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...") input() interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!") print("Nachricht gesendet!") print("\nWarte 5 Sekunden auf Antwort...") time.sleep(5) messages = interface.get_new_messages() print(f"\nGefundene Nachrichten: {len(messages)}") for msg in messages[-3:]: role = "Claude" if msg.is_from_assistant else "Human" print(f" [{role}] {msg.text[:100]}...") print("\nDrücke Enter zum Beenden...") input() interface.close() print("Fertig!") if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python chat_web_interface.py ") print("Example: python chat_web_interface.py https://claude.ai/chat/abc123") sys.exit(1) test_interface(sys.argv[1])