1485 lines
60 KiB
Python
1485 lines
60 KiB
Python
"""
|
|
Claude's Eyes - Chat Web Interface
|
|
|
|
Steuert den echten Claude.ai Chat im Browser via Selenium.
|
|
Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio!
|
|
|
|
HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden,
|
|
wenn Claude.ai sein UI ändert.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
import tempfile
|
|
import hashlib
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional
|
|
from pathlib import Path
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class ChatMessage:
|
|
"""Eine Chat-Nachricht"""
|
|
id: str
|
|
text: str
|
|
is_from_assistant: bool
|
|
timestamp: float = 0
|
|
|
|
|
|
class ClaudeChatInterface:
|
|
"""
|
|
Steuert Claude.ai Chat via Selenium Browser Automation.
|
|
|
|
Diese Klasse:
|
|
- Öffnet einen Browser mit dem Claude.ai Chat
|
|
- Kann Nachrichten senden (für Heartbeat und Stefan's Sprache)
|
|
- Kann neue Nachrichten lesen (für TTS)
|
|
|
|
WICHTIG: Du musst beim ersten Start manuell einloggen!
|
|
"""
|
|
|
|
@staticmethod
|
|
def extract_chat_id(url: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert die Chat-ID aus einer Claude.ai URL.
|
|
|
|
Args:
|
|
url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d"
|
|
|
|
Returns:
|
|
Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None
|
|
"""
|
|
if not url or '/chat/' not in url:
|
|
return None
|
|
try:
|
|
# Extrahiere alles nach /chat/
|
|
chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0]
|
|
return chat_id if chat_id else None
|
|
except:
|
|
return None
|
|
|
|
# CSS Selektoren für Claude.ai (Stand: Dezember 2025)
|
|
# Diese müssen angepasst werden wenn sich das UI ändert!
|
|
SELECTORS = {
|
|
# Eingabefeld für neue Nachrichten
|
|
"input_field": "div.ProseMirror[contenteditable='true']",
|
|
|
|
# Alternativ: Textarea
|
|
"input_textarea": "textarea[placeholder*='Message']",
|
|
|
|
# Senden-Button (falls Enter nicht funktioniert)
|
|
"send_button": "button[aria-label*='Send']",
|
|
|
|
# Alle Nachrichten-Container
|
|
"messages_container": "div[class*='conversation']",
|
|
|
|
# Einzelne Nachrichten
|
|
"message_human": "div[data-is-streaming='false'][class*='human']",
|
|
"message_assistant": "div[data-is-streaming='false'][class*='assistant']",
|
|
|
|
# Generischer Nachrichten-Selektor (Fallback)
|
|
"message_any": "div[class*='message']",
|
|
|
|
# Streaming-Indikator (Claude tippt noch)
|
|
"streaming": "div[data-is-streaming='true']",
|
|
|
|
# File Upload Input (versteckt, aber funktioniert mit send_keys)
|
|
"file_input": "input[type='file']",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
chat_url: Optional[str] = None,
|
|
headless: bool = False,
|
|
user_data_dir: Optional[str] = None,
|
|
chrome_binary: Optional[str] = None,
|
|
esp32_url: Optional[str] = None,
|
|
esp32_api_key: Optional[str] = None
|
|
):
|
|
"""
|
|
Initialisiert das Chat-Interface.
|
|
|
|
Args:
|
|
chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123)
|
|
headless: Browser im Hintergrund? (False = sichtbar)
|
|
user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins)
|
|
chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux)
|
|
esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture)
|
|
esp32_api_key: API-Key für ESP32 Authentifizierung
|
|
"""
|
|
self.chat_url = chat_url
|
|
self.esp32_url = esp32_url
|
|
self.esp32_api_key = esp32_api_key
|
|
self._message_cache: List[ChatMessage] = []
|
|
self._last_message_id = 0
|
|
self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg"
|
|
|
|
# Für Bild-Deduplizierung (100 Bilder pro Chat Limit!)
|
|
self._last_image_hash: Optional[str] = None
|
|
self._images_uploaded = 0
|
|
|
|
# HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen)
|
|
self._http_session = requests.Session()
|
|
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
|
|
self._http_session.mount('http://', adapter)
|
|
self._http_session.mount('https://', adapter)
|
|
|
|
# Chrome Optionen
|
|
options = webdriver.ChromeOptions()
|
|
|
|
if headless:
|
|
options.add_argument("--headless=new")
|
|
|
|
options.add_argument("--no-sandbox")
|
|
options.add_argument("--disable-dev-shm-usage")
|
|
options.add_argument("--disable-gpu")
|
|
options.add_argument("--window-size=1280,800")
|
|
|
|
# Für persistente Sessions (Login bleibt gespeichert)
|
|
if user_data_dir:
|
|
options.add_argument(f"--user-data-dir={user_data_dir}")
|
|
|
|
# Für Termux/Android
|
|
if chrome_binary:
|
|
options.binary_location = chrome_binary
|
|
|
|
# Anti-Detection (manche Seiten blocken Selenium)
|
|
options.add_argument("--disable-blink-features=AutomationControlled")
|
|
options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
|
options.add_experimental_option("useAutomationExtension", False)
|
|
|
|
logger.info("Starte Chrome Browser...")
|
|
|
|
try:
|
|
self.driver = webdriver.Chrome(options=options)
|
|
self.driver.execute_script(
|
|
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Chrome konnte nicht gestartet werden: {e}")
|
|
logger.info("Versuche mit webdriver-manager...")
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
service = Service(ChromeDriverManager().install())
|
|
self.driver = webdriver.Chrome(service=service, options=options)
|
|
|
|
self.wait = WebDriverWait(self.driver, 30)
|
|
|
|
# Navigiere zur Chat-URL
|
|
if chat_url:
|
|
self.navigate_to_chat(chat_url)
|
|
|
|
def navigate_to_chat(self, url: str):
|
|
"""Navigiert zur Chat-URL"""
|
|
logger.info(f"Navigiere zu: {url}")
|
|
self.driver.get(url)
|
|
self.chat_url = url
|
|
|
|
# Reset Bild-Counter bei neuem Chat
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
|
|
# Warte auf Seitenladung
|
|
time.sleep(3)
|
|
|
|
# Prüfe ob Login nötig
|
|
if "login" in self.driver.current_url.lower():
|
|
logger.warning("Login erforderlich! Bitte im Browser einloggen...")
|
|
print("\n" + "=" * 50)
|
|
print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!")
|
|
print("Das Fenster bleibt offen. Nach dem Login geht's weiter.")
|
|
print("=" * 50 + "\n")
|
|
|
|
# Warte bis wieder auf der Chat-Seite
|
|
while "login" in self.driver.current_url.lower():
|
|
time.sleep(2)
|
|
|
|
logger.info("Login erfolgreich!")
|
|
time.sleep(2)
|
|
|
|
def start_new_chat(self) -> tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Startet einen neuen Chat auf Claude.ai.
|
|
|
|
WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist!
|
|
|
|
Returns:
|
|
Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler
|
|
"""
|
|
try:
|
|
# Alte Chat-ID merken bevor wir wechseln
|
|
old_chat_id = self.extract_chat_id(self.chat_url)
|
|
logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})")
|
|
|
|
# Methode 1: "New Chat" Button suchen
|
|
new_chat_selectors = [
|
|
"button[aria-label*='New']",
|
|
"button[aria-label*='new']",
|
|
"a[href='/new']",
|
|
"a[href*='/chat/new']",
|
|
"[data-testid*='new-chat']",
|
|
"button:has-text('New chat')",
|
|
]
|
|
|
|
for selector in new_chat_selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
elem.click()
|
|
time.sleep(2)
|
|
new_url = self.driver.current_url
|
|
logger.info(f"Neuer Chat gestartet: {new_url}")
|
|
|
|
# Reset Counter
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
self.chat_url = new_url
|
|
|
|
return new_url, old_chat_id
|
|
except:
|
|
continue
|
|
|
|
# Methode 2: Direkt zu /new navigieren
|
|
base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai"
|
|
new_url = f"{base_url}/new"
|
|
logger.info(f"Versuche direkte Navigation zu: {new_url}")
|
|
self.driver.get(new_url)
|
|
time.sleep(3)
|
|
|
|
# Prüfe ob es geklappt hat
|
|
current = self.driver.current_url
|
|
if current != self.chat_url:
|
|
logger.info(f"Neuer Chat: {current}")
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
self.chat_url = current
|
|
return current, old_chat_id
|
|
|
|
logger.warning("Konnte keinen neuen Chat starten")
|
|
return None, None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Starten eines neuen Chats: {e}")
|
|
return None, None
|
|
|
|
def get_current_chat_url(self) -> str:
|
|
"""Gibt die aktuelle Chat-URL zurück"""
|
|
return self.driver.current_url
|
|
|
|
def send_message(self, text: str, wait_for_response: bool = False) -> bool:
|
|
"""
|
|
Sendet eine Nachricht in den Chat.
|
|
|
|
Args:
|
|
text: Die zu sendende Nachricht
|
|
wait_for_response: Warten bis Claude antwortet?
|
|
|
|
Returns:
|
|
True wenn erfolgreich gesendet
|
|
"""
|
|
# Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut)
|
|
result = self.send_message_with_delay(text, delay_before_send=0)
|
|
|
|
if result and wait_for_response:
|
|
self._wait_for_response()
|
|
|
|
return result
|
|
|
|
def wait_for_input_field(self, timeout: int = 60) -> bool:
|
|
"""
|
|
Wartet bis das Chat-Eingabefeld verfügbar ist.
|
|
|
|
Nützlich nach CAPTCHA/Login wenn das UI noch lädt.
|
|
|
|
Args:
|
|
timeout: Maximale Wartezeit in Sekunden
|
|
|
|
Returns:
|
|
True wenn Eingabefeld gefunden, False bei Timeout
|
|
"""
|
|
logger.info(f"Warte auf Chat-Eingabefeld (max {timeout}s)...")
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
input_field = self._find_input_field()
|
|
if input_field:
|
|
try:
|
|
if input_field.is_displayed() and input_field.is_enabled():
|
|
logger.info("Chat-Eingabefeld bereit!")
|
|
return True
|
|
except:
|
|
pass
|
|
time.sleep(1)
|
|
|
|
logger.warning(f"Timeout: Eingabefeld nach {timeout}s nicht gefunden")
|
|
return False
|
|
|
|
def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool:
|
|
"""
|
|
Sendet eine Nachricht mit Verzögerung vor dem Absenden.
|
|
|
|
Nützlich für große Texte (wie Instruktionen), bei denen die
|
|
Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten.
|
|
|
|
Ablauf:
|
|
0. Warte bis Eingabefeld verfügbar (für CAPTCHA/Login-Fälle)
|
|
1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!)
|
|
2. Warte delay_before_send Sekunden
|
|
3. Send-Button klicken (mit Retry)
|
|
|
|
Args:
|
|
text: Die zu sendende Nachricht
|
|
delay_before_send: Sekunden warten nach Einfügen, vor dem Senden
|
|
|
|
Returns:
|
|
True wenn erfolgreich gesendet
|
|
"""
|
|
try:
|
|
# WICHTIG: Warte auf Eingabefeld (CAPTCHA/Login kann dauern!)
|
|
if not self.wait_for_input_field(timeout=120):
|
|
logger.error("Eingabefeld nicht verfügbar nach 120s!")
|
|
return False
|
|
|
|
# Finde Eingabefeld
|
|
input_field = self._find_input_field()
|
|
|
|
if not input_field:
|
|
logger.error("Eingabefeld nicht gefunden!")
|
|
return False
|
|
|
|
# Feld fokussieren
|
|
input_field.click()
|
|
time.sleep(0.2)
|
|
|
|
# Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!)
|
|
logger.info(f"Füge Text ein ({len(text)} Zeichen)...")
|
|
self._insert_text_via_js(input_field, text)
|
|
|
|
# WARTEN - große Texte brauchen Zeit!
|
|
if delay_before_send > 0:
|
|
logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...")
|
|
time.sleep(delay_before_send)
|
|
else:
|
|
time.sleep(1.0) # Mindestens 1s warten
|
|
|
|
# Jetzt absenden (mit Retry-Logik)
|
|
send_success = False
|
|
for attempt in range(3):
|
|
send_button = self._find_send_button()
|
|
if send_button:
|
|
try:
|
|
if send_button.is_enabled() and send_button.is_displayed():
|
|
send_button.click()
|
|
logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})")
|
|
send_success = True
|
|
break
|
|
except Exception as e:
|
|
logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}")
|
|
|
|
time.sleep(0.5)
|
|
|
|
# Fallback: Enter-Taste
|
|
if not send_success:
|
|
logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter")
|
|
input_field.send_keys(Keys.RETURN)
|
|
|
|
time.sleep(0.3)
|
|
logger.debug(f"Nachricht gesendet: {text[:50]}...")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Senden mit Verzögerung: {e}")
|
|
return False
|
|
|
|
def _find_send_button(self):
|
|
"""Findet den Send-Button"""
|
|
selectors = [
|
|
"button[aria-label*='Send']",
|
|
"button[aria-label*='send']",
|
|
"button[data-testid*='send']",
|
|
"button[type='submit']",
|
|
# Claude.ai spezifisch - Button mit Pfeil-Icon
|
|
"button svg[class*='send']",
|
|
"button[class*='send']",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed() and elem.is_enabled():
|
|
return elem
|
|
except:
|
|
continue
|
|
|
|
# Fallback: JavaScript-Suche
|
|
try:
|
|
return self.driver.execute_script("""
|
|
// Suche nach Send-Button
|
|
const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]');
|
|
if (btn && !btn.disabled) return btn;
|
|
|
|
// Alternative: Letzter Button im Input-Bereich
|
|
const buttons = document.querySelectorAll('button');
|
|
for (const b of buttons) {
|
|
if (b.offsetParent && !b.disabled) {
|
|
const text = b.textContent.toLowerCase();
|
|
const label = (b.getAttribute('aria-label') || '').toLowerCase();
|
|
if (text.includes('send') || label.includes('send')) return b;
|
|
}
|
|
}
|
|
return null;
|
|
""")
|
|
except:
|
|
return None
|
|
|
|
def _find_input_field(self):
|
|
"""Findet das Eingabefeld"""
|
|
selectors = [
|
|
self.SELECTORS["input_field"],
|
|
self.SELECTORS["input_textarea"],
|
|
"div[contenteditable='true']",
|
|
"textarea",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
element = self.driver.find_element(By.CSS_SELECTOR, selector)
|
|
if element.is_displayed() and element.is_enabled():
|
|
return element
|
|
except NoSuchElementException:
|
|
continue
|
|
|
|
return None
|
|
|
|
def _insert_text_via_js(self, element, text: str):
|
|
"""
|
|
Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme).
|
|
|
|
Bei send_keys() werden physische Tasten gedrückt, was bei
|
|
unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu
|
|
falschen Zeichen führt (z.B. y↔z vertauscht).
|
|
|
|
Diese Methode fügt den Text direkt als String ein.
|
|
"""
|
|
# Escape für JavaScript
|
|
escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${')
|
|
|
|
# Für contenteditable divs (ProseMirror)
|
|
self.driver.execute_script("""
|
|
const element = arguments[0];
|
|
const text = arguments[1];
|
|
|
|
// Fokussieren
|
|
element.focus();
|
|
|
|
// Methode 1: Für contenteditable (ProseMirror)
|
|
if (element.contentEditable === 'true') {
|
|
// Text als HTML einfügen (respektiert Zeilenumbrüche)
|
|
const htmlText = text.replace(/\\n/g, '<br>');
|
|
element.innerHTML = htmlText;
|
|
|
|
// Input-Event triggern damit React/Vue es mitbekommt
|
|
element.dispatchEvent(new Event('input', { bubbles: true }));
|
|
|
|
// Cursor ans Ende setzen
|
|
const range = document.createRange();
|
|
range.selectNodeContents(element);
|
|
range.collapse(false);
|
|
const sel = window.getSelection();
|
|
sel.removeAllRanges();
|
|
sel.addRange(range);
|
|
}
|
|
// Methode 2: Für textarea/input
|
|
else {
|
|
element.value = text;
|
|
element.dispatchEvent(new Event('input', { bubbles: true }));
|
|
element.dispatchEvent(new Event('change', { bubbles: true }));
|
|
}
|
|
""", element, text)
|
|
|
|
logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)")
|
|
|
|
def _wait_for_response(self, timeout: int = 60):
|
|
"""Wartet bis Claude fertig getippt hat"""
|
|
logger.debug("Warte auf Claudes Antwort...")
|
|
|
|
# Warte kurz damit Streaming startet
|
|
time.sleep(1)
|
|
|
|
# Warte bis Streaming endet
|
|
try:
|
|
WebDriverWait(self.driver, timeout).until_not(
|
|
EC.presence_of_element_located(
|
|
(By.CSS_SELECTOR, self.SELECTORS["streaming"])
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
logger.warning("Timeout beim Warten auf Antwort")
|
|
|
|
time.sleep(0.5) # Kurz warten bis DOM aktualisiert
|
|
|
|
def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]:
|
|
"""
|
|
Holt neue Nachrichten aus dem Chat.
|
|
|
|
Args:
|
|
since_id: Nur Nachrichten nach dieser ID zurückgeben
|
|
|
|
Returns:
|
|
Liste neuer ChatMessage Objekte
|
|
"""
|
|
all_messages = self._get_all_messages()
|
|
|
|
if since_id is None:
|
|
return all_messages
|
|
|
|
# ════════════════════════════════════════════════════════════════
|
|
# STRATEGIE: Nutze immer den Index aus claude_msg_X
|
|
# Die IDs werden bei jedem Aufruf neu generiert, daher können wir
|
|
# nicht auf exakte ID-Übereinstimmung vertrauen.
|
|
# Stattdessen extrahieren wir den Index und geben alle Nachrichten
|
|
# NACH diesem Index zurück.
|
|
# ════════════════════════════════════════════════════════════════
|
|
if since_id and since_id.startswith("claude_msg_"):
|
|
try:
|
|
last_index = int(since_id.split("_")[-1])
|
|
# Zähle Claude-Nachrichten und gib nur die nach dem Index zurück
|
|
claude_msgs = [m for m in all_messages if m.is_from_assistant]
|
|
total_claude = len(claude_msgs)
|
|
|
|
if last_index < total_claude - 1:
|
|
# Es gibt neue Nachrichten!
|
|
new_claude_msgs = claude_msgs[last_index + 1:]
|
|
logger.debug(f"Index-basiert: {len(new_claude_msgs)} neue Claude-Nachrichten (Index {last_index + 1} bis {total_claude - 1})")
|
|
return new_claude_msgs
|
|
else:
|
|
# Keine neuen Nachrichten (last_index ist der letzte oder darüber)
|
|
logger.debug(f"Index-basiert: Keine neuen Nachrichten (last={last_index}, total={total_claude})")
|
|
return []
|
|
except (ValueError, IndexError) as e:
|
|
logger.warning(f"Fehler beim Parsen von since_id '{since_id}': {e}")
|
|
|
|
# Alte Hash-basierte IDs oder ungültiges Format
|
|
if since_id and since_id.startswith("claude_"):
|
|
logger.debug(f"since_id '{since_id[:30]}...' hat altes Format - überspringe alle")
|
|
return []
|
|
|
|
# Fallback: Exakte ID-Suche (für Nicht-Claude-IDs)
|
|
new_messages = []
|
|
found_marker = False
|
|
|
|
for msg in all_messages:
|
|
if found_marker:
|
|
new_messages.append(msg)
|
|
elif msg.id == since_id:
|
|
found_marker = True
|
|
|
|
return new_messages
|
|
|
|
def _get_all_messages(self) -> List[ChatMessage]:
|
|
"""
|
|
Holt alle Nachrichten aus dem Chat.
|
|
|
|
Claude.ai verwendet unterschiedliche data-testid für User und Assistant:
|
|
- data-testid="user-message" für User
|
|
- Für Claude: Wir suchen nach Nachrichten die NICHT user-message sind
|
|
"""
|
|
messages = []
|
|
|
|
try:
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STRATEGIE: Hole User UND Assistant Nachrichten SEPARAT
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# 1. User-Nachrichten (haben data-testid="user-message")
|
|
user_elements = []
|
|
try:
|
|
user_elements = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
"[data-testid='user-message']"
|
|
)
|
|
logger.debug(f"User-Nachrichten gefunden: {len(user_elements)}")
|
|
except Exception as e:
|
|
logger.debug(f"User-Nachrichten Suche fehlgeschlagen: {e}")
|
|
|
|
# 2. Claude-Nachrichten suchen
|
|
# Claude.ai nutzt verschiedene Klassen, aber NICHT user-message
|
|
claude_elements = []
|
|
|
|
# Methode A: font-claude-message Klasse
|
|
try:
|
|
found = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
"[class*='font-claude-message']"
|
|
)
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via font-claude-message: {len(found)}")
|
|
except:
|
|
pass
|
|
|
|
# Methode B: data-testid="assistant-message" (neue Claude.ai Version)
|
|
if not claude_elements:
|
|
try:
|
|
found = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
"[data-testid='assistant-message']"
|
|
)
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via assistant-message: {len(found)}")
|
|
except:
|
|
pass
|
|
|
|
# Methode C: Prose-Container die NICHT in user-message sind
|
|
if not claude_elements:
|
|
try:
|
|
# Alle prose Elemente die nicht innerhalb von user-message sind
|
|
found = self.driver.execute_script("""
|
|
const msgs = [];
|
|
document.querySelectorAll('.prose').forEach(e => {
|
|
// Prüfe ob dieses Element NICHT in einem user-message Container ist
|
|
let parent = e;
|
|
let isUserMessage = false;
|
|
while (parent && parent !== document.body) {
|
|
if (parent.getAttribute('data-testid') === 'user-message') {
|
|
isUserMessage = true;
|
|
break;
|
|
}
|
|
parent = parent.parentElement;
|
|
}
|
|
if (!isUserMessage) {
|
|
msgs.push(e);
|
|
}
|
|
});
|
|
return msgs;
|
|
""") or []
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via prose (nicht-user): {len(found)}")
|
|
except Exception as e:
|
|
logger.debug(f"Prose-Suche fehlgeschlagen: {e}")
|
|
|
|
# Methode D: Suche nach conversation-turn Struktur (neue Claude.ai Version)
|
|
if not claude_elements:
|
|
try:
|
|
# Claude.ai 2024/2025 nutzt oft conversation-turn Divs
|
|
# WICHTIG: Wir suchen den CONTAINER der die gesamte Claude-Nachricht enthält,
|
|
# nicht nur das Element mit "Claude sagt:" - denn [READY] kann in einem
|
|
# separaten Child-Element stehen!
|
|
found = self.driver.execute_script("""
|
|
const msgs = [];
|
|
|
|
// Strategie: Finde Elemente mit "Claude sagt:" und dann den
|
|
// ÜBERGEORDNETEN Container der die vollständige Nachricht enthält
|
|
const allDivs = document.querySelectorAll('div, p, span');
|
|
for (const elem of allDivs) {
|
|
// Überspringe wenn es ein user-message ist
|
|
if (elem.getAttribute('data-testid') === 'user-message') continue;
|
|
if (elem.closest('[data-testid="user-message"]')) continue;
|
|
|
|
// Überspringe Sidebar/Navigation Elemente
|
|
if (elem.closest('nav')) continue;
|
|
if (elem.closest('[role="navigation"]')) continue;
|
|
if (elem.closest('[class*="sidebar"]')) continue;
|
|
if (elem.closest('[class*="Sidebar"]')) continue;
|
|
|
|
// Hole den direkten Text-Inhalt
|
|
const text = (elem.innerText || '').trim();
|
|
|
|
// Text muss mit "Claude sagt:" BEGINNEN
|
|
if (text.startsWith('Claude sagt:') && text.length > 20 && text.length < 10000) {
|
|
// Suche nach dem übergeordneten Container (z.B. prose-Container)
|
|
// der auch [READY] enthalten könnte
|
|
let container = elem;
|
|
let parent = elem.parentElement;
|
|
|
|
// Gehe nach oben bis wir einen guten Container finden
|
|
// (max 5 Ebenen, um nicht die ganze Seite zu erfassen)
|
|
for (let i = 0; i < 5 && parent; i++) {
|
|
const parentText = (parent.innerText || '').trim();
|
|
|
|
// Stoppe wenn wir einen zu großen Container erreichen
|
|
if (parentText.length > 15000) break;
|
|
|
|
// Stoppe wenn wir in user-message oder Navigation sind
|
|
if (parent.getAttribute('data-testid') === 'user-message') break;
|
|
if (parent.closest('nav')) break;
|
|
|
|
// Prüfe ob der Parent-Container [READY] enthält und
|
|
// immer noch mit "Claude sagt:" beginnt
|
|
if (parentText.startsWith('Claude sagt:') ||
|
|
parentText.includes('[READY]') ||
|
|
parent.classList.contains('prose') ||
|
|
parent.classList.contains('markdown')) {
|
|
container = parent;
|
|
}
|
|
|
|
parent = parent.parentElement;
|
|
}
|
|
|
|
msgs.push(container);
|
|
}
|
|
}
|
|
|
|
// Dedupliziere: Behalte nur die ÄUSSERSTEN (größten) Container
|
|
// (aber nicht zu groß - max 10000 Zeichen)
|
|
const unique = [];
|
|
for (const div of msgs) {
|
|
// Prüfe ob dieses Element schon in einem anderen enthalten ist
|
|
let isDuplicate = false;
|
|
for (const other of msgs) {
|
|
if (other !== div && other.contains(div)) {
|
|
// Dieses Element ist in einem anderen enthalten
|
|
isDuplicate = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!isDuplicate && !unique.includes(div)) {
|
|
unique.push(div);
|
|
}
|
|
}
|
|
|
|
return unique;
|
|
""") or []
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via Text-Suche: {len(found)}")
|
|
except Exception as e:
|
|
logger.debug(f"Text-Suche fehlgeschlagen: {e}")
|
|
|
|
# Methode E: Letzte Fallback - Klasse enthält "assistant" (UI-Filter)
|
|
if not claude_elements:
|
|
try:
|
|
found = self.driver.execute_script("""
|
|
const msgs = [];
|
|
document.querySelectorAll('[class*="assistant"]').forEach(e => {
|
|
// Filtere Buttons, kleine Elemente und UI-Komponenten
|
|
if (e.tagName === 'BUTTON') return;
|
|
if (e.querySelector('button')) return;
|
|
const text = (e.innerText || '').trim();
|
|
// Nur Elemente mit substantiellem Text
|
|
if (text.length > 20 && !text.startsWith('[TICK]')) {
|
|
msgs.push(e);
|
|
}
|
|
});
|
|
return msgs;
|
|
""") or []
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via class-assistant: {len(found)}")
|
|
except Exception as e:
|
|
logger.debug(f"class-assistant-Suche fehlgeschlagen: {e}")
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# DEBUG: Falls keine Claude-Nachrichten gefunden, DOM analysieren
|
|
# ═══════════════════════════════════════════════════════════════
|
|
if not claude_elements and user_elements:
|
|
try:
|
|
# Hole DOM-Info für Debugging
|
|
dom_info = self.driver.execute_script("""
|
|
const info = {
|
|
'data-testids': [],
|
|
'classes_with_message': [],
|
|
'classes_with_assistant': []
|
|
};
|
|
|
|
// Sammle alle data-testid Werte
|
|
document.querySelectorAll('[data-testid]').forEach(e => {
|
|
const testid = e.getAttribute('data-testid');
|
|
if (!info['data-testids'].includes(testid)) {
|
|
info['data-testids'].push(testid);
|
|
}
|
|
});
|
|
|
|
// Sammle Klassen die 'message' enthalten
|
|
document.querySelectorAll('[class*="message"]').forEach(e => {
|
|
const cls = e.className;
|
|
if (typeof cls === 'string' && !info['classes_with_message'].includes(cls.substring(0, 100))) {
|
|
info['classes_with_message'].push(cls.substring(0, 100));
|
|
}
|
|
});
|
|
|
|
// Sammle Klassen die 'assistant' enthalten
|
|
document.querySelectorAll('[class*="assistant"]').forEach(e => {
|
|
const cls = e.className;
|
|
if (typeof cls === 'string' && !info['classes_with_assistant'].includes(cls.substring(0, 100))) {
|
|
info['classes_with_assistant'].push(cls.substring(0, 100));
|
|
}
|
|
});
|
|
|
|
return info;
|
|
""")
|
|
logger.warning(f"KEINE Claude-Nachrichten gefunden! DOM-Info: data-testids={dom_info.get('data-testids', [])[:10]}")
|
|
logger.debug(f"Klassen mit 'message': {dom_info.get('classes_with_message', [])[:5]}")
|
|
logger.debug(f"Klassen mit 'assistant': {dom_info.get('classes_with_assistant', [])[:5]}")
|
|
except Exception as e:
|
|
logger.debug(f"DOM-Analyse fehlgeschlagen: {e}")
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Nachrichten verarbeiten
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# User-Nachrichten hinzufügen
|
|
for i, elem in enumerate(user_elements):
|
|
try:
|
|
text = elem.text.strip()
|
|
if not text or len(text) < 3:
|
|
continue
|
|
|
|
msg_id = elem.get_attribute("data-message-id")
|
|
if not msg_id:
|
|
msg_id = f"user_{i}_{hash(text[:100])}"
|
|
|
|
messages.append(ChatMessage(
|
|
id=msg_id,
|
|
text=text,
|
|
is_from_assistant=False,
|
|
timestamp=time.time()
|
|
))
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei User-Nachricht {i}: {e}")
|
|
|
|
# Claude-Nachrichten hinzufügen
|
|
# WICHTIG: Filtere User-Befehle die fälschlicherweise erkannt werden
|
|
# HINWEIS: [READY] ist KEINE User-Nachricht - das sendet CLAUDE!
|
|
user_commands = [
|
|
'[TICK]', '[START]', '[FORWARD]', '[BACKWARD]', '[LEFT]', '[RIGHT]', '[STOP]',
|
|
'[LOOK_LEFT]', '[LOOK_RIGHT]', '[LOOK_UP]', '[LOOK_DOWN]', '[LOOK_CENTER]'
|
|
# [READY] ist NICHT in dieser Liste - Claude sendet es!
|
|
]
|
|
|
|
for i, elem in enumerate(claude_elements):
|
|
try:
|
|
# Versuche zuerst innerText via JavaScript zu bekommen (zuverlässiger)
|
|
try:
|
|
text = self.driver.execute_script("return arguments[0].innerText || '';", elem).strip()
|
|
except:
|
|
text = elem.text.strip()
|
|
|
|
if not text or len(text) < 5:
|
|
logger.debug(f"Claude-Element {i} übersprungen: Text zu kurz ({len(text) if text else 0} Zeichen)")
|
|
continue
|
|
|
|
# Filtere User-Befehle (diese sind KEINE Claude-Nachrichten)
|
|
if text in user_commands:
|
|
logger.debug(f"Überspringe User-Befehl: {text}")
|
|
continue
|
|
|
|
# Filtere auch wenn der Text nur aus einem Befehl besteht (evtl. mit Whitespace)
|
|
text_upper = text.upper().strip()
|
|
if any(text_upper == cmd for cmd in user_commands):
|
|
logger.debug(f"Überspringe User-Befehl (normalized): {text}")
|
|
continue
|
|
|
|
# Prüfe ob es wirklich eine Claude-Nachricht ist (muss mit "Claude sagt:" BEGINNEN)
|
|
if not text.startswith("Claude sagt:"):
|
|
# Fallback: Prüfe ob "Claude sagt:" irgendwo am Anfang einer Zeile steht
|
|
lines = text.split('\n')
|
|
found_claude_line = False
|
|
for line in lines[:5]: # Nur erste 5 Zeilen prüfen
|
|
if line.strip().startswith("Claude sagt:"):
|
|
found_claude_line = True
|
|
break
|
|
if not found_claude_line:
|
|
logger.debug(f"Claude-Element {i} übersprungen: Beginnt nicht mit 'Claude sagt:'")
|
|
continue
|
|
|
|
# Filtere Sidebar/Navigation Elemente (beginnen oft mit "Neuer Chat")
|
|
if text.startswith("Neuer Chat") or text.startswith("Chats") or text.startswith("Projekte"):
|
|
logger.debug(f"Claude-Element {i} übersprungen: Ist Sidebar/Navigation")
|
|
continue
|
|
|
|
msg_id = elem.get_attribute("data-message-id")
|
|
if not msg_id:
|
|
# WICHTIG: Nutze einen ZÄHLER statt Hash!
|
|
# Der Zähler ist die Anzahl der bisherigen Claude-Nachrichten.
|
|
# Das ist stabil weil neue Nachrichten nur HINZUGEFÜGT werden.
|
|
# Der Hash war instabil weil der Text sich leicht ändern kann.
|
|
claude_msg_count = sum(1 for m in messages if m.is_from_assistant)
|
|
msg_id = f"claude_msg_{claude_msg_count}"
|
|
|
|
messages.append(ChatMessage(
|
|
id=msg_id,
|
|
text=text,
|
|
is_from_assistant=True,
|
|
timestamp=time.time()
|
|
))
|
|
logger.debug(f"Claude-Nachricht {claude_msg_count if 'claude_msg_count' in dir() else i}: '{text[:80]}...' ({len(text)} Zeichen)")
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei Claude-Nachricht {i}: {e}")
|
|
|
|
logger.debug(f"Gesamt: {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Lesen der Nachrichten: {e}")
|
|
|
|
return messages
|
|
|
|
def get_last_assistant_message(self) -> Optional[ChatMessage]:
|
|
"""Holt die letzte Nachricht von Claude"""
|
|
messages = self._get_all_messages()
|
|
|
|
for msg in reversed(messages):
|
|
if msg.is_from_assistant:
|
|
return msg
|
|
|
|
return None
|
|
|
|
def is_claude_typing(self) -> bool:
|
|
"""
|
|
Prüft ob Claude gerade tippt (streaming).
|
|
|
|
Erkennt mehrere Indikatoren:
|
|
1. Stop-Button ist sichtbar (während Claude schreibt)
|
|
2. data-is-streaming='true' Attribut
|
|
3. Animiertes Logo / Thinking-Indikator
|
|
"""
|
|
try:
|
|
# Methode 1: Stop-Button prüfen (zuverlässigster Indikator)
|
|
# Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button
|
|
stop_indicators = [
|
|
"button[aria-label*='Stop']",
|
|
"button[aria-label*='stop']",
|
|
"button[class*='stop']",
|
|
"button[data-testid*='stop']",
|
|
# Alternativer Indikator: Button mit Stop-Icon
|
|
"button svg[class*='stop']",
|
|
]
|
|
|
|
for selector in stop_indicators:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})")
|
|
return True
|
|
except:
|
|
continue
|
|
|
|
# Methode 2: Streaming-Attribut (original)
|
|
streaming = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
self.SELECTORS["streaming"]
|
|
)
|
|
if len(streaming) > 0:
|
|
logger.debug("Claude tippt (streaming=true)")
|
|
return True
|
|
|
|
# Methode 3: Animiertes/Thinking Indikator suchen
|
|
thinking_indicators = [
|
|
"[class*='thinking']",
|
|
"[class*='loading']",
|
|
"[class*='typing']",
|
|
"[class*='streaming']",
|
|
"[data-state='loading']",
|
|
# Pulsierendes Logo
|
|
"[class*='pulse']",
|
|
"[class*='animate']",
|
|
]
|
|
|
|
for selector in thinking_indicators:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
logger.debug(f"Claude tippt (Indikator: {selector})")
|
|
return True
|
|
except:
|
|
continue
|
|
|
|
# Methode 4: JavaScript-basierte Prüfung
|
|
# Prüft ob irgendwo noch Text gestreamt wird
|
|
try:
|
|
is_streaming = self.driver.execute_script("""
|
|
// Prüfe ob Stop-Button existiert und sichtbar ist
|
|
const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]');
|
|
if (stopBtn && stopBtn.offsetParent !== null) return true;
|
|
|
|
// Prüfe auf streaming-Attribut
|
|
const streaming = document.querySelector('[data-is-streaming="true"]');
|
|
if (streaming) return true;
|
|
|
|
// Prüfe auf disabled Send-Button (während Claude tippt)
|
|
const sendBtn = document.querySelector('button[aria-label*="Send"]');
|
|
if (sendBtn && sendBtn.disabled) return true;
|
|
|
|
return false;
|
|
""")
|
|
if is_streaming:
|
|
logger.debug("Claude tippt (JavaScript-Check)")
|
|
return True
|
|
except:
|
|
pass
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei typing-check: {e}")
|
|
return False
|
|
|
|
def wait_for_ready_signal(self, timeout: int = 120, stop_check: callable = None) -> bool:
|
|
"""
|
|
Wartet bis Claude [READY] sendet.
|
|
|
|
WICHTIG: Prüft nur die LETZTE Claude-Nachricht!
|
|
So werden alte [READY] im Chat-Verlauf ignoriert.
|
|
|
|
Args:
|
|
timeout: Maximale Wartezeit in Sekunden
|
|
stop_check: Optionale Funktion die True zurückgibt wenn abgebrochen werden soll
|
|
(z.B. lambda: not bridge.running)
|
|
|
|
Returns:
|
|
True wenn [READY] empfangen, False bei Timeout oder Abbruch
|
|
"""
|
|
logger.info(f"Warte auf [READY] Signal in letzter Claude-Nachricht (max {timeout}s)...")
|
|
start_time = time.time()
|
|
|
|
# Merke die Anzahl der Claude-Nachrichten VOR dem Senden
|
|
initial_count = self._count_claude_messages()
|
|
logger.debug(f"Initiale Claude-Nachrichten: {initial_count}")
|
|
|
|
while time.time() - start_time < timeout:
|
|
# Prüfe ob abgebrochen werden soll (z.B. Q gedrückt)
|
|
if stop_check and stop_check():
|
|
logger.info("wait_for_ready abgebrochen (stop_check)")
|
|
return False
|
|
|
|
# Warte bis Claude fertig ist mit Tippen
|
|
typing_wait_start = time.time()
|
|
while self.is_claude_typing():
|
|
time.sleep(0.5)
|
|
# Prüfe auch hier auf Abbruch
|
|
if stop_check and stop_check():
|
|
logger.info("wait_for_ready abgebrochen während typing-wait")
|
|
return False
|
|
# Timeout für typing-wait (max 60s)
|
|
if time.time() - typing_wait_start > 60:
|
|
logger.debug("Typing-Wait Timeout, prüfe trotzdem...")
|
|
break
|
|
|
|
# Prüfe ob es eine NEUE Claude-Nachricht gibt
|
|
current_count = self._count_claude_messages()
|
|
if current_count > initial_count:
|
|
# Hole die letzte Claude-Nachricht
|
|
last_msg = self.get_last_assistant_message()
|
|
if last_msg and '[READY]' in last_msg.text.upper():
|
|
logger.info(f"[READY] in letzter Claude-Nachricht gefunden!")
|
|
return True
|
|
else:
|
|
logger.debug(f"Neue Nachricht aber kein [READY]: {last_msg.text[:50] if last_msg else 'None'}...")
|
|
# Aktualisiere initial_count damit wir auf die NÄCHSTE warten
|
|
initial_count = current_count
|
|
else:
|
|
# FALLBACK: Auch wenn keine NEUE Nachricht, prüfe ob letzte [READY] hat
|
|
# Das hilft wenn Claude schon vor dem Senden geantwortet hat
|
|
last_msg = self.get_last_assistant_message()
|
|
if last_msg and '[READY]' in last_msg.text.upper():
|
|
logger.info(f"[READY] in bestehender Claude-Nachricht gefunden!")
|
|
return True
|
|
|
|
# FALLBACK 2: Direkte DOM-Suche nach [READY]
|
|
# Claude.ai rendert [READY] manchmal in einem separaten Element
|
|
# das nicht als Teil der "Claude sagt:" Nachricht erkannt wird
|
|
try:
|
|
ready_found = self.driver.execute_script("""
|
|
// Suche nach einem Element das EXAKT "[READY]" enthält
|
|
// WICHTIG: Muss NICHT in user-message sein und NICHT in den Instruktionen
|
|
const allElements = document.querySelectorAll('*');
|
|
for (const elem of allElements) {
|
|
// Nur Blatt-Elemente (keine Container mit Kindern)
|
|
if (elem.children.length > 0) continue;
|
|
|
|
const text = (elem.innerText || elem.textContent || '').trim();
|
|
|
|
// Muss [READY] enthalten
|
|
if (!text.includes('[READY]')) continue;
|
|
|
|
// Prüfe dass es NICHT in user-message ist
|
|
if (elem.closest('[data-testid="user-message"]')) continue;
|
|
|
|
// Prüfe dass es NICHT in der Sidebar ist
|
|
if (elem.closest('nav')) continue;
|
|
if (elem.closest('[class*="sidebar"]')) continue;
|
|
|
|
// Wichtig: Text darf NICHT mit "## WICHTIG" beginnen
|
|
// (das wäre ein Heading in den Instruktionen)
|
|
if (text.startsWith('## WICHTIG') || text.startsWith('**WICHTIG')) continue;
|
|
|
|
// Text darf nicht "Bestätige mit [READY]" enthalten
|
|
// (das wäre eine Anweisung, keine Bestätigung)
|
|
if (text.includes('Bestätige mit')) continue;
|
|
if (text.includes('antworte mit')) continue;
|
|
if (text.includes('Format muss EXAKT')) continue;
|
|
|
|
// Wenn der Text kurz ist (<100 Zeichen), ist es wahrscheinlich
|
|
// Claudes echte [READY] Bestätigung
|
|
// Wenn er lang ist, ist es wahrscheinlich ein Container der Instruktionen enthält
|
|
if (text.length > 200) continue;
|
|
|
|
console.log('Found [READY] in:', text.substring(0, 50));
|
|
return true;
|
|
}
|
|
return false;
|
|
""")
|
|
if ready_found:
|
|
logger.info(f"[READY] via direkte DOM-Suche gefunden!")
|
|
return True
|
|
except Exception as e:
|
|
logger.debug(f"DOM-Suche nach [READY] fehlgeschlagen: {e}")
|
|
|
|
# Kurz warten bevor nächster Check
|
|
time.sleep(1)
|
|
|
|
logger.warning(f"Timeout: Kein [READY] nach {timeout}s")
|
|
return False
|
|
|
|
def _count_claude_messages(self) -> int:
|
|
"""Zählt die Anzahl der Claude-Nachrichten im Chat"""
|
|
try:
|
|
messages = self._get_all_messages()
|
|
return sum(1 for m in messages if m.is_from_assistant)
|
|
except Exception as e:
|
|
logger.debug(f"Fehler beim Zählen: {e}")
|
|
return 0
|
|
|
|
def take_screenshot(self, path: str = "screenshot.png"):
|
|
"""Macht einen Screenshot (für Debugging)"""
|
|
self.driver.save_screenshot(path)
|
|
logger.info(f"Screenshot gespeichert: {path}")
|
|
|
|
def close(self):
|
|
"""Schließt den Browser"""
|
|
logger.info("Schließe Browser...")
|
|
try:
|
|
self.driver.quit()
|
|
except:
|
|
pass
|
|
|
|
# ════════════════════════════════════════════════════════════════════════
|
|
# BILD-UPLOAD FUNKTIONEN (für Robot Vision)
|
|
# ════════════════════════════════════════════════════════════════════════
|
|
|
|
def fetch_image_from_esp32(self) -> bool:
|
|
"""
|
|
Holt ein Bild vom ESP32/Mock-Server und speichert es lokal.
|
|
|
|
Returns:
|
|
True wenn erfolgreich, False bei Fehler
|
|
"""
|
|
if not self.esp32_url:
|
|
logger.warning("Keine ESP32 URL konfiguriert")
|
|
return False
|
|
|
|
try:
|
|
# Capture-Endpoint aufrufen (macht Foto und gibt es zurück)
|
|
url = f"{self.esp32_url}/api/capture"
|
|
if self.esp32_api_key:
|
|
url += f"?key={self.esp32_api_key}"
|
|
|
|
response = self._http_session.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
# Prüfe ob wir ein Bild bekommen haben
|
|
content_type = response.headers.get("Content-Type", "")
|
|
if "image" in content_type:
|
|
# Direktes Bild
|
|
with open(self._temp_image_path, "wb") as f:
|
|
f.write(response.content)
|
|
logger.info(f"Bild gespeichert: {len(response.content)} bytes")
|
|
return True
|
|
else:
|
|
# JSON Response (Mock-Server neuer Stil)
|
|
# Dann müssen wir /foto.jpg separat holen
|
|
foto_url = f"{self.esp32_url}/foto.jpg"
|
|
foto_response = self._http_session.get(foto_url, timeout=10)
|
|
foto_response.raise_for_status()
|
|
|
|
with open(self._temp_image_path, "wb") as f:
|
|
f.write(foto_response.content)
|
|
logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes")
|
|
return True
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"ESP32 Verbindungsfehler: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Bild holen: {e}")
|
|
return False
|
|
|
|
def upload_image_to_chat(self) -> bool:
|
|
"""
|
|
Lädt das gespeicherte Bild in den Claude.ai Chat hoch.
|
|
|
|
Returns:
|
|
True wenn erfolgreich, False bei Fehler
|
|
"""
|
|
if not self._temp_image_path.exists():
|
|
logger.error("Kein Bild zum Hochladen vorhanden")
|
|
return False
|
|
|
|
try:
|
|
# Finde das versteckte file input Element
|
|
file_input = self._find_file_input()
|
|
|
|
if not file_input:
|
|
logger.error("File-Upload Input nicht gefunden!")
|
|
return False
|
|
|
|
# Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs)
|
|
file_input.send_keys(str(self._temp_image_path.absolute()))
|
|
|
|
logger.info("Bild hochgeladen!")
|
|
self._images_uploaded += 1
|
|
|
|
# Kurz warten bis Upload verarbeitet ist
|
|
time.sleep(1.5)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Bild-Upload: {e}")
|
|
return False
|
|
|
|
def _calculate_image_hash(self) -> Optional[str]:
|
|
"""
|
|
Berechnet einen Hash des aktuellen Bildes.
|
|
|
|
Returns:
|
|
MD5 Hash als Hex-String, oder None bei Fehler
|
|
"""
|
|
if not self._temp_image_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(self._temp_image_path, "rb") as f:
|
|
return hashlib.md5(f.read()).hexdigest()
|
|
except Exception as e:
|
|
logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}")
|
|
return None
|
|
|
|
def has_image_changed(self) -> bool:
|
|
"""
|
|
Prüft ob sich das Bild seit dem letzten Upload geändert hat.
|
|
|
|
WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat!
|
|
Diese Funktion hilft, unnötige Uploads zu vermeiden.
|
|
|
|
Returns:
|
|
True wenn Bild neu/geändert, False wenn identisch zum letzten
|
|
"""
|
|
current_hash = self._calculate_image_hash()
|
|
|
|
if current_hash is None:
|
|
return True # Kein Bild = als "geändert" behandeln
|
|
|
|
if self._last_image_hash is None:
|
|
return True # Erster Upload
|
|
|
|
return current_hash != self._last_image_hash
|
|
|
|
def upload_image_if_changed(self) -> bool:
|
|
"""
|
|
Lädt Bild nur hoch wenn es sich geändert hat.
|
|
|
|
Returns:
|
|
True wenn hochgeladen, False wenn übersprungen oder Fehler
|
|
"""
|
|
if not self.has_image_changed():
|
|
logger.debug("Bild unverändert - überspringe Upload")
|
|
return False
|
|
|
|
# Hash vor dem Upload speichern
|
|
new_hash = self._calculate_image_hash()
|
|
|
|
if self.upload_image_to_chat():
|
|
self._last_image_hash = new_hash
|
|
return True
|
|
|
|
return False
|
|
|
|
def delete_uploaded_images(self) -> int:
|
|
"""
|
|
Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen.
|
|
|
|
HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden!
|
|
Bereits gesendete Bilder können nicht gelöscht werden.
|
|
|
|
Returns:
|
|
Anzahl gelöschter Bilder
|
|
"""
|
|
deleted = 0
|
|
|
|
try:
|
|
# Suche nach Bild-Vorschau-Elementen im Eingabebereich
|
|
# Diese haben meist einen X/Close Button
|
|
preview_selectors = [
|
|
"button[aria-label*='Remove']",
|
|
"button[aria-label*='remove']",
|
|
"button[aria-label*='Delete']",
|
|
"button[aria-label*='delete']",
|
|
"[class*='preview'] button[class*='close']",
|
|
"[class*='attachment'] button[class*='remove']",
|
|
"[class*='file'] button[class*='delete']",
|
|
]
|
|
|
|
for selector in preview_selectors:
|
|
try:
|
|
buttons = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for btn in buttons:
|
|
if btn.is_displayed():
|
|
btn.click()
|
|
deleted += 1
|
|
time.sleep(0.3)
|
|
except:
|
|
continue
|
|
|
|
# JavaScript-Fallback
|
|
if deleted == 0:
|
|
try:
|
|
deleted = self.driver.execute_script("""
|
|
let count = 0;
|
|
// Suche nach Remove-Buttons in Attachment-Previews
|
|
const buttons = document.querySelectorAll(
|
|
'[class*="preview"] button, [class*="attachment"] button'
|
|
);
|
|
buttons.forEach(btn => {
|
|
if (btn.offsetParent) {
|
|
btn.click();
|
|
count++;
|
|
}
|
|
});
|
|
return count;
|
|
""") or 0
|
|
except:
|
|
pass
|
|
|
|
if deleted > 0:
|
|
logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Bild-Löschung fehlgeschlagen: {e}")
|
|
|
|
return deleted
|
|
|
|
def get_images_uploaded_count(self) -> int:
|
|
"""Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück"""
|
|
return self._images_uploaded
|
|
|
|
def _find_file_input(self):
|
|
"""Findet das File-Upload Input Element"""
|
|
selectors = [
|
|
self.SELECTORS["file_input"],
|
|
"input[accept*='image']",
|
|
"input[type='file'][accept]",
|
|
"input[type='file']",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
# Auch versteckte Inputs funktionieren mit send_keys
|
|
return elem
|
|
except:
|
|
continue
|
|
|
|
# Fallback: Via JavaScript suchen
|
|
try:
|
|
return self.driver.execute_script("""
|
|
return document.querySelector('input[type="file"]') ||
|
|
document.querySelector('[accept*="image"]');
|
|
""")
|
|
except:
|
|
return None
|
|
|
|
def send_tick_with_image(self) -> bool:
|
|
"""
|
|
Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK].
|
|
|
|
Das ist der Haupt-Heartbeat mit Bild!
|
|
|
|
Returns:
|
|
True wenn alles geklappt hat
|
|
"""
|
|
# Schritt 1: Bild vom ESP32 holen
|
|
if not self.fetch_image_from_esp32():
|
|
# Kein Bild? Trotzdem TICK senden
|
|
self.send_message("[TICK - KEIN BILD]")
|
|
return False
|
|
|
|
# Schritt 2: Bild in Chat hochladen
|
|
if not self.upload_image_to_chat():
|
|
self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]")
|
|
return False
|
|
|
|
# Schritt 3: TICK senden
|
|
self.send_message("[TICK]")
|
|
|
|
return True
|
|
|
|
|
|
# Hilfsfunktion für einfaches Testing
|
|
def test_interface(chat_url: str):
|
|
"""Testet das Interface"""
|
|
import sys
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
print("Starte Chat Interface Test...")
|
|
print(f"URL: {chat_url}")
|
|
|
|
interface = ClaudeChatInterface(
|
|
chat_url=chat_url,
|
|
headless=False
|
|
)
|
|
|
|
print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...")
|
|
input()
|
|
|
|
interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!")
|
|
print("Nachricht gesendet!")
|
|
|
|
print("\nWarte 5 Sekunden auf Antwort...")
|
|
time.sleep(5)
|
|
|
|
messages = interface.get_new_messages()
|
|
print(f"\nGefundene Nachrichten: {len(messages)}")
|
|
|
|
for msg in messages[-3:]:
|
|
role = "Claude" if msg.is_from_assistant else "Human"
|
|
print(f" [{role}] {msg.text[:100]}...")
|
|
|
|
print("\nDrücke Enter zum Beenden...")
|
|
input()
|
|
|
|
interface.close()
|
|
print("Fertig!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python chat_web_interface.py <claude-chat-url>")
|
|
print("Example: python chat_web_interface.py https://claude.ai/chat/abc123")
|
|
sys.exit(1)
|
|
|
|
test_interface(sys.argv[1])
|