/**
 * ARIA-patched API Route Handlers
 *
 * Extension of the npm version of claude-max-api-proxy:
 * - On every Claude CLI `assistant` event containing a tool_use block (Bash,
 *   Read, Edit, Grep, …) an HTTP POST is fired at the bridge
 *   (ARIA_TOOL_HOOK_URL, default http://aria-bridge:8090/internal/agent-activity).
 *   The bridge mirrors this as RVS `agent_activity` to App + Diagnostic, so the
 *   thought stream shows live what ARIA is currently doing with tools.
 * - The full live stream (assistant text, tool_use with input, tool_result)
 *   goes to ARIA_STREAM_HOOK_URL → bridge → RVS `agent_stream` → Diagnostic
 *   "ARIA Live" view (a TeamViewer-style mirror of the Claude Code session).
 * - Subprocess tracking plus an emergency stop (hard kill) exposed as
 *   `POST /cancel-all` on a separate internal listener
 *   (ARIA_PROXY_INTERNAL_PORT, default 3457).
 * - Fire-and-forget, fail-open: if the bridge does not answer, the brain call
 *   is NOT aborted.
 *
 * This file is written over the npm version at container start time
 * (see the proxy block in docker-compose.yml).
 */
import { v4 as uuidv4 } from "uuid";
import http from "http";
import https from "https";
import { ClaudeSubprocess } from "../subprocess/manager.js";
import { openaiToCli } from "../adapter/openai-to-cli.js";
import {
  cliResultToOpenai,
  createDoneChunk,
} from "../adapter/cli-to-openai.js";

const TOOL_HOOK_URL =
  process.env.ARIA_TOOL_HOOK_URL || "http://aria-bridge:8090/internal/agent-activity";
const STREAM_HOOK_URL =
  process.env.ARIA_STREAM_HOOK_URL || "http://aria-bridge:8090/internal/agent-stream";

// Tool output can get very long (git log -p, find /). For the live mirror each
// event is hard-truncated (limits are JS string lengths, i.e. UTF-16 code
// units) and the amount cut is reported, so the user still sees the beginning
// plus a "...(N bytes truncated)" hint. The complete output stays in the brain
// and is processed normally — this truncation applies ONLY to the live mirror.
const TOOL_RESULT_MAX_CHARS = 4096;
const TOOL_INPUT_MAX_CHARS = 2048;

/**
 * Generic fire-and-forget JSON POST to the bridge. Nothing is awaited and no
 * error ever propagates to the caller. Used by both the tool hook and the
 * stream hook.
 *
 * The URL object is handed to `request()` directly so the query string, port,
 * credentials and bracketed IPv6 hosts are all honoured; the client module is
 * chosen by protocol so an `https:` hook URL works as well.
 *
 * @param {string} url - Target endpoint (http: or https:).
 * @param {unknown} body - JSON-serialisable payload.
 */
function _postJson(url, body) {
  try {
    const target = new URL(url);
    const client = target.protocol === "https:" ? https : http;
    const data = JSON.stringify(body);
    const req = client.request(
      target,
      {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "Content-Length": Buffer.byteLength(data),
        },
        timeout: 2000,
      },
      (res) => {
        // Drain the response so the socket is released; the body is irrelevant.
        res.resume();
      },
    );
    // Deliberately silent: the bridge being down must never affect the brain call.
    req.on("error", () => {});
    req.on("timeout", () => req.destroy());
    req.end(data);
  } catch (_) {
    /* never rethrow — fail-open by design */
  }
}

/**
 * Push a tool-use event to the bridge (legacy thought-stream path).
 * Only the tool name is sent.
 *
 * @param {string} toolName - Name of the tool; falsy values are ignored.
 */
function _emitToolEvent(toolName) {
  if (!toolName) return;
  _postJson(TOOL_HOOK_URL, { tool: String(toolName) });
}

/**
 * Push a stream event to the bridge (the "ARIA Live" path).
 *
 * @param {string} requestId - Id of the chat-completion request.
 * @param {string} kind - "start" | "text" | "thinking" | "tool_use" | "tool_result" | "end".
 * @param {object} fields - Extra event fields, spread into the payload.
 */
