From bf3dc635d911b58f0f4a41f558848f91d9a45109 Mon Sep 17 00:00:00 2001
From: duffyduck
Date: Fri, 15 May 2026 11:07:39 +0200
Subject: [PATCH] feat(brain): live tool events in the thought stream
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The proxy patch hooks the Claude CLI `assistant` events: every tool_use
block (Bash, Read, Edit, Grep, ...) is reported to the bridge via HTTP
POST. The bridge mirrors it to the RVS clients as `agent_activity tool=`.
The thought stream in the app and in the diagnostic UI now shows live
what ARIA is currently doing; previously each brain call produced only
ONE "💭 thinking" at the start and ONE "✓ done" at the end.

Three new building blocks:

- proxy-patches/routes.js: full replacement of the npm version, adding
  `_attachToolHook(subprocess)`, which fires one HTTP POST per tool_use
  block to http://aria-bridge:8090/internal/agent-activity (the URL can
  be overridden via the ARIA_TOOL_HOOK_URL env variable). Fire-and-forget
  and fail-open: the brain call does NOT abort if the bridge happens to
  be unreachable.
- docker-compose.yml: a fourth cp step in the proxy service copies
  routes.js over the npm version (analogous to openai-to-cli and
  cli-to-openai).
- bridge/aria_bridge.py: new `/internal/agent-activity` endpoint in the
  existing _serve_internal_http. In addition, _emit_activity now takes a
  force=True parameter so that repeated identical tool calls (3x Bash in
  a row) stay visible as three separate entries in the stream.

App + diagnostic: the pushThought dedup lets tool events through (3x
Bash in a row yields 3 entries in the thought stream).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 android/src/screens/ChatScreen.tsx |   7 +-
 bridge/aria_bridge.py              |  29 ++-
 diagnostic/index.html              |   6 +-
 docker-compose.yml                 |   1 +
 proxy-patches/routes.js            | 309 +++++++++++++++++++++++++++++
 5 files changed, 345 insertions(+), 7 deletions(-)
 create mode 100644 proxy-patches/routes.js

diff --git a/android/src/screens/ChatScreen.tsx b/android/src/screens/ChatScreen.tsx
index aea6b5c..bf3928c 100644
--- a/android/src/screens/ChatScreen.tsx
+++ b/android/src/screens/ChatScreen.tsx
@@ -1024,9 +1024,12 @@ const ChatScreen: React.FC = () => {
         const tool = (message.payload.tool as string) || '';
         setAgentActivity({ activity, tool });
         // In den Gedanken-Stream einfuegen. Dedup gegen identische Folge-
-        // Events (z.B. zwei mal 'thinking' direkt hintereinander).
+        // Events (z.B. zwei mal 'thinking' direkt hintereinander). Tool-
+        // Events NIE dedupen — wenn ARIA dreimal Bash hintereinander ruft,
+        // sollen alle drei sichtbar sein.
         const key = `${activity}|${tool}`;
-        if (key !== lastThoughtKeyRef.current) {
+        const isTool = activity === 'tool';
+        if (isTool || key !== lastThoughtKeyRef.current) {
           lastThoughtKeyRef.current = key;
           setThoughts(prev => {
             const next = [...prev, { ts: Date.now(), activity, tool }];
diff --git a/bridge/aria_bridge.py b/bridge/aria_bridge.py
index 8dc99b6..6a8a8ca 100644
--- a/bridge/aria_bridge.py
+++ b/bridge/aria_bridge.py
@@ -2519,17 +2519,22 @@ class ARIABridge:
         status = await asyncio.get_event_loop().run_in_executor(None, _do_request)
         logger.info("[cancel] Diagnostic /api/cancel: %s", status)
 
-    async def _emit_activity(self, activity: str, tool: str = "") -> None:
+    async def _emit_activity(self, activity: str, tool: str = "", force: bool = False) -> None:
         """Sendet agent_activity an die App — nur wenn sich der State geaendert hat.
         Trailing Agent-Events nach chat:final werden 3s lang unterdrueckt
-        (nur 'idle' kommt immer durch)."""
+        (nur 'idle' kommt immer durch).
+
+        force=True: kein State-Dedup — wird vom Proxy-Tool-Hook genutzt
+        damit auch wiederholte gleiche Tool-Aufrufe (z.B. 3x Bash
+        hintereinander) im Gedanken-Stream als eigene Eintraege sichtbar
+        bleiben."""
 
         if activity != "idle" and self._last_chat_final_at > 0:
             since_final = asyncio.get_event_loop().time() - self._last_chat_final_at
             if since_final < 3.0:
                 return
         state = (activity, tool)
-        if state == self._last_activity_state:
+        if not force and state == self._last_activity_state:
             return
         self._last_activity_state = state
         await self._send_to_rvs({
@@ -2677,6 +2682,24 @@ class ARIABridge:
                     self._handle_trigger_fired(reply, trigger_name, ttype, events)
                 )
                 await _send_response(writer, 200, {"ok": True})
+            elif method == "POST" and path == "/internal/agent-activity":
+                # Vom Proxy gefeuert bei jedem Claude-Code-tool_use-Event
+                # (Bash, Read, Edit, Grep, ...). Wir spiegeln das als
+                # RVS agent_activity an App+Diagnostic damit der Gedanken-
+                # Stream live mitlaufen kann.
+                try:
+                    data = json.loads(body.decode("utf-8", "ignore"))
+                except Exception as exc:
+                    await _send_response(writer, 400, {"error": f"bad json: {exc}"})
+                    return
+                tool = (data.get("tool") or "").strip()
+                if not tool:
+                    await _send_response(writer, 400, {"error": "tool erforderlich"})
+                    return
+                # Force-emit (kein Dedup): User soll JEDEN Tool-Call sehen
+                # selbst wenn derselbe Name zweimal in Folge kommt.
+                asyncio.create_task(self._emit_activity("tool", tool, force=True))
+                await _send_response(writer, 200, {"ok": True})
             elif method == "POST" and path == "/internal/delete-chat-message":
                 try:
                     data = json.loads(body.decode("utf-8", "ignore"))
diff --git a/diagnostic/index.html b/diagnostic/index.html
index a24d9e1..3ac186d 100644
--- a/diagnostic/index.html
+++ b/diagnostic/index.html
@@ -2252,9 +2252,11 @@
     }
 
     function pushThought(activity, tool) {
-      // Dedup gegen direkt aufeinanderfolgende identische Events
+      // Dedup gegen direkt aufeinanderfolgende identische Events. Tool-
+      // Events NIE dedupen — drei Bash-Calls in Folge sollen drei Eintraege
+      // ergeben, nicht einen.
       const key = `${activity}|${tool || ''}`;
-      if (key === lastThoughtKey) return;
+      if (activity !== 'tool' && key === lastThoughtKey) return;
       lastThoughtKey = key;
       thoughtStream.push({ ts: Date.now(), activity, tool: tool || '' });
       if (thoughtStream.length > MAX_THOUGHTS) thoughtStream = thoughtStream.slice(-MAX_THOUGHTS);
diff --git a/docker-compose.yml b/docker-compose.yml
index 2f955c0..94d33dc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,6 +15,7 @@ services:
         sed -i 's/const DEFAULT_TIMEOUT = 300000;/const DEFAULT_TIMEOUT = 1200000;/' $$DIST/subprocess/manager.js &&
         cp /proxy-patches/openai-to-cli.js $$DIST/adapter/openai-to-cli.js &&
         cp /proxy-patches/cli-to-openai.js $$DIST/adapter/cli-to-openai.js &&
+        cp /proxy-patches/routes.js $$DIST/server/routes.js &&
         claude-max-api"
     volumes:
       - ~/.claude:/root/.claude  # Claude CLI Auth (Credentials in /root/.claude/.credentials.json)
diff --git a/proxy-patches/routes.js b/proxy-patches/routes.js
new file mode 100644
index 0000000..100ad62
--- /dev/null
+++ b/proxy-patches/routes.js
@@ -0,0 +1,309 @@
+/**
+ * ARIA-patched API Route Handlers
+ *
+ * Erweiterung der npm-Version von claude-max-api-proxy:
+ * - Bei jedem Claude-CLI-`assistant`-Event mit tool_use-Block (Bash, Read,
+ *   Edit, Grep, …) wird ein HTTP-POST an die Bridge gefeuert
+ *   (ARIA_TOOL_HOOK_URL, default http://aria-bridge:8090/internal/agent-activity).
+ *   Bridge spiegelt das als RVS `agent_activity` an App+Diagnostic →
+ *   Gedanken-Stream zeigt live was ARIA gerade tool-maessig macht.
+ * - Fire-and-forget, fail-open. Wenn die Bridge nicht antwortet, bricht
+ *   der Brain-Call NICHT ab.
+ *
+ * Wird zur Container-Startzeit ueber die npm-Version geschrieben
+ * (siehe docker-compose.yml proxy-Block).
+ */
+import { v4 as uuidv4 } from "uuid";
+import http from "http";
+import { ClaudeSubprocess } from "../subprocess/manager.js";
+import { openaiToCli } from "../adapter/openai-to-cli.js";
+import { cliResultToOpenai, createDoneChunk, } from "../adapter/cli-to-openai.js";
+
+const TOOL_HOOK_URL = process.env.ARIA_TOOL_HOOK_URL
+    || "http://aria-bridge:8090/internal/agent-activity";
+
+/**
+ * Pusht einen Tool-Use-Event an die Bridge. Fire-and-forget — keine Awaits,
+ * keine Fehler nach oben. Logged Fehler still.
+ */
+function _emitToolEvent(toolName) {
+    if (!toolName) return;
+    try {
+        const u = new URL(TOOL_HOOK_URL);
+        const body = JSON.stringify({ tool: String(toolName) });
+        const req = http.request({
+            method: "POST",
+            hostname: u.hostname,
+            port: u.port || 80,
+            path: u.pathname,
+            headers: { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(body) },
+            timeout: 2000,
+        }, (res) => { res.resume(); });
+        req.on("error", () => {});
+        req.on("timeout", () => req.destroy());
+        req.write(body);
+        req.end();
+    } catch (_) { /* niemals weiterwerfen */ }
+}
+
+/**
+ * Hookt die `assistant`-Events des Subprozesses. Jedes assistant-Message
+ * kann mehrere content-Bloecke haben — tool_use-Bloecke pushen wir live.
+ */
+function _attachToolHook(subprocess) {
+    subprocess.on("assistant", (message) => {
+        try {
+            const blocks = message?.message?.content || [];
+            for (const b of blocks) {
+                if (b && b.type === "tool_use" && b.name) {
+                    _emitToolEvent(b.name);
+                }
+            }
+        } catch (_) { /* fail-open */ }
+    });
+}
+/**
+ * Handle POST /v1/chat/completions
+ *
+ * Main endpoint for chat requests, supports both streaming and non-streaming
+ */
+export async function handleChatCompletions(req, res) {
+    const requestId = uuidv4().replace(/-/g, "").slice(0, 24);
+    const body = req.body;
+    const stream = body.stream === true;
+    try {
+        // Validate request
+        if (!body.messages || !Array.isArray(body.messages) || body.messages.length === 0) {
+            res.status(400).json({
+                error: {
+                    message: "messages is required and must be a non-empty array",
+                    type: "invalid_request_error",
+                    code: "invalid_messages",
+                },
+            });
+            return;
+        }
+        // Convert to CLI input format
+        const cliInput = openaiToCli(body);
+        const subprocess = new ClaudeSubprocess();
+        // ARIA-Patch: Tool-Use-Events live an die Bridge weiterleiten.
+        // Greift fuer beide Branches (stream + non-stream).
+        _attachToolHook(subprocess);
+        if (stream) {
+            await handleStreamingResponse(req, res, subprocess, cliInput, requestId);
+        }
+        else {
+            await handleNonStreamingResponse(res, subprocess, cliInput, requestId);
+        }
+    }
+    catch (error) {
+        const message = error instanceof Error ? error.message : "Unknown error";
+        console.error("[handleChatCompletions] Error:", message);
+        if (!res.headersSent) {
+            res.status(500).json({
+                error: {
+                    message,
+                    type: "server_error",
+                    code: null,
+                },
+            });
+        }
+    }
+}
+/**
+ * Handle streaming response (SSE)
+ *
+ * IMPORTANT: The Express req.on("close") event fires when the request body
+ * is fully received, NOT when the client disconnects. For SSE connections,
+ * we use res.on("close") to detect actual client disconnection.
+ */
+async function handleStreamingResponse(req, res, subprocess, cliInput, requestId) {
+    // Set SSE headers
+    res.setHeader("Content-Type", "text/event-stream");
+    res.setHeader("Cache-Control", "no-cache");
+    res.setHeader("Connection", "keep-alive");
+    res.setHeader("X-Request-Id", requestId);
+    // CRITICAL: Flush headers immediately to establish SSE connection
+    // Without this, headers are buffered and client times out waiting
+    res.flushHeaders();
+    // Send initial comment to confirm connection is alive
+    res.write(":ok\n\n");
+    return new Promise((resolve, reject) => {
+        let isFirst = true;
+        let lastModel = "claude-sonnet-4";
+        let isComplete = false;
+        // Handle actual client disconnect (response stream closed)
+        res.on("close", () => {
+            if (!isComplete) {
+                // Client disconnected before response completed - kill subprocess
+                subprocess.kill();
+            }
+            resolve();
+        });
+        // Handle streaming content deltas
+        subprocess.on("content_delta", (event) => {
+            const text = event.event.delta?.text || "";
+            if (text && !res.writableEnded) {
+                const chunk = {
+                    id: `chatcmpl-${requestId}`,
+                    object: "chat.completion.chunk",
+                    created: Math.floor(Date.now() / 1000),
+                    model: lastModel,
+                    choices: [{
+                            index: 0,
+                            delta: {
+                                role: isFirst ? "assistant" : undefined,
+                                content: text,
+                            },
+                            finish_reason: null,
+                        }],
+                };
+                res.write(`data: ${JSON.stringify(chunk)}\n\n`);
+                isFirst = false;
+            }
+        });
+        // Handle final assistant message (for model name)
+        subprocess.on("assistant", (message) => {
+            lastModel = message.message.model;
+        });
+        subprocess.on("result", (_result) => {
+            isComplete = true;
+            if (!res.writableEnded) {
+                // Send final done chunk with finish_reason
+                const doneChunk = createDoneChunk(requestId, lastModel);
+                res.write(`data: ${JSON.stringify(doneChunk)}\n\n`);
+                res.write("data: [DONE]\n\n");
+                res.end();
+            }
+            resolve();
+        });
+        subprocess.on("error", (error) => {
+            console.error("[Streaming] Error:", error.message);
+            if (!res.writableEnded) {
+                res.write(`data: ${JSON.stringify({
+                    error: { message: error.message, type: "server_error", code: null },
+                })}\n\n`);
+                res.end();
+            }
+            resolve();
+        });
+        subprocess.on("close", (code) => {
+            // Subprocess exited - ensure response is closed
+            if (!res.writableEnded) {
+                if (code !== 0 && !isComplete) {
+                    // Abnormal exit without result - send error
+                    res.write(`data: ${JSON.stringify({
+                        error: { message: `Process exited with code ${code}`, type: "server_error", code: null },
+                    })}\n\n`);
+                }
+                res.write("data: [DONE]\n\n");
+                res.end();
+            }
+            resolve();
+        });
+        // Start the subprocess
+        subprocess.start(cliInput.prompt, {
+            model: cliInput.model,
+            sessionId: cliInput.sessionId,
+        }).catch((err) => {
+            console.error("[Streaming] Subprocess start error:", err);
+            reject(err);
+        });
+    });
+}
+/**
+ * Handle non-streaming response
+ */
+async function handleNonStreamingResponse(res, subprocess, cliInput, requestId) {
+    return new Promise((resolve) => {
+        let finalResult = null;
+        subprocess.on("result", (result) => {
+            finalResult = result;
+        });
+        subprocess.on("error", (error) => {
+            console.error("[NonStreaming] Error:", error.message);
+            res.status(500).json({
+                error: {
+                    message: error.message,
+                    type: "server_error",
+                    code: null,
+                },
+            });
+            resolve();
+        });
+        subprocess.on("close", (code) => {
+            if (finalResult) {
+                res.json(cliResultToOpenai(finalResult, requestId));
+            }
+            else if (!res.headersSent) {
+                res.status(500).json({
+                    error: {
+                        message: `Claude CLI exited with code ${code} without response`,
+                        type: "server_error",
+                        code: null,
+                    },
+                });
+            }
+            resolve();
+        });
+        // Start the subprocess
+        subprocess
+            .start(cliInput.prompt, {
+            model: cliInput.model,
+            sessionId: cliInput.sessionId,
+        })
+            .catch((error) => {
+            res.status(500).json({
+                error: {
+                    message: error.message,
+                    type: "server_error",
+                    code: null,
+                },
+            });
+            resolve();
+        });
+    });
+}
+/**
+ * Handle GET /v1/models
+ *
+ * Returns available models
+ */
+export function handleModels(_req, res) {
+    res.json({
+        object: "list",
+        data: [
+            {
+                id: "claude-opus-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+            {
+                id: "claude-sonnet-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+            {
+                id: "claude-haiku-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+        ],
+    });
+}
+/**
+ * Handle GET /health
+ *
+ * Health check endpoint
+ */
+export function handleHealth(_req, res) {
+    res.json({
+        status: "ok",
+        provider: "claude-code-cli",
+        timestamp: new Date().toISOString(),
+    });
+}
+//# sourceMappingURL=routes.js.map
\ No newline at end of file