""" ARIA Brain — FastAPI-Einstieg. Phase B Punkt 1: nur Skeleton. - /health → Liveness - /memory/list → alle Punkte (gefiltert) - /memory/pinned → Hot Memory - /memory/search?q=...&k=5 → semantische Suche - /memory/save → neuen Punkt anlegen - /memory/update/{id} → Punkt aendern (re-embed wenn content geaendert) - /memory/delete/{id} → Punkt loeschen - /memory/stats → Anzahl Punkte pro Type /chat (Conversation-Loop) und /skills/* kommen in spaeteren Phasen. """ from __future__ import annotations import logging import os from typing import List, Optional from fastapi import FastAPI, HTTPException, BackgroundTasks, Request from fastapi.responses import Response from pydantic import BaseModel, Field from memory import Embedder, VectorStore, MemoryPoint from conversation import Conversation from proxy_client import ProxyClient from agent import Agent import skills as skills_mod logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") logger = logging.getLogger("aria-brain") QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant") QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333")) app = FastAPI(title="ARIA Brain", version="0.1.0") _embedder: Optional[Embedder] = None _store: Optional[VectorStore] = None _conversation: Optional[Conversation] = None _proxy: Optional[ProxyClient] = None _agent: Optional[Agent] = None def embedder() -> Embedder: global _embedder if _embedder is None: _embedder = Embedder() return _embedder def store() -> VectorStore: global _store if _store is None: _store = VectorStore(host=QDRANT_HOST, port=QDRANT_PORT) return _store def conversation() -> Conversation: global _conversation if _conversation is None: _conversation = Conversation() return _conversation def proxy_client() -> ProxyClient: global _proxy if _proxy is None: _proxy = ProxyClient() return _proxy def agent() -> Agent: global _agent if _agent is None: _agent = Agent(store(), embedder(), conversation(), proxy_client()) return _agent # 
# ─── Pydantic schemas ─────────────────────────────────────────────────

class MemoryIn(BaseModel):
    """Request body for creating a memory point via /memory/save."""

    type: str = Field(..., description="identity|rule|preference|tool|skill|fact|conversation|reminder")
    title: str
    content: str
    pinned: bool = False
    category: str = ""
    source: str = "manual"
    tags: List[str] = Field(default_factory=list)
    conversation_id: Optional[str] = None


class MemoryUpdate(BaseModel):
    """Partial update body; fields left at None are not touched."""

    title: Optional[str] = None
    content: Optional[str] = None
    pinned: Optional[bool] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None


class MemoryOut(BaseModel):
    """Response shape for a single memory point."""

    id: str
    type: str
    title: str
    content: str
    pinned: bool
    category: str
    source: str
    tags: List[str]
    created_at: str
    updated_at: str
    conversation_id: Optional[str] = None
    score: Optional[float] = None

    @classmethod
    def from_point(cls, p: MemoryPoint) -> "MemoryOut":
        """Build the response model from the attributes of a MemoryPoint."""
        return cls(**p.__dict__)


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()


# ─── Health ───────────────────────────────────────────────────────────

@app.get("/health")
def health():
    """Liveness probe.

    Returns status "ok" with the memory count, or status "degraded" with the
    error text when counting the store raises.
    """
    qdrant = f"{QDRANT_HOST}:{QDRANT_PORT}"
    try:
        n = store().count()
    except Exception as exc:
        # Deliberately broad: the probe must answer even when the store is down.
        return {"status": "degraded", "error": str(exc), "qdrant": qdrant}
    return {"status": "ok", "memory_count": n, "qdrant": qdrant}


# ─── Memory endpoints ─────────────────────────────────────────────────

@app.get("/memory/stats")
def memory_stats():
    """Return total, pinned count and per-type counts of the points from list_all()."""
    from collections import Counter

    points = store().list_all()
    by_type = Counter(p.type for p in points)
    pinned = sum(1 for p in points if p.pinned)
    # Plain dict so the response is an ordinary JSON object.
    return {"total": len(points), "pinned": pinned, "by_type": dict(by_type)}


@app.get("/memory/list", response_model=List[MemoryOut])
def memory_list(type: Optional[str] = None, limit: int = 200):
    """List points, optionally restricted to one type, capped at ``limit``."""
    # ``type`` shadows the builtin but is the public query parameter name.
    s = store()
    points = s.list_by_type(type, limit=limit) if type else s.list_all(limit=limit)
    return [MemoryOut.from_point(p) for p in points]


