Files
ARIA-AGENT/aria-brain/agent.py
T
duffyduck 8c74b3fed8 fix(brain): Timer in 2min funktioniert wieder — Zeit im Prompt + in_seconds-Param
ARIA wusste nicht wieviel Uhr es ist (kein Bash, kein Time-Tool, kein
Timestamp im System-Prompt) und konnte fires_at als ISO-UTC schlicht
nicht ausrechnen. Zwei Fixes:

1. prompts.py: build_time_section() injiziert UTC + lokale Zeit
   (Europa/Berlin Sommer/Winter-Heuristik) als '## Aktuelle Zeit'-Block
   oben in den System-Prompt. Hilft auch beim Einordnen von
   Watcher-Conditions wie hour_of_day == 8.

2. agent.py trigger_timer-Tool: neuer Parameter `in_seconds` als
   Alternative zu fires_at. Bei relativen Angaben ('in 2 Minuten')
   rechnet jetzt der Server den absoluten Timestamp aus — keine
   Rechnerei in der LLM noetig. fires_at bleibt fuer feste Termine.
   required nur noch name + message.

Diagnostic-API (/triggers/timer) bleibt absolute-only, da der Browser
selbst datetime hat.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 01:21:23 +02:00

580 lines
25 KiB
Python

"""
Conversation-Loop. Eine Anfrage von Stefan, eine Antwort von ARIA.
Pro Turn:
1. user-Turn an die laufende Conversation appenden
2. Hot Memory holen (alle pinned Punkte)
3. Cold Memory holen (Top-K semantisch zur user-Nachricht)
4. System-Prompt aus Hot+Cold bauen
5. Messages = [system, *window, user]
6. Claude via Proxy aufrufen
7. Assistant-Reply in Conversation appenden + zurueckgeben
Memory-Destillat laeuft asynchron NACH dem Reply, gesteuert vom
/chat-Endpoint ueber BackgroundTasks.
"""
from __future__ import annotations
import json
import logging
from typing import Optional
from conversation import Conversation, Turn
from memory import Embedder, VectorStore, MemoryPoint
from prompts import build_system_prompt
from proxy_client import ProxyClient, Message as ProxyMessage
import skills as skills_mod
import triggers as triggers_mod
import watcher as watcher_mod
logger = logging.getLogger(__name__)
# Meta-Tool: ARIA kann selbst neue Skills bauen
META_TOOLS = [
{
"type": "function",
"function": {
"name": "skill_create",
"description": (
"Erstelle einen neuen Skill (wiederverwendbare Faehigkeit). "
"Skills sind IMMER Python — jeder Skill bekommt seine eigene venv "
"mit den pip_packages die er braucht.\n\n"
"HARTE REGEL — IMMER Skill anlegen wenn: die Loesung erfordert eine "
"pip-Library. Sonst muesste der Install bei jedem Container-Restart "
"neu laufen (Brain hat keinen persistenten State ausser /data/skills/).\n\n"
"Sonst NUR wenn ALLE Kriterien erfuellt sind:\n"
" 1) wiederkehrend (Aufgabe kommt realistisch nochmal),\n"
" 2) nicht-trivial (mehrere Schritte),\n"
" 3) parametrisierbar (nimmt Eingaben, gibt Ergebnis),\n"
" 4) wiederverwendbar als ganzes Paket.\n"
"NICHT fuer einzelne Shell-Befehle (date, hostname, ls etc.) und "
"nicht fuer Einmal-Faelle. Stefan kann Skill-Erstellung explizit "
"triggern (\"bau daraus einen Skill\").\n\n"
"Wenn etwas nur via apt-Paket geht — Stefan fragen ob es ins "
"Brain-Dockerfile soll, NICHT als Skill bauen."
),
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "kurz, kebab-case, a-z 0-9 - _"},
"description": {"type": "string", "description": "Was kann der Skill? 1 Satz."},
"entry_code": {
"type": "string",
"description": (
"Python-Code. Args lesen via os.environ['ARG_NAME']. "
"Resultat per print() (stdout) zurueck. Bei Fehler: "
"non-zero exit (sys.exit(1) o.ae.)."
),
},
"readme": {"type": "string", "description": "Markdown — was macht der Skill, Beispiel-Aufrufe"},
"pip_packages": {
"type": "array",
"items": {"type": "string"},
"description": "pip-Pakete die in der venv installiert werden (z.B. requests, yt-dlp, pypdf)",
},
"args": {
"type": "array",
"items": {"type": "object"},
"description": "Argumente-Schema [{name, type, required, description}]",
},
},
"required": ["name", "description", "entry_code"],
},
},
},
{
"type": "function",
"function": {
"name": "skill_list",
"description": "Zeigt alle Skills (inkl. deaktivierte). Sollte selten noetig sein — die Liste steht eh im System-Prompt.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "trigger_timer",
"description": (
"Lege einen Timer-Trigger an — feuert EINMALIG und ruft dich dann selbst auf "
"(Push-Nachricht an Stefan). Use-Case: 'erinnere mich in 10min', "
"'sag mir um 14:30 Bescheid'. Genau EINES von `in_seconds` ODER `fires_at` "
"muss gesetzt sein."
),
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "kurzer kebab-case-Name, a-z 0-9 - _"},
"in_seconds": {
"type": "integer",
"description": (
"Relativ ab jetzt in Sekunden. Bevorzugt bei Angaben wie "
"'in 2 Minuten' (=120), 'in 1 Stunde' (=3600). "
"Server berechnet daraus den absoluten Feuer-Zeitpunkt."
),
},
"fires_at": {
"type": "string",
"description": (
"Absoluter ISO-Timestamp UTC fuer feste Termine, z.B. "
"'2026-05-12T14:30:00Z'. Die aktuelle Zeit findest du im "
"System-Prompt unter '## Aktuelle Zeit'. Fuer relative Angaben "
"lieber `in_seconds` nutzen."
),
},
"message": {"type": "string", "description": "Was soll bei der Erinnerung gesagt werden"},
},
"required": ["name", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_watcher",
"description": (
"Lege einen Watcher-Trigger an — pollt alle paar Minuten eine Condition, "
"feuert wenn sie wahr wird (mit Throttle damit's nicht spammt). "
"Use-Case: 'sag bescheid wenn Disk unter 5GB', 'pingt mich wenn um 8 Uhr'. "
"Welche Variablen verfuegbar sind und ihre Bedeutung steht im System-Prompt."
),
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "kurzer Name"},
"condition": {
"type": "string",
"description": (
"Boolescher Ausdruck mit den erlaubten Variablen, z.B. "
"'disk_free_gb < 5', 'hour_of_day == 8 and day_of_week == \"mon\"'. "
"Operatoren: < > <= >= == != and or not"
),
},
"message": {"type": "string", "description": "Was soll bei Erfuellung gesagt werden"},
"check_interval_sec": {
"type": "integer",
"description": "Wie oft Condition pruefen (Default 300 = alle 5min, min 30)",
},
"throttle_sec": {
"type": "integer",
"description": "Mindestabstand zwischen 2 Feuerungen (Default 3600 = max 1x/h)",
},
},
"required": ["name", "condition", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_cancel",
"description": "Loescht einen Trigger (Timer abbrechen oder Watcher entfernen).",
"parameters": {
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
},
},
},
{
"type": "function",
"function": {
"name": "trigger_list",
"description": "Zeigt alle Trigger (active + inaktiv). Selten noetig — Stefan sieht sie im Diagnostic.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "request_location_tracking",
"description": (
"Bittet die App, das kontinuierliche GPS-Tracking zu aktivieren oder zu "
"deaktivieren. Default ist AUS (Akku-Schutz). Nutze das wenn du einen "
"GPS-basierten Watcher anlegst (z.B. `near(...)`), sonst hat die App "
"veraltete Position und der Watcher feuert nie. Auch wieder ausschalten "
"wenn der letzte GPS-Watcher geloescht wurde."
),
"parameters": {
"type": "object",
"properties": {
"on": {"type": "boolean", "description": "true = Tracking an, false = aus"},
"reason": {"type": "string", "description": "Kurzer Grund (wird in App-Notification angezeigt)"},
},
"required": ["on"],
},
},
},
]
def _skill_to_tool(s: dict) -> dict:
"""Mappt einen Skill auf ein OpenAI-Function-Tool."""
args = s.get("args") or []
props = {}
required = []
for a in args:
if not isinstance(a, dict):
continue
name = a.get("name") or ""
if not name:
continue
props[name] = {
"type": a.get("type", "string"),
"description": a.get("description", ""),
}
if a.get("required"):
required.append(name)
return {
"type": "function",
"function": {
"name": f"run_{s['name']}",
"description": s.get("description", "(ohne Beschreibung)"),
"parameters": {
"type": "object",
"properties": props,
"required": required,
},
},
}
class Agent:
def __init__(self, store: VectorStore, embedder: Embedder,
conversation: Conversation, proxy: ProxyClient,
cold_k: int = 5):
self.store = store
self.embedder = embedder
self.conversation = conversation
self.proxy = proxy
self.cold_k = cold_k
# Side-Channel-Events die im Turn entstehen (z.B. skill_create).
# Werden vom /chat-Endpoint in der Response mitgeschickt, damit
# Stefan in der App und Diagnostic eine sichtbare Bubble bekommt.
self._pending_events: list[dict] = []
def pop_events(self) -> list[dict]:
"""Holt die Events des letzten chat()-Calls und leert die Liste."""
events = self._pending_events
self._pending_events = []
return events
# ── Hauptpfad: ein User-Turn → Tool-Loop → finaler Reply ──
MAX_TOOL_ITERATIONS = 8 # Schutz vor Endlos-Loops
def chat(self, user_message: str, source: str = "") -> str:
user_message = (user_message or "").strip()
if not user_message:
raise ValueError("Leere Nachricht")
# Events vom letzten Turn weglassen
self._pending_events = []
# 1. User-Turn an die Konversation
self.conversation.add("user", user_message, source=source)
# 2. Hot Memory (alle pinned Punkte)
hot = self.store.list_pinned()
# 3. Cold Memory (Top-K semantic)
try:
qvec = self.embedder.embed(user_message)
cold = self.store.search(qvec, k=self.cold_k, exclude_pinned=True)
except Exception as exc:
logger.warning("Cold-Search fehlgeschlagen: %s", exc)
cold = []
# 4. Aktive Skills holen + Tool-Liste bauen
all_skills = skills_mod.list_skills(active_only=False)
active_skills = [s for s in all_skills if s.get("active", True)]
tools = list(META_TOOLS) + [_skill_to_tool(s) for s in active_skills]
# Trigger-Liste + Variablen-Info fuer den System-Prompt
all_triggers = triggers_mod.list_triggers(active_only=False)
condition_vars = watcher_mod.describe_variables()
condition_funcs = watcher_mod.describe_functions()
# 5. System-Prompt + Window-Messages
system_prompt = build_system_prompt(hot, cold, skills=all_skills,
triggers=all_triggers,
condition_vars=condition_vars,
condition_funcs=condition_funcs)
messages = [ProxyMessage(role="system", content=system_prompt)]
for t in self.conversation.window():
messages.append(ProxyMessage(role=t.role, content=t.content))
logger.info("chat: pinned=%d cold=%d skills=%d/%d window=%d prompt_chars=%d",
len(hot), len(cold), len(active_skills), len(all_skills),
len(self.conversation.window()), len(system_prompt))
# 6. Tool-Use-Loop
final_reply = ""
for iteration in range(self.MAX_TOOL_ITERATIONS):
result = self.proxy.chat_full(messages, tools=tools)
if result.tool_calls:
# Assistant-Turn mit tool_calls in messages anhaengen (nicht in Conversation!)
messages.append(ProxyMessage(
role="assistant",
content=result.content or None,
tool_calls=[{
"id": tc["id"], "type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(tc["arguments"])},
} for tc in result.tool_calls],
))
# Tools ausfuehren + Ergebnis als role=tool zurueck
for tc in result.tool_calls:
tool_result = self._dispatch_tool(tc["name"], tc["arguments"])
messages.append(ProxyMessage(
role="tool",
tool_call_id=tc["id"],
name=tc["name"],
content=tool_result[:8000],
))
continue # next iteration mit Tool-Results
# Kein Tool-Call mehr → final reply
final_reply = (result.content or "").strip()
break
else:
# Loop-Limit erreicht
final_reply = "[Tool-Loop-Limit erreicht — ARIA hat zu viele Tool-Calls gemacht ohne fertig zu werden]"
logger.warning("Tool-Loop hit MAX_TOOL_ITERATIONS=%d", self.MAX_TOOL_ITERATIONS)
if not final_reply:
raise RuntimeError("Leerer Reply vom Proxy")
# 7. Assistant-Turn (final reply) in die Conversation
self.conversation.add("assistant", final_reply)
return final_reply
# ── Tool-Dispatcher ───────────────────────────────────────
def _dispatch_tool(self, name: str, arguments: dict) -> str:
"""Fuehrt einen Tool-Call aus und gibt ein kurzes Text-Resultat zurueck.
Niemals werfen — Fehler werden als Text-Resultat reportet damit Claude
weitermachen kann."""
try:
if name == "skill_create":
# ARIA-Skills sind immer Python — execution ist nicht mehr im Schema
manifest = skills_mod.create_skill(
name=arguments["name"],
description=arguments["description"],
execution="local-venv",
entry_code=arguments["entry_code"],
readme=arguments.get("readme", ""),
args=arguments.get("args", []),
pip_packages=arguments.get("pip_packages", []),
author="aria",
)
# Side-Channel-Event: Stefan soll sehen wenn ARIA was anlegt
self._pending_events.append({
"type": "skill_created",
"skill": {
"name": manifest["name"],
"description": manifest.get("description", ""),
"execution": manifest.get("execution", ""),
"active": manifest.get("active", True),
"setup_error": manifest.get("setup_error"),
},
})
return f"OK — Skill '{manifest['name']}' erstellt (active={manifest['active']})."
if name == "skill_list":
items = skills_mod.list_skills(active_only=False)
if not items:
return "(keine Skills vorhanden)"
return "\n".join(
f"- {s['name']} ({s['execution']}) {'aktiv' if s.get('active', True) else 'DEAKTIVIERT'}: {s.get('description', '')}"
for s in items
)
if name.startswith("run_"):
skill_name = name[len("run_"):]
res = skills_mod.run_skill(skill_name, args=arguments)
snippet = (res.get("stdout") or "")[:2000] or "(kein stdout)"
err = (res.get("stderr") or "")[:500]
marker = "OK" if res["ok"] else f"FEHLER (exit={res['exit_code']})"
out = f"{marker} · {res['duration_sec']}s\nstdout:\n{snippet}"
if err:
out += f"\nstderr:\n{err}"
return out
if name == "trigger_timer":
fires_at_iso = arguments.get("fires_at")
in_seconds = arguments.get("in_seconds")
if not fires_at_iso and in_seconds is not None:
from datetime import datetime as _dt, timezone as _tz, timedelta as _td
try:
secs = int(in_seconds)
except (TypeError, ValueError):
return "FEHLER: in_seconds muss eine ganze Zahl sein."
if secs < 1:
return "FEHLER: in_seconds muss >= 1 sein."
fires_at_iso = (_dt.now(_tz.utc) + _td(seconds=secs)).isoformat(timespec="seconds")
if not fires_at_iso:
return "FEHLER: entweder `in_seconds` ODER `fires_at` muss gesetzt sein."
t = triggers_mod.create_timer(
name=arguments["name"],
fires_at_iso=fires_at_iso,
message=arguments["message"],
author="aria",
)
self._pending_events.append({
"type": "trigger_created",
"trigger": {"name": t["name"], "type": "timer",
"fires_at": t["fires_at"], "message": t["message"]},
})
return f"OK — Timer '{t['name']}' angelegt, feuert um {t['fires_at']}."
if name == "trigger_watcher":
t = triggers_mod.create_watcher(
name=arguments["name"],
condition=arguments["condition"],
message=arguments["message"],
check_interval_sec=int(arguments.get("check_interval_sec", 300)),
throttle_sec=int(arguments.get("throttle_sec", 3600)),
author="aria",
)
self._pending_events.append({
"type": "trigger_created",
"trigger": {"name": t["name"], "type": "watcher",
"condition": t["condition"], "message": t["message"]},
})
return f"OK — Watcher '{t['name']}' angelegt: feuert wenn '{t['condition']}'."
if name == "trigger_cancel":
try:
triggers_mod.delete(arguments["name"])
return f"OK — Trigger '{arguments['name']}' geloescht."
except ValueError as e:
return f"FEHLER: {e}"
if name == "request_location_tracking":
on = bool(arguments.get("on", False))
reason = (arguments.get("reason") or "").strip()
self._pending_events.append({
"type": "location_tracking",
"on": on,
"reason": reason,
})
return f"OK — Tracking-Request gesendet (on={on}). App wird in Kuerze umschalten."
if name == "trigger_list":
items = triggers_mod.list_triggers(active_only=False)
if not items:
return "(keine Trigger vorhanden)"
lines = []
for t in items:
state = "aktiv" if t.get("active", True) else "DEAKTIVIERT"
if t["type"] == "timer":
lines.append(f"- {t['name']} (timer, {state}): feuert {t.get('fires_at')}\"{t.get('message','')[:50]}\"")
elif t["type"] == "watcher":
lines.append(f"- {t['name']} (watcher, {state}): cond=\"{t.get('condition')}\", throttle={t.get('throttle_sec')}s")
else:
lines.append(f"- {t['name']} ({t['type']}, {state})")
return "\n".join(lines)
return f"Unbekanntes Tool: {name}"
except Exception as exc:
logger.exception("Tool '%s' fehlgeschlagen", name)
return f"FEHLER: {exc}"
# ── Memory-Destillat (laeuft im Hintergrund) ──────────────
def distill_old_turns(self) -> dict:
"""Nimmt die N aeltesten Turns und destilliert sie zu fact-Memories.
Pattern: separater Claude-Call, lieferte 3-7 JSON-Facts, die als
type=fact, source=distilled gespeichert werden. Erfolgreiches
Schreiben → Turns aus dem Window entfernen.
"""
if not self.conversation.needs_distill():
return {"distilled": 0, "reason": "kein Bedarf"}
old_turns = self.conversation.take_oldest_for_distill()
if not old_turns:
return {"distilled": 0, "reason": "keine alten Turns"}
# Konversation als Klartext bauen
transcript = "\n".join(
f"[{t.role.upper()}] {t.content}" for t in old_turns
)[:30000] # Cap auf 30k Zeichen damit der Prompt nicht explodiert
system = (
"Du extrahierst aus einer Konversation zwischen Stefan und ARIA die "
"wichtigsten dauerhaft relevanten Fakten — keine Smalltalk-Details, "
"keine flüchtigen Zustände. Antworte AUSSCHLIESSLICH mit gültigem JSON "
"im Format: {\"facts\": [{\"title\": \"kurz, max 80 Zeichen\", "
"\"content\": \"1-3 Sätze, konkret und nützlich\"}]}. "
"Mindestens 0, höchstens 7 Facts. Wenn nichts wichtig genug ist: leeres Array."
)
user = (
"Hier ist der Konversations-Abschnitt:\n\n"
f"{transcript}\n\n"
"Extrahiere die wichtigsten Fakten als JSON."
)
try:
raw = self.proxy.chat([
ProxyMessage(role="system", content=system),
ProxyMessage(role="user", content=user),
])
except Exception as exc:
logger.warning("Destillat-Call fehlgeschlagen: %s — Turns bleiben", exc)
return {"distilled": 0, "error": str(exc)}
facts = self._parse_facts(raw)
if facts is None:
logger.warning("Destillat lieferte unparsbares JSON: %r", raw[:200])
return {"distilled": 0, "error": "JSON parse failed", "raw": raw[:200]}
# Facts in die DB schreiben
created = 0
for f in facts:
content = (f.get("content") or "").strip()
if not content:
continue
title = (f.get("title") or "").strip()[:120] or "Fakt"
point = MemoryPoint(
id="",
type="fact",
title=title,
content=content,
pinned=False,
category="konversation",
source="distilled",
tags=[],
)
try:
vec = self.embedder.embed(content)
self.store.upsert(point, vec)
created += 1
except Exception as exc:
logger.warning("Fakt schreiben fehlgeschlagen: %s", exc)
# Erst nach erfolgreichem Schreiben aus dem Window entfernen
last_ts = old_turns[-1].ts
self.conversation.commit_distill(last_ts)
logger.info("Destillat: %d Facts geschrieben, %d Turns aus Window entfernt",
created, len(old_turns))
return {"distilled": created, "removed_turns": len(old_turns)}
@staticmethod
def _parse_facts(raw: str) -> Optional[list]:
if not raw:
return None
# JSON robust extrahieren — Claude kann Code-Fences setzen
cleaned = raw.strip()
if cleaned.startswith("```"):
# ```json oder ``` rauswerfen
cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[: -3]
cleaned = cleaned.strip()
# Erstes { bis letztes }
start = cleaned.find("{")
end = cleaned.rfind("}")
if start == -1 or end == -1 or end < start:
return None
try:
obj = json.loads(cleaned[start: end + 1])
except Exception:
return None
facts = obj.get("facts") if isinstance(obj, dict) else None
if not isinstance(facts, list):
return None
return facts