0887674497
Brain timed bei langen Pentests nach exakt 20:00 min raus, obwohl ARIAs
Subprozess fleissig weiterarbeitete und der Live-View alles zeigte.
Root-Cause: proxy_client.py hatte einen 1200s httpx.Client-Timeout —
genau der Wert, den wir vor 5 Tagen am Proxy auf 24h hochgezogen hatten.
Schicht uebersehen.
- docker-compose.yml: PROXY_TIMEOUT_SEC=86400 als brain-env.
- proxy_client.py: httpx.Timeout split (connect=10, read=86400, write=30,
pool=10). Toter Proxy wird in 10s erkannt, lange ARIA-Sessions duerfen
24h laufen.
- routes.js handleNonStreamingResponse: res.on("close") + isComplete-Flag.
Brain-Disconnect killt jetzt den Subprozess statt ihn verwaisen zu lassen.
- agent.py chat(): try/except — bei Exception nach dem User-Turn wird ein
Assistant-Error-Marker geschrieben, damit Conversation user->assistant
konsistent bleibt (kein Tool-Call-Loop-Fail in Folge-Calls).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
181 lines
6.9 KiB
Python
181 lines
6.9 KiB
Python
"""
Claude calls via the local proxy.

The proxy (claude-max-api-proxy) exposes an OpenAI-compatible API at
http://proxy:3456/v1/chat/completions. We use it non-streaming with a
long timeout: Claude Code spawns a new CLI process per request
(cold start), which takes a while.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
import httpx
|
|
from pydantic import BaseModel
|
|
|
|
import metrics
|
|
|
|
logger = logging.getLogger(__name__)


def _env_float(name: str, default: float) -> float:
    """Return environment variable *name* as a positive, finite float.

    Falls back to *default* (with a warning) when the variable is set but
    blank, not a number, NaN, infinite or not greater than zero. An unset
    variable silently yields *default*. This keeps a typo in the container
    environment from crashing the module at import time.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning("%s=%r is not a number, using default %s", name, raw, default)
        return default
    # NaN fails the `> 0` comparison; infinity is rejected explicitly.
    if not (value > 0 and value != float("inf")):
        logger.warning("%s=%r is not a usable timeout, using default %s", name, raw, default)
        return default
    return value


RUNTIME_CONFIG_FILE = Path("/shared/config/runtime.json")
ENV_MODEL = os.environ.get("BRAIN_MODEL", "claude-sonnet-4")
PROXY_URL = os.environ.get("PROXY_URL", "http://proxy:3456")

# Read timeout: how long we wait for the HTTP response from the proxy.
# The proxy is non-streaming, so the first byte only arrives AFTER its
# subprocess has closed. Agent loops (pentests etc.) can run for more than
# an hour, so this has to be large.
# Default is 24h; override with the PROXY_TIMEOUT_SEC environment variable.
PROXY_TIMEOUT_SEC = _env_float("PROXY_TIMEOUT_SEC", 86400.0)

# Connect/write/pool: kept small so a dead proxy is detected quickly.
# If the proxy container does not respond during TCP connect or while we
# are writing the request body, it is broken; no reason to wait 24h.
PROXY_CONNECT_TIMEOUT_SEC = _env_float("PROXY_CONNECT_TIMEOUT_SEC", 10.0)
PROXY_WRITE_TIMEOUT_SEC = _env_float("PROXY_WRITE_TIMEOUT_SEC", 30.0)
PROXY_POOL_TIMEOUT_SEC = _env_float("PROXY_POOL_TIMEOUT_SEC", 10.0)
|
|
|
|
|
|
def _read_model_from_runtime() -> str:
    """Return ``brainModel`` from runtime.json, else the BRAIN_MODEL env value.

    The env fallback (``ENV_MODEL``) is used when the file does not exist,
    when the key is missing or blank, and when reading or parsing raises;
    in the last case a warning is logged.
    """
    try:
        if not RUNTIME_CONFIG_FILE.exists():
            return ENV_MODEL
        raw_text = RUNTIME_CONFIG_FILE.read_text(encoding="utf-8")
        configured = (json.loads(raw_text).get("brainModel") or "").strip()
    except Exception as exc:
        logger.warning("runtime.json lesen fehlgeschlagen: %s", exc)
        return ENV_MODEL
    return configured if configured else ENV_MODEL