@app.get("/memory/pinned", response_model=List[MemoryOut])
def memory_pinned():
    """Return all pinned points (hot memory)."""
    return [MemoryOut.from_point(p) for p in store().list_pinned()]


@app.get("/memory/search", response_model=List[MemoryOut])
def memory_search(q: str, k: int = 5, type: Optional[str] = None, include_pinned: bool = False):
    """Embed ``q`` and return the ``k`` best matches from the store.

    Pinned points are excluded unless ``include_pinned`` is set.
    """
    vec = embedder().embed(q)
    points = store().search(vec, k=k, type_filter=type, exclude_pinned=not include_pinned)
    return [MemoryOut.from_point(p) for p in points]


@app.post("/memory/save", response_model=MemoryOut)
def memory_save(body: MemoryIn):
    """Embed the content, store a new point and return it as read back from the store."""
    s = store()
    vec = embedder().embed(body.content)
    point = MemoryPoint(
        id="",
        type=body.type,
        title=body.title,
        content=body.content,
        pinned=body.pinned,
        category=body.category,
        source=body.source,
        tags=body.tags,
        conversation_id=body.conversation_id,
    )
    pid = s.upsert(point, vec)
    saved = s.get(pid)
    return MemoryOut.from_point(saved)


@app.patch("/memory/update/{point_id}", response_model=MemoryOut)
def memory_update(point_id: str, body: MemoryUpdate):
    """Apply a partial update to an existing point.

    The point is re-embedded and upserted only when the content actually
    changed; otherwise just the payload is rewritten and the vector is kept.

    Raises:
        HTTPException: 404 when the point does not exist.
    """
    s = store()
    existing = s.get(point_id)
    if not existing:
        raise HTTPException(404, f"Memory {point_id} nicht gefunden")

    # Must be evaluated before ``existing.content`` is overwritten below.
    content_changed = body.content is not None and body.content != existing.content

    for field_name in ("title", "content", "pinned", "category", "tags"):
        value = getattr(body, field_name)
        if value is not None:
            setattr(existing, field_name, value)

    if content_changed:
        s.upsert(existing, embedder().embed(existing.content))
    else:
        # Leave the vector untouched — only rewrite the payload.
        from memory.vector_store import COLLECTION

        s.client.set_payload(
            collection_name=COLLECTION,
            payload=existing.to_payload() | {"updated_at": _utc_now_iso()},
            points=[point_id],
        )
    saved = s.get(point_id)
    return MemoryOut.from_point(saved)


@app.delete("/memory/delete/{point_id}")
def memory_delete(point_id: str):
    """Delete a point by id.

    Raises:
        HTTPException: 404 when the point does not exist.
    """
    s = store()
    if not s.get(point_id):
        raise HTTPException(404, f"Memory {point_id} nicht gefunden")
    s.delete(point_id)
    return {"deleted": point_id}


# ─── Migration from brain-import/ ─────────────────────────────────────

IMPORT_DIR = os.environ.get("IMPORT_DIR", "/import")


@app.post("/memory/migrate")
def memory_migrate():
    """Read /import/*.md and write atomic memory points into the DB.

    Idempotent: on a re-run, points with the same migration_key are replaced.
    """
    from pathlib import Path
    from migration import run_migration

    s = store()
    e = embedder()
    result = run_migration(Path(IMPORT_DIR), s, e)
    return result


@app.get("/memory/import-files")
def memory_import_files():
    """List the regular files under the import directory — for the diagnostic UI."""
    from pathlib import Path

    d = Path(IMPORT_DIR)
    if not d.exists():
        return {"import_dir": str(d), "exists": False, "files": []}

    out = []
    for p in sorted(d.iterdir()):
        if not p.is_file():
            continue
        try:
            size = p.stat().st_size
        except OSError as exc:
            # Best effort: skip unreadable entries, but leave a trace in the log.
            logger.warning("import-file %s uebersprungen: %s", p, exc)
            continue
        out.append({"name": p.name, "size": size})
    return {"import_dir": str(d), "exists": True, "files": out}


# ─── Bootstrap snapshot ───────────────────────────────────────────────
# "Bootstrap" = all pinned memories. Export/import for quickly restoring
# a lean ARIA after a wipe.

