70d1500096
OpenClaw (aria-core) ist raus, ARIA laeuft jetzt mit eigenem Agent-Framework
im aria-brain Container. Vector-DB-basiertes Gedaechtnis statt Sessions,
eigener Conversation-Loop mit Hot+Cold-Memory + Rolling Window, Tool-Use
fuer Skills, Memory-Destillat-Pipeline.
aria-brain/ (neuer Container)
- main.py FastAPI auf 8080, alle Endpoints
- agent.py Conversation-Loop mit Tool-Use (skill_create + run_<skill>)
- conversation.py Rolling Window, JSONL-Persistenz, Distill-Marker
- proxy_client.py httpx-Wrapper zum Claude-Proxy, OpenAI-Format
- prompts.py System-Prompt aus Hot+Cold+Skills
- migration.py Markdown-Parser fuer brain-import/ → atomare Memories
- skills.py Filesystem-Layer fuer /data/skills/<name>/ (Python-only,
venv pro Skill, tar.gz Export/Import, Run-Logs)
- memory/ Embedder (sentence-transformers, multilingual MiniLM)
+ VectorStore (Qdrant-Wrapper)
docker-compose.yml
- aria-core (OpenClaw) raus, openclaw-config Volume raus
- aria-brain Service (FastAPI + Memory)
- aria-qdrant Service (Vector-DB) mit Bind-Mount aria-data/brain/qdrant/
- Diagnostic teilt jetzt Netzwerk mit Bridge (vorher: aria-core)
- Brain bekommt SSH-Mount fuer aria-wohnung + /import fuer brain-import/
bridge/aria_bridge.py
- send_to_core → HTTP-Call an aria-brain:8080/chat (statt OpenClaw-WS)
- aria-core-spezifische Handler raus: doctor_fix, aria_restart,
aria_session_reset, Auto-Compact-Logik, OpenClaw-Handshake
- Generischer container_restart-Handler (Whitelist Bridge/Brain/Qdrant)
- Side-Channel-Events aus /chat-Response (z.B. skill_created) werden
als RVS-Events forwarded
- file_list_request / file_delete_request → an Diagnostic forwarded
- Tote OpenClaw-Connection-Logik bleibt im Code als Referenz (nicht aktiv)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
519 lines
16 KiB
Python
"""
|
|
ARIA Brain — FastAPI-Einstieg.
|
|
|
|
Phase B Punkt 1: nur Skeleton.
|
|
- /health → Liveness
|
|
- /memory/list → alle Punkte (gefiltert)
|
|
- /memory/pinned → Hot Memory
|
|
- /memory/search?q=...&k=5 → semantische Suche
|
|
- /memory/save → neuen Punkt anlegen
|
|
- /memory/update/{id} → Punkt aendern (re-embed wenn content geaendert)
|
|
- /memory/delete/{id} → Punkt loeschen
|
|
- /memory/stats → Anzahl Punkte pro Type
|
|
|
|
/chat (Conversation-Loop) und /skills/* kommen in spaeteren Phasen.
|
|
"""
|
|
|
|
from __future__ import annotations

import logging
import os
from collections import Counter
from contextlib import suppress
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import Response
from pydantic import BaseModel, Field

from memory import Embedder, VectorStore, MemoryPoint
from conversation import Conversation
from proxy_client import ProxyClient
from agent import Agent
import skills as skills_mod
|
|
|
|
# Process-wide logging configuration for the brain service.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("aria-brain")

# Qdrant connection target — overridable via environment (e.g. for local runs).
QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))

app = FastAPI(title="ARIA Brain", version="0.1.0")

# Lazily-created singletons; accessed only through the getter functions below
# so heavy objects (embedding model, DB client) are built on first use.
_embedder: Optional[Embedder] = None
_store: Optional[VectorStore] = None
_conversation: Optional[Conversation] = None
_proxy: Optional[ProxyClient] = None
_agent: Optional[Agent] = None
|
|
|
|
|
|
def embedder() -> Embedder:
    """Return the process-wide Embedder, creating it on first use."""
    global _embedder
    if _embedder is not None:
        return _embedder
    _embedder = Embedder()
    return _embedder
|
|
|
|
|
|
def store() -> VectorStore:
    """Return the shared Qdrant-backed VectorStore, built lazily."""
    global _store
    if _store is not None:
        return _store
    _store = VectorStore(host=QDRANT_HOST, port=QDRANT_PORT)
    return _store
