""" Conversation-State — ein einziger Rolling-Window-State fuer ARIAs laufendes Gespraech mit Stefan. Stefan-Entscheidung: KEINE Sessions, KEIN Multi-Thread. EIN Strang, intern rollend. Was rausfaellt, wird ggf. destilliert und landet als type=fact Memory in der Vector-DB. Persistenz: append-only JSONL unter /data/conversation.jsonl. Bei Restart wird die letzte N gelesen (komplett vermeidet Memory- Overhead bei sehr langen Verlaeufen). """ from __future__ import annotations import json import logging import os from dataclasses import dataclass, field, asdict from datetime import datetime, timezone from pathlib import Path from typing import List, Optional logger = logging.getLogger(__name__) CONVERSATION_FILE = Path(os.environ.get("CONVERSATION_FILE", "/data/conversation.jsonl")) @dataclass class Turn: role: str # "user" | "assistant" content: str ts: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) source: str = "" # "app" / "diagnostic" / "stt" — optional class Conversation: """In-Memory Rolling Window, mit JSONL-Persistenz.""" def __init__(self, max_window: int = 50, distill_threshold: int = 60, distill_count: int = 30): self.max_window = max_window self.distill_threshold = distill_threshold self.distill_count = distill_count self.turns: List[Turn] = [] self._load() def _load(self): if not CONVERSATION_FILE.exists(): return try: lines = CONVERSATION_FILE.read_text(encoding="utf-8").splitlines() except Exception as exc: logger.warning("Konversation laden fehlgeschlagen: %s", exc) return loaded: List[Turn] = [] for line in lines: line = line.strip() if not line: continue try: obj = json.loads(line) except Exception: continue if obj.get("op") == "distill": # Marker: bis hierhin wurde alles destilliert drop_until_ts = obj.get("ts", "") if drop_until_ts: loaded = [t for t in loaded if t.ts > drop_until_ts] continue role = obj.get("role") content = obj.get("content") if role in ("user", "assistant") and isinstance(content, str): loaded.append(Turn(role=role, content=content, ts=obj.get("ts", ""), source=obj.get("source", ""))) self.turns = loaded logger.info("Konversation geladen: %d Turns aus %s", len(self.turns), CONVERSATION_FILE) def _append_to_file(self, record: dict): try: CONVERSATION_FILE.parent.mkdir(parents=True, exist_ok=True) with CONVERSATION_FILE.open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") except Exception as exc: logger.warning("Konversation persist fehlgeschlagen: %s", exc) def add(self, role: str, content: str, source: str = "") -> Turn: t = Turn(role=role, content=content, source=source) self.turns.append(t) self._append_to_file({ "ts": t.ts, "role": t.role, "content": t.content, "source": t.source, }) return t def window(self) -> List[Turn]: """Die letzten max_window Turns — gehen in den LLM-Prompt.""" return self.turns[-self.max_window:] def needs_distill(self) -> bool: return len(self.turns) > self.distill_threshold def take_oldest_for_distill(self) -> List[Turn]: """Gibt die N aeltesten Turns zurueck — fuer den Destillat-Call. Entfernt sie NICHT — das macht commit_distill nach erfolgreichem Call.""" return self.turns[: self.distill_count] def commit_distill(self, last_distilled_ts: str): """Schreibt einen Distill-Marker, entfernt aus dem In-Memory-Window.""" self._append_to_file({"op": "distill", "ts": last_distilled_ts}) self.turns = [t for t in self.turns if t.ts > last_distilled_ts] logger.info("Distill commit bei ts=%s — Window jetzt %d Turns", last_distilled_ts, len(self.turns)) def reset(self): """Hardes Reset — verwende vorsichtig (Diagnostic-Button).""" try: if CONVERSATION_FILE.exists(): CONVERSATION_FILE.unlink() except Exception: pass self.turns = [] logger.warning("Konversation komplett zurueckgesetzt") def stats(self) -> dict: return { "turns": len(self.turns), "max_window": self.max_window, "distill_threshold": self.distill_threshold, "needs_distill": self.needs_distill(), }