"""Skill manager — filesystem layer for ARIA's skills.

Layout:
    /data/skills/<name>/
        skill.json        - manifest
        README.md         - description (what, when, how to invoke)
        run.sh / run.py   - entry point (run.py for local-venv, run.sh otherwise)
        requirements.txt  - optional, for local-venv
        venv/             - created automatically for local-venv
        bin/              - static binaries (for local-bin)
        logs/             - <log_id>.json run logs (one file per run)

Manifest (skill.json):
    {
      "name": "youtube2mp3",
      "description": "Konvertiert YouTube-Video-URL zu MP3",
      "execution": "local-venv" | "local-bin" | "bash",
      "entry": "run.sh",
      "args": [{"name": "url", "required": true}, ...],
      "requires": {"pip": [...], "binaries": [...]},
      "active": true,
      "created_at": "ISO",
      "updated_at": "ISO",
      "last_used": null | "ISO",
      "use_count": 0,
      "version": "1.0",
      "author": "aria" | "stefan"
    }
"""

from __future__ import annotations

import io
import json
import logging
import os
import re
import shutil
import subprocess
import tarfile
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from typing import Optional

logger = logging.getLogger(__name__)

SKILLS_DIR = Path(os.environ.get("SKILLS_DIR", "/data/skills"))
SHARED_UPLOADS = Path("/shared/uploads")

VALID_EXECUTIONS = {"local-venv", "local-bin", "bash"}
NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$")

# Keys of the ``args`` dict that may be turned into ARG_* environment variables.
_ARG_KEY_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_]*")


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _safe_name(name: str) -> str:
    """Return *name* unchanged if it is a valid skill name, else raise ValueError.

    ``fullmatch`` is used on purpose: with ``match`` the ``$`` anchor would
    also accept a name that ends in a newline.
    """
    if not isinstance(name, str) or not NAME_RE.fullmatch(name):
        raise ValueError(f"Ungültiger Skill-Name: {name!r}")
    return name


def _safe_entry(entry: object) -> str:
    """Return *entry* if it is a relative path without ``..`` components.

    The entry is joined onto the skill directory and executed, so an absolute
    path or a parent reference would run a file outside the skill.

    Raises:
        ValueError: if *entry* is not a non-empty, contained relative path.
    """
    if not isinstance(entry, str) or not entry:
        raise ValueError(f"Ungültiger Entry-Pfad: {entry!r}")
    if entry.startswith("/") or ".." in PurePosixPath(entry).parts:
        raise ValueError(f"Ungültiger Entry-Pfad: {entry!r}")
    return entry


def _skill_dir(name: str) -> Path:
    """Return the directory of skill *name* (validates the name first)."""
    return SKILLS_DIR / _safe_name(name)


def _as_text(data: object) -> str:
    """Coerce captured process output (str, bytes or None) to str."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return str(data)


# ─── Listing ────────────────────────────────────────────────────────

def list_skills(active_only: bool = False) -> list[dict]:
    """Return the manifests of all skills, sorted by directory name.

    Directories without a readable manifest are skipped. With
    ``active_only=True`` skills whose manifest has ``active`` set to a falsy
    value are skipped as well (a missing ``active`` key counts as active).
    """
    out: list[dict] = []
    if not SKILLS_DIR.exists():
        return out
    for entry in sorted(SKILLS_DIR.iterdir()):
        if not entry.is_dir():
            continue
        manifest = read_manifest(entry.name)
        if manifest is None:
            continue
        if active_only and not manifest.get("active", True):
            continue
        out.append(manifest)
    return out


def read_manifest(name: str) -> Optional[dict]:
    """Load ``skill.json`` of skill *name*.

    Returns ``None`` (and logs a warning on errors) when the name is invalid,
    the file is missing, cannot be parsed, or does not contain a JSON object.
    """
    try:
        path = _skill_dir(name) / "skill.json"
        if not path.exists():
            return None
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:
        logger.warning("Manifest lesen %s: %s", name, exc)
        return None
    if not isinstance(data, dict):
        # Callers use .get() on the result; anything but an object is unusable.
        logger.warning("Manifest lesen %s: %s", name, "kein JSON-Objekt")
        return None
    return data


def write_manifest(name: str, manifest: dict) -> None:
    """Persist *manifest* as ``skill.json`` of skill *name*.

    Sets ``manifest["updated_at"]`` in place to the current time. The file is
    written to a temporary sibling and moved into place with ``os.replace`` so
    that an interrupted write cannot leave a truncated manifest behind.
    """
    d = _skill_dir(name)
    d.mkdir(parents=True, exist_ok=True)
    manifest["updated_at"] = _now()
    payload = json.dumps(manifest, indent=2, ensure_ascii=False)
    # Unique temp name so that concurrent writers do not share one temp file.
    tmp = d / f".skill.json.{uuid.uuid4().hex}.tmp"
    try:
        tmp.write_text(payload, encoding="utf-8")
        os.replace(tmp, d / "skill.json")
    finally:
        if tmp.exists():
            tmp.unlink()


def read_readme(name: str) -> str:
    """Return the content of the skill's README.md, or "" if it is missing."""
    path = _skill_dir(name) / "README.md"
    return path.read_text(encoding="utf-8") if path.exists() else ""


