"""Call metrics for the proxy client.

For every Claude call one entry is appended to ``/data/metrics.jsonl``
(overridable via the ``METRICS_FILE`` environment variable)::

    {"ts": <epoch-ms>, "model": "...", "in": <tokens>, "out": <tokens>}

Token estimate: characters / 4 (Anthropic default heuristic). Not exact, but
good enough for quota monitoring.

We deliberately do not accumulate totals in memory because the Brain
container can be restarted -- everything lives on disk.

Evaluation via ``aggregate(window_seconds)``, which returns
``{window_seconds, calls, tokens_in, tokens_out, by_model}`` for the last
N seconds. The file is read lazily; no large data volumes are expected (at
1000 calls/day that is on the order of 70 KB per day, i.e. ~2 MB per month).

Auto-rotate: once the file exceeds 50k lines it is trimmed to the newest 25k.
"""
from __future__ import annotations

import json
import logging
import os
import time
from pathlib import Path
from typing import List

logger = logging.getLogger(__name__)

METRICS_FILE = Path(os.environ.get("METRICS_FILE", "/data/metrics.jsonl"))
ROTATE_AT = 50_000
ROTATE_KEEP = 25_000
# The rotation check reads the whole file, so it only runs on the first call
# of a process and then every ROTATE_CHECK_EVERY calls.
ROTATE_CHECK_EVERY = 1000

# Number of calls logged by this process. It only drives the rotation cadence,
# so losing it on a restart is harmless (the first call afterwards checks).
_call_counter = 0


