Files
ARIA-AGENT/aria-brain/api_heuristic.py
T
duffyduck 1ea7ab5ab1 fix(brain): run_<skill> Tool-Namen safe escapen — Bindestriche kippten Tools-Liste
Beobachtung beim zweiten Live-Test (01:13:41): ARIA versuchte echten
Tool-Call `run_spotify` — bekam aber Error: 'No such tool available'.

Ursache: _skill_to_tool baute Tool-Namen via `run_{s['name']}`. Bei
Skills wie 'yt-dlp-download' wurde daraus 'run_yt-dlp-download' mit
Bindestrich. Anthropic-Tool-Name-Schema ist eigentlich [a-zA-Z0-9_-],
ABER der claude-max-api-proxy konvertiert intern auf OpenAI-Format
und faellt bei Bindestrichen um — wenn EIN Tool ungueltig ist, kippt
die GANZE Tool-Liste, ARIA sieht nichts von 'run_*' inklusive
'run_spotify' obwohl der ja Bindestrich-frei war.

Fix:
- _skill_to_tool: name = "run_" + re.sub(r"[^a-zA-Z0-9_]", "_", s["name"])
  → run_yt_dlp_download statt run_yt-dlp-download.
- Dispatcher: bei tool_name='run_X' wird zuerst X als skill_name probiert,
  bei Miss wird ueber die Liste der existierenden Skills gemappt — der
  Skill mit safe_name(name)==X wird dann genommen.
- Bypass-Lesson + Bypass-Section: gleiche safe-Logik fuer den
  empfohlenen run_<tool>-String im Memory/Prompt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 01:17:48 +02:00

283 lines
11 KiB
Python