@app.get("/memory/export-bootstrap")
def memory_export_bootstrap():
    """Return all pinned memories as JSON — intended for a browser download."""
    pinned = store().list_pinned()
    return {
        "version": 1,
        "exported_at": _utc_now_iso(),
        "count": len(pinned),
        "memories": [
            {
                "type": p.type,
                "title": p.title,
                "content": p.content,
                "pinned": True,
                "category": p.category,
                "source": p.source,
                "tags": p.tags,
            }
            for p in pinned
        ],
    }


class BootstrapBundle(BaseModel):
    """Import body: a version number and a list of memory dicts."""

    version: int = 1
    memories: List[dict]


@app.post("/memory/import-bootstrap")
def memory_import_bootstrap(body: BootstrapBundle):
    """Replace all pinned memories with the ones in the bundle.

    Cold memory (unpinned) is left untouched. Entries without content are
    skipped. A delete-only run is NOT allowed: the caller is expected to
    export first and then import.

    All replacement points are built and embedded before anything is deleted,
    so a malformed bundle or an embedding failure cannot wipe the existing
    pinned memories.

    Raises:
        HTTPException: 400 when the bundle is empty or contains no entry with
            non-blank content.
    """
    if not body.memories:
        raise HTTPException(400, "Bundle hat keine memories — Abbruch zur Sicherheit")

    from qdrant_client.http import models as qm
    from memory.vector_store import COLLECTION

    s = store()
    e = embedder()

    # Phase 1: prepare everything. Nothing in the store is modified yet.
    prepared = []
    for m in body.memories:
        content = (m.get("content") or "").strip()
        if not content:
            continue
        point = MemoryPoint(
            id="",
            type=m.get("type", "fact"),
            title=m.get("title", "(ohne Titel)"),
            content=content,
            pinned=True,
            category=m.get("category", ""),
            source=m.get("source", "bootstrap-import"),
            # ``or []`` also covers an explicit ``"tags": null`` in the bundle.
            tags=list(m.get("tags") or []),
        )
        prepared.append((point, e.embed(content)))

    if not prepared:
        # Every entry was blank: deleting now would amount to a delete-only run.
        raise HTTPException(400, "Bundle hat keine memories mit content — Abbruch zur Sicherheit")

    # Phase 2: drop all currently pinned points, then write the new ones.
    s.client.delete(
        collection_name=COLLECTION,
        points_selector=qm.FilterSelector(filter=qm.Filter(must=[
            qm.FieldCondition(key="pinned", match=qm.MatchValue(value=True))
        ])),
    )
    for point, vec in prepared:
        s.upsert(point, vec)
    return {"created": len(prepared), "deleted_previous_pinned": True}


# ─── Conversation loop ────────────────────────────────────────────────

class ChatIn(BaseModel):
    """Request body for /chat."""

    message: str
    source: str = ""  # "app" / "diagnostic" / "stt" — optional


class ChatOut(BaseModel):
    """Response body for /chat."""

    reply: str
    turns: int
    distilling: bool
    events: list = Field(default_factory=list)


@app.post("/chat", response_model=ChatOut)
def chat(body: ChatIn, background: BackgroundTasks):
    """Main path. The reply is returned synchronously.

    Memory distillation is scheduled as a background task, so it runs after
    the response has gone out.

    Raises:
        HTTPException: 400 when the agent raises ValueError, 502 when it
            raises RuntimeError.
    """
    a = agent()
    try:
        reply = a.chat(body.message, source=body.source)
    except ValueError as exc:
        raise HTTPException(400, str(exc))
    except RuntimeError as exc:
        logger.error("chat fehlgeschlagen: %s", exc)
        raise HTTPException(502, str(exc))

    needs_distill = a.conversation.needs_distill()
    if needs_distill:
        background.add_task(a.distill_old_turns)
    return ChatOut(
        reply=reply,
        turns=len(a.conversation.turns),
        distilling=needs_distill,
        events=a.pop_events(),
    )


@app.get("/conversation/stats")
def conversation_stats():
    """Return the stats reported by the shared Conversation."""
    return conversation().stats()


@app.post("/conversation/reset")
def conversation_reset():
    """Hard reset — the rolling-window history is cleared completely.

    Distilled facts stay in the DB.
    """
    conversation().reset()
    return {"ok": True, "turns": 0}


@app.post("/conversation/distill")
def conversation_distill_now():
    """Manually trigger distillation — for tests or before a deliberate reset."""
    return agent().distill_old_turns()


