feat: App-Chat-Sync — verpasste Nachrichten + chat_cleared Live-Update
Zwei zusammenhaengende Bugs:
1. App aktualisierte nicht wenn die Diagnostic "Konversation komplett
zuruecksetzen" gedrueckt hat — die App hatte den lokalen Stand weiter
2. Nachrichten die kamen waehrend die App offline/geschlossen war,
wurden nicht nachgeladen
Loesung: chat_backup.jsonl wird wieder geschrieben (Bridge statt Diagnostic,
weil OpenClaw-Code-Pfad tot ist) und dient als Server-Truth fuer App+Diagnostic.
bridge/aria_bridge.py
_append_chat_backup() schreibt jeden Turn (User + ARIA) als JSONL-Zeile
in /shared/config/chat_backup.jsonl. Trigger: send_to_core (User) +
_process_core_response (Assistant, inkl. file-Attachments).
_read_chat_backup_since(since_ms, limit) liest die Datei, filtert auf
ts > since_ms, gibt max limit neueste zurueck. Honoriert file_deleted-Marker.
Neuer RVS-Handler chat_history_request {since, limit?} → antwortet mit
chat_history_response {messages: [...], since}.
diagnostic/server.js
/api/chat-history-clear broadcastet jetzt zusaetzlich chat_cleared via
RVS (sendToRVS_raw), damit App ihre lokale Liste auch leert. Vorher nur
Browser-Clients via broadcast() — App war aussen vor.
rvs/server.js
ALLOWED_TYPES um chat_history_request, chat_history_response, chat_cleared.
android/src/screens/ChatScreen.tsx
- Bei (re)connect: AsyncStorage 'aria_chat_last_sync' lesen → send
chat_history_request {since}
- Handler chat_history_response: incoming → ChatMessage[] mappen,
Attachments aus 'files'-Array rekonstruieren, mergen (Dedup via timestamp),
lastSync hochziehen
- Handler chat_cleared: setMessages([]) + AsyncStorage 'chat_messages' +
'last_sync' weg
- Bei jeder eingehenden chat-Message: 'aria_chat_last_sync' updaten damit
Reconnect nicht doppelt nachzieht
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -396,6 +396,52 @@ const ChatScreen: React.FC = () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// skill_created: ARIA hat einen neuen Skill angelegt → eigene Bubble
|
// skill_created: ARIA hat einen neuen Skill angelegt → eigene Bubble
|
||||||
|
// chat_cleared: Diagnostic hat die History komplett geleert
|
||||||
|
// → lokal auch loeschen (visuell + Persistenz)
|
||||||
|
if (message.type === 'chat_cleared') {
|
||||||
|
console.log('[Chat] chat_cleared — leere lokale Anzeige + Storage');
|
||||||
|
setMessages([]);
|
||||||
|
AsyncStorage.removeItem(CHAT_STORAGE_KEY).catch(() => {});
|
||||||
|
AsyncStorage.removeItem('aria_chat_last_sync').catch(() => {});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// chat_history_response: verpasste Nachrichten nachladen (bei Reconnect)
|
||||||
|
if (message.type === 'chat_history_response') {
|
||||||
|
const p = (message.payload || {}) as any;
|
||||||
|
const incoming = (p.messages || []) as Array<any>;
|
||||||
|
if (!incoming.length) return;
|
||||||
|
console.log(`[Chat] ${incoming.length} verpasste Nachrichten nachgeladen`);
|
||||||
|
const toAdd: ChatMessage[] = incoming.map(m => {
|
||||||
|
const role = m.role === 'user' ? 'user' : 'aria';
|
||||||
|
// ARIA-File-Marker aus dem Backup als attachments rekonstruieren
|
||||||
|
const files = Array.isArray(m.files) ? m.files : [];
|
||||||
|
const attachments = files.map((f: any) => ({
|
||||||
|
type: (typeof f.mimeType === 'string' && f.mimeType.startsWith('image/')) ? 'image' : 'file',
|
||||||
|
name: f.name || 'datei',
|
||||||
|
size: f.size || 0,
|
||||||
|
mimeType: f.mimeType || '',
|
||||||
|
serverPath: f.serverPath || '',
|
||||||
|
})) as Attachment[];
|
||||||
|
return {
|
||||||
|
id: nextId(),
|
||||||
|
sender: role as 'user' | 'aria',
|
||||||
|
text: m.text || '',
|
||||||
|
timestamp: m.ts || Date.now(),
|
||||||
|
attachments: attachments.length ? attachments : undefined,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
const maxTs = incoming.reduce((mx: number, m: any) => Math.max(mx, m.ts || 0), 0);
|
||||||
|
setMessages(prev => {
|
||||||
|
// Dedup auf ts-basis: nicht erneut adden wenn schon was bei +/- 1s vorhanden
|
||||||
|
const existingTs = new Set(prev.map(m => m.timestamp));
|
||||||
|
const newOnes = toAdd.filter(m => !existingTs.has(m.timestamp));
|
||||||
|
return capMessages([...prev, ...newOnes]);
|
||||||
|
});
|
||||||
|
if (maxTs > 0) AsyncStorage.setItem('aria_chat_last_sync', String(maxTs)).catch(() => {});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (message.type === 'skill_created') {
|
if (message.type === 'skill_created') {
|
||||||
const p = (message.payload || {}) as any;
|
const p = (message.payload || {}) as any;
|
||||||
const skillMsg: ChatMessage = {
|
const skillMsg: ChatMessage = {
|
||||||
@@ -480,6 +526,13 @@ const ChatScreen: React.FC = () => {
|
|||||||
const dbgText = ((message.payload.text as string) || '').slice(0, 60);
|
const dbgText = ((message.payload.text as string) || '').slice(0, 60);
|
||||||
console.log('[Chat] chat-event sender=%s text=%s', sender || '(none)', dbgText);
|
console.log('[Chat] chat-event sender=%s text=%s', sender || '(none)', dbgText);
|
||||||
|
|
||||||
|
// last-sync tracken — so dass beim Reconnect nicht wieder dieselbe
|
||||||
|
// Nachricht aus dem Server-Backup nachgeladen wird
|
||||||
|
if (sender === 'aria' || sender === 'user' || sender === 'stt') {
|
||||||
|
const ts = message.timestamp || Date.now();
|
||||||
|
AsyncStorage.setItem('aria_chat_last_sync', String(ts)).catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
// STT-Ergebnis: Transkribierten Text in die Sprach-Bubble schreiben.
|
// STT-Ergebnis: Transkribierten Text in die Sprach-Bubble schreiben.
|
||||||
// WICHTIG: Nur die ERSTE noch unaufgeloeste Aufnahme matchen — sonst
|
// WICHTIG: Nur die ERSTE noch unaufgeloeste Aufnahme matchen — sonst
|
||||||
// wuerde bei zwei kurz hintereinander gesendeten Audios beide Bubbles
|
// wuerde bei zwei kurz hintereinander gesendeten Audios beide Bubbles
|
||||||
@@ -647,6 +700,15 @@ const ChatScreen: React.FC = () => {
|
|||||||
|
|
||||||
const unsubState = rvs.onStateChange((state) => {
|
const unsubState = rvs.onStateChange((state) => {
|
||||||
setConnectionState(state);
|
setConnectionState(state);
|
||||||
|
// Bei (re)connect: verpasste Chat-Eintraege seit der letzten gesehenen
|
||||||
|
// Nachricht abholen. lastChatSync wird beim Eingang von Nachrichten
|
||||||
|
// hochgezaehlt; default 0 = alle (gecappt auf Server-Limit).
|
||||||
|
if (state === 'connected') {
|
||||||
|
AsyncStorage.getItem('aria_chat_last_sync').then(stored => {
|
||||||
|
const since = stored ? parseInt(stored, 10) || 0 : 0;
|
||||||
|
rvs.send('chat_history_request' as any, { since, limit: 100 });
|
||||||
|
}).catch(() => {});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Initalen Status setzen
|
// Initalen Status setzen
|
||||||
|
|||||||
@@ -919,6 +919,56 @@ class ARIABridge:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[rvs] file_from_aria broadcast fehlgeschlagen: %s", e)
|
logger.warning("[rvs] file_from_aria broadcast fehlgeschlagen: %s", e)
|
||||||
|
|
||||||
|
def _append_chat_backup(self, entry: dict) -> None:
|
||||||
|
"""Schreibt eine Zeile in /shared/config/chat_backup.jsonl.
|
||||||
|
Wird von Diagnostic + App als History-Quelle gelesen.
|
||||||
|
entry braucht mindestens {role, text}; ts wird ergaenzt."""
|
||||||
|
try:
|
||||||
|
line = {"ts": int(asyncio.get_event_loop().time() * 1000)}
|
||||||
|
line.update(entry)
|
||||||
|
Path("/shared/config").mkdir(parents=True, exist_ok=True)
|
||||||
|
with open("/shared/config/chat_backup.jsonl", "a", encoding="utf-8") as f:
|
||||||
|
f.write(json.dumps(line, ensure_ascii=False) + "\n")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[backup] chat_backup-Write fehlgeschlagen: %s", e)
|
||||||
|
|
||||||
|
def _read_chat_backup_since(self, since_ms: int, limit: int = 100) -> list[dict]:
|
||||||
|
"""Liest chat_backup.jsonl, gibt Eintraege > since_ms zurueck, max limit neueste.
|
||||||
|
File-deleted-Marker werden honoriert: vor einem file_deleted-Marker liegende
|
||||||
|
Eintraege mit gleichem Pfad werden als deleted markiert."""
|
||||||
|
path = Path("/shared/config/chat_backup.jsonl")
|
||||||
|
if not path.exists():
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
lines = path.read_text(encoding="utf-8").splitlines()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[backup] Lesen fehlgeschlagen: %s", e)
|
||||||
|
return []
|
||||||
|
out: list[dict] = []
|
||||||
|
for raw in lines:
|
||||||
|
raw = raw.strip()
|
||||||
|
if not raw:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
obj = json.loads(raw)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
ts = obj.get("ts") or 0
|
||||||
|
if ts <= since_ms:
|
||||||
|
continue
|
||||||
|
# file_deleted-Marker: nicht als Chat ausliefern, aber an die App schicken
|
||||||
|
# damit sie ihre Bubbles updaten kann (separater Pfad existiert ja schon)
|
||||||
|
if obj.get("type") == "file_deleted":
|
||||||
|
continue
|
||||||
|
role = obj.get("role")
|
||||||
|
if role not in ("user", "assistant"):
|
||||||
|
continue
|
||||||
|
out.append(obj)
|
||||||
|
# Auf "limit" neueste cappen
|
||||||
|
if len(out) > limit:
|
||||||
|
out = out[-limit:]
|
||||||
|
return out
|
||||||
|
|
||||||
async def _process_core_response(self, text: str, payload: dict) -> None:
|
async def _process_core_response(self, text: str, payload: dict) -> None:
|
||||||
"""Verarbeitet eine fertige Antwort von aria-core.
|
"""Verarbeitet eine fertige Antwort von aria-core.
|
||||||
|
|
||||||
@@ -933,6 +983,9 @@ class ARIABridge:
|
|||||||
logger.info("[core] NO_REPLY empfangen — Antwort still verworfen")
|
logger.info("[core] NO_REPLY empfangen — Antwort still verworfen")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Antwort in chat_backup.jsonl loggen (cleaned text, ohne File-Marker)
|
||||||
|
# — passiert weiter unten nach extract_file_markers
|
||||||
|
|
||||||
# File-Marker `[FILE: /shared/uploads/aria_xyz.pdf]` extrahieren —
|
# File-Marker `[FILE: /shared/uploads/aria_xyz.pdf]` extrahieren —
|
||||||
# ARIA legt damit Dateien fuer den User bereit (Bilder, PDFs, etc.).
|
# ARIA legt damit Dateien fuer den User bereit (Bilder, PDFs, etc.).
|
||||||
# Der Marker wird aus dem Antworttext entfernt (TTS soll ihn nicht
|
# Der Marker wird aus dem Antworttext entfernt (TTS soll ihn nicht
|
||||||
@@ -949,6 +1002,15 @@ class ARIABridge:
|
|||||||
f"aber nicht erstellt:\n{missing_list}\n"
|
f"aber nicht erstellt:\n{missing_list}\n"
|
||||||
"Bitte ARIA bitten, sie wirklich zu schreiben.").strip()
|
"Bitte ARIA bitten, sie wirklich zu schreiben.").strip()
|
||||||
|
|
||||||
|
# Antwort in chat_backup.jsonl loggen (gecleanter Text, ohne File-Marker)
|
||||||
|
# File-Marker werden separat als file_from_aria-Events ausgeliefert.
|
||||||
|
self._append_chat_backup({
|
||||||
|
"role": "assistant",
|
||||||
|
"text": text,
|
||||||
|
"files": [{"serverPath": f["serverPath"], "name": f["name"],
|
||||||
|
"mimeType": f["mimeType"], "size": f["size"]} for f in aria_files],
|
||||||
|
})
|
||||||
|
|
||||||
metadata = payload.get("metadata", {})
|
metadata = payload.get("metadata", {})
|
||||||
is_critical = metadata.get("critical", False)
|
is_critical = metadata.get("critical", False)
|
||||||
requested_voice = metadata.get("voice")
|
requested_voice = metadata.get("voice")
|
||||||
@@ -1184,6 +1246,10 @@ class ARIABridge:
|
|||||||
payload = json.dumps({"message": text, "source": source}).encode("utf-8")
|
payload = json.dumps({"message": text, "source": source}).encode("utf-8")
|
||||||
logger.info("[brain] chat ← %s '%s'", source, text[:80])
|
logger.info("[brain] chat ← %s '%s'", source, text[:80])
|
||||||
|
|
||||||
|
# User-Nachricht in chat_backup.jsonl loggen — wird beim App-Reconnect
|
||||||
|
# / Diagnostic-Reload als History-Quelle gelesen.
|
||||||
|
self._append_chat_backup({"role": "user", "text": text, "source": source})
|
||||||
|
|
||||||
# agent_activity broadcasten (App + Diagnostic "ARIA denkt..." Indicator)
|
# agent_activity broadcasten (App + Diagnostic "ARIA denkt..." Indicator)
|
||||||
await self._send_to_rvs({
|
await self._send_to_rvs({
|
||||||
"type": "agent_activity",
|
"type": "agent_activity",
|
||||||
@@ -1657,6 +1723,20 @@ class ARIABridge:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[rvs] file_saved konnte nicht an App gesendet werden: %s", e)
|
logger.warning("[rvs] file_saved konnte nicht an App gesendet werden: %s", e)
|
||||||
|
|
||||||
|
elif msg_type == "chat_history_request":
|
||||||
|
# App holt verpasste Nachrichten beim Reconnect.
|
||||||
|
# payload: {since: <ts_ms>}, default 0 = alles
|
||||||
|
since = int(payload.get("since") or 0)
|
||||||
|
limit = int(payload.get("limit") or 100)
|
||||||
|
logger.info("[rvs] chat_history_request since=%d limit=%d", since, limit)
|
||||||
|
messages = self._read_chat_backup_since(since, limit=limit)
|
||||||
|
await self._send_to_rvs({
|
||||||
|
"type": "chat_history_response",
|
||||||
|
"payload": {"messages": messages, "since": since},
|
||||||
|
"timestamp": int(asyncio.get_event_loop().time() * 1000),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
|
||||||
elif msg_type == "file_list_request":
|
elif msg_type == "file_list_request":
|
||||||
# App fragt die Liste aller /shared/uploads/-Dateien an.
|
# App fragt die Liste aller /shared/uploads/-Dateien an.
|
||||||
logger.info("[rvs] file_list_request von App")
|
logger.info("[rvs] file_list_request von App")
|
||||||
|
|||||||
+11
-4
@@ -1449,15 +1449,22 @@ const server = http.createServer((req, res) => {
|
|||||||
});
|
});
|
||||||
return;
|
return;
|
||||||
} else if (req.url === "/api/chat-history-clear" && req.method === "POST") {
|
} else if (req.url === "/api/chat-history-clear" && req.method === "POST") {
|
||||||
// Leert die Diagnostic-Anzeige-History (chat_backup.jsonl).
|
// Leert die Diagnostic-Anzeige-History (chat_backup.jsonl) UND broadcastet
|
||||||
// Brain's Rolling-Window (conversation.jsonl) ist davon unabhaengig —
|
// chat_cleared an alle RVS-Clients (App leert lokal). Brain's
|
||||||
// der Caller sollte zusaetzlich /api/brain/conversation/reset triggern.
|
// Rolling-Window (conversation.jsonl) ist davon unabhaengig — Caller
|
||||||
|
// sollte zusaetzlich /api/brain/conversation/reset triggern.
|
||||||
log("warn", "server", "HTTP /api/chat-history-clear");
|
log("warn", "server", "HTTP /api/chat-history-clear");
|
||||||
try {
|
try {
|
||||||
const file = "/shared/config/chat_backup.jsonl";
|
const file = "/shared/config/chat_backup.jsonl";
|
||||||
if (fs.existsSync(file)) fs.unlinkSync(file);
|
if (fs.existsSync(file)) fs.unlinkSync(file);
|
||||||
// Broadcast: alle Browser-Clients sollen ihre lokale Chat-View leeren
|
// Browser-Clients: leere chat_history
|
||||||
broadcast({ type: "chat_history", messages: [] });
|
broadcast({ type: "chat_history", messages: [] });
|
||||||
|
// App via RVS: chat_cleared
|
||||||
|
sendToRVS_raw({
|
||||||
|
type: "chat_cleared",
|
||||||
|
payload: { ts: Date.now() },
|
||||||
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
res.writeHead(200, { "Content-Type": "application/json" });
|
res.writeHead(200, { "Content-Type": "application/json" });
|
||||||
res.end(JSON.stringify({ ok: true }));
|
res.end(JSON.stringify({ ok: true }));
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ const ALLOWED_TYPES = new Set([
|
|||||||
"xtts_export_voice", "xtts_voice_exported",
|
"xtts_export_voice", "xtts_voice_exported",
|
||||||
"xtts_import_voice", "xtts_voice_imported",
|
"xtts_import_voice", "xtts_voice_imported",
|
||||||
"skill_created",
|
"skill_created",
|
||||||
|
"chat_history_request", "chat_history_response", "chat_cleared",
|
||||||
"xtts_delete_voice",
|
"xtts_delete_voice",
|
||||||
"voice_preload", "voice_ready",
|
"voice_preload", "voice_ready",
|
||||||
"stt_request", "stt_response",
|
"stt_request", "stt_response",
|
||||||
|
|||||||
Reference in New Issue
Block a user