# ─── Create / Update / Delete ────────────────────────────────────────

def create_skill(
    name: str,
    description: str,
    execution: str,
    entry_code: str,
    readme: str = "",
    args: Optional[list] = None,
    requires: Optional[dict] = None,
    pip_packages: Optional[list[str]] = None,
    author: str = "aria",
) -> dict:
    """Create a new skill and return its manifest.

    ``entry_code`` is written to ``run.py`` for ``local-venv`` and to
    ``run.sh`` otherwise (a bash shebang plus ``set -euo pipefail`` is
    prepended when the code does not start with ``#!``). For ``local-venv`` a
    venv is created immediately and ``pip_packages`` are installed; if that
    fails the skill is kept but marked inactive with a ``setup_error``.

    Raises:
        ValueError: invalid name, unknown execution mode, or the skill
            already exists.
    """
    name = _safe_name(name)
    if execution not in VALID_EXECUTIONS:
        raise ValueError(f"execution muss eines von {VALID_EXECUTIONS} sein")
    d = _skill_dir(name)
    if d.exists():
        raise ValueError(f"Skill '{name}' existiert bereits — erst loeschen oder updaten")

    d.mkdir(parents=True)
    try:
        (d / "logs").mkdir()

        # Entry file: run.py for local-venv, run.sh otherwise.
        if execution == "local-venv":
            entry_name = "run.py"
            (d / entry_name).write_text(entry_code, encoding="utf-8")
            (d / "requirements.txt").write_text(
                "\n".join(pip_packages or []) + "\n", encoding="utf-8"
            )
        else:
            entry_name = "run.sh"
            entry_path = d / entry_name
            # Add a shebang if the code does not bring its own.
            if entry_code.startswith("#!"):
                content = entry_code
            else:
                content = "#!/usr/bin/env bash\nset -euo pipefail\n" + entry_code
            entry_path.write_text(content, encoding="utf-8")
            entry_path.chmod(0o755)

        (d / "README.md").write_text(
            readme or f"# {name}\n\n{description}\n", encoding="utf-8"
        )

        manifest = {
            "name": name,
            "description": description,
            "execution": execution,
            "entry": entry_name,
            "args": args or [],
            "requires": requires or {},
            "active": True,
            "created_at": _now(),
            "updated_at": _now(),
            "last_used": None,
            "use_count": 0,
            "version": "1.0",
            "author": author,
        }
        write_manifest(name, manifest)
    except Exception:
        # Do not leave a half-built directory behind: it would make every
        # retry fail with "existiert bereits".
        shutil.rmtree(d, ignore_errors=True)
        raise

    # Build the venv for local-venv skills.
    if execution == "local-venv":
        try:
            _setup_venv(d, pip_packages or [])
        except Exception as exc:
            # venv setup failed → the skill stays on disk, but inactive.
            manifest["active"] = False
            manifest["setup_error"] = str(exc)[:500]
            write_manifest(name, manifest)
            logger.warning("Skill %s: venv-Setup fehlgeschlagen → deaktiviert: %s", name, exc)

    logger.info("Skill erstellt: %s (%s)", name, execution)
    return manifest


def _setup_venv(skill_dir: Path, pip_packages: list[str]) -> None:
    """Create ``<skill_dir>/venv`` and pip-install *pip_packages* into it.

    Raises whatever ``subprocess.run(..., check=True, timeout=...)`` raises
    when venv creation or the installation fails or times out.
    """
    venv = skill_dir / "venv"
    logger.info("venv erstellen: %s", venv)
    subprocess.run(["python", "-m", "venv", str(venv)], check=True, timeout=120)
    pip = venv / "bin" / "pip"
    if pip_packages:
        subprocess.run(
            [str(pip), "install", "--no-cache-dir", *pip_packages],
            check=True,
            timeout=600,
        )


