Files
ARIA-AGENT/aria-brain/api_heuristic.py
T
duffyduck 210ce62ffe feat(brain): Skill-Bypass-Detection + Bypass-Lehre als pinned Memory
Variante 3+ (Lerneffekt-Variante): Variante C scaffolded zwar Skills auto,
aber ARIA lernt nicht — sie wird beim naechsten Mal trotzdem zu Bash
greifen. Stefans Punkt: Lernen geht nur ueber Brain-Memory.

Mechanik:
1. api_heuristic.detect_recent_bypass(skills, since_sec=600):
   schaut letzte 10 Min im agent_stream.jsonl, findet Bash-curl gegen
   Hosts fuer die bereits ein matching Skill existiert. Returnt
   {host, skill_name, count, last_ts}.

2. api_heuristic.build_bypass_section(events):
   Drastischer Markdown-Block "## 🚨 SKILL-BYPASS ERKANNT" mit konkretem
   run_<skill>-Hint pro betroffenem Host. Landet direkt im System-Prompt
   noch VOR dem normalen API-Heuristik-Block.

3. agent.py._upsert_bypass_lesson(ev):
   Schreibt eine pinned type=rule Memory mit source=auto-feedback und
   migration_key=auto/skill-bypass/<skill_name>. Idempotent: bei
   Wiederholung wird die alte Memory ueberschrieben (Counter aktualisiert),
   keine Karteileichen. Content nennt konkret den run-Tool-Namen und
   Performance-Vergleich (3s Tool-Call vs 13-20s Bash-Wrapper).

Diese Memory ist permanent pinned → kommt bei jedem Chat-Turn,
cross-session, cross-restart als Hot-Memory durch. Damit lernt ARIA
es im wortlichen Sinne, nicht nur Reibung in der aktuellen Konversation.

Idempotenz wichtig: bei jedem Bypass-Detection-Lauf wird die Memory
upgedatet (nicht dupliziert). Stefan kann sie via Diagnostic-Gehirn-Tab
loeschen falls sie nervt.

Stefan-Frage beantwortet: 'sie wuerde es aber nur lernen wenn sie es
auch im gehirn speichert oder?' — exakt. Schimpfen im Prompt ist
Reibung dieser Session, pinned Memory ist permanenter Lerneffekt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 00:37:40 +02:00

282 lines
11 KiB
Python

