Files
ARIA-AGENT/aria-brain/main.py
T
duffyduck 31aa86a2a9 feat(brain+ui+app): Triggers — passive Aufweck-Quellen fuer ARIA
ARIA hatte bisher nur ein "User fragt → Brain antwortet"-Modell. Neu:
Trigger laufen passiv im Hintergrund (kein LLM-Call) und wecken ARIA
nur dann auf wenn ein Event tatsaechlich passiert.

Drei Typen, zwei aktuell implementiert:
  timer   — einmalig zu festem ISO-Timestamp ("erinner mich in 10min")
  watcher — Polling alle N Sek einer Condition, feuert bei True mit Throttle
            (z.B. "disk_free_gb < 5", max 1x/h)
  cron    — Platzhalter fuer spaeter

aria-brain/triggers.py
  CRUD auf /data/triggers/<name>.json + /data/triggers/logs/<name>.jsonl.
  create_timer, create_watcher, mark_fired, list_logs, etc.

aria-brain/watcher.py
  Built-in Condition-Variablen: disk_free_gb, disk_free_pct, uptime_sec,
  hour_of_day, day_of_week, rvs_connected, memory_count.
  Sicherer Condition-Parser via ast — Whitelist auf Vergleich + BoolOp +
  Name + Const. Kein eval, kein exec, keine Builtins.

aria-brain/background.py
  Async Loop laeuft alle 30s, sammelt einmalig Variables, geht durch
  Trigger-Liste, _should_fire-Check (Timer: fires_at vergangen / Watcher:
  check_interval + throttle respektiert + condition true). Fire ruft
  agent.chat(prompt, source="trigger") — ARIA bekommt das wie eine
  Push-Nachricht und antwortet via Bridge → RVS → App.

aria-brain/main.py
  /triggers/list, /{name}, /{name}/logs, /timer, /watcher, PATCH, DELETE,
  /triggers/conditions (Variablen + aktuelle Werte). Lifespan-Handler
  startet den Background-Loop beim Container-Start, stoppt beim Shutdown.

aria-brain/agent.py
  Meta-Tools fuer ARIA: trigger_timer, trigger_watcher, trigger_cancel,
  trigger_list. ARIA legt Trigger via Tool-Call selbst an wenn Stefan das
  wuenscht. Side-Channel-Event 'trigger_created' wird in chat-Response
  mitgeschickt damit App + Diagnostic eine Bubble zeigen.

aria-brain/prompts.py
  Neue System-Prompt-Section: Liste aktiver Triggers + verfuegbare
  Condition-Variablen mit aktuellen Werten + Operatoren-Erklaerung.
  ARIA weiss damit immer was es schon gibt und welche Vars sie nutzen kann.

bridge/aria_bridge.py + rvs/server.js
  trigger_created als neuer RVS-Message-Type, Bridge forwarded das aus
  data.events analog zu skill_created.

diagnostic/index.html
  Neuer Top-Tab "Trigger". Liste mit Type-Badges (⏱ TIMER / 👁 WATCHER),
  Status, Fire-Count, last_fired. Aktivieren/Deaktivieren + Löschen pro
  Trigger. "+ Neu"-Modal mit Type-Dropdown, Timer-Minuten oder
  Watcher-Condition + Vars-Anzeige + Throttle. Info-Modal-Eintrag mit
  Erklaerung. Live-Bubble im Chat wenn ARIA selbst einen anlegt.

