""" API-Heuristik — Cross-Session-Tracker fuer wiederkehrende externe API-Calls. Problem: ARIA driftet bei trivialen API-Calls zu Bash-curl statt Skills zu bauen. Die seed_rule "scaffold-reflex" greift nicht zuverlaessig weil jede Chat-Anfrage eine eigene Claude-CLI-Session ist — in der aktuellen Session sieht sie nicht dass dieselbe API gestern auch schon 10x via curl angerufen wurde. Loesung: Brain trackt server-side. Beim Bauen des System-Prompts wird `agent_stream.jsonl` der letzten 24h gescannt, Bash-curl-Calls werden nach Hostname aggregiert. Hosts ueber Schwelle bei denen noch kein matching Skill existiert landen als Hinweis-Block im System-Prompt — ARIA sieht "du machst 17x curl gegen api.spotify.com, scaffold bitte". Caching: Ergebnis 5 min gehalten, sonst grep wir bei jedem Turn die log-Datei. Bei <1 MB log file ist das eh schnell. """ from __future__ import annotations import json import logging import re import time from pathlib import Path logger = logging.getLogger(__name__) AGENT_STREAM_LOG = Path("/shared/logs/agent_stream.jsonl") # Schwellen / Lookback — bewusst niedrig gehalten weil "2x ist Pattern" stimmt LOOKBACK_HOURS = 24 THRESHOLD = 3 CACHE_TTL_SEC = 300 # Hosts die wir IGNORIEREN — interne Endpoints / Defaults _IGNORED_HOSTS = { "aria-brain", "brain", "localhost", "127.0.0.1", "0.0.0.0", "api.example.com", # template-default in skill_templates "aria-bridge", "aria-rvs", "aria-qdrant", "aria-proxy", "aria-diagnostic", "172.17.0.1", # docker-host-bridge } # Bekannte Hosts → Template-Vorschlag fuer scaffold _SUGGESTIONS: dict[str, tuple[str, str, dict]] = { "api.spotify.com": ("spotify", "oauth-api", {"service": "spotify"}), "api.github.com": ("github", "oauth-api", {"service": "github", "base_url": "https://api.github.com"}), "api.openai.com": ("openai", "apikey-api", {"api_name": "OpenAI", "key_env": "OPENAI_API_KEY", "base_url": "https://api.openai.com"}), "api.openweathermap.org": ("openweather", "apikey-api", {"api_name": "OpenWeather", "key_env": "OWM_API_KEY", "base_url": "https://api.openweathermap.org"}), "api.telegram.org": ("telegram", "apikey-api", {"api_name": "Telegram-Bot", "key_env": "TELEGRAM_BOT_TOKEN", "auth_header": "", "auth_prefix": "", "base_url": "https://api.telegram.org"}), "graph.microsoft.com": ("microsoft", "oauth-api", {"service": "microsoft", "base_url": "https://graph.microsoft.com"}), "discord.com": ("discord", "oauth-api", {"service": "discord", "base_url": "https://discord.com/api"}), "api.notion.com": ("notion", "oauth-api", {"service": "notion", "base_url": "https://api.notion.com"}), "reddit.com": ("reddit", "oauth-api", {"service": "reddit", "base_url": "https://oauth.reddit.com"}), "oauth.reddit.com": ("reddit", "oauth-api", {"service": "reddit", "base_url": "https://oauth.reddit.com"}), } _cache: dict = {"computed_at": 0.0, "hints": []} def invalidate_cache() -> None: """Cache leeren — sinnvoll nach skill_create / scaffold damit der neue Skill sofort beim naechsten Aufruf erkannt wird.""" _cache.update(computed_at=0.0, hints=[]) def detect_recent_bypass( existing_skills: list[dict], since_sec: int = 600, ) -> list[dict]: """Findet Skill-Bypass-Vorfaelle: Bash-curl gegen einen Host fuer den bereits ein matching Skill existiert. ARIA haette `run_` nutzen sollen, hat aber gecurled. Das ist Drift — wir wollen es Brain merken. Returns: liste {host, skill_name, count, last_ts} fuer Hosts wo ein Bypass in den letzten `since_sec` Sekunden vorkam. """ if not AGENT_STREAM_LOG.exists() or not existing_skills: return [] cutoff_ms = (time.time() - since_sec) * 1000 # Map host → matching skill_name host_to_skill = {} for s in existing_skills: sname = (s.get("name") or "").lower() if not sname: continue # Heuristik wie in _host_already_has_skill: stem des Skill-Namens # mit Hostnamen verglichen. Fuer scaffolded skills nehmen wir den # Skill-Namen als stem (z.B. "spotify" -> matched api.spotify.com) host_to_skill[sname] = sname bypass_events: dict[str, dict] = {} try: with AGENT_STREAM_LOG.open(encoding="utf-8") as f: for line in f: if not line.strip(): continue try: e = json.loads(line) except Exception: continue if e.get("kind") != "tool_use": continue if (e.get("name") or "") != "Bash": continue ts = e.get("ts") or 0 if ts < cutoff_ms: continue for host in _extract_hosts_from_bash_input(e.get("input") or ""): h = host.lower() if h in _IGNORED_HOSTS: continue # Welcher Skill-Name matched diesen Host? matched_skill = None for skill_stem in host_to_skill: if skill_stem in h: matched_skill = host_to_skill[skill_stem] break if not matched_skill: continue entry = bypass_events.setdefault(h, { "host": h, "skill_name": matched_skill, "count": 0, "last_ts": 0, }) entry["count"] += 1 if ts > entry["last_ts"]: entry["last_ts"] = ts except Exception as exc: logger.warning("detect_recent_bypass: konnte log nicht lesen: %s", exc) return [] return list(bypass_events.values()) def build_bypass_section(bypass_events: list[dict]) -> str: """Drastischer Block fuer den System-Prompt wenn ARIA gerade gegen einen Host gecurled hat OBWOHL der Skill existiert. Inhalt soll sie spuerbar ermahnen — wirkt nur in der aktuellen Session.""" if not bypass_events: return "" lines = [ "## 🚨 SKILL-BYPASS ERKANNT", "", "Du hast gerade — IN DEN LETZTEN MINUTEN — Bash-curl gegen Hosts " "gemacht obwohl ein passender Skill existiert. Das ist Verschwendung: " "5 Bash-Roundtrips à 3s statt 1 Tool-Call à 3s. Stefan wartet doppelt. " "AB JETZT in diesem Chat:", "", ] for ev in bypass_events: sname = ev["skill_name"] host = ev["host"] count = ev["count"] lines.append(f"- gegen **{host}** ({count}x kuerzlich) → nutze " f"`run_{sname.replace('-', '_')}(...)` statt curl. " f"Der Skill ist da. Nutze ihn.") lines.append("") return "\n".join(lines) def _extract_hosts_from_bash_input(input_str: str) -> list[str]: """Hostnames aus URLs in einem Bash-Command. Sehr robust — sucht `https?://host`.""" if not input_str: return [] return re.findall(r'https?://([a-zA-Z0-9.\-]+)', input_str) def _host_already_has_skill(host: str, skills: list[dict]) -> bool: """Heuristik: Skill-Name enthaelt den 'wesentlichen' Teil des Hosts. 'api.spotify.com' → Stem 'spotify'. Wenn ein Skill 'spotify*' existiert: ja. """ parts = [p for p in host.split(".") if p and p not in ("api", "www", "oauth")] if not parts: return False stem = parts[0].lower() for s in skills: sname = (s.get("name") or "").lower() if stem and stem in sname: return True return False def compute_hints(existing_skills: list[dict] | None = None, force: bool = False) -> list[dict]: """Aggregiert Bash-curl-Calls der letzten LOOKBACK_HOURS aus dem agent_stream.jsonl. Returns Liste von Hinweisen, geordnet nach Count absteigend; nur Hosts ohne matching Skill, nur >= THRESHOLD Calls. Hint-Format: {host, count, lookback_hours, suggestion: (name, template, params) | None} """ skills = existing_skills or [] now = time.time() if not force and (now - _cache["computed_at"]) < CACHE_TTL_SEC: return _cache["hints"] if not AGENT_STREAM_LOG.exists(): _cache.update(computed_at=now, hints=[]) return [] cutoff_ms = (now - LOOKBACK_HOURS * 3600) * 1000 counts: dict[str, int] = {} try: # Stream-Read damit grosse Files (50 MB cap) nicht in den Speicher kippen with AGENT_STREAM_LOG.open(encoding="utf-8") as f: for line in f: if not line.strip(): continue try: e = json.loads(line) except Exception: continue if e.get("kind") != "tool_use": continue if (e.get("name") or "") != "Bash": continue if (e.get("ts") or 0) < cutoff_ms: continue for host in _extract_hosts_from_bash_input(e.get("input") or ""): h = host.lower() if h in _IGNORED_HOSTS: continue counts[h] = counts.get(h, 0) + 1 except Exception as exc: logger.warning("api_heuristic: konnte agent_stream nicht lesen: %s", exc) return [] hints = [] for host, count in counts.items(): if count < THRESHOLD: continue if _host_already_has_skill(host, skills): continue hints.append({ "host": host, "count": count, "lookback_hours": LOOKBACK_HOURS, "suggestion": _SUGGESTIONS.get(host), }) hints.sort(key=lambda x: -x["count"]) _cache.update(computed_at=now, hints=hints) return hints def build_section(hints: list[dict]) -> str: """Formatiert einen kompakten System-Prompt-Block. Leer wenn nichts.""" if not hints: return "" lines = [ "## API-Heuristik (Cross-Session-Counter)", "", "Du hast in den letzten 24h diese externe(n) API(s) per Bash-curl " "wiederholt angerufen, OHNE dass ein Skill dafuer existiert. Beim " "naechsten Aufruf gegen einen dieser Hosts: BAUE ZUERST den Skill " "via `skill_scaffold`, dann nutze ihn. Spart Stefan Wartezeit " "und Dir Tool-Roundtrips.", "", ] for h in hints[:5]: # max 5 Eintraege damit Prompt nicht explodiert sug = h.get("suggestion") if sug: name, tpl, params = sug params_json = json.dumps(params, ensure_ascii=False) sug_str = f"`skill_scaffold('{name}', '{tpl}', {params_json})`" else: sug_str = "`skill_scaffold` mit passendem Template (oauth-api / apikey-api)" lines.append(f"- **{h['host']}** ({h['count']}x in 24h) → {sug_str}") lines.append("") return "\n".join(lines)