fix(brain): Proxy-Timeout 20min -> 24h Read, split httpx-Timeouts, Cleanup-Pfade
Brain timed bei langen Pentests nach exakt 20:00 min raus, obwohl ARIAs
Subprozess fleissig weiterarbeitete und der Live-View alles zeigte.
Root-Cause: proxy_client.py hatte einen 1200s httpx.Client-Timeout —
genau der Wert, den wir vor 5 Tagen am Proxy auf 24h hochgezogen hatten.
Schicht uebersehen.
- docker-compose.yml: PROXY_TIMEOUT_SEC=86400 als brain-env.
- proxy_client.py: httpx.Timeout split (connect=10, read=86400, write=30,
pool=10). Toter Proxy wird in 10s erkannt, lange ARIA-Sessions duerfen
24h laufen.
- routes.js handleNonStreamingResponse: res.on("close") + isComplete-Flag.
Brain-Disconnect killt jetzt den Subprozess statt ihn verwaisen zu lassen.
- agent.py chat(): try/except — bei Exception nach dem User-Turn wird ein
Assistant-Error-Marker geschrieben, damit Conversation user->assistant
konsistent bleibt (kein Tool-Call-Loop-Fail in Folge-Calls).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+49
-30
@@ -553,40 +553,59 @@ class Agent:
|
||||
len(hot), len(cold), len(active_skills), len(all_skills),
|
||||
len(self.conversation.window()), len(system_prompt))
|
||||
|
||||
# 6. Tool-Use-Loop
|
||||
# 6. Tool-Use-Loop. Bei Exception (z.B. Proxy-Timeout) muss ein
|
||||
# Assistant-Turn als Error-Marker geschrieben werden — der User-Turn
|
||||
# ist bereits in der Conversation. Ohne Gegenpart wird die naechste
|
||||
# Anfrage im Window an Claude geschickt mit user → user als letzten
|
||||
# zwei Turns, was OpenAI/Anthropic verwirrt und bei strict tools-Aufrufen
|
||||
# zu 400-Errors fuehren kann.
|
||||
final_reply = ""
|
||||
for iteration in range(self.MAX_TOOL_ITERATIONS):
|
||||
result = self.proxy.chat_full(messages, tools=tools)
|
||||
if result.tool_calls:
|
||||
# Assistant-Turn mit tool_calls in messages anhaengen (nicht in Conversation!)
|
||||
messages.append(ProxyMessage(
|
||||
role="assistant",
|
||||
content=result.content or None,
|
||||
tool_calls=[{
|
||||
"id": tc["id"], "type": "function",
|
||||
"function": {"name": tc["name"], "arguments": json.dumps(tc["arguments"])},
|
||||
} for tc in result.tool_calls],
|
||||
))
|
||||
# Tools ausfuehren + Ergebnis als role=tool zurueck
|
||||
for tc in result.tool_calls:
|
||||
tool_result = self._dispatch_tool(tc["name"], tc["arguments"])
|
||||
try:
|
||||
for iteration in range(self.MAX_TOOL_ITERATIONS):
|
||||
result = self.proxy.chat_full(messages, tools=tools)
|
||||
if result.tool_calls:
|
||||
# Assistant-Turn mit tool_calls in messages anhaengen (nicht in Conversation!)
|
||||
messages.append(ProxyMessage(
|
||||
role="tool",
|
||||
tool_call_id=tc["id"],
|
||||
name=tc["name"],
|
||||
content=tool_result[:8000],
|
||||
role="assistant",
|
||||
content=result.content or None,
|
||||
tool_calls=[{
|
||||
"id": tc["id"], "type": "function",
|
||||
"function": {"name": tc["name"], "arguments": json.dumps(tc["arguments"])},
|
||||
} for tc in result.tool_calls],
|
||||
))
|
||||
continue # next iteration mit Tool-Results
|
||||
# Kein Tool-Call mehr → final reply
|
||||
final_reply = (result.content or "").strip()
|
||||
break
|
||||
else:
|
||||
# Loop-Limit erreicht
|
||||
final_reply = "[Tool-Loop-Limit erreicht — ARIA hat zu viele Tool-Calls gemacht ohne fertig zu werden]"
|
||||
logger.warning("Tool-Loop hit MAX_TOOL_ITERATIONS=%d", self.MAX_TOOL_ITERATIONS)
|
||||
# Tools ausfuehren + Ergebnis als role=tool zurueck
|
||||
for tc in result.tool_calls:
|
||||
tool_result = self._dispatch_tool(tc["name"], tc["arguments"])
|
||||
messages.append(ProxyMessage(
|
||||
role="tool",
|
||||
tool_call_id=tc["id"],
|
||||
name=tc["name"],
|
||||
content=tool_result[:8000],
|
||||
))
|
||||
continue # next iteration mit Tool-Results
|
||||
# Kein Tool-Call mehr → final reply
|
||||
final_reply = (result.content or "").strip()
|
||||
break
|
||||
else:
|
||||
# Loop-Limit erreicht
|
||||
final_reply = "[Tool-Loop-Limit erreicht — ARIA hat zu viele Tool-Calls gemacht ohne fertig zu werden]"
|
||||
logger.warning("Tool-Loop hit MAX_TOOL_ITERATIONS=%d", self.MAX_TOOL_ITERATIONS)
|
||||
|
||||
if not final_reply:
|
||||
raise RuntimeError("Leerer Reply vom Proxy")
|
||||
if not final_reply:
|
||||
raise RuntimeError("Leerer Reply vom Proxy")
|
||||
|
||||
except Exception as exc:
|
||||
# Conversation-Konsistenz: User-Turn ist drin (Schritt 1), Assistant
|
||||
# muss auch rein damit die Paarung stimmt. Wir schreiben einen
|
||||
# Error-Marker statt zu rollback-en (rollback wuerde Race-Conditions
|
||||
# mit der JSONL-Persistenz aufmachen).
|
||||
err_text = f"[Fehler: {exc}]"
|
||||
logger.error("chat() Exception — schreibe Error-Marker als Assistant-Turn: %s", exc)
|
||||
try:
|
||||
self.conversation.add("assistant", err_text)
|
||||
except Exception as add_exc:
|
||||
logger.warning("Konnte Error-Marker nicht persistieren: %s", add_exc)
|
||||
raise
|
||||
|
||||
# 7. Assistant-Turn (final reply) in die Conversation
|
||||
self.conversation.add("assistant", final_reply)
|
||||
|
||||
@@ -25,7 +25,17 @@ logger = logging.getLogger(__name__)
|
||||
RUNTIME_CONFIG_FILE = Path("/shared/config/runtime.json")
|
||||
ENV_MODEL = os.environ.get("BRAIN_MODEL", "claude-sonnet-4")
|
||||
PROXY_URL = os.environ.get("PROXY_URL", "http://proxy:3456")
|
||||
PROXY_TIMEOUT_SEC = float(os.environ.get("PROXY_TIMEOUT_SEC", "1200"))
|
||||
# Read-Timeout: wie lange wir auf die HTTP-Antwort vom Proxy warten.
|
||||
# Proxy ist non-streaming → erstes Byte kommt erst NACH subprocess close.
|
||||
# Agent-Loops (Pentests etc.) koennen >1h dauern → muss hoch sein.
|
||||
# Default 24h, kann via PROXY_TIMEOUT_SEC env ueberschrieben werden.
|
||||
PROXY_TIMEOUT_SEC = float(os.environ.get("PROXY_TIMEOUT_SEC", "86400"))
|
||||
# Connect/Write/Pool: klein damit toter Proxy schnell erkannt wird.
|
||||
# Wenn der Proxy-Container nicht antwortet beim TCP-Connect oder waehrend
|
||||
# wir den Request-Body schreiben, ist er kaputt — kein Grund 24h zu warten.
|
||||
PROXY_CONNECT_TIMEOUT_SEC = float(os.environ.get("PROXY_CONNECT_TIMEOUT_SEC", "10"))
|
||||
PROXY_WRITE_TIMEOUT_SEC = float(os.environ.get("PROXY_WRITE_TIMEOUT_SEC", "30"))
|
||||
PROXY_POOL_TIMEOUT_SEC = float(os.environ.get("PROXY_POOL_TIMEOUT_SEC", "10"))
|
||||
|
||||
|
||||
def _read_model_from_runtime() -> str:
|
||||
@@ -62,8 +72,15 @@ class ProxyClient:
|
||||
def __init__(self, base_url: str = PROXY_URL, model: str = DEFAULT_MODEL):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.model = model
|
||||
# Persistente Client-Connection — vermeidet TCP-Handshake bei jedem Call
|
||||
self._client = httpx.Client(timeout=PROXY_TIMEOUT_SEC)
|
||||
# Persistente Client-Connection — vermeidet TCP-Handshake bei jedem Call.
|
||||
# Timeouts split nach Phase: connect/write/pool klein (toter Proxy → schnell
|
||||
# ReadTimeout), read gross (ARIA darf ewig rechnen).
|
||||
self._client = httpx.Client(timeout=httpx.Timeout(
|
||||
connect=PROXY_CONNECT_TIMEOUT_SEC,
|
||||
read=PROXY_TIMEOUT_SEC,
|
||||
write=PROXY_WRITE_TIMEOUT_SEC,
|
||||
pool=PROXY_POOL_TIMEOUT_SEC,
|
||||
))
|
||||
|
||||
def chat(self, messages: List[Message], model: Optional[str] = None) -> str:
|
||||
"""Convenience: einfacher Chat ohne Tools. Gibt nur den Reply-String zurueck."""
|
||||
|
||||
@@ -67,6 +67,14 @@ services:
|
||||
- QDRANT_PORT=6333
|
||||
- PROXY_URL=http://proxy:3456
|
||||
- ARIA_AUTH_TOKEN=${ARIA_AUTH_TOKEN:-}
|
||||
# Read-Timeout fuer den Proxy-Call. Hoch, weil Agent-Loops (Pentests
|
||||
# etc.) auch eine Stunde+ dauern koennen. Der Proxy seinerseits hat
|
||||
# einen Idle-Watchdog (Default 20min Inaktivitaet) der den Subprocess
|
||||
# killt, der dann seinen close-Event sendet — Brain bekommt also
|
||||
# immer was zurueck, auch bei wirklich haengenden Subprozessen.
|
||||
# Connect/Write/Pool sind klein (10/30/10s) damit toter Proxy
|
||||
# schnell erkannt wird (siehe proxy_client.py).
|
||||
- PROXY_TIMEOUT_SEC=${PROXY_TIMEOUT_SEC:-86400}
|
||||
volumes:
|
||||
- ./aria-data/brain/data:/data # Memory-Cache + Skills + Models (bind-mount fuer Export)
|
||||
- ./aria-data/brain-import:/import:ro # Quell-MDs fuer den initialen Memory-Import (read-only)
|
||||
|
||||
+28
-7
@@ -367,21 +367,42 @@ async function handleStreamingResponse(req, res, subprocess, cliInput, requestId
|
||||
async function handleNonStreamingResponse(res, subprocess, cliInput, requestId) {
|
||||
return new Promise((resolve) => {
|
||||
let finalResult = null;
|
||||
let isComplete = false;
|
||||
// Client-Disconnect-Handler — wenn Brain die HTTP-Verbindung kappt
|
||||
// (z.B. nach Read-Timeout), den noch laufenden Subprocess killen.
|
||||
// Im Streaming-Branch existiert das schon; non-streaming hatte's
|
||||
// bisher nicht → Subprozess lief verwaist weiter, Ressourcen-Leak.
|
||||
res.on("close", () => {
|
||||
if (!isComplete) {
|
||||
console.warn("[NonStreaming] Client disconnected before result — killing subprocess", requestId);
|
||||
try { subprocess.kill(); } catch (_) {}
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
subprocess.on("result", (result) => {
|
||||
finalResult = result;
|
||||
});
|
||||
subprocess.on("error", (error) => {
|
||||
console.error("[NonStreaming] Error:", error.message);
|
||||
res.status(500).json({
|
||||
error: {
|
||||
message: error.message,
|
||||
type: "server_error",
|
||||
code: null,
|
||||
},
|
||||
});
|
||||
isComplete = true;
|
||||
if (!res.headersSent) {
|
||||
res.status(500).json({
|
||||
error: {
|
||||
message: error.message,
|
||||
type: "server_error",
|
||||
code: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
subprocess.on("close", (code) => {
|
||||
isComplete = true;
|
||||
if (res.writableEnded) {
|
||||
// Client ist eh schon weg — nichts mehr zu senden.
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
if (finalResult) {
|
||||
res.json(cliResultToOpenai(finalResult, requestId));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user