feat(brain): Phase B — Vector-DB-Memory, Conversation-Loop, Skills, Tool-Use

OpenClaw (aria-core) ist raus, ARIA laeuft jetzt mit eigenem Agent-Framework
im aria-brain Container. Vector-DB-basiertes Gedaechtnis statt Sessions,
eigener Conversation-Loop mit Hot+Cold-Memory + Rolling Window, Tool-Use
fuer Skills, Memory-Destillat-Pipeline.

aria-brain/ (neuer Container)
  - main.py            FastAPI auf 8080, alle Endpoints
  - agent.py           Conversation-Loop mit Tool-Use (skill_create + run_<skill>)
  - conversation.py    Rolling Window, JSONL-Persistenz, Distill-Marker
  - proxy_client.py    httpx-Wrapper zum Claude-Proxy, OpenAI-Format
  - prompts.py         System-Prompt aus Hot+Cold+Skills
  - migration.py       Markdown-Parser fuer brain-import/ → atomare Memories
  - skills.py          Filesystem-Layer fuer /data/skills/<name>/ (Python-only,
                       venv pro Skill, tar.gz Export/Import, Run-Logs)
  - memory/            Embedder (sentence-transformers, multilingual MiniLM)
                       + VectorStore (Qdrant-Wrapper)

docker-compose.yml
  - aria-core (OpenClaw) raus, openclaw-config Volume raus
  - aria-brain Service (FastAPI + Memory)
  - aria-qdrant Service (Vector-DB) mit Bind-Mount aria-data/brain/qdrant/
  - Diagnostic teilt jetzt Netzwerk mit Bridge (vorher: aria-core)
  - Brain bekommt SSH-Mount fuer aria-wohnung + /import fuer brain-import/

bridge/aria_bridge.py
  - send_to_core → HTTP-Call an aria-brain:8080/chat (statt OpenClaw-WS)
  - aria-core-spezifische Handler raus: doctor_fix, aria_restart,
    aria_session_reset, Auto-Compact-Logik, OpenClaw-Handshake
  - Generischer container_restart-Handler (Whitelist Bridge/Brain/Qdrant)
  - Side-Channel-Events aus /chat-Response (z.B. skill_created) werden
    als RVS-Events forwarded
  - file_list_request / file_delete_request → an Diagnostic forwarded
  - Tote OpenClaw-Connection-Logik bleibt im Code als Referenz (nicht aktiv)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-11 22:23:17 +02:00
