""" Claude's Eyes - Chat Web Interface Steuert den echten Claude.ai Chat im Browser via Selenium. Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio! HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden, wenn Claude.ai sein UI ändert. """ import time import logging import tempfile import hashlib import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from dataclasses import dataclass from typing import List, Optional from pathlib import Path from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.service import Service from selenium.common.exceptions import TimeoutException, NoSuchElementException logger = logging.getLogger(__name__) @dataclass class ChatMessage: """Eine Chat-Nachricht""" id: str text: str is_from_assistant: bool timestamp: float = 0 class ClaudeChatInterface: """ Steuert Claude.ai Chat via Selenium Browser Automation. Diese Klasse: - Öffnet einen Browser mit dem Claude.ai Chat - Kann Nachrichten senden (für Heartbeat und Stefan's Sprache) - Kann neue Nachrichten lesen (für TTS) WICHTIG: Du musst beim ersten Start manuell einloggen! """ @staticmethod def extract_chat_id(url: str) -> Optional[str]: """ Extrahiert die Chat-ID aus einer Claude.ai URL. Args: url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d" Returns: Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None """ if not url or '/chat/' not in url: return None try: # Extrahiere alles nach /chat/ chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0] return chat_id if chat_id else None except: return None # CSS Selektoren für Claude.ai (Stand: Dezember 2025) # Diese müssen angepasst werden wenn sich das UI ändert! SELECTORS = { # Eingabefeld für neue Nachrichten "input_field": "div.ProseMirror[contenteditable='true']", # Alternativ: Textarea "input_textarea": "textarea[placeholder*='Message']", # Senden-Button (falls Enter nicht funktioniert) "send_button": "button[aria-label*='Send']", # Alle Nachrichten-Container "messages_container": "div[class*='conversation']", # Einzelne Nachrichten "message_human": "div[data-is-streaming='false'][class*='human']", "message_assistant": "div[data-is-streaming='false'][class*='assistant']", # Generischer Nachrichten-Selektor (Fallback) "message_any": "div[class*='message']", # Streaming-Indikator (Claude tippt noch) "streaming": "div[data-is-streaming='true']", # File Upload Input (versteckt, aber funktioniert mit send_keys) "file_input": "input[type='file']", } def __init__( self, chat_url: Optional[str] = None, headless: bool = False, user_data_dir: Optional[str] = None, chrome_binary: Optional[str] = None, esp32_url: Optional[str] = None, esp32_api_key: Optional[str] = None ): """ Initialisiert das Chat-Interface. Args: chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123) headless: Browser im Hintergrund? (False = sichtbar) user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins) chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux) esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture) esp32_api_key: API-Key für ESP32 Authentifizierung """ self.chat_url = chat_url self.esp32_url = esp32_url self.esp32_api_key = esp32_api_key self._message_cache: List[ChatMessage] = [] self._last_message_id = 0 self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg" # Für Bild-Deduplizierung (100 Bilder pro Chat Limit!) self._last_image_hash: Optional[str] = None self._images_uploaded = 0 # HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen) self._http_session = requests.Session() adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10) self._http_session.mount('http://', adapter) self._http_session.mount('https://', adapter) # Chrome Optionen options = webdriver.ChromeOptions() if headless: options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("--window-size=1280,800") # Für persistente Sessions (Login bleibt gespeichert) if user_data_dir: options.add_argument(f"--user-data-dir={user_data_dir}") # Für Termux/Android if chrome_binary: options.binary_location = chrome_binary # Anti-Detection (manche Seiten blocken Selenium) options.add_argument("--disable-blink-features=AutomationControlled") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option("useAutomationExtension", False) logger.info("Starte Chrome Browser...") try: self.driver = webdriver.Chrome(options=options) self.driver.execute_script( "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" ) except Exception as e: logger.error(f"Chrome konnte nicht gestartet werden: {e}") logger.info("Versuche mit webdriver-manager...") from webdriver_manager.chrome import ChromeDriverManager service = Service(ChromeDriverManager().install()) self.driver = webdriver.Chrome(service=service, options=options) self.wait = WebDriverWait(self.driver, 30) # Navigiere zur Chat-URL if chat_url: self.navigate_to_chat(chat_url) def navigate_to_chat(self, url: str): """Navigiert zur Chat-URL""" logger.info(f"Navigiere zu: {url}") self.driver.get(url) self.chat_url = url # Reset Bild-Counter bei neuem Chat self._last_image_hash = None self._images_uploaded = 0 # Warte auf Seitenladung time.sleep(3) # Prüfe ob Login nötig if "login" in self.driver.current_url.lower(): logger.warning("Login erforderlich! Bitte im Browser einloggen...") print("\n" + "=" * 50) print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!") print("Das Fenster bleibt offen. Nach dem Login geht's weiter.") print("=" * 50 + "\n") # Warte bis wieder auf der Chat-Seite while "login" in self.driver.current_url.lower(): time.sleep(2) logger.info("Login erfolgreich!") time.sleep(2) def start_new_chat(self) -> tuple[Optional[str], Optional[str]]: """ Startet einen neuen Chat auf Claude.ai. WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist! Returns: Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler """ try: # Alte Chat-ID merken bevor wir wechseln old_chat_id = self.extract_chat_id(self.chat_url) logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})") # Methode 1: "New Chat" Button suchen new_chat_selectors = [ "button[aria-label*='New']", "button[aria-label*='new']", "a[href='/new']", "a[href*='/chat/new']", "[data-testid*='new-chat']", "button:has-text('New chat')", ] for selector in new_chat_selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): elem.click() time.sleep(2) new_url = self.driver.current_url logger.info(f"Neuer Chat gestartet: {new_url}") # Reset Counter self._last_image_hash = None self._images_uploaded = 0 self.chat_url = new_url return new_url, old_chat_id except: continue # Methode 2: Direkt zu /new navigieren base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai" new_url = f"{base_url}/new" logger.info(f"Versuche direkte Navigation zu: {new_url}") self.driver.get(new_url) time.sleep(3) # Prüfe ob es geklappt hat current = self.driver.current_url if current != self.chat_url: logger.info(f"Neuer Chat: {current}") self._last_image_hash = None self._images_uploaded = 0 self.chat_url = current return current, old_chat_id logger.warning("Konnte keinen neuen Chat starten") return None, None except Exception as e: logger.error(f"Fehler beim Starten eines neuen Chats: {e}") return None, None def get_current_chat_url(self) -> str: """Gibt die aktuelle Chat-URL zurück""" return self.driver.current_url def send_message(self, text: str, wait_for_response: bool = False) -> bool: """ Sendet eine Nachricht in den Chat. Args: text: Die zu sendende Nachricht wait_for_response: Warten bis Claude antwortet? Returns: True wenn erfolgreich gesendet """ # Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut) result = self.send_message_with_delay(text, delay_before_send=0) if result and wait_for_response: self._wait_for_response() return result def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool: """ Sendet eine Nachricht mit Verzögerung vor dem Absenden. Nützlich für große Texte (wie Instruktionen), bei denen die Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten. Ablauf: 1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!) 2. Warte delay_before_send Sekunden 3. Send-Button klicken (mit Retry) Args: text: Die zu sendende Nachricht delay_before_send: Sekunden warten nach Einfügen, vor dem Senden Returns: True wenn erfolgreich gesendet """ try: # Finde Eingabefeld input_field = self._find_input_field() if not input_field: logger.error("Eingabefeld nicht gefunden!") return False # Feld fokussieren input_field.click() time.sleep(0.2) # Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!) logger.info(f"Füge Text ein ({len(text)} Zeichen)...") self._insert_text_via_js(input_field, text) # WARTEN - große Texte brauchen Zeit! if delay_before_send > 0: logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...") time.sleep(delay_before_send) else: time.sleep(1.0) # Mindestens 1s warten # Jetzt absenden (mit Retry-Logik) send_success = False for attempt in range(3): send_button = self._find_send_button() if send_button: try: if send_button.is_enabled() and send_button.is_displayed(): send_button.click() logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})") send_success = True break except Exception as e: logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}") time.sleep(0.5) # Fallback: Enter-Taste if not send_success: logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter") input_field.send_keys(Keys.RETURN) time.sleep(0.3) logger.debug(f"Nachricht gesendet: {text[:50]}...") return True except Exception as e: logger.error(f"Fehler beim Senden mit Verzögerung: {e}") return False def _find_send_button(self): """Findet den Send-Button""" selectors = [ "button[aria-label*='Send']", "button[aria-label*='send']", "button[data-testid*='send']", "button[type='submit']", # Claude.ai spezifisch - Button mit Pfeil-Icon "button svg[class*='send']", "button[class*='send']", ] for selector in selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed() and elem.is_enabled(): return elem except: continue # Fallback: JavaScript-Suche try: return self.driver.execute_script(""" // Suche nach Send-Button const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]'); if (btn && !btn.disabled) return btn; // Alternative: Letzter Button im Input-Bereich const buttons = document.querySelectorAll('button'); for (const b of buttons) { if (b.offsetParent && !b.disabled) { const text = b.textContent.toLowerCase(); const label = (b.getAttribute('aria-label') || '').toLowerCase(); if (text.includes('send') || label.includes('send')) return b; } } return null; """) except: return None def _find_input_field(self): """Findet das Eingabefeld""" selectors = [ self.SELECTORS["input_field"], self.SELECTORS["input_textarea"], "div[contenteditable='true']", "textarea", ] for selector in selectors: try: element = self.driver.find_element(By.CSS_SELECTOR, selector) if element.is_displayed() and element.is_enabled(): return element except NoSuchElementException: continue return None def _insert_text_via_js(self, element, text: str): """ Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme). Bei send_keys() werden physische Tasten gedrückt, was bei unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu falschen Zeichen führt (z.B. y↔z vertauscht). Diese Methode fügt den Text direkt als String ein. """ # Escape für JavaScript escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${') # Für contenteditable divs (ProseMirror) self.driver.execute_script(""" const element = arguments[0]; const text = arguments[1]; // Fokussieren element.focus(); // Methode 1: Für contenteditable (ProseMirror) if (element.contentEditable === 'true') { // Text als HTML einfügen (respektiert Zeilenumbrüche) const htmlText = text.replace(/\\n/g, '
'); element.innerHTML = htmlText; // Input-Event triggern damit React/Vue es mitbekommt element.dispatchEvent(new Event('input', { bubbles: true })); // Cursor ans Ende setzen const range = document.createRange(); range.selectNodeContents(element); range.collapse(false); const sel = window.getSelection(); sel.removeAllRanges(); sel.addRange(range); } // Methode 2: Für textarea/input else { element.value = text; element.dispatchEvent(new Event('input', { bubbles: true })); element.dispatchEvent(new Event('change', { bubbles: true })); } """, element, text) logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)") def _wait_for_response(self, timeout: int = 60): """Wartet bis Claude fertig getippt hat""" logger.debug("Warte auf Claudes Antwort...") # Warte kurz damit Streaming startet time.sleep(1) # Warte bis Streaming endet try: WebDriverWait(self.driver, timeout).until_not( EC.presence_of_element_located( (By.CSS_SELECTOR, self.SELECTORS["streaming"]) ) ) except TimeoutException: logger.warning("Timeout beim Warten auf Antwort") time.sleep(0.5) # Kurz warten bis DOM aktualisiert def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]: """ Holt neue Nachrichten aus dem Chat. Args: since_id: Nur Nachrichten nach dieser ID zurückgeben Returns: Liste neuer ChatMessage Objekte """ all_messages = self._get_all_messages() if since_id is None: return all_messages # Filtere nur neue new_messages = [] found_marker = False for msg in all_messages: if found_marker: new_messages.append(msg) elif msg.id == since_id: found_marker = True return new_messages def _get_all_messages(self) -> List[ChatMessage]: """Holt alle Nachrichten aus dem Chat""" messages = [] try: # Versuche verschiedene Selektoren elements = [] # Claude.ai spezifische Selektoren (2024/2025) message_selectors = [ # Neuere Claude.ai Versionen "[data-testid='user-message']", "[data-testid='assistant-message']", # Ältere Varianten "div[data-is-streaming='false']", # Container für Nachrichten-Inhalt ".font-claude-message", ".prose", # Generische Fallbacks "div[class*='message']", "div[class*='Message']", ] for selector in message_selectors: try: found = self.driver.find_elements(By.CSS_SELECTOR, selector) if found: logger.debug(f"Selektor '{selector}' fand {len(found)} Elemente") if not elements or len(found) > len(elements): elements = found except Exception as e: logger.debug(f"Selektor '{selector}' fehlgeschlagen: {e}") logger.debug(f"Gesamt gefundene Nachrichten-Elemente: {len(elements)}") # Wenn immer noch nichts gefunden, versuche via JavaScript if not elements: try: elements = self.driver.execute_script(""" // Suche nach Nachrichten-Containern const msgs = []; // Methode 1: data-testid document.querySelectorAll('[data-testid*="message"]').forEach(e => msgs.push(e)); // Methode 2: Prose-Container (Markdown-Inhalt) if (msgs.length === 0) { document.querySelectorAll('.prose').forEach(e => msgs.push(e)); } // Methode 3: Alle großen Text-Container im Hauptbereich if (msgs.length === 0) { document.querySelectorAll('main div').forEach(e => { if (e.innerText && e.innerText.length > 50 && e.children.length < 10) { msgs.push(e); } }); } return msgs; """) or [] logger.debug(f"JavaScript fand {len(elements)} Nachrichten") except Exception as e: logger.debug(f"JavaScript-Suche fehlgeschlagen: {e}") for i, elem in enumerate(elements): try: text = elem.text.strip() if not text or len(text) < 5: continue # Bestimme ob Human oder Assistant mit mehreren Methoden class_name = elem.get_attribute("class") or "" data_testid = elem.get_attribute("data-testid") or "" data_role = elem.get_attribute("data-role") or "" # Prüfe Parent-Elemente auf Hinweise parent_hints = "" try: parent = elem.find_element(By.XPATH, "..") parent_class = parent.get_attribute("class") or "" parent_testid = parent.get_attribute("data-testid") or "" grandparent = parent.find_element(By.XPATH, "..") grandparent_class = grandparent.get_attribute("class") or "" parent_hints = f"{parent_class} {parent_testid} {grandparent_class}" except: pass # Kombiniere alle Hinweise all_hints = (class_name + " " + data_testid + " " + data_role + " " + parent_hints).lower() # Prüfe explizit auf "user" oder "human" für User-Nachrichten is_human = ( "user" in all_hints or "human" in all_hints or "user-message" in data_testid ) is_assistant = ( "assistant" in all_hints or "claude" in all_hints or "ai-message" in all_hints or "assistant-message" in data_testid or "response" in all_hints ) # Wenn weder user noch assistant explizit, nutze Heuristik if not is_human and not is_assistant: # Prüfe ob Text typische User-Marker hat if text.startswith("[TICK]") or text.startswith("[START]") or "Stefan sagt:" in text: is_human = True else: # Fallback: Nachrichten ohne User-Marker sind von Claude is_assistant = True # Final: is_assistant ist True wenn nicht explizit Human final_is_assistant = is_assistant and not is_human logger.debug(f"Nachricht {i}: is_human={is_human}, is_assistant={is_assistant}, final={final_is_assistant}, hints='{all_hints[:80]}...', text='{text[:40]}...'") # Generiere ID msg_id = elem.get_attribute("data-message-id") if not msg_id: msg_id = f"msg_{i}_{hash(text[:100])}" messages.append(ChatMessage( id=msg_id, text=text, is_from_assistant=final_is_assistant, timestamp=time.time() )) except Exception as e: logger.debug(f"Fehler bei Nachricht {i}: {e}") continue logger.debug(f"Parsed {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude") except Exception as e: logger.error(f"Fehler beim Lesen der Nachrichten: {e}") return messages def get_last_assistant_message(self) -> Optional[ChatMessage]: """Holt die letzte Nachricht von Claude""" messages = self._get_all_messages() for msg in reversed(messages): if msg.is_from_assistant: return msg return None def is_claude_typing(self) -> bool: """ Prüft ob Claude gerade tippt (streaming). Erkennt mehrere Indikatoren: 1. Stop-Button ist sichtbar (während Claude schreibt) 2. data-is-streaming='true' Attribut 3. Animiertes Logo / Thinking-Indikator """ try: # Methode 1: Stop-Button prüfen (zuverlässigster Indikator) # Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button stop_indicators = [ "button[aria-label*='Stop']", "button[aria-label*='stop']", "button[class*='stop']", "button[data-testid*='stop']", # Alternativer Indikator: Button mit Stop-Icon "button svg[class*='stop']", ] for selector in stop_indicators: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})") return True except: continue # Methode 2: Streaming-Attribut (original) streaming = self.driver.find_elements( By.CSS_SELECTOR, self.SELECTORS["streaming"] ) if len(streaming) > 0: logger.debug("Claude tippt (streaming=true)") return True # Methode 3: Animiertes/Thinking Indikator suchen thinking_indicators = [ "[class*='thinking']", "[class*='loading']", "[class*='typing']", "[class*='streaming']", "[data-state='loading']", # Pulsierendes Logo "[class*='pulse']", "[class*='animate']", ] for selector in thinking_indicators: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: if elem.is_displayed(): logger.debug(f"Claude tippt (Indikator: {selector})") return True except: continue # Methode 4: JavaScript-basierte Prüfung # Prüft ob irgendwo noch Text gestreamt wird try: is_streaming = self.driver.execute_script(""" // Prüfe ob Stop-Button existiert und sichtbar ist const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]'); if (stopBtn && stopBtn.offsetParent !== null) return true; // Prüfe auf streaming-Attribut const streaming = document.querySelector('[data-is-streaming="true"]'); if (streaming) return true; // Prüfe auf disabled Send-Button (während Claude tippt) const sendBtn = document.querySelector('button[aria-label*="Send"]'); if (sendBtn && sendBtn.disabled) return true; return false; """) if is_streaming: logger.debug("Claude tippt (JavaScript-Check)") return True except: pass return False except Exception as e: logger.debug(f"Fehler bei typing-check: {e}") return False def wait_for_ready_signal(self, timeout: int = 120) -> bool: """ Wartet bis Claude [READY] sendet. Sucht nach [READY] das NICHT Teil des Instruktions-Textes ist. Wir zählen wie oft [READY] vorkommt - wenn mehr als 1x, hat Claude geantwortet. Args: timeout: Maximale Wartezeit in Sekunden Returns: True wenn [READY] empfangen, False bei Timeout """ logger.info(f"Warte auf [READY] Signal (max {timeout}s)...") start_time = time.time() while time.time() - start_time < timeout: # Warte bis Claude fertig ist mit Tippen typing_wait_start = time.time() while self.is_claude_typing(): time.sleep(0.5) # Timeout für typing-wait (max 60s) if time.time() - typing_wait_start > 60: logger.debug("Typing-Wait Timeout, prüfe trotzdem...") break # Suche [READY] im Seitentext via JavaScript # Zähle wie oft [READY] vorkommt - 1x ist unsere Instruktion, 2x+ bedeutet Claude hat geantwortet try: ready_count = self.driver.execute_script(""" const text = document.body.innerText.toUpperCase(); const matches = text.match(/\\[READY\\]/g); return matches ? matches.length : 0; """) logger.debug(f"[READY] gefunden: {ready_count}x") # Mehr als 1x = Claude hat auch [READY] geschrieben if ready_count and ready_count >= 2: logger.info(f"[READY] Signal gefunden! ({ready_count}x im Text)") return True except Exception as e: logger.debug(f"JavaScript [READY] Suche fehlgeschlagen: {e}") # Kurz warten bevor nächster Check time.sleep(1) logger.warning(f"Timeout: Kein [READY] nach {timeout}s") return False def take_screenshot(self, path: str = "screenshot.png"): """Macht einen Screenshot (für Debugging)""" self.driver.save_screenshot(path) logger.info(f"Screenshot gespeichert: {path}") def close(self): """Schließt den Browser""" logger.info("Schließe Browser...") try: self.driver.quit() except: pass # ════════════════════════════════════════════════════════════════════════ # BILD-UPLOAD FUNKTIONEN (für Robot Vision) # ════════════════════════════════════════════════════════════════════════ def fetch_image_from_esp32(self) -> bool: """ Holt ein Bild vom ESP32/Mock-Server und speichert es lokal. Returns: True wenn erfolgreich, False bei Fehler """ if not self.esp32_url: logger.warning("Keine ESP32 URL konfiguriert") return False try: # Capture-Endpoint aufrufen (macht Foto und gibt es zurück) url = f"{self.esp32_url}/api/capture" if self.esp32_api_key: url += f"?key={self.esp32_api_key}" response = self._http_session.get(url, timeout=10) response.raise_for_status() # Prüfe ob wir ein Bild bekommen haben content_type = response.headers.get("Content-Type", "") if "image" in content_type: # Direktes Bild with open(self._temp_image_path, "wb") as f: f.write(response.content) logger.info(f"Bild gespeichert: {len(response.content)} bytes") return True else: # JSON Response (Mock-Server neuer Stil) # Dann müssen wir /foto.jpg separat holen foto_url = f"{self.esp32_url}/foto.jpg" foto_response = self._http_session.get(foto_url, timeout=10) foto_response.raise_for_status() with open(self._temp_image_path, "wb") as f: f.write(foto_response.content) logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes") return True except requests.exceptions.RequestException as e: logger.error(f"ESP32 Verbindungsfehler: {e}") return False except Exception as e: logger.error(f"Fehler beim Bild holen: {e}") return False def upload_image_to_chat(self) -> bool: """ Lädt das gespeicherte Bild in den Claude.ai Chat hoch. Returns: True wenn erfolgreich, False bei Fehler """ if not self._temp_image_path.exists(): logger.error("Kein Bild zum Hochladen vorhanden") return False try: # Finde das versteckte file input Element file_input = self._find_file_input() if not file_input: logger.error("File-Upload Input nicht gefunden!") return False # Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs) file_input.send_keys(str(self._temp_image_path.absolute())) logger.info("Bild hochgeladen!") self._images_uploaded += 1 # Kurz warten bis Upload verarbeitet ist time.sleep(1.5) return True except Exception as e: logger.error(f"Fehler beim Bild-Upload: {e}") return False def _calculate_image_hash(self) -> Optional[str]: """ Berechnet einen Hash des aktuellen Bildes. Returns: MD5 Hash als Hex-String, oder None bei Fehler """ if not self._temp_image_path.exists(): return None try: with open(self._temp_image_path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception as e: logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}") return None def has_image_changed(self) -> bool: """ Prüft ob sich das Bild seit dem letzten Upload geändert hat. WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat! Diese Funktion hilft, unnötige Uploads zu vermeiden. Returns: True wenn Bild neu/geändert, False wenn identisch zum letzten """ current_hash = self._calculate_image_hash() if current_hash is None: return True # Kein Bild = als "geändert" behandeln if self._last_image_hash is None: return True # Erster Upload return current_hash != self._last_image_hash def upload_image_if_changed(self) -> bool: """ Lädt Bild nur hoch wenn es sich geändert hat. Returns: True wenn hochgeladen, False wenn übersprungen oder Fehler """ if not self.has_image_changed(): logger.debug("Bild unverändert - überspringe Upload") return False # Hash vor dem Upload speichern new_hash = self._calculate_image_hash() if self.upload_image_to_chat(): self._last_image_hash = new_hash return True return False def delete_uploaded_images(self) -> int: """ Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen. HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden! Bereits gesendete Bilder können nicht gelöscht werden. Returns: Anzahl gelöschter Bilder """ deleted = 0 try: # Suche nach Bild-Vorschau-Elementen im Eingabebereich # Diese haben meist einen X/Close Button preview_selectors = [ "button[aria-label*='Remove']", "button[aria-label*='remove']", "button[aria-label*='Delete']", "button[aria-label*='delete']", "[class*='preview'] button[class*='close']", "[class*='attachment'] button[class*='remove']", "[class*='file'] button[class*='delete']", ] for selector in preview_selectors: try: buttons = self.driver.find_elements(By.CSS_SELECTOR, selector) for btn in buttons: if btn.is_displayed(): btn.click() deleted += 1 time.sleep(0.3) except: continue # JavaScript-Fallback if deleted == 0: try: deleted = self.driver.execute_script(""" let count = 0; // Suche nach Remove-Buttons in Attachment-Previews const buttons = document.querySelectorAll( '[class*="preview"] button, [class*="attachment"] button' ); buttons.forEach(btn => { if (btn.offsetParent) { btn.click(); count++; } }); return count; """) or 0 except: pass if deleted > 0: logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht") except Exception as e: logger.debug(f"Bild-Löschung fehlgeschlagen: {e}") return deleted def get_images_uploaded_count(self) -> int: """Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück""" return self._images_uploaded def _find_file_input(self): """Findet das File-Upload Input Element""" selectors = [ self.SELECTORS["file_input"], "input[accept*='image']", "input[type='file'][accept]", "input[type='file']", ] for selector in selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: # Auch versteckte Inputs funktionieren mit send_keys return elem except: continue # Fallback: Via JavaScript suchen try: return self.driver.execute_script(""" return document.querySelector('input[type="file"]') || document.querySelector('[accept*="image"]'); """) except: return None def send_tick_with_image(self) -> bool: """ Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK]. Das ist der Haupt-Heartbeat mit Bild! Returns: True wenn alles geklappt hat """ # Schritt 1: Bild vom ESP32 holen if not self.fetch_image_from_esp32(): # Kein Bild? Trotzdem TICK senden self.send_message("[TICK - KEIN BILD]") return False # Schritt 2: Bild in Chat hochladen if not self.upload_image_to_chat(): self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]") return False # Schritt 3: TICK senden self.send_message("[TICK]") return True # Hilfsfunktion für einfaches Testing def test_interface(chat_url: str): """Testet das Interface""" import sys logging.basicConfig(level=logging.DEBUG) print("Starte Chat Interface Test...") print(f"URL: {chat_url}") interface = ClaudeChatInterface( chat_url=chat_url, headless=False ) print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...") input() interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!") print("Nachricht gesendet!") print("\nWarte 5 Sekunden auf Antwort...") time.sleep(5) messages = interface.get_new_messages() print(f"\nGefundene Nachrichten: {len(messages)}") for msg in messages[-3:]: role = "Claude" if msg.is_from_assistant else "Human" print(f" [{role}] {msg.text[:100]}...") print("\nDrücke Enter zum Beenden...") input() interface.close() print("Fertig!") if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python chat_web_interface.py ") print("Example: python chat_web_interface.py https://claude.ai/chat/abc123") sys.exit(1) test_interface(sys.argv[1])