Files
ARIA-AGENT/aria-brain/main.py
T
duffyduck 6549fcbce8 feat(brain): Volltext-Suche zusaetzlich zu Semantic — Default ist jetzt Wortlich
Stefan wollte ne richtige Suche statt nur "klingt aehnlich". Beide
Modi sind jetzt verfuegbar, Default ist Volltext:

- 📝 Wortlich (Substring, case-insensitive ueber Title + Content +
  Category + Tags) — neuer Endpoint /memory/search-text. Full-Scan
  via Qdrant scroll, k=50. Findet "cessna" exakt im Content. Bei
  kleiner DB (<1000 Eintraege) unkritisch performant.

- 🧠 Semantisch (Embedder + score_threshold 0.30) — bestehender
  /memory/search Endpoint. Findet konzeptuell verwandte Eintraege.

Diagnostic UI: Dropdown neben dem Suchfeld zum Modus-Wechsel.
Info-Banner zeigt klar welcher Modus aktiv ist.

Warum Wortlich Default: bei kleiner DB liefert Semantic gern False
Positives mit Score 0.30-0.45 fuer komplett unverwandte Begriffe
(z.B. "cessna" matched "Tageslog fuehren" mit 0.43). Wortlich ist
deterministisch und vermeidet das Rauschen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 16:59:31 +02:00

704 lines
21 KiB
Python