parent d0cb7acd10
commit 70d1500096
14 changed files with 2572 additions and 149 deletions
+163 -116
View File
@@ -536,14 +536,9 @@ class ARIABridge:
# sprengt die argv-Liste beim Claude-Subprocess-Spawn (E2BIG). Bei
# COMPACT_AFTER erreicht → Sessions reset + Container restart.
# Counter ueberlebt Bridge-Restart nicht (frischer Zaehler beim Start ok).
self._user_message_count: int = 0
# Aus runtime.json gelesen (Diagnostic → Einstellungen → Compact-Schwelle)
# Default 140, 0 = deaktiviert
try:
rt = json.loads(Path("/shared/config/runtime.json").read_text()) if Path("/shared/config/runtime.json").exists() else {}
self._compact_after = int(rt.get("compactAfterMessages", 140))
except Exception:
self._compact_after = 140
# _user_message_count + _compact_after entfallen — Auto-Compact war
# aria-core-spezifisch (E2BIG-Schutz). Der neue Brain-Loop kennt
# diese Begrenzung nicht.
# Pending Files: wenn die App ein Bild + Text gleichzeitig schickt, kommen
# zwei separate RVS-Events ('file' und 'chat') — wir buffern die Files
# kurz und mergen sie mit dem nachfolgenden Chat-Text zu einer einzigen
@@ -1176,73 +1171,108 @@ class ARIABridge:
await self.send_to_core(text, source="app-file+chat")
return True
async def _trigger_session_reset(self) -> None:
"""Sessions loeschen + Container restart via Diagnostic HTTP-API."""
try:
req = urllib.request.Request(
"http://localhost:3001/api/aria-session-reset",
data=b"{}",
method="POST",
headers={"Content-Type": "application/json"},
)
def _do_reset():
try:
with urllib.request.urlopen(req, timeout=45) as resp:
return resp.status
except Exception as e:
return f"err:{e}"
result = await asyncio.get_event_loop().run_in_executor(None, _do_reset)
logger.info("[core] Session-Reset Result: %s", result)
except Exception as e:
logger.warning("[core] Session-Reset Trigger fehlgeschlagen: %s", e)
async def send_to_core(self, text: str, source: str = "bridge") -> None:
"""Sendet Text an aria-core (OpenClaw chat.send Protokoll)."""
if self.ws_core is None:
logger.error("[core] Nicht verbunden — Nachricht verworfen: '%s'", text[:60])
return
"""Sendet Text an aria-brain (HTTP /chat) und broadcastet die Antwort.
# Auto-Compact: bei zu vielen User-Messages laeuft argv beim Subprocess-
# Spawn ueber (E2BIG). Vor send pruefen, ggf. Sessions resetten.
if source.startswith("app") and self._compact_after > 0:
self._user_message_count += 1
if self._user_message_count >= self._compact_after:
logger.warning("[core] Auto-Compact: %d Messages erreicht — Session-Reset",
self._user_message_count)
self._user_message_count = 0
# Reset triggern via Diagnostic (asynchron, blockiert send nicht)
asyncio.create_task(self._trigger_session_reset())
# User informieren — der naechste Request kommt erst nach Restart durch
await self._send_to_rvs({
"type": "chat",
"payload": {
"text": f"[Compact] Konversation war lang ({self._compact_after} Nachrichten) — Session wurde geleert, ARIA startet frisch. Deine letzte Nachricht bitte gleich nochmal senden.",
"sender": "aria",
},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
return
Nicht-Streaming: wir warten bis Brain fertig ist, dann pushen wir
die komplette Reply via RVS an alle Clients (App + Diagnostic).
TTS wird vom Bridge-Code separat angestossen (gleiche Logik wie
vorher mit aria-core).
"""
brain_url = os.environ.get("BRAIN_URL", "http://aria-brain:8080")
url = f"{brain_url}/chat"
payload = json.dumps({"message": text, "source": source}).encode("utf-8")
logger.info("[brain] chat ← %s '%s'", source, text[:80])
# Aktive Session vom Diagnostic holen
self._fetch_active_session()
req_id = self._next_req_id()
message = json.dumps({
"type": "req",
"id": req_id,
"method": "chat.send",
"params": {
"sessionKey": self._session_key,
"message": text,
"idempotencyKey": str(uuid.uuid4()),
},
# agent_activity broadcasten (App + Diagnostic "ARIA denkt..." Indicator)
await self._send_to_rvs({
"type": "agent_activity",
"payload": {"activity": "thinking"},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
def _do_call():
try:
req = urllib.request.Request(
url, data=payload, method="POST",
headers={"Content-Type": "application/json"},
)
# Cold-Start kann lange dauern, 5min Timeout
with urllib.request.urlopen(req, timeout=300) as resp:
return resp.status, resp.read().decode("utf-8", errors="ignore")
except Exception as exc:
return None, str(exc)
status, body = await asyncio.get_event_loop().run_in_executor(None, _do_call)
if status != 200:
logger.error("[brain] /chat fehlgeschlagen: status=%s body=%s", status, body[:200])
await self._send_to_rvs({
"type": "agent_activity",
"payload": {"activity": "idle"},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
await self._send_to_rvs({
"type": "chat",
"payload": {
"text": f"[Brain-Fehler] {body[:200] or 'unbekannt'}",
"sender": "aria",
},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
return
try:
await self.ws_core.send(message)
logger.info("[core] chat.send (%s, id=%s): '%s'", source, req_id, text[:80])
data = json.loads(body)
except Exception:
logger.exception("[core] Sendefehler")
logger.error("[brain] /chat lieferte ungueltiges JSON: %s", body[:200])
await self._send_to_rvs({
"type": "agent_activity",
"payload": {"activity": "idle"},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
return
reply = (data.get("reply") or "").strip()
if not reply:
logger.warning("[brain] /chat: leerer Reply")
await self._send_to_rvs({
"type": "agent_activity",
"payload": {"activity": "idle"},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
return
# Side-Channel-Events VOR der Chat-Bubble broadcasten (z.B. skill_created)
# damit sie in der UI vor der Reply auftauchen
for event in data.get("events", []) or []:
etype = event.get("type")
if etype == "skill_created":
await self._send_to_rvs({
"type": "skill_created",
"payload": event.get("skill", {}),
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
logger.info("[brain] ARIA hat einen Skill erstellt: %s",
event.get("skill", {}).get("name"))
# _process_core_response uebernimmt alles weitere:
# File-Marker extrahieren + broadcasten, NO_REPLY-Check, Chat-
# Broadcast an RVS, TTS, agent_activity idle. Wir geben das
# raw payload mit dem reply rein damit Mode/voice-Metadata
# passend behandelt wird (hier minimal, weil Brain noch keine
# metadata mitschickt).
try:
await self._process_core_response(reply, {})
except Exception:
logger.exception("[brain] _process_core_response Fehler")
await self._send_to_rvs({
"type": "agent_activity",
"payload": {"activity": "idle"},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
if data.get("distilling"):
logger.info("[brain] Destillat laeuft im Hintergrund")
# ── RVS Verbindung (App-Relay) ──────────────────────────
@@ -1627,21 +1657,67 @@ class ARIABridge:
except Exception as e:
logger.warning("[rvs] file_saved konnte nicht an App gesendet werden: %s", e)
elif msg_type == "aria_session_reset":
# Manueller Compact-Trigger: Sessions weg + Restart
logger.warning("[rvs] aria_session_reset Request von App")
self._user_message_count = 0
asyncio.create_task(self._trigger_session_reset())
return
elif msg_type == "aria_restart":
# App-Button "ARIA hart neu starten" → docker restart aria-core
# via Diagnostic (der hat den Docker-Socket gemountet).
logger.warning("[rvs] aria_restart Request von App — harter Container-Restart")
elif msg_type == "file_list_request":
# App fragt die Liste aller /shared/uploads/-Dateien an.
logger.info("[rvs] file_list_request von App")
try:
req = urllib.request.Request(
"http://localhost:3001/api/aria-restart",
data=b"{}",
"http://localhost:3001/api/files-list",
method="GET",
)
def _do_list():
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8", errors="ignore"))
except Exception as e:
return {"ok": False, "error": str(e)}
d = await asyncio.get_event_loop().run_in_executor(None, _do_list)
await self._send_to_rvs({
"type": "file_list_response",
"payload": d,
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
except Exception as e:
logger.warning("[rvs] file_list_request: %s", e)
return
elif msg_type == "file_delete_request":
# App will eine Datei loeschen — leite an Diagnostic.
p = payload.get("path", "")
logger.warning("[rvs] file_delete_request von App: %s", p)
try:
body_bytes = json.dumps({"path": p}).encode("utf-8")
req = urllib.request.Request(
"http://localhost:3001/api/files-delete",
data=body_bytes,
method="POST",
headers={"Content-Type": "application/json"},
)
def _do_delete():
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.status, resp.read().decode("utf-8", errors="ignore")
except Exception as e:
return None, str(e)
status, body = await asyncio.get_event_loop().run_in_executor(None, _do_delete)
logger.info("[rvs] file_delete_request %s: status=%s", p, status)
# Diagnostic broadcastet file_deleted via sendToRVS_raw — kommt
# ueber den persistenten WS-Path zur App. Wir bestaetigen
# zusaetzlich, damit der Caller sicher ist dass es durch ist.
except Exception as e:
logger.warning("[rvs] file_delete_request: %s", e)
return
elif msg_type == "container_restart":
# App-Button "Container neu" — leitet generisch an Diagnostic
# weiter. Whitelist ist im Diagnostic-Server.
name = payload.get("name", "")
logger.warning("[rvs] container_restart Request von App: %s", name)
try:
body_bytes = json.dumps({"name": name}).encode("utf-8")
req = urllib.request.Request(
"http://localhost:3001/api/container-restart",
data=body_bytes,
method="POST",
headers={"Content-Type": "application/json"},
)
@@ -1652,49 +1728,19 @@ class ARIABridge:
except Exception as e:
return None, str(e)
status, body = await asyncio.get_event_loop().run_in_executor(None, _do_restart)
logger.info("[rvs] aria_restart Result: status=%s", status)
# Note: bei erfolgreichem Restart ist die RVS-Verbindung sehr
# wahrscheinlich kurz weg (aria-bridge ist im service:aria-Network).
# Die Antwort kommt evtl. nicht mehr durch — egal.
except Exception as e:
logger.warning("[rvs] aria_restart Weiterleitung fehlgeschlagen: %s", e)
return
elif msg_type == "doctor_fix":
# App-Button "ARIA reparieren" → openclaw doctor --fix anstossen.
# Bridge erreicht aria-core nicht via docker (kein docker-socket
# gemountet), aber der Diagnostic-Server hat den Socket. HTTP-Call
# an http://localhost:3001/api/doctor-fix.
logger.info("[rvs] doctor_fix Request von App — leite an Diagnostic weiter")
try:
req = urllib.request.Request(
"http://localhost:3001/api/doctor-fix",
data=b"{}",
method="POST",
headers={"Content-Type": "application/json"},
)
# Blocking call ist OK weil openclaw doctor schnell durchlaeuft.
# In Executor laufen lassen damit der asyncio-Loop nicht blockt.
def _do_fix():
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status, resp.read().decode("utf-8", errors="ignore")
except Exception as e:
return None, str(e)
status, body = await asyncio.get_event_loop().run_in_executor(None, _do_fix)
logger.info("[rvs] container_restart %s Result: status=%s", name, status)
ok = status == 200
logger.info("[rvs] doctor_fix Result: status=%s ok=%s", status, ok)
await self._send_to_rvs({
"type": "chat",
"payload": {
"text": "[Reparatur] ARIA wurde durchgecheckt — sollte wieder antworten." if ok
else f"[Reparatur] Fehlgeschlagen: {body[:200]}",
"text": f"[Container] {name} neu gestartet." if ok
else f"[Container] Restart {name} fehlgeschlagen: {body[:200]}",
"sender": "aria",
},
"timestamp": int(asyncio.get_event_loop().time() * 1000),
})
except Exception as e:
logger.warning("[rvs] doctor_fix Weiterleitung fehlgeschlagen: %s", e)
logger.warning("[rvs] container_restart Weiterleitung fehlgeschlagen: %s", e)
return
elif msg_type == "file_request":
@@ -2122,7 +2168,8 @@ class ARIABridge:
self.running = True
tasks = [
asyncio.create_task(self.connect_to_core()),
# connect_to_core entfaellt — Bridge ruft jetzt aria-brain ueber
# HTTP (siehe send_to_core). Keine persistente WS-Verbindung mehr.
asyncio.create_task(self.connect_to_rvs()),
]