android/src/screens/ChatScreen.tsx
  trigger_created RVS-Handler → eigene Bubble (gelber Border, " ARIA
  hat einen Trigger angelegt", Type/Detail/Message/Zeit). ChatMessage
  bekam triggerCreated-Feld. Lokal-only-Schutz beim Server-Sync analog
  zu skill_created.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 00:38:58 +02:00

648 lines
19 KiB
Python

"""
ARIA Brain — FastAPI-Einstieg.
Phase B Punkt 1: nur Skeleton.
- /health → Liveness
- /memory/list → alle Punkte (gefiltert)
- /memory/pinned → Hot Memory
- /memory/search?q=...&k=5 → semantische Suche
- /memory/save → neuen Punkt anlegen
- /memory/update/{id} → Punkt aendern (re-embed wenn content geaendert)
- /memory/delete/{id} → Punkt loeschen
- /memory/stats → Anzahl Punkte pro Type
/chat (Conversation-Loop) und /skills/* kommen in spaeteren Phasen.
"""
from __future__ import annotations
import logging
import os
from typing import List, Optional
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import Response
from pydantic import BaseModel, Field
from memory import Embedder, VectorStore, MemoryPoint
from conversation import Conversation
from proxy_client import ProxyClient
from agent import Agent
import skills as skills_mod
import metrics as metrics_mod
import triggers as triggers_mod
import watcher as watcher_mod
import background as background_mod
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("aria-brain")
QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Beim Brain-Start: Trigger-Background-Loop anwerfen. Beim Shutdown: stoppen."""
task = asyncio.create_task(background_mod.run_loop(agent))
logger.info("Lifespan: Trigger-Loop gestartet")
try:
yield
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.info("Lifespan: Trigger-Loop gestoppt")
app = FastAPI(title="ARIA Brain", version="0.1.0", lifespan=lifespan)
_embedder: Optional[Embedder] = None
_store: Optional[VectorStore] = None
_conversation: Optional[Conversation] = None
_proxy: Optional[ProxyClient] = None
_agent: Optional[Agent] = None
def embedder() -> Embedder:
global _embedder
if _embedder is None:
_embedder = Embedder()
return _embedder
def store() -> VectorStore:
global _store
if _store is None:
_store = VectorStore(host=QDRANT_HOST, port=QDRANT_PORT)
return _store
def conversation() -> Conversation:
global _conversation
if _conversation is None:
_conversation = Conversation()
return _conversation
def proxy_client() -> ProxyClient:
global _proxy
if _proxy is None:
_proxy = ProxyClient()
return _proxy
def agent() -> Agent:
global _agent
if _agent is None:
_agent = Agent(store(), embedder(), conversation(), proxy_client())
return _agent
# ─── Pydantic-Schemas ─────────────────────────────────────────────────
class MemoryIn(BaseModel):
type: str = Field(..., description="identity|rule|preference|tool|skill|fact|conversation|reminder")
title: str
content: str
pinned: bool = False
category: str = ""
source: str = "manual"
tags: List[str] = Field(default_factory=list)
conversation_id: Optional[str] = None
class MemoryUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
pinned: Optional[bool] = None
category: Optional[str] = None
tags: Optional[List[str]] = None
class MemoryOut(BaseModel):
id: str
type: str
title: str
content: str
pinned: bool
category: str
source: str
tags: List[str]
created_at: str
updated_at: str
conversation_id: Optional[str] = None
score: Optional[float] = None
@classmethod
def from_point(cls, p: MemoryPoint) -> "MemoryOut":
return cls(**p.__dict__)
# ─── Health ───────────────────────────────────────────────────────────
@app.get("/health")
def health():
try:
n = store().count()
return {"status": "ok", "memory_count": n, "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
except Exception as exc:
return {"status": "degraded", "error": str(exc), "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
# ─── Memory-Endpoints ─────────────────────────────────────────────────
@app.get("/memory/stats")
def memory_stats():
s = store()
points = s.list_all()
by_type = {}
pinned = 0
for p in points:
by_type[p.type] = by_type.get(p.type, 0) + 1
if p.pinned:
pinned += 1
return {"total": len(points), "pinned": pinned, "by_type": by_type}
@app.get("/memory/list", response_model=List[MemoryOut])
def memory_list(type: Optional[str] = None, limit: int = 200):
s = store()
points = s.list_by_type(type, limit=limit) if type else s.list_all(limit=limit)
return [MemoryOut.from_point(p) for p in points]
@app.get("/memory/pinned", response_model=List[MemoryOut])
def memory_pinned():
return [MemoryOut.from_point(p) for p in store().list_pinned()]
@app.get("/memory/search", response_model=List[MemoryOut])
def memory_search(q: str, k: int = 5, type: Optional[str] = None, include_pinned: bool = False):
vec = embedder().embed(q)
points = store().search(vec, k=k, type_filter=type, exclude_pinned=not include_pinned)
return [MemoryOut.from_point(p) for p in points]
@app.post("/memory/save", response_model=MemoryOut)
def memory_save(body: MemoryIn):
s = store()
vec = embedder().embed(body.content)
point = MemoryPoint(
id="",
type=body.type,
title=body.title,
content=body.content,
pinned=body.pinned,
category=body.category,
source=body.source,
tags=body.tags,
conversation_id=body.conversation_id,
)
pid = s.upsert(point, vec)
saved = s.get(pid)
return MemoryOut.from_point(saved)
@app.patch("/memory/update/{point_id}", response_model=MemoryOut)
def memory_update(point_id: str, body: MemoryUpdate):
s = store()
existing = s.get(point_id)
if not existing:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
content_changed = body.content is not None and body.content != existing.content
if body.title is not None:
existing.title = body.title
if body.content is not None:
existing.content = body.content
if body.pinned is not None:
existing.pinned = body.pinned
if body.category is not None:
existing.category = body.category
if body.tags is not None:
existing.tags = body.tags
vec = embedder().embed(existing.content) if content_changed else None
if vec is None:
# Vektor unveraendert lassen — nur Payload neu schreiben
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.set_payload(
collection_name=COLLECTION,
payload=existing.to_payload() | {"updated_at": __import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat()},
points=[point_id],
)
saved = s.get(point_id)
else:
s.upsert(existing, vec)
saved = s.get(point_id)
return MemoryOut.from_point(saved)
@app.delete("/memory/delete/{point_id}")
def memory_delete(point_id: str):
s = store()
if not s.get(point_id):
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
s.delete(point_id)
return {"deleted": point_id}
# ─── Migration aus brain-import/ ──────────────────────────────────────
IMPORT_DIR = os.environ.get("IMPORT_DIR", "/import")
@app.post("/memory/migrate")
def memory_migrate():
"""Liest /import/*.md und schreibt atomare Memory-Punkte in die DB.
Idempotent: bei Re-Run werden Punkte mit gleicher migration_key ersetzt."""
from pathlib import Path
from migration import run_migration
s = store()
e = embedder()
result = run_migration(Path(IMPORT_DIR), s, e)
return result
@app.get("/memory/import-files")
def memory_import_files():
"""Listet was unter /import/ liegt — fuer die Diagnostic-UI."""
from pathlib import Path
d = Path(IMPORT_DIR)
if not d.exists():
return {"import_dir": str(d), "exists": False, "files": []}
out = []
for p in sorted(d.iterdir()):
if p.is_file():
try:
out.append({"name": p.name, "size": p.stat().st_size})
except Exception:
pass
return {"import_dir": str(d), "exists": True, "files": out}
# ─── Bootstrap-Snapshot ───────────────────────────────────────────────
# "Bootstrap" = alle pinned Memories. Export/Import zum schnellen
# Wiederherstellen einer schlanken ARIA nach Wipe.
@app.get("/memory/export-bootstrap")
def memory_export_bootstrap():
"""Gibt alle pinned Memories als JSON zurueck — fuer Browser-Download."""
s = store()
pinned = s.list_pinned()
return {
"version": 1,
"exported_at": __import__("datetime").datetime.now(
__import__("datetime").timezone.utc
).isoformat(),
"count": len(pinned),
"memories": [
{
"type": p.type,
"title": p.title,
"content": p.content,
"pinned": True,
"category": p.category,
"source": p.source,
"tags": p.tags,
}
for p in pinned
],
}
class BootstrapBundle(BaseModel):
version: int = 1
memories: List[dict]
@app.post("/memory/import-bootstrap")
def memory_import_bootstrap(body: BootstrapBundle):
"""Loescht alle pinned Memories und importiert die im Bundle.
Cold Memory (unpinned) bleibt unangetastet.
Wenn keine Memories im Bundle: nur loeschen ist NICHT erlaubt — der
Caller soll erst exportieren und dann importieren.
"""
if not body.memories:
raise HTTPException(400, "Bundle hat keine memories — Abbruch zur Sicherheit")
s = store()
e = embedder()
# Alle aktuell pinned Punkte loeschen
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.delete(
collection_name=COLLECTION,
points_selector=qm.FilterSelector(filter=qm.Filter(must=[
qm.FieldCondition(key="pinned", match=qm.MatchValue(value=True))
])),
)
# Neue Punkte einspeisen
created = 0
for m in body.memories:
content = (m.get("content") or "").strip()
if not content:
continue
point = MemoryPoint(
id="",
type=m.get("type", "fact"),
title=m.get("title", "(ohne Titel)"),
content=content,
pinned=True,
category=m.get("category", ""),
source=m.get("source", "bootstrap-import"),
tags=list(m.get("tags", [])),
)
vec = e.embed(content)
s.upsert(point, vec)
created += 1
return {"created": created, "deleted_previous_pinned": True}
# ─── Conversation-Loop ──────────────────────────────────────────────
class ChatIn(BaseModel):
message: str
source: str = "" # "app" / "diagnostic" / "stt" — optional
class ChatOut(BaseModel):
reply: str
turns: int
distilling: bool
events: list = Field(default_factory=list)
@app.post("/chat", response_model=ChatOut)
def chat(body: ChatIn, background: BackgroundTasks):
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
im Hintergrund nachdem die Response rausging."""
a = agent()
try:
reply = a.chat(body.message, source=body.source)
except ValueError as exc:
raise HTTPException(400, str(exc))
except RuntimeError as exc:
logger.error("chat fehlgeschlagen: %s", exc)
raise HTTPException(502, str(exc))
needs_distill = a.conversation.needs_distill()
if needs_distill:
background.add_task(a.distill_old_turns)
return ChatOut(
reply=reply,
turns=len(a.conversation.turns),
distilling=needs_distill,
events=a.pop_events(),
)
@app.get("/conversation/stats")
def conversation_stats():
return conversation().stats()
@app.post("/conversation/reset")
def conversation_reset():
"""Hardes Reset — der Rolling-Window-Verlauf wird komplett geleert.
Destillierte facts bleiben in der DB."""
conversation().reset()
return {"ok": True, "turns": 0}
@app.post("/conversation/distill")
def conversation_distill_now():
"""Manueller Trigger fuer Destillat — fuer Tests oder vor einem
bewussten Reset."""
return agent().distill_old_turns()
# ─── Call-Metrics (Token / Quota-Monitoring) ────────────────────────
@app.get("/metrics/calls")
def metrics_calls():
"""Liefert Aggregate fuer 1h / 5h / 24h / 30d.
Jedes Window: {window_seconds, calls, tokens_in, tokens_out, by_model}."""
return metrics_mod.stats()
# ─── Triggers (passive Aufweck-Quellen) ─────────────────────────────
class TriggerTimerBody(BaseModel):
name: str
fires_at: str # ISO timestamp
message: str
author: str = "stefan"
class TriggerWatcherBody(BaseModel):
name: str
condition: str
message: str
check_interval_sec: int = 300
throttle_sec: int = 3600
author: str = "stefan"
class TriggerPatch(BaseModel):
active: bool | None = None
message: str | None = None
condition: str | None = None
throttle_sec: int | None = None
check_interval_sec: int | None = None
fires_at: str | None = None
@app.get("/triggers/list")
def triggers_list(active_only: bool = False):
return {"triggers": triggers_mod.list_triggers(active_only=active_only)}
@app.get("/triggers/conditions")
def triggers_conditions():
"""Verfuegbare Variablen fuer Watcher-Conditions (mit aktuellen Werten)."""
return {
"variables": watcher_mod.describe_variables(),
"current": watcher_mod.collect_variables(),
}
@app.get("/triggers/{name}")
def triggers_get(name: str):
t = triggers_mod.read(name)
if t is None:
raise HTTPException(404, f"Trigger '{name}' nicht gefunden")
return t
@app.get("/triggers/{name}/logs")
def triggers_get_logs(name: str, limit: int = 50):
return {"logs": triggers_mod.list_logs(name, limit=limit)}
@app.post("/triggers/timer")
def triggers_create_timer(body: TriggerTimerBody):
try:
return triggers_mod.create_timer(
name=body.name, fires_at_iso=body.fires_at,
message=body.message, author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/triggers/watcher")
def triggers_create_watcher(body: TriggerWatcherBody):
try:
return triggers_mod.create_watcher(
name=body.name, condition=body.condition,
message=body.message,
check_interval_sec=body.check_interval_sec,
throttle_sec=body.throttle_sec,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/triggers/{name}")
def triggers_patch(name: str, body: TriggerPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return triggers_mod.update(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/triggers/{name}")
def triggers_delete(name: str):
try:
triggers_mod.delete(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
# ─── Skills ─────────────────────────────────────────────────────────
class SkillCreate(BaseModel):
name: str
description: str
execution: str # local-venv | local-bin | bash
entry_code: str
readme: str = ""
args: list = Field(default_factory=list)
requires: dict = Field(default_factory=dict)
pip_packages: list = Field(default_factory=list)
author: str = "stefan"
class SkillRun(BaseModel):
name: str
args: dict = Field(default_factory=dict)
timeout_sec: int = 300
class SkillPatch(BaseModel):
description: str | None = None
active: bool | None = None
args: list | None = None
@app.get("/skills/list")
def skills_list(active_only: bool = False):
return {"skills": skills_mod.list_skills(active_only=active_only)}
@app.get("/skills/{name}")
def skills_get(name: str):
m = skills_mod.read_manifest(name)
if m is None:
raise HTTPException(404, f"Skill '{name}' nicht gefunden")
readme = skills_mod.read_readme(name)
return {"manifest": m, "readme": readme}
@app.post("/skills/create")
def skills_create(body: SkillCreate):
try:
return skills_mod.create_skill(
name=body.name,
description=body.description,
execution=body.execution,
entry_code=body.entry_code,
readme=body.readme,
args=body.args,
requires=body.requires,
pip_packages=body.pip_packages,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/skills/run")
def skills_run(body: SkillRun):
try:
return skills_mod.run_skill(body.name, args=body.args, timeout_sec=body.timeout_sec)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/skills/{name}")
def skills_patch(name: str, body: SkillPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return skills_mod.update_skill(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/skills/{name}")
def skills_delete(name: str):
try:
skills_mod.delete_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
@app.get("/skills/{name}/logs")
def skills_logs(name: str, limit: int = 50):
return {"logs": skills_mod.list_logs(name, limit=limit)}
@app.get("/skills/{name}/export")
def skills_export(name: str):
try:
data = skills_mod.export_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return Response(
content=data,
media_type="application/gzip",
headers={"Content-Disposition": f'attachment; filename="skill-{name}.tar.gz"'},
)
@app.post("/skills/import")
async def skills_import(request: Request, overwrite: bool = False):
data = await request.body()
if not data:
raise HTTPException(400, "Leerer Body")
try:
manifest = skills_mod.import_skill(data, overwrite=overwrite)
except ValueError as exc:
raise HTTPException(400, str(exc))
return {"imported": manifest}