1127 lines
40 KiB
Python
1127 lines
40 KiB
Python
"""
|
|
Claude's Eyes - Chat Web Interface
|
|
|
|
Steuert den echten Claude.ai Chat im Browser via Selenium.
|
|
Claude (im Chat) steuert den Roboter SELBST - diese Bridge ist nur für Audio!
|
|
|
|
HINWEIS: Die CSS-Selektoren müssen möglicherweise angepasst werden,
|
|
wenn Claude.ai sein UI ändert.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
import tempfile
|
|
import hashlib
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional
|
|
from pathlib import Path
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class ChatMessage:
|
|
"""Eine Chat-Nachricht"""
|
|
id: str
|
|
text: str
|
|
is_from_assistant: bool
|
|
timestamp: float = 0
|
|
|
|
|
|
class ClaudeChatInterface:
|
|
"""
|
|
Steuert Claude.ai Chat via Selenium Browser Automation.
|
|
|
|
Diese Klasse:
|
|
- Öffnet einen Browser mit dem Claude.ai Chat
|
|
- Kann Nachrichten senden (für Heartbeat und Stefan's Sprache)
|
|
- Kann neue Nachrichten lesen (für TTS)
|
|
|
|
WICHTIG: Du musst beim ersten Start manuell einloggen!
|
|
"""
|
|
|
|
@staticmethod
|
|
def extract_chat_id(url: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert die Chat-ID aus einer Claude.ai URL.
|
|
|
|
Args:
|
|
url: z.B. "https://claude.ai/chat/21ac7549-1009-44cc-a143-3e4bd3c64b2d"
|
|
|
|
Returns:
|
|
Chat-ID z.B. "21ac7549-1009-44cc-a143-3e4bd3c64b2d", oder None
|
|
"""
|
|
if not url or '/chat/' not in url:
|
|
return None
|
|
try:
|
|
# Extrahiere alles nach /chat/
|
|
chat_id = url.split('/chat/')[-1].split('?')[0].split('#')[0]
|
|
return chat_id if chat_id else None
|
|
except:
|
|
return None
|
|
|
|
# CSS Selektoren für Claude.ai (Stand: Dezember 2025)
|
|
# Diese müssen angepasst werden wenn sich das UI ändert!
|
|
SELECTORS = {
|
|
# Eingabefeld für neue Nachrichten
|
|
"input_field": "div.ProseMirror[contenteditable='true']",
|
|
|
|
# Alternativ: Textarea
|
|
"input_textarea": "textarea[placeholder*='Message']",
|
|
|
|
# Senden-Button (falls Enter nicht funktioniert)
|
|
"send_button": "button[aria-label*='Send']",
|
|
|
|
# Alle Nachrichten-Container
|
|
"messages_container": "div[class*='conversation']",
|
|
|
|
# Einzelne Nachrichten
|
|
"message_human": "div[data-is-streaming='false'][class*='human']",
|
|
"message_assistant": "div[data-is-streaming='false'][class*='assistant']",
|
|
|
|
# Generischer Nachrichten-Selektor (Fallback)
|
|
"message_any": "div[class*='message']",
|
|
|
|
# Streaming-Indikator (Claude tippt noch)
|
|
"streaming": "div[data-is-streaming='true']",
|
|
|
|
# File Upload Input (versteckt, aber funktioniert mit send_keys)
|
|
"file_input": "input[type='file']",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
chat_url: Optional[str] = None,
|
|
headless: bool = False,
|
|
user_data_dir: Optional[str] = None,
|
|
chrome_binary: Optional[str] = None,
|
|
esp32_url: Optional[str] = None,
|
|
esp32_api_key: Optional[str] = None
|
|
):
|
|
"""
|
|
Initialisiert das Chat-Interface.
|
|
|
|
Args:
|
|
chat_url: URL zum Claude.ai Chat (z.B. https://claude.ai/chat/abc123)
|
|
headless: Browser im Hintergrund? (False = sichtbar)
|
|
user_data_dir: Chrome Profil-Ordner (für gespeicherte Logins)
|
|
chrome_binary: Pfad zur Chrome/Chromium Binary (für Termux)
|
|
esp32_url: URL zum ESP32/Mock-Server (für Bild-Capture)
|
|
esp32_api_key: API-Key für ESP32 Authentifizierung
|
|
"""
|
|
self.chat_url = chat_url
|
|
self.esp32_url = esp32_url
|
|
self.esp32_api_key = esp32_api_key
|
|
self._message_cache: List[ChatMessage] = []
|
|
self._last_message_id = 0
|
|
self._temp_image_path = Path(tempfile.gettempdir()) / "robot_view.jpg"
|
|
|
|
# Für Bild-Deduplizierung (100 Bilder pro Chat Limit!)
|
|
self._last_image_hash: Optional[str] = None
|
|
self._images_uploaded = 0
|
|
|
|
# HTTP Session mit größerem Connection Pool (vermeidet "pool full" Warnungen)
|
|
self._http_session = requests.Session()
|
|
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
|
|
self._http_session.mount('http://', adapter)
|
|
self._http_session.mount('https://', adapter)
|
|
|
|
# Chrome Optionen
|
|
options = webdriver.ChromeOptions()
|
|
|
|
if headless:
|
|
options.add_argument("--headless=new")
|
|
|
|
options.add_argument("--no-sandbox")
|
|
options.add_argument("--disable-dev-shm-usage")
|
|
options.add_argument("--disable-gpu")
|
|
options.add_argument("--window-size=1280,800")
|
|
|
|
# Für persistente Sessions (Login bleibt gespeichert)
|
|
if user_data_dir:
|
|
options.add_argument(f"--user-data-dir={user_data_dir}")
|
|
|
|
# Für Termux/Android
|
|
if chrome_binary:
|
|
options.binary_location = chrome_binary
|
|
|
|
# Anti-Detection (manche Seiten blocken Selenium)
|
|
options.add_argument("--disable-blink-features=AutomationControlled")
|
|
options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
|
options.add_experimental_option("useAutomationExtension", False)
|
|
|
|
logger.info("Starte Chrome Browser...")
|
|
|
|
try:
|
|
self.driver = webdriver.Chrome(options=options)
|
|
self.driver.execute_script(
|
|
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Chrome konnte nicht gestartet werden: {e}")
|
|
logger.info("Versuche mit webdriver-manager...")
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
service = Service(ChromeDriverManager().install())
|
|
self.driver = webdriver.Chrome(service=service, options=options)
|
|
|
|
self.wait = WebDriverWait(self.driver, 30)
|
|
|
|
# Navigiere zur Chat-URL
|
|
if chat_url:
|
|
self.navigate_to_chat(chat_url)
|
|
|
|
def navigate_to_chat(self, url: str):
|
|
"""Navigiert zur Chat-URL"""
|
|
logger.info(f"Navigiere zu: {url}")
|
|
self.driver.get(url)
|
|
self.chat_url = url
|
|
|
|
# Reset Bild-Counter bei neuem Chat
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
|
|
# Warte auf Seitenladung
|
|
time.sleep(3)
|
|
|
|
# Prüfe ob Login nötig
|
|
if "login" in self.driver.current_url.lower():
|
|
logger.warning("Login erforderlich! Bitte im Browser einloggen...")
|
|
print("\n" + "=" * 50)
|
|
print("BITTE IM BROWSER BEI CLAUDE.AI EINLOGGEN!")
|
|
print("Das Fenster bleibt offen. Nach dem Login geht's weiter.")
|
|
print("=" * 50 + "\n")
|
|
|
|
# Warte bis wieder auf der Chat-Seite
|
|
while "login" in self.driver.current_url.lower():
|
|
time.sleep(2)
|
|
|
|
logger.info("Login erfolgreich!")
|
|
time.sleep(2)
|
|
|
|
def start_new_chat(self) -> tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Startet einen neuen Chat auf Claude.ai.
|
|
|
|
WICHTIG: Nutze dies wenn das 100-Bilder-Limit erreicht ist!
|
|
|
|
Returns:
|
|
Tuple (neue_url, alte_chat_id) oder (None, None) bei Fehler
|
|
"""
|
|
try:
|
|
# Alte Chat-ID merken bevor wir wechseln
|
|
old_chat_id = self.extract_chat_id(self.chat_url)
|
|
logger.info(f"Starte neuen Chat... (alter Chat: {old_chat_id})")
|
|
|
|
# Methode 1: "New Chat" Button suchen
|
|
new_chat_selectors = [
|
|
"button[aria-label*='New']",
|
|
"button[aria-label*='new']",
|
|
"a[href='/new']",
|
|
"a[href*='/chat/new']",
|
|
"[data-testid*='new-chat']",
|
|
"button:has-text('New chat')",
|
|
]
|
|
|
|
for selector in new_chat_selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
elem.click()
|
|
time.sleep(2)
|
|
new_url = self.driver.current_url
|
|
logger.info(f"Neuer Chat gestartet: {new_url}")
|
|
|
|
# Reset Counter
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
self.chat_url = new_url
|
|
|
|
return new_url, old_chat_id
|
|
except:
|
|
continue
|
|
|
|
# Methode 2: Direkt zu /new navigieren
|
|
base_url = self.chat_url.split('/chat/')[0] if '/chat/' in self.chat_url else "https://claude.ai"
|
|
new_url = f"{base_url}/new"
|
|
logger.info(f"Versuche direkte Navigation zu: {new_url}")
|
|
self.driver.get(new_url)
|
|
time.sleep(3)
|
|
|
|
# Prüfe ob es geklappt hat
|
|
current = self.driver.current_url
|
|
if current != self.chat_url:
|
|
logger.info(f"Neuer Chat: {current}")
|
|
self._last_image_hash = None
|
|
self._images_uploaded = 0
|
|
self.chat_url = current
|
|
return current, old_chat_id
|
|
|
|
logger.warning("Konnte keinen neuen Chat starten")
|
|
return None, None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Starten eines neuen Chats: {e}")
|
|
return None, None
|
|
|
|
def get_current_chat_url(self) -> str:
|
|
"""Gibt die aktuelle Chat-URL zurück"""
|
|
return self.driver.current_url
|
|
|
|
def send_message(self, text: str, wait_for_response: bool = False) -> bool:
|
|
"""
|
|
Sendet eine Nachricht in den Chat.
|
|
|
|
Args:
|
|
text: Die zu sendende Nachricht
|
|
wait_for_response: Warten bis Claude antwortet?
|
|
|
|
Returns:
|
|
True wenn erfolgreich gesendet
|
|
"""
|
|
# Nutze send_message_with_delay mit delay=0 (hat Retry-Logik eingebaut)
|
|
result = self.send_message_with_delay(text, delay_before_send=0)
|
|
|
|
if result and wait_for_response:
|
|
self._wait_for_response()
|
|
|
|
return result
|
|
|
|
def send_message_with_delay(self, text: str, delay_before_send: int = 15) -> bool:
|
|
"""
|
|
Sendet eine Nachricht mit Verzögerung vor dem Absenden.
|
|
|
|
Nützlich für große Texte (wie Instruktionen), bei denen die
|
|
Zwischenablage/das Eingabefeld Zeit braucht um den Text zu verarbeiten.
|
|
|
|
Ablauf:
|
|
1. Text via JavaScript ins Eingabefeld einfügen (vermeidet Tastaturlayout-Probleme!)
|
|
2. Warte delay_before_send Sekunden
|
|
3. Send-Button klicken (mit Retry)
|
|
|
|
Args:
|
|
text: Die zu sendende Nachricht
|
|
delay_before_send: Sekunden warten nach Einfügen, vor dem Senden
|
|
|
|
Returns:
|
|
True wenn erfolgreich gesendet
|
|
"""
|
|
try:
|
|
# Finde Eingabefeld
|
|
input_field = self._find_input_field()
|
|
|
|
if not input_field:
|
|
logger.error("Eingabefeld nicht gefunden!")
|
|
return False
|
|
|
|
# Feld fokussieren
|
|
input_field.click()
|
|
time.sleep(0.2)
|
|
|
|
# Text via JavaScript einfügen (vermeidet QWERTY/QWERTZ Probleme!)
|
|
logger.info(f"Füge Text ein ({len(text)} Zeichen)...")
|
|
self._insert_text_via_js(input_field, text)
|
|
|
|
# WARTEN - große Texte brauchen Zeit!
|
|
if delay_before_send > 0:
|
|
logger.info(f"Warte {delay_before_send}s vor dem Absenden (große Texte brauchen Zeit)...")
|
|
time.sleep(delay_before_send)
|
|
else:
|
|
time.sleep(1.0) # Mindestens 1s warten
|
|
|
|
# Jetzt absenden (mit Retry-Logik)
|
|
send_success = False
|
|
for attempt in range(3):
|
|
send_button = self._find_send_button()
|
|
if send_button:
|
|
try:
|
|
if send_button.is_enabled() and send_button.is_displayed():
|
|
send_button.click()
|
|
logger.info(f"Nachricht via Send-Button gesendet (Versuch {attempt + 1})")
|
|
send_success = True
|
|
break
|
|
except Exception as e:
|
|
logger.debug(f"Send-Button Klick Versuch {attempt + 1} fehlgeschlagen: {e}")
|
|
|
|
time.sleep(0.5)
|
|
|
|
# Fallback: Enter-Taste
|
|
if not send_success:
|
|
logger.debug("Send-Button nicht gefunden/klickbar, nutze Enter")
|
|
input_field.send_keys(Keys.RETURN)
|
|
|
|
time.sleep(0.3)
|
|
logger.debug(f"Nachricht gesendet: {text[:50]}...")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Senden mit Verzögerung: {e}")
|
|
return False
|
|
|
|
def _find_send_button(self):
|
|
"""Findet den Send-Button"""
|
|
selectors = [
|
|
"button[aria-label*='Send']",
|
|
"button[aria-label*='send']",
|
|
"button[data-testid*='send']",
|
|
"button[type='submit']",
|
|
# Claude.ai spezifisch - Button mit Pfeil-Icon
|
|
"button svg[class*='send']",
|
|
"button[class*='send']",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed() and elem.is_enabled():
|
|
return elem
|
|
except:
|
|
continue
|
|
|
|
# Fallback: JavaScript-Suche
|
|
try:
|
|
return self.driver.execute_script("""
|
|
// Suche nach Send-Button
|
|
const btn = document.querySelector('button[aria-label*="Send"], button[aria-label*="send"]');
|
|
if (btn && !btn.disabled) return btn;
|
|
|
|
// Alternative: Letzter Button im Input-Bereich
|
|
const buttons = document.querySelectorAll('button');
|
|
for (const b of buttons) {
|
|
if (b.offsetParent && !b.disabled) {
|
|
const text = b.textContent.toLowerCase();
|
|
const label = (b.getAttribute('aria-label') || '').toLowerCase();
|
|
if (text.includes('send') || label.includes('send')) return b;
|
|
}
|
|
}
|
|
return null;
|
|
""")
|
|
except:
|
|
return None
|
|
|
|
def _find_input_field(self):
|
|
"""Findet das Eingabefeld"""
|
|
selectors = [
|
|
self.SELECTORS["input_field"],
|
|
self.SELECTORS["input_textarea"],
|
|
"div[contenteditable='true']",
|
|
"textarea",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
element = self.driver.find_element(By.CSS_SELECTOR, selector)
|
|
if element.is_displayed() and element.is_enabled():
|
|
return element
|
|
except NoSuchElementException:
|
|
continue
|
|
|
|
return None
|
|
|
|
def _insert_text_via_js(self, element, text: str):
|
|
"""
|
|
Fügt Text via JavaScript ein (vermeidet Tastaturlayout-Probleme).
|
|
|
|
Bei send_keys() werden physische Tasten gedrückt, was bei
|
|
unterschiedlichen Tastaturlayouts (QWERTY vs QWERTZ) zu
|
|
falschen Zeichen führt (z.B. y↔z vertauscht).
|
|
|
|
Diese Methode fügt den Text direkt als String ein.
|
|
"""
|
|
# Escape für JavaScript
|
|
escaped_text = text.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${')
|
|
|
|
# Für contenteditable divs (ProseMirror)
|
|
self.driver.execute_script("""
|
|
const element = arguments[0];
|
|
const text = arguments[1];
|
|
|
|
// Fokussieren
|
|
element.focus();
|
|
|
|
// Methode 1: Für contenteditable (ProseMirror)
|
|
if (element.contentEditable === 'true') {
|
|
// Text als HTML einfügen (respektiert Zeilenumbrüche)
|
|
const htmlText = text.replace(/\\n/g, '<br>');
|
|
element.innerHTML = htmlText;
|
|
|
|
// Input-Event triggern damit React/Vue es mitbekommt
|
|
element.dispatchEvent(new Event('input', { bubbles: true }));
|
|
|
|
// Cursor ans Ende setzen
|
|
const range = document.createRange();
|
|
range.selectNodeContents(element);
|
|
range.collapse(false);
|
|
const sel = window.getSelection();
|
|
sel.removeAllRanges();
|
|
sel.addRange(range);
|
|
}
|
|
// Methode 2: Für textarea/input
|
|
else {
|
|
element.value = text;
|
|
element.dispatchEvent(new Event('input', { bubbles: true }));
|
|
element.dispatchEvent(new Event('change', { bubbles: true }));
|
|
}
|
|
""", element, text)
|
|
|
|
logger.debug(f"Text via JavaScript eingefügt ({len(text)} Zeichen)")
|
|
|
|
def _wait_for_response(self, timeout: int = 60):
|
|
"""Wartet bis Claude fertig getippt hat"""
|
|
logger.debug("Warte auf Claudes Antwort...")
|
|
|
|
# Warte kurz damit Streaming startet
|
|
time.sleep(1)
|
|
|
|
# Warte bis Streaming endet
|
|
try:
|
|
WebDriverWait(self.driver, timeout).until_not(
|
|
EC.presence_of_element_located(
|
|
(By.CSS_SELECTOR, self.SELECTORS["streaming"])
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
logger.warning("Timeout beim Warten auf Antwort")
|
|
|
|
time.sleep(0.5) # Kurz warten bis DOM aktualisiert
|
|
|
|
def get_new_messages(self, since_id: Optional[str] = None) -> List[ChatMessage]:
|
|
"""
|
|
Holt neue Nachrichten aus dem Chat.
|
|
|
|
Args:
|
|
since_id: Nur Nachrichten nach dieser ID zurückgeben
|
|
|
|
Returns:
|
|
Liste neuer ChatMessage Objekte
|
|
"""
|
|
all_messages = self._get_all_messages()
|
|
|
|
if since_id is None:
|
|
return all_messages
|
|
|
|
# Filtere nur neue
|
|
new_messages = []
|
|
found_marker = False
|
|
|
|
for msg in all_messages:
|
|
if found_marker:
|
|
new_messages.append(msg)
|
|
elif msg.id == since_id:
|
|
found_marker = True
|
|
|
|
return new_messages
|
|
|
|
def _get_all_messages(self) -> List[ChatMessage]:
|
|
"""
|
|
Holt alle Nachrichten aus dem Chat.
|
|
|
|
Claude.ai verwendet unterschiedliche data-testid für User und Assistant:
|
|
- data-testid="user-message" für User
|
|
- Für Claude: Wir suchen nach Nachrichten die NICHT user-message sind
|
|
"""
|
|
messages = []
|
|
|
|
try:
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STRATEGIE: Hole User UND Assistant Nachrichten SEPARAT
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# 1. User-Nachrichten (haben data-testid="user-message")
|
|
user_elements = []
|
|
try:
|
|
user_elements = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
"[data-testid='user-message']"
|
|
)
|
|
logger.debug(f"User-Nachrichten gefunden: {len(user_elements)}")
|
|
except Exception as e:
|
|
logger.debug(f"User-Nachrichten Suche fehlgeschlagen: {e}")
|
|
|
|
# 2. Claude-Nachrichten suchen
|
|
# Claude.ai nutzt verschiedene Klassen, aber NICHT user-message
|
|
claude_elements = []
|
|
|
|
# Methode A: font-claude-message Klasse
|
|
try:
|
|
found = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
"[class*='font-claude-message']"
|
|
)
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via font-claude-message: {len(found)}")
|
|
except:
|
|
pass
|
|
|
|
# Methode B: Prose-Container die NICHT in user-message sind
|
|
if not claude_elements:
|
|
try:
|
|
# Alle prose Elemente die nicht innerhalb von user-message sind
|
|
found = self.driver.execute_script("""
|
|
const msgs = [];
|
|
document.querySelectorAll('.prose').forEach(e => {
|
|
// Prüfe ob dieses Element NICHT in einem user-message Container ist
|
|
let parent = e;
|
|
let isUserMessage = false;
|
|
while (parent && parent !== document.body) {
|
|
if (parent.getAttribute('data-testid') === 'user-message') {
|
|
isUserMessage = true;
|
|
break;
|
|
}
|
|
parent = parent.parentElement;
|
|
}
|
|
if (!isUserMessage) {
|
|
msgs.push(e);
|
|
}
|
|
});
|
|
return msgs;
|
|
""") or []
|
|
if found:
|
|
claude_elements = found
|
|
logger.debug(f"Claude-Nachrichten via prose (nicht-user): {len(found)}")
|
|
except Exception as e:
|
|
logger.debug(f"Prose-Suche fehlgeschlagen: {e}")
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Nachrichten verarbeiten
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# User-Nachrichten hinzufügen
|
|
for i, elem in enumerate(user_elements):
|
|
try:
|
|
text = elem.text.strip()
|
|
if not text or len(text) < 3:
|
|
continue
|
|
|
|
msg_id = elem.get_attribute("data-message-id")
|
|
if not msg_id:
|
|
msg_id = f"user_{i}_{hash(text[:100])}"
|
|
|
|
messages.append(ChatMessage(
|
|
id=msg_id,
|
|
text=text,
|
|
is_from_assistant=False,
|
|
timestamp=time.time()
|
|
))
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei User-Nachricht {i}: {e}")
|
|
|
|
# Claude-Nachrichten hinzufügen
|
|
for i, elem in enumerate(claude_elements):
|
|
try:
|
|
text = elem.text.strip()
|
|
if not text or len(text) < 5:
|
|
continue
|
|
|
|
msg_id = elem.get_attribute("data-message-id")
|
|
if not msg_id:
|
|
msg_id = f"claude_{i}_{hash(text[:100])}"
|
|
|
|
messages.append(ChatMessage(
|
|
id=msg_id,
|
|
text=text,
|
|
is_from_assistant=True,
|
|
timestamp=time.time()
|
|
))
|
|
logger.debug(f"Claude-Nachricht {i}: '{text[:50]}...'")
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei Claude-Nachricht {i}: {e}")
|
|
|
|
logger.debug(f"Gesamt: {len(messages)} Nachrichten, davon {sum(1 for m in messages if m.is_from_assistant)} von Claude")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Lesen der Nachrichten: {e}")
|
|
|
|
return messages
|
|
|
|
def get_last_assistant_message(self) -> Optional[ChatMessage]:
|
|
"""Holt die letzte Nachricht von Claude"""
|
|
messages = self._get_all_messages()
|
|
|
|
for msg in reversed(messages):
|
|
if msg.is_from_assistant:
|
|
return msg
|
|
|
|
return None
|
|
|
|
def is_claude_typing(self) -> bool:
|
|
"""
|
|
Prüft ob Claude gerade tippt (streaming).
|
|
|
|
Erkennt mehrere Indikatoren:
|
|
1. Stop-Button ist sichtbar (während Claude schreibt)
|
|
2. data-is-streaming='true' Attribut
|
|
3. Animiertes Logo / Thinking-Indikator
|
|
"""
|
|
try:
|
|
# Methode 1: Stop-Button prüfen (zuverlässigster Indikator)
|
|
# Wenn Claude tippt, gibt es einen Stop-Button statt Send-Button
|
|
stop_indicators = [
|
|
"button[aria-label*='Stop']",
|
|
"button[aria-label*='stop']",
|
|
"button[class*='stop']",
|
|
"button[data-testid*='stop']",
|
|
# Alternativer Indikator: Button mit Stop-Icon
|
|
"button svg[class*='stop']",
|
|
]
|
|
|
|
for selector in stop_indicators:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
logger.debug(f"Claude tippt (Stop-Button gefunden: {selector})")
|
|
return True
|
|
except:
|
|
continue
|
|
|
|
# Methode 2: Streaming-Attribut (original)
|
|
streaming = self.driver.find_elements(
|
|
By.CSS_SELECTOR,
|
|
self.SELECTORS["streaming"]
|
|
)
|
|
if len(streaming) > 0:
|
|
logger.debug("Claude tippt (streaming=true)")
|
|
return True
|
|
|
|
# Methode 3: Animiertes/Thinking Indikator suchen
|
|
thinking_indicators = [
|
|
"[class*='thinking']",
|
|
"[class*='loading']",
|
|
"[class*='typing']",
|
|
"[class*='streaming']",
|
|
"[data-state='loading']",
|
|
# Pulsierendes Logo
|
|
"[class*='pulse']",
|
|
"[class*='animate']",
|
|
]
|
|
|
|
for selector in thinking_indicators:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
if elem.is_displayed():
|
|
logger.debug(f"Claude tippt (Indikator: {selector})")
|
|
return True
|
|
except:
|
|
continue
|
|
|
|
# Methode 4: JavaScript-basierte Prüfung
|
|
# Prüft ob irgendwo noch Text gestreamt wird
|
|
try:
|
|
is_streaming = self.driver.execute_script("""
|
|
// Prüfe ob Stop-Button existiert und sichtbar ist
|
|
const stopBtn = document.querySelector('button[aria-label*="Stop"], button[aria-label*="stop"]');
|
|
if (stopBtn && stopBtn.offsetParent !== null) return true;
|
|
|
|
// Prüfe auf streaming-Attribut
|
|
const streaming = document.querySelector('[data-is-streaming="true"]');
|
|
if (streaming) return true;
|
|
|
|
// Prüfe auf disabled Send-Button (während Claude tippt)
|
|
const sendBtn = document.querySelector('button[aria-label*="Send"]');
|
|
if (sendBtn && sendBtn.disabled) return true;
|
|
|
|
return false;
|
|
""")
|
|
if is_streaming:
|
|
logger.debug("Claude tippt (JavaScript-Check)")
|
|
return True
|
|
except:
|
|
pass
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Fehler bei typing-check: {e}")
|
|
return False
|
|
|
|
def wait_for_ready_signal(self, timeout: int = 120) -> bool:
|
|
"""
|
|
Wartet bis Claude [READY] sendet.
|
|
|
|
Sucht nach [READY] das NICHT Teil des Instruktions-Textes ist.
|
|
Wir zählen wie oft [READY] vorkommt - wenn mehr als 1x, hat Claude geantwortet.
|
|
|
|
Args:
|
|
timeout: Maximale Wartezeit in Sekunden
|
|
|
|
Returns:
|
|
True wenn [READY] empfangen, False bei Timeout
|
|
"""
|
|
logger.info(f"Warte auf [READY] Signal (max {timeout}s)...")
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
# Warte bis Claude fertig ist mit Tippen
|
|
typing_wait_start = time.time()
|
|
while self.is_claude_typing():
|
|
time.sleep(0.5)
|
|
# Timeout für typing-wait (max 60s)
|
|
if time.time() - typing_wait_start > 60:
|
|
logger.debug("Typing-Wait Timeout, prüfe trotzdem...")
|
|
break
|
|
|
|
# Suche [READY] im Seitentext via JavaScript
|
|
# Zähle wie oft [READY] vorkommt - 1x ist unsere Instruktion, 2x+ bedeutet Claude hat geantwortet
|
|
try:
|
|
ready_count = self.driver.execute_script("""
|
|
const text = document.body.innerText.toUpperCase();
|
|
const matches = text.match(/\\[READY\\]/g);
|
|
return matches ? matches.length : 0;
|
|
""")
|
|
|
|
logger.debug(f"[READY] gefunden: {ready_count}x")
|
|
|
|
# Mehr als 1x = Claude hat auch [READY] geschrieben
|
|
if ready_count and ready_count >= 2:
|
|
logger.info(f"[READY] Signal gefunden! ({ready_count}x im Text)")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.debug(f"JavaScript [READY] Suche fehlgeschlagen: {e}")
|
|
|
|
# Kurz warten bevor nächster Check
|
|
time.sleep(1)
|
|
|
|
logger.warning(f"Timeout: Kein [READY] nach {timeout}s")
|
|
return False
|
|
|
|
def take_screenshot(self, path: str = "screenshot.png"):
|
|
"""Macht einen Screenshot (für Debugging)"""
|
|
self.driver.save_screenshot(path)
|
|
logger.info(f"Screenshot gespeichert: {path}")
|
|
|
|
def close(self):
|
|
"""Schließt den Browser"""
|
|
logger.info("Schließe Browser...")
|
|
try:
|
|
self.driver.quit()
|
|
except:
|
|
pass
|
|
|
|
# ════════════════════════════════════════════════════════════════════════
|
|
# BILD-UPLOAD FUNKTIONEN (für Robot Vision)
|
|
# ════════════════════════════════════════════════════════════════════════
|
|
|
|
def fetch_image_from_esp32(self) -> bool:
|
|
"""
|
|
Holt ein Bild vom ESP32/Mock-Server und speichert es lokal.
|
|
|
|
Returns:
|
|
True wenn erfolgreich, False bei Fehler
|
|
"""
|
|
if not self.esp32_url:
|
|
logger.warning("Keine ESP32 URL konfiguriert")
|
|
return False
|
|
|
|
try:
|
|
# Capture-Endpoint aufrufen (macht Foto und gibt es zurück)
|
|
url = f"{self.esp32_url}/api/capture"
|
|
if self.esp32_api_key:
|
|
url += f"?key={self.esp32_api_key}"
|
|
|
|
response = self._http_session.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
# Prüfe ob wir ein Bild bekommen haben
|
|
content_type = response.headers.get("Content-Type", "")
|
|
if "image" in content_type:
|
|
# Direktes Bild
|
|
with open(self._temp_image_path, "wb") as f:
|
|
f.write(response.content)
|
|
logger.info(f"Bild gespeichert: {len(response.content)} bytes")
|
|
return True
|
|
else:
|
|
# JSON Response (Mock-Server neuer Stil)
|
|
# Dann müssen wir /foto.jpg separat holen
|
|
foto_url = f"{self.esp32_url}/foto.jpg"
|
|
foto_response = self._http_session.get(foto_url, timeout=10)
|
|
foto_response.raise_for_status()
|
|
|
|
with open(self._temp_image_path, "wb") as f:
|
|
f.write(foto_response.content)
|
|
logger.info(f"Bild von /foto.jpg: {len(foto_response.content)} bytes")
|
|
return True
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"ESP32 Verbindungsfehler: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Bild holen: {e}")
|
|
return False
|
|
|
|
def upload_image_to_chat(self) -> bool:
|
|
"""
|
|
Lädt das gespeicherte Bild in den Claude.ai Chat hoch.
|
|
|
|
Returns:
|
|
True wenn erfolgreich, False bei Fehler
|
|
"""
|
|
if not self._temp_image_path.exists():
|
|
logger.error("Kein Bild zum Hochladen vorhanden")
|
|
return False
|
|
|
|
try:
|
|
# Finde das versteckte file input Element
|
|
file_input = self._find_file_input()
|
|
|
|
if not file_input:
|
|
logger.error("File-Upload Input nicht gefunden!")
|
|
return False
|
|
|
|
# Datei hochladen via send_keys (funktioniert auch bei versteckten Inputs)
|
|
file_input.send_keys(str(self._temp_image_path.absolute()))
|
|
|
|
logger.info("Bild hochgeladen!")
|
|
self._images_uploaded += 1
|
|
|
|
# Kurz warten bis Upload verarbeitet ist
|
|
time.sleep(1.5)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Bild-Upload: {e}")
|
|
return False
|
|
|
|
def _calculate_image_hash(self) -> Optional[str]:
|
|
"""
|
|
Berechnet einen Hash des aktuellen Bildes.
|
|
|
|
Returns:
|
|
MD5 Hash als Hex-String, oder None bei Fehler
|
|
"""
|
|
if not self._temp_image_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(self._temp_image_path, "rb") as f:
|
|
return hashlib.md5(f.read()).hexdigest()
|
|
except Exception as e:
|
|
logger.debug(f"Hash-Berechnung fehlgeschlagen: {e}")
|
|
return None
|
|
|
|
def has_image_changed(self) -> bool:
|
|
"""
|
|
Prüft ob sich das Bild seit dem letzten Upload geändert hat.
|
|
|
|
WICHTIG: Claude.ai hat ein Limit von 100 Dateien pro Chat!
|
|
Diese Funktion hilft, unnötige Uploads zu vermeiden.
|
|
|
|
Returns:
|
|
True wenn Bild neu/geändert, False wenn identisch zum letzten
|
|
"""
|
|
current_hash = self._calculate_image_hash()
|
|
|
|
if current_hash is None:
|
|
return True # Kein Bild = als "geändert" behandeln
|
|
|
|
if self._last_image_hash is None:
|
|
return True # Erster Upload
|
|
|
|
return current_hash != self._last_image_hash
|
|
|
|
def upload_image_if_changed(self) -> bool:
|
|
"""
|
|
Lädt Bild nur hoch wenn es sich geändert hat.
|
|
|
|
Returns:
|
|
True wenn hochgeladen, False wenn übersprungen oder Fehler
|
|
"""
|
|
if not self.has_image_changed():
|
|
logger.debug("Bild unverändert - überspringe Upload")
|
|
return False
|
|
|
|
# Hash vor dem Upload speichern
|
|
new_hash = self._calculate_image_hash()
|
|
|
|
if self.upload_image_to_chat():
|
|
self._last_image_hash = new_hash
|
|
return True
|
|
|
|
return False
|
|
|
|
def delete_uploaded_images(self) -> int:
|
|
"""
|
|
Versucht hochgeladene Bilder aus dem Chat-Eingabefeld zu löschen.
|
|
|
|
HINWEIS: Funktioniert nur für Bilder die noch nicht gesendet wurden!
|
|
Bereits gesendete Bilder können nicht gelöscht werden.
|
|
|
|
Returns:
|
|
Anzahl gelöschter Bilder
|
|
"""
|
|
deleted = 0
|
|
|
|
try:
|
|
# Suche nach Bild-Vorschau-Elementen im Eingabebereich
|
|
# Diese haben meist einen X/Close Button
|
|
preview_selectors = [
|
|
"button[aria-label*='Remove']",
|
|
"button[aria-label*='remove']",
|
|
"button[aria-label*='Delete']",
|
|
"button[aria-label*='delete']",
|
|
"[class*='preview'] button[class*='close']",
|
|
"[class*='attachment'] button[class*='remove']",
|
|
"[class*='file'] button[class*='delete']",
|
|
]
|
|
|
|
for selector in preview_selectors:
|
|
try:
|
|
buttons = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for btn in buttons:
|
|
if btn.is_displayed():
|
|
btn.click()
|
|
deleted += 1
|
|
time.sleep(0.3)
|
|
except:
|
|
continue
|
|
|
|
# JavaScript-Fallback
|
|
if deleted == 0:
|
|
try:
|
|
deleted = self.driver.execute_script("""
|
|
let count = 0;
|
|
// Suche nach Remove-Buttons in Attachment-Previews
|
|
const buttons = document.querySelectorAll(
|
|
'[class*="preview"] button, [class*="attachment"] button'
|
|
);
|
|
buttons.forEach(btn => {
|
|
if (btn.offsetParent) {
|
|
btn.click();
|
|
count++;
|
|
}
|
|
});
|
|
return count;
|
|
""") or 0
|
|
except:
|
|
pass
|
|
|
|
if deleted > 0:
|
|
logger.info(f"{deleted} Bild(er) aus Eingabefeld gelöscht")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Bild-Löschung fehlgeschlagen: {e}")
|
|
|
|
return deleted
|
|
|
|
def get_images_uploaded_count(self) -> int:
|
|
"""Gibt Anzahl der in dieser Session hochgeladenen Bilder zurück"""
|
|
return self._images_uploaded
|
|
|
|
def _find_file_input(self):
|
|
"""Findet das File-Upload Input Element"""
|
|
selectors = [
|
|
self.SELECTORS["file_input"],
|
|
"input[accept*='image']",
|
|
"input[type='file'][accept]",
|
|
"input[type='file']",
|
|
]
|
|
|
|
for selector in selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
# Auch versteckte Inputs funktionieren mit send_keys
|
|
return elem
|
|
except:
|
|
continue
|
|
|
|
# Fallback: Via JavaScript suchen
|
|
try:
|
|
return self.driver.execute_script("""
|
|
return document.querySelector('input[type="file"]') ||
|
|
document.querySelector('[accept*="image"]');
|
|
""")
|
|
except:
|
|
return None
|
|
|
|
def send_tick_with_image(self) -> bool:
|
|
"""
|
|
Holt ein Bild vom ESP32, lädt es hoch und sendet [TICK].
|
|
|
|
Das ist der Haupt-Heartbeat mit Bild!
|
|
|
|
Returns:
|
|
True wenn alles geklappt hat
|
|
"""
|
|
# Schritt 1: Bild vom ESP32 holen
|
|
if not self.fetch_image_from_esp32():
|
|
# Kein Bild? Trotzdem TICK senden
|
|
self.send_message("[TICK - KEIN BILD]")
|
|
return False
|
|
|
|
# Schritt 2: Bild in Chat hochladen
|
|
if not self.upload_image_to_chat():
|
|
self.send_message("[TICK - UPLOAD FEHLGESCHLAGEN]")
|
|
return False
|
|
|
|
# Schritt 3: TICK senden
|
|
self.send_message("[TICK]")
|
|
|
|
return True
|
|
|
|
|
|
# Hilfsfunktion für einfaches Testing
|
|
def test_interface(chat_url: str):
|
|
"""Testet das Interface"""
|
|
import sys
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
print("Starte Chat Interface Test...")
|
|
print(f"URL: {chat_url}")
|
|
|
|
interface = ClaudeChatInterface(
|
|
chat_url=chat_url,
|
|
headless=False
|
|
)
|
|
|
|
print("\nChat geöffnet! Drücke Enter um eine Test-Nachricht zu senden...")
|
|
input()
|
|
|
|
interface.send_message("[TEST] Hallo, das ist ein Test der Audio Bridge!")
|
|
print("Nachricht gesendet!")
|
|
|
|
print("\nWarte 5 Sekunden auf Antwort...")
|
|
time.sleep(5)
|
|
|
|
messages = interface.get_new_messages()
|
|
print(f"\nGefundene Nachrichten: {len(messages)}")
|
|
|
|
for msg in messages[-3:]:
|
|
role = "Claude" if msg.is_from_assistant else "Human"
|
|
print(f" [{role}] {msg.text[:100]}...")
|
|
|
|
print("\nDrücke Enter zum Beenden...")
|
|
input()
|
|
|
|
interface.close()
|
|
print("Fertig!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python chat_web_interface.py <claude-chat-url>")
|
|
print("Example: python chat_web_interface.py https://claude.ai/chat/abc123")
|
|
sys.exit(1)
|
|
|
|
test_interface(sys.argv[1])
|