# ─── Skills ───────────────────────────────────────────────────────────

class SkillCreate(BaseModel):
    """Request body for /skills/create."""

    name: str
    description: str
    execution: str  # local-venv | local-bin | bash
    entry_code: str
    readme: str = ""
    args: list = Field(default_factory=list)
    requires: dict = Field(default_factory=dict)
    pip_packages: list = Field(default_factory=list)
    author: str = "stefan"


class SkillRun(BaseModel):
    """Request body for /skills/run."""

    name: str
    args: dict = Field(default_factory=dict)
    timeout_sec: int = 300


class SkillPatch(BaseModel):
    """Partial update body for a skill; fields left at None are not touched."""

    # Optional[...] instead of ``X | None`` to match the other schemas in this file.
    description: Optional[str] = None
    active: Optional[bool] = None
    args: Optional[list] = None


@app.get("/skills/list")
def skills_list(active_only: bool = False):
    """List skills, optionally only the active ones."""
    return {"skills": skills_mod.list_skills(active_only=active_only)}


@app.get("/skills/{name}")
def skills_get(name: str):
    """Return the manifest and readme of one skill.

    Raises:
        HTTPException: 404 when no manifest exists for ``name``.
    """
    m = skills_mod.read_manifest(name)
    if m is None:
        raise HTTPException(404, f"Skill '{name}' nicht gefunden")
    readme = skills_mod.read_readme(name)
    return {"manifest": m, "readme": readme}
@app.post("/skills/create")
def skills_create(body: SkillCreate):
    """Create a skill from the request body; a ValueError becomes HTTP 400."""
    params = {
        "name": body.name,
        "description": body.description,
        "execution": body.execution,
        "entry_code": body.entry_code,
        "readme": body.readme,
        "args": body.args,
        "requires": body.requires,
        "pip_packages": body.pip_packages,
        "author": body.author,
    }
    try:
        created = skills_mod.create_skill(**params)
    except ValueError as err:
        raise HTTPException(400, str(err))
    return created


@app.post("/skills/run")
def skills_run(body: SkillRun):
    """Run a skill with the given args and timeout; a ValueError becomes HTTP 400."""
    try:
        outcome = skills_mod.run_skill(
            body.name,
            args=body.args,
            timeout_sec=body.timeout_sec,
        )
    except ValueError as err:
        raise HTTPException(400, str(err))
    return outcome


@app.patch("/skills/{name}")
def skills_patch(name: str, body: SkillPatch):
    """Update a skill with the non-None fields of the body; a ValueError becomes HTTP 404."""
    # Only fields the caller actually set are forwarded.
    changes = {}
    for key, value in body.model_dump().items():
        if value is None:
            continue
        changes[key] = value
    try:
        updated = skills_mod.update_skill(name, changes)
    except ValueError as err:
        raise HTTPException(404, str(err))
    return updated


@app.delete("/skills/{name}")
def skills_delete(name: str):
    """Delete a skill; a ValueError becomes HTTP 404."""
    try:
        skills_mod.delete_skill(name)
    except ValueError as err:
        raise HTTPException(404, str(err))
    else:
        return {"deleted": name}


@app.get("/skills/{name}/logs")
def skills_logs(name: str, limit: int = 50):
    """Return up to ``limit`` log entries for a skill."""
    entries = skills_mod.list_logs(name, limit=limit)
    return {"logs": entries}


@app.get("/skills/{name}/export")
def skills_export(name: str):
    """Return the exported skill as a gzip attachment; a ValueError becomes HTTP 404."""
    try:
        archive = skills_mod.export_skill(name)
    except ValueError as err:
        raise HTTPException(404, str(err))
    download_headers = {
        "Content-Disposition": f'attachment; filename="skill-{name}.tar.gz"',
    }
    return Response(
        content=archive,
        media_type="application/gzip",
        headers=download_headers,
    )


@app.post("/skills/import")
async def skills_import(request: Request, overwrite: bool = False):
    """Import a skill from the raw request body.

    An empty body or a ValueError from the import yields HTTP 400.
    """
    payload = await request.body()
    if len(payload) == 0:
        raise HTTPException(400, "Leerer Body")
    try:
        imported = skills_mod.import_skill(payload, overwrite=overwrite)
    except ValueError as err:
        raise HTTPException(400, str(err))
    return {"imported": imported}