""" Skill-Manager — Filesystem-Layer fuer ARIAs Faehigkeiten. Layout: /data/skills// skill.json - Manifest README.md - Beschreibung (vom Stil her: was, wann, wie aufrufen) run.sh - Entry-Point (sh, python -m, was auch immer) requirements.txt - optional, fuer local-venv venv/ - automatisch erzeugt bei local-venv bin/ - statische Binaries (fuer local-bin) logs/ - .json Run-Logs (append-only pro Run) Manifest (skill.json): { "name": "youtube2mp3", "description": "Konvertiert YouTube-Video-URL zu MP3", "execution": "local-venv" | "local-bin" | "bash", "entry": "run.sh", "args": [{"name": "url", "required": true}, ...], "requires": {"pip": [...], "binaries": [...]}, "active": true, "created_at": "ISO", "updated_at": "ISO", "last_used": null | "ISO", "use_count": 0, "version": "1.0", "author": "aria" | "stefan" } """ from __future__ import annotations import json import logging import os import re import shutil import subprocess import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) SKILLS_DIR = Path(os.environ.get("SKILLS_DIR", "/data/skills")) SHARED_UPLOADS = Path("/shared/uploads") VALID_EXECUTIONS = {"local-venv", "local-bin", "bash"} NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$") # Anti-Skill-Friedhof: ARIAs Lieblings-Suffixe wenn sie statt updaten neu baut VERSION_SUFFIX_RE = re.compile(r"(?:[-_]v\d+|[-_](?:new|fixed|old|alt|copy|final|clean))$", re.I) def _now() -> str: return datetime.now(timezone.utc).isoformat() def _safe_name(name: str) -> str: if not isinstance(name, str) or not NAME_RE.match(name): raise ValueError(f"Ungültiger Skill-Name: {name!r}") return name def _skill_dir(name: str) -> Path: return SKILLS_DIR / _safe_name(name) def _check_anti_graveyard(name: str) -> None: """Verhindert klassische Skill-Friedhof-Patterns beim Anlegen. Hard-Reject auf: 1. Versions-Suffixe (`-v2`, `_v3`, `-new`, `-fixed`, …) im Namen 2. Prefix-Kollision mit existierendem Skill (z.B. `spotify` existiert, jemand will `spotify-aria` oder `spotify-ctl` anlegen) """ if VERSION_SUFFIX_RE.search(name): raise ValueError( f"Skill-Name '{name}' enthaelt einen Versions-Suffix " f"(-v2 / _v3 / -new / -fixed / -old / -alt / -copy / -final / -clean). " f"Skills werden intern versioniert (skill_rollback). " f"Waehle einen klaren Namen ohne Suffix oder nutze skill_update auf " f"den bestehenden Skill." ) if not SKILLS_DIR.exists(): return existing = [p.name for p in SKILLS_DIR.iterdir() if p.is_dir()] for ex in existing: if ex == name: continue # wird spaeter mit "existiert bereits" abgefangen # neuer Name verlaengert existierenden Stem: 'spotify' da, neu 'spotify-aria' if name.startswith(ex + "-") or name.startswith(ex + "_"): raise ValueError( f"Skill-Name '{name}' kollidiert mit existierendem '{ex}'. " f"Wenn Du '{ex}' verbessern willst: skill_update auf '{ex}'. " f"Wenn es wirklich was anderes ist: waehle einen Namen ohne den " f"Praefix '{ex}-' / '{ex}_'." ) # neuer Name ist Kurzform eines existierenden: 'spotify-aria' da, neu 'spotify' if ex.startswith(name + "-") or ex.startswith(name + "_"): raise ValueError( f"Es existiert bereits '{ex}' mit Praefix '{name}'. Pruefe ob '{ex}' " f"das schon kann; wenn ja: skill_update auf '{ex}' oder Skill umbenennen." ) # ─── Listing ──────────────────────────────────────────────────────── def list_skills(active_only: bool = False) -> list[dict]: out: list[dict] = [] if not SKILLS_DIR.exists(): return out for entry in sorted(SKILLS_DIR.iterdir()): if not entry.is_dir(): continue manifest = read_manifest(entry.name) if manifest is None: continue if active_only and not manifest.get("active", True): continue out.append(manifest) return out def read_manifest(name: str) -> Optional[dict]: try: path = _skill_dir(name) / "skill.json" if not path.exists(): return None return json.loads(path.read_text(encoding="utf-8")) except Exception as exc: logger.warning("Manifest lesen %s: %s", name, exc) return None def write_manifest(name: str, manifest: dict) -> None: d = _skill_dir(name) d.mkdir(parents=True, exist_ok=True) manifest["updated_at"] = _now() (d / "skill.json").write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8") def read_readme(name: str) -> str: path = _skill_dir(name) / "README.md" return path.read_text(encoding="utf-8") if path.exists() else "" # ─── Create / Update / Delete ──────────────────────────────────────── def create_skill( name: str, description: str, execution: str, entry_code: str, readme: str = "", args: Optional[list] = None, requires: Optional[dict] = None, pip_packages: Optional[list[str]] = None, author: str = "aria", ) -> dict: """Legt einen neuen Skill an. Wirft ValueError bei ungueltigen Inputs. entry_code wird je nach execution in run.sh oder run.py geschrieben. Bei local-venv wird sofort eine venv erzeugt + pip_packages installiert. """ name = _safe_name(name) if execution not in VALID_EXECUTIONS: raise ValueError(f"execution muss eines von {VALID_EXECUTIONS} sein") _check_anti_graveyard(name) d = _skill_dir(name) if d.exists(): raise ValueError(f"Skill '{name}' existiert bereits — erst loeschen oder updaten") d.mkdir(parents=True) (d / "logs").mkdir() # Entry-File: run.sh oder run.py if execution == "local-venv": entry_path = d / "run.py" entry_path.write_text(entry_code, encoding="utf-8") entry_name = "run.py" (d / "requirements.txt").write_text("\n".join(pip_packages or []) + "\n", encoding="utf-8") else: entry_path = d / "run.sh" # Shebang ergaenzen wenn nicht da content = entry_code if entry_code.startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + entry_code entry_path.write_text(content, encoding="utf-8") entry_path.chmod(0o755) entry_name = "run.sh" # README (d / "README.md").write_text(readme or f"# {name}\n\n{description}\n", encoding="utf-8") manifest = { "name": name, "description": description, "execution": execution, "entry": entry_name, "args": args or [], "requires": requires or {}, "active": True, "created_at": _now(), "updated_at": _now(), "last_used": None, "use_count": 0, "version": "1.0", "author": author, } write_manifest(name, manifest) # venv aufbauen bei local-venv if execution == "local-venv": try: _setup_venv(d, pip_packages or []) except Exception as exc: # venv-Aufbau fehlgeschlagen → Skill steht trotzdem im Repo, aber inaktiv manifest["active"] = False manifest["setup_error"] = str(exc)[:500] write_manifest(name, manifest) logger.warning("Skill %s: venv-Setup fehlgeschlagen → deaktiviert: %s", name, exc) logger.info("Skill erstellt: %s (%s)", name, execution) return manifest def _setup_venv(skill_dir: Path, pip_packages: list[str]) -> None: venv = skill_dir / "venv" logger.info("venv erstellen: %s", venv) subprocess.run(["python", "-m", "venv", str(venv)], check=True, timeout=120) pip = venv / "bin" / "pip" if pip_packages: subprocess.run([str(pip), "install", "--no-cache-dir", *pip_packages], check=True, timeout=600) def update_skill(name: str, patch: dict) -> dict: """Aktualisiert einen bestehenden Skill. Manifest-Felder ueber den `allowed`-Filter, Code-Aenderungen ueber dedizierte Keys: - `entry_code` (str) → ueberschreibt run.py / run.sh - `readme` (str) → ueberschreibt README.md - `pip_packages` (list) → ueberschreibt requirements.txt + venv-Rebuild (nur bei local-venv) """ manifest = read_manifest(name) if manifest is None: raise ValueError(f"Skill '{name}' nicht gefunden") d = _skill_dir(name) allowed = {"description", "args", "requires", "active", "version", "entry"} for k, v in patch.items(): if k in allowed: manifest[k] = v # Code austauschen if "entry_code" in patch and patch["entry_code"]: execution = manifest.get("execution", "local-venv") if execution == "local-venv": entry_path = d / "run.py" entry_path.write_text(patch["entry_code"], encoding="utf-8") else: entry_path = d / "run.sh" content = patch["entry_code"] if patch["entry_code"].startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + patch["entry_code"] entry_path.write_text(content, encoding="utf-8") entry_path.chmod(0o755) # README austauschen if "readme" in patch and patch["readme"] is not None: (d / "README.md").write_text(patch["readme"], encoding="utf-8") # pip_packages geaendert → requirements.txt + venv neu aufbauen if "pip_packages" in patch and manifest.get("execution") == "local-venv": pip_packages = patch["pip_packages"] or [] (d / "requirements.txt").write_text("\n".join(pip_packages) + "\n", encoding="utf-8") # venv loeschen + neu aufbauen, damit alte Pakete weg sind venv = d / "venv" if venv.exists(): shutil.rmtree(venv, ignore_errors=True) try: _setup_venv(d, pip_packages) # Falls vorher wegen Setup-Error deaktiviert war: jetzt frei manifest.pop("setup_error", None) manifest["active"] = patch.get("active", True) except Exception as exc: manifest["active"] = False manifest["setup_error"] = str(exc)[:500] logger.warning("Skill %s: venv-Rebuild fehlgeschlagen: %s", name, exc) write_manifest(name, manifest) logger.info("Skill aktualisiert: %s (keys=%s)", name, sorted(patch.keys())) return manifest def delete_skill(name: str) -> None: d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") shutil.rmtree(d) logger.info("Skill geloescht: %s", name) # ─── Run ──────────────────────────────────────────────────────────── def run_skill(name: str, args: Optional[dict] = None, timeout_sec: int = 300) -> dict: """Fuehrt einen Skill aus. Args werden als ENV-Vars uebergeben (Praefix ARG_, z.B. ARG_URL fuer args["url"]). Returns: {ok, exit_code, stdout, stderr, duration_sec, log_path} """ manifest = read_manifest(name) if manifest is None: raise ValueError(f"Skill '{name}' nicht gefunden") if not manifest.get("active", True): raise ValueError(f"Skill '{name}' ist deaktiviert") d = _skill_dir(name) entry = manifest.get("entry", "run.sh") exec_mode = manifest.get("execution", "bash") env = os.environ.copy() # Skill-Args als ENV-Vars for k, v in (args or {}).items(): if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", k): continue env[f"ARG_{k.upper()}"] = str(v) env["SKILL_DIR"] = str(d) env["SHARED_UPLOADS"] = str(SHARED_UPLOADS) # Brain-API fuer Skills die OAuth-Tokens / Brain-Helpers brauchen. # Beispiel: requests.get(f"{os.environ['BRAIN_INTERNAL_URL']}/oauth/spotify/token") env["BRAIN_INTERNAL_URL"] = os.environ.get("BRAIN_INTERNAL_URL", "http://localhost:8080") # Command bauen if exec_mode == "local-venv": python = d / "venv" / "bin" / "python" cmd = [str(python), str(d / entry)] elif exec_mode == "local-bin": # Skill bringt seine bin/ mit — wir prepended sie an den PATH env["PATH"] = f"{d / 'bin'}:{env.get('PATH', '')}" cmd = [str(d / entry)] else: # bash cmd = [str(d / entry)] log_id = f"{int(time.time())}-{uuid.uuid4().hex[:8]}" log_path = d / "logs" / f"{log_id}.json" t0 = time.time() try: proc = subprocess.run( cmd, env=env, cwd=str(d), capture_output=True, text=True, timeout=timeout_sec, ) out_text = proc.stdout err_text = proc.stderr exit_code = proc.returncode timed_out = False except subprocess.TimeoutExpired as exc: out_text = exc.stdout or "" err_text = (exc.stderr or "") + f"\n[TIMEOUT {timeout_sec}s]" exit_code = -1 timed_out = True duration = time.time() - t0 # Log schreiben (gekuerzt damit es nicht explodiert) record = { "ts": _now(), "args": args or {}, "exit_code": exit_code, "duration_sec": round(duration, 2), "stdout": (out_text or "")[:8000], "stderr": (err_text or "")[:8000], "timed_out": timed_out, } try: log_path.write_text(json.dumps(record, indent=2, ensure_ascii=False), encoding="utf-8") except Exception: pass # Stats updaten manifest["last_used"] = _now() manifest["use_count"] = int(manifest.get("use_count", 0)) + 1 write_manifest(name, manifest) record["ok"] = exit_code == 0 record["log_path"] = str(log_path) return record def list_logs(name: str, limit: int = 50) -> list[dict]: d = _skill_dir(name) / "logs" if not d.exists(): return [] files = sorted(d.glob("*.json"), reverse=True)[:limit] out: list[dict] = [] for f in files: try: data = json.loads(f.read_text(encoding="utf-8")) data["log_id"] = f.stem out.append(data) except Exception: continue return out # ─── Export / Import ──────────────────────────────────────────────── def export_skill(name: str) -> bytes: """Packt einen Skill als tar.gz und gibt die Bytes zurueck. venv und logs werden ausgeschlossen (werden beim Import neu gebaut).""" import io import tarfile d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") buf = io.BytesIO() with tarfile.open(fileobj=buf, mode="w:gz") as tar: for path in d.iterdir(): if path.name in ("venv", "logs", "__pycache__"): continue tar.add(path, arcname=f"{name}/{path.name}") return buf.getvalue() def import_skill(tar_bytes: bytes, overwrite: bool = False) -> dict: """Importiert einen Skill aus tar.gz. Liefert das Manifest zurueck.""" import io import tarfile SKILLS_DIR.mkdir(parents=True, exist_ok=True) with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tar: # Erst Root-Name finden (= Skill-Name) members = tar.getmembers() if not members: raise ValueError("Leeres Archiv") root = members[0].name.split("/", 1)[0] name = _safe_name(root) d = _skill_dir(name) if d.exists(): if not overwrite: raise ValueError(f"Skill '{name}' existiert bereits — overwrite=true setzen") shutil.rmtree(d) # Extrahieren — Path-Traversal verhindern for m in members: target = SKILLS_DIR / m.name if not str(target.resolve()).startswith(str(SKILLS_DIR.resolve())): raise ValueError(f"Unsicherer Pfad im Archiv: {m.name}") tar.extractall(SKILLS_DIR) # logs-Verzeichnis anlegen falls fehlte (d / "logs").mkdir(exist_ok=True) # venv neu bauen falls local-venv manifest = read_manifest(name) or {} if manifest.get("execution") == "local-venv": req_file = d / "requirements.txt" pip_packages: list[str] = [] if req_file.exists(): pip_packages = [l.strip() for l in req_file.read_text().splitlines() if l.strip() and not l.startswith("#")] try: _setup_venv(d, pip_packages) except Exception as exc: logger.warning("Skill-Import %s: venv-Setup fehlgeschlagen: %s", name, exc) manifest["active"] = False manifest["setup_error"] = str(exc)[:500] write_manifest(name, manifest) return manifest