feat(speaker-id): Phase 3 — Speaker-Gating im Streaming-STT

Sobald eine Streaming-Session ~1.5s Audio im Buffer hat, wird einmal pro
Session der Speaker-ID-Check ausgefuehrt (im Executor, ~50-100ms auf GPU).
Bei Match → Session laeuft normal weiter. Bei Mismatch → synthetisches
stt_endpoint mit text='' reason='speaker_mismatch' + stt_stream_done →
App ruft endConversation. Kein Whisper-Transcribe fuer fremde Stimmen →
Token + Latenz gespart.

- StreamSession: 3 neue Felder (speaker_checked, speaker_match,
  speaker_similarity).
- SessionManager._check_speaker / _finalize_speaker_mismatch:
  Check + sauberes Beenden bei Mismatch.
- _tick_session: Check-Gate vor STREAM_MIN_AUDIO_MS-Check eingehaengt.
- speaker_id.verify: threshold=None statt =DEFAULT_THRESHOLD damit
  config-Broadcast-Updates zur Laufzeit greifen (Default-Arg wird sonst
  zur Def-Zeit gebunden).

Fail-open: ohne Fingerprint returnt verify() (True, 0.0) — keine
Auswirkung. Stefan kann ohne Enrollment weiter wie bisher arbeiten.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 20:41:49 +02:00
parent e3fe27f736
commit ac53af5c24
2 changed files with 94 additions and 1 deletions
+87
View File
@@ -63,6 +63,7 @@ ALLOWED_MODELS = {"tiny", "base", "small", "medium", "large-v3"}
# Streaming-Parameter (Defaults — koennen pro Session vom App-Payload ueberschrieben werden) # Streaming-Parameter (Defaults — koennen pro Session vom App-Payload ueberschrieben werden)
STREAM_TRANSCRIBE_INTERVAL_MS = 700 # alle 700ms transkribieren waehrend Stream laeuft STREAM_TRANSCRIBE_INTERVAL_MS = 700 # alle 700ms transkribieren waehrend Stream laeuft
STREAM_SPEAKER_CHECK_MS = 1500 # Mindest-Audio fuer Speaker-ID-Pruefung
STREAM_DEFAULT_ENDPOINT_MS = 1500 # nach 1.5s ohne neuen Text → Endpoint STREAM_DEFAULT_ENDPOINT_MS = 1500 # nach 1.5s ohne neuen Text → Endpoint
STREAM_DEFAULT_HARD_CAP_MS = 60000 # nach 60s Audio: harter Cut egal was STREAM_DEFAULT_HARD_CAP_MS = 60000 # nach 60s Audio: harter Cut egal was
STREAM_MIN_AUDIO_MS = 600 # erst transkribieren wenn min 600ms Audio da STREAM_MIN_AUDIO_MS = 600 # erst transkribieren wenn min 600ms Audio da
@@ -311,6 +312,12 @@ class StreamSession:
last_transcribe_at: float = 0.0 last_transcribe_at: float = 0.0
closed: bool = False # nach stream_end gesetzt closed: bool = False # nach stream_end gesetzt
endpoint_sent: bool = False # Endpoint nur einmal feuern endpoint_sent: bool = False # Endpoint nur einmal feuern
# Speaker-ID Gating: bei aktiviertem Fingerprint pruefen wir die ersten
# ~1.5s der Aufnahme. Bei mismatch wird die Session sofort beendet mit
# synthetischem stt_endpoint(text='', reason='speaker_mismatch').
speaker_checked: bool = False
speaker_match: Optional[bool] = None
speaker_similarity: float = 0.0
class SessionManager: class SessionManager:
@@ -422,6 +429,77 @@ class SessionManager:
sid[:8], now - sess.last_chunk_at) sid[:8], now - sess.last_chunk_at)
self.drop(sid) self.drop(sid)
async def _check_speaker(self, sess: StreamSession, ws) -> None:
"""Speaker-ID einmalig pro Session: nimmt die ersten ~1.5s Audio,
rechnet das Embedding, vergleicht mit dem persistierten Fingerprint.
Ohne Fingerprint → fail-open (match=True). Bei mismatch wird die
Session sofort beendet mit synthetischem stt_endpoint."""
sess.speaker_checked = True
# Erste ~1.5s aus dem Buffer entnehmen (16kHz * 2 byte/sample = 32 bytes/ms)
head_bytes = bytes(sess.pcm_buffer[: STREAM_SPEAKER_CHECK_MS * 32])
if len(head_bytes) < speaker_id.MIN_SAMPLE_BYTES:
# Zu wenig — durchlassen
sess.speaker_match = True
sess.speaker_similarity = 0.0
return
try:
loop = asyncio.get_running_loop()
is_match, sim = await loop.run_in_executor(
None, speaker_id.verify, head_bytes,
)
except Exception as exc:
logger.warning("Stream %s: speaker-check crashed (%s) — fail-open",
sess.request_id[:8], exc)
sess.speaker_match = True
sess.speaker_similarity = 0.0
return
sess.speaker_match = is_match
sess.speaker_similarity = sim
logger.info("Stream %s: speaker-check sim=%.2f%s (threshold=%.2f)",
sess.request_id[:8], sim, "MATCH" if is_match else "REJECT",
speaker_id.DEFAULT_THRESHOLD)
await _debug_log(ws, "speaker.check",
f"id={sess.request_id[:12]} sim={sim:.2f} "
f"thr={speaker_id.DEFAULT_THRESHOLD:.2f} "
f"{'MATCH' if is_match else 'REJECT'}")
if not is_match:
await self._finalize_speaker_mismatch(sess, ws, sim)
async def _finalize_speaker_mismatch(self, sess: StreamSession, ws,
similarity: float) -> None:
"""Bei Speaker-Mismatch: synthetisches stt_endpoint (text='', reason=
'speaker_mismatch') schicken damit der App-Pfad sauber endet
(endConversation), Session droppen. Kein Whisper-Transcribe.
Spart die Token + die STT-Latenz fuer fremde Stimmen."""
if sess.endpoint_sent:
return
sess.endpoint_sent = True
duration_s = self._buffer_duration_ms(sess) / 1000.0
logger.info("Stream %s: speaker-mismatch (sim=%.2f) — DROP nach %.1fs",
sess.request_id[:8], similarity, duration_s)
endpoint_payload = {
"requestId": sess.request_id,
"audioRequestId": sess.audio_request_id,
"text": "",
"reason": "speaker_mismatch",
"durationS": duration_s,
"sttMs": 0,
"voice": sess.voice,
"speed": sess.speed,
"interrupted": sess.interrupted,
"speakerSimilarity": float(similarity),
}
if sess.location:
endpoint_payload["location"] = sess.location
await _send(ws, "stt_endpoint", endpoint_payload)
await _send(ws, "stt_stream_done", {
"requestId": sess.request_id,
"audioRequestId": sess.audio_request_id,
"text": "",
"reason": "speaker_mismatch",
})
self.drop(sess.request_id)
async def _tick_session(self, sess: StreamSession, now: float) -> None: async def _tick_session(self, sess: StreamSession, now: float) -> None:
ws = self._ws ws = self._ws
if ws is None: if ws is None:
@@ -442,6 +520,15 @@ class SessionManager:
await self._finalize(sess, ws, reason="stream_end") await self._finalize(sess, ws, reason="stream_end")
return return
# Speaker-ID Gating: sobald genug Audio da ist, einmalig pruefen ob's
# Stefan ist. Bei Mismatch → synthetisches Endpoint, Session zu.
# Wenn kein Fingerprint persistiert ist, returnt verify() fail-open
# mit (True, 0.0) — keine Auswirkung.
if not sess.speaker_checked and audio_ms >= STREAM_SPEAKER_CHECK_MS:
await self._check_speaker(sess, ws)
if sess.speaker_match is False:
return # Session bereits beendet via _finalize_speaker_mismatch
# Noch zu wenig Audio fuer eine erste Transkription # Noch zu wenig Audio fuer eine erste Transkription
if audio_ms < STREAM_MIN_AUDIO_MS: if audio_ms < STREAM_MIN_AUDIO_MS:
return return
+7 -1
View File
@@ -157,12 +157,18 @@ def delete_fingerprint() -> bool:
return False return False
def verify(audio_bytes: bytes, threshold: float = DEFAULT_THRESHOLD) -> tuple[bool, float]: def verify(audio_bytes: bytes, threshold: Optional[float] = None) -> tuple[bool, float]:
"""Returns (is_match, similarity). """Returns (is_match, similarity).
Wenn threshold=None: nutzt den Modul-Default (DEFAULT_THRESHOLD) — der wird
vom config-Broadcast zur Laufzeit auf den Diagnostic-Slider-Wert gesetzt.
Default-Arg-Bindung waere zur Def-Zeit, also bewusst None statt direkt.
Fail-open: wenn kein Fingerprint vorhanden ist oder das Embedding-Modell Fail-open: wenn kein Fingerprint vorhanden ist oder das Embedding-Modell
crasht, returnt (True, 0.0) — kein Filtering. Sonst wuerde ein kaputter crasht, returnt (True, 0.0) — kein Filtering. Sonst wuerde ein kaputter
Speaker-ID-Service die ganze Aufnahme blockieren.""" Speaker-ID-Service die ganze Aufnahme blockieren."""
if threshold is None:
threshold = DEFAULT_THRESHOLD
fp = load_fingerprint() fp = load_fingerprint()
if fp is None: if fp is None:
return True, 0.0 return True, 0.0