"""Einmalige Migration: project_id aus conversation.jsonl nach chat_backup.jsonl zurueckschreiben. Hintergrund: Seit es Projekte gibt (fc0f91d) taggt das Brain jeden Turn in conversation.jsonl mit project_id. chat_backup.jsonl (die Anzeige-Quelle fuer App + Diagnostic) bekam project_id aber erst spaeter (f51ad15). Alle Projekt- Nachrichten aus dem Zeitfenster dazwischen liegen daher in conversation.jsonl korrekt getaggt, in chat_backup.jsonl aber untagged → die UI zeigt sie im Hauptchat statt im Projekt. Diese Migration matcht chat_backup-Eintraege gegen conversation-Turns ueber (role, text) in Reihenfolge und traegt die fehlende project_id nach. Sie ist: - idempotent (Marker-Datei, laeuft genau einmal), - nicht-destruktiv (legt .bak an, aendert nur LEERE project_ids, entfernt nie einen bestehenden Tag), - atomar (tmp-Datei + os.replace). Reihenfolge-erhaltend: pro (role, normalisiertem Text) wird eine Deque der project_ids aus conversation.jsonl aufgebaut (inklusive "" fuer Hauptthread- Turns), damit wiederholte identische Texte ihre jeweils richtige Zuordnung bekommen und Hauptchat-Interleaving nicht faelschlich getaggt wird. """ from __future__ import annotations import json import logging import os from collections import defaultdict, deque from pathlib import Path logger = logging.getLogger("aria.migrate.backfill_projectid") CONVERSATION_FILE = Path(os.environ.get("CONVERSATION_FILE", "/data/conversation.jsonl")) CHAT_BACKUP_FILE = Path(os.environ.get("CHAT_BACKUP_FILE", "/shared/config/chat_backup.jsonl")) MARKER_FILE = Path("/shared/config/.chat_backup_projectid_backfill_v1") def _norm(text: str) -> str: """Match-Key: getrimmt + auf 500 Zeichen begrenzt. Reicht um Turns eindeutig zu unterscheiden, ist aber tolerant gegen minimale Trailing-Unterschiede.""" return (text or "").strip()[:500] def run() -> dict: """Fuehrt die Migration aus. Returns Status-Dict fuers Logging. Laeuft nur einmal (Marker). Fehlt eine der Quelldateien: still ueberspringen.""" if MARKER_FILE.exists(): return {"skipped": "marker_exists"} if not CHAT_BACKUP_FILE.exists(): return {"skipped": "no_chat_backup"} if not CONVERSATION_FILE.exists(): # Ohne Brain-Historie gibt es nichts zu uebernehmen — Marker trotzdem # setzen, damit wir nicht bei jedem Start neu pruefen. _write_marker(0, 0) return {"skipped": "no_conversation"} # 1) conversation.jsonl → Deque der project_ids je (role, normtext), in Reihenfolge. tag_queues: dict[tuple[str, str], deque[str]] = defaultdict(deque) conv_turns = 0 for line in _iter_jsonl(CONVERSATION_FILE): role = line.get("role") if role not in ("user", "assistant"): continue content = line.get("content") if not isinstance(content, str): continue conv_turns += 1 tag_queues[(role, _norm(content))].append((line.get("project_id") or "").strip()) # 2) chat_backup.jsonl durchgehen, leere project_ids nachtragen. try: backup_lines = CHAT_BACKUP_FILE.read_text(encoding="utf-8").splitlines() except Exception as exc: logger.warning("[backfill] chat_backup lesen fehlgeschlagen: %s", exc) return {"error": f"read_backup: {exc}"} out_lines: list[str] = [] patched = 0 matched = 0 for raw in backup_lines: raw = raw.strip() if not raw: continue try: obj = json.loads(raw) except Exception: out_lines.append(raw) # unveraendert durchreichen continue role = obj.get("role") text = obj.get("text") # Nur echte Chat-Bubbles matchen (keine file_deleted-/type-Marker). if role in ("user", "assistant") and isinstance(text, str): q = tag_queues.get((role, _norm(text))) if q: pid = q.popleft() # verbraucht → Reihenfolge fuer Duplikate bleibt korrekt matched += 1 existing = (obj.get("project_id") or "").strip() # Nur setzen wenn Backup-Eintrag noch KEINEN Tag hat und der # conversation-Turn einem Projekt gehoert. Bestehende Tags bleiben. if not existing and pid: obj["project_id"] = pid patched += 1 out_lines.append(json.dumps(obj, ensure_ascii=False)) # 3) Nichts zu tun? Marker setzen und raus. if patched == 0: _write_marker(conv_turns, 0) logger.info("[backfill] nichts nachzutragen (conv_turns=%s, matched=%s)", conv_turns, matched) return {"conv_turns": conv_turns, "matched": matched, "patched": 0} # 4) Sicherung + atomarer Rewrite. try: bak = CHAT_BACKUP_FILE.with_suffix(".jsonl.pre-backfill-v1.bak") if not bak.exists(): bak.write_bytes(CHAT_BACKUP_FILE.read_bytes()) tmp = CHAT_BACKUP_FILE.with_suffix(".jsonl.tmp") tmp.write_text("\n".join(out_lines) + "\n", encoding="utf-8") os.replace(tmp, CHAT_BACKUP_FILE) except Exception as exc: logger.warning("[backfill] Rewrite fehlgeschlagen: %s", exc) return {"error": f"rewrite: {exc}"} _write_marker(conv_turns, patched) logger.info("[backfill] %s Bubbles nachtraeglich getaggt (conv_turns=%s, matched=%s). Backup: %s", patched, conv_turns, matched, bak.name) return {"conv_turns": conv_turns, "matched": matched, "patched": patched} def _iter_jsonl(path: Path): try: for raw in path.read_text(encoding="utf-8").splitlines(): raw = raw.strip() if not raw: continue try: yield json.loads(raw) except Exception: continue except Exception as exc: logger.warning("[backfill] %s lesen fehlgeschlagen: %s", path, exc) def _write_marker(conv_turns: int, patched: int) -> None: try: MARKER_FILE.parent.mkdir(parents=True, exist_ok=True) MARKER_FILE.write_text( json.dumps({"conv_turns": conv_turns, "patched": patched}, ensure_ascii=False), encoding="utf-8", ) except Exception as exc: logger.warning("[backfill] Marker schreiben fehlgeschlagen: %s", exc)