"""
API-Heuristik — Cross-Session-Tracker fuer wiederkehrende externe API-Calls.
Problem: ARIA driftet bei trivialen API-Calls zu Bash-curl statt Skills
zu bauen. Die seed_rule "scaffold-reflex" greift nicht zuverlaessig weil
jede Chat-Anfrage eine eigene Claude-CLI-Session ist — in der aktuellen
Session sieht sie nicht dass dieselbe API gestern auch schon 10x via
curl angerufen wurde.
Loesung: Brain trackt server-side. Beim Bauen des System-Prompts wird
`agent_stream.jsonl` der letzten 24h gescannt, Bash-curl-Calls werden
nach Hostname aggregiert. Hosts ueber Schwelle bei denen noch kein
matching Skill existiert landen als Hinweis-Block im System-Prompt —
ARIA sieht "du machst 17x curl gegen api.spotify.com, scaffold bitte".
Caching: Ergebnis 5 min gehalten, sonst grep wir bei jedem Turn die
log-Datei. Bei <1 MB log file ist das eh schnell.
"""
from __future__ import annotations
import json
import logging
import re
import time
from pathlib import Path
logger = logging.getLogger(__name__)
AGENT_STREAM_LOG = Path("/shared/logs/agent_stream.jsonl")
# Schwellen / Lookback — bewusst niedrig gehalten weil "2x ist Pattern" stimmt
LOOKBACK_HOURS = 24
THRESHOLD = 3
CACHE_TTL_SEC = 300
# Hosts die wir IGNORIEREN — interne Endpoints / Defaults
_IGNORED_HOSTS = {
"aria-brain", "brain", "localhost", "127.0.0.1", "0.0.0.0",
"api.example.com", # template-default in skill_templates
"aria-bridge", "aria-rvs", "aria-qdrant", "aria-proxy", "aria-diagnostic",
"172.17.0.1", # docker-host-bridge
}
# Bekannte Hosts → Template-Vorschlag fuer scaffold
_SUGGESTIONS: dict[str, tuple[str, str, dict]] = {
"api.spotify.com": ("spotify", "oauth-api", {"service": "spotify"}),
"api.github.com": ("github", "oauth-api", {"service": "github", "base_url": "https://api.github.com"}),
"api.openai.com": ("openai", "apikey-api",
{"api_name": "OpenAI", "key_env": "OPENAI_API_KEY",
"base_url": "https://api.openai.com"}),
"api.openweathermap.org": ("openweather", "apikey-api",
{"api_name": "OpenWeather", "key_env": "OWM_API_KEY",
"base_url": "https://api.openweathermap.org"}),
"api.telegram.org": ("telegram", "apikey-api",
{"api_name": "Telegram-Bot", "key_env": "TELEGRAM_BOT_TOKEN",
"auth_header": "", "auth_prefix": "",
"base_url": "https://api.telegram.org"}),
"graph.microsoft.com": ("microsoft", "oauth-api",
{"service": "microsoft", "base_url": "https://graph.microsoft.com"}),
"discord.com": ("discord", "oauth-api",
{"service": "discord", "base_url": "https://discord.com/api"}),
"api.notion.com": ("notion", "oauth-api",
{"service": "notion", "base_url": "https://api.notion.com"}),
"reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
"oauth.reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
}
_cache: dict = {"computed_at": 0.0, "hints": []}
def invalidate_cache() -> None:
"""Cache leeren — sinnvoll nach skill_create / scaffold damit der neue
Skill sofort beim naechsten Aufruf erkannt wird."""
_cache.update(computed_at=0.0, hints=[])
def detect_recent_bypass(
existing_skills: list[dict],
since_sec: int = 600,
) -> list[dict]:
"""Findet Skill-Bypass-Vorfaelle: Bash-curl gegen einen Host fuer den
bereits ein matching Skill existiert. ARIA haette `run_<skill>` nutzen
sollen, hat aber gecurled. Das ist Drift — wir wollen es Brain merken.
Returns: liste {host, skill_name, count, last_ts} fuer Hosts wo ein
Bypass in den letzten `since_sec` Sekunden vorkam.
"""
if not AGENT_STREAM_LOG.exists() or not existing_skills:
return []
cutoff_ms = (time.time() - since_sec) * 1000
# Map host → matching skill_name
host_to_skill = {}
for s in existing_skills:
sname = (s.get("name") or "").lower()
if not sname:
continue
# Heuristik wie in _host_already_has_skill: stem des Skill-Namens
# mit Hostnamen verglichen. Fuer scaffolded skills nehmen wir den
# Skill-Namen als stem (z.B. "spotify" -> matched api.spotify.com)
host_to_skill[sname] = sname
bypass_events: dict[str, dict] = {}
try:
with AGENT_STREAM_LOG.open(encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
try:
e = json.loads(line)
except Exception:
continue
if e.get("kind") != "tool_use":
continue
if (e.get("name") or "") != "Bash":
continue
ts = e.get("ts") or 0
if ts < cutoff_ms:
continue
for host in _extract_hosts_from_bash_input(e.get("input") or ""):
h = host.lower()
if h in _IGNORED_HOSTS:
continue
# Welcher Skill-Name matched diesen Host?
matched_skill = None
for skill_stem in host_to_skill:
if skill_stem in h:
matched_skill = host_to_skill[skill_stem]
break
if not matched_skill:
continue
entry = bypass_events.setdefault(h, {
"host": h, "skill_name": matched_skill,
"count": 0, "last_ts": 0,
})
entry["count"] += 1
if ts > entry["last_ts"]:
entry["last_ts"] = ts
except Exception as exc:
logger.warning("detect_recent_bypass: konnte log nicht lesen: %s", exc)
return []
return list(bypass_events.values())
def build_bypass_section(bypass_events: list[dict]) -> str:
"""Drastischer Block fuer den System-Prompt wenn ARIA gerade gegen einen
Host gecurled hat OBWOHL der Skill existiert. Inhalt soll sie spuerbar
ermahnen — wirkt nur in der aktuellen Session."""
if not bypass_events:
return ""
lines = [
"## 🚨 SKILL-BYPASS ERKANNT",
"",
"Du hast gerade — IN DEN LETZTEN MINUTEN — Bash-curl gegen Hosts "
"gemacht obwohl ein passender Skill existiert. Das ist Verschwendung: "
"5 Bash-Roundtrips à 3s statt 1 Tool-Call à 3s. Stefan wartet doppelt. "
"AB JETZT in diesem Chat:",
"",
]
for ev in bypass_events:
sname = ev["skill_name"]
host = ev["host"]
count = ev["count"]
safe = re.sub(r"[^a-zA-Z0-9_]", "_", sname)
lines.append(f"- gegen **{host}** ({count}x kuerzlich) → nutze "
f"`run_{safe}(...)` statt curl. "
f"Der Skill ist da. Nutze ihn.")
lines.append("")
return "\n".join(lines)
def _extract_hosts_from_bash_input(input_str: str) -> list[str]:
"""Hostnames aus URLs in einem Bash-Command. Sehr robust — sucht `https?://host`."""
if not input_str:
return []
return re.findall(r'https?://([a-zA-Z0-9.\-]+)', input_str)
def _host_already_has_skill(host: str, skills: list[dict]) -> bool:
"""Heuristik: Skill-Name enthaelt den 'wesentlichen' Teil des Hosts.
'api.spotify.com' → Stem 'spotify'. Wenn ein Skill 'spotify*' existiert: ja.
"""
parts = [p for p in host.split(".") if p and p not in ("api", "www", "oauth")]
if not parts:
return False
stem = parts[0].lower()
for s in skills:
sname = (s.get("name") or "").lower()
if stem and stem in sname:
return True
return False
def compute_hints(existing_skills: list[dict] | None = None, force: bool = False) -> list[dict]:
"""Aggregiert Bash-curl-Calls der letzten LOOKBACK_HOURS aus dem
agent_stream.jsonl. Returns Liste von Hinweisen, geordnet nach Count
absteigend; nur Hosts ohne matching Skill, nur >= THRESHOLD Calls.
Hint-Format: {host, count, lookback_hours, suggestion: (name, template, params) | None}
"""
skills = existing_skills or []
now = time.time()
if not force and (now - _cache["computed_at"]) < CACHE_TTL_SEC:
return _cache["hints"]
if not AGENT_STREAM_LOG.exists():
_cache.update(computed_at=now, hints=[])
return []
cutoff_ms = (now - LOOKBACK_HOURS * 3600) * 1000
counts: dict[str, int] = {}
try:
# Stream-Read damit grosse Files (50 MB cap) nicht in den Speicher kippen
with AGENT_STREAM_LOG.open(encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
try:
e = json.loads(line)
except Exception:
continue
if e.get("kind") != "tool_use":
continue
if (e.get("name") or "") != "Bash":
continue
if (e.get("ts") or 0) < cutoff_ms:
continue
for host in _extract_hosts_from_bash_input(e.get("input") or ""):
h = host.lower()
if h in _IGNORED_HOSTS:
continue
counts[h] = counts.get(h, 0) + 1
except Exception as exc:
logger.warning("api_heuristic: konnte agent_stream nicht lesen: %s", exc)
return []
hints = []
for host, count in counts.items():
if count < THRESHOLD:
continue
if _host_already_has_skill(host, skills):
continue
hints.append({
"host": host,
"count": count,
"lookback_hours": LOOKBACK_HOURS,
"suggestion": _SUGGESTIONS.get(host),
})
hints.sort(key=lambda x: -x["count"])
_cache.update(computed_at=now, hints=hints)
return hints
def build_section(hints: list[dict]) -> str:
"""Formatiert einen kompakten System-Prompt-Block. Leer wenn nichts."""
if not hints:
return ""
lines = [
"## API-Heuristik (Cross-Session-Counter)",
"",
"Du hast in den letzten 24h diese externe(n) API(s) per Bash-curl "
"wiederholt angerufen, OHNE dass ein Skill dafuer existiert. Beim "
"naechsten Aufruf gegen einen dieser Hosts: BAUE ZUERST den Skill "
"via `skill_scaffold`, dann nutze ihn. Spart Stefan Wartezeit "
"und Dir Tool-Roundtrips.",
"",
]
for h in hints[:5]: # max 5 Eintraege damit Prompt nicht explodiert
sug = h.get("suggestion")
if sug:
name, tpl, params = sug
params_json = json.dumps(params, ensure_ascii=False)
sug_str = f"`skill_scaffold('{name}', '{tpl}', {params_json})`"
else:
sug_str = "`skill_scaffold` mit passendem Template (oauth-api / apikey-api)"
lines.append(f"- **{h['host']}** ({h['count']}x in 24h) → {sug_str}")
lines.append("")
return "\n".join(lines)