|
|
|
|
|
|
def conversation() -> Conversation:
    """Return the shared Conversation (rolling window), built lazily."""
    global _conversation
    if _conversation is not None:
        return _conversation
    _conversation = Conversation()
    return _conversation
|
|
|
|
|
|
def proxy_client() -> ProxyClient:
    """Return the shared Claude-proxy client, built lazily."""
    global _proxy
    if _proxy is not None:
        return _proxy
    _proxy = ProxyClient()
    return _proxy
|
|
|
|
|
|
def agent() -> Agent:
    """Return the singleton Agent, wiring store, embedder, conversation and proxy."""
    global _agent
    if _agent is not None:
        return _agent
    _agent = Agent(store(), embedder(), conversation(), proxy_client())
    return _agent
|
|
|
|
|
|
# ─── Pydantic-Schemas ─────────────────────────────────────────────────
|
|
|
|
class MemoryIn(BaseModel):
    """Request body for /memory/save: a new memory point to embed and persist."""

    type: str = Field(..., description="identity|rule|preference|tool|skill|fact|conversation|reminder")
    title: str
    content: str
    pinned: bool = False
    category: str = ""
    source: str = "manual"
    tags: list[str] = Field(default_factory=list)
    conversation_id: str | None = None
|
|
|
|
|
|
class MemoryUpdate(BaseModel):
    """Partial update for a memory point; only non-None fields are applied."""

    title: str | None = None
    content: str | None = None
    pinned: bool | None = None
    category: str | None = None
    tags: list[str] | None = None
|
|
|
|
|
|
class MemoryOut(BaseModel):
    """Memory point as returned to clients; `score` is only set on search hits."""

    id: str
    type: str
    title: str
    content: str
    pinned: bool
    category: str
    source: str
    tags: list[str]
    created_at: str
    updated_at: str
    conversation_id: str | None = None
    score: float | None = None

    @classmethod
    def from_point(cls, p: MemoryPoint) -> "MemoryOut":
        # MemoryPoint's attribute names mirror this model one-to-one.
        return cls(**vars(p))