function _emitStreamEvent(requestId, kind, fields) {
  _postJson(STREAM_HOOK_URL, { requestId, kind, ts: Date.now(), ...fields });
}

/**
 * Cut `str` to at most `max` UTF-16 code units for the live mirror.
 * Non-strings are stringified; null/undefined become "".
 *
 * @param {unknown} str - Value to truncate.
 * @param {number} max - Maximum kept length.
 * @returns {{text: string, truncatedBytes: number}} The kept prefix and how
 *   many code units were dropped. The field is called "bytes" for
 *   compatibility with existing consumers but counts string characters.
 */
function _truncate(str, max) {
  const text = typeof str === "string" ? str : String(str ?? "");
  if (text.length <= max) return { text, truncatedBytes: 0 };
  let end = max;
  // Never split a surrogate pair (e.g. an emoji): a lone high surrogate at the
  // cut would be serialised as an unpaired "\ud83d" escape and arrive as
  // garbage on the receiving side.
  if (end > 0) {
    const last = text.charCodeAt(end - 1);
    if (last >= 0xd800 && last <= 0xdbff) end -= 1;
  }
  return { text: text.slice(0, end), truncatedBytes: text.length - end };
}

// ── Subprocess tracking for the emergency stop ("Not-Aus") ──────────
// requestId → ClaudeSubprocess. Entries are removed again on close/error;
// the internal `POST /cancel-all` endpoint iterates them and calls .kill().
/**
 * Registry of running Claude CLI subprocesses keyed by requestId, so the
 * emergency stop can hard-kill all of them at once.
 * @type {Map<string, object>}
 */
const _activeSubprocesses = new Map();

/**
 * Register `subprocess` under `requestId` and remove it again as soon as the
 * subprocess emits "close" or "error".
 *
 * @param {string} requestId - Id of the owning request.
 * @param {object} subprocess - Event emitter with a kill() method.
 */
function _trackSubprocess(requestId, subprocess) {
  _activeSubprocesses.set(requestId, subprocess);
  const cleanup = () => _activeSubprocesses.delete(requestId);
  subprocess.on("close", cleanup);
  subprocess.on("error", cleanup);
}

/**
 * Hook the subprocess's assistant + user events and forward them to the bridge:
 * - legacy API: only tool names to /internal/agent-activity (thought stream)
 * - new API: the full stream (text / thinking / tool_use / tool_result) to
 *   /internal/agent-stream
 * Every listener is fail-open: a malformed event is ignored.
 *
 * @param {object} subprocess - Event emitter producing "assistant"/"user" events.
 * @param {string} requestId - Id attached to every stream event.
 */
function _attachToolHook(subprocess, requestId) {
  subprocess.on("assistant", (message) => {
    try {
      const blocks = message?.message?.content || [];
      for (const b of blocks) {
        if (!b) continue;
        if (b.type === "tool_use") {
          if (b.name) _emitToolEvent(b.name);
          const inputStr = b.input ? JSON.stringify(b.input) : "";
          const inp = _truncate(inputStr, TOOL_INPUT_MAX_CHARS);
          _emitStreamEvent(requestId, "tool_use", {
            id: b.id || null,
            name: b.name || "",
            input: inp.text,
            inputTruncatedBytes: inp.truncatedBytes,
          });
        } else if (b.type === "text" && b.text) {
          _emitStreamEvent(requestId, "text", { text: b.text });
        } else if (b.type === "thinking" && b.thinking) {
          // Extended-thinking blocks are rare in Claude Code CLI output but
          // possible; they are forwarded under their own "thinking" kind.
          _emitStreamEvent(requestId, "thinking", { text: b.thinking });
        }
      }
    } catch (_) {
      /* fail-open */
    }
  });

  // "user" events carry the tool_result blocks.
  subprocess.on("user", (message) => {
    try {
      const blocks = message?.message?.content || [];
      for (const b of blocks) {
        if (b && b.type === "tool_result") {
          let content = "";
          if (typeof b.content === "string") {
            content = b.content;
          } else if (Array.isArray(b.content)) {
            content = b.content
              .map((c) => ((c && c.type === "text" && c.text) ? c.text : ""))
              .join("");
          }
          const out = _truncate(content, TOOL_RESULT_MAX_CHARS);
          _emitStreamEvent(requestId, "tool_result", {
            id: b.tool_use_id || null,
            content: out.text,
            truncatedBytes: out.truncatedBytes,
            isError: b.is_error === true,
          });
        }
      }
    } catch (_) {
      /* fail-open */
    }
  });
}

/**
 * Handle POST /v1/chat/completions.
 *
 * Main endpoint for chat requests; supports streaming (SSE, `stream: true`)
 * and non-streaming responses. Each request is additionally wired to ARIA's
 * bridge hooks and registered for the emergency stop.
 *
 * @param {object} req - Express request; `req.body` is the OpenAI-style payload.
 * @param {object} res - Express response.
 * @returns {Promise<void>} Resolves once the response has been handled.
 */
export async function handleChatCompletions(req, res) {
  const requestId = uuidv4().replace(/-/g, "").slice(0, 24);
  // Without a parsed JSON body (no body parser, wrong Content-Type) req.body is
  // undefined; reading `.stream` off it used to throw outside the try block
  // as an unhandled rejection. Treat it as an empty payload -> 400 below.
  const body = req.body ?? {};
  const stream = body.stream === true;

  try {
    if (!Array.isArray(body.messages) || body.messages.length === 0) {
      res.status(400).json({
        error: {
          message: "messages is required and must be a non-empty array",
          type: "invalid_request_error",
          code: "invalid_messages",
        },
      });
      return;
    }

    const cliInput = openaiToCli(body);
    const subprocess = new ClaudeSubprocess();

    // ARIA patch: tool-use events + full live stream to the bridge, and track
    // the subprocess for the emergency stop (hard kill via the internal
    // POST /cancel-all listener).
    _attachToolHook(subprocess, requestId);
    _trackSubprocess(requestId, subprocess);
    _emitStreamEvent(requestId, "start", { model: body.model || null });
    // NOTE: one "end" event is emitted per terminal subprocess event (result,
    // close, error), so the bridge may receive several per request.
    subprocess.on("result", () => _emitStreamEvent(requestId, "end", { reason: "result" }));
    subprocess.on("close", (code) => _emitStreamEvent(requestId, "end", { reason: "close", code }));
    subprocess.on("error", (err) => _emitStreamEvent(requestId, "end", {
      reason: "error",
      error: String(err?.message || err),
    }));

    if (stream) {
      await handleStreamingResponse(req, res, subprocess, cliInput, requestId);
    } else {
      await handleNonStreamingResponse(res, subprocess, cliInput, requestId);
    }
  } catch (error) {
    const message = error instanceof Error ? error.message : "Unknown error";
    console.error("[handleChatCompletions] Error:", message);
    if (!res.headersSent) {
      res.status(500).json({
        error: {
          message,
          type: "server_error",
          code: null,
        },
      });
    }
  }
}

/**
 * Handle a streaming response (SSE).
 *
 * IMPORTANT: The Express req.on("close") event fires when the request body
 * is fully received, NOT when the client disconnects. For SSE connections,
 * res.on("close") is used to detect actual client disconnection.
 *
 * @returns {Promise<void>} Resolves when the stream is finished or the client
 *   disconnected; rejects if the subprocess fails to start (after the SSE
 *   response has been terminated with an error event).
 */
async function handleStreamingResponse(req, res, subprocess, cliInput, requestId) {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Request-Id", requestId);
  // CRITICAL: flush headers immediately to establish the SSE connection;
  // without this they are buffered and the client times out waiting.
  res.flushHeaders();
  // Initial comment confirms the connection is alive.
  res.write(":ok\n\n");

  return new Promise((resolve, reject) => {
    let isFirst = true;
    let lastModel = "claude-sonnet-4";
    let isComplete = false;

    const writeSseError = (message) => {
      res.write(`data: ${JSON.stringify({
        error: { message, type: "server_error", code: null },
      })}\n\n`);
    };

    // Actual client disconnect (response stream closed).
    res.on("close", () => {
      if (!isComplete) {
        // Client left before the response completed — kill the subprocess.
        subprocess.kill();
      }
      resolve();
    });

    // Streaming content deltas. Guarded access: a throw here would escape
    // from the subprocess's emit() call.
    subprocess.on("content_delta", (event) => {
      const text = event?.event?.delta?.text || "";
      if (!text || res.writableEnded) return;
      const chunk = {
        id: `chatcmpl-${requestId}`,
        object: "chat.completion.chunk",
        created: Math.floor(Date.now() / 1000),
        model: lastModel,
        choices: [{
          index: 0,
          delta: {
            role: isFirst ? "assistant" : undefined,
            content: text,
          },
          finish_reason: null,
        }],
      };
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
      isFirst = false;
    });

    // Assistant messages carry the model name; keep the previous one if absent.
    subprocess.on("assistant", (message) => {
      lastModel = message?.message?.model || lastModel;
    });

    subprocess.on("result", (_result) => {
      isComplete = true;
      if (!res.writableEnded) {
        const doneChunk = createDoneChunk(requestId, lastModel);
        res.write(`data: ${JSON.stringify(doneChunk)}\n\n`);
        res.write("data: [DONE]\n\n");
        res.end();
      }
      resolve();
    });

    subprocess.on("error", (error) => {
      console.error("[Streaming] Error:", error.message);
      if (!res.writableEnded) {
        writeSseError(error.message);
        res.end();
      }
      resolve();
    });

    subprocess.on("close", (code) => {
      // Subprocess exited — make sure the response is closed.
      if (!res.writableEnded) {
        if (code !== 0 && !isComplete) {
          // Abnormal exit without a result.
          writeSseError(`Process exited with code ${code}`);
        }
        res.write("data: [DONE]\n\n");
        res.end();
      }
      resolve();
    });

    subprocess.start(cliInput.prompt, {
      model: cliInput.model,
      sessionId: cliInput.sessionId,
    }).catch((err) => {
      console.error("[Streaming] Subprocess start error:", err);
      // Headers are already flushed, so the caller cannot answer with a 500;
      // terminate the SSE stream here instead of leaving it open forever.
      if (!res.writableEnded) {
        writeSseError(err instanceof Error ? err.message : String(err));
        res.end();
      }
      reject(err);
    });
  });
}

/**
 * Handle a non-streaming response.
 *
 * Several terminal events (error, close, start failure) can fire for one
 * request; only the first one that gets there writes the HTTP response, so
 * Express never sees a second send.
 *
 * @returns {Promise<void>} Resolves once a response has been decided.
 */
async function handleNonStreamingResponse(res, subprocess, cliInput, requestId) {
  return new Promise((resolve) => {
    let finalResult = null;

    const sendServerError = (message) => {
      if (res.headersSent) return;
      res.status(500).json({
        error: {
          message,
          type: "server_error",
          code: null,
        },
      });
    };

    subprocess.on("result", (result) => {
      finalResult = result;
    });

    subprocess.on("error", (error) => {
      console.error("[NonStreaming] Error:", error.message);
      sendServerError(error.message);
      resolve();
    });

    subprocess.on("close", (code) => {
      if (!res.headersSent) {
        if (finalResult) {
          res.json(cliResultToOpenai(finalResult, requestId));
        } else {
          sendServerError(`Claude CLI exited with code ${code} without response`);
        }
      }
      resolve();
    });

    subprocess
      .start(cliInput.prompt, {
        model: cliInput.model,
        sessionId: cliInput.sessionId,
      })
      .catch((error) => {
        sendServerError(error.message);
        resolve();
      });
  });
}

/**
 * Handle GET /v1/models — returns the static list of available models.
 */
export function handleModels(_req, res) {
  const created = Math.floor(Date.now() / 1000);
  res.json({
    object: "list",
    data: ["claude-opus-4", "claude-sonnet-4", "claude-haiku-4"].map((id) => ({
      id,
      object: "model",
      owned_by: "anthropic",
      created,
    })),
  });
}

/**
 * Handle GET /health — health-check endpoint.
 */
export function handleHealth(_req, res) {
  res.json({
    status: "ok",
    provider: "claude-code-cli",
    timestamp: new Date().toISOString(),
  });
}

// ── Emergency-stop ("Not-Aus") side channel ───────────────────────
//
// claude-max-api-proxy controls its own route registration; patching into it
// would require sed operations on the npm package. Cleaner: a small dedicated
// HTTP listener used only for the emergency stop, on an internal port in
// aria-net. The bridge calls it to kill all active Claude subprocesses, so
// App + Diagnostic see the stream end immediately.
const DEFAULT_INTERNAL_PORT = 3457;

/**
 * Resolve the internal listener port from ARIA_PROXY_INTERNAL_PORT.
 * Unset/empty → default. A non-numeric or out-of-range value also falls back
 * to the default (with a warning): previously NaN reached listen(), which
 * throws, so the emergency-stop listener never came up at all.
 *
 * @param {string | undefined} raw - Raw environment value.
 * @returns {number} A valid TCP port.
 */
function _resolveInternalPort(raw) {
  if (raw === undefined || raw === "") return DEFAULT_INTERNAL_PORT;
  const port = Number.parseInt(raw, 10);
  if (Number.isInteger(port) && port >= 0 && port <= 65535) return port;
  console.warn(
    "[aria-not-aus] invalid ARIA_PROXY_INTERNAL_PORT",
    JSON.stringify(raw),
    "— falling back to",
    DEFAULT_INTERNAL_PORT,
  );
  return DEFAULT_INTERNAL_PORT;
}

const INTERNAL_PORT = _resolveInternalPort(process.env.ARIA_PROXY_INTERNAL_PORT);
// Default binds all interfaces so the bridge can reach it inside aria-net; the
// endpoint has no authentication, so the port must not be published
// externally. ARIA_PROXY_INTERNAL_HOST allows narrowing the bind address.
const INTERNAL_HOST = process.env.ARIA_PROXY_INTERNAL_HOST || "0.0.0.0";

/**
 * Kill every tracked Claude subprocess and clear the registry.
 * A failing kill() is logged and does not stop the remaining kills.
 *
 * @returns {{killed: number, requestIds: string[]}} Number of successful
 *   kill() calls and the requestIds that were registered beforehand.
 */
function _cancelAll() {
  const requestIds = [..._activeSubprocesses.keys()];
  let killed = 0;
  for (const [id, subp] of _activeSubprocesses) {
    try {
      subp.kill();
      killed++;
    } catch (e) {
      console.error("[aria-not-aus] kill failed for", id, e?.message);
    }
  }
  _activeSubprocesses.clear();
  return { killed, requestIds };
}

/**
 * Request handler of the internal listener:
 *   POST /cancel-all → hard-kill all subprocesses
 *   GET  /health     → number of tracked subprocesses
 * Any query string is ignored for routing.
 */
function _handleInternalRequest(req, res) {
  const path = (req.url || "").split("?")[0];
  if (req.method === "POST" && path === "/cancel-all") {
    const result = _cancelAll();
    console.warn("[aria-not-aus] /cancel-all — killed", result.killed, "subprocess(es)");
    res.writeHead(200, { "Content-Type": "application/json" });
    res.end(JSON.stringify({ ok: true, ...result }));
    return;
  }
  if (req.method === "GET" && path === "/health") {
    res.writeHead(200, { "Content-Type": "application/json" });
    res.end(JSON.stringify({ ok: true, active: _activeSubprocesses.size }));
    return;
  }
  res.writeHead(404).end();
}

// Started as a module-level side effect when this file is loaded. Failures are
// logged but never crash the proxy.
try {
  const internalServer = http.createServer(_handleInternalRequest);
  internalServer.on("error", (err) => {
    console.error("[aria-not-aus] internal listener error:", err.message);
  });
  internalServer.listen(INTERNAL_PORT, INTERNAL_HOST, () => {
    console.log("[aria-not-aus] internal listener on", INTERNAL_HOST + ":" + INTERNAL_PORT);
  });
} catch (e) {
  console.error("[aria-not-aus] startup failed:", e?.message);
}
//# sourceMappingURL=routes.js.map