feat(brain): API-Heuristik — Cross-Session-Counter fuer Skill-Drift

Variante B: scaffold-reflex Regel allein reicht nicht weil jede Chat-
Anfrage eine eigene claude-CLI-Session ist. ARIA sieht in der aktuellen
Session nicht dass sie gestern auch schon 10x dieselbe API gecurled hat.
Beobachtung: 5+ Spotify-Bash-Calls hintereinander, kein Skill angelegt.

Loesung: Brain trackt server-side aus dem persistierten agent_stream.jsonl.
Bei jedem chat() wird der Log gescanned (cache 5min), Bash-curl-Calls
nach Hostname aggregiert. Hosts mit >=3 Calls in 24h ohne passenden
Skill landen als '## API-Heuristik'-Block im System-Prompt mit konkretem
skill_scaffold-Vorschlag.

Neue Module:
- aria-brain/api_heuristic.py:
  - compute_hints(existing_skills, force): Aggregiert + filtert
  - build_section(hints): formatiert als kompakten Markdown-Block
  - Smart suggestions mapping (api.spotify.com → oauth-api template etc.)
  - Ignoriert interne Hosts (aria-brain, localhost, docker-bridge)
  - 5-min Cache damit nicht jeder Turn die JSONL parst

- aria-brain/prompts.py: build_system_prompt nimmt api_heuristic_section
  als optionalen Block direkt nach Skills-Section.

- aria-brain/agent.py: vor build_system_prompt Heuristik berechnen mit
  aktueller Skill-Liste, Block durchreichen.

- 11. seed_rule scaffold-reflex umgeschrieben: kein 'in einer Session'
  mehr (das ergab keinen Sinn — jeder Turn neue Session). Stattdessen:
  '## API-Heuristik'-Block ist Dein Cross-Session-Gedaechtnis. Wenn da
  was steht: scaffolden BEVOR Du Bash machst.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 00:19:06 +02:00
