Files
ARIA-AGENT/aria-brain/main.py
T
duffyduck da4e970a31 feat(brain): Memory-Anhaenge — multipart/form-data Endpoint daneben Base64
Stefan's Test scheiterte: ein normales Handy-Foto als Base64 in der
curl-d-Argumentliste sprengt Bash's ARG_MAX (typisch 128KB-2MB). Plus:
Browser-FormData und curl -F sind eh der Standard fuer File-Uploads.

Fix: zusaetzlicher Endpoint
  POST /memory/{id}/attachments/upload  (multipart/form-data, field: file)

Beispiel auf der VM:
  curl -F file=@/pfad/zu/foto.jpg \
       "$ARIA_BRAIN_URL/memory/<id>/attachments/upload" | jq

Base64-Endpoint (/memory/{id}/attachments) bleibt fuer kleine
Uploads + interne JSON-Tools. Beide rufen am Ende den gleichen
_commit_attachment_meta-Helper, der das Memory-Payload um den
neuen Anhang updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 02:32:24 +02:00

832 lines
27 KiB
Python

"""
ARIA Brain — FastAPI-Einstieg.
Phase B Punkt 1: nur Skeleton.
- /health → Liveness
- /memory/list → alle Punkte (gefiltert)
- /memory/pinned → Hot Memory
- /memory/search?q=...&k=5 → semantische Suche
- /memory/save → neuen Punkt anlegen
- /memory/update/{id} → Punkt aendern (re-embed wenn content geaendert)
- /memory/delete/{id} → Punkt loeschen
- /memory/stats → Anzahl Punkte pro Type
/chat (Conversation-Loop) und /skills/* kommen in spaeteren Phasen.
"""
from __future__ import annotations
import logging
import os
from typing import List, Optional
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, UploadFile, File
from fastapi.responses import Response
from pydantic import BaseModel, Field
from memory import Embedder, VectorStore, MemoryPoint
from conversation import Conversation
from proxy_client import ProxyClient
from agent import Agent
import skills as skills_mod
import metrics as metrics_mod
import triggers as triggers_mod
import watcher as watcher_mod
import background as background_mod
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("aria-brain")
QDRANT_HOST = os.environ.get("QDRANT_HOST", "aria-qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Beim Brain-Start: Trigger-Background-Loop anwerfen. Beim Shutdown: stoppen."""
task = asyncio.create_task(background_mod.run_loop(agent))
logger.info("Lifespan: Trigger-Loop gestartet")
try:
yield
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.info("Lifespan: Trigger-Loop gestoppt")
app = FastAPI(title="ARIA Brain", version="0.1.0", lifespan=lifespan)
_embedder: Optional[Embedder] = None
_store: Optional[VectorStore] = None
_conversation: Optional[Conversation] = None
_proxy: Optional[ProxyClient] = None
_agent: Optional[Agent] = None
def embedder() -> Embedder:
global _embedder
if _embedder is None:
_embedder = Embedder()
return _embedder
def store() -> VectorStore:
global _store
if _store is None:
_store = VectorStore(host=QDRANT_HOST, port=QDRANT_PORT)
return _store
def conversation() -> Conversation:
global _conversation
if _conversation is None:
_conversation = Conversation()
return _conversation
def proxy_client() -> ProxyClient:
global _proxy
if _proxy is None:
_proxy = ProxyClient()
return _proxy
def agent() -> Agent:
global _agent
if _agent is None:
_agent = Agent(store(), embedder(), conversation(), proxy_client())
return _agent
# ─── Pydantic-Schemas ─────────────────────────────────────────────────
class MemoryIn(BaseModel):
type: str = Field(..., description="identity|rule|preference|tool|skill|fact|conversation|reminder")
title: str
content: str
pinned: bool = False
category: str = ""
source: str = "manual"
tags: List[str] = Field(default_factory=list)
conversation_id: Optional[str] = None
# Vorhandene Anhang-Metadaten beim Save mitgeben (i.d.R. werden Anhaenge
# nach dem Save via /memory/{id}/attachments hinzugefuegt — hier eher fuer
# Bootstrap-Import/Restore-Faelle relevant).
attachments: List[dict] = Field(default_factory=list)
class MemoryUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
pinned: Optional[bool] = None
category: Optional[str] = None
tags: Optional[List[str]] = None
class MemoryOut(BaseModel):
id: str
type: str
title: str
content: str
pinned: bool
category: str
source: str
tags: List[str]
created_at: str
updated_at: str
conversation_id: Optional[str] = None
score: Optional[float] = None
attachments: List[dict] = Field(default_factory=list)
@classmethod
def from_point(cls, p: MemoryPoint) -> "MemoryOut":
return cls(**p.__dict__)
class AttachmentUploadBody(BaseModel):
"""Base64-Upload via JSON — Diagnostic schickt Files so."""
name: str
data_base64: str
# ─── Health ───────────────────────────────────────────────────────────
@app.get("/health")
def health():
try:
n = store().count()
return {"status": "ok", "memory_count": n, "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
except Exception as exc:
return {"status": "degraded", "error": str(exc), "qdrant": f"{QDRANT_HOST}:{QDRANT_PORT}"}
# ─── Memory-Endpoints ─────────────────────────────────────────────────
@app.get("/memory/stats")
def memory_stats():
s = store()
points = s.list_all()
by_type = {}
pinned = 0
for p in points:
by_type[p.type] = by_type.get(p.type, 0) + 1
if p.pinned:
pinned += 1
return {"total": len(points), "pinned": pinned, "by_type": by_type}
@app.get("/memory/list", response_model=List[MemoryOut])
def memory_list(type: Optional[str] = None, limit: int = 200):
s = store()
points = s.list_by_type(type, limit=limit) if type else s.list_all(limit=limit)
return [MemoryOut.from_point(p) for p in points]
@app.get("/memory/pinned", response_model=List[MemoryOut])
def memory_pinned():
return [MemoryOut.from_point(p) for p in store().list_pinned()]
@app.get("/memory/search-text", response_model=List[MemoryOut])
def memory_search_text(
q: str,
k: int = 50,
type: Optional[str] = None,
include_pinned: bool = True,
):
"""Volltext-Substring-Suche (case-insensitive) ueber Title + Content +
Category + Tags. Findet exakte Begriffe — z.B. 'auto' matched 'Stefans Auto'.
Im Gegensatz zu /memory/search (semantic) keine 'klingt aehnlich'-Treffer."""
points = store().search_text(
q, k=k, type_filter=type,
exclude_pinned=not include_pinned,
)
return [MemoryOut.from_point(p) for p in points]
@app.get("/memory/search", response_model=List[MemoryOut])
def memory_search(
q: str,
k: int = 5,
type: Optional[str] = None,
include_pinned: bool = False,
score_threshold: Optional[float] = 0.30,
):
"""Semantische Suche. score_threshold filtert schwache Treffer raus
(Default 0.30 — MiniLM-multilingual liefert <0.25 fuer Rauschen).
Mit score_threshold=0 wird komplett Top-k zurueckgegeben."""
vec = embedder().embed(q)
points = store().search(
vec, k=k, type_filter=type, exclude_pinned=not include_pinned,
score_threshold=score_threshold if score_threshold and score_threshold > 0 else None,
)
return [MemoryOut.from_point(p) for p in points]
@app.post("/memory/save", response_model=MemoryOut)
def memory_save(body: MemoryIn):
s = store()
vec = embedder().embed(body.content)
point = MemoryPoint(
id="",
type=body.type,
title=body.title,
content=body.content,
pinned=body.pinned,
category=body.category,
source=body.source,
tags=body.tags,
conversation_id=body.conversation_id,
attachments=body.attachments or [],
)
pid = s.upsert(point, vec)
saved = s.get(pid)
return MemoryOut.from_point(saved)
@app.patch("/memory/update/{point_id}", response_model=MemoryOut)
def memory_update(point_id: str, body: MemoryUpdate):
s = store()
existing = s.get(point_id)
if not existing:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
content_changed = body.content is not None and body.content != existing.content
if body.title is not None:
existing.title = body.title
if body.content is not None:
existing.content = body.content
if body.pinned is not None:
existing.pinned = body.pinned
if body.category is not None:
existing.category = body.category
if body.tags is not None:
existing.tags = body.tags
vec = embedder().embed(existing.content) if content_changed else None
if vec is None:
# Vektor unveraendert lassen — nur Payload neu schreiben
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.set_payload(
collection_name=COLLECTION,
payload=existing.to_payload() | {"updated_at": __import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat()},
points=[point_id],
)
saved = s.get(point_id)
else:
s.upsert(existing, vec)
saved = s.get(point_id)
return MemoryOut.from_point(saved)
@app.delete("/memory/delete/{point_id}")
def memory_delete(point_id: str):
s = store()
if not s.get(point_id):
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
s.delete(point_id)
# Anhaenge mit-loeschen damit nichts verwaist
try:
import memory_attachments as mem_att
n = mem_att.delete_all(point_id)
if n:
logger.info("Memory %s + %d Anhaenge geloescht", point_id, n)
except Exception as exc:
logger.warning("Anhang-Cleanup fuer %s fehlgeschlagen: %s", point_id, exc)
return {"deleted": point_id}
# ─── Memory-Anhaenge ──────────────────────────────────────────────────
@app.get("/memory/{point_id}/attachments")
def memory_attachments_list(point_id: str):
"""Liste der Anhaenge zum Memory. Source-of-Truth ist das Payload
in der DB, aber wir mergen vorsichtshalber mit dem Filesystem-Stand
(falls ein Upload-Restart zwischendrin schiefging)."""
import memory_attachments as mem_att
s = store()
m = s.get(point_id)
if not m:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
return {"memory_id": point_id, "attachments": mem_att.list_attachments(point_id)}
def _commit_attachment_meta(point_id: str, meta: dict) -> MemoryOut:
"""Shared-Helper: nach FS-Write das Payload um den neuen Anhang updaten.
Duplikat-Name wird ersetzt, sonst hinten dran."""
s = store()
m = s.get(point_id)
if not m:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
atts = [a for a in (m.attachments or []) if a.get("name") != meta["name"]]
atts.append(meta)
m.attachments = atts
from memory.vector_store import COLLECTION
import datetime as _dt
m.updated_at = _dt.datetime.now(_dt.timezone.utc).isoformat()
s.client.set_payload(
collection_name=COLLECTION,
payload=m.to_payload() | {"updated_at": m.updated_at},
points=[point_id],
)
return MemoryOut.from_point(s.get(point_id))
@app.post("/memory/{point_id}/attachments", response_model=MemoryOut)
def memory_attachments_add(point_id: str, body: AttachmentUploadBody):
"""Anhang als Base64 hochladen — fuer Diagnostic + interne Tools.
Fuer grosse Files lieber multipart-Variante (/upload) nutzen,
Base64 sprengt schnell die Bash-ARG_MAX-Grenze beim curl."""
import memory_attachments as mem_att
if not store().get(point_id):
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
try:
meta = mem_att.save_from_base64(point_id, body.name, body.data_base64)
except ValueError as exc:
raise HTTPException(400, str(exc))
return _commit_attachment_meta(point_id, meta)
@app.post("/memory/{point_id}/attachments/upload", response_model=MemoryOut)
async def memory_attachments_upload(point_id: str, file: UploadFile = File(...)):
"""Multipart-Upload — Standard fuer Browser-FormData und curl -F.
Verwendung:
curl -F file=@foto.jpg "$ARIA_BRAIN_URL/memory/<id>/attachments/upload"
"""
import memory_attachments as mem_att
if not store().get(point_id):
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
data = await file.read()
try:
meta = mem_att.save_attachment(point_id, file.filename or "datei", data)
except ValueError as exc:
raise HTTPException(400, str(exc))
return _commit_attachment_meta(point_id, meta)
@app.delete("/memory/{point_id}/attachments/{filename}", response_model=MemoryOut)
def memory_attachments_delete(point_id: str, filename: str):
"""Einzelnen Anhang loeschen (FS + Payload-Eintrag)."""
import memory_attachments as mem_att
s = store()
m = s.get(point_id)
if not m:
raise HTTPException(404, f"Memory {point_id} nicht gefunden")
removed_fs = mem_att.delete_attachment(point_id, filename)
safe = filename # Cleanup synchron mit FS — Payload-Match per name
atts = [a for a in (m.attachments or []) if a.get("name") not in (filename, safe)]
m.attachments = atts
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
import datetime as _dt
m.updated_at = _dt.datetime.now(_dt.timezone.utc).isoformat()
s.client.set_payload(
collection_name=COLLECTION,
payload=m.to_payload() | {"updated_at": m.updated_at},
points=[point_id],
)
if not removed_fs and not atts:
# weder im FS noch im Payload war was — Anhang existierte nicht
raise HTTPException(404, f"Anhang {filename} nicht gefunden")
return MemoryOut.from_point(s.get(point_id))
@app.get("/memory/{point_id}/attachments/{filename}")
def memory_attachments_get(point_id: str, filename: str):
"""Liefert die Bytes eines Anhangs. Diagnostic-Server kann das
durchproxien zur Vorschau/Download in der UI."""
import memory_attachments as mem_att
import mimetypes as _mt
data = mem_att.read_bytes(point_id, filename)
if data is None:
raise HTTPException(404, f"Anhang {filename} nicht gefunden")
mime = _mt.guess_type(filename)[0] or "application/octet-stream"
return Response(content=data, media_type=mime)
# ─── Migration aus brain-import/ ──────────────────────────────────────
IMPORT_DIR = os.environ.get("IMPORT_DIR", "/import")
@app.post("/memory/migrate")
def memory_migrate():
"""Liest /import/*.md und schreibt atomare Memory-Punkte in die DB.
Idempotent: bei Re-Run werden Punkte mit gleicher migration_key ersetzt."""
from pathlib import Path
from migration import run_migration
s = store()
e = embedder()
result = run_migration(Path(IMPORT_DIR), s, e)
return result
@app.get("/memory/import-files")
def memory_import_files():
"""Listet was unter /import/ liegt — fuer die Diagnostic-UI."""
from pathlib import Path
d = Path(IMPORT_DIR)
if not d.exists():
return {"import_dir": str(d), "exists": False, "files": []}
out = []
for p in sorted(d.iterdir()):
if p.is_file():
try:
out.append({"name": p.name, "size": p.stat().st_size})
except Exception:
pass
return {"import_dir": str(d), "exists": True, "files": out}
# ─── Bootstrap-Snapshot ───────────────────────────────────────────────
# "Bootstrap" = alle pinned Memories. Export/Import zum schnellen
# Wiederherstellen einer schlanken ARIA nach Wipe.
@app.get("/memory/export-bootstrap")
def memory_export_bootstrap():
"""Gibt alle pinned Memories als JSON zurueck — fuer Browser-Download."""
s = store()
pinned = s.list_pinned()
return {
"version": 1,
"exported_at": __import__("datetime").datetime.now(
__import__("datetime").timezone.utc
).isoformat(),
"count": len(pinned),
"memories": [
{
"type": p.type,
"title": p.title,
"content": p.content,
"pinned": True,
"category": p.category,
"source": p.source,
"tags": p.tags,
}
for p in pinned
],
}
class BootstrapBundle(BaseModel):
version: int = 1
memories: List[dict]
@app.post("/memory/import-bootstrap")
def memory_import_bootstrap(body: BootstrapBundle):
"""Loescht alle pinned Memories und importiert die im Bundle.
Cold Memory (unpinned) bleibt unangetastet.
Wenn keine Memories im Bundle: nur loeschen ist NICHT erlaubt — der
Caller soll erst exportieren und dann importieren.
"""
if not body.memories:
raise HTTPException(400, "Bundle hat keine memories — Abbruch zur Sicherheit")
s = store()
e = embedder()
# Alle aktuell pinned Punkte loeschen
from qdrant_client.http import models as qm
from memory.vector_store import COLLECTION
s.client.delete(
collection_name=COLLECTION,
points_selector=qm.FilterSelector(filter=qm.Filter(must=[
qm.FieldCondition(key="pinned", match=qm.MatchValue(value=True))
])),
)
# Neue Punkte einspeisen
created = 0
for m in body.memories:
content = (m.get("content") or "").strip()
if not content:
continue
point = MemoryPoint(
id="",
type=m.get("type", "fact"),
title=m.get("title", "(ohne Titel)"),
content=content,
pinned=True,
category=m.get("category", ""),
source=m.get("source", "bootstrap-import"),
tags=list(m.get("tags", [])),
)
vec = e.embed(content)
s.upsert(point, vec)
created += 1
return {"created": created, "deleted_previous_pinned": True}
# ─── Conversation-Loop ──────────────────────────────────────────────
class ChatIn(BaseModel):
message: str
source: str = "" # "app" / "diagnostic" / "stt" — optional
class ChatOut(BaseModel):
reply: str
turns: int
distilling: bool
events: list = Field(default_factory=list)
@app.post("/chat", response_model=ChatOut)
def chat(body: ChatIn, background: BackgroundTasks):
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
im Hintergrund nachdem die Response rausging."""
a = agent()
try:
reply = a.chat(body.message, source=body.source)
except ValueError as exc:
raise HTTPException(400, str(exc))
except RuntimeError as exc:
logger.error("chat fehlgeschlagen: %s", exc)
raise HTTPException(502, str(exc))
needs_distill = a.conversation.needs_distill()
if needs_distill:
background.add_task(a.distill_old_turns)
return ChatOut(
reply=reply,
turns=len(a.conversation.turns),
distilling=needs_distill,
events=a.pop_events(),
)
@app.get("/conversation/stats")
def conversation_stats():
return conversation().stats()
@app.post("/conversation/reset")
def conversation_reset():
"""Hardes Reset — der Rolling-Window-Verlauf wird komplett geleert.
Destillierte facts bleiben in der DB."""
conversation().reset()
return {"ok": True, "turns": 0}
class ConvDeleteBody(BaseModel):
role: str
content: str
ts_iso_hint: Optional[str] = None
@app.post("/conversation/delete-turn")
def conversation_delete_turn(body: ConvDeleteBody):
"""Entfernt einen einzelnen Turn aus dem Rolling-Window + jsonl.
Match per role + content (erstes Vorkommen wenn ts_iso_hint None,
sonst nahester zur Zeit). 404 wenn kein Match.
POST statt DELETE weil FastAPI 0.115 keine Bodys auf DELETE
erlaubt — semantisch trotzdem eine Loeschung."""
ok = conversation().remove_by_match(
role=body.role, content=body.content, ts_iso_hint=body.ts_iso_hint,
)
if not ok:
raise HTTPException(404, "Turn mit diesem role+content nicht gefunden")
return {"ok": True, "turns": len(conversation().turns)}
@app.post("/conversation/distill")
def conversation_distill_now():
"""Manueller Trigger fuer Destillat — fuer Tests oder vor einem
bewussten Reset."""
return agent().distill_old_turns()
# ─── Call-Metrics (Token / Quota-Monitoring) ────────────────────────
@app.get("/metrics/calls")
def metrics_calls():
"""Liefert Aggregate fuer 1h / 5h / 24h / 30d.
Jedes Window: {window_seconds, calls, tokens_in, tokens_out, by_model}."""
return metrics_mod.stats()
# ─── Triggers (passive Aufweck-Quellen) ─────────────────────────────
class TriggerTimerBody(BaseModel):
name: str
fires_at: str # ISO timestamp
message: str
author: str = "stefan"
class TriggerWatcherBody(BaseModel):
name: str
condition: str
message: str
check_interval_sec: int = 300
throttle_sec: int = 3600
author: str = "stefan"
class TriggerPatch(BaseModel):
active: bool | None = None
message: str | None = None
condition: str | None = None
throttle_sec: int | None = None
check_interval_sec: int | None = None
fires_at: str | None = None
@app.get("/triggers/list")
def triggers_list(active_only: bool = False):
return {"triggers": triggers_mod.list_triggers(active_only=active_only)}
@app.get("/triggers/conditions")
def triggers_conditions():
"""Verfuegbare Variablen + Funktionen fuer Watcher-Conditions
(mit aktuellen Werten)."""
current = watcher_mod.collect_variables()
# near() ist ein callable in vars_ — fuer die UI rausfiltern
serializable = {k: v for k, v in current.items() if not callable(v)}
return {
"variables": watcher_mod.describe_variables(),
"functions": watcher_mod.describe_functions(),
"current": serializable,
}
@app.get("/triggers/{name}")
def triggers_get(name: str):
t = triggers_mod.read(name)
if t is None:
raise HTTPException(404, f"Trigger '{name}' nicht gefunden")
return t
@app.get("/triggers/{name}/logs")
def triggers_get_logs(name: str, limit: int = 50):
return {"logs": triggers_mod.list_logs(name, limit=limit)}
@app.post("/triggers/timer")
def triggers_create_timer(body: TriggerTimerBody):
try:
return triggers_mod.create_timer(
name=body.name, fires_at_iso=body.fires_at,
message=body.message, author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/triggers/watcher")
def triggers_create_watcher(body: TriggerWatcherBody):
try:
return triggers_mod.create_watcher(
name=body.name, condition=body.condition,
message=body.message,
check_interval_sec=body.check_interval_sec,
throttle_sec=body.throttle_sec,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/triggers/{name}")
def triggers_patch(name: str, body: TriggerPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return triggers_mod.update(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/triggers/{name}")
def triggers_delete(name: str):
try:
triggers_mod.delete(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
# ─── Skills ─────────────────────────────────────────────────────────
class SkillCreate(BaseModel):
name: str
description: str
execution: str # local-venv | local-bin | bash
entry_code: str
readme: str = ""
args: list = Field(default_factory=list)
requires: dict = Field(default_factory=dict)
pip_packages: list = Field(default_factory=list)
author: str = "stefan"
class SkillRun(BaseModel):
name: str
args: dict = Field(default_factory=dict)
timeout_sec: int = 300
class SkillPatch(BaseModel):
description: str | None = None
active: bool | None = None
args: list | None = None
@app.get("/skills/list")
def skills_list(active_only: bool = False):
return {"skills": skills_mod.list_skills(active_only=active_only)}
@app.get("/skills/{name}")
def skills_get(name: str):
m = skills_mod.read_manifest(name)
if m is None:
raise HTTPException(404, f"Skill '{name}' nicht gefunden")
readme = skills_mod.read_readme(name)
return {"manifest": m, "readme": readme}
@app.post("/skills/create")
def skills_create(body: SkillCreate):
try:
return skills_mod.create_skill(
name=body.name,
description=body.description,
execution=body.execution,
entry_code=body.entry_code,
readme=body.readme,
args=body.args,
requires=body.requires,
pip_packages=body.pip_packages,
author=body.author,
)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.post("/skills/run")
def skills_run(body: SkillRun):
try:
return skills_mod.run_skill(body.name, args=body.args, timeout_sec=body.timeout_sec)
except ValueError as exc:
raise HTTPException(400, str(exc))
@app.patch("/skills/{name}")
def skills_patch(name: str, body: SkillPatch):
patch = {k: v for k, v in body.model_dump().items() if v is not None}
try:
return skills_mod.update_skill(name, patch)
except ValueError as exc:
raise HTTPException(404, str(exc))
@app.delete("/skills/{name}")
def skills_delete(name: str):
try:
skills_mod.delete_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return {"deleted": name}
@app.get("/skills/{name}/logs")
def skills_logs(name: str, limit: int = 50):
return {"logs": skills_mod.list_logs(name, limit=limit)}
@app.get("/skills/{name}/export")
def skills_export(name: str):
try:
data = skills_mod.export_skill(name)
except ValueError as exc:
raise HTTPException(404, str(exc))
return Response(
content=data,
media_type="application/gzip",
headers={"Content-Disposition": f'attachment; filename="skill-{name}.tar.gz"'},
)
@app.post("/skills/import")
async def skills_import(request: Request, overwrite: bool = False):
data = await request.body()
if not data:
raise HTTPException(400, "Leerer Body")
try:
manifest = skills_mod.import_skill(data, overwrite=overwrite)
except ValueError as exc:
raise HTTPException(400, str(exc))
return {"imported": manifest}