feat(projects): Migration — alt-getaggte Nachrichten nachtraeglich in Projekte einsortieren
Projekt-Nachrichten aus der Zeit vor dem chat_backup-project_id-Feld (getaggt in conversation.jsonl seitfc0f91d, aber chat_backup fuehrte project_id erst abf51ad15) lagen in der UI im Hauptchat statt im Projekt. Die App/Diagnostic zeigen aus chat_backup.jsonl — dort fehlte der Tag. Neue Einmal-Migration (Brain-Lifespan) schreibt project_id aus conversation.jsonl per (role, text)-Match reihenfolge-erhaltend nach chat_backup.jsonl zurueck: - idempotent via Marker /shared/config/.chat_backup_projectid_backfill_v1 - nicht-destruktiv: legt .pre-backfill-v1.bak an, setzt nur LEERE project_ids, entfernt/aendert nie einen bestehenden Tag - atomar (tmp + os.replace) - Duplikate: Deque je (role, normtext) inkl. "" fuer Hauptthread → korrekte Zuordnung auch bei wiederholten Texten, kein faelschliches Taggen von Hauptchat-Interleaving Mit Logik-Tests (Zuordnung, Duplikat-Reihenfolge, Idempotenz) verifiziert. Nachrichten aus der Zeit bevor es Projekte gab bleiben untagged im Hauptchat. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -112,6 +112,17 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.warning("Lifespan: spotify fast_patterns Migration: %s", exc)
|
||||
|
||||
# Einmalige Migration: project_id aus conversation.jsonl nach chat_backup.jsonl
|
||||
# zurueckschreiben, damit alt-getaggte Projekt-Nachrichten (getaggt bevor
|
||||
# chat_backup project_id fuehrte) in der UI wieder im richtigen Projekt
|
||||
# landen. Idempotent (Marker), nicht-destruktiv (.bak), atomar.
|
||||
try:
|
||||
import migrate_backfill_projectid
|
||||
res = migrate_backfill_projectid.run()
|
||||
logger.info("Lifespan: chat_backup project_id Backfill: %s", res)
|
||||
except Exception as exc:
|
||||
logger.warning("Lifespan: project_id Backfill Migration: %s", exc)
|
||||
|
||||
task = asyncio.create_task(background_mod.run_loop(agent))
|
||||
logger.info("Lifespan: Trigger-Loop gestartet")
|
||||
try:
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
"""Einmalige Migration: project_id aus conversation.jsonl nach chat_backup.jsonl
|
||||
zurueckschreiben.
|
||||
|
||||
Hintergrund: Seit es Projekte gibt (fc0f91d) taggt das Brain jeden Turn in
|
||||
conversation.jsonl mit project_id. chat_backup.jsonl (die Anzeige-Quelle fuer
|
||||
App + Diagnostic) bekam project_id aber erst spaeter (f51ad15). Alle Projekt-
|
||||
Nachrichten aus dem Zeitfenster dazwischen liegen daher in conversation.jsonl
|
||||
korrekt getaggt, in chat_backup.jsonl aber untagged → die UI zeigt sie im
|
||||
Hauptchat statt im Projekt.
|
||||
|
||||
Diese Migration matcht chat_backup-Eintraege gegen conversation-Turns ueber
|
||||
(role, text) in Reihenfolge und traegt die fehlende project_id nach. Sie ist:
|
||||
- idempotent (Marker-Datei, laeuft genau einmal),
|
||||
- nicht-destruktiv (legt .bak an, aendert nur LEERE project_ids, entfernt nie
|
||||
einen bestehenden Tag),
|
||||
- atomar (tmp-Datei + os.replace).
|
||||
|
||||
Reihenfolge-erhaltend: pro (role, normalisiertem Text) wird eine Deque der
|
||||
project_ids aus conversation.jsonl aufgebaut (inklusive "" fuer Hauptthread-
|
||||
Turns), damit wiederholte identische Texte ihre jeweils richtige Zuordnung
|
||||
bekommen und Hauptchat-Interleaving nicht faelschlich getaggt wird.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from collections import defaultdict, deque
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("aria.migrate.backfill_projectid")
|
||||
|
||||
CONVERSATION_FILE = Path(os.environ.get("CONVERSATION_FILE", "/data/conversation.jsonl"))
|
||||
CHAT_BACKUP_FILE = Path(os.environ.get("CHAT_BACKUP_FILE", "/shared/config/chat_backup.jsonl"))
|
||||
MARKER_FILE = Path("/shared/config/.chat_backup_projectid_backfill_v1")
|
||||
|
||||
|
||||
def _norm(text: str) -> str:
|
||||
"""Match-Key: getrimmt + auf 500 Zeichen begrenzt. Reicht um Turns eindeutig
|
||||
zu unterscheiden, ist aber tolerant gegen minimale Trailing-Unterschiede."""
|
||||
return (text or "").strip()[:500]
|
||||
|
||||
|
||||
def run() -> dict:
|
||||
"""Fuehrt die Migration aus. Returns Status-Dict fuers Logging.
|
||||
Laeuft nur einmal (Marker). Fehlt eine der Quelldateien: still ueberspringen."""
|
||||
if MARKER_FILE.exists():
|
||||
return {"skipped": "marker_exists"}
|
||||
if not CHAT_BACKUP_FILE.exists():
|
||||
return {"skipped": "no_chat_backup"}
|
||||
if not CONVERSATION_FILE.exists():
|
||||
# Ohne Brain-Historie gibt es nichts zu uebernehmen — Marker trotzdem
|
||||
# setzen, damit wir nicht bei jedem Start neu pruefen.
|
||||
_write_marker(0, 0)
|
||||
return {"skipped": "no_conversation"}
|
||||
|
||||
# 1) conversation.jsonl → Deque der project_ids je (role, normtext), in Reihenfolge.
|
||||
tag_queues: dict[tuple[str, str], deque[str]] = defaultdict(deque)
|
||||
conv_turns = 0
|
||||
for line in _iter_jsonl(CONVERSATION_FILE):
|
||||
role = line.get("role")
|
||||
if role not in ("user", "assistant"):
|
||||
continue
|
||||
content = line.get("content")
|
||||
if not isinstance(content, str):
|
||||
continue
|
||||
conv_turns += 1
|
||||
tag_queues[(role, _norm(content))].append((line.get("project_id") or "").strip())
|
||||
|
||||
# 2) chat_backup.jsonl durchgehen, leere project_ids nachtragen.
|
||||
try:
|
||||
backup_lines = CHAT_BACKUP_FILE.read_text(encoding="utf-8").splitlines()
|
||||
except Exception as exc:
|
||||
logger.warning("[backfill] chat_backup lesen fehlgeschlagen: %s", exc)
|
||||
return {"error": f"read_backup: {exc}"}
|
||||
|
||||
out_lines: list[str] = []
|
||||
patched = 0
|
||||
matched = 0
|
||||
for raw in backup_lines:
|
||||
raw = raw.strip()
|
||||
if not raw:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(raw)
|
||||
except Exception:
|
||||
out_lines.append(raw) # unveraendert durchreichen
|
||||
continue
|
||||
|
||||
role = obj.get("role")
|
||||
text = obj.get("text")
|
||||
# Nur echte Chat-Bubbles matchen (keine file_deleted-/type-Marker).
|
||||
if role in ("user", "assistant") and isinstance(text, str):
|
||||
q = tag_queues.get((role, _norm(text)))
|
||||
if q:
|
||||
pid = q.popleft() # verbraucht → Reihenfolge fuer Duplikate bleibt korrekt
|
||||
matched += 1
|
||||
existing = (obj.get("project_id") or "").strip()
|
||||
# Nur setzen wenn Backup-Eintrag noch KEINEN Tag hat und der
|
||||
# conversation-Turn einem Projekt gehoert. Bestehende Tags bleiben.
|
||||
if not existing and pid:
|
||||
obj["project_id"] = pid
|
||||
patched += 1
|
||||
out_lines.append(json.dumps(obj, ensure_ascii=False))
|
||||
|
||||
# 3) Nichts zu tun? Marker setzen und raus.
|
||||
if patched == 0:
|
||||
_write_marker(conv_turns, 0)
|
||||
logger.info("[backfill] nichts nachzutragen (conv_turns=%s, matched=%s)",
|
||||
conv_turns, matched)
|
||||
return {"conv_turns": conv_turns, "matched": matched, "patched": 0}
|
||||
|
||||
# 4) Sicherung + atomarer Rewrite.
|
||||
try:
|
||||
bak = CHAT_BACKUP_FILE.with_suffix(".jsonl.pre-backfill-v1.bak")
|
||||
if not bak.exists():
|
||||
bak.write_bytes(CHAT_BACKUP_FILE.read_bytes())
|
||||
tmp = CHAT_BACKUP_FILE.with_suffix(".jsonl.tmp")
|
||||
tmp.write_text("\n".join(out_lines) + "\n", encoding="utf-8")
|
||||
os.replace(tmp, CHAT_BACKUP_FILE)
|
||||
except Exception as exc:
|
||||
logger.warning("[backfill] Rewrite fehlgeschlagen: %s", exc)
|
||||
return {"error": f"rewrite: {exc}"}
|
||||
|
||||
_write_marker(conv_turns, patched)
|
||||
logger.info("[backfill] %s Bubbles nachtraeglich getaggt (conv_turns=%s, matched=%s). Backup: %s",
|
||||
patched, conv_turns, matched, bak.name)
|
||||
return {"conv_turns": conv_turns, "matched": matched, "patched": patched}
|
||||
|
||||
|
||||
def _iter_jsonl(path: Path):
|
||||
try:
|
||||
for raw in path.read_text(encoding="utf-8").splitlines():
|
||||
raw = raw.strip()
|
||||
if not raw:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(raw)
|
||||
except Exception:
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.warning("[backfill] %s lesen fehlgeschlagen: %s", path, exc)
|
||||
|
||||
|
||||
def _write_marker(conv_turns: int, patched: int) -> None:
|
||||
try:
|
||||
MARKER_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
MARKER_FILE.write_text(
|
||||
json.dumps({"conv_turns": conv_turns, "patched": patched}, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("[backfill] Marker schreiben fehlgeschlagen: %s", exc)
|
||||
Reference in New Issue
Block a user