e3fe27f736
App-Seite: - VoiceIdEnrollment.tsx (neue Komponente, ~370 Zeilen): Status-Karte (loading/unenrolled/enrolled/error), Sample-Recorder mit Countdown (4s fest pro Sample), Liste mit einzelnem Loeschen, Save-Button (disabled bis 5 Samples), Fingerprint-Delete mit Confirm. - SettingsScreen.tsx: neue Section 🎤 'Stimme einrichten' zwischen Wake-Word und Sprachausgabe. - Sample-Format: WAV via audioService.startRecording — wird whisper-bridge-seitig per wave-Modul gestrippt. Diagnostic-Seite: - Neue settings-section 'Voice-ID (Sprecher-Erkennung)': Status-Anzeige (live ueber voice_id_status_response), Threshold-Slider 0.30-0.70 (persistiert in voice_config.json, broadcast als config-Message), Refresh + Delete-Button. - server.js: 2 neue actions (voice_id_status, voice_id_delete), send_voice_config nimmt voiceIdThreshold mit auf. Backend: - speaker_id.py: _normalize_audio_bytes erkennt jetzt WAV-Header (RIFF/WAVE) und strippt auf rohes PCM — sonst werfen die ECAPA- Embeddings auf den 44-Byte-Header rein. - bridge.py: config-Broadcast-Handler setzt voiceIdThreshold auf speaker_id.DEFAULT_THRESHOLD (wird erst in Phase 3 beim Gating genutzt, persistiert aber schon). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
226 lines
8.3 KiB
Python
226 lines
8.3 KiB
Python
"""
|
|
Speaker-ID Backend fuer ARIAs Stimmen-Erkennung.
|
|
|
|
Nutzt SpeechBrain ECAPA-TDNN (192-dim Embeddings, auf VoxCeleb-1+2 trainiert).
|
|
Fingerprint = gemittelter, L2-normalisierter Embedding-Vektor aus N
|
|
Enrollment-Samples. Verify: cosine_similarity(neue_aufnahme, fingerprint).
|
|
|
|
Persistenz: /voice-id/fingerprint.json (Float-Liste + Metadaten).
|
|
Modell-Cache: /root/.cache/huggingface/ (Bind-Mount mit f5tts geteilt).
|
|
|
|
Verhalten OHNE Enrollment (kein Fingerprint vorhanden):
|
|
verify() → (True, 0.0) — Fail-open, damit Speaker-ID-Gating den
|
|
ungeenrollten Brain-Pfad nicht versehentlich blockiert.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
VOICE_ID_DIR = Path(os.environ.get("VOICE_ID_DIR", "/voice-id"))
|
|
FINGERPRINT_FILE = VOICE_ID_DIR / "fingerprint.json"
|
|
|
|
# Cosine-Threshold: 0.5 ist konservativ (wenig false-positives), 0.3 ist
|
|
# locker (mehr Treffer auch bei Nebengeraeuschen). Stefan kann's per
|
|
# Diagnostic-Setting feintunen.
|
|
DEFAULT_THRESHOLD = 0.5
|
|
|
|
# Minimal-Sample-Laenge fuer ein verlaessliches Embedding (~1s @ 16kHz int16 = 32000 bytes)
|
|
MIN_SAMPLE_BYTES = 32000
|
|
|
|
_model = None
|
|
|
|
|
|
def _ensure_loaded():
|
|
"""Lazy-Load des ECAPA-TDNN. Holt das Modell beim ersten Aufruf von HF;
|
|
danach cached im HF-Cache-Volume. Erste Init: ~30s download + load,
|
|
danach <1s warm. Wirft bei Fehler — Caller muss catchen + fail-open."""
|
|
global _model
|
|
if _model is not None:
|
|
return _model
|
|
import torch
|
|
from speechbrain.inference.speaker import EncoderClassifier
|
|
device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
logger.info("[speaker-id] loading ECAPA-TDNN on %s ...", device)
|
|
_model = EncoderClassifier.from_hparams(
|
|
source="speechbrain/spkrec-ecapa-voxceleb",
|
|
savedir="/root/.cache/huggingface/speechbrain-ecapa",
|
|
run_opts={"device": device},
|
|
)
|
|
logger.info("[speaker-id] model ready (device=%s)", device)
|
|
return _model
|
|
|
|
|
|
def _normalize_audio_bytes(audio_bytes: bytes) -> bytes:
|
|
"""Akzeptiert entweder rohes 16kHz int16 LE PCM ODER eine WAV-Datei (RIFF/WAVE).
|
|
Bei WAV wird der Header gestrippt + Format validiert (16kHz / mono / int16).
|
|
Ergebnis: rohes PCM."""
|
|
if (len(audio_bytes) >= 44
|
|
and audio_bytes[:4] == b"RIFF"
|
|
and audio_bytes[8:12] == b"WAVE"):
|
|
import io
|
|
import wave
|
|
with wave.open(io.BytesIO(audio_bytes), "rb") as wav:
|
|
sr = wav.getframerate()
|
|
ch = wav.getnchannels()
|
|
sw = wav.getsampwidth()
|
|
if sr != 16000:
|
|
raise ValueError(f"WAV-Samplerate {sr} != 16000")
|
|
if ch != 1:
|
|
raise ValueError(f"WAV-Kanalzahl {ch} != 1 (mono erwartet)")
|
|
if sw != 2:
|
|
raise ValueError(f"WAV-Sampleweite {sw} != 2 (int16 erwartet)")
|
|
return wav.readframes(wav.getnframes())
|
|
return audio_bytes
|
|
|
|
|
|
def _audio_bytes_to_tensor(audio_bytes: bytes):
|
|
"""int16 LE PCM (16kHz mono) → Torch-Tensor (1, N), normalisiert auf [-1, 1].
|
|
WAV wird vorher auf rohes PCM reduziert (Header strippen)."""
|
|
import torch
|
|
raw = _normalize_audio_bytes(audio_bytes)
|
|
arr = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
|
|
return torch.from_numpy(arr).unsqueeze(0)
|
|
|
|
|
|
def embed(audio_bytes: bytes) -> np.ndarray:
|
|
"""Berechnet das Speaker-Embedding fuer einen Audio-Chunk.
|
|
Erwartet 16kHz int16 LE PCM Mono. Returns 192-dim numpy float32."""
|
|
import torch
|
|
model = _ensure_loaded()
|
|
wav = _audio_bytes_to_tensor(audio_bytes)
|
|
with torch.no_grad():
|
|
emb = model.encode_batch(wav)
|
|
return emb.squeeze().cpu().numpy().astype(np.float32)
|
|
|
|
|
|
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
|
"""Kosinus-Aehnlichkeit zwischen zwei 1D-Vektoren, Range [-1, 1].
|
|
Hoeher = aehnlicher. Bei normalisierten Vektoren ist das gleich dem Skalarprodukt."""
|
|
na = np.linalg.norm(a)
|
|
nb = np.linalg.norm(b)
|
|
if na < 1e-9 or nb < 1e-9:
|
|
return 0.0
|
|
return float(np.dot(a, b) / (na * nb))
|
|
|
|
|
|
def save_fingerprint(embeddings: list[np.ndarray], sample_durations_s: list[float]) -> dict:
|
|
"""Mittelt + L2-normalisiert die Embeddings und schreibt sie nach
|
|
FINGERPRINT_FILE. Returns das gespeicherte Dict."""
|
|
if not embeddings:
|
|
raise ValueError("Keine Embeddings zum Speichern")
|
|
VOICE_ID_DIR.mkdir(parents=True, exist_ok=True)
|
|
stacked = np.stack(embeddings)
|
|
mean = stacked.mean(axis=0)
|
|
mean = mean / max(np.linalg.norm(mean), 1e-9)
|
|
data = {
|
|
"version": 1,
|
|
"embedding": mean.tolist(),
|
|
"embedding_dim": int(mean.shape[0]),
|
|
"sample_count": len(embeddings),
|
|
"sample_durations_s": [float(s) for s in sample_durations_s],
|
|
"updated_at": int(time.time()),
|
|
}
|
|
FINGERPRINT_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8")
|
|
logger.info("[speaker-id] fingerprint gespeichert: %d Samples, dim=%d, total_s=%.1f",
|
|
len(embeddings), mean.shape[0], sum(sample_durations_s))
|
|
return data
|
|
|
|
|
|
def load_fingerprint() -> Optional[dict]:
|
|
"""Returns das Fingerprint-Dict oder None wenn noch nicht enrolled."""
|
|
if not FINGERPRINT_FILE.exists():
|
|
return None
|
|
try:
|
|
return json.loads(FINGERPRINT_FILE.read_text(encoding="utf-8"))
|
|
except Exception as exc:
|
|
logger.warning("[speaker-id] fingerprint laden fehlgeschlagen: %s", exc)
|
|
return None
|
|
|
|
|
|
def delete_fingerprint() -> bool:
|
|
"""Loescht den Fingerprint (z.B. fuer Re-Enrollment). True wenn was weg ist."""
|
|
if FINGERPRINT_FILE.exists():
|
|
FINGERPRINT_FILE.unlink()
|
|
logger.info("[speaker-id] fingerprint geloescht")
|
|
return True
|
|
return False
|
|
|
|
|
|
def verify(audio_bytes: bytes, threshold: float = DEFAULT_THRESHOLD) -> tuple[bool, float]:
|
|
"""Returns (is_match, similarity).
|
|
|
|
Fail-open: wenn kein Fingerprint vorhanden ist oder das Embedding-Modell
|
|
crasht, returnt (True, 0.0) — kein Filtering. Sonst wuerde ein kaputter
|
|
Speaker-ID-Service die ganze Aufnahme blockieren."""
|
|
fp = load_fingerprint()
|
|
if fp is None:
|
|
return True, 0.0
|
|
if len(audio_bytes) < MIN_SAMPLE_BYTES:
|
|
# Zu wenig Audio fuer ein verlaessliches Embedding → durchlassen
|
|
return True, 0.0
|
|
try:
|
|
saved_emb = np.array(fp["embedding"], dtype=np.float32)
|
|
new_emb = embed(audio_bytes)
|
|
except Exception as exc:
|
|
logger.warning("[speaker-id] verify embed failed: %s — fail-open", exc)
|
|
return True, 0.0
|
|
sim = cosine_similarity(new_emb, saved_emb)
|
|
return sim >= threshold, sim
|
|
|
|
|
|
def status() -> dict:
|
|
"""Status-Snapshot fuer die App / Diagnostic."""
|
|
fp = load_fingerprint()
|
|
return {
|
|
"enrolled": fp is not None,
|
|
"sample_count": fp.get("sample_count", 0) if fp else 0,
|
|
"sample_durations_s": fp.get("sample_durations_s", []) if fp else [],
|
|
"updated_at": fp.get("updated_at") if fp else None,
|
|
"embedding_dim": fp.get("embedding_dim") if fp else None,
|
|
"default_threshold": DEFAULT_THRESHOLD,
|
|
}
|
|
|
|
|
|
def enroll_from_samples(samples_b64: list[str]) -> dict:
|
|
"""Verarbeitet base64-Samples (16kHz int16 LE PCM Mono) zu einem neuen
|
|
Fingerprint. Returns Status-Dict. Wirft ValueError wenn nichts brauchbar ist."""
|
|
if not samples_b64:
|
|
raise ValueError("Keine Samples uebergeben")
|
|
embeddings: list[np.ndarray] = []
|
|
durations: list[float] = []
|
|
rejected: list[dict] = []
|
|
for idx, s in enumerate(samples_b64):
|
|
try:
|
|
raw = base64.b64decode(s)
|
|
except Exception as exc:
|
|
rejected.append({"index": idx, "reason": f"base64: {exc}"})
|
|
continue
|
|
if len(raw) < MIN_SAMPLE_BYTES:
|
|
rejected.append({"index": idx, "reason": f"zu kurz ({len(raw)} bytes)"})
|
|
continue
|
|
try:
|
|
emb = embed(raw)
|
|
embeddings.append(emb)
|
|
durations.append(len(raw) / 2 / 16000.0)
|
|
except Exception as exc:
|
|
rejected.append({"index": idx, "reason": f"embed: {exc}"})
|
|
if not embeddings:
|
|
raise ValueError(
|
|
f"Keine Samples konnten verarbeitet werden ({len(rejected)} rejected). "
|
|
f"Details: {rejected[:3]}"
|
|
)
|
|
fingerprint = save_fingerprint(embeddings, durations)
|
|
fingerprint["rejected"] = rejected
|
|
return fingerprint
|