"""
API-Heuristik — Cross-Session-Tracker fuer wiederkehrende externe API-Calls.
Problem: ARIA driftet bei trivialen API-Calls zu Bash-curl statt Skills
zu bauen. Die seed_rule "scaffold-reflex" greift nicht zuverlaessig weil
jede Chat-Anfrage eine eigene Claude-CLI-Session ist — in der aktuellen
Session sieht sie nicht dass dieselbe API gestern auch schon 10x via
curl angerufen wurde.
Loesung: Brain trackt server-side. Beim Bauen des System-Prompts wird
`agent_stream.jsonl` der letzten 24h gescannt, Bash-curl-Calls werden
nach Hostname aggregiert. Hosts ueber Schwelle bei denen noch kein
matching Skill existiert landen als Hinweis-Block im System-Prompt —
ARIA sieht "du machst 17x curl gegen api.spotify.com, scaffold bitte".
Caching: Ergebnis 5 min gehalten, sonst grep wir bei jedem Turn die
log-Datei. Bei <1 MB log file ist das eh schnell.
"""
from __future__ import annotations
import json
import logging
import re
import time
from pathlib import Path
logger = logging.getLogger(__name__)
AGENT_STREAM_LOG = Path("/shared/logs/agent_stream.jsonl")
# Schwellen / Lookback — bewusst niedrig gehalten weil "2x ist Pattern" stimmt
LOOKBACK_HOURS = 24
THRESHOLD = 3
CACHE_TTL_SEC = 300
# Hosts die wir IGNORIEREN — interne Endpoints / Defaults
_IGNORED_HOSTS = {
"aria-brain", "brain", "localhost", "127.0.0.1", "0.0.0.0",
"api.example.com", # template-default in skill_templates
"aria-bridge", "aria-rvs", "aria-qdrant", "aria-proxy", "aria-diagnostic",
"172.17.0.1", # docker-host-bridge
}
# Bekannte Hosts → Template-Vorschlag fuer scaffold
_SUGGESTIONS: dict[str, tuple[str, str, dict]] = {
"api.spotify.com": ("spotify", "oauth-api", {"service": "spotify"}),
"api.github.com": ("github", "oauth-api", {"service": "github", "base_url": "https://api.github.com"}),
"api.openai.com": ("openai", "apikey-api",
{"api_name": "OpenAI", "key_env": "OPENAI_API_KEY",
"base_url": "https://api.openai.com"}),
"api.openweathermap.org": ("openweather", "apikey-api",
{"api_name": "OpenWeather", "key_env": "OWM_API_KEY",
"base_url": "https://api.openweathermap.org"}),
"api.telegram.org": ("telegram", "apikey-api",
{"api_name": "Telegram-Bot", "key_env": "TELEGRAM_BOT_TOKEN",
"auth_header": "", "auth_prefix": "",
"base_url": "https://api.telegram.org"}),
"graph.microsoft.com": ("microsoft", "oauth-api",
{"service": "microsoft", "base_url": "https://graph.microsoft.com"}),
"discord.com": ("discord", "oauth-api",
{"service": "discord", "base_url": "https://discord.com/api"}),
"api.notion.com": ("notion", "oauth-api",
{"service": "notion", "base_url": "https://api.notion.com"}),
"reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
"oauth.reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
}
_cache: dict = {"computed_at": 0.0, "hints": []}
def invalidate_cache() -> None:
"""Cache leeren — sinnvoll nach skill_create / scaffold damit der neue
Skill sofort beim naechsten Aufruf erkannt wird."""
_cache.update(computed_at=0.0, hints=[])
def detect_recent_bypass(
existing_skills: list[dict],
since_sec: int = 600,
) -> list[dict]:
"""Findet Skill-Bypass-Vorfaelle: Bash-curl gegen einen Host fuer den
bereits ein matching Skill existiert. ARIA haette `run_<skill>` nutzen
sollen, hat aber gecurled. Das ist Drift — wir wollen es Brain merken.
Returns: liste {host, skill_name, count, last_ts} fuer Hosts wo ein
Bypass in den letzten `since_sec` Sekunden vorkam.
"""
if not AGENT_STREAM_LOG.exists() or not existing_skills:
return []
cutoff_ms = (time.time() - since_sec) * 1000
# Map host → matching skill_name
host_to_skill = {}
for s in existing_skills:
sname = (s.get("name") or "").lower()
if not sname:
continue
# Heuristik wie in _host_already_has_skill: stem des Skill-Namens
# mit Hostnamen verglichen. Fuer scaffolded skills nehmen wir den
# Skill-Namen als stem (z.B. "spotify" -> matched api.spotify.com)
host_to_skill[sname] = sname
bypass_events: dict[str, dict] = {}
try:
with AGENT_STREAM_LOG.open(encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
try:
e = json.loads(line)
except Exception:
continue
if e.get("kind") != "tool_use":
continue
if (e.get("name") or "") != "Bash":
continue
ts = e.get("ts") or 0
if ts < cutoff_ms:
continue
for host in _extract_hosts_from_bash_input(e.get("input") or ""):
h = host.lower()
if h in _IGNORED_HOSTS:
continue
# Welcher Skill-Name matched diesen Host?
matched_skill = None
for skill_stem in host_to_skill:
if skill_stem in h:
matched_skill = host_to_skill[skill_stem]
break
if not matched_skill:
continue
entry = bypass_events.setdefault(h, {
"host": h, "skill_name": matched_skill,
"count": 0, "last_ts": 0,
})
entry["count"] += 1
if ts > entry["last_ts"]:
entry["last_ts"] = ts
except Exception as exc:
logger.warning("detect_recent_bypass: konnte log nicht lesen: %s", exc)
return []
return list(bypass_events.values())
def build_bypass_section(bypass_events: list[dict]) -> str:
"""Drastischer Block fuer den System-Prompt wenn ARIA gerade gegen einen
Host gecurled hat OBWOHL der Skill existiert. Inhalt soll sie spuerbar
ermahnen — wirkt nur in der aktuellen Session."""
if not bypass_events:
return ""
lines = [
"## 🚨 SKILL-BYPASS ERKANNT",
"",
"Du hast gerade — IN DEN LETZTEN MINUTEN — Bash-curl gegen Hosts "
"gemacht obwohl ein passender Skill existiert. Das ist Verschwendung: "
"5 Bash-Roundtrips à 3s statt 1 Tool-Call à 3s. Stefan wartet doppelt. "
"AB JETZT in diesem Chat:",
"",
]
for ev in bypass_events:
sname = ev["skill_name"]
host = ev["host"]
count = ev["count"]
lines.append(f"- gegen **{host}** ({count}x kuerzlich) → nutze "
f"`run_{sname.replace('-', '_')}(...)` statt curl. "
f"Der Skill ist da. Nutze ihn.")
lines.append("")
return "\n".join(lines)
def _extract_hosts_from_bash_input(input_str: str) -> list[str]:
"""Hostnames aus URLs in einem Bash-Command. Sehr robust — sucht `https?://host`."""
if not input_str:
return []
return re.findall(r'https?://([a-zA-Z0-9.\-]+)', input_str)
def _host_already_has_skill(host: str, skills: list[dict]) -> bool:
"""Heuristik: Skill-Name enthaelt den 'wesentlichen' Teil des Hosts.
'api.spotify.com' → Stem 'spotify'. Wenn ein Skill 'spotify*' existiert: ja.
"""
parts = [p for p in host.split(".") if p and p not in ("api", "www", "oauth")]
if not parts:
return False
stem = parts[0].lower()
for s in skills:
sname = (s.get("name") or "").lower()
if stem and stem in sname:
return True
return False
def compute_hints(existing_skills: list[dict] | None = None, force: bool = False) -> list[dict]:
"""Aggregiert Bash-curl-Calls der letzten LOOKBACK_HOURS aus dem
agent_stream.jsonl. Returns Liste von Hinweisen, geordnet nach Count
absteigend; nur Hosts ohne matching Skill, nur >= THRESHOLD Calls.
Hint-Format: {host, count, lookback_hours, suggestion: (name, template, params) | None}
"""
skills = existing_skills or []
now = time.time()
if not force and (now - _cache["computed_at"]) < CACHE_TTL_SEC:
return _cache["hints"]
if not AGENT_STREAM_LOG.exists():
_cache.update(computed_at=now, hints=[])
return []
cutoff_ms = (now - LOOKBACK_HOURS * 3600) * 1000
counts: dict[str, int] = {}
try:
# Stream-Read damit grosse Files (50 MB cap) nicht in den Speicher kippen
with AGENT_STREAM_LOG.open(encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
try:
e = json.loads(line)
except Exception:
continue
if e.get("kind") != "tool_use":
continue
if (e.get("name") or "") != "Bash":
continue
if (e.get("ts") or 0) < cutoff_ms:
continue
for host in _extract_hosts_from_bash_input(e.get("input") or ""):
h = host.lower()
if h in _IGNORED_HOSTS:
continue
counts[h] = counts.get(h, 0) + 1
except Exception as exc:
logger.warning("api_heuristic: konnte agent_stream nicht lesen: %s", exc)
return []
hints = []
for host, count in counts.items():
if count < THRESHOLD:
continue
if _host_already_has_skill(host, skills):
continue
hints.append({
"host": host,
"count": count,
"lookback_hours": LOOKBACK_HOURS,
"suggestion": _SUGGESTIONS.get(host),
})
hints.sort(key=lambda x: -x["count"])
_cache.update(computed_at=now, hints=hints)
return hints
def build_section(hints: list[dict]) -> str:
"""Formatiert einen kompakten System-Prompt-Block. Leer wenn nichts."""
if not hints:
return ""
lines = [
"## API-Heuristik (Cross-Session-Counter)",
"",
"Du hast in den letzten 24h diese externe(n) API(s) per Bash-curl "
"wiederholt angerufen, OHNE dass ein Skill dafuer existiert. Beim "
"naechsten Aufruf gegen einen dieser Hosts: BAUE ZUERST den Skill "
"via `skill_scaffold`, dann nutze ihn. Spart Stefan Wartezeit "
"und Dir Tool-Roundtrips.",
"",
]
for h in hints[:5]: # max 5 Eintraege damit Prompt nicht explodiert
sug = h.get("suggestion")
if sug:
name, tpl, params = sug
params_json = json.dumps(params, ensure_ascii=False)
sug_str = f"`skill_scaffold('{name}', '{tpl}', {params_json})`"
else:
sug_str = "`skill_scaffold` mit passendem Template (oauth-api / apikey-api)"
lines.append(f"- **{h['host']}** ({h['count']}x in 24h) → {sug_str}")
lines.append("")
return "\n".join(lines)