diff --git a/android/src/screens/ChatScreen.tsx b/android/src/screens/ChatScreen.tsx index f99600f..b5427a7 100644 --- a/android/src/screens/ChatScreen.tsx +++ b/android/src/screens/ChatScreen.tsx @@ -114,6 +114,16 @@ interface ChatMessage { * sind noch nicht persistiert (kurzer Race) — Muelltonne erscheint erst * wenn das chat_backup-Event vom Bridge zurueck kommt. */ backupTs?: number; + /** Client-seitige Eindeutigs-ID fuer Delivery-Tracking (offline-Queue, + * ACK von Bridge, Idempotenz bei Retry). Wird beim Senden generiert und + * durch die Bridge zurueck-gespiegelt. */ + clientMsgId?: string; + /** Delivery-Status der User-Bubble (WhatsApp-style): queued = noch nicht + * raus (offline), sending = an Bridge unterwegs, sent = Bridge hat ACK + * gesendet, delivered = Brain hat geantwortet, failed = Retry-Limit. */ + deliveryStatus?: 'queued' | 'sending' | 'sent' | 'delivered' | 'failed'; + /** Anzahl der bisherigen Sende-Versuche (fuer Retry-Limit). */ + sendAttempts?: number; } // --- Konstanten --- @@ -260,6 +270,17 @@ const ChatScreen: React.FC = () => { const flatListRef = useRef(null); const messageIdCounter = useRef(0); + // Watchdog gegen "ARIA denkt"-Hang: wird bei jedem agent_activity-Event mit + // nicht-idle Status neu armiert. Feuert er, sind 180s lang KEINE Updates + // vom Brain mehr gekommen → wir gehen davon aus dass die Verbindung + // verloren ist oder das Brain abgestuerzt — Timeout-Bubble + Reset. + const stuckWatchdog = useRef | null>(null); + const clearStuckWatchdog = () => { + if (stuckWatchdog.current) { + clearTimeout(stuckWatchdog.current); + stuckWatchdog.current = null; + } + }; // ServerPaths fuer die der User auf "oeffnen" geklickt hat — beim // file_response wird die Datei nach dem Speichern direkt mit dem System- // Intent geoeffnet (PDF-Viewer, Galerie, etc.). @@ -271,6 +292,98 @@ const ChatScreen: React.FC = () => { return `msg_${Date.now()}_${messageIdCounter.current}`; }; + // Eindeutige clientMsgId fuer Delivery-Tracking (Bridge-Echo, Retry, + // Idempotenz). Format: cmsg__ — eindeutig genug fuer eine + // 100er-Dedup-Window auf der Bridge. + const nextClientMsgId = (): string => + `cmsg_${Date.now()}_${Math.floor(Math.random() * 1_000_000)}`; + + // Wie lange wir auf das ACK warten bevor wir retryen. Bridge sollte + // unmittelbar zurueckmelden — 30s ist grosszuegig fuer schlechte Netze. + const ACK_TIMEOUT_MS = 30_000; + // Wie oft re-tryen wir bevor wir "failed" anzeigen. + const MAX_SEND_ATTEMPTS = 3; + // Pending ACK-Timer pro clientMsgId — fuer cancel beim ACK. + const ackTimers = useRef>>(new Map()); + const clearAckTimer = (cmid: string) => { + const t = ackTimers.current.get(cmid); + if (t) { + clearTimeout(t); + ackTimers.current.delete(cmid); + } + }; + + // Pending-Payloads pro clientMsgId — wir brauchen sie fuer Retry nach + // ACK-Timeout oder nach Reconnect (offline-Queue). Liegt in einer Ref + // damit der Inhalt Closures ueberlebt. + const pendingPayloads = useRef }>>(new Map()); + + // ConnectionState in Ref spiegeln — fuer Closures (onMessage, Send-Pfade) + // die sonst auf einen veralteten Wert zugreifen wuerden. + const connectionStateRef = useRef('disconnected'); + + // Status einer Bubble per clientMsgId aendern (Helper) + const updateMessageStatus = useCallback( + (cmid: string, patch: Partial>) => { + setMessages(prev => prev.map(m => (m.clientMsgId === cmid ? { ...m, ...patch } : m))); + }, + [], + ); + + // Sende eine 'chat'- oder 'audio'-Nachricht an die Bridge mit ACK-Tracking. + // - Wenn offline → status='queued', wird beim Reconnect rausgeschickt. + // - Wenn online → status='sending', Timer fuer ACK-Erwartung. + // - Bei ACK-Timeout: retry (bis MAX_SEND_ATTEMPTS) oder 'failed'. + const dispatchWithAck = useCallback( + (cmid: string, type: 'chat' | 'audio', payload: Record, attempt = 1) => { + pendingPayloads.current.set(cmid, { type, payload }); + const online = connectionStateRef.current === 'connected'; + if (!online) { + updateMessageStatus(cmid, { deliveryStatus: 'queued', sendAttempts: attempt }); + return; + } + // RVS.send mit clientMsgId — Bridge spiegelt das im chat_ack zurueck + rvs.send(type, { ...payload, clientMsgId: cmid }); + updateMessageStatus(cmid, { deliveryStatus: 'sending', sendAttempts: attempt }); + clearAckTimer(cmid); + ackTimers.current.set( + cmid, + setTimeout(() => { + ackTimers.current.delete(cmid); + if (attempt >= MAX_SEND_ATTEMPTS) { + updateMessageStatus(cmid, { deliveryStatus: 'failed', sendAttempts: attempt }); + console.warn('[Chat] Send fehlgeschlagen nach %d Versuchen: %s', attempt, cmid); + } else { + console.warn('[Chat] kein ACK fuer %s — Retry #%d', cmid, attempt + 1); + dispatchWithAck(cmid, type, payload, attempt + 1); + } + }, ACK_TIMEOUT_MS), + ); + }, + [updateMessageStatus], + ); + + // Alle 'queued'-Nachrichten beim Reconnect rausschicken + const flushQueuedMessages = useCallback(() => { + setMessages(prev => { + for (const m of prev) { + if (m.deliveryStatus !== 'queued' || !m.clientMsgId) continue; + const pending = pendingPayloads.current.get(m.clientMsgId); + if (!pending) continue; + // Versuchszaehler beibehalten (oder mit 1 starten falls leer) + dispatchWithAck(m.clientMsgId, pending.type, pending.payload, m.sendAttempts || 1); + } + return prev; + }); + }, [dispatchWithAck]); + + // Manueller Retry nach 'failed' (tap auf das ⚠️-Icon) + const retryFailedMessage = useCallback((cmid: string) => { + const pending = pendingPayloads.current.get(cmid); + if (!pending) return; + dispatchWithAck(cmid, pending.type, pending.payload, 1); + }, [dispatchWithAck]); + // TTS- + GPS-Settings beim Mount + alle 2s neu laden (damit Settings-Toggle // sofort greift, ohne Context- oder Event-System) useEffect(() => { @@ -376,12 +489,24 @@ const ChatScreen: React.FC = () => { const parsed: ChatMessage[] = JSON.parse(stored); if (Array.isArray(parsed) && parsed.length > 0) { console.log('[Chat] ${parsed.length} Nachrichten geladen'); - setMessages(parsed); + // MERGE statt Overwrite: zwischen Mount und Load-Done koennen + // bereits Nachrichten ankommen (User schreibt sofort, WS-Events + // kommen vor Load-Ende). Vorher hat setMessages(parsed) diese + // ueberschrieben → "Nachricht weg ohne Spur". Jetzt mergen wir + // per id; lokal-gerade-hinzugefuegte schlagen Gespeichertes + // (die sind frischer). + setMessages(prev => { + if (prev.length === 0) return parsed; + const byId = new Map(); + for (const m of parsed) byId.set(m.id, m); + for (const m of prev) byId.set(m.id, m); + return [...byId.values()].sort((a, b) => (a.timestamp || 0) - (b.timestamp || 0)); + }); const maxId = parsed.reduce((max, msg) => { const num = parseInt(msg.id.split('_').pop() || '0', 10); return num > max ? num : max; }, 0); - messageIdCounter.current = maxId; + messageIdCounter.current = Math.max(messageIdCounter.current, maxId); } } } catch (err) { @@ -419,6 +544,22 @@ const ChatScreen: React.FC = () => { // RVS-Nachrichten abonnieren useEffect(() => { const unsubMessage = rvs.onMessage((message: RVSMessage) => { + // chat_ack: Bridge bestaetigt Empfang einer chat/audio-Nachricht. + // Wir markieren die Bubble als 'sent' (✓) und stoppen den ACK-Timer. + if (message.type === ('chat_ack' as any)) { + const cmid = (message.payload as any).clientMsgId as string | undefined; + if (cmid) { + clearAckTimer(cmid); + pendingPayloads.current.delete(cmid); + setMessages(prev => prev.map(m => + m.clientMsgId === cmid && m.deliveryStatus !== 'delivered' + ? { ...m, deliveryStatus: 'sent' } + : m + )); + } + return; + } + // file_saved: Bridge meldet Server-Pfad — in Attachment merken fuer Re-Download if (message.type === 'file_saved') { const serverPath = (message.payload.serverPath as string) || ''; @@ -750,8 +891,17 @@ const ChatScreen: React.FC = () => { messageId: (message.payload.messageId as string) || undefined, backupTs: (message.payload.backupTs as number) || undefined, }; - return capMessages([...prev, ariaMsg]); + // ARIA hat geantwortet → alle User-Bubbles davor als 'delivered' + // markieren (WhatsApp-Doppelhaken ✓✓). Brain hat sie verarbeitet. + return capMessages([...prev, ariaMsg]).map(m => + m.sender === 'user' + && (m.deliveryStatus === 'sent' || m.deliveryStatus === 'sending') + ? { ...m, deliveryStatus: 'delivered' } + : m + ); }); + // ARIA hat geantwortet → Watchdog clearen, falls noch armiert + clearStuckWatchdog(); } // TTS-Audio abspielen wenn vorhanden — respektiert geraetelokalen Mute/Disable @@ -796,6 +946,21 @@ const ChatScreen: React.FC = () => { setAgentActivity({ activity, tool }); // Spotify darf waehrend "ARIA denkt/schreibt" weiterspielen — pausiert // nur wenn TTS startet (dann acquired _firePlaybackStarted den Focus). + // Watchdog: solange Brain noch Lebenszeichen sendet (jedes neue + // activity-Event), Timer neu starten. 180s ohne Update → Hang. + clearStuckWatchdog(); + if (activity !== 'idle') { + stuckWatchdog.current = setTimeout(() => { + stuckWatchdog.current = null; + setAgentActivity({ activity: 'idle', tool: '' }); + setMessages(prev => capMessages([...prev, { + id: nextId(), + sender: 'aria', + text: '⚠️ Habe gerade keine Verbindung zurueck bekommen (Timeout nach 3 Min). Deine letzte Nachricht ist evtl. nicht durchgekommen — schick sie nochmal.', + timestamp: Date.now(), + }])); + }, 180_000); + } } // Voice-Config aus Diagnostic — setzt die lokale App-Stimme auf den @@ -839,6 +1004,7 @@ const ChatScreen: React.FC = () => { const unsubState = rvs.onStateChange((state) => { setConnectionState(state); + connectionStateRef.current = state; // Bei (re)connect: KOMPLETTEN Server-Stand holen. Server ist die // Source-of-Truth — wenn er leer ist (z.B. nach "Konversation // zuruecksetzen"), soll die App das spiegeln, auch wenn sie offline @@ -846,11 +1012,26 @@ const ChatScreen: React.FC = () => { // Nachrichten vom Server, oder leeres Array wenn Server leer. if (state === 'connected') { rvs.send('chat_history_request' as any, { since: 0, limit: 200 }); + // Offline-Queue flushen — alle 'queued'-Bubbles raussschicken + flushQueuedMessages(); + } else if (state === 'disconnected') { + // ACK-Timer cancellen, betroffene Bubbles auf 'queued' zurueck + for (const [cmid, t] of ackTimers.current.entries()) { + clearTimeout(t); + ackTimers.current.delete(cmid); + setMessages(prev => prev.map(m => + m.clientMsgId === cmid && m.deliveryStatus === 'sending' + ? { ...m, deliveryStatus: 'queued' } + : m + )); + } } }); // Initalen Status setzen - setConnectionState(rvs.getState()); + const initialState = rvs.getState(); + setConnectionState(initialState); + connectionStateRef.current = initialState; return () => { unsubMessage(); @@ -1052,30 +1233,60 @@ const ChatScreen: React.FC = () => { setSearchIndex(0); }, [searchQuery]); - // Bei Index-Wechsel zu der entsprechenden Bubble scrollen. + // Tracking damit wir nicht zur selben Bubble mehrfach scrollen (z.B. wenn + // neue Nachrichten kommen waehrend Suche aktiv ist → invertedMessages + // aendert sich, soll aber nicht den Scroll erneut triggern). + const lastSearchScrollKey = useRef(''); + // Pending Retry-Timer fuer onScrollToIndexFailed — wird gecancelt sobald + // ein neuer Search-Hit kommt, damit alte Retries nicht den neuen + // Scroll-Versuch durcheinanderbringen ("permanent springen"-Bug). + const pendingScrollRetry = useRef | null>(null); + const clearPendingScrollRetry = () => { + if (pendingScrollRetry.current) { + clearTimeout(pendingScrollRetry.current); + pendingScrollRetry.current = null; + } + }; + + // Bei Search-Index-Wechsel zur entsprechenden Bubble scrollen. // FlatList ist `inverted`. viewPosition 0 = Item-Top oben am Viewport → - // Treffer-Bubble liegt mit dem Anfang direkt oben sichtbar, kein - // weiteres Hochscrollen noetig. Plus mehrere Retries da Layout bei - // langen Listen zeitversetzt fertig wird. + // Treffer-Bubble liegt mit dem Anfang direkt oben sichtbar. + // WICHTIG: invertedMessages bewusst NICHT in den Deps — sonst feuert das + // Effekt bei jeder neuen ARIA-Nachricht erneut und scrollt amok. + // Den aktuellen Snapshot von invertedMessages holen wir via Ref. + const invertedMessagesRef = useRef(invertedMessages); + invertedMessagesRef.current = invertedMessages; useEffect(() => { - if (!searchMatchIds.length) return; + if (!searchMatchIds.length) { + lastSearchScrollKey.current = ''; + clearPendingScrollRetry(); + return; + } const id = searchMatchIds[searchIndex]; if (!id) return; - const idx = invertedMessages.findIndex(m => m.id === id); + // Eindeutiger Schluessel pro Treffer-Stop — verhindert dass identische + // Re-Renders erneut scrollen. + const key = `${searchIndex}:${id}`; + if (lastSearchScrollKey.current === key) return; + lastSearchScrollKey.current = key; + // Neue Suche → alte Retries verwerfen + clearPendingScrollRetry(); + const idx = invertedMessagesRef.current.findIndex(m => m.id === id); if (idx < 0 || !flatListRef.current) return; - const tryScroll = () => { + requestAnimationFrame(() => { try { flatListRef.current?.scrollToIndex({ index: idx, animated: true, viewPosition: 0 }); } catch { - // wird von onScrollToIndexFailed nochmal versucht + // onScrollToIndexFailed-Handler uebernimmt den Fallback } - }; - // requestAnimationFrame fuer den ersten Versuch, dann setTimeout-Folge - // damit auch bei tiefen Indizes (viel ungelayoutete Items dazwischen) - // der Sprung am Ende sitzt. - requestAnimationFrame(tryScroll); - [180, 420, 800].forEach(d => setTimeout(tryScroll, d)); - }, [searchIndex, searchMatchIds, invertedMessages]); + }); + }, [searchIndex, searchMatchIds]); + + // Unmount → pending Timer verwerfen, sonst feuern sie nach Navigation ins Leere + useEffect(() => () => { + clearPendingScrollRetry(); + clearStuckWatchdog(); + }, []); const activeSearchId = searchMatchIds[searchIndex] || ''; const gotoSearchPrev = () => { @@ -1155,29 +1366,33 @@ const ChatScreen: React.FC = () => { const wasInterrupted = interruptAriaIfBusy(); const location = await getCurrentLocation(); + const cmid = nextClientMsgId(); const userMsg: ChatMessage = { id: nextId(), sender: 'user', text, timestamp: Date.now(), + clientMsgId: cmid, + deliveryStatus: connectionStateRef.current === 'connected' ? 'sending' : 'queued', + sendAttempts: 1, }; setMessages(prev => capMessages([...prev, userMsg])); - console.log('[Chat] sende mit voice=%s speed=%s interrupted=%s', - localXttsVoiceRef.current || '(default)', ttsSpeedRef.current, wasInterrupted); - // An RVS senden — mit geraetelokaler Voice (Bridge nutzt sie fuer die Antwort) - rvs.send('chat', { + console.log('[Chat] sende cmid=%s voice=%s speed=%s interrupted=%s', + cmid, localXttsVoiceRef.current || '(default)', ttsSpeedRef.current, wasInterrupted); + dispatchWithAck(cmid, 'chat', { text, voice: localXttsVoiceRef.current, speed: ttsSpeedRef.current, interrupted: wasInterrupted, ...(location && { location }), }); - }, [inputText, getCurrentLocation, pendingAttachments, sendPendingAttachments, interruptAriaIfBusy]); + }, [inputText, getCurrentLocation, pendingAttachments, sendPendingAttachments, interruptAriaIfBusy, dispatchWithAck]); // Anfrage abbrechen — sofort lokalen Indicator weg, Bridge triggert doctor --fix const cancelRequest = useCallback(() => { setAgentActivity({ activity: 'idle', tool: '' }); + clearStuckWatchdog(); rvs.send('cancel_request' as any, {}); }, []); @@ -1194,6 +1409,7 @@ const ChatScreen: React.FC = () => { if (speaking) audioService.haltAllPlayback('user spricht (barge-in)'); if (thinking) { setAgentActivity({ activity: 'idle', tool: '' }); + clearStuckWatchdog(); rvs.send('cancel_request' as any, {}); } return true; @@ -1206,16 +1422,20 @@ const ChatScreen: React.FC = () => { const location = await getCurrentLocation(); const audioRequestId = `audio_${Date.now()}_${Math.floor(Math.random() * 100000)}`; + const cmid = nextClientMsgId(); const userMsg: ChatMessage = { id: nextId(), sender: 'user', text: '🎙 Spracheingabe wird verarbeitet...', timestamp: Date.now(), audioRequestId, + clientMsgId: cmid, + deliveryStatus: connectionStateRef.current === 'connected' ? 'sending' : 'queued', + sendAttempts: 1, }; setMessages(prev => capMessages([...prev, userMsg])); - rvs.send('audio', { + dispatchWithAck(cmid, 'audio', { base64: result.base64, durationMs: result.durationMs, mimeType: result.mimeType, @@ -1276,13 +1496,20 @@ const ChatScreen: React.FC = () => { }); } - // Chat-Nachricht mit allen Anhaengen + // Chat-Nachricht mit allen Anhaengen. clientMsgId nur wenn Text dabei + // ist — files selber haben (noch) kein ACK-Tracking auf der Bridge. + const cmid = messageText ? nextClientMsgId() : undefined; const userMsg: ChatMessage = { id: msgId, sender: 'user', text: messageText || `${pendingAttachments.length} Anhang/Anhaenge`, timestamp: Date.now(), attachments, + ...(cmid && { + clientMsgId: cmid, + deliveryStatus: connectionStateRef.current === 'connected' ? 'sending' : 'queued', + sendAttempts: 1, + }), }; setMessages(prev => capMessages([...prev, userMsg])); @@ -1316,9 +1543,11 @@ const ChatScreen: React.FC = () => { }); } - // Text als separate Nachricht (damit ARIA weiss was zu tun ist) - if (messageText) { - rvs.send('chat', { + // Text als separate Nachricht (damit ARIA weiss was zu tun ist) — mit + // dem clientMsgId der Bubble, damit Bridge+ACK die richtige Bubble + // adressieren. + if (messageText && cmid) { + dispatchWithAck(cmid, 'chat', { text: messageText, voice: localXttsVoiceRef.current, speed: ttsSpeedRef.current, @@ -1328,7 +1557,7 @@ const ChatScreen: React.FC = () => { setPendingAttachments([]); setInputText(''); - }, [pendingAttachments, getCurrentLocation]); + }, [pendingAttachments, getCurrentLocation, dispatchWithAck]); // --- Rendering --- @@ -1595,7 +1824,31 @@ const ChatScreen: React.FC = () => { {'🗑'} ) : null} - {time} + + {time} + {isUser && item.deliveryStatus ? ( + item.deliveryStatus === 'failed' && item.clientMsgId ? ( + retryFailedMessage(item.clientMsgId!)} + > + {'⚠ tippen f. Retry'} + + ) : ( + + {item.deliveryStatus === 'queued' ? '⏱' : + item.deliveryStatus === 'sending' ? '⏳' : + item.deliveryStatus === 'sent' ? '✓' : + /* delivered */ '✓✓'} + + ) + ) : null} + ); }; @@ -1739,19 +1992,18 @@ const ChatScreen: React.FC = () => { }} scrollEventThrottle={120} onScrollToIndexFailed={(info) => { - // FlatList kennt das Item-Layout noch nicht. Zuerst grob in die - // Naehe scrollen (Average-Item-Hoehe-Schaetzung), dann mehrfach - // praezise nachsetzen — bei langem Chat braucht's manchmal mehrere - // Runden bis die Layouts gemessen sind. + // FlatList kennt das Item-Layout noch nicht. Wir scrollen grob in + // die Naehe (Average-Item-Hoehe-Schaetzung) und versuchen EINMAL + // nach 300ms praezise nachzusetzen. Mehr Retries → Endlos-Cascade + // (jeder failed Retry triggert wieder den Handler → 3, 9, 27 ... + // Scrolls in der Pipeline = der "permanent springen"-Bug). const offset = info.averageItemLength * info.index; try { flatListRef.current?.scrollToOffset({ offset, animated: false }); } catch {} - // viewPosition 0 = Item-Top oben am Viewport → Stefan landet am - // Text-Anfang der Bubble, nicht in der Mitte oder am Ende. - [120, 320, 600].forEach(delay => { - setTimeout(() => { - try { flatListRef.current?.scrollToIndex({ index: info.index, animated: true, viewPosition: 0 }); } catch {} - }, delay); - }); + clearPendingScrollRetry(); + pendingScrollRetry.current = setTimeout(() => { + pendingScrollRetry.current = null; + try { flatListRef.current?.scrollToIndex({ index: info.index, animated: true, viewPosition: 0 }); } catch {} + }, 300); }} keyExtractor={item => item.id} renderItem={renderMessage} @@ -2174,6 +2426,35 @@ const styles = StyleSheet.create({ marginTop: 4, alignSelf: 'flex-end', }, + statusRow: { + flexDirection: 'row', + alignItems: 'center', + alignSelf: 'flex-end', + gap: 6, + marginTop: 4, + }, + statusQueued: { + color: '#FFD60A', // Gelb — wartet auf Verbindung + fontSize: 11, + }, + statusSending: { + color: 'rgba(255,255,255,0.5)', + fontSize: 11, + }, + statusSent: { + color: 'rgba(255,255,255,0.6)', + fontSize: 12, + }, + statusDelivered: { + color: '#34C759', // Gruen — Brain hat geantwortet + fontSize: 12, + fontWeight: '700', + }, + statusFailed: { + color: '#FF3B30', + fontSize: 11, + fontWeight: '700', + }, emptyContainer: { flex: 1, alignItems: 'center', diff --git a/bridge/aria_bridge.py b/bridge/aria_bridge.py index 4e16c42..e8c4de9 100644 --- a/bridge/aria_bridge.py +++ b/bridge/aria_bridge.py @@ -25,6 +25,7 @@ import time import sys import tempfile import uuid +from collections import OrderedDict from pathlib import Path from typing import Optional @@ -475,6 +476,13 @@ class ARIABridge: self.current_mode = self._load_persisted_mode() self.running = False + # Idempotenz: zuletzt gesehene clientMsgIds (App-seitig generiert). + # Beim Reconnect/Retry sendet die App dieselbe ID nochmal — wir + # antworten erneut mit ACK aber leiten NICHT doppelt an Brain weiter. + # OrderedDict als FIFO mit Capping (Insertion-Order). + self._seen_client_msg_ids: "OrderedDict[str, float]" = OrderedDict() + self._SEEN_CLIENT_MSG_LIMIT = 200 + # Komponenten (TTS: F5-TTS remote auf der Gamebox, lokales TTS wurde entfernt) self.tts_enabled = True self.xtts_voice = "" @@ -1530,6 +1538,36 @@ class ARIABridge: except Exception: break + async def _send_chat_ack(self, client_msg_id: Optional[str]) -> None: + """Bestaetigt der App den Empfang einer chat/audio-Nachricht. + App nutzt das fuer Delivery-Status (✓ = sent). Ohne ACK wuerde die + App nach Timeout retryen — gegen Verlust bei Netz-Hicksern. + """ + if not client_msg_id: + return + await self._send_to_rvs({ + "type": "chat_ack", + "payload": {"clientMsgId": client_msg_id}, + "timestamp": int(asyncio.get_event_loop().time() * 1000), + }) + + def _is_duplicate_client_msg(self, client_msg_id: Optional[str]) -> bool: + """Prueft ob wir diese clientMsgId schon verarbeitet haben. + Wenn ja → True (Caller soll ACK senden aber NICHT an Brain forwarden). + Wenn nein → in den Seen-Cache aufnehmen + False zurueck. + """ + if not client_msg_id: + return False + if client_msg_id in self._seen_client_msg_ids: + logger.info("[rvs] Idempotenz: cmid=%s bereits verarbeitet, ignoriere", + client_msg_id) + return True + self._seen_client_msg_ids[client_msg_id] = time.time() + # Capping: aelteste Eintraege rauswerfen + while len(self._seen_client_msg_ids) > self._SEEN_CLIENT_MSG_LIMIT: + self._seen_client_msg_ids.popitem(last=False) + return False + async def _handle_rvs_message(self, raw_message: str) -> None: """Verarbeitet Nachrichten von der App (via RVS). @@ -1554,6 +1592,13 @@ class ARIABridge: sender = payload.get("sender", "") if sender in ("aria", "stt"): return + # Delivery-ACK: immer zurueckschicken (auch bei Idempotenz-Hit), + # damit die App den Status auf 'sent' setzen kann. Idempotenz- + # Check VERHINDERT aber die Doppel-Weiterleitung an Brain. + client_msg_id = payload.get("clientMsgId") or None + await self._send_chat_ack(client_msg_id) + if self._is_duplicate_client_msg(client_msg_id): + return text = payload.get("text", "") # Voice-Override fuer Folgenachrichten setzen — gilt bis zum naechsten # chat-Event. Leerer String "" = explizit Default-Voice (override loeschen). @@ -2153,6 +2198,12 @@ class ARIABridge: elif msg_type == "audio": # Audio von der App → decodieren → STT → an aria-core + # Delivery-ACK + Idempotenz wie bei chat — App nutzt die ACKs + # auch fuer Sprach-Bubbles (Status auf der Bubble: ✓ sent). + client_msg_id = payload.get("clientMsgId") or None + await self._send_chat_ack(client_msg_id) + if self._is_duplicate_client_msg(client_msg_id): + return audio_b64 = payload.get("base64", "") mime_type = payload.get("mimeType", "audio/mp4") duration_ms = payload.get("durationMs", 0)