parent 0540c49c66
commit 845a8b0020
5 changed files with 226 additions and 23 deletions
+1 -1
View File
@@ -402,7 +402,7 @@ jedem Chat-Turn im Hot-Memory-Block auf:
- **oauth-reauth-reflex** — bei 401: ZUERST `oauth_get_token` (Auto-Refresh), nur bei dessen Fehler `oauth_authorize`
- **no-skill-drift** — kein Drift vom Skill zu Ad-hoc-Bash-Befehlen. Skill kaputt? `skill_logs` + `skill_update`. Niemals nur SAGEN „ich baue dir einen Skill", wenn `skill_create` nicht wirklich gefeuert wird
- **runtime-topology** (architektur) — ARIA laeuft als `claude`-CLI-Subprocess IM aria-proxy Container (alpine — kein python3/jq), NICHT im aria-brain. `/data/skills/` und `BRAIN_INTERNAL_URL` existieren dort nicht. Brain-Resources via Brain-Tools (`oauth_get_token`, `memory_search`, `run_<skill>` …), nicht via Bash. SSH zur VM-Host via `ssh aria@host` (Key liegt im Proxy)
- **scaffold-reflex** — wenn dieselbe externe API 2× via Bash-curl angerufen wurde: SOFORT `skill_scaffold` aufrufen. Niedrige Hürde (Template statt vollständiger Skill-Code) → höhere Adoption
- **scaffold-reflex** — Brain trackt cross-session welche externen Hosts via Bash-curl wiederholt (≥3× in 24h) ohne passenden Skill aufgerufen wurden. Ergebnis landet als `## API-Heuristik`-Block im System-Prompt mit konkretem `skill_scaffold(...)`-Vorschlag → ARIA scaffolded statt zu curlen. Data-Source: `agent_stream.jsonl`, Cache 5 min
- **external-api-auth-strategy** — OAuth2 → `oauth_get_token`, sonst `config_schema`, NIEMALS hardcoden
### Skill-Scaffold (Templates)
+14 -1
View File
@@ -879,6 +879,18 @@ class Agent:
oauth_host = os.environ.get("RVS_HOST", "").strip()
oauth_port = os.environ.get("RVS_PORT_PUBLIC", os.environ.get("RVS_PORT", "443")).strip()
oauth_tls = os.environ.get("RVS_TLS", "true").strip().lower() != "false"
# API-Heuristik: wenn ARIA gegen externe APIs wiederholt via Bash
# gecurled hat (cross-session, aus persistiertem agent_stream.jsonl),
# injiziert das einen Hinweis-Block der ihr scaffolden empfiehlt.
api_heuristic_section = ""
try:
import api_heuristic as _ah
hints = _ah.compute_hints(existing_skills=all_skills)
api_heuristic_section = _ah.build_section(hints)
except Exception as exc:
logger.warning("api_heuristic fehlgeschlagen: %s", exc)
system_prompt = build_system_prompt(hot, cold, skills=all_skills,
triggers=all_triggers,
condition_vars=condition_vars,
@@ -887,7 +899,8 @@ class Agent:
oauth_services=oauth_services,
oauth_callback_host=oauth_host,
oauth_callback_port=oauth_port,
oauth_callback_tls=oauth_tls)
oauth_callback_tls=oauth_tls,
api_heuristic_section=api_heuristic_section)
messages = [ProxyMessage(role="system", content=system_prompt)]
for t in self.conversation.window():
messages.append(ProxyMessage(role=t.role, content=t.content))
+182
View File
@@ -0,0 +1,182 @@
"""
API-Heuristik — Cross-Session-Tracker fuer wiederkehrende externe API-Calls.
Problem: ARIA driftet bei trivialen API-Calls zu Bash-curl statt Skills
zu bauen. Die seed_rule "scaffold-reflex" greift nicht zuverlaessig weil
jede Chat-Anfrage eine eigene Claude-CLI-Session ist — in der aktuellen
Session sieht sie nicht dass dieselbe API gestern auch schon 10x via
curl angerufen wurde.
Loesung: Brain trackt server-side. Beim Bauen des System-Prompts wird
`agent_stream.jsonl` der letzten 24h gescannt, Bash-curl-Calls werden
nach Hostname aggregiert. Hosts ueber Schwelle bei denen noch kein
matching Skill existiert landen als Hinweis-Block im System-Prompt —
ARIA sieht "du machst 17x curl gegen api.spotify.com, scaffold bitte".
Caching: Ergebnis 5 min gehalten, sonst grep wir bei jedem Turn die
log-Datei. Bei <1 MB log file ist das eh schnell.
"""
from __future__ import annotations
import json
import logging
import re
import time
from pathlib import Path
logger = logging.getLogger(__name__)
AGENT_STREAM_LOG = Path("/shared/logs/agent_stream.jsonl")
# Schwellen / Lookback — bewusst niedrig gehalten weil "2x ist Pattern" stimmt
LOOKBACK_HOURS = 24
THRESHOLD = 3
CACHE_TTL_SEC = 300
# Hosts die wir IGNORIEREN — interne Endpoints / Defaults
_IGNORED_HOSTS = {
"aria-brain", "brain", "localhost", "127.0.0.1", "0.0.0.0",
"api.example.com", # template-default in skill_templates
"aria-bridge", "aria-rvs", "aria-qdrant", "aria-proxy", "aria-diagnostic",
"172.17.0.1", # docker-host-bridge
}
# Bekannte Hosts → Template-Vorschlag fuer scaffold
_SUGGESTIONS: dict[str, tuple[str, str, dict]] = {
"api.spotify.com": ("spotify", "oauth-api", {"service": "spotify"}),
"api.github.com": ("github", "oauth-api", {"service": "github", "base_url": "https://api.github.com"}),
"api.openai.com": ("openai", "apikey-api",
{"api_name": "OpenAI", "key_env": "OPENAI_API_KEY",
"base_url": "https://api.openai.com"}),
"api.openweathermap.org": ("openweather", "apikey-api",
{"api_name": "OpenWeather", "key_env": "OWM_API_KEY",
"base_url": "https://api.openweathermap.org"}),
"api.telegram.org": ("telegram", "apikey-api",
{"api_name": "Telegram-Bot", "key_env": "TELEGRAM_BOT_TOKEN",
"auth_header": "", "auth_prefix": "",
"base_url": "https://api.telegram.org"}),
"graph.microsoft.com": ("microsoft", "oauth-api",
{"service": "microsoft", "base_url": "https://graph.microsoft.com"}),
"discord.com": ("discord", "oauth-api",
{"service": "discord", "base_url": "https://discord.com/api"}),
"api.notion.com": ("notion", "oauth-api",
{"service": "notion", "base_url": "https://api.notion.com"}),
"reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
"oauth.reddit.com": ("reddit", "oauth-api",
{"service": "reddit", "base_url": "https://oauth.reddit.com"}),
}
_cache: dict = {"computed_at": 0.0, "hints": []}
def _extract_hosts_from_bash_input(input_str: str) -> list[str]:
"""Hostnames aus URLs in einem Bash-Command. Sehr robust — sucht `https?://host`."""
if not input_str:
return []
return re.findall(r'https?://([a-zA-Z0-9.\-]+)', input_str)
def _host_already_has_skill(host: str, skills: list[dict]) -> bool:
"""Heuristik: Skill-Name enthaelt den 'wesentlichen' Teil des Hosts.
'api.spotify.com' → Stem 'spotify'. Wenn ein Skill 'spotify*' existiert: ja.
"""
parts = [p for p in host.split(".") if p and p not in ("api", "www", "oauth")]
if not parts:
return False
stem = parts[0].lower()
for s in skills:
sname = (s.get("name") or "").lower()
if stem and stem in sname:
return True
return False
def compute_hints(existing_skills: list[dict] | None = None, force: bool = False) -> list[dict]:
"""Aggregiert Bash-curl-Calls der letzten LOOKBACK_HOURS aus dem
agent_stream.jsonl. Returns Liste von Hinweisen, geordnet nach Count
absteigend; nur Hosts ohne matching Skill, nur >= THRESHOLD Calls.
Hint-Format: {host, count, lookback_hours, suggestion: (name, template, params) | None}
"""
skills = existing_skills or []
now = time.time()
if not force and (now - _cache["computed_at"]) < CACHE_TTL_SEC:
return _cache["hints"]
if not AGENT_STREAM_LOG.exists():
_cache.update(computed_at=now, hints=[])
return []
cutoff_ms = (now - LOOKBACK_HOURS * 3600) * 1000
counts: dict[str, int] = {}
try:
# Stream-Read damit grosse Files (50 MB cap) nicht in den Speicher kippen
with AGENT_STREAM_LOG.open(encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
try:
e = json.loads(line)
except Exception:
continue
if e.get("kind") != "tool_use":
continue
if (e.get("name") or "") != "Bash":
continue
if (e.get("ts") or 0) < cutoff_ms:
continue
for host in _extract_hosts_from_bash_input(e.get("input") or ""):
h = host.lower()
if h in _IGNORED_HOSTS:
continue
counts[h] = counts.get(h, 0) + 1
except Exception as exc:
logger.warning("api_heuristic: konnte agent_stream nicht lesen: %s", exc)
return []
hints = []
for host, count in counts.items():
if count < THRESHOLD:
continue
if _host_already_has_skill(host, skills):
continue
hints.append({
"host": host,
"count": count,
"lookback_hours": LOOKBACK_HOURS,
"suggestion": _SUGGESTIONS.get(host),
})
hints.sort(key=lambda x: -x["count"])
_cache.update(computed_at=now, hints=hints)
return hints
def build_section(hints: list[dict]) -> str:
"""Formatiert einen kompakten System-Prompt-Block. Leer wenn nichts."""
if not hints:
return ""
lines = [
"## API-Heuristik (Cross-Session-Counter)",
"",
"Du hast in den letzten 24h diese externe(n) API(s) per Bash-curl "
"wiederholt angerufen, OHNE dass ein Skill dafuer existiert. Beim "
"naechsten Aufruf gegen einen dieser Hosts: BAUE ZUERST den Skill "
"via `skill_scaffold`, dann nutze ihn. Spart Stefan Wartezeit "
"und Dir Tool-Roundtrips.",
"",
]
for h in hints[:5]: # max 5 Eintraege damit Prompt nicht explodiert
sug = h.get("suggestion")
if sug:
name, tpl, params = sug
params_json = json.dumps(params, ensure_ascii=False)
sug_str = f"`skill_scaffold('{name}', '{tpl}', {params_json})`"
else:
sug_str = "`skill_scaffold` mit passendem Template (oauth-api / apikey-api)"
lines.append(f"- **{h['host']}** ({h['count']}x in 24h) → {sug_str}")
lines.append("")
return "\n".join(lines)
+6
View File
@@ -340,12 +340,18 @@ def build_system_prompt(
oauth_callback_host: str = "",
oauth_callback_port: str = "443",
oauth_callback_tls: bool = True,
api_heuristic_section: str = "",
) -> str:
"""Kompletter System-Prompt: Hot + Cold + Skills + Triggers + FLUX + OAuth."""
parts = [build_hot_memory_section(pinned), "", build_time_section()]
if skills:
parts.append("")
parts.append(build_skills_section(skills))
if api_heuristic_section:
# Direkt nach Skills weil thematisch verwandt ("welche Skills gibt's, "
# welche Skills FEHLEN")
parts.append("")
parts.append(api_heuristic_section)
if condition_vars:
parts.append("")
parts.append(build_triggers_section(triggers or [], condition_vars, condition_funcs))
+23 -21
View File
@@ -211,32 +211,34 @@ SEED_RULES: List[dict] = [
{
"migration_key": "seed/skill-rule/scaffold-reflex",
"type": "rule",
"title": "Skill-Regel: skill_scaffold-Reflex bei wiederholten API-Calls",
"title": "Skill-Regel: skill_scaffold-Reflex (mit Cross-Session-Counter)",
"category": "skills",
"content": (
"Wenn Du dieselbe externe API in einer Session 2x oder oefter "
"via Bash-curl angerufen hast: HOER AUF und rufe SOFORT "
"`skill_scaffold` auf. Drei Spotify-Calls hintereinander, "
"drei Wetter-Abfragen, drei Calendar-Events — JEDES dieser "
"Pattern schreit nach einem Skill.\n"
"Brain trackt server-side wie oft Du in den letzten 24h dieselbe "
"externe API per Bash-curl angerufen hast (Cross-Session-Counter, "
"siehe '## API-Heuristik'-Block im System-Prompt). Sobald da "
"ein Eintrag steht: das ist KEINE Empfehlung sondern eine "
"Aufforderung. RUFE als ALLERERSTES `skill_scaffold` mit dem "
"vorgeschlagenen Template und params auf, BEVOR Du wieder Bash-"
"curl machst. Dann nutze den frischen Skill via `run_<name>`.\n"
"\n"
"skill_scaffold ist die niedrige Hürde: Du musst keinen "
"kompletten Python-Skill schreiben. Du waehlst nur:\n"
" - oauth-api fuer OAuth2-Services (Spotify, GitHub, Reddit, "
"Google, Discord) -- Token kommt vom Brain mit Auto-Refresh\n"
" - apikey-api fuer statische Keys (OpenWeather, OpenAI, "
"Twilio) -- Key landet im config_schema, Stefan setzt ihn in "
"Diagnostic\n"
" - file-process fuer Datei-In/Datei-Out (PDF, Bild, Daten)\n"
"Warum: jede Chat-Anfrage ist eine eigene Claude-CLI-Session — "
"Du siehst nicht dass Du gestern auch schon 10x Spotify gecurled "
"hast. Der API-Heuristik-Block ist Dein Cross-Session-Gedaechtnis. "
"Wenn er leer ist: alles OK, weitermachen. Wenn nicht: scaffolden.\n"
"\n"
"Brain expandiert das Template in ~1s zu einem fertigen Skill. "
"Falls Du was Spezielles brauchst: erst Scaffold, dann "
"`skill_update` mit Anpassung. NICHT umgekehrt — und schon gar "
"nicht das fuenfte Mal das gleiche Bash-Skript bauen.\n"
"Templates (ausfuehrliche Doku siehe skill_scaffold-Tool):\n"
" - **oauth-api**: Spotify/GitHub/Reddit/Google/Discord. Token "
"kommt vom Brain mit Auto-Refresh.\n"
" - **apikey-api**: OpenWeather/OpenAI/Twilio. Key landet im "
"config_schema → CFG_<NAME> ENV. Stefan setzt ihn in Diagnostic.\n"
" - **file-process**: PDF/Bild/JSON-Wandler. process()-Stub, "
"danach `skill_update` mit echtem Code.\n"
"\n"
"Belohnung: ein Spotify-Skill macht 'welches lied laeuft' in "
"1 Tool-Call (~3s) statt 5 Bash-Roundtrips (~20s). Stefan "
"merkt das sofort und ist zufriedener."
"Belohnung konkret: ein Spotify-Skill macht 'welches lied laeuft' "
"in 1 Tool-Call (~3s) statt 3-5 Bash-Roundtrips (~13-20s). Stefan "
"merkt das sofort. Ein einmaliger Scaffold-Aufwand spart hunderte "
"Bash-Roundtrips."
),
},
{