feat(brain+diagnostic): Token/Call-Metrics mit Subscription-Plan-Tracking
Stefan hat den Max 5x Plan (~\$90-100/Monat), ungefaehres Limit 225 Calls pro
5h-Fenster fuer Sonnet. Damit nicht in eine Tool-Loop-Schleife laufen ohne
es zu merken: kleine Metrics-Pipeline, sichtbar in der Diagnostic.
aria-brain/metrics.py
Append-only JSONL Logger unter /data/metrics.jsonl. Pro Claude-Call eine
Zeile {ts, model, in, out} mit Token-Schaetzung (chars/4, Anthropic-
Heuristik). aggregate(window) zaehlt die letzten N Sekunden.
Auto-Rotate bei 50k Zeilen → 25k behalten (~70 KB/Monat bei 1k Calls/Tag,
Cap also weit oben).
aria-brain/proxy_client.py
chat_full() ruft am Ende metrics.log_call(model, messages_in, reply).
Failed/exception-Pfade loggen nicht (sonst false positives).
aria-brain/main.py
GET /metrics/calls → {h1, h5, h24, d30}, jedes Window mit calls,
tokens_in, tokens_out, by_model.
diagnostic/index.html
Neue Card "Token / Calls" im Gehirn-Tab. Plan-Dropdown
(Pro / Max 5x / Max 20x / Custom), localStorage-persistiert. 4 Metric-
Zellen fuer 1h/5h/24h/30d mit Calls + Tokens. Progress-Bar oben zeigt
5h-Counter gegen Plan-Limit. Warn-Klassen: gelb bei 80%, rot bei 90%.
Auto-Refresh alle 30s wenn Gehirn-Tab offen, plus bei Tab-Wechsel.
Info-Modal erklaert die Limits + dass HTTP-Call != User-Frage (Tool-Use
kann pro Frage bis zu 8 Calls verursachen).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -29,6 +29,7 @@ from conversation import Conversation
|
||||
from proxy_client import ProxyClient
|
||||
from agent import Agent
|
||||
import skills as skills_mod
|
||||
import metrics as metrics_mod
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
||||
logger = logging.getLogger("aria-brain")
|
||||
@@ -404,6 +405,15 @@ def conversation_distill_now():
|
||||
return agent().distill_old_turns()
|
||||
|
||||
|
||||
# ─── Call-Metrics (Token / Quota-Monitoring) ────────────────────────
|
||||
|
||||
@app.get("/metrics/calls")
|
||||
def metrics_calls():
|
||||
"""Liefert Aggregate fuer 1h / 5h / 24h / 30d.
|
||||
Jedes Window: {window_seconds, calls, tokens_in, tokens_out, by_model}."""
|
||||
return metrics_mod.stats()
|
||||
|
||||
|
||||
# ─── Skills ─────────────────────────────────────────────────────────
|
||||
|
||||
class SkillCreate(BaseModel):
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
"""
|
||||
Call-Metrics fuer den Proxy-Client.
|
||||
|
||||
Pro Claude-Call wird ein Eintrag in /data/metrics.jsonl angehaengt:
|
||||
|
||||
{"ts": <ms>, "model": "...", "in": <tokens_in_estimate>, "out": <tokens_out_estimate>}
|
||||
|
||||
Tokens-Schaetzung: characters / 4 (Anthropic-Default-Heuristik). Nicht exakt
|
||||
aber gut genug fuer Quota-Monitoring. Wir summieren nicht in-memory weil
|
||||
der Brain-Container neugestartet werden kann — alles auf Disk.
|
||||
|
||||
Auswertung via aggregate(window_seconds) — liefert {calls, tokens_in, tokens_out}
|
||||
fuer die letzten N Sekunden. Lazy gelesen, keine grossen Datenmengen erwartet
|
||||
(bei 1000 Calls/Tag ~70 KB pro Monat).
|
||||
|
||||
Auto-Rotate: bei > 50k Zeilen werden die aeltesten 25k weggeschnitten.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
METRICS_FILE = Path(os.environ.get("METRICS_FILE", "/data/metrics.jsonl"))
|
||||
ROTATE_AT = 50_000
|
||||
ROTATE_KEEP = 25_000
|
||||
|
||||
|
||||
def _estimate_tokens(text: str) -> int:
|
||||
"""Anthropic-Default: ~4 chars pro Token. Grob genug."""
|
||||
if not text:
|
||||
return 0
|
||||
return max(1, len(text) // 4)
|
||||
|
||||
|
||||
def _messages_tokens(messages: list) -> int:
|
||||
total = 0
|
||||
for m in messages:
|
||||
# Pydantic-Model oder dict
|
||||
if hasattr(m, "content"):
|
||||
total += _estimate_tokens(m.content or "")
|
||||
elif isinstance(m, dict):
|
||||
c = m.get("content") or ""
|
||||
if isinstance(c, str):
|
||||
total += _estimate_tokens(c)
|
||||
return total
|
||||
|
||||
|
||||
def log_call(model: str, messages_in: list, reply_text: str = "") -> None:
|
||||
"""Eine Call-Metric anhaengen. Robust gegen Fehler (silent fail)."""
|
||||
try:
|
||||
tokens_in = _messages_tokens(messages_in)
|
||||
tokens_out = _estimate_tokens(reply_text)
|
||||
line = json.dumps({
|
||||
"ts": int(time.time() * 1000),
|
||||
"model": model,
|
||||
"in": tokens_in,
|
||||
"out": tokens_out,
|
||||
})
|
||||
METRICS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with METRICS_FILE.open("a", encoding="utf-8") as f:
|
||||
f.write(line + "\n")
|
||||
# Sanftes Rotate ohne hohe IO-Kosten — nur alle 1000 Calls checken
|
||||
if (tokens_in + tokens_out) % 1000 < 4:
|
||||
_maybe_rotate()
|
||||
except Exception as exc:
|
||||
logger.warning("metrics.log_call: %s", exc)
|
||||
|
||||
|
||||
def _maybe_rotate() -> None:
|
||||
try:
|
||||
if not METRICS_FILE.exists():
|
||||
return
|
||||
with METRICS_FILE.open("r", encoding="utf-8") as f:
|
||||
lines = f.readlines()
|
||||
if len(lines) > ROTATE_AT:
|
||||
keep = lines[-ROTATE_KEEP:]
|
||||
METRICS_FILE.write_text("".join(keep), encoding="utf-8")
|
||||
logger.info("metrics rotated: %d → %d Zeilen", len(lines), len(keep))
|
||||
except Exception as exc:
|
||||
logger.warning("metrics rotate: %s", exc)
|
||||
|
||||
|
||||
def aggregate(window_seconds: int) -> dict:
|
||||
"""Aggregiert die Calls der letzten N Sekunden."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
cutoff_ms = now_ms - (window_seconds * 1000)
|
||||
calls = 0
|
||||
tokens_in = 0
|
||||
tokens_out = 0
|
||||
by_model: dict[str, int] = {}
|
||||
if METRICS_FILE.exists():
|
||||
try:
|
||||
for raw in METRICS_FILE.read_text(encoding="utf-8").splitlines():
|
||||
raw = raw.strip()
|
||||
if not raw:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(raw)
|
||||
except Exception:
|
||||
continue
|
||||
if obj.get("ts", 0) < cutoff_ms:
|
||||
continue
|
||||
calls += 1
|
||||
tokens_in += int(obj.get("in") or 0)
|
||||
tokens_out += int(obj.get("out") or 0)
|
||||
m = obj.get("model", "?")
|
||||
by_model[m] = by_model.get(m, 0) + 1
|
||||
except Exception as exc:
|
||||
logger.warning("metrics aggregate: %s", exc)
|
||||
return {
|
||||
"window_seconds": window_seconds,
|
||||
"calls": calls,
|
||||
"tokens_in": tokens_in,
|
||||
"tokens_out": tokens_out,
|
||||
"by_model": by_model,
|
||||
}
|
||||
|
||||
|
||||
def stats() -> dict:
|
||||
"""Komplett-Snapshot mit den drei wichtigsten Fenstern."""
|
||||
return {
|
||||
"h1": aggregate(3600),
|
||||
"h5": aggregate(5 * 3600),
|
||||
"h24": aggregate(24 * 3600),
|
||||
"d30": aggregate(30 * 24 * 3600),
|
||||
}
|
||||
@@ -18,6 +18,8 @@ from typing import List, Optional
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
|
||||
import metrics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
RUNTIME_CONFIG_FILE = Path("/shared/config/runtime.json")
|
||||
@@ -135,6 +137,9 @@ class ProxyClient:
|
||||
"arguments": args,
|
||||
})
|
||||
|
||||
# Call-Metric anhaengen — Token-Schaetzung fuer Quota-Monitoring
|
||||
metrics.log_call(payload["model"], messages, content or "")
|
||||
|
||||
return ProxyResult(content=content or "", tool_calls=tool_calls, finish_reason=finish_reason)
|
||||
|
||||
def close(self):
|
||||
|
||||
Reference in New Issue
Block a user