"""
ARIA Brain — FastAPI-Einstieg.
Phase B Punkt 1: nur Skeleton.
- /health → Liveness
- /memory/list → alle Punkte (gefiltert)
- /memory/pinned → Hot Memory
- /memory/search?q=...&k=5 → semantische Suche
- /memory/save → neuen Punkt anlegen
- /memory/update/{id} → Punkt aendern (re-embed wenn content geaendert)
- /memory/delete/{id} → Punkt loeschen
- /memory/stats → Anzahl Punkte pro Type
/chat (Conversation-Loop) und /skills/* kommen in spaeteren Phasen.
"""
from __future__ import annotations
import logging
import os
from typing import List, Optional
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import Response
from pydantic import BaseModel, Field
from memory import Embedder, VectorStore, MemoryPoint
from conversation import Conversation
from proxy_client import ProxyClient
from agent import Agent
import skills as skills_mod
import metrics as metrics_mod
import triggers as triggers_mod
import watcher as watcher_mod
import background as background_mod
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("aria-brain")
QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Beim Brain-Start: Trigger-Background-Loop anwerfen. Beim Shutdown: stoppen."""
task = asyncio.create_task(background_mod.run_loop(agent))
logger.info("Lifespan: Trigger-Loop gestartet")
try:
yield
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.info("Lifespan: Trigger-Loop gestoppt")
app = FastAPI(title="ARIA Brain", version="0.1.0", lifespan=lifespan)
_embedder: Optional[Embedder] = None
_store: Optional[VectorStore] = None
_conversation: Optional[Conversation] = None
_proxy: Optional[ProxyClient] = None
_agent: Optional[Agent] = None
def embedder() -> Embedder:
global _embedder
if _embedder is None:
_embedder = Embedder()
return _embedder
def store() -> VectorStore:
global _store
if _store is None:
_store = VectorStore(host=QDRANT_HOST, port=QDRANT_PORT)
return _store
def conversation() -> Conversation:
global _conversation
if _conversation is None:
_conversation = Conversation()
return _conversation
def proxy_client() -> ProxyClient:
global _proxy
if _proxy is None:
_proxy = ProxyClient()
return _proxy
def agent() -> Agent:
global _agent
if _agent is None:
_agent = Agent(store(), embedder(), conversation(), proxy_client())
return _agent
# ─── Pydantic-Schemas ─────────────────────────────────────────────────
class MemoryIn(BaseModel):
type: str = Field(..., description="identity|rule|preference|tool|skill|fact|conversation|reminder")
title: str
content: str
pinned: bool = False
category: str = ""
source: str = "manual"
tags: List[str] = Field(default_factory=list)
conversation_id: Optional[str] = None
class MemoryUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
pinned: Optional[bool] = None
category: Optional[str] = None
tags: Optional[List[str]] = None
class MemoryOut(BaseModel):
id: str
type: str
title: str
content: str
pinned: bool
category: str
source: str
tags: List[str]
created_at: str
updated_at: str
conversation_id: Optional[str] = None
score: Optional[float] = None
@classmethod
def from_point(cls, p: MemoryPoint) -> "MemoryOut":
return cls(**p.__dict__)
# ─── Health ───────────────────────────────────────────────────────────
@app.get("/health")
def health():
try:
n = store().count()
return {"status": "ok", "memory_count": n, "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
except Exception as exc:
return {"status": "degraded", "error": str(exc), "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
# ─── Memory-Endpoints ─────────────────────────────────────────────────
@app.get("/memory/stats")
def memory_stats():
s = store()
points = s.list_all()
by_type = {}
pinned = 0
for p in points:
by_type[p.type] = by_type.get(p.type, 0) + 1
if p.pinned:
pinned += 1
return {"total": len(points), "pinned": pinned, "by_type": by_type}
@app.get("/memory/list", response_model=List[MemoryOut])
def memory_list(type: Optional[str] = None, limit: int = 200):
s = store()
points = s.list_by_type(type, limit=limit) if type else s.list_all(limit=limit)
return [MemoryOut.from_point(p) for p in points]
@app.get("/memory/pinned", response_model=List[MemoryOut])
def memory_pinned():
return [MemoryOut.from_point(p) for p in store().list_pinned()]
@app.get("/memory/search-text", response_model=List[MemoryOut])
def memory_search_text(
q: str,
k: int = 50,
type: Optional[str] = None,
include_pinned: bool = True,
):
"""Volltext-Substring-Suche (case-insensitive) ueber Title + Content +
Category + Tags. Findet exakte Begriffe — z.B. 'cessna' matched 'Cessna 172'.
Im Gegensatz zu /memory/search (semantic) keine 'klingt aehnlich'-Treffer."""
points = store().search_text(
q, k=k, type_filter=type,
exclude_pinned=not include_pinned,
)
return [MemoryOut.from_point(p) for p in points]
@app.get("/memory/search", response_model=List[MemoryOut])
def memory_search(
q: str,
k: int = 5,
type: Optional[str] = None,
include_pinned: bool = False,
score_threshold: Optional[float] = 0.30,
):
"""Semantische Suche. score_threshold filtert schwache Treffer raus
(Default 0.30 — MiniLM-multilingual liefert <0.25 fuer Rauschen).
Mit score_threshold=0 wird komplett Top-k zurueckgegeben."""
vec = embedder().embed(q)
points = store().search(
vec, k=k, type_filter=type, exclude_pinned=not include_pinned,
score_threshold=score_threshold if score_threshold and score_threshold > 0 else None,
)
return [MemoryOut.from_point(p) for p in points]
@app.post("/memory/save", response_model=MemoryOut)
def memory_save(body: MemoryIn):
s = store()
vec = embedder().embed(body.content)
point = MemoryPoint(
id="",
type=body.type,
title=body.title,
content=body.content,
pinned=body.pinned,
category=body.category,
source=body.source,
tags=body.tags,
conversation_id=body.conversation_id,
)
pid = s.upsert(point, vec)
saved = s.get(pid)
return MemoryOut.from_point(saved)
@app.patch("/memory/update/{point_id}", response_model=MemoryOut)
def memory_update(point_id: str, body: MemoryUpdate):
s = store()
existing = s.get(point_id)
if not existing:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
content_changed = body.content is not None and body.content != existing.content
if body.title is not None:
existing.title = body.title
if body.content is not None:
existing.content = body.content
if body.pinned is not None:
existing.pinned = body.pinned
if body.category is not None:
existing.category = body.category
if body.tags is not None:
existing.tags = body.tags
vec = embedder().embed(existing.content) if content_changed else None
if vec is None:
# Vektor unveraendert lassen — nur Payload neu schreiben
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.set_payload(
collection_name=COLLECTION,
payload=existing.to_payload() | {"updated_at": __import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat()},
points=[point_id],
)
saved = s.get(point_id)
else:
s.upsert(existing, vec)
saved = s.get(point_id)
return MemoryOut.from_point(saved)
@app.delete("/memory/delete/{point_id}")
def memory_delete(point_id: str):
s = store()
if not s.get(point_id):
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
s.delete(point_id)
return {"deleted": point_id}
# ─── Migration aus brain-import/ ──────────────────────────────────────
IMPORT_DIR = os.environ.get("IMPORT_DIR", "/import")
@app.post("/memory/migrate")
def memory_migrate():
"""Liest /import/*.md und schreibt atomare Memory-Punkte in die DB.
Idempotent: bei Re-Run werden Punkte mit gleicher migration_key ersetzt."""
from pathlib import Path
from migration import run_migration
s = store()
e = embedder()
result = run_migration(Path(IMPORT_DIR), s, e)
return result
@app.get("/memory/import-files")
def memory_import_files():
"""Listet was unter /import/ liegt — fuer die Diagnostic-UI."""
from pathlib import Path
d = Path(IMPORT_DIR)
if not d.exists():
return {"import_dir": str(d), "exists": False, "files": []}
out = []
for p in sorted(d.iterdir()):
if p.is_file():
try:
out.append({"name": p.name, "size": p.stat().st_size})
except Exception:
pass
return {"import_dir": str(d), "exists": True, "files": out}
# ─── Bootstrap-Snapshot ───────────────────────────────────────────────
# "Bootstrap" = alle pinned Memories. Export/Import zum schnellen
# Wiederherstellen einer schlanken ARIA nach Wipe.
@app.get("/memory/export-bootstrap")
def memory_export_bootstrap():
"""Gibt alle pinned Memories als JSON zurueck — fuer Browser-Download."""
s = store()
pinned = s.list_pinned()
return {
"version": 1,
"exported_at": __import__("datetime").datetime.now(
__import__("datetime").timezone.utc
).isoformat(),
"count": len(pinned),
"memories": [
{
"type": p.type,
"title": p.title,
"content": p.content,
"pinned": True,
"category": p.category,
"source": p.source,
"tags": p.tags,
}
for p in pinned
],
}
class BootstrapBundle(BaseModel):
version: int = 1
memories: List[dict]
@app.post("/memory/import-bootstrap")
def memory_import_bootstrap(body: BootstrapBundle):
"""Loescht alle pinned Memories und importiert die im Bundle.
Cold Memory (unpinned) bleibt unangetastet.
Wenn keine Memories im Bundle: nur loeschen ist NICHT erlaubt — der
Caller soll erst exportieren und dann importieren.
"""
if not body.memories:
raise HTTPException(400, "Bundle hat keine memories — Abbruch zur Sicherheit")
s = store()
e = embedder()
# Alle aktuell pinned Punkte loeschen
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.delete(
collection_name=COLLECTION,
points_selector=qm.FilterSelector(filter=qm.Filter(must=[
qm.FieldCondition(key="pinned", match=qm.MatchValue(value=True))
])),
)
# Neue Punkte einspeisen
created = 0
for m in body.memories:
content = (m.get("content") or "").strip()
if not content:
continue
point = MemoryPoint(
id="",
type=m.get("type", "fact"),
title=m.get("title", "(ohne Titel)"),
content=content,
pinned=True,
category=m.get("category", ""),
source=m.get("source", "bootstrap-import"),
tags=list(m.get("tags", [])),
)
vec = e.embed(content)
s.upsert(point, vec)
created += 1
return {"created": created, "deleted_previous_pinned": True}
# ─── Conversation-Loop ──────────────────────────────────────────────
class ChatIn(BaseModel):
message: str
source: str = "" # "app" / "diagnostic" / "stt" — optional
class ChatOut(BaseModel):
reply: str
turns: int
distilling: bool
events: list = Field(default_factory=list)
@app.post("/chat", response_model=ChatOut)
def chat(body: ChatIn, background: BackgroundTasks):
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
im Hintergrund nachdem die Response rausging."""
a = agent()
try:
reply = a.chat(body.message, source=body.source)
except ValueError as exc:
raise HTTPException(400, str(exc))
except RuntimeError as exc:
logger.error("chat fehlgeschlagen: %s", exc)
raise HTTPException(502, str(exc))
needs_distill = a.conversation.needs_distill()
if needs_distill:
background.add_task(a.distill_old_turns)
return ChatOut(
reply=reply,
turns=len(a.conversation.turns),
distilling=needs_distill,
events=a.pop_events(),
)
@app.get("/conversation/stats")
def conversation_stats():
return conversation().stats()
@app.post("/conversation/reset")
def conversation_reset():
"""Hardes Reset — der Rolling-Window-Verlauf wird komplett geleert.
Destillierte facts bleiben in der DB."""
conversation().reset()
return {"ok": True, "turns": 0}
class ConvDeleteBody(BaseModel):
role: str
content: str
ts_iso_hint: Optional[str] = None
@app.post("/conversation/delete-turn")
def conversation_delete_turn(body: ConvDeleteBody):
"""Entfernt einen einzelnen Turn aus dem Rolling-Window + jsonl.
Match per role + content (erstes Vorkommen wenn ts_iso_hint None,
sonst nahester zur Zeit). 404 wenn kein Match.
POST statt DELETE weil FastAPI 0.115 keine Bodys auf DELETE
erlaubt — semantisch trotzdem eine Loeschung."""
ok = conversation().remove_by_match(
role=body.role, content=body.content, ts_iso_hint=body.ts_iso_hint,
)
if not ok:
raise HTTPException(404, "Turn mit diesem role+content nicht gefunden")
return {"ok": True, "turns": len(conversation().turns)}
@app.post("/conversation/distill")
def conversation_distill_now():
"""Manueller Trigger fuer Destillat — fuer Tests oder vor einem
bewussten Reset."""
return agent().distill_old_turns()
# ─── Call-Metrics (Token / Quota-Monitoring) ────────────────────────
@app.get("/metrics/calls")
def metrics_calls():
"""Liefert Aggregate fuer 1h / 5h / 24h / 30d.
Jedes Window: {window_seconds, calls, tokens_in, tokens_out, by_model}."""
return metrics_mod.stats()
# ─── Triggers (passive Aufweck-Quellen) ─────────────────────────────
class TriggerTimerBody(BaseModel):
name: str
fires_at: str # ISO timestamp
message: str
author: str = "stefan"
class TriggerWatcherBody(BaseModel):
name: str
condition: str
message: str
check_interval_sec: int = 300
throttle_sec: int = 3600
author: str = "stefan"
class TriggerPatch(BaseModel):
active: bool | None = None
message: str | None = None
condition: str | None = None
throttle_sec: int | None = None
check_interval_sec: int | None = None
fires_at: str | None = None
@app.get("/triggers/list")
def triggers_list(active_only: bool = False):
return {"triggers": triggers_mod.list_triggers(active_only=active_only)}
@app.get("/triggers/conditions")
def triggers_conditions():
"""Verfuegbare Variablen + Funktionen fuer Watcher-Conditions
(mit aktuellen Werten)."""
current = watcher_mod.collect_variables()
# near() ist ein callable in vars_ — fuer die UI rausfiltern
serializable = {k: v for k, v in current.items() if not callable(v)}
return {
"variables": watcher_mod.describe_variables(),
"functions": watcher_mod.describe_functions(),
"current": serializable,
}
@app.get("/triggers/{name}")
def triggers_get(name: str):
t = triggers_mod.read(name)
if t is None:
raise HTTPException(404, f"Trigger '{name}' nicht gefunden")
return t
@app.get("/triggers/{name}/logs")
def triggers_get_logs(name: str, limit: int = 50):
return {"logs": triggers_mod.list_logs(name, limit=limit)}
@app.post("/triggers/timer")
def triggers_create_timer(body: TriggerTimerBody):
try:
return triggers_mod.create_timer(
name=body.name, fires_at_iso=body.fires_at,
message=body.message, author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/triggers/watcher")
def triggers_create_watcher(body: TriggerWatcherBody):
try:
return triggers_mod.create_watcher(
name=body.name, condition=body.condition,
message=body.message,
check_interval_sec=body.check_interval_sec,
throttle_sec=body.throttle_sec,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/triggers/{name}")
def triggers_patch(name: str, body: TriggerPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return triggers_mod.update(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/triggers/{name}")
def triggers_delete(name: str):
try:
triggers_mod.delete(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
# ─── Skills ─────────────────────────────────────────────────────────
class SkillCreate(BaseModel):
name: str
description: str
execution: str # local-venv | local-bin | bash
entry_code: str
readme: str = ""
args: list = Field(default_factory=list)
requires: dict = Field(default_factory=dict)
pip_packages: list = Field(default_factory=list)
author: str = "stefan"
class SkillRun(BaseModel):
name: str
args: dict = Field(default_factory=dict)
timeout_sec: int = 300
class SkillPatch(BaseModel):
description: str | None = None
active: bool | None = None
args: list | None = None
@app.get("/skills/list")
def skills_list(active_only: bool = False):
return {"skills": skills_mod.list_skills(active_only=active_only)}
@app.get("/skills/{name}")
def skills_get(name: str):
m = skills_mod.read_manifest(name)
if m is None:
raise HTTPException(404, f"Skill '{name}' nicht gefunden")
readme = skills_mod.read_readme(name)
return {"manifest": m, "readme": readme}
@app.post("/skills/create")
def skills_create(body: SkillCreate):
try:
return skills_mod.create_skill(
name=body.name,
description=body.description,
execution=body.execution,
entry_code=body.entry_code,
readme=body.readme,
args=body.args,
requires=body.requires,
pip_packages=body.pip_packages,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/skills/run")
def skills_run(body: SkillRun):
try:
return skills_mod.run_skill(body.name, args=body.args, timeout_sec=body.timeout_sec)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/skills/{name}")
def skills_patch(name: str, body: SkillPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return skills_mod.update_skill(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/skills/{name}")
def skills_delete(name: str):
try:
skills_mod.delete_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
@app.get("/skills/{name}/logs")
def skills_logs(name: str, limit: int = 50):
return {"logs": skills_mod.list_logs(name, limit=limit)}
@app.get("/skills/{name}/export")
def skills_export(name: str):
try:
data = skills_mod.export_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return Response(
content=data,
media_type="application/gzip",
headers={"Content-Disposition": f'attachment; filename="skill-{name}.tar.gz"'},
)
@app.post("/skills/import")
async def skills_import(request: Request, overwrite: bool = False):
data = await request.body()
if not data:
raise HTTPException(400, "Leerer Body")
try:
manifest = skills_mod.import_skill(data, overwrite=overwrite)
except ValueError as exc:
raise HTTPException(400, str(exc))
return {"imported": manifest}