def update_skill(name: str, patch: dict) -> dict:
    """Apply the whitelisted keys of *patch* to the manifest and save it.

    Only ``description``, ``args``, ``requires``, ``active``, ``version`` and
    ``entry`` can be changed; other keys are ignored. ``entry`` must be a
    relative path without ``..`` because it is executed by ``run_skill``.

    Raises:
        ValueError: skill not found, or ``entry`` is not a safe path.
    """
    manifest = read_manifest(name)
    if manifest is None:
        raise ValueError(f"Skill '{name}' nicht gefunden")
    allowed = {"description", "args", "requires", "active", "version", "entry"}
    changes = {k: v for k, v in patch.items() if k in allowed}
    # Validate before touching the manifest so a bad patch changes nothing.
    if "entry" in changes:
        _safe_entry(changes["entry"])
    manifest.update(changes)
    write_manifest(name, manifest)
    return manifest


def delete_skill(name: str) -> None:
    """Remove the skill directory recursively.

    Raises:
        ValueError: invalid name or skill not found.
    """
    d = _skill_dir(name)
    if not d.exists():
        raise ValueError(f"Skill '{name}' nicht gefunden")
    shutil.rmtree(d)
    logger.info("Skill geloescht: %s", name)


# ─── Run ────────────────────────────────────────────────────────────

def run_skill(name: str, args: Optional[dict] = None, timeout_sec: int = 300) -> dict:
    """Execute a skill and return the run record.

    Args are passed as environment variables with the prefix ``ARG_`` (e.g.
    ``ARG_URL`` for ``args["url"]``); keys that are not plain identifiers are
    skipped. ``SKILL_DIR`` and ``SHARED_UPLOADS`` are set as well.

    Returns:
        dict with ``ts``, ``args``, ``exit_code``, ``duration_sec``,
        ``stdout``, ``stderr`` (both truncated to 8000 characters),
        ``timed_out``, ``ok`` and ``log_path``. On timeout ``exit_code`` is -1.

    Raises:
        ValueError: skill not found, deactivated, or its manifest ``entry``
            is not a safe relative path.
    """
    manifest = read_manifest(name)
    if manifest is None:
        raise ValueError(f"Skill '{name}' nicht gefunden")
    if not manifest.get("active", True):
        raise ValueError(f"Skill '{name}' ist deaktiviert")

    d = _skill_dir(name)
    # The manifest may come from an imported archive or a patch: never
    # execute something outside the skill directory.
    entry = _safe_entry(manifest.get("entry", "run.sh"))
    exec_mode = manifest.get("execution", "bash")

    env = os.environ.copy()
    # Skill args as environment variables.
    for k, v in (args or {}).items():
        if not isinstance(k, str) or not _ARG_KEY_RE.fullmatch(k):
            continue
        env[f"ARG_{k.upper()}"] = str(v)
    env["SKILL_DIR"] = str(d)
    env["SHARED_UPLOADS"] = str(SHARED_UPLOADS)

    # Build the command.
    if exec_mode == "local-venv":
        python = d / "venv" / "bin" / "python"
        cmd = [str(python), str(d / entry)]
    elif exec_mode == "local-bin":
        # The skill ships its own bin/ — prepend it to PATH.
        env["PATH"] = f"{d / 'bin'}:{env.get('PATH', '')}"
        cmd = [str(d / entry)]
    else:  # bash
        cmd = [str(d / entry)]

    log_id = f"{int(time.time())}-{uuid.uuid4().hex[:8]}"
    log_path = d / "logs" / f"{log_id}.json"
    t0 = time.time()
    try:
        proc = subprocess.run(
            cmd,
            env=env,
            cwd=str(d),
            capture_output=True,
            text=True,
            timeout=timeout_sec,
        )
        out_text = proc.stdout
        err_text = proc.stderr
        exit_code = proc.returncode
        timed_out = False
    except subprocess.TimeoutExpired as exc:
        # The partial output attached to TimeoutExpired can be bytes even
        # though text=True was requested, so it is decoded explicitly.
        out_text = _as_text(exc.stdout)
        err_text = _as_text(exc.stderr) + f"\n[TIMEOUT {timeout_sec}s]"
        exit_code = -1
        timed_out = True
    duration = time.time() - t0

    # Write the log (truncated so it cannot grow without bound).
    record = {
        "ts": _now(),
        "args": args or {},
        "exit_code": exit_code,
        "duration_sec": round(duration, 2),
        "stdout": (out_text or "")[:8000],
        "stderr": (err_text or "")[:8000],
        "timed_out": timed_out,
    }
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        log_path.write_text(
            json.dumps(record, indent=2, ensure_ascii=False), encoding="utf-8"
        )
    except Exception as exc:
        # Logging stays best-effort, but the failure should be visible.
        logger.warning("Skill %s: Run-Log schreiben fehlgeschlagen: %s", name, exc)

    # Update usage stats on a fresh copy of the manifest so that edits made
    # while the skill was running are not overwritten by the stale copy.
    current = read_manifest(name) or manifest
    current["last_used"] = _now()
    current["use_count"] = int(current.get("use_count") or 0) + 1
    write_manifest(name, current)

    record["ok"] = exit_code == 0
    record["log_path"] = str(log_path)
    return record


