""" Conversation-Loop. Eine Anfrage von Stefan, eine Antwort von ARIA. Pro Turn: 1. user-Turn an die laufende Conversation appenden 2. Hot Memory holen (alle pinned Punkte) 3. Cold Memory holen (Top-K semantisch zur user-Nachricht) 4. System-Prompt aus Hot+Cold bauen 5. Messages = [system, *window, user] 6. Claude via Proxy aufrufen 7. Assistant-Reply in Conversation appenden + zurueckgeben Memory-Destillat laeuft asynchron NACH dem Reply, gesteuert vom /chat-Endpoint ueber BackgroundTasks. """ from __future__ import annotations import json import logging from typing import Optional from conversation import Conversation, Turn from memory import Embedder, VectorStore, MemoryPoint from prompts import build_system_prompt from proxy_client import ProxyClient, Message as ProxyMessage import skills as skills_mod import triggers as triggers_mod import watcher as watcher_mod logger = logging.getLogger(__name__) # Meta-Tool: ARIA kann selbst neue Skills bauen META_TOOLS = [ { "type": "function", "function": { "name": "skill_create", "description": ( "Erstelle einen neuen Skill (wiederverwendbare Faehigkeit). " "Skills sind IMMER Python — jeder Skill bekommt seine eigene venv " "mit den pip_packages die er braucht.\n\n" "HARTE REGEL — IMMER Skill anlegen wenn: die Loesung erfordert eine " "pip-Library. Sonst muesste der Install bei jedem Container-Restart " "neu laufen (Brain hat keinen persistenten State ausser /data/skills/).\n\n" "Sonst NUR wenn ALLE Kriterien erfuellt sind:\n" " 1) wiederkehrend (Aufgabe kommt realistisch nochmal),\n" " 2) nicht-trivial (mehrere Schritte),\n" " 3) parametrisierbar (nimmt Eingaben, gibt Ergebnis),\n" " 4) wiederverwendbar als ganzes Paket.\n" "NICHT fuer einzelne Shell-Befehle (date, hostname, ls etc.) und " "nicht fuer Einmal-Faelle. Stefan kann Skill-Erstellung explizit " "triggern (\"bau daraus einen Skill\").\n\n" "Wenn etwas nur via apt-Paket geht — Stefan fragen ob es ins " "Brain-Dockerfile soll, NICHT als Skill bauen." ), "parameters": { "type": "object", "properties": { "name": {"type": "string", "description": "kurz, kebab-case, a-z 0-9 - _"}, "description": {"type": "string", "description": "Was kann der Skill? 1 Satz."}, "entry_code": { "type": "string", "description": ( "Python-Code. Args lesen via os.environ['ARG_NAME']. " "Resultat per print() (stdout) zurueck. Bei Fehler: " "non-zero exit (sys.exit(1) o.ae.)." ), }, "readme": {"type": "string", "description": "Markdown — was macht der Skill, Beispiel-Aufrufe"}, "pip_packages": { "type": "array", "items": {"type": "string"}, "description": "pip-Pakete die in der venv installiert werden (z.B. requests, yt-dlp, pypdf)", }, "args": { "type": "array", "items": {"type": "object"}, "description": "Argumente-Schema [{name, type, required, description}]", }, }, "required": ["name", "description", "entry_code"], }, }, }, { "type": "function", "function": { "name": "skill_list", "description": "Zeigt alle Skills (inkl. deaktivierte). Sollte selten noetig sein — die Liste steht eh im System-Prompt.", "parameters": {"type": "object", "properties": {}}, }, }, { "type": "function", "function": { "name": "trigger_timer", "description": ( "Lege einen Timer-Trigger an — feuert EINMALIG und ruft dich dann selbst auf " "(Push-Nachricht an Stefan). Use-Case: 'erinnere mich in 10min', " "'sag mir um 14:30 Bescheid'. Genau EINES von `in_seconds` ODER `fires_at` " "muss gesetzt sein." ), "parameters": { "type": "object", "properties": { "name": {"type": "string", "description": "kurzer kebab-case-Name, a-z 0-9 - _"}, "in_seconds": { "type": "integer", "description": ( "Relativ ab jetzt in Sekunden. Bevorzugt bei Angaben wie " "'in 2 Minuten' (=120), 'in 1 Stunde' (=3600). " "Server berechnet daraus den absoluten Feuer-Zeitpunkt." ), }, "fires_at": { "type": "string", "description": ( "Absoluter ISO-Timestamp UTC fuer feste Termine, z.B. " "'2026-05-12T14:30:00Z'. Die aktuelle Zeit findest du im " "System-Prompt unter '## Aktuelle Zeit'. Fuer relative Angaben " "lieber `in_seconds` nutzen." ), }, "message": {"type": "string", "description": "Was soll bei der Erinnerung gesagt werden"}, }, "required": ["name", "message"], }, }, }, { "type": "function", "function": { "name": "trigger_watcher", "description": ( "Lege einen Watcher-Trigger an — pollt alle paar Minuten eine Condition, " "feuert wenn sie wahr wird (mit Throttle damit's nicht spammt). " "Use-Case: 'sag bescheid wenn Disk unter 5GB', 'pingt mich wenn um 8 Uhr'. " "Welche Variablen verfuegbar sind und ihre Bedeutung steht im System-Prompt." ), "parameters": { "type": "object", "properties": { "name": {"type": "string", "description": "kurzer Name"}, "condition": { "type": "string", "description": ( "Boolescher Ausdruck mit den erlaubten Variablen, z.B. " "'disk_free_gb < 5', 'hour_of_day == 8 and day_of_week == \"mon\"'. " "Operatoren: < > <= >= == != and or not" ), }, "message": {"type": "string", "description": "Was soll bei Erfuellung gesagt werden"}, "check_interval_sec": { "type": "integer", "description": "Wie oft Condition pruefen (Default 300 = alle 5min, min 30)", }, "throttle_sec": { "type": "integer", "description": "Mindestabstand zwischen 2 Feuerungen (Default 3600 = max 1x/h)", }, }, "required": ["name", "condition", "message"], }, }, }, { "type": "function", "function": { "name": "trigger_cancel", "description": "Loescht einen Trigger (Timer abbrechen oder Watcher entfernen).", "parameters": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, }, { "type": "function", "function": { "name": "trigger_list", "description": "Zeigt alle Trigger (active + inaktiv). Selten noetig — Stefan sieht sie im Diagnostic.", "parameters": {"type": "object", "properties": {}}, }, }, { "type": "function", "function": { "name": "request_location_tracking", "description": ( "Bittet die App, das kontinuierliche GPS-Tracking zu aktivieren oder zu " "deaktivieren. Default ist AUS (Akku-Schutz). Nutze das wenn du einen " "GPS-basierten Watcher anlegst (z.B. `near(...)`), sonst hat die App " "veraltete Position und der Watcher feuert nie. Auch wieder ausschalten " "wenn der letzte GPS-Watcher geloescht wurde." ), "parameters": { "type": "object", "properties": { "on": {"type": "boolean", "description": "true = Tracking an, false = aus"}, "reason": {"type": "string", "description": "Kurzer Grund (wird in App-Notification angezeigt)"}, }, "required": ["on"], }, }, }, ] def _skill_to_tool(s: dict) -> dict: """Mappt einen Skill auf ein OpenAI-Function-Tool.""" args = s.get("args") or [] props = {} required = [] for a in args: if not isinstance(a, dict): continue name = a.get("name") or "" if not name: continue props[name] = { "type": a.get("type", "string"), "description": a.get("description", ""), } if a.get("required"): required.append(name) return { "type": "function", "function": { "name": f"run_{s['name']}", "description": s.get("description", "(ohne Beschreibung)"), "parameters": { "type": "object", "properties": props, "required": required, }, }, } class Agent: def __init__(self, store: VectorStore, embedder: Embedder, conversation: Conversation, proxy: ProxyClient, cold_k: int = 5): self.store = store self.embedder = embedder self.conversation = conversation self.proxy = proxy self.cold_k = cold_k # Side-Channel-Events die im Turn entstehen (z.B. skill_create). # Werden vom /chat-Endpoint in der Response mitgeschickt, damit # Stefan in der App und Diagnostic eine sichtbare Bubble bekommt. self._pending_events: list[dict] = [] def pop_events(self) -> list[dict]: """Holt die Events des letzten chat()-Calls und leert die Liste.""" events = self._pending_events self._pending_events = [] return events # ── Hauptpfad: ein User-Turn → Tool-Loop → finaler Reply ── MAX_TOOL_ITERATIONS = 8 # Schutz vor Endlos-Loops def chat(self, user_message: str, source: str = "") -> str: user_message = (user_message or "").strip() if not user_message: raise ValueError("Leere Nachricht") # Events vom letzten Turn weglassen self._pending_events = [] # 1. User-Turn an die Konversation self.conversation.add("user", user_message, source=source) # 2. Hot Memory (alle pinned Punkte) hot = self.store.list_pinned() # 3. Cold Memory (Top-K semantic) try: qvec = self.embedder.embed(user_message) cold = self.store.search(qvec, k=self.cold_k, exclude_pinned=True) except Exception as exc: logger.warning("Cold-Search fehlgeschlagen: %s", exc) cold = [] # 4. Aktive Skills holen + Tool-Liste bauen all_skills = skills_mod.list_skills(active_only=False) active_skills = [s for s in all_skills if s.get("active", True)] tools = list(META_TOOLS) + [_skill_to_tool(s) for s in active_skills] # Trigger-Liste + Variablen-Info fuer den System-Prompt all_triggers = triggers_mod.list_triggers(active_only=False) condition_vars = watcher_mod.describe_variables() condition_funcs = watcher_mod.describe_functions() # 5. System-Prompt + Window-Messages system_prompt = build_system_prompt(hot, cold, skills=all_skills, triggers=all_triggers, condition_vars=condition_vars, condition_funcs=condition_funcs) messages = [ProxyMessage(role="system", content=system_prompt)] for t in self.conversation.window(): messages.append(ProxyMessage(role=t.role, content=t.content)) logger.info("chat: pinned=%d cold=%d skills=%d/%d window=%d prompt_chars=%d", len(hot), len(cold), len(active_skills), len(all_skills), len(self.conversation.window()), len(system_prompt)) # 6. Tool-Use-Loop final_reply = "" for iteration in range(self.MAX_TOOL_ITERATIONS): result = self.proxy.chat_full(messages, tools=tools) if result.tool_calls: # Assistant-Turn mit tool_calls in messages anhaengen (nicht in Conversation!) messages.append(ProxyMessage( role="assistant", content=result.content or None, tool_calls=[{ "id": tc["id"], "type": "function", "function": {"name": tc["name"], "arguments": json.dumps(tc["arguments"])}, } for tc in result.tool_calls], )) # Tools ausfuehren + Ergebnis als role=tool zurueck for tc in result.tool_calls: tool_result = self._dispatch_tool(tc["name"], tc["arguments"]) messages.append(ProxyMessage( role="tool", tool_call_id=tc["id"], name=tc["name"], content=tool_result[:8000], )) continue # next iteration mit Tool-Results # Kein Tool-Call mehr → final reply final_reply = (result.content or "").strip() break else: # Loop-Limit erreicht final_reply = "[Tool-Loop-Limit erreicht — ARIA hat zu viele Tool-Calls gemacht ohne fertig zu werden]" logger.warning("Tool-Loop hit MAX_TOOL_ITERATIONS=%d", self.MAX_TOOL_ITERATIONS) if not final_reply: raise RuntimeError("Leerer Reply vom Proxy") # 7. Assistant-Turn (final reply) in die Conversation self.conversation.add("assistant", final_reply) return final_reply # ── Tool-Dispatcher ─────────────────────────────────────── def _dispatch_tool(self, name: str, arguments: dict) -> str: """Fuehrt einen Tool-Call aus und gibt ein kurzes Text-Resultat zurueck. Niemals werfen — Fehler werden als Text-Resultat reportet damit Claude weitermachen kann.""" try: if name == "skill_create": # ARIA-Skills sind immer Python — execution ist nicht mehr im Schema manifest = skills_mod.create_skill( name=arguments["name"], description=arguments["description"], execution="local-venv", entry_code=arguments["entry_code"], readme=arguments.get("readme", ""), args=arguments.get("args", []), pip_packages=arguments.get("pip_packages", []), author="aria", ) # Side-Channel-Event: Stefan soll sehen wenn ARIA was anlegt self._pending_events.append({ "type": "skill_created", "skill": { "name": manifest["name"], "description": manifest.get("description", ""), "execution": manifest.get("execution", ""), "active": manifest.get("active", True), "setup_error": manifest.get("setup_error"), }, }) return f"OK — Skill '{manifest['name']}' erstellt (active={manifest['active']})." if name == "skill_list": items = skills_mod.list_skills(active_only=False) if not items: return "(keine Skills vorhanden)" return "\n".join( f"- {s['name']} ({s['execution']}) {'aktiv' if s.get('active', True) else 'DEAKTIVIERT'}: {s.get('description', '')}" for s in items ) if name.startswith("run_"): skill_name = name[len("run_"):] res = skills_mod.run_skill(skill_name, args=arguments) snippet = (res.get("stdout") or "")[:2000] or "(kein stdout)" err = (res.get("stderr") or "")[:500] marker = "OK" if res["ok"] else f"FEHLER (exit={res['exit_code']})" out = f"{marker} · {res['duration_sec']}s\nstdout:\n{snippet}" if err: out += f"\nstderr:\n{err}" return out if name == "trigger_timer": fires_at_iso = arguments.get("fires_at") in_seconds = arguments.get("in_seconds") if not fires_at_iso and in_seconds is not None: from datetime import datetime as _dt, timezone as _tz, timedelta as _td try: secs = int(in_seconds) except (TypeError, ValueError): return "FEHLER: in_seconds muss eine ganze Zahl sein." if secs < 1: return "FEHLER: in_seconds muss >= 1 sein." fires_at_iso = (_dt.now(_tz.utc) + _td(seconds=secs)).isoformat(timespec="seconds") if not fires_at_iso: return "FEHLER: entweder `in_seconds` ODER `fires_at` muss gesetzt sein." t = triggers_mod.create_timer( name=arguments["name"], fires_at_iso=fires_at_iso, message=arguments["message"], author="aria", ) self._pending_events.append({ "type": "trigger_created", "trigger": {"name": t["name"], "type": "timer", "fires_at": t["fires_at"], "message": t["message"]}, }) return f"OK — Timer '{t['name']}' angelegt, feuert um {t['fires_at']}." if name == "trigger_watcher": t = triggers_mod.create_watcher( name=arguments["name"], condition=arguments["condition"], message=arguments["message"], check_interval_sec=int(arguments.get("check_interval_sec", 300)), throttle_sec=int(arguments.get("throttle_sec", 3600)), author="aria", ) self._pending_events.append({ "type": "trigger_created", "trigger": {"name": t["name"], "type": "watcher", "condition": t["condition"], "message": t["message"]}, }) return f"OK — Watcher '{t['name']}' angelegt: feuert wenn '{t['condition']}'." if name == "trigger_cancel": try: triggers_mod.delete(arguments["name"]) return f"OK — Trigger '{arguments['name']}' geloescht." except ValueError as e: return f"FEHLER: {e}" if name == "request_location_tracking": on = bool(arguments.get("on", False)) reason = (arguments.get("reason") or "").strip() self._pending_events.append({ "type": "location_tracking", "on": on, "reason": reason, }) return f"OK — Tracking-Request gesendet (on={on}). App wird in Kuerze umschalten." if name == "trigger_list": items = triggers_mod.list_triggers(active_only=False) if not items: return "(keine Trigger vorhanden)" lines = [] for t in items: state = "aktiv" if t.get("active", True) else "DEAKTIVIERT" if t["type"] == "timer": lines.append(f"- {t['name']} (timer, {state}): feuert {t.get('fires_at')} — \"{t.get('message','')[:50]}\"") elif t["type"] == "watcher": lines.append(f"- {t['name']} (watcher, {state}): cond=\"{t.get('condition')}\", throttle={t.get('throttle_sec')}s") else: lines.append(f"- {t['name']} ({t['type']}, {state})") return "\n".join(lines) return f"Unbekanntes Tool: {name}" except Exception as exc: logger.exception("Tool '%s' fehlgeschlagen", name) return f"FEHLER: {exc}" # ── Memory-Destillat (laeuft im Hintergrund) ────────────── def distill_old_turns(self) -> dict: """Nimmt die N aeltesten Turns und destilliert sie zu fact-Memories. Pattern: separater Claude-Call, lieferte 3-7 JSON-Facts, die als type=fact, source=distilled gespeichert werden. Erfolgreiches Schreiben → Turns aus dem Window entfernen. """ if not self.conversation.needs_distill(): return {"distilled": 0, "reason": "kein Bedarf"} old_turns = self.conversation.take_oldest_for_distill() if not old_turns: return {"distilled": 0, "reason": "keine alten Turns"} # Konversation als Klartext bauen transcript = "\n".join( f"[{t.role.upper()}] {t.content}" for t in old_turns )[:30000] # Cap auf 30k Zeichen damit der Prompt nicht explodiert system = ( "Du extrahierst aus einer Konversation zwischen Stefan und ARIA die " "wichtigsten dauerhaft relevanten Fakten — keine Smalltalk-Details, " "keine flüchtigen Zustände. Antworte AUSSCHLIESSLICH mit gültigem JSON " "im Format: {\"facts\": [{\"title\": \"kurz, max 80 Zeichen\", " "\"content\": \"1-3 Sätze, konkret und nützlich\"}]}. " "Mindestens 0, höchstens 7 Facts. Wenn nichts wichtig genug ist: leeres Array." ) user = ( "Hier ist der Konversations-Abschnitt:\n\n" f"{transcript}\n\n" "Extrahiere die wichtigsten Fakten als JSON." ) try: raw = self.proxy.chat([ ProxyMessage(role="system", content=system), ProxyMessage(role="user", content=user), ]) except Exception as exc: logger.warning("Destillat-Call fehlgeschlagen: %s — Turns bleiben", exc) return {"distilled": 0, "error": str(exc)} facts = self._parse_facts(raw) if facts is None: logger.warning("Destillat lieferte unparsbares JSON: %r", raw[:200]) return {"distilled": 0, "error": "JSON parse failed", "raw": raw[:200]} # Facts in die DB schreiben created = 0 for f in facts: content = (f.get("content") or "").strip() if not content: continue title = (f.get("title") or "").strip()[:120] or "Fakt" point = MemoryPoint( id="", type="fact", title=title, content=content, pinned=False, category="konversation", source="distilled", tags=[], ) try: vec = self.embedder.embed(content) self.store.upsert(point, vec) created += 1 except Exception as exc: logger.warning("Fakt schreiben fehlgeschlagen: %s", exc) # Erst nach erfolgreichem Schreiben aus dem Window entfernen last_ts = old_turns[-1].ts self.conversation.commit_distill(last_ts) logger.info("Destillat: %d Facts geschrieben, %d Turns aus Window entfernt", created, len(old_turns)) return {"distilled": created, "removed_turns": len(old_turns)} @staticmethod def _parse_facts(raw: str) -> Optional[list]: if not raw: return None # JSON robust extrahieren — Claude kann Code-Fences setzen cleaned = raw.strip() if cleaned.startswith("```"): # ```json oder ``` rauswerfen cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:] if cleaned.endswith("```"): cleaned = cleaned[: -3] cleaned = cleaned.strip() # Erstes { bis letztes } start = cleaned.find("{") end = cleaned.rfind("}") if start == -1 or end == -1 or end < start: return None try: obj = json.loads(cleaned[start: end + 1]) except Exception: return None facts = obj.get("facts") if isinstance(obj, dict) else None if not isinstance(facts, list): return None return facts