Files
ARIA-AGENT/aria-brain/metrics.py
T
duffyduck b2f7d6dda2 feat(brain+diagnostic): Token/Call-Metrics mit Subscription-Plan-Tracking
Stefan hat den Max 5x Plan (~\$90-100/Monat), ungefaehres Limit 225 Calls pro
5h-Fenster fuer Sonnet. Damit nicht in eine Tool-Loop-Schleife laufen ohne
es zu merken: kleine Metrics-Pipeline, sichtbar in der Diagnostic.

aria-brain/metrics.py
  Append-only JSONL Logger unter /data/metrics.jsonl. Pro Claude-Call eine
  Zeile {ts, model, in, out} mit Token-Schaetzung (chars/4, Anthropic-
  Heuristik). aggregate(window) zaehlt die letzten N Sekunden.
  Auto-Rotate bei 50k Zeilen → 25k behalten (~70 KB/Monat bei 1k Calls/Tag,
  Cap also weit oben).

aria-brain/proxy_client.py
  chat_full() ruft am Ende metrics.log_call(model, messages_in, reply).
  Failed/exception-Pfade loggen nicht (sonst false positives).

aria-brain/main.py
  GET /metrics/calls → {h1, h5, h24, d30}, jedes Window mit calls,
  tokens_in, tokens_out, by_model.

diagnostic/index.html
  Neue Card "Token / Calls" im Gehirn-Tab. Plan-Dropdown
  (Pro / Max 5x / Max 20x / Custom), localStorage-persistiert. 4 Metric-
  Zellen fuer 1h/5h/24h/30d mit Calls + Tokens. Progress-Bar oben zeigt
  5h-Counter gegen Plan-Limit. Warn-Klassen: gelb bei 80%, rot bei 90%.
  Auto-Refresh alle 30s wenn Gehirn-Tab offen, plus bei Tab-Wechsel.
  Info-Modal erklaert die Limits + dass HTTP-Call != User-Frage (Tool-Use
  kann pro Frage bis zu 8 Calls verursachen).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 23:43:56 +02:00

134 lines
4.2 KiB
Python

"""
Call-Metrics fuer den Proxy-Client.
Pro Claude-Call wird ein Eintrag in /data/metrics.jsonl angehaengt:
{"ts": <ms>, "model": "...", "in": <tokens_in_estimate>, "out": <tokens_out_estimate>}
Tokens-Schaetzung: characters / 4 (Anthropic-Default-Heuristik). Nicht exakt
aber gut genug fuer Quota-Monitoring. Wir summieren nicht in-memory weil
der Brain-Container neugestartet werden kann — alles auf Disk.
Auswertung via aggregate(window_seconds) — liefert {calls, tokens_in, tokens_out}
fuer die letzten N Sekunden. Lazy gelesen, keine grossen Datenmengen erwartet
(bei 1000 Calls/Tag ~70 KB pro Monat).
Auto-Rotate: bei > 50k Zeilen werden die aeltesten 25k weggeschnitten.
"""
from __future__ import annotations
import json
import logging
import os
import time
from pathlib import Path
from typing import List
logger = logging.getLogger(__name__)
METRICS_FILE = Path(os.environ.get("METRICS_FILE", "/data/metrics.jsonl"))
ROTATE_AT = 50_000
ROTATE_KEEP = 25_000
def _estimate_tokens(text: str) -> int:
"""Anthropic-Default: ~4 chars pro Token. Grob genug."""
if not text:
return 0
return max(1, len(text) // 4)
def _messages_tokens(messages: list) -> int:
total = 0
for m in messages:
# Pydantic-Model oder dict
if hasattr(m, "content"):
total += _estimate_tokens(m.content or "")
elif isinstance(m, dict):
c = m.get("content") or ""
if isinstance(c, str):
total += _estimate_tokens(c)
return total
def log_call(model: str, messages_in: list, reply_text: str = "") -> None:
"""Eine Call-Metric anhaengen. Robust gegen Fehler (silent fail)."""
try:
tokens_in = _messages_tokens(messages_in)
tokens_out = _estimate_tokens(reply_text)
line = json.dumps({
"ts": int(time.time() * 1000),
"model": model,
"in": tokens_in,
"out": tokens_out,
})
METRICS_FILE.parent.mkdir(parents=True, exist_ok=True)
with METRICS_FILE.open("a", encoding="utf-8") as f:
f.write(line + "\n")
# Sanftes Rotate ohne hohe IO-Kosten — nur alle 1000 Calls checken
if (tokens_in + tokens_out) % 1000 < 4:
_maybe_rotate()
except Exception as exc:
logger.warning("metrics.log_call: %s", exc)
def _maybe_rotate() -> None:
try:
if not METRICS_FILE.exists():
return
with METRICS_FILE.open("r", encoding="utf-8") as f:
lines = f.readlines()
if len(lines) > ROTATE_AT:
keep = lines[-ROTATE_KEEP:]
METRICS_FILE.write_text("".join(keep), encoding="utf-8")
logger.info("metrics rotated: %d%d Zeilen", len(lines), len(keep))
except Exception as exc:
logger.warning("metrics rotate: %s", exc)
def aggregate(window_seconds: int) -> dict:
"""Aggregiert die Calls der letzten N Sekunden."""
now_ms = int(time.time() * 1000)
cutoff_ms = now_ms - (window_seconds * 1000)
calls = 0
tokens_in = 0
tokens_out = 0
by_model: dict[str, int] = {}
if METRICS_FILE.exists():
try:
for raw in METRICS_FILE.read_text(encoding="utf-8").splitlines():
raw = raw.strip()
if not raw:
continue
try:
obj = json.loads(raw)
except Exception:
continue
if obj.get("ts", 0) < cutoff_ms:
continue
calls += 1
tokens_in += int(obj.get("in") or 0)
tokens_out += int(obj.get("out") or 0)
m = obj.get("model", "?")
by_model[m] = by_model.get(m, 0) + 1
except Exception as exc:
logger.warning("metrics aggregate: %s", exc)
return {
"window_seconds": window_seconds,
"calls": calls,
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"by_model": by_model,
}
def stats() -> dict:
"""Komplett-Snapshot mit den drei wichtigsten Fenstern."""
return {
"h1": aggregate(3600),
"h5": aggregate(5 * 3600),
"h24": aggregate(24 * 3600),
"d30": aggregate(30 * 24 * 3600),
}