def list_logs(name: str, limit: int = 50) -> list[dict]:
    """Return up to *limit* run logs of a skill, newest file name first.

    Each record gets a ``log_id`` key (the file name without ``.json``).
    Unreadable log files are skipped.
    """
    d = _skill_dir(name) / "logs"
    if not d.exists():
        return []
    files = sorted(d.glob("*.json"), reverse=True)[: max(limit, 0)]
    out: list[dict] = []
    for f in files:
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
            data["log_id"] = f.stem
            out.append(data)
        except Exception:
            continue
    return out


# ─── Export / Import ────────────────────────────────────────────────

def export_skill(name: str) -> bytes:
    """Pack a skill as tar.gz and return the bytes.

    The top-level ``venv``, ``logs`` and ``__pycache__`` entries are left out
    (``import_skill`` recreates venv and logs).

    Raises:
        ValueError: invalid name or skill not found.
    """
    d = _skill_dir(name)
    if not d.exists():
        raise ValueError(f"Skill '{name}' nicht gefunden")
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for path in d.iterdir():
            if path.name in ("venv", "logs", "__pycache__"):
                continue
            tar.add(path, arcname=f"{name}/{path.name}")
    return buf.getvalue()


def _check_archive_member(member: tarfile.TarInfo, root: str) -> None:
    """Raise ValueError unless *member* is safe to extract below ``root/``."""
    parts = PurePosixPath(member.name).parts
    # Every member must live under the skill's own top-level directory;
    # anything else could overwrite sibling skills or escape SKILLS_DIR.
    if member.name.startswith("/") or not parts or parts[0] != root or ".." in parts:
        raise ValueError(f"Unsicherer Pfad im Archiv: {member.name}")
    if member.issym() or member.islnk():
        link_parts = PurePosixPath(member.linkname).parts
        escapes = member.linkname.startswith("/") or ".." in link_parts
        # Hard-link targets are given relative to the archive root.
        if member.islnk() and (not link_parts or link_parts[0] != root):
            escapes = True
        if escapes:
            raise ValueError(f"Unsicherer Pfad im Archiv: {member.name}")
    elif not (member.isfile() or member.isdir()):
        # Devices, FIFOs etc. have no place in a skill archive.
        raise ValueError(f"Unsicherer Pfad im Archiv: {member.name}")


def import_skill(tar_bytes: bytes, overwrite: bool = False) -> dict:
    """Import a skill from tar.gz bytes and return its manifest.

    The first path component of the first archive member is taken as the
    skill name. All members are validated *before* an existing skill is
    removed, so a rejected archive never destroys the installed skill. For
    ``local-venv`` skills the venv is rebuilt from ``requirements.txt``; if
    that fails the skill is marked inactive with a ``setup_error``.

    Raises:
        ValueError: empty archive, invalid skill name, unsafe member, or the
            skill exists and ``overwrite`` is false.
    """
    SKILLS_DIR.mkdir(parents=True, exist_ok=True)
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tar:
        # Find the root name first (= skill name).
        members = tar.getmembers()
        if not members:
            raise ValueError("Leeres Archiv")
        root = members[0].name.split("/", 1)[0]
        name = _safe_name(root)
        d = _skill_dir(name)
        if d.exists() and not overwrite:
            raise ValueError(f"Skill '{name}' existiert bereits — overwrite=true setzen")

        # Prevent path traversal: check everything before touching the disk.
        for m in members:
            _check_archive_member(m, name)

        if d.exists():
            shutil.rmtree(d)

        # Use the stdlib "data" extraction filter as a second line of defence
        # on Python versions that provide it.
        extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        tar.extractall(SKILLS_DIR, members=members, **extract_kwargs)

    # Create the logs directory if the archive did not contain one.
    (d / "logs").mkdir(exist_ok=True)

    # Rebuild the venv for local-venv skills.
    manifest = read_manifest(name) or {}
    if manifest.get("execution") == "local-venv":
        req_file = d / "requirements.txt"
        pip_packages: list[str] = []
        if req_file.exists():
            stripped = (
                line.strip()
                for line in req_file.read_text(encoding="utf-8").splitlines()
            )
            # Test the stripped line so indented comments are skipped too.
            pip_packages = [s for s in stripped if s and not s.startswith("#")]
        try:
            _setup_venv(d, pip_packages)
        except Exception as exc:
            logger.warning("Skill-Import %s: venv-Setup fehlgeschlagen: %s", name, exc)
            manifest["active"] = False
            manifest["setup_error"] = str(exc)[:500]
            write_manifest(name, manifest)
    return manifest