|
|
|
|
|
|
# ─── Health ───────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
try:
|
|
n = store().count()
|
|
return {"status": "ok", "memory_count": n, "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
|
|
except Exception as exc:
|
|
return {"status": "degraded", "error": str(exc), "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
|
|
|
|
|
|
# ─── Memory-Endpoints ─────────────────────────────────────────────────
|
|
|
|
@app.get("/memory/stats")
|
|
def memory_stats():
|
|
s = store()
|
|
points = s.list_all()
|
|
by_type = {}
|
|
pinned = 0
|
|
for p in points:
|
|
by_type[p.type] = by_type.get(p.type, 0) + 1
|
|
if p.pinned:
|
|
pinned += 1
|
|
return {"total": len(points), "pinned": pinned, "by_type": by_type}
|
|
|
|
|
|
@app.get("/memory/list", response_model=List[MemoryOut])
|
|
def memory_list(type: Optional[str] = None, limit: int = 200):
|
|
s = store()
|
|
points = s.list_by_type(type, limit=limit) if type else s.list_all(limit=limit)
|
|
return [MemoryOut.from_point(p) for p in points]
|
|
|
|
|
|
@app.get("/memory/pinned", response_model=List[MemoryOut])
|
|
def memory_pinned():
|
|
return [MemoryOut.from_point(p) for p in store().list_pinned()]
|
|
|
|
|
|
@app.get("/memory/search", response_model=List[MemoryOut])
|
|
def memory_search(q: str, k: int = 5, type: Optional[str] = None, include_pinned: bool = False):
|
|
vec = embedder().embed(q)
|
|
points = store().search(vec, k=k, type_filter=type, exclude_pinned=not include_pinned)
|
|
return [MemoryOut.from_point(p) for p in points]
|
|
|
|
|
|
@app.post("/memory/save", response_model=MemoryOut)
|
|
def memory_save(body: MemoryIn):
|
|
s = store()
|
|
vec = embedder().embed(body.content)
|
|
point = MemoryPoint(
|
|
id="",
|
|
type=body.type,
|
|
title=body.title,
|
|
content=body.content,
|
|
pinned=body.pinned,
|
|
category=body.category,
|
|
source=body.source,
|
|
tags=body.tags,
|
|
conversation_id=body.conversation_id,
|
|
)
|
|
pid = s.upsert(point, vec)
|
|
saved = s.get(pid)
|
|
return MemoryOut.from_point(saved)
|
|
|
|
|
|
@app.patch("/memory/update/{point_id}", response_model=MemoryOut)
def memory_update(point_id: str, body: MemoryUpdate):
    """Patch a memory point; re-embed only when `content` actually changed.

    Raises HTTP 404 when the point does not exist.
    """
    s = store()
    existing = s.get(point_id)
    if not existing:
        raise HTTPException(404, f"Memory {point_id} nicht gefunden")

    content_changed = body.content is not None and body.content != existing.content
    # Apply every field the caller supplied (None means "leave unchanged").
    for field_name in ("title", "content", "pinned", "category", "tags"):
        value = getattr(body, field_name)
        if value is not None:
            setattr(existing, field_name, value)

    if content_changed:
        # Content changed → the embedding must be recomputed.
        s.upsert(existing, embedder().embed(existing.content))
    else:
        # Vector unchanged — rewrite only the payload, bumping updated_at.
        from memory.vector_store import COLLECTION
        s.client.set_payload(
            collection_name=COLLECTION,
            payload=existing.to_payload()
            | {"updated_at": datetime.now(timezone.utc).isoformat()},
            points=[point_id],
        )
    return MemoryOut.from_point(s.get(point_id))
|
|
|
|
|
|
@app.delete("/memory/delete/{point_id}")
def memory_delete(point_id: str):
    """Remove a single memory point; HTTP 404 when it is unknown."""
    s = store()
    existing = s.get(point_id)
    if not existing:
        raise HTTPException(404, f"Memory {point_id} nicht gefunden")
    s.delete(point_id)
    return {"deleted": point_id}
|
|
|
|
|
|
# ─── Migration from brain-import/ ─────────────────────────────────────

# Directory (bind-mounted into the container) holding the markdown exports
# that get migrated into atomic memory points.
IMPORT_DIR = os.environ.get("IMPORT_DIR", "/import")
|
|
|
|
|
|
@app.post("/memory/migrate")
|
|
def memory_migrate():
|
|
"""Liest /import/*.md und schreibt atomare Memory-Punkte in die DB.
|
|
Idempotent: bei Re-Run werden Punkte mit gleicher migration_key ersetzt."""
|
|
from pathlib import Path
|
|
from migration import run_migration
|
|
s = store()
|
|
e = embedder()
|
|
result = run_migration(Path(IMPORT_DIR), s, e)
|
|
return result
|
|
|
|
|
|
@app.get("/memory/import-files")
|
|
def memory_import_files():
|
|
"""Listet was unter /import/ liegt — fuer die Diagnostic-UI."""
|
|
from pathlib import Path
|
|
d = Path(IMPORT_DIR)
|
|
if not d.exists():
|
|
return {"import_dir": str(d), "exists": False, "files": []}
|
|
out = []
|
|
for p in sorted(d.iterdir()):
|
|
if p.is_file():
|
|
try:
|
|
out.append({"name": p.name, "size": p.stat().st_size})
|
|
except Exception:
|
|
pass
|
|
return {"import_dir": str(d), "exists": True, "files": out}
|
|
|
|
|
|
# ─── Bootstrap-Snapshot ───────────────────────────────────────────────
|
|
# "Bootstrap" = alle pinned Memories. Export/Import zum schnellen
|
|
# Wiederherstellen einer schlanken ARIA nach Wipe.
|
|
|
|
@app.get("/memory/export-bootstrap")
|
|
def memory_export_bootstrap():
|
|
"""Gibt alle pinned Memories als JSON zurueck — fuer Browser-Download."""
|
|
s = store()
|
|
pinned = s.list_pinned()
|
|
return {
|
|
"version": 1,
|
|
"exported_at": __import__("datetime").datetime.now(
|
|
__import__("datetime").timezone.utc
|
|
).isoformat(),
|
|
"count": len(pinned),
|
|
"memories": [
|
|
{
|
|
"type": p.type,
|
|
"title": p.title,
|
|
"content": p.content,
|
|
"pinned": True,
|
|
"category": p.category,
|
|
"source": p.source,
|
|
"tags": p.tags,
|
|
}
|
|
for p in pinned
|
|
],
|
|
}
|
|
|
|
|
|
class BootstrapBundle(BaseModel):
    """Payload for /memory/import-bootstrap: raw memory dicts plus a format version."""

    version: int = 1
    memories: list[dict]
|
|
|
|
|
|
@app.post("/memory/import-bootstrap")
|
|
def memory_import_bootstrap(body: BootstrapBundle):
|
|
"""Loescht alle pinned Memories und importiert die im Bundle.
|
|
Cold Memory (unpinned) bleibt unangetastet.
|
|
|
|
Wenn keine Memories im Bundle: nur loeschen ist NICHT erlaubt — der
|
|
Caller soll erst exportieren und dann importieren.
|
|
"""
|
|
if not body.memories:
|
|
raise HTTPException(400, "Bundle hat keine memories — Abbruch zur Sicherheit")
|
|
|
|
s = store()
|
|
e = embedder()
|
|
|
|
# Alle aktuell pinned Punkte loeschen
|
|
from qdrant_client.http import models as qm
|
|
from memory.vector_store import COLLECTION
|
|
s.client.delete(
|
|
collection_name=COLLECTION,
|
|
points_selector=qm.FilterSelector(filter=qm.Filter(must=[
|
|
qm.FieldCondition(key="pinned", match=qm.MatchValue(value=True))
|
|
])),
|
|
)
|
|
|
|
# Neue Punkte einspeisen
|
|
created = 0
|
|
for m in body.memories:
|
|
content = (m.get("content") or "").strip()
|
|
if not content:
|
|
continue
|
|
point = MemoryPoint(
|
|
id="",
|
|
type=m.get("type", "fact"),
|
|
title=m.get("title", "(ohne Titel)"),
|
|
content=content,
|
|
pinned=True,
|
|
category=m.get("category", ""),
|
|
source=m.get("source", "bootstrap-import"),
|
|
tags=list(m.get("tags", [])),
|
|
)
|
|
vec = e.embed(content)
|
|
s.upsert(point, vec)
|
|
created += 1
|
|
|
|
return {"created": created, "deleted_previous_pinned": True}
|
|
|
|
|
|
# ─── Conversation-Loop ──────────────────────────────────────────────
|
|
|
|
class ChatIn(BaseModel):
    """Request body for /chat."""

    message: str
    source: str = ""  # "app" / "diagnostic" / "stt" — optional
|
|
|
|
|
|
class ChatOut(BaseModel):
    """Response of /chat: the reply plus conversation-loop bookkeeping."""

    reply: str
    turns: int  # current rolling-window length after this exchange
    distilling: bool  # True when a background distill was scheduled
    events: list = Field(default_factory=list)  # side-channel events (e.g. skill_created)
|
|
|
|
|
|
@app.post("/chat", response_model=ChatOut)
|
|
def chat(body: ChatIn, background: BackgroundTasks):
|
|
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
|
|
im Hintergrund nachdem die Response rausging."""
|
|
a = agent()
|
|
try:
|
|
reply = a.chat(body.message, source=body.source)
|
|
except ValueError as exc:
|
|
raise HTTPException(400, str(exc))
|
|
except RuntimeError as exc:
|
|
logger.error("chat fehlgeschlagen: %s", exc)
|
|
raise HTTPException(502, str(exc))
|
|
|
|
needs_distill = a.conversation.needs_distill()
|
|
if needs_distill:
|
|
background.add_task(a.distill_old_turns)
|
|
return ChatOut(
|
|
reply=reply,
|
|
turns=len(a.conversation.turns),
|
|
distilling=needs_distill,
|
|
events=a.pop_events(),
|
|
)
|
|
|
|
|
|
@app.get("/conversation/stats")
|
|
def conversation_stats():
|
|
return conversation().stats()
|
|
|
|
|
|
@app.post("/conversation/reset")
|
|
def conversation_reset():
|
|
"""Hardes Reset — der Rolling-Window-Verlauf wird komplett geleert.
|
|
Destillierte facts bleiben in der DB."""
|
|
conversation().reset()
|
|
return {"ok": True, "turns": 0}
|
|
|
|
|
|
@app.post("/conversation/distill")
|
|
def conversation_distill_now():
|
|
"""Manueller Trigger fuer Destillat — fuer Tests oder vor einem
|
|
bewussten Reset."""
|
|
return agent().distill_old_turns()
|
|
|
|
|
|
# ─── Skills ─────────────────────────────────────────────────────────
|
|
|
|
class SkillCreate(BaseModel):
    """Request body for /skills/create."""

    name: str
    description: str
    execution: str  # local-venv | local-bin | bash
    entry_code: str
    readme: str = ""
    args: list = Field(default_factory=list)
    requires: dict = Field(default_factory=dict)
    pip_packages: list = Field(default_factory=list)
    author: str = "stefan"
|
|
|
|
|
|
class SkillRun(BaseModel):
    """Request body for /skills/run."""

    name: str
    args: dict = Field(default_factory=dict)
    timeout_sec: int = 300  # forwarded to skills_mod.run_skill
|
|
|
|
|
|
class SkillPatch(BaseModel):
    """Partial skill update; None fields are skipped by the endpoint."""

    description: str | None = None
    active: bool | None = None
    args: list | None = None
|
|
|
|
|
|
@app.get("/skills/list")
|
|
def skills_list(active_only: bool = False):
|
|
return {"skills": skills_mod.list_skills(active_only=active_only)}
|
|
|
|
|
|
@app.get("/skills/{name}")
|
|
def skills_get(name: str):
|
|
m = skills_mod.read_manifest(name)
|
|
if m is None:
|
|
raise HTTPException(404, f"Skill '{name}' nicht gefunden")
|
|
readme = skills_mod.read_readme(name)
|
|
return {"manifest": m, "readme": readme}
|
|
|
|
|
|
@app.post("/skills/create")
|
|
def skills_create(body: SkillCreate):
|
|
try:
|
|
return skills_mod.create_skill(
|
|
name=body.name,
|
|
description=body.description,
|
|
execution=body.execution,
|
|
entry_code=body.entry_code,
|
|
readme=body.readme,
|
|
args=body.args,
|
|
requires=body.requires,
|
|
pip_packages=body.pip_packages,
|
|
author=body.author,
|
|
)
|
|
except ValueError as exc:
|
|
raise HTTPException(400, str(exc))
|
|
|
|
|
|
@app.post("/skills/run")
|
|
def skills_run(body: SkillRun):
|
|
try:
|
|
return skills_mod.run_skill(body.name, args=body.args, timeout_sec=body.timeout_sec)
|
|
except ValueError as exc:
|
|
raise HTTPException(400, str(exc))
|
|
|
|
|
|
@app.patch("/skills/{name}")
|
|
def skills_patch(name: str, body: SkillPatch):
|
|
patch = {k: v for k, v in body.model_dump().items() if v is not None}
|
|
try:
|
|
return skills_mod.update_skill(name, patch)
|
|
except ValueError as exc:
|
|
raise HTTPException(404, str(exc))
|
|
|
|
|
|
@app.delete("/skills/{name}")
|
|
def skills_delete(name: str):
|
|
try:
|
|
skills_mod.delete_skill(name)
|
|
except ValueError as exc:
|
|
raise HTTPException(404, str(exc))
|
|
return {"deleted": name}
|
|
|
|
|
|
@app.get("/skills/{name}/logs")
|
|
def skills_logs(name: str, limit: int = 50):
|
|
return {"logs": skills_mod.list_logs(name, limit=limit)}
|
|
|
|
|
|
@app.get("/skills/{name}/export")
|
|
def skills_export(name: str):
|
|
try:
|
|
data = skills_mod.export_skill(name)
|
|
except ValueError as exc:
|
|
raise HTTPException(404, str(exc))
|
|
return Response(
|
|
content=data,
|
|
media_type="application/gzip",
|
|
headers={"Content-Disposition": f'attachment; filename="skill-{name}.tar.gz"'},
|
|
)
|
|
|
|
|
|
@app.post("/skills/import")
|
|
async def skills_import(request: Request, overwrite: bool = False):
|
|
data = await request.body()
|
|
if not data:
|
|
raise HTTPException(400, "Leerer Body")
|
|
try:
|
|
manifest = skills_mod.import_skill(data, overwrite=overwrite)
|
|
except ValueError as exc:
|
|
raise HTTPException(400, str(exc))
|
|
return {"imported": manifest}
|