1568c25ac4
v1 matchte text==content exakt und verfehlte damit Nachrichten, bei denen die Bridge den Brain-Text anreichert oder cleant: - User-Turns: _build_core_text PREPENDT [Hinweis: Barge-In]-/[GPS-Position]- Bloecke — die stehen in conversation.jsonl, nicht in chat_backup. - Assistant-Turns: conversation enthaelt [FILE: /shared/uploads/...]-Marker, chat_backup hat sie schon rausgecleant. _norm entfernt jetzt FILE-Marker + fuehrende [..]-Bloecke, kollabiert Whitespace und vergleicht einen 120-Zeichen-Praefix. Neuer Marker (v2) → laeuft einmal neu und fuellt die von v1 verbliebenen Luecken. Nicht-destruktiv (eigene .pre-backfill-v2.bak), bestehende Tags bleiben unangetastet. Mit Tests gegen GPS-/Barge-In-/FILE-Marker-Faelle verifiziert. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
172 lines
7.2 KiB
Python
172 lines
7.2 KiB
Python
"""Einmalige Migration: project_id aus conversation.jsonl nach chat_backup.jsonl
|
|
zurueckschreiben.
|
|
|
|
Hintergrund: Seit es Projekte gibt (fc0f91d) taggt das Brain jeden Turn in
|
|
conversation.jsonl mit project_id. chat_backup.jsonl (die Anzeige-Quelle fuer
|
|
App + Diagnostic) bekam project_id aber erst spaeter (f51ad15). Alle Projekt-
|
|
Nachrichten aus dem Zeitfenster dazwischen liegen daher in conversation.jsonl
|
|
korrekt getaggt, in chat_backup.jsonl aber untagged → die UI zeigt sie im
|
|
Hauptchat statt im Projekt.
|
|
|
|
Diese Migration matcht chat_backup-Eintraege gegen conversation-Turns ueber
|
|
(role, text) in Reihenfolge und traegt die fehlende project_id nach. Sie ist:
|
|
- idempotent (Marker-Datei, laeuft genau einmal),
|
|
- nicht-destruktiv (legt .bak an, aendert nur LEERE project_ids, entfernt nie
|
|
einen bestehenden Tag),
|
|
- atomar (tmp-Datei + os.replace).
|
|
|
|
Reihenfolge-erhaltend: pro (role, normalisiertem Text) wird eine Deque der
|
|
project_ids aus conversation.jsonl aufgebaut (inklusive "" fuer Hauptthread-
|
|
Turns), damit wiederholte identische Texte ihre jeweils richtige Zuordnung
|
|
bekommen und Hauptchat-Interleaving nicht faelschlich getaggt wird.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from collections import defaultdict, deque
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger("aria.migrate.backfill_projectid")
|
|
|
|
CONVERSATION_FILE = Path(os.environ.get("CONVERSATION_FILE", "/data/conversation.jsonl"))
|
|
CHAT_BACKUP_FILE = Path(os.environ.get("CHAT_BACKUP_FILE", "/shared/config/chat_backup.jsonl"))
|
|
# v2: robusterer Match (Marker-Strip + Praefix). v1 verlangte exakte Gleichheit
|
|
# von text==content und verfehlte damit alle Nachrichten bei denen die Bridge
|
|
# den Brain-Text anreichert (GPS/Barge-In-Hints prepended) oder cleant
|
|
# (FILE-Marker entfernt). Neuer Marker → laeuft einmal neu, fuellt die Luecken.
|
|
MARKER_FILE = Path("/shared/config/.chat_backup_projectid_backfill_v2")
|
|
|
|
# _build_core_text (Bridge) PREPENDT bei User-Nachrichten Hinweis-/GPS-Bloecke
|
|
# in eckigen Klammern vor den eigentlichen Text; conversation.jsonl speichert
|
|
# diesen angereicherten Text, chat_backup nur den rohen. FILE-Marker stehen in
|
|
# conversation-Assistant-Turns, sind in chat_backup aber schon rausgecleant.
|
|
_FILE_MARKER_RE = re.compile(r"\[FILE:\s*/shared/uploads/[^\]]+\]", re.IGNORECASE)
|
|
_LEADING_BRACKET_RE = re.compile(r"^\s*(?:\[[^\]]*\]\s*)+")
|
|
_WS_RE = re.compile(r"\s+")
|
|
|
|
|
|
def _norm(text: str) -> str:
|
|
"""Match-Key: FILE-Marker + fuehrende [Hinweis]/[GPS]-Bloecke entfernen,
|
|
Whitespace kollabieren, auf 120-Zeichen-Praefix kuerzen. Toleriert damit
|
|
die Anreicherungs-/Cleaning-Unterschiede zwischen conversation und backup,
|
|
bleibt durch den 120er-Praefix aber spezifisch genug gegen Fehl-Matches."""
|
|
t = _FILE_MARKER_RE.sub("", text or "")
|
|
t = _LEADING_BRACKET_RE.sub("", t)
|
|
t = _WS_RE.sub(" ", t).strip()
|
|
return t[:120]
|
|
|
|
|
|
def run() -> dict:
|
|
"""Fuehrt die Migration aus. Returns Status-Dict fuers Logging.
|
|
Laeuft nur einmal (Marker). Fehlt eine der Quelldateien: still ueberspringen."""
|
|
if MARKER_FILE.exists():
|
|
return {"skipped": "marker_exists"}
|
|
if not CHAT_BACKUP_FILE.exists():
|
|
return {"skipped": "no_chat_backup"}
|
|
if not CONVERSATION_FILE.exists():
|
|
# Ohne Brain-Historie gibt es nichts zu uebernehmen — Marker trotzdem
|
|
# setzen, damit wir nicht bei jedem Start neu pruefen.
|
|
_write_marker(0, 0)
|
|
return {"skipped": "no_conversation"}
|
|
|
|
# 1) conversation.jsonl → Deque der project_ids je (role, normtext), in Reihenfolge.
|
|
tag_queues: dict[tuple[str, str], deque[str]] = defaultdict(deque)
|
|
conv_turns = 0
|
|
for line in _iter_jsonl(CONVERSATION_FILE):
|
|
role = line.get("role")
|
|
if role not in ("user", "assistant"):
|
|
continue
|
|
content = line.get("content")
|
|
if not isinstance(content, str):
|
|
continue
|
|
conv_turns += 1
|
|
tag_queues[(role, _norm(content))].append((line.get("project_id") or "").strip())
|
|
|
|
# 2) chat_backup.jsonl durchgehen, leere project_ids nachtragen.
|
|
try:
|
|
backup_lines = CHAT_BACKUP_FILE.read_text(encoding="utf-8").splitlines()
|
|
except Exception as exc:
|
|
logger.warning("[backfill] chat_backup lesen fehlgeschlagen: %s", exc)
|
|
return {"error": f"read_backup: {exc}"}
|
|
|
|
out_lines: list[str] = []
|
|
patched = 0
|
|
matched = 0
|
|
for raw in backup_lines:
|
|
raw = raw.strip()
|
|
if not raw:
|
|
continue
|
|
try:
|
|
obj = json.loads(raw)
|
|
except Exception:
|
|
out_lines.append(raw) # unveraendert durchreichen
|
|
continue
|
|
|
|
role = obj.get("role")
|
|
text = obj.get("text")
|
|
# Nur echte Chat-Bubbles matchen (keine file_deleted-/type-Marker).
|
|
if role in ("user", "assistant") and isinstance(text, str):
|
|
q = tag_queues.get((role, _norm(text)))
|
|
if q:
|
|
pid = q.popleft() # verbraucht → Reihenfolge fuer Duplikate bleibt korrekt
|
|
matched += 1
|
|
existing = (obj.get("project_id") or "").strip()
|
|
# Nur setzen wenn Backup-Eintrag noch KEINEN Tag hat und der
|
|
# conversation-Turn einem Projekt gehoert. Bestehende Tags bleiben.
|
|
if not existing and pid:
|
|
obj["project_id"] = pid
|
|
patched += 1
|
|
out_lines.append(json.dumps(obj, ensure_ascii=False))
|
|
|
|
# 3) Nichts zu tun? Marker setzen und raus.
|
|
if patched == 0:
|
|
_write_marker(conv_turns, 0)
|
|
logger.info("[backfill] nichts nachzutragen (conv_turns=%s, matched=%s)",
|
|
conv_turns, matched)
|
|
return {"conv_turns": conv_turns, "matched": matched, "patched": 0}
|
|
|
|
# 4) Sicherung + atomarer Rewrite.
|
|
try:
|
|
bak = CHAT_BACKUP_FILE.with_suffix(".jsonl.pre-backfill-v2.bak")
|
|
if not bak.exists():
|
|
bak.write_bytes(CHAT_BACKUP_FILE.read_bytes())
|
|
tmp = CHAT_BACKUP_FILE.with_suffix(".jsonl.tmp")
|
|
tmp.write_text("\n".join(out_lines) + "\n", encoding="utf-8")
|
|
os.replace(tmp, CHAT_BACKUP_FILE)
|
|
except Exception as exc:
|
|
logger.warning("[backfill] Rewrite fehlgeschlagen: %s", exc)
|
|
return {"error": f"rewrite: {exc}"}
|
|
|
|
_write_marker(conv_turns, patched)
|
|
logger.info("[backfill] %s Bubbles nachtraeglich getaggt (conv_turns=%s, matched=%s). Backup: %s",
|
|
patched, conv_turns, matched, bak.name)
|
|
return {"conv_turns": conv_turns, "matched": matched, "patched": patched}
|
|
|
|
|
|
def _iter_jsonl(path: Path):
|
|
try:
|
|
for raw in path.read_text(encoding="utf-8").splitlines():
|
|
raw = raw.strip()
|
|
if not raw:
|
|
continue
|
|
try:
|
|
yield json.loads(raw)
|
|
except Exception:
|
|
continue
|
|
except Exception as exc:
|
|
logger.warning("[backfill] %s lesen fehlgeschlagen: %s", path, exc)
|
|
|
|
|
|
def _write_marker(conv_turns: int, patched: int) -> None:
|
|
try:
|
|
MARKER_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
MARKER_FILE.write_text(
|
|
json.dumps({"conv_turns": conv_turns, "patched": patched}, ensure_ascii=False),
|
|
encoding="utf-8",
|
|
)
|
|
except Exception as exc:
|
|
logger.warning("[backfill] Marker schreiben fehlgeschlagen: %s", exc)
|