""" Conversation-State — ein einziger Rolling-Window-State fuer ARIAs laufendes Gespraech mit Stefan. Stefan-Entscheidung: KEINE Sessions, KEIN Multi-Thread. EIN Strang, intern rollend. Was rausfaellt, wird ggf. destilliert und landet als type=fact Memory in der Vector-DB. Persistenz: append-only JSONL unter /data/conversation.jsonl. Bei Restart wird die letzte N gelesen (komplett vermeidet Memory- Overhead bei sehr langen Verlaeufen). """ from __future__ import annotations import json import logging import os from dataclasses import dataclass, field, asdict from datetime import datetime, timezone from pathlib import Path from typing import List, Optional logger = logging.getLogger(__name__) CONVERSATION_FILE = Path(os.environ.get("CONVERSATION_FILE", "/data/conversation.jsonl")) @dataclass class Turn: role: str # "user" | "assistant" content: str ts: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) source: str = "" # "app" / "diagnostic" / "stt" — optional class Conversation: """In-Memory Rolling Window, mit JSONL-Persistenz.""" def __init__(self, max_window: int = 50, distill_threshold: int = 60, distill_count: int = 30): self.max_window = max_window self.distill_threshold = distill_threshold self.distill_count = distill_count self.turns: List[Turn] = [] self._load() def _load(self): if not CONVERSATION_FILE.exists(): return try: lines = CONVERSATION_FILE.read_text(encoding="utf-8").splitlines() except Exception as exc: logger.warning("Konversation laden fehlgeschlagen: %s", exc) return loaded: List[Turn] = [] for line in lines: line = line.strip() if not line: continue try: obj = json.loads(line) except Exception: continue if obj.get("op") == "distill": # Marker: bis hierhin wurde alles destilliert drop_until_ts = obj.get("ts", "") if drop_until_ts: loaded = [t for t in loaded if t.ts > drop_until_ts] continue role = obj.get("role") content = obj.get("content") if role in ("user", "assistant") and isinstance(content, str): loaded.append(Turn(role=role, content=content, ts=obj.get("ts", ""), source=obj.get("source", ""))) self.turns = loaded logger.info("Konversation geladen: %d Turns aus %s", len(self.turns), CONVERSATION_FILE) def _append_to_file(self, record: dict): try: CONVERSATION_FILE.parent.mkdir(parents=True, exist_ok=True) with CONVERSATION_FILE.open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") except Exception as exc: logger.warning("Konversation persist fehlgeschlagen: %s", exc) def add(self, role: str, content: str, source: str = "") -> Turn: t = Turn(role=role, content=content, source=source) self.turns.append(t) self._append_to_file({ "ts": t.ts, "role": t.role, "content": t.content, "source": t.source, }) return t def window(self) -> List[Turn]: """Die letzten max_window Turns — gehen in den LLM-Prompt.""" return self.turns[-self.max_window:] def needs_distill(self) -> bool: return len(self.turns) > self.distill_threshold def take_oldest_for_distill(self) -> List[Turn]: """Gibt die N aeltesten Turns zurueck — fuer den Destillat-Call. Entfernt sie NICHT — das macht commit_distill nach erfolgreichem Call.""" return self.turns[: self.distill_count] def commit_distill(self, last_distilled_ts: str): """Schreibt einen Distill-Marker, entfernt aus dem In-Memory-Window.""" self._append_to_file({"op": "distill", "ts": last_distilled_ts}) self.turns = [t for t in self.turns if t.ts > last_distilled_ts] logger.info("Distill commit bei ts=%s — Window jetzt %d Turns", last_distilled_ts, len(self.turns)) def reset(self): """Hardes Reset — verwende vorsichtig (Diagnostic-Button).""" try: if CONVERSATION_FILE.exists(): CONVERSATION_FILE.unlink() except Exception: pass self.turns = [] logger.warning("Konversation komplett zurueckgesetzt") def _rewrite_file(self) -> None: """Datei komplett aus In-Memory-State neu schreiben. Wird nach Mutationen (Loeschen) genutzt. Alte distill-Marker gehen dabei verloren — das ist OK weil der In-Memory-State bereits post-distill ist.""" try: CONVERSATION_FILE.parent.mkdir(parents=True, exist_ok=True) tmp = CONVERSATION_FILE.with_suffix(".jsonl.tmp") with tmp.open("w", encoding="utf-8") as f: for t in self.turns: f.write(json.dumps({ "ts": t.ts, "role": t.role, "content": t.content, "source": t.source, }, ensure_ascii=False) + "\n") tmp.replace(CONVERSATION_FILE) except Exception as exc: logger.warning("Konversation rewrite fehlgeschlagen: %s", exc) def remove_by_match(self, role: str, content: str, ts_iso_hint: Optional[str] = None) -> bool: """Entfernt EINEN Turn mit passendem role + content. Bei Mehrfach-Match (z.B. zwei identische 'ja'-Turns) waehlt den naehesten zum ts_iso_hint, sonst den juengsten. Returns True wenn was entfernt wurde. """ candidates = [(i, t) for i, t in enumerate(self.turns) if t.role == role and t.content == content] if not candidates: logger.info("[conv] remove_by_match: kein Match fuer role=%s content[:40]=%r", role, content[:40]) return False if len(candidates) > 1 and ts_iso_hint: def _diff(item): _, turn = item try: return abs((datetime.fromisoformat(turn.ts.replace("Z", "+00:00")) - datetime.fromisoformat(ts_iso_hint.replace("Z", "+00:00"))).total_seconds()) except Exception: return 1e9 candidates.sort(key=_diff) idx, turn = candidates[0] if not ts_iso_hint else candidates[0] self.turns.pop(idx) self._rewrite_file() logger.info("[conv] Turn entfernt: role=%s ts=%s content[:40]=%r", turn.role, turn.ts, turn.content[:40]) return True def stats(self) -> dict: return { "turns": len(self.turns), "max_window": self.max_window, "distill_threshold": self.distill_threshold, "needs_distill": self.needs_distill(), }