def _estimate_tokens(text: str) -> int:
    """Estimate the token count of *text* at ~4 characters per token.

    Returns 0 for empty text and at least 1 for any non-empty text.
    """
    if not text:
        return 0
    return max(1, len(text) // 4)


def _content_text(content: object) -> str:
    """Return the plain text carried by a message ``content`` value.

    Accepts a string, or a list/tuple of content parts where each part is a
    string, a dict with a ``"text"`` key, or an object with a ``text``
    attribute. Anything else (None, non-text parts) contributes nothing.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, (list, tuple)):
        return ""
    pieces = []
    for part in content:
        if isinstance(part, str):
            text = part
        elif isinstance(part, dict):
            text = part.get("text")
        else:
            text = getattr(part, "text", None)
        if isinstance(text, str):
            pieces.append(text)
    return "".join(pieces)


def _messages_tokens(messages: list) -> int:
    """Sum the per-message token estimates of *messages*.

    Messages may be dicts or objects (e.g. Pydantic models) exposing a
    ``content`` attribute; both go through the same content extraction.
    """
    total = 0
    for m in messages or ():
        if isinstance(m, dict):
            content = m.get("content")
        else:
            content = getattr(m, "content", None)
        total += _estimate_tokens(_content_text(content))
    return total


def log_call(model: str, messages_in: list, reply_text: str = "") -> None:
    """Append one call metric line to METRICS_FILE.

    Args:
        model: Model identifier, stored verbatim under ``"model"``.
        messages_in: Request messages; their estimated tokens go to ``"in"``.
        reply_text: Reply text; its estimated tokens go to ``"out"``.

    Best-effort: any error is logged as a warning and swallowed so metrics
    never break the actual call path.
    """
    global _call_counter
    try:
        tokens_in = _messages_tokens(messages_in)
        tokens_out = _estimate_tokens(reply_text)
        line = json.dumps({
            "ts": int(time.time() * 1000),
            "model": model,
            "in": tokens_in,
            "out": tokens_out,
        })
        METRICS_FILE.parent.mkdir(parents=True, exist_ok=True)
        with METRICS_FILE.open("a", encoding="utf-8") as f:
            f.write(line + "\n")
        # Cheap rotation cadence: calls 1, 1+N, 1+2N, ... of this process.
        _call_counter += 1
        if (_call_counter - 1) % max(1, ROTATE_CHECK_EVERY) == 0:
            _maybe_rotate()
    except Exception as exc:
        logger.warning("metrics.log_call: %s", exc)


def _maybe_rotate() -> None:
    """Trim METRICS_FILE to its newest ROTATE_KEEP lines if it has more than
    ROTATE_AT lines. Best-effort: errors are logged, never raised.

    The trimmed content is written to a temporary sibling file and swapped in
    with os.replace, so a crash mid-write cannot leave a truncated file.
    Lines appended by another writer between the read and the replace are
    still lost.
    """
    tmp = METRICS_FILE.with_name(METRICS_FILE.name + ".tmp")
    try:
        try:
            # errors="replace": a single corrupt byte must not block rotation
            # forever (the file would then grow without bound).
            with METRICS_FILE.open("r", encoding="utf-8", errors="replace") as f:
                lines = f.readlines()
        except FileNotFoundError:
            return
        if len(lines) <= ROTATE_AT:
            return
        keep = lines[-ROTATE_KEEP:] if ROTATE_KEEP > 0 else []
        tmp.write_text("".join(keep), encoding="utf-8")
        os.replace(tmp, METRICS_FILE)
        logger.info("metrics rotated: %d → %d Zeilen", len(lines), len(keep))
    except Exception as exc:
        logger.warning("metrics rotate: %s", exc)
        try:
            tmp.unlink()
        except OSError:
            pass


def _parse_entry(raw: str) -> tuple[float, int, int, str] | None:
    """Parse one JSONL line into ``(ts_ms, tokens_in, tokens_out, model)``.

    Returns None for blank, non-JSON or structurally invalid lines, so a
    single corrupt entry is skipped instead of aborting the aggregation.
    A non-string model is reported as ``"?"``.
    """
    raw = raw.strip()
    if not raw:
        return None
    try:
        obj = json.loads(raw)
    except ValueError:
        return None
    if not isinstance(obj, dict):
        return None
    ts = obj.get("ts", 0)
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        return None
    try:
        tokens_in = int(obj.get("in") or 0)
        tokens_out = int(obj.get("out") or 0)
    except (TypeError, ValueError, OverflowError):
        return None
    model = obj.get("model", "?")
    if not isinstance(model, str):
        model = "?"
    return ts, tokens_in, tokens_out, model


def _aggregate_windows(windows_seconds: list[int]) -> list[dict]:
    """Aggregate METRICS_FILE for several look-back windows in one pass.

    Returns one result dict per window, in the order given, each shaped like
    the return value of ``aggregate``. All windows share the same "now".
    A missing file yields all-zero results; other read errors are logged
    and the totals gathered so far are returned.
    """
    now_ms = int(time.time() * 1000)
    results = [
        {
            "window_seconds": w,
            "calls": 0,
            "tokens_in": 0,
            "tokens_out": 0,
            "by_model": {},
        }
        for w in windows_seconds
    ]
    cutoffs = [now_ms - (w * 1000) for w in windows_seconds]
    try:
        with METRICS_FILE.open("r", encoding="utf-8", errors="replace") as f:
            for raw in f:
                entry = _parse_entry(raw)
                if entry is None:
                    continue
                ts, tokens_in, tokens_out, model = entry
                for result, cutoff in zip(results, cutoffs):
                    if not ts >= cutoff:  # also rejects NaN timestamps
                        continue
                    result["calls"] += 1
                    result["tokens_in"] += tokens_in
                    result["tokens_out"] += tokens_out
                    per_model = result["by_model"]
                    per_model[model] = per_model.get(model, 0) + 1
    except FileNotFoundError:
        pass
    except Exception as exc:
        logger.warning("metrics aggregate: %s", exc)
    return results


def aggregate(window_seconds: int) -> dict:
    """Aggregate the calls of the last *window_seconds* seconds.

    Returns ``{"window_seconds", "calls", "tokens_in", "tokens_out",
    "by_model"}`` where ``by_model`` maps model name to call count.
    """
    return _aggregate_windows([window_seconds])[0]


def stats() -> dict:
    """Full snapshot for the four standard windows: 1 h, 5 h, 24 h and 30 d.

    All windows are computed from a single read of the metrics file.
    """
    keys = ("h1", "h5", "h24", "d30")
    windows = [3600, 5 * 3600, 24 * 3600, 30 * 24 * 3600]
    return dict(zip(keys, _aggregate_windows(windows)))