feat(brain): Live tool events in the thought stream
The proxy patch hooks the Claude CLI `assistant` events: every tool_use block (Bash, Read, Edit, Grep, ...) is reported to the bridge via HTTP POST. The bridge mirrors it as `agent_activity tool=<name>` to the RVS clients. The app and diagnostic thought streams now show live what ARIA is currently doing; previously each brain call produced only ONE "💭 thinking" at the start and ONE "✓ done" at the end.

Three new building blocks:

- proxy-patches/routes.js: complete replacement of the npm version, adding `_attachToolHook(subprocess)`, which fires one HTTP POST per tool_use block at http://aria-bridge:8090/internal/agent-activity (URL overridable via the ARIA_TOOL_HOOK_URL env variable). Fire-and-forget, fail-open: the brain call does NOT abort if the bridge happens to be unavailable.
- docker-compose.yml: a fourth cp step in the proxy service copies routes.js over the npm version (analogous to openai-to-cli + cli-to-openai).
- bridge/aria_bridge.py: new `/internal/agent-activity` endpoint in the existing _serve_internal_http. In addition, _emit_activity now takes a force=True param so that repeated identical tool calls (3x Bash in a row) stay visible as three entries in the stream.

App + diagnostic: the pushThought dedup lets tool events through (3x Bash in a row yields 3 entries in the thought stream).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
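The proxy-to-bridge contract is a single JSON POST. A minimal sketch of that request in Python (stdlib only); note the compose-internal hostname aria-bridge resolves only inside the compose network, so substitute the mapped host/port when probing from outside:

# Sketch: emulates the proxy's fire-and-forget tool event.
# URL and payload are taken from this commit; host is a compose-network assumption.
import json
import urllib.request

def emit_tool_event(tool: str, url: str = "http://aria-bridge:8090/internal/agent-activity") -> None:
    body = json.dumps({"tool": tool}).encode("utf-8")
    req = urllib.request.Request(url, data=body,
                                 headers={"Content-Type": "application/json"}, method="POST")
    try:
        urllib.request.urlopen(req, timeout=2)  # mirror the JS hook's 2s timeout
    except OSError:
        pass  # fail-open, like the hook: never break the brain call

emit_tool_event("Bash")  # should surface as 'agent_activity tool=Bash' in the stream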
@@ -1024,9 +1024,12 @@ const ChatScreen: React.FC = () => {
       const tool = (message.payload.tool as string) || '';
       setAgentActivity({ activity, tool });
       // Append to the thought stream. Dedup against identical consecutive
-      // events (e.g. 'thinking' twice in a row).
+      // events (e.g. 'thinking' twice in a row). NEVER dedup tool events:
+      // if ARIA calls Bash three times in a row, all three should be
+      // visible.
       const key = `${activity}|${tool}`;
-      if (key !== lastThoughtKeyRef.current) {
+      const isTool = activity === 'tool';
+      if (isTool || key !== lastThoughtKeyRef.current) {
         lastThoughtKeyRef.current = key;
         setThoughts(prev => {
           const next = [...prev, { ts: Date.now(), activity, tool }];
@@ -2519,17 +2519,22 @@ class ARIABridge:
         status = await asyncio.get_event_loop().run_in_executor(None, _do_request)
         logger.info("[cancel] Diagnostic /api/cancel: %s", status)

-    async def _emit_activity(self, activity: str, tool: str = "") -> None:
+    async def _emit_activity(self, activity: str, tool: str = "", force: bool = False) -> None:
         """Sends agent_activity to the app, but only when the state has changed.

         Trailing agent events after chat:final are suppressed for 3s
-        (only 'idle' always gets through)."""
+        (only 'idle' always gets through).
+
+        force=True: skip the state dedup. Used by the proxy tool hook so
+        that repeated identical tool calls (e.g. 3x Bash in a row) stay
+        visible as separate entries in the thought stream."""
         if activity != "idle" and self._last_chat_final_at > 0:
             since_final = asyncio.get_event_loop().time() - self._last_chat_final_at
             if since_final < 3.0:
                 return
         state = (activity, tool)
-        if state == self._last_activity_state:
+        if not force and state == self._last_activity_state:
             return
         self._last_activity_state = state
         await self._send_to_rvs({
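For illustration, a stripped-down model of the state-dedup part of this guard (a hypothetical standalone class, not the bridge code; the 3-second post-final suppression is omitted). It shows why force=True lets identical consecutive tool calls through while plain state dedup collapses them:

# Illustrative model of _emit_activity's state dedup.
class ActivityDedup:
    def __init__(self):
        self.last_state = None

    def should_emit(self, activity: str, tool: str = "", force: bool = False) -> bool:
        state = (activity, tool)
        if not force and state == self.last_state:
            return False  # unchanged state is dropped...
        self.last_state = state
        return True       # ...unless force=True (proxy tool hook)

d = ActivityDedup()
print([d.should_emit("tool", "Bash", force=True) for _ in range(3)])  # [True, True, True]
print([d.should_emit("thinking") for _ in range(2)])                  # [True, False]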
@@ -2677,6 +2682,24 @@ class ARIABridge:
                     self._handle_trigger_fired(reply, trigger_name, ttype, events)
                 )
                 await _send_response(writer, 200, {"ok": True})
+            elif method == "POST" and path == "/internal/agent-activity":
+                # Fired by the proxy on every Claude Code tool_use event
+                # (Bash, Read, Edit, Grep, ...). We mirror it as an RVS
+                # agent_activity to app + diagnostic so the thought stream
+                # can follow along live.
+                try:
+                    data = json.loads(body.decode("utf-8", "ignore"))
+                except Exception as exc:
+                    await _send_response(writer, 400, {"error": f"bad json: {exc}"})
+                    return
+                tool = (data.get("tool") or "").strip()
+                if not tool:
+                    await _send_response(writer, 400, {"error": "tool required"})
+                    return
+                # Force-emit (no dedup): the user should see EVERY tool call,
+                # even when the same name arrives twice in a row.
+                asyncio.create_task(self._emit_activity("tool", tool, force=True))
+                await _send_response(writer, 200, {"ok": True})
            elif method == "POST" and path == "/internal/delete-chat-message":
                try:
                    data = json.loads(body.decode("utf-8", "ignore"))
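The endpoint accepts only a JSON object with a non-empty `tool` field; anything else gets a 400. A small probe sketch (same hostname assumption as above; the expected codes follow from the handler):

# Probe the endpoint's validation paths.
#   {"tool": "Grep"} -> 200  |  {} -> 400 "tool required"  |  garbage -> 400 "bad json: ..."
import json
import urllib.error
import urllib.request

def post(raw: bytes) -> int:
    req = urllib.request.Request(
        "http://aria-bridge:8090/internal/agent-activity",
        data=raw, headers={"Content-Type": "application/json"}, method="POST")
    try:
        return urllib.request.urlopen(req, timeout=2).status
    except urllib.error.HTTPError as err:
        return err.code

print(post(json.dumps({"tool": "Grep"}).encode()))  # 200
print(post(b"{}"))                                  # 400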
@@ -2252,9 +2252,11 @@
 }

 function pushThought(activity, tool) {
-  // Dedup against directly consecutive identical events
+  // Dedup against directly consecutive identical events. NEVER dedup
+  // tool events: three Bash calls in a row should yield three entries,
+  // not one.
   const key = `${activity}|${tool || ''}`;
-  if (key === lastThoughtKey) return;
+  if (activity !== 'tool' && key === lastThoughtKey) return;
   lastThoughtKey = key;
   thoughtStream.push({ ts: Date.now(), activity, tool: tool || '' });
   if (thoughtStream.length > MAX_THOUGHTS) thoughtStream = thoughtStream.slice(-MAX_THOUGHTS);
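The app hook (first hunk above) and this diagnostic pushThought now apply the same rule: collapse consecutive duplicates, except tool events, which always pass. A minimal Python model of that shared rule:

# Shared dedup rule: consecutive identical events collapse,
# except 'tool' events, which always pass through.
def push(stream, last_key, activity, tool=""):
    key = f"{activity}|{tool}"
    if activity != "tool" and key == last_key:
        return stream, last_key  # dropped as duplicate
    stream.append((activity, tool))
    return stream, key

stream, last = [], None
for ev in [("thinking", ""), ("thinking", ""), ("tool", "Bash"), ("tool", "Bash"), ("tool", "Bash")]:
    stream, last = push(stream, last, *ev)
print(stream)  # [('thinking', ''), ('tool', 'Bash'), ('tool', 'Bash'), ('tool', 'Bash')]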
@@ -15,6 +15,7 @@ services:
         sed -i 's/const DEFAULT_TIMEOUT = 300000;/const DEFAULT_TIMEOUT = 1200000;/' $$DIST/subprocess/manager.js &&
         cp /proxy-patches/openai-to-cli.js $$DIST/adapter/openai-to-cli.js &&
         cp /proxy-patches/cli-to-openai.js $$DIST/adapter/cli-to-openai.js &&
+        cp /proxy-patches/routes.js $$DIST/server/routes.js &&
         claude-max-api"
     volumes:
       - ~/.claude:/root/.claude  # Claude CLI auth (credentials in /root/.claude/.credentials.json)
@@ -0,0 +1,309 @@
+/**
+ * ARIA-patched API Route Handlers
+ *
+ * Extension of the npm version of claude-max-api-proxy:
+ * - For every Claude CLI `assistant` event with a tool_use block (Bash, Read,
+ *   Edit, Grep, …) an HTTP POST is fired at the bridge
+ *   (ARIA_TOOL_HOOK_URL, default http://aria-bridge:8090/internal/agent-activity).
+ *   The bridge mirrors it as an RVS `agent_activity` to app + diagnostic →
+ *   the thought stream shows live which tool ARIA is currently using.
+ * - Fire-and-forget, fail-open. If the bridge does not respond, the
+ *   brain call does NOT abort.
+ *
+ * Written over the npm version at container start time
+ * (see the proxy block in docker-compose.yml).
+ */
+import { v4 as uuidv4 } from "uuid";
+import http from "http";
+import { ClaudeSubprocess } from "../subprocess/manager.js";
+import { openaiToCli } from "../adapter/openai-to-cli.js";
+import { cliResultToOpenai, createDoneChunk, } from "../adapter/cli-to-openai.js";
+
+const TOOL_HOOK_URL = process.env.ARIA_TOOL_HOOK_URL
+    || "http://aria-bridge:8090/internal/agent-activity";
+
+/**
+ * Pushes a tool-use event to the bridge. Fire-and-forget: no awaits,
+ * no errors bubble up. Failures are swallowed silently.
+ */
+function _emitToolEvent(toolName) {
+    if (!toolName) return;
+    try {
+        const u = new URL(TOOL_HOOK_URL);
+        const body = JSON.stringify({ tool: String(toolName) });
+        const req = http.request({
+            method: "POST",
+            hostname: u.hostname,
+            port: u.port || 80,
+            path: u.pathname,
+            headers: { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(body) },
+            timeout: 2000,
+        }, (res) => { res.resume(); });
+        req.on("error", () => {});
+        req.on("timeout", () => req.destroy());
+        req.write(body);
+        req.end();
+    } catch (_) { /* never rethrow */ }
+}
+
+/**
+ * Hooks the subprocess's `assistant` events. Each assistant message can
+ * carry several content blocks; tool_use blocks are pushed live.
+ */
+function _attachToolHook(subprocess) {
+    subprocess.on("assistant", (message) => {
+        try {
+            const blocks = message?.message?.content || [];
+            for (const b of blocks) {
+                if (b && b.type === "tool_use" && b.name) {
+                    _emitToolEvent(b.name);
+                }
+            }
+        } catch (_) { /* fail-open */ }
+    });
+}
+
+/**
+ * Handle POST /v1/chat/completions
+ *
+ * Main endpoint for chat requests, supports both streaming and non-streaming
+ */
+export async function handleChatCompletions(req, res) {
+    const requestId = uuidv4().replace(/-/g, "").slice(0, 24);
+    const body = req.body;
+    const stream = body.stream === true;
+    try {
+        // Validate request
+        if (!body.messages || !Array.isArray(body.messages) || body.messages.length === 0) {
+            res.status(400).json({
+                error: {
+                    message: "messages is required and must be a non-empty array",
+                    type: "invalid_request_error",
+                    code: "invalid_messages",
+                },
+            });
+            return;
+        }
+        // Convert to CLI input format
+        const cliInput = openaiToCli(body);
+        const subprocess = new ClaudeSubprocess();
+        // ARIA patch: forward tool-use events live to the bridge.
+        // Applies to both branches (stream + non-stream).
+        _attachToolHook(subprocess);
+        if (stream) {
+            await handleStreamingResponse(req, res, subprocess, cliInput, requestId);
+        }
+        else {
+            await handleNonStreamingResponse(res, subprocess, cliInput, requestId);
+        }
+    }
+    catch (error) {
+        const message = error instanceof Error ? error.message : "Unknown error";
+        console.error("[handleChatCompletions] Error:", message);
+        if (!res.headersSent) {
+            res.status(500).json({
+                error: {
+                    message,
+                    type: "server_error",
+                    code: null,
+                },
+            });
+        }
+    }
+}
+
+/**
+ * Handle streaming response (SSE)
+ *
+ * IMPORTANT: The Express req.on("close") event fires when the request body
+ * is fully received, NOT when the client disconnects. For SSE connections,
+ * we use res.on("close") to detect actual client disconnection.
+ */
+async function handleStreamingResponse(req, res, subprocess, cliInput, requestId) {
+    // Set SSE headers
+    res.setHeader("Content-Type", "text/event-stream");
+    res.setHeader("Cache-Control", "no-cache");
+    res.setHeader("Connection", "keep-alive");
+    res.setHeader("X-Request-Id", requestId);
+    // CRITICAL: Flush headers immediately to establish SSE connection
+    // Without this, headers are buffered and client times out waiting
+    res.flushHeaders();
+    // Send initial comment to confirm connection is alive
+    res.write(":ok\n\n");
+    return new Promise((resolve, reject) => {
+        let isFirst = true;
+        let lastModel = "claude-sonnet-4";
+        let isComplete = false;
+        // Handle actual client disconnect (response stream closed)
+        res.on("close", () => {
+            if (!isComplete) {
+                // Client disconnected before response completed - kill subprocess
+                subprocess.kill();
+            }
+            resolve();
+        });
+        // Handle streaming content deltas
+        subprocess.on("content_delta", (event) => {
+            const text = event.event.delta?.text || "";
+            if (text && !res.writableEnded) {
+                const chunk = {
+                    id: `chatcmpl-${requestId}`,
+                    object: "chat.completion.chunk",
+                    created: Math.floor(Date.now() / 1000),
+                    model: lastModel,
+                    choices: [{
+                        index: 0,
+                        delta: {
+                            role: isFirst ? "assistant" : undefined,
+                            content: text,
+                        },
+                        finish_reason: null,
+                    }],
+                };
+                res.write(`data: ${JSON.stringify(chunk)}\n\n`);
+                isFirst = false;
+            }
+        });
+        // Handle final assistant message (for model name)
+        subprocess.on("assistant", (message) => {
+            lastModel = message.message.model;
+        });
+        subprocess.on("result", (_result) => {
+            isComplete = true;
+            if (!res.writableEnded) {
+                // Send final done chunk with finish_reason
+                const doneChunk = createDoneChunk(requestId, lastModel);
+                res.write(`data: ${JSON.stringify(doneChunk)}\n\n`);
+                res.write("data: [DONE]\n\n");
+                res.end();
+            }
+            resolve();
+        });
+        subprocess.on("error", (error) => {
+            console.error("[Streaming] Error:", error.message);
+            if (!res.writableEnded) {
+                res.write(`data: ${JSON.stringify({
+                    error: { message: error.message, type: "server_error", code: null },
+                })}\n\n`);
+                res.end();
+            }
+            resolve();
+        });
+        subprocess.on("close", (code) => {
+            // Subprocess exited - ensure response is closed
+            if (!res.writableEnded) {
+                if (code !== 0 && !isComplete) {
+                    // Abnormal exit without result - send error
+                    res.write(`data: ${JSON.stringify({
+                        error: { message: `Process exited with code ${code}`, type: "server_error", code: null },
+                    })}\n\n`);
+                }
+                res.write("data: [DONE]\n\n");
+                res.end();
+            }
+            resolve();
+        });
+        // Start the subprocess
+        subprocess.start(cliInput.prompt, {
+            model: cliInput.model,
+            sessionId: cliInput.sessionId,
+        }).catch((err) => {
+            console.error("[Streaming] Subprocess start error:", err);
+            reject(err);
+        });
+    });
+}
+
+/**
+ * Handle non-streaming response
+ */
+async function handleNonStreamingResponse(res, subprocess, cliInput, requestId) {
+    return new Promise((resolve) => {
+        let finalResult = null;
+        subprocess.on("result", (result) => {
+            finalResult = result;
+        });
+        subprocess.on("error", (error) => {
+            console.error("[NonStreaming] Error:", error.message);
+            res.status(500).json({
+                error: {
+                    message: error.message,
+                    type: "server_error",
+                    code: null,
+                },
+            });
+            resolve();
+        });
+        subprocess.on("close", (code) => {
+            if (finalResult) {
+                res.json(cliResultToOpenai(finalResult, requestId));
+            }
+            else if (!res.headersSent) {
+                res.status(500).json({
+                    error: {
+                        message: `Claude CLI exited with code ${code} without response`,
+                        type: "server_error",
+                        code: null,
+                    },
+                });
+            }
+            resolve();
+        });
+        // Start the subprocess
+        subprocess
+            .start(cliInput.prompt, {
+                model: cliInput.model,
+                sessionId: cliInput.sessionId,
+            })
+            .catch((error) => {
+                res.status(500).json({
+                    error: {
+                        message: error.message,
+                        type: "server_error",
+                        code: null,
+                    },
+                });
+                resolve();
+            });
+    });
+}
+
+/**
+ * Handle GET /v1/models
+ *
+ * Returns available models
+ */
+export function handleModels(_req, res) {
+    res.json({
+        object: "list",
+        data: [
+            {
+                id: "claude-opus-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+            {
+                id: "claude-sonnet-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+            {
+                id: "claude-haiku-4",
+                object: "model",
+                owned_by: "anthropic",
+                created: Math.floor(Date.now() / 1000),
+            },
+        ],
+    });
+}
+
+/**
+ * Handle GET /health
+ *
+ * Health check endpoint
+ */
+export function handleHealth(_req, res) {
+    res.json({
+        status: "ok",
+        provider: "claude-code-cli",
+        timestamp: new Date().toISOString(),
+    });
+}
+//# sourceMappingURL=routes.js.map
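_attachToolHook only inspects `message.message.content`; the event shape below is inferred from that code, not from CLI documentation. A Python sketch of the same extraction, useful for testing the hook logic offline:

# Sample assistant event (shape inferred from _attachToolHook above);
# extracts tool names exactly like the JS loop does.
event = {
    "type": "assistant",
    "message": {
        "content": [
            {"type": "text", "text": "Let me check."},
            {"type": "tool_use", "name": "Bash", "input": {"command": "ls"}},
        ],
    },
}

def tool_names(message: dict) -> list[str]:
    blocks = (message.get("message") or {}).get("content") or []
    return [b["name"] for b in blocks
            if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("name")]

print(tool_names(event))  # ['Bash']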