Files
ARIA-AGENT/aria-brain/skills.py
T
duffyduck 32302a841e feat(brain): Skills holen OAuth-Tokens vom Brain + Anti-Friedhof-Check
P1+P2-Infrastruktur:

- Neuer Endpoint GET /oauth/{service}/token liefert aktuelles access_token
  mit Auto-Refresh (< 60s Restzeit). Skills rufen das ueber
  BRAIN_INTERNAL_URL ab statt client_secret hardzucoden.
- run_skill setzt BRAIN_INTERNAL_URL als ENV (Default http://localhost:8080,
  override via Brain-Env). Skills laufen im Brain-Container, localhost passt.
- skills.create_skill: _check_anti_graveyard rejected Versions-Suffixe
  (-v2, _v3, -new, -fixed, -old, -alt, -copy, -final, -clean) und
  Prefix-Kollisionen (z.B. spotify-aria wenn spotify schon existiert) — die
  zwei Patterns hinter dem alten Skill-Friedhof.

Tool-Description fuer skill_create um PFLICHT-VORHER-Block ergaenzt
(skill_list, kein Versionssuffix, oauth_get_token, config_schema) damit
ARIA die Regeln direkt im Schema sieht.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 23:04:22 +02:00

463 lines
17 KiB
Python

"""
Skill-Manager — Filesystem-Layer fuer ARIAs Faehigkeiten.
Layout:
/data/skills/<name>/
skill.json - Manifest
README.md - Beschreibung (vom Stil her: was, wann, wie aufrufen)
run.sh - Entry-Point (sh, python -m, was auch immer)
requirements.txt - optional, fuer local-venv
venv/ - automatisch erzeugt bei local-venv
bin/ - statische Binaries (fuer local-bin)
logs/ - <ts>.json Run-Logs (append-only pro Run)
Manifest (skill.json):
{
"name": "youtube2mp3",
"description": "Konvertiert YouTube-Video-URL zu MP3",
"execution": "local-venv" | "local-bin" | "bash",
"entry": "run.sh",
"args": [{"name": "url", "required": true}, ...],
"requires": {"pip": [...], "binaries": [...]},
"active": true,
"created_at": "ISO",
"updated_at": "ISO",
"last_used": null | "ISO",
"use_count": 0,
"version": "1.0",
"author": "aria" | "stefan"
}
"""
from __future__ import annotations
import json
import logging
import os
import re
import shutil
import subprocess
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
SKILLS_DIR = Path(os.environ.get("SKILLS_DIR", "/data/skills"))
SHARED_UPLOADS = Path("/shared/uploads")
VALID_EXECUTIONS = {"local-venv", "local-bin", "bash"}
NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$")
# Anti-Skill-Friedhof: ARIAs Lieblings-Suffixe wenn sie statt updaten neu baut
VERSION_SUFFIX_RE = re.compile(r"(?:[-_]v\d+|[-_](?:new|fixed|old|alt|copy|final|clean))$", re.I)
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _safe_name(name: str) -> str:
if not isinstance(name, str) or not NAME_RE.match(name):
raise ValueError(f"Ungültiger Skill-Name: {name!r}")
return name
def _skill_dir(name: str) -> Path:
return SKILLS_DIR / _safe_name(name)
def _check_anti_graveyard(name: str) -> None:
"""Verhindert klassische Skill-Friedhof-Patterns beim Anlegen.
Hard-Reject auf:
1. Versions-Suffixe (`-v2`, `_v3`, `-new`, `-fixed`, …) im Namen
2. Prefix-Kollision mit existierendem Skill (z.B. `spotify` existiert,
jemand will `spotify-aria` oder `spotify-ctl` anlegen)
"""
if VERSION_SUFFIX_RE.search(name):
raise ValueError(
f"Skill-Name '{name}' enthaelt einen Versions-Suffix "
f"(-v2 / _v3 / -new / -fixed / -old / -alt / -copy / -final / -clean). "
f"Skills werden intern versioniert (skill_rollback). "
f"Waehle einen klaren Namen ohne Suffix oder nutze skill_update auf "
f"den bestehenden Skill."
)
if not SKILLS_DIR.exists():
return
existing = [p.name for p in SKILLS_DIR.iterdir() if p.is_dir()]
for ex in existing:
if ex == name:
continue # wird spaeter mit "existiert bereits" abgefangen
# neuer Name verlaengert existierenden Stem: 'spotify' da, neu 'spotify-aria'
if name.startswith(ex + "-") or name.startswith(ex + "_"):
raise ValueError(
f"Skill-Name '{name}' kollidiert mit existierendem '{ex}'. "
f"Wenn Du '{ex}' verbessern willst: skill_update auf '{ex}'. "
f"Wenn es wirklich was anderes ist: waehle einen Namen ohne den "
f"Praefix '{ex}-' / '{ex}_'."
)
# neuer Name ist Kurzform eines existierenden: 'spotify-aria' da, neu 'spotify'
if ex.startswith(name + "-") or ex.startswith(name + "_"):
raise ValueError(
f"Es existiert bereits '{ex}' mit Praefix '{name}'. Pruefe ob '{ex}' "
f"das schon kann; wenn ja: skill_update auf '{ex}' oder Skill umbenennen."
)
# ─── Listing ────────────────────────────────────────────────────────
def list_skills(active_only: bool = False) -> list[dict]:
out: list[dict] = []
if not SKILLS_DIR.exists():
return out
for entry in sorted(SKILLS_DIR.iterdir()):
if not entry.is_dir():
continue
manifest = read_manifest(entry.name)
if manifest is None:
continue
if active_only and not manifest.get("active", True):
continue
out.append(manifest)
return out
def read_manifest(name: str) -> Optional[dict]:
try:
path = _skill_dir(name) / "skill.json"
if not path.exists():
return None
return json.loads(path.read_text(encoding="utf-8"))
except Exception as exc:
logger.warning("Manifest lesen %s: %s", name, exc)
return None
def write_manifest(name: str, manifest: dict) -> None:
d = _skill_dir(name)
d.mkdir(parents=True, exist_ok=True)
manifest["updated_at"] = _now()
(d / "skill.json").write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")
def read_readme(name: str) -> str:
path = _skill_dir(name) / "README.md"
return path.read_text(encoding="utf-8") if path.exists() else ""
# ─── Create / Update / Delete ────────────────────────────────────────
def create_skill(
name: str,
description: str,
execution: str,
entry_code: str,
readme: str = "",
args: Optional[list] = None,
requires: Optional[dict] = None,
pip_packages: Optional[list[str]] = None,
author: str = "aria",
) -> dict:
"""Legt einen neuen Skill an. Wirft ValueError bei ungueltigen Inputs.
entry_code wird je nach execution in run.sh oder run.py geschrieben.
Bei local-venv wird sofort eine venv erzeugt + pip_packages installiert.
"""
name = _safe_name(name)
if execution not in VALID_EXECUTIONS:
raise ValueError(f"execution muss eines von {VALID_EXECUTIONS} sein")
_check_anti_graveyard(name)
d = _skill_dir(name)
if d.exists():
raise ValueError(f"Skill '{name}' existiert bereits — erst loeschen oder updaten")
d.mkdir(parents=True)
(d / "logs").mkdir()
# Entry-File: run.sh oder run.py
if execution == "local-venv":
entry_path = d / "run.py"
entry_path.write_text(entry_code, encoding="utf-8")
entry_name = "run.py"
(d / "requirements.txt").write_text("\n".join(pip_packages or []) + "\n", encoding="utf-8")
else:
entry_path = d / "run.sh"
# Shebang ergaenzen wenn nicht da
content = entry_code if entry_code.startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + entry_code
entry_path.write_text(content, encoding="utf-8")
entry_path.chmod(0o755)
entry_name = "run.sh"
# README
(d / "README.md").write_text(readme or f"# {name}\n\n{description}\n", encoding="utf-8")
manifest = {
"name": name,
"description": description,
"execution": execution,
"entry": entry_name,
"args": args or [],
"requires": requires or {},
"active": True,
"created_at": _now(),
"updated_at": _now(),
"last_used": None,
"use_count": 0,
"version": "1.0",
"author": author,
}
write_manifest(name, manifest)
# venv aufbauen bei local-venv
if execution == "local-venv":
try:
_setup_venv(d, pip_packages or [])
except Exception as exc:
# venv-Aufbau fehlgeschlagen → Skill steht trotzdem im Repo, aber inaktiv
manifest["active"] = False
manifest["setup_error"] = str(exc)[:500]
write_manifest(name, manifest)
logger.warning("Skill %s: venv-Setup fehlgeschlagen → deaktiviert: %s", name, exc)
logger.info("Skill erstellt: %s (%s)", name, execution)
return manifest
def _setup_venv(skill_dir: Path, pip_packages: list[str]) -> None:
venv = skill_dir / "venv"
logger.info("venv erstellen: %s", venv)
subprocess.run(["python", "-m", "venv", str(venv)], check=True, timeout=120)
pip = venv / "bin" / "pip"
if pip_packages:
subprocess.run([str(pip), "install", "--no-cache-dir", *pip_packages], check=True, timeout=600)
def update_skill(name: str, patch: dict) -> dict:
"""Aktualisiert einen bestehenden Skill. Manifest-Felder ueber den
`allowed`-Filter, Code-Aenderungen ueber dedizierte Keys:
- `entry_code` (str) → ueberschreibt run.py / run.sh
- `readme` (str) → ueberschreibt README.md
- `pip_packages` (list) → ueberschreibt requirements.txt + venv-Rebuild
(nur bei local-venv)
"""
manifest = read_manifest(name)
if manifest is None:
raise ValueError(f"Skill '{name}' nicht gefunden")
d = _skill_dir(name)
allowed = {"description", "args", "requires", "active", "version", "entry"}
for k, v in patch.items():
if k in allowed:
manifest[k] = v
# Code austauschen
if "entry_code" in patch and patch["entry_code"]:
execution = manifest.get("execution", "local-venv")
if execution == "local-venv":
entry_path = d / "run.py"
entry_path.write_text(patch["entry_code"], encoding="utf-8")
else:
entry_path = d / "run.sh"
content = patch["entry_code"] if patch["entry_code"].startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + patch["entry_code"]
entry_path.write_text(content, encoding="utf-8")
entry_path.chmod(0o755)
# README austauschen
if "readme" in patch and patch["readme"] is not None:
(d / "README.md").write_text(patch["readme"], encoding="utf-8")
# pip_packages geaendert → requirements.txt + venv neu aufbauen
if "pip_packages" in patch and manifest.get("execution") == "local-venv":
pip_packages = patch["pip_packages"] or []
(d / "requirements.txt").write_text("\n".join(pip_packages) + "\n", encoding="utf-8")
# venv loeschen + neu aufbauen, damit alte Pakete weg sind
venv = d / "venv"
if venv.exists():
shutil.rmtree(venv, ignore_errors=True)
try:
_setup_venv(d, pip_packages)
# Falls vorher wegen Setup-Error deaktiviert war: jetzt frei
manifest.pop("setup_error", None)
manifest["active"] = patch.get("active", True)
except Exception as exc:
manifest["active"] = False
manifest["setup_error"] = str(exc)[:500]
logger.warning("Skill %s: venv-Rebuild fehlgeschlagen: %s", name, exc)
write_manifest(name, manifest)
logger.info("Skill aktualisiert: %s (keys=%s)", name, sorted(patch.keys()))
return manifest
def delete_skill(name: str) -> None:
d = _skill_dir(name)
if not d.exists():
raise ValueError(f"Skill '{name}' nicht gefunden")
shutil.rmtree(d)
logger.info("Skill geloescht: %s", name)
# ─── Run ────────────────────────────────────────────────────────────
def run_skill(name: str, args: Optional[dict] = None, timeout_sec: int = 300) -> dict:
"""Fuehrt einen Skill aus. Args werden als ENV-Vars uebergeben
(Praefix ARG_, z.B. ARG_URL fuer args["url"]).
Returns: {ok, exit_code, stdout, stderr, duration_sec, log_path}
"""
manifest = read_manifest(name)
if manifest is None:
raise ValueError(f"Skill '{name}' nicht gefunden")
if not manifest.get("active", True):
raise ValueError(f"Skill '{name}' ist deaktiviert")
d = _skill_dir(name)
entry = manifest.get("entry", "run.sh")
exec_mode = manifest.get("execution", "bash")
env = os.environ.copy()
# Skill-Args als ENV-Vars
for k, v in (args or {}).items():
if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", k):
continue
env[f"ARG_{k.upper()}"] = str(v)
env["SKILL_DIR"] = str(d)
env["SHARED_UPLOADS"] = str(SHARED_UPLOADS)
# Brain-API fuer Skills die OAuth-Tokens / Brain-Helpers brauchen.
# Beispiel: requests.get(f"{os.environ['BRAIN_INTERNAL_URL']}/oauth/spotify/token")
env["BRAIN_INTERNAL_URL"] = os.environ.get("BRAIN_INTERNAL_URL", "http://localhost:8080")
# Command bauen
if exec_mode == "local-venv":
python = d / "venv" / "bin" / "python"
cmd = [str(python), str(d / entry)]
elif exec_mode == "local-bin":
# Skill bringt seine bin/ mit — wir prepended sie an den PATH
env["PATH"] = f"{d / 'bin'}:{env.get('PATH', '')}"
cmd = [str(d / entry)]
else: # bash
cmd = [str(d / entry)]
log_id = f"{int(time.time())}-{uuid.uuid4().hex[:8]}"
log_path = d / "logs" / f"{log_id}.json"
t0 = time.time()
try:
proc = subprocess.run(
cmd, env=env, cwd=str(d),
capture_output=True, text=True, timeout=timeout_sec,
)
out_text = proc.stdout
err_text = proc.stderr
exit_code = proc.returncode
timed_out = False
except subprocess.TimeoutExpired as exc:
out_text = exc.stdout or ""
err_text = (exc.stderr or "") + f"\n[TIMEOUT {timeout_sec}s]"
exit_code = -1
timed_out = True
duration = time.time() - t0
# Log schreiben (gekuerzt damit es nicht explodiert)
record = {
"ts": _now(),
"args": args or {},
"exit_code": exit_code,
"duration_sec": round(duration, 2),
"stdout": (out_text or "")[:8000],
"stderr": (err_text or "")[:8000],
"timed_out": timed_out,
}
try:
log_path.write_text(json.dumps(record, indent=2, ensure_ascii=False), encoding="utf-8")
except Exception:
pass
# Stats updaten
manifest["last_used"] = _now()
manifest["use_count"] = int(manifest.get("use_count", 0)) + 1
write_manifest(name, manifest)
record["ok"] = exit_code == 0
record["log_path"] = str(log_path)
return record
def list_logs(name: str, limit: int = 50) -> list[dict]:
d = _skill_dir(name) / "logs"
if not d.exists():
return []
files = sorted(d.glob("*.json"), reverse=True)[:limit]
out: list[dict] = []
for f in files:
try:
data = json.loads(f.read_text(encoding="utf-8"))
data["log_id"] = f.stem
out.append(data)
except Exception:
continue
return out
# ─── Export / Import ────────────────────────────────────────────────
def export_skill(name: str) -> bytes:
"""Packt einen Skill als tar.gz und gibt die Bytes zurueck.
venv und logs werden ausgeschlossen (werden beim Import neu gebaut)."""
import io
import tarfile
d = _skill_dir(name)
if not d.exists():
raise ValueError(f"Skill '{name}' nicht gefunden")
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
for path in d.iterdir():
if path.name in ("venv", "logs", "__pycache__"):
continue
tar.add(path, arcname=f"{name}/{path.name}")
return buf.getvalue()
def import_skill(tar_bytes: bytes, overwrite: bool = False) -> dict:
"""Importiert einen Skill aus tar.gz. Liefert das Manifest zurueck."""
import io
import tarfile
SKILLS_DIR.mkdir(parents=True, exist_ok=True)
with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tar:
# Erst Root-Name finden (= Skill-Name)
members = tar.getmembers()
if not members:
raise ValueError("Leeres Archiv")
root = members[0].name.split("/", 1)[0]
name = _safe_name(root)
d = _skill_dir(name)
if d.exists():
if not overwrite:
raise ValueError(f"Skill '{name}' existiert bereits — overwrite=true setzen")
shutil.rmtree(d)
# Extrahieren — Path-Traversal verhindern
for m in members:
target = SKILLS_DIR / m.name
if not str(target.resolve()).startswith(str(SKILLS_DIR.resolve())):
raise ValueError(f"Unsicherer Pfad im Archiv: {m.name}")
tar.extractall(SKILLS_DIR)
# logs-Verzeichnis anlegen falls fehlte
(d / "logs").mkdir(exist_ok=True)
# venv neu bauen falls local-venv
manifest = read_manifest(name) or {}
if manifest.get("execution") == "local-venv":
req_file = d / "requirements.txt"
pip_packages: list[str] = []
if req_file.exists():
pip_packages = [l.strip() for l in req_file.read_text().splitlines() if l.strip() and not l.startswith("#")]
try:
_setup_venv(d, pip_packages)
except Exception as exc:
logger.warning("Skill-Import %s: venv-Setup fehlgeschlagen: %s", name, exc)
manifest["active"] = False
manifest["setup_error"] = str(exc)[:500]
write_manifest(name, manifest)
return manifest