# Resolved once at import time; later edits to runtime.json do not change it.
DEFAULT_MODEL = _read_model_from_runtime()
|
|
|
|
|
|
# One chat message. ProxyClient.chat_full serialises these with
# model_dump(exclude_none=True), so fields left at None are omitted from
# the request payload.
class Message(BaseModel):
    role: str  # "system" | "user" | "assistant" | "tool"
    content: Optional[str] = None
    tool_calls: Optional[list] = None
    # presumably the id of the tool call this message answers - TODO confirm
    tool_call_id: Optional[str] = None
    name: Optional[str] = None  # only for role=tool
|
|
|
|
|
|
# Parsed outcome of one proxy call, as returned by ProxyClient.chat_full.
class ProxyResult(BaseModel):
    content: str = ""  # reply text; empty string when the proxy sent none
    tool_calls: list = []  # each: {"id", "name", "arguments" (dict)}
    finish_reason: str = ""  # finish_reason of the first choice
|
|
|
|
|
|
class ProxyClient:
    """Blocking client for the local OpenAI-compatible proxy.

    Holds one persistent ``httpx.Client``. Call :meth:`close` when done, or
    use the instance as a context manager.
    """

    def __init__(self, base_url: str = PROXY_URL, model: str = DEFAULT_MODEL):
        """Create the client.

        Args:
            base_url: Proxy base URL; trailing slashes are stripped.
            model: Model name used when a call does not pass its own.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        # Persistent client connection - avoids a TCP handshake on every call.
        # Timeouts are split by phase: connect/write/pool are small so a dead
        # proxy surfaces quickly as a connect/write/pool timeout, while read
        # is large because the non-streaming proxy only answers at the end.
        self._client = httpx.Client(timeout=httpx.Timeout(
            connect=PROXY_CONNECT_TIMEOUT_SEC,
            read=PROXY_TIMEOUT_SEC,
            write=PROXY_WRITE_TIMEOUT_SEC,
            pool=PROXY_POOL_TIMEOUT_SEC,
        ))

    def __enter__(self) -> "ProxyClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the underlying HTTP client; exceptions are not suppressed."""
        self.close()

    def chat(self, messages: List[Message], model: Optional[str] = None) -> str:
        """Convenience: plain chat without tools; returns only the reply text.

        Raises:
            RuntimeError: If the proxy call fails (see :meth:`chat_full`) or
                the reply content is empty.
        """
        result = self.chat_full(messages, tools=None, model=model)
        if not result.content:
            raise RuntimeError("Proxy lieferte leeren content")
        return result.content

    def chat_full(
        self,
        messages: List[Message],
        tools: Optional[list] = None,
        model: Optional[str] = None,
    ) -> ProxyResult:
        """Full chat - may return tool calls when ``tools`` is given.

        Args:
            messages: Conversation to send.
            tools: Optional tool definitions in OpenAI style:
                [{"type":"function","function":{"name":..,"description":..,"parameters":{...}}}, ...]
            model: Overrides the client's default model for this call.

        Returns:
            ProxyResult with the reply text, the parsed tool calls and the
            finish reason of the first choice.

        Raises:
            RuntimeError: If the proxy is unreachable, answers with a status
                other than 200, sends invalid JSON, or sends no choices.
        """
        url = f"{self.base_url}/v1/chat/completions"
        # exclude_none so that e.g. role=tool messages carry no tool_calls key.
        payload = {
            "model": model or self.model,
            "messages": [m.model_dump(exclude_none=True) for m in messages],
        }
        if tools:
            payload["tools"] = tools
        logger.info("Proxy → %s (%d Messages, %d tools, model=%s)",
                    url, len(messages), len(tools or []), payload["model"])

        try:
            r = self._client.post(url, json=payload)
        except httpx.RequestError as exc:
            raise RuntimeError(f"Proxy unreachable: {exc}") from exc
        if r.status_code != 200:
            raise RuntimeError(f"Proxy HTTP {r.status_code}: {r.text[:300]}")
        try:
            data = r.json()
        except Exception as exc:
            raise RuntimeError(f"Proxy invalid JSON: {exc}") from exc

        # A body that is not an object, or whose first choice is not an
        # object, is reported the same way as a missing "choices" list.
        choices = data.get("choices") if isinstance(data, dict) else None
        if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
            raise RuntimeError(f"Proxy ohne choices: {str(data)[:300]}")

        first = choices[0]
        msg = first.get("message")
        if not isinstance(msg, dict):
            msg = {}
        # finish_reason may be JSON null; ProxyResult requires a string.
        finish_reason = first.get("finish_reason") or ""

        content = self._extract_text(msg.get("content"))
        tool_calls = self._parse_tool_calls(msg.get("tool_calls"))

        # Diagnostics: log what the proxy returned, so we can see whether
        # tool_calls are present, empty, or were cut off.
        logger.info("Proxy ← finish=%s keys=%s tool_calls=%d content_len=%d",
                    finish_reason, sorted(msg.keys()), len(tool_calls), len(content))
        try:
            logger.info("Proxy ← raw-msg=%s", json.dumps(msg)[:1500])
        except Exception:
            logger.info("Proxy ← raw-msg(non-serial)=%s", str(msg)[:1500])

        # Call metric (token estimate for quota monitoring). A metrics failure
        # must not discard a reply that may have taken hours to produce.
        try:
            metrics.log_call(payload["model"], messages, content)
        except Exception:
            logger.exception("metrics.log_call failed; returning the reply anyway")

        return ProxyResult(content=content, tool_calls=tool_calls, finish_reason=finish_reason)

    @staticmethod
    def _extract_text(content: object) -> str:
        """Flatten a message ``content`` value (string or list of parts) to text."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return "".join(
                str(part.get("text") or "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return ""

    @staticmethod
    def _parse_arguments(args_raw: object) -> dict:
        """Decode tool-call arguments into a dict.

        Dicts pass through, missing or blank arguments become ``{}``, and
        anything that does not decode to a JSON object is preserved as
        ``{"_raw": <original value>}``.
        """
        if isinstance(args_raw, dict):
            return args_raw
        if args_raw is None or (isinstance(args_raw, str) and not args_raw.strip()):
            return {}
        try:
            decoded = json.loads(args_raw)
        except (TypeError, ValueError, RecursionError):
            return {"_raw": args_raw}
        return decoded if isinstance(decoded, dict) else {"_raw": args_raw}

    @staticmethod
    def _parse_tool_calls(tool_calls_raw: object) -> list:
        """Normalise raw tool calls to ``{"id", "name", "arguments"}`` dicts."""
        if not isinstance(tool_calls_raw, list):
            return []
        parsed = []
        for tc in tool_calls_raw:
            if not isinstance(tc, dict):
                logger.warning("Proxy tool_call ignored (not an object): %r", tc)
                continue
            fn = tc.get("function")
            if not isinstance(fn, dict):
                fn = {}
            parsed.append({
                "id": tc.get("id") or "",
                "name": fn.get("name") or "",
                "arguments": ProxyClient._parse_arguments(fn.get("arguments", "{}")),
            })
        return parsed

    def close(self):
        """Close the HTTP client. Best effort: errors are logged, never raised."""
        try:
            self._client.close()
        except Exception as exc:
            logger.debug("Proxy client close failed: %s", exc)
|