""" Skill-Manager — Filesystem-Layer fuer ARIAs Faehigkeiten. Layout: /data/skills// skill.json - Manifest README.md - Beschreibung (vom Stil her: was, wann, wie aufrufen) run.sh - Entry-Point (sh, python -m, was auch immer) requirements.txt - optional, fuer local-venv venv/ - automatisch erzeugt bei local-venv bin/ - statische Binaries (fuer local-bin) logs/ - .json Run-Logs (append-only pro Run) Manifest (skill.json): { "name": "youtube2mp3", "description": "Konvertiert YouTube-Video-URL zu MP3", "execution": "local-venv" | "local-bin" | "bash", "entry": "run.sh", "args": [{"name": "url", "required": true}, ...], "requires": {"pip": [...], "binaries": [...]}, "active": true, "created_at": "ISO", "updated_at": "ISO", "last_used": null | "ISO", "use_count": 0, "version": "1.0", "author": "aria" | "stefan" } """ from __future__ import annotations import json import logging import os import re import shutil import subprocess import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) SKILLS_DIR = Path(os.environ.get("SKILLS_DIR", "/data/skills")) SHARED_UPLOADS = Path("/shared/uploads") SKILL_CONFIGS_FILE = Path(os.environ.get("SKILL_CONFIGS_FILE", "/shared/config/skill_configs.json")) # Beim Archivieren in versions/ ausgenommen (gross, regenerierbar, sind keine Sources) _VERSION_SKIP = {"venv", "logs", "versions", "__pycache__"} VALID_EXECUTIONS = {"local-venv", "local-bin", "bash"} NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{2,60}$") # Anti-Skill-Friedhof: ARIAs Lieblings-Suffixe wenn sie statt updaten neu baut VERSION_SUFFIX_RE = re.compile(r"(?:[-_]v\d+|[-_](?:new|fixed|old|alt|copy|final|clean))$", re.I) def _now() -> str: return datetime.now(timezone.utc).isoformat() def _safe_name(name: str) -> str: if not isinstance(name, str) or not NAME_RE.match(name): raise ValueError(f"Ungültiger Skill-Name: {name!r}") return name def _skill_dir(name: str) -> Path: return SKILLS_DIR / _safe_name(name) def _check_anti_graveyard(name: str) -> None: """Verhindert klassische Skill-Friedhof-Patterns beim Anlegen. Hard-Reject auf: 1. Versions-Suffixe (`-v2`, `_v3`, `-new`, `-fixed`, …) im Namen 2. Prefix-Kollision mit existierendem Skill (z.B. `spotify` existiert, jemand will `spotify-aria` oder `spotify-ctl` anlegen) """ if VERSION_SUFFIX_RE.search(name): raise ValueError( f"Skill-Name '{name}' enthaelt einen Versions-Suffix " f"(-v2 / _v3 / -new / -fixed / -old / -alt / -copy / -final / -clean). " f"Skills werden intern versioniert (skill_rollback). " f"Waehle einen klaren Namen ohne Suffix oder nutze skill_update auf " f"den bestehenden Skill." ) if not SKILLS_DIR.exists(): return existing = [p.name for p in SKILLS_DIR.iterdir() if p.is_dir()] for ex in existing: if ex == name: continue # wird spaeter mit "existiert bereits" abgefangen # neuer Name verlaengert existierenden Stem: 'spotify' da, neu 'spotify-aria' if name.startswith(ex + "-") or name.startswith(ex + "_"): raise ValueError( f"Skill-Name '{name}' kollidiert mit existierendem '{ex}'. " f"Wenn Du '{ex}' verbessern willst: skill_update auf '{ex}'. " f"Wenn es wirklich was anderes ist: waehle einen Namen ohne den " f"Praefix '{ex}-' / '{ex}_'." ) # neuer Name ist Kurzform eines existierenden: 'spotify-aria' da, neu 'spotify' if ex.startswith(name + "-") or ex.startswith(name + "_"): raise ValueError( f"Es existiert bereits '{ex}' mit Praefix '{name}'. Pruefe ob '{ex}' " f"das schon kann; wenn ja: skill_update auf '{ex}' oder Skill umbenennen." ) # ─── Listing ──────────────────────────────────────────────────────── def list_skills(active_only: bool = False) -> list[dict]: out: list[dict] = [] if not SKILLS_DIR.exists(): return out for entry in sorted(SKILLS_DIR.iterdir()): if not entry.is_dir(): continue manifest = read_manifest(entry.name) if manifest is None: continue if active_only and not manifest.get("active", True): continue out.append(manifest) return out def read_manifest(name: str) -> Optional[dict]: try: path = _skill_dir(name) / "skill.json" if not path.exists(): return None return json.loads(path.read_text(encoding="utf-8")) except Exception as exc: logger.warning("Manifest lesen %s: %s", name, exc) return None def write_manifest(name: str, manifest: dict) -> None: d = _skill_dir(name) d.mkdir(parents=True, exist_ok=True) manifest["updated_at"] = _now() (d / "skill.json").write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8") def read_readme(name: str) -> str: path = _skill_dir(name) / "README.md" return path.read_text(encoding="utf-8") if path.exists() else "" # ─── Create / Update / Delete ──────────────────────────────────────── def create_skill( name: str, description: str, execution: str, entry_code: str, readme: str = "", args: Optional[list] = None, requires: Optional[dict] = None, pip_packages: Optional[list[str]] = None, author: str = "aria", config_schema: Optional[list] = None, ) -> dict: """Legt einen neuen Skill an. Wirft ValueError bei ungueltigen Inputs. entry_code wird je nach execution in run.sh oder run.py geschrieben. Bei local-venv wird sofort eine venv erzeugt + pip_packages installiert. """ name = _safe_name(name) if execution not in VALID_EXECUTIONS: raise ValueError(f"execution muss eines von {VALID_EXECUTIONS} sein") _check_anti_graveyard(name) d = _skill_dir(name) if d.exists(): raise ValueError(f"Skill '{name}' existiert bereits — erst loeschen oder updaten") d.mkdir(parents=True) (d / "logs").mkdir() # Entry-File: run.sh oder run.py if execution == "local-venv": entry_path = d / "run.py" entry_path.write_text(entry_code, encoding="utf-8") entry_name = "run.py" (d / "requirements.txt").write_text("\n".join(pip_packages or []) + "\n", encoding="utf-8") else: entry_path = d / "run.sh" # Shebang ergaenzen wenn nicht da content = entry_code if entry_code.startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + entry_code entry_path.write_text(content, encoding="utf-8") entry_path.chmod(0o755) entry_name = "run.sh" # README (d / "README.md").write_text(readme or f"# {name}\n\n{description}\n", encoding="utf-8") manifest = { "name": name, "description": description, "execution": execution, "entry": entry_name, "args": args or [], "requires": requires or {}, "active": True, "created_at": _now(), "updated_at": _now(), "last_used": None, "use_count": 0, "version": "1.0", "author": author, "config_schema": _normalize_config_schema(config_schema), "version_history": [], } write_manifest(name, manifest) # venv aufbauen bei local-venv if execution == "local-venv": try: _setup_venv(d, pip_packages or []) except Exception as exc: # venv-Aufbau fehlgeschlagen → Skill steht trotzdem im Repo, aber inaktiv manifest["active"] = False manifest["setup_error"] = str(exc)[:500] write_manifest(name, manifest) logger.warning("Skill %s: venv-Setup fehlgeschlagen → deaktiviert: %s", name, exc) logger.info("Skill erstellt: %s (%s)", name, execution) return manifest def _normalize_config_schema(schema: Optional[list]) -> list: """Filter + Normalisiert das config_schema. Erwartet Liste von Dicts mit Pflichtfeld 'name'. Optional: label, type (string|number|boolean|password), secret (bool), default, description.""" if not schema: return [] out = [] for f in schema: if not isinstance(f, dict): continue fname = (f.get("name") or "").strip() if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]{0,40}$", fname): continue ftype = (f.get("type") or "string").lower() if ftype not in ("string", "number", "boolean", "password"): ftype = "string" # password impliziert secret=True secret = bool(f.get("secret")) or ftype == "password" out.append({ "name": fname, "type": ftype, "label": (f.get("label") or fname), "secret": secret, "description": (f.get("description") or "")[:300], "default": f.get("default"), }) return out def _setup_venv(skill_dir: Path, pip_packages: list[str]) -> None: venv = skill_dir / "venv" logger.info("venv erstellen: %s", venv) subprocess.run(["python", "-m", "venv", str(venv)], check=True, timeout=120) pip = venv / "bin" / "pip" if pip_packages: subprocess.run([str(pip), "install", "--no-cache-dir", *pip_packages], check=True, timeout=600) def update_skill(name: str, patch: dict) -> dict: """Aktualisiert einen bestehenden Skill. Manifest-Felder ueber den `allowed`-Filter, Code-Aenderungen ueber dedizierte Keys: - `entry_code` (str) → ueberschreibt run.py / run.sh - `readme` (str) → ueberschreibt README.md - `pip_packages` (list) → ueberschreibt requirements.txt + venv-Rebuild (nur bei local-venv) """ manifest = read_manifest(name) if manifest is None: raise ValueError(f"Skill '{name}' nicht gefunden") d = _skill_dir(name) # Auto-Archive: wenn strukturelle Aenderung (Code/README/Deps/Schema), erst # snapshot machen. So kann jeder skill_update zurueckgerollt werden. structural = any(k in patch for k in ("entry_code", "readme", "pip_packages", "config_schema", "args")) if structural: try: archive_current_version( name, summary=patch.get("_change_summary") or ", ".join( sorted(k for k in patch.keys() if k != "_change_summary") )[:200], ) except Exception as exc: logger.warning("update_skill: Auto-Archive %s fehlgeschlagen: %s", name, exc) # nach archive_current_version manifest neu laden (version_history geupdatet) manifest = read_manifest(name) or manifest allowed = {"description", "args", "requires", "active", "version", "entry"} for k, v in patch.items(): if k in allowed: manifest[k] = v if "config_schema" in patch: manifest["config_schema"] = _normalize_config_schema(patch["config_schema"]) # Code austauschen if "entry_code" in patch and patch["entry_code"]: execution = manifest.get("execution", "local-venv") if execution == "local-venv": entry_path = d / "run.py" entry_path.write_text(patch["entry_code"], encoding="utf-8") else: entry_path = d / "run.sh" content = patch["entry_code"] if patch["entry_code"].startswith("#!") else "#!/usr/bin/env bash\nset -euo pipefail\n" + patch["entry_code"] entry_path.write_text(content, encoding="utf-8") entry_path.chmod(0o755) # README austauschen if "readme" in patch and patch["readme"] is not None: (d / "README.md").write_text(patch["readme"], encoding="utf-8") # pip_packages geaendert → requirements.txt + venv neu aufbauen if "pip_packages" in patch and manifest.get("execution") == "local-venv": pip_packages = patch["pip_packages"] or [] (d / "requirements.txt").write_text("\n".join(pip_packages) + "\n", encoding="utf-8") # venv loeschen + neu aufbauen, damit alte Pakete weg sind venv = d / "venv" if venv.exists(): shutil.rmtree(venv, ignore_errors=True) try: _setup_venv(d, pip_packages) # Falls vorher wegen Setup-Error deaktiviert war: jetzt frei manifest.pop("setup_error", None) manifest["active"] = patch.get("active", True) except Exception as exc: manifest["active"] = False manifest["setup_error"] = str(exc)[:500] logger.warning("Skill %s: venv-Rebuild fehlgeschlagen: %s", name, exc) write_manifest(name, manifest) logger.info("Skill aktualisiert: %s (keys=%s)", name, sorted(patch.keys())) return manifest def scaffold_skill( name: str, template: str, params: Optional[dict] = None, author: str = "aria", ) -> dict: """Baut einen Skill aus einem Template-Skelett. ARIA muss nicht jedes Mal einen kompletten Python-Skill schreiben — sie waehlt ein Template und optionale Parameter, Brain expandiert das zu fertigem Code. Templates siehe `skill_templates.TEMPLATES`. Konkret: - 'oauth-api' : params={service, base_url?} - 'apikey-api': params={api_name, key_env, auth_header?, auth_prefix?, base_url?} - 'file-process': params={output_ext?} Wirft ValueError wenn Template unbekannt oder Name kollidiert. Sonst: ruft intern create_skill mit den expandierten Feldern auf. """ import skill_templates as _st spec = _st.expand(name, template, params or {}) return create_skill( name=name, description=spec["description"], execution="local-venv", entry_code=spec["entry_code"], readme=spec["readme"], args=spec["args"], pip_packages=spec["pip_packages"], config_schema=spec["config_schema"], author=author, ) def delete_skill(name: str) -> None: d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") shutil.rmtree(d) # Configs auch raeumen — sonst Karteileiche in skill_configs.json try: all_cfg = _load_all_skill_configs() if name in all_cfg: all_cfg.pop(name) _save_all_skill_configs(all_cfg) except Exception: pass logger.info("Skill geloescht: %s", name) # ─── Skill-Configs (statische Werte je Skill — API-Keys, IDs etc.) ── # Werte liegen zentral in /shared/config/skill_configs.json damit Stefan # sie im Diagnostic-UI editieren kann. Skill bekommt sie zur Laufzeit # als ENV `CFG_` — kein hardcoden im Code noetig. def _load_all_skill_configs() -> dict: if not SKILL_CONFIGS_FILE.exists(): return {} try: return json.loads(SKILL_CONFIGS_FILE.read_text(encoding="utf-8")) except Exception as exc: logger.warning("skill_configs.json kaputt (%s) — leeres dict", exc) return {} def _save_all_skill_configs(data: dict) -> None: SKILL_CONFIGS_FILE.parent.mkdir(parents=True, exist_ok=True) SKILL_CONFIGS_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") def get_skill_config(name: str) -> dict: """Liefert die rohen Config-Werte fuer einen Skill (ungemasked). Wird intern beim run_skill genutzt um CFG_-Env zu bauen.""" return _load_all_skill_configs().get(name, {}) def set_skill_config(name: str, values: dict) -> dict: """Speichert die Config-Werte fuer einen Skill (komplett ueberschreiben). Werte landen sofort persistent; naechster run_skill nutzt sie.""" if not isinstance(values, dict): raise ValueError("values muss ein Dict sein") all_cfg = _load_all_skill_configs() all_cfg[name] = values _save_all_skill_configs(all_cfg) return values def get_skill_config_masked(name: str) -> dict: """Wie get_skill_config, aber secret-Felder werden auf '***SET***' maskiert. Schema kommt aus dem skill.json — Felder ohne secret=True werden klar zurueckgegeben. Fuer UI-Anzeige.""" manifest = read_manifest(name) schema = (manifest or {}).get("config_schema") or [] secret_fields = {f.get("name") for f in schema if f.get("secret")} values = get_skill_config(name) return {k: ("***SET***" if (k in secret_fields and v) else v) for k, v in values.items()} def _config_env_name(field_name: str) -> str: """API-Key → CFG_API_KEY. Erlaubt nur a-zA-Z0-9_.""" safe = re.sub(r"[^a-zA-Z0-9]", "_", field_name).upper() return f"CFG_{safe}" # ─── Versionierung (Rollback-fähiges update_skill) ─────────────────── # Vor jedem strukturellen update wird der aktuelle Stand nach # versions/v_/ kopiert (ohne venv/logs/versions). Rollback kopiert # eine Version zurueck — vorher noch ein Auto-Snapshot, damit auch der # Rollback rueckholbar ist. def _versions_dir(name: str) -> Path: return _skill_dir(name) / "versions" def _copytree_skill(src: Path, dst: Path) -> None: """Kopiert Skill-Sources (alles ausser venv/logs/versions/__pycache__).""" dst.mkdir(parents=True, exist_ok=True) for item in src.iterdir(): if item.name in _VERSION_SKIP: continue target = dst / item.name if item.is_dir(): shutil.copytree(item, target, dirs_exist_ok=True) else: shutil.copy2(item, target) def archive_current_version(name: str, summary: str = "") -> str: """Kopiert den aktuellen Skill-Stand nach versions/v_/. Returnt die version_id. Im Manifest wird `version_history` gepflegt.""" d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") ts = int(time.time()) version_id = f"v_{ts}" # Kollisionsschutz bei sub-Sekunden-Calls while (_versions_dir(name) / version_id).exists(): ts += 1 version_id = f"v_{ts}" archive = _versions_dir(name) / version_id _copytree_skill(d, archive) (archive / "_version.json").write_text(json.dumps({ "version_id": version_id, "archived_at": _now(), "summary": (summary or "")[:300], }, indent=2, ensure_ascii=False), encoding="utf-8") # Manifest-History pflegen (read-back nach _copytree, damit history konsistent) manifest = read_manifest(name) if manifest is not None: hist = list(manifest.get("version_history") or []) hist.append({"version_id": version_id, "archived_at": _now(), "summary": (summary or "")[:300]}) # Cap auf 50 Versionen — alte Eintraege wegrotieren (Dateien bleiben aber) manifest["version_history"] = hist[-50:] write_manifest(name, manifest) return version_id def list_skill_versions(name: str) -> list[dict]: """Liste aller archivierten Versionen, neueste zuerst.""" versions = _versions_dir(name) if not versions.exists(): return [] out = [] for entry in sorted(versions.iterdir(), reverse=True): if not entry.is_dir(): continue meta = entry / "_version.json" if meta.exists(): try: out.append(json.loads(meta.read_text(encoding="utf-8"))) continue except Exception: pass out.append({"version_id": entry.name, "archived_at": "", "summary": ""}) return out def rollback_skill(name: str, version_id: str) -> dict: """Stellt eine archivierte Version wieder her. Vorher wird der aktuelle Stand automatisch als neue Version archiviert ('safety_snapshot') — Rollback ist also nicht destruktiv. venv wird neu aufgebaut wenn requirements.txt vorhanden ist.""" d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") archive = _versions_dir(name) / version_id if not archive.exists() or not archive.is_dir(): raise ValueError(f"Version '{version_id}' fuer Skill '{name}' nicht gefunden") # 1. Sicherung des aktuellen Stands safety = archive_current_version(name, summary=f"safety-snapshot vor rollback auf {version_id}") # 2. Aktuelle Sources loeschen (venv/logs/versions bleiben) for item in d.iterdir(): if item.name in _VERSION_SKIP: continue if item.is_dir(): shutil.rmtree(item, ignore_errors=True) else: try: item.unlink() except FileNotFoundError: pass # 3. Archive zurueck kopieren (ohne _version.json — das ist Versions-Metadata) for item in archive.iterdir(): if item.name == "_version.json": continue target = d / item.name if item.is_dir(): shutil.copytree(item, target, dirs_exist_ok=True) else: shutil.copy2(item, target) # 4. Manifest-Stempel manifest = read_manifest(name) if manifest is not None: manifest["updated_at"] = _now() manifest["last_rollback"] = {"to": version_id, "safety": safety, "at": _now()} write_manifest(name, manifest) # 5. venv-Rebuild bei local-venv req_file = d / "requirements.txt" if (manifest or {}).get("execution") == "local-venv" and req_file.exists(): pip_packages = [l.strip() for l in req_file.read_text(encoding="utf-8").splitlines() if l.strip() and not l.strip().startswith("#")] venv = d / "venv" if venv.exists(): shutil.rmtree(venv, ignore_errors=True) try: _setup_venv(d, pip_packages) if manifest is not None: manifest.pop("setup_error", None) manifest["active"] = True write_manifest(name, manifest) except Exception as exc: if manifest is not None: manifest["active"] = False manifest["setup_error"] = str(exc)[:500] write_manifest(name, manifest) logger.warning("Rollback %s: venv-Rebuild fehlgeschlagen: %s", name, exc) return {"ok": True, "name": name, "rolled_back_to": version_id, "safety_snapshot": safety} def delete_skill_version(name: str, version_id: str) -> dict: """Loescht eine einzelne Version aus versions/. Nicht-rueckholbar.""" archive = _versions_dir(name) / version_id if not archive.exists(): raise ValueError(f"Version '{version_id}' nicht gefunden") shutil.rmtree(archive) manifest = read_manifest(name) if manifest is not None: manifest["version_history"] = [v for v in (manifest.get("version_history") or []) if v.get("version_id") != version_id] write_manifest(name, manifest) return {"ok": True, "deleted": version_id} # ─── Run ──────────────────────────────────────────────────────────── def run_skill(name: str, args: Optional[dict] = None, timeout_sec: int = 300) -> dict: """Fuehrt einen Skill aus. Args werden als ENV-Vars uebergeben (Praefix ARG_, z.B. ARG_URL fuer args["url"]). Returns: {ok, exit_code, stdout, stderr, duration_sec, log_path} """ manifest = read_manifest(name) if manifest is None: raise ValueError(f"Skill '{name}' nicht gefunden") if not manifest.get("active", True): raise ValueError(f"Skill '{name}' ist deaktiviert") d = _skill_dir(name) entry = manifest.get("entry", "run.sh") exec_mode = manifest.get("execution", "bash") env = os.environ.copy() # Skill-Args als ENV-Vars for k, v in (args or {}).items(): if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", k): continue env[f"ARG_{k.upper()}"] = str(v) env["SKILL_DIR"] = str(d) env["SHARED_UPLOADS"] = str(SHARED_UPLOADS) # Brain-API fuer Skills die OAuth-Tokens / Brain-Helpers brauchen. # Beispiel: requests.get(f"{os.environ['BRAIN_INTERNAL_URL']}/oauth/spotify/token") env["BRAIN_INTERNAL_URL"] = os.environ.get("BRAIN_INTERNAL_URL", "http://localhost:8080") # Config-Schema-Werte als CFG_-ENV (P3). Default greift wenn Stefan # noch keinen Wert gesetzt hat — None wird uebersprungen damit der Skill # selbst entscheiden kann ob das ein Fehler ist. schema = manifest.get("config_schema") or [] values = get_skill_config(name) for field in schema: fname = field.get("name") if not fname: continue val = values.get(fname, field.get("default")) if val is None: continue env[_config_env_name(fname)] = str(val) # Command bauen if exec_mode == "local-venv": python = d / "venv" / "bin" / "python" cmd = [str(python), str(d / entry)] elif exec_mode == "local-bin": # Skill bringt seine bin/ mit — wir prepended sie an den PATH env["PATH"] = f"{d / 'bin'}:{env.get('PATH', '')}" cmd = [str(d / entry)] else: # bash cmd = [str(d / entry)] log_id = f"{int(time.time())}-{uuid.uuid4().hex[:8]}" log_path = d / "logs" / f"{log_id}.json" t0 = time.time() try: proc = subprocess.run( cmd, env=env, cwd=str(d), capture_output=True, text=True, timeout=timeout_sec, ) out_text = proc.stdout err_text = proc.stderr exit_code = proc.returncode timed_out = False except subprocess.TimeoutExpired as exc: out_text = exc.stdout or "" err_text = (exc.stderr or "") + f"\n[TIMEOUT {timeout_sec}s]" exit_code = -1 timed_out = True duration = time.time() - t0 # Log schreiben (gekuerzt damit es nicht explodiert) record = { "ts": _now(), "args": args or {}, "exit_code": exit_code, "duration_sec": round(duration, 2), "stdout": (out_text or "")[:8000], "stderr": (err_text or "")[:8000], "timed_out": timed_out, } try: log_path.write_text(json.dumps(record, indent=2, ensure_ascii=False), encoding="utf-8") except Exception: pass # Stats updaten manifest["last_used"] = _now() manifest["use_count"] = int(manifest.get("use_count", 0)) + 1 write_manifest(name, manifest) record["ok"] = exit_code == 0 record["log_path"] = str(log_path) return record def list_logs(name: str, limit: int = 50) -> list[dict]: d = _skill_dir(name) / "logs" if not d.exists(): return [] files = sorted(d.glob("*.json"), reverse=True)[:limit] out: list[dict] = [] for f in files: try: data = json.loads(f.read_text(encoding="utf-8")) data["log_id"] = f.stem out.append(data) except Exception: continue return out # ─── Export / Import ──────────────────────────────────────────────── def export_skill(name: str) -> bytes: """Packt einen Skill als tar.gz und gibt die Bytes zurueck. venv und logs werden ausgeschlossen (werden beim Import neu gebaut).""" import io import tarfile d = _skill_dir(name) if not d.exists(): raise ValueError(f"Skill '{name}' nicht gefunden") buf = io.BytesIO() with tarfile.open(fileobj=buf, mode="w:gz") as tar: for path in d.iterdir(): if path.name in ("venv", "logs", "__pycache__"): continue tar.add(path, arcname=f"{name}/{path.name}") return buf.getvalue() def import_skill(tar_bytes: bytes, overwrite: bool = False) -> dict: """Importiert einen Skill aus tar.gz. Liefert das Manifest zurueck.""" import io import tarfile SKILLS_DIR.mkdir(parents=True, exist_ok=True) with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tar: # Erst Root-Name finden (= Skill-Name) members = tar.getmembers() if not members: raise ValueError("Leeres Archiv") root = members[0].name.split("/", 1)[0] name = _safe_name(root) d = _skill_dir(name) if d.exists(): if not overwrite: raise ValueError(f"Skill '{name}' existiert bereits — overwrite=true setzen") shutil.rmtree(d) # Extrahieren — Path-Traversal verhindern for m in members: target = SKILLS_DIR / m.name if not str(target.resolve()).startswith(str(SKILLS_DIR.resolve())): raise ValueError(f"Unsicherer Pfad im Archiv: {m.name}") tar.extractall(SKILLS_DIR) # logs-Verzeichnis anlegen falls fehlte (d / "logs").mkdir(exist_ok=True) # venv neu bauen falls local-venv manifest = read_manifest(name) or {} if manifest.get("execution") == "local-venv": req_file = d / "requirements.txt" pip_packages: list[str] = [] if req_file.exists(): pip_packages = [l.strip() for l in req_file.read_text().splitlines() if l.strip() and not l.startswith("#")] try: _setup_venv(d, pip_packages) except Exception as exc: logger.warning("Skill-Import %s: venv-Setup fehlgeschlagen: %s", name, exc) manifest["active"] = False manifest["setup_error"] = str(exc)[:500] write_manifest(name, manifest) return manifest