# Source: ARIA-AGENT/xtts/f5tts/bridge.py (782 lines, 33 KiB, Python)
#!/usr/bin/env python3
"""
ARIA F5-TTS Bridge — laeuft auf der Gamebox (RTX 3060).
Empfaengt xtts_request via RVS → F5-TTS Voice Cloning auf GPU → streamt
16-bit PCM Chunks als audio_pcm Nachrichten zurueck an die App.
Voice-Layout im VOICES_DIR:
{name}.wav — Referenz-Audio (6-10s, 24kHz mono empfohlen)
{name}.txt — Referenz-Text (UTF-8, was im WAV gesprochen wird)
Beim voice_upload senden wir intern einen stt_request an die whisper-bridge
und legen die Transkription als .txt ab — der User muss keinen Text eingeben.
Env:
RVS_HOST, RVS_PORT, RVS_TLS, RVS_TLS_FALLBACK, RVS_TOKEN
F5TTS_MODEL Default: F5TTS_v1_Base
F5TTS_DEVICE Default: cuda
VOICES_DIR Default: /voices
"""
import asyncio
import base64
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path
from typing import Optional
import numpy as np
import soundfile as sf
import websockets
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("f5tts-bridge")
# Quiet the HuggingFace + torch download logs a bit
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# RVS (relay websocket server) connection settings — all via environment
RVS_HOST = os.getenv("RVS_HOST", "").strip()
RVS_PORT = int(os.getenv("RVS_PORT", "443"))
RVS_TLS = os.getenv("RVS_TLS", "true").lower() == "true"
RVS_TLS_FALLBACK = os.getenv("RVS_TLS_FALLBACK", "true").lower() == "true"
RVS_TOKEN = os.getenv("RVS_TOKEN", "").strip()

# F5-TTS configuration
# ─────────────────────────────────────────────────────────────────
# Defaults are hard-coded — deliberately NO env vars (except F5TTS_DEVICE,
# which is hardware bootstrap). All settings are overridden at runtime via
# the RVS config broadcast from Diagnostic (fields f5ttsModel, f5ttsCkptFile,
# f5ttsVocabFile, f5ttsCfgStrength, f5ttsNfeStep).
F5TTS_DEVICE = os.getenv("F5TTS_DEVICE", "cuda")  # bootstrap only
DEFAULT_F5TTS_MODEL = "F5TTS_v1_Base"
DEFAULT_F5TTS_CKPT_FILE = ""   # empty = default checkpoint from HF
DEFAULT_F5TTS_VOCAB_FILE = ""  # empty = default vocab of the model
# cfg_strength: how tightly the generator sticks to the reference voice.
# F5-TTS default = 2.0. For non-EN/CN languages (German!) 2.5+ helps keep
# the model from drifting into another language.
DEFAULT_F5TTS_CFG_STRENGTH = 2.5
DEFAULT_F5TTS_NFE_STEP = 32

VOICES_DIR = Path(os.getenv("VOICES_DIR", "/voices"))
PCM_CHUNK_BYTES = 8192  # ~170ms @ 24kHz mono s16
TARGET_SR = 24000       # F5-TTS native sample rate
# F5-TTS has a 12s hard limit for reference audio. Longer WAVs are silently
# truncated by the model — but our ref_text stays long and then no longer
# matches the shortened audio (quality suffers, the warmup render takes
# needlessly long). We clip explicitly to 10s + re-transcribe the text so
# both stay in sync.
REF_MAX_SECONDS = 10.0
# Recognized during a transition phase as an "invalid reference" (old voices
# uploaded before the whisper-bridge was online). On detection we delete the
# .txt and fetch the real transcription.
_LEGACY_PLACEHOLDER_REF = "Das ist ein Referenz Audio."
# ── Lazy F5-TTS loader ──────────────────────────────────────
_F5TTS_cls = None
def _get_f5tts_cls():
    """Import F5TTS lazily so startup logs aren't flooded by torch warnings."""
    global _F5TTS_cls
    if _F5TTS_cls is not None:
        return _F5TTS_cls
    from f5_tts.api import F5TTS
    _F5TTS_cls = F5TTS
    return _F5TTS_cls
class F5Runner:
    """Holds the F5-TTS model. Synthesis runs in an executor (blocking).

    Live settings (model, cfg_strength, nfe_step) are set via update_config()
    from the Diagnostic config broadcast; a model change triggers an
    automatic reload.
    """
    def __init__(self) -> None:
        self.model = None
        self._lock = asyncio.Lock()
        # Current values — start with hard defaults, overridden by Diagnostic
        self.model_id: str = DEFAULT_F5TTS_MODEL
        self.ckpt_file: str = DEFAULT_F5TTS_CKPT_FILE
        self.vocab_file: str = DEFAULT_F5TTS_VOCAB_FILE
        self.cfg_strength: float = DEFAULT_F5TTS_CFG_STRENGTH
        self.nfe_step: int = DEFAULT_F5TTS_NFE_STEP
        # Last load time for the service_status broadcast
        self.last_load_seconds: float = 0.0
        self._load_started_at: float = 0.0

    def _load_blocking(self) -> None:
        """Instantiate the F5-TTS model (blocking — must run in an executor)."""
        cls = _get_f5tts_cls()
        logger.info("Lade F5-TTS '%s' (device=%s, ckpt=%s)...",
                    self.model_id, F5TTS_DEVICE, self.ckpt_file or "default")
        self._load_started_at = time.time()
        kwargs = {"model": self.model_id, "device": F5TTS_DEVICE}
        if self.ckpt_file:
            kwargs["ckpt_file"] = self.ckpt_file
        if self.vocab_file:
            kwargs["vocab_file"] = self.vocab_file
        self.model = cls(**kwargs)
        elapsed = time.time() - self._load_started_at
        logger.info("F5-TTS geladen in %.1fs (cfg_strength=%.1f, nfe=%d)",
                    elapsed, self.cfg_strength, self.nfe_step)
        # Read from outside (run_loop) to switch service_status to 'ready'
        self.last_load_seconds = elapsed

    async def ensure_loaded(self) -> None:
        """Load the model exactly once; concurrent callers serialize on the lock."""
        async with self._lock:
            if self.model is not None:
                return
            # get_running_loop() instead of the deprecated get_event_loop():
            # we are always inside a coroutine here.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._load_blocking)

    async def update_config(self, payload: dict) -> None:
        """Read the f5tts* fields from a config broadcast.

        cfg_strength/nfe_step take effect on the next synthesis without a
        reload; model/ckpt/vocab changes trigger a full reload.
        """
        new_model = (payload.get("f5ttsModel") or "").strip() or self.model_id
        new_ckpt = payload.get("f5ttsCkptFile", self.ckpt_file) or ""
        new_vocab = payload.get("f5ttsVocabFile", self.vocab_file) or ""
        try:
            new_cfg = float(payload.get("f5ttsCfgStrength", self.cfg_strength))
        except (TypeError, ValueError):
            new_cfg = self.cfg_strength
        try:
            new_nfe = int(payload.get("f5ttsNfeStep", self.nfe_step))
        except (TypeError, ValueError):
            new_nfe = self.nfe_step
        # Settings that need NO model reload (active from the next synthesis)
        self.cfg_strength = new_cfg
        self.nfe_step = new_nfe
        # Settings that trigger a reload
        model_changed = (new_model != self.model_id
                         or new_ckpt != self.ckpt_file
                         or new_vocab != self.vocab_file)
        if model_changed:
            logger.info("F5-TTS Config-Wechsel: model=%s ckpt=%s vocab=%s — Reload",
                        new_model, new_ckpt or "default", new_vocab or "default")
            self.model_id = new_model
            self.ckpt_file = new_ckpt
            self.vocab_file = new_vocab
            async with self._lock:
                # Drop the old instance before loading so its memory can be
                # reclaimed while the new (potentially large) model loads.
                self.model = None
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, self._load_blocking)
        else:
            logger.info("F5-TTS Live-Config: cfg_strength=%.2f nfe=%d", new_cfg, new_nfe)

    def _infer_blocking(self, gen_text: str, ref_wav: str, ref_text: str) -> tuple[np.ndarray, int]:
        """Run one blocking inference; returns (float32 mono samples, sample_rate)."""
        wav, sr, _ = self.model.infer(
            ref_file=ref_wav,
            ref_text=ref_text,
            gen_text=gen_text,
            remove_silence=True,
            seed=-1,
            cfg_strength=self.cfg_strength,
            nfe_step=self.nfe_step,
        )
        # F5-TTS yields a float32 1D array at its 24kHz standard sample rate
        if not isinstance(wav, np.ndarray):
            wav = np.asarray(wav, dtype=np.float32)
        if wav.ndim > 1:
            wav = wav.squeeze()
        return wav.astype(np.float32), int(sr)

    async def synthesize(self, gen_text: str, ref_wav: str, ref_text: str) -> tuple[np.ndarray, int]:
        """Ensure the model is loaded, then synthesize in the default executor."""
        await self.ensure_loaded()
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._infer_blocking, gen_text, ref_wav, ref_text)
# ── Helpers ─────────────────────────────────────────────────
_SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+|\n+")
def split_sentences(text: str, max_len: int = 350) -> list[str]:
    """Split long text at sentence boundaries; short texts stay as-is.

    Adjacent short fragments are merged up to max_len characters so F5-TTS
    does not restart at every tiny snippet.
    """
    stripped = text.strip()
    if not stripped:
        return []
    if len(stripped) <= max_len:
        return [stripped]
    fragments = [frag.strip() for frag in _SENTENCE_SPLIT.split(stripped) if frag.strip()]
    chunks: list[str] = []
    current = ""
    for frag in fragments:
        if len(current) + len(frag) + 1 <= max_len:
            current = f"{current} {frag}".strip() if current else frag
        else:
            if current:
                chunks.append(current)
            current = frag
    if current:
        chunks.append(current)
    return chunks or [stripped]
def float_to_pcm16(wav: np.ndarray) -> bytes:
    """Convert float32 samples in [-1, +1] to int16 little-endian bytes."""
    bounded = np.clip(wav, -1.0, 1.0)
    return (bounded * 32767.0).astype(np.int16).tobytes()
def sanitize_voice_name(name: str) -> str:
    """Map every character outside [a-zA-Z0-9_-] to '_' (safe filename stem)."""
    # With re.ASCII, \w is exactly [a-zA-Z0-9_], so [^\w-] matches the same
    # characters as the original explicit class.
    return re.sub(r"[^\w-]", "_", name, flags=re.ASCII)
def voice_paths(name: str) -> tuple[Path, Path]:
    """Return the (wav_path, txt_path) pair for a sanitized voice name."""
    stem = sanitize_voice_name(name)
    return VOICES_DIR / (stem + ".wav"), VOICES_DIR / (stem + ".txt")
def normalize_ref_wav(src_wav: Path, max_seconds: float = REF_MAX_SECONDS) -> tuple[Path, bool]:
    """Bring the reference WAV into an F5-TTS-friendly form:
    * 24kHz mono
    * at most max_seconds duration
    * silence trimmed at start + end (silenceremove filter)
    * loudness normalized to -16 LUFS (loudnorm filter) so the model
      sees consistent amplitudes
    F5-TTS is sensitive to quiet / noisy / choppy references; consistent,
    clean input loudness helps quality.
    Returns:
        (path, was_modified) — was_modified=True when the file was actually
        changed (the caller should then invalidate the matching .txt).
    """
    tmp_out = src_wav.with_suffix(".conv.wav")
    # silenceremove at start: skip until speech above -50dB
    # silenceremove at end: 0.5s below -50dB acts as the cutoff
    # loudnorm: EBU R128, target -16 LUFS
    af = ("silenceremove=start_periods=1:start_duration=0.05:start_threshold=-50dB,"
          "silenceremove=stop_periods=1:stop_duration=0.5:stop_threshold=-50dB,"
          "loudnorm=I=-16:TP=-1.5:LRA=11")
    cmd = ["ffmpeg", "-y", "-i", str(src_wav),
           "-af", af,
           "-ar", str(TARGET_SR), "-ac", "1",
           "-t", str(max_seconds),
           "-f", "wav", str(tmp_out)]
    try:
        r = subprocess.run(cmd, capture_output=True, timeout=30)
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
        # ffmpeg missing or hung — keep the original file untouched instead
        # of letting the exception kill the whole TTS request.
        logger.warning("ffmpeg-Normalisierung von %s fehlgeschlagen: %s", src_wav, e)
        try:
            tmp_out.unlink()
        except OSError:
            pass
        return src_wav, False
    if r.returncode != 0:
        logger.warning("ffmpeg-Normalisierung von %s fehlgeschlagen: %s",
                       src_wav, r.stderr.decode(errors="replace")[:300])
        try:
            tmp_out.unlink()
        except OSError:
            pass
        return src_wav, False
    # Atomically replace the source with the converted file
    os.replace(tmp_out, src_wav)
    try:
        info = sf.info(str(src_wav))
        logger.info("Referenz-WAV normalisiert: %s (%.1fs, %dHz mono, -16 LUFS, silence getrimmt)",
                    src_wav.name, info.duration, info.samplerate)
    except Exception:
        logger.info("Referenz-WAV normalisiert: %s", src_wav.name)
    return src_wav, True
async def _send(ws, mtype: str, payload: dict) -> None:
    """Serialize and send one protocol envelope; failures are logged, not raised."""
    try:
        envelope = {
            "type": mtype,
            "payload": payload,
            "timestamp": int(time.time() * 1000),
        }
        await ws.send(json.dumps(envelope))
    except Exception as e:
        logger.warning("Send fehlgeschlagen (%s): %s", mtype, e)
# ── Internal transcription via whisper-bridge ───────────────
# Futures keyed by requestId, resolved when the matching stt_response arrives
# (routed back in run_loop's message dispatch).
_pending_stt: dict[str, asyncio.Future] = {}
_STT_TIMEOUT_S = 60.0
async def request_transcription(ws, wav_path: Path, language: str = "de") -> Optional[str]:
    """Send an stt_request to the whisper-bridge (via RVS) and await the stt_response.

    Returns the transcribed text, or None on read failure, timeout, or any
    send/await error. The pending future is always cleaned up in finally.
    """
    try:
        with open(wav_path, "rb") as f:
            audio_b64 = base64.b64encode(f.read()).decode("ascii")
    except Exception as e:
        logger.error("Lesen %s fehlgeschlagen: %s", wav_path, e)
        return None
    request_id = str(uuid.uuid4())
    loop = asyncio.get_event_loop()
    fut: asyncio.Future = loop.create_future()
    _pending_stt[request_id] = fut
    try:
        await _send(ws, "stt_request", {
            "requestId": request_id,
            "audio": audio_b64,
            "mimeType": "audio/wav",
            "model": "small",  # small is sufficient for a voice reference
            "language": language,
        })
        return await asyncio.wait_for(fut, timeout=_STT_TIMEOUT_S)
    except asyncio.TimeoutError:
        logger.warning("Transkription Timeout fuer %s", wav_path.name)
        return None
    except Exception as e:
        logger.warning("Transkription Fehler: %s", e)
        return None
    finally:
        _pending_stt.pop(request_id, None)
# ── TTS request handler ─────────────────────────────────────
# Queue so parallel requests don't overlap (GPU throughput)
_tts_queue: asyncio.Queue[tuple] = asyncio.Queue()
async def _tts_worker(ws, runner: F5Runner) -> None:
    """Serializes syntheses — the GPU can otherwise run out of memory."""
    while True:
        # job = (text, voice, request_id, message_id, language)
        job = await _tts_queue.get()
        try:
            await _do_tts(ws, runner, *job)
        except Exception:
            logger.exception("TTS-Worker Fehler")
        finally:
            _tts_queue.task_done()
async def _do_tts(ws, runner: F5Runner, text: str, voice: str,
                  request_id: str, message_id: str, language: str) -> None:
    """Render `text` with the given voice and stream PCM chunks back over ws.

    Emits audio_pcm messages (base64 s16le chunks) followed by a final
    marker chunk; on a synthesis error an xtts_response with `error` is
    sent instead and the stream is aborted.
    """
    t0 = time.time()
    ref_wav_path, ref_txt_path = voice_paths(voice) if voice else (None, None)
    # WAV too long? F5-TTS internally limits to 12s, after which the txt no
    # longer matches the audio. We clip explicitly to 10s and invalidate the
    # txt so it is re-transcribed on the fly to match the shortened audio.
    if voice and ref_wav_path and ref_wav_path.exists():
        try:
            info = sf.info(str(ref_wav_path))
            if info.duration > REF_MAX_SECONDS + 0.5:
                logger.info("Voice '%s' WAV ist %.1fs (>%.0fs) → clippen + txt neu",
                            voice, info.duration, REF_MAX_SECONDS)
                _, modified = normalize_ref_wav(ref_wav_path)
                if modified and ref_txt_path and ref_txt_path.exists():
                    ref_txt_path.unlink()
        except Exception as e:
            logger.warning("Konnte WAV-Dauer nicht pruefen: %s", e)
    # Detect the legacy placeholder → treat as "no txt" and re-transcribe
    if voice and ref_txt_path and ref_txt_path.exists():
        try:
            existing = ref_txt_path.read_text(encoding="utf-8").strip()
            if existing == _LEGACY_PLACEHOLDER_REF or not existing:
                logger.info("Voice '%s' hat Legacy-Platzhalter → loesche, transkribiere neu", voice)
                ref_txt_path.unlink()
        except Exception:
            pass
    has_custom = bool(voice and ref_wav_path and ref_wav_path.exists() and ref_txt_path.exists())
    if voice and not has_custom:
        # Only the WAV exists but no txt → transcribe on the fly
        if ref_wav_path and ref_wav_path.exists() and (not ref_txt_path or not ref_txt_path.exists()):
            logger.info("Voice '%s' hat kein txt — transkribiere on-the-fly", voice)
            text_ref = await request_transcription(ws, ref_wav_path, language)
            if text_ref and text_ref.strip():
                try:
                    ref_txt_path.write_text(text_ref.strip(), encoding="utf-8")
                    has_custom = True
                    logger.info("Referenz-Text nachgezogen: '%s'", text_ref[:60])
                except Exception as e:
                    logger.warning("Referenz-Text speichern fehlgeschlagen: %s", e)
        if not has_custom:
            logger.warning("Voice '%s' nicht komplett (%s, txt=%s) — nehme Default",
                           voice, ref_wav_path, (ref_txt_path and ref_txt_path.exists()))
    if has_custom:
        ref_wav_str = str(ref_wav_path)
        ref_text = ref_txt_path.read_text(encoding="utf-8").strip()
    else:
        # Fallback: no custom voice. F5-TTS ALWAYS needs a reference; use
        # default_ref.wav/txt if present, otherwise the first voice found
        # in the folder.
        default_wav = VOICES_DIR / "default_ref.wav"
        default_txt = VOICES_DIR / "default_ref.txt"
        if default_wav.exists() and default_txt.exists():
            ref_wav_str = str(default_wav)
            ref_text = default_txt.read_text(encoding="utf-8").strip()
        else:
            # Take any existing voice pair
            pair = next(
                ((w, t) for w, t in (
                    (v, v.with_suffix(".txt")) for v in VOICES_DIR.glob("*.wav")
                ) if t.exists()),
                None,
            )
            if not pair:
                logger.error("Keine Referenz-Stimme im VOICES_DIR — TTS abgebrochen")
                return
            ref_wav_str, ref_text = str(pair[0]), pair[1].read_text(encoding="utf-8").strip()
    sentences = split_sentences(text)
    logger.info("F5-TTS: %d Satz(e), voice=%s (%s)", len(sentences), voice or "default", ref_wav_str)
    chunk_index = 0
    pcm_sr = TARGET_SR
    for i, sent in enumerate(sentences):
        try:
            wav, sr = await runner.synthesize(sent, ref_wav_str, ref_text)
            pcm_sr = sr
            pcm_bytes = float_to_pcm16(wav)
            # The very first PCM chunk of the very first sentence gets a
            # fade-in (masks possible warmup glitches). All other chunks
            # stay as they are.
            if i == 0 and chunk_index == 0:
                pcm_bytes = _fade_in_pcm16(pcm_bytes, sr, 120)
            # Chunking
            for off in range(0, len(pcm_bytes), PCM_CHUNK_BYTES):
                slice_ = pcm_bytes[off:off + PCM_CHUNK_BYTES]
                await _send(ws, "audio_pcm", {
                    "requestId": request_id,
                    "messageId": message_id,
                    "base64": base64.b64encode(slice_).decode("ascii"),
                    "format": "pcm_s16le",
                    "sampleRate": sr,
                    "channels": 1,
                    "voice": voice or "default",
                    "chunk": chunk_index,
                    "final": False,
                })
                chunk_index += 1
        except Exception as e:
            logger.exception("F5-TTS Synthese-Fehler (Satz %d)", i)
            await _send(ws, "xtts_response", {
                "requestId": request_id,
                "error": str(e)[:200],
            })
            return
    # Final marker
    await _send(ws, "audio_pcm", {
        "requestId": request_id,
        "messageId": message_id,
        "base64": "",
        "format": "pcm_s16le",
        "sampleRate": pcm_sr,
        "channels": 1,
        "voice": voice or "default",
        "chunk": chunk_index,
        "final": True,
    })
    logger.info("TTS komplett: %d Chunks, %.2fs render (voice=%s, text=%d chars)",
                chunk_index, time.time() - t0, voice or "default", len(text))
def _fade_in_pcm16(pcm: bytes, sr: int, fade_ms: int) -> bytes:
"""Linear Fade-In auf erste fade_ms — maskiert Warmup-Glitches."""
arr = np.frombuffer(pcm, dtype=np.int16).copy()
fade_samples = min(int((fade_ms / 1000.0) * sr), len(arr))
if fade_samples <= 0:
return pcm
ramp = np.linspace(0.0, 1.0, fade_samples, dtype=np.float32)
arr[:fade_samples] = (arr[:fade_samples].astype(np.float32) * ramp).astype(np.int16)
return arr.tobytes()
# ── Voice management handlers ───────────────────────────────
async def handle_voice_upload(ws, payload: dict) -> None:
    """Store uploaded voice samples as {name}.wav, normalize and transcribe.

    The base64 sample chunks are concatenated, normalized via ffmpeg, and
    transcribed through the whisper-bridge. Replies with xtts_voice_saved
    (including `error` on failure) and then refreshes the voice list.
    """
    name = (payload.get("name") or "").strip()
    samples = payload.get("samples") or []
    if not name or not samples:
        logger.warning("voice_upload: ungueltig (name=%r, samples=%d)", name, len(samples))
        return
    logger.info("Voice-Upload: '%s' (%d Samples)", name, len(samples))
    try:
        VOICES_DIR.mkdir(parents=True, exist_ok=True)
        safe = sanitize_voice_name(name)
        wav_path = VOICES_DIR / f"{safe}.wav"
        txt_path = VOICES_DIR / f"{safe}.txt"
        # Concatenate the samples
        buffers = [base64.b64decode(s.get("base64", "")) for s in samples]
        with open(wav_path, "wb") as f:
            for b in buffers:
                f.write(b)
        size_kb = wav_path.stat().st_size / 1024
        logger.info("Voice WAV gespeichert: %s (%.0fKB)", wav_path, size_kb)
        # Convert to 24kHz mono, clipped to 10s (F5-TTS hard limit is 12s;
        # shorter = faster warmup + text and audio stay aligned)
        normalize_ref_wav(wav_path)
        # Request the transcription via the whisper-bridge
        logger.info("Transkribiere '%s' via whisper-bridge...", name)
        text = await request_transcription(ws, wav_path, language="de")
        if text and text.strip():
            txt_path.write_text(text.strip(), encoding="utf-8")
            logger.info("Voice '%s' komplett (txt: %s)", name, text[:80])
            ref_text_for_response = text.strip()
        else:
            # Do NOT write a placeholder anymore! On the first real TTS use
            # the text is transcribed on the fly. If the whisper-bridge is
            # online by then it works — otherwise the user could create the
            # .txt manually.
            logger.warning("Voice '%s': Transkription fehlgeschlagen — .txt bleibt leer, "
                           "wird on-the-fly bei erstem Render nachgezogen", name)
            ref_text_for_response = ""
        await _send(ws, "xtts_voice_saved", {
            "name": name, "size": int(size_kb * 1024), "refText": ref_text_for_response,
        })
        # Refresh the list
        await handle_list_voices(ws)
    except Exception as e:
        logger.exception("voice_upload Fehler")
        await _send(ws, "xtts_voice_saved", {"name": name, "error": str(e)[:200]})
async def handle_list_voices(ws) -> None:
    """Scan VOICES_DIR for *.wav files and broadcast them as xtts_voices_list."""
    try:
        found = []
        if VOICES_DIR.exists():
            found = [
                {
                    "name": wav_file.stem,
                    "file": wav_file.name,
                    "size": wav_file.stat().st_size,
                    "hasRefText": wav_file.with_suffix(".txt").exists(),
                }
                for wav_file in sorted(VOICES_DIR.glob("*.wav"))
            ]
        logger.info("Stimmen-Liste: %d", len(found))
        await _send(ws, "xtts_voices_list", {"voices": found})
    except Exception:
        logger.exception("handle_list_voices Fehler")
async def handle_delete_voice(ws, payload: dict) -> None:
    """Remove a voice's .wav/.txt pair and broadcast the refreshed list."""
    name = (payload.get("name") or "").strip()
    if not name:
        return
    try:
        for path in voice_paths(name):
            if path.exists():
                path.unlink()
                logger.info("Voice geloescht: %s", path)
        await handle_list_voices(ws)
    except Exception:
        logger.exception("handle_delete_voice Fehler")
# Last voice set via Diagnostic (prevents endless preloads on every config)
_last_diag_voice = ""
async def handle_voice_preload(ws, payload: dict, runner: F5Runner) -> None:
    """Warm a voice up: ensure the ref text exists, run a dummy render,
    and answer with a voice_ready message (with loadMs or error)."""
    voice = (payload.get("voice") or "").strip()
    request_id = payload.get("requestId", "")
    logger.info("Voice-Preload angefordert: '%s'", voice or "default")
    try:
        if voice:
            ref_wav, ref_txt = voice_paths(voice)
            if not ref_wav.exists():
                await _send(ws, "voice_ready", {"voice": voice, "requestId": request_id, "error": "voice-file-not-found"})
                return
            # Make sure the reference text exists (when only the WAV is there)
            if not ref_txt.exists():
                text = await request_transcription(ws, ref_wav, language="de")
                if text:
                    ref_txt.write_text(text.strip(), encoding="utf-8")
                    logger.info("Referenz-Text beim Preload nachgezogen")
        # Dummy render as warmup
        started = time.time()
        await _do_tts(ws, runner, "ja.", voice, f"preload-{request_id}", "", "de")
        elapsed_ms = int((time.time() - started) * 1000)
        await _send(ws, "voice_ready", {"voice": voice, "requestId": request_id, "loadMs": elapsed_ms})
    except Exception as e:
        logger.exception("Voice-Preload Fehler")
        await _send(ws, "voice_ready", {"voice": voice, "requestId": request_id, "error": str(e)[:200]})
# ── Main loop ───────────────────────────────────────────────
async def _broadcast_status(ws, state: str, **extra) -> None:
    """Send a service_status message for the F5-TTS module.

    state: 'loading' | 'ready' | 'error'; extra fields are merged in.
    """
    await _send(ws, "service_status", {"service": "f5tts", "state": state, **extra})
async def run_loop(runner: F5Runner) -> None:
    """Connect to RVS, dispatch incoming messages, reconnect forever.

    Handles the full message protocol (xtts_request, voice_upload,
    xtts_list_voices, xtts_delete_voice, voice_preload, stt_response,
    config), falls back from wss:// to ws:// once if allowed, and
    reconnects with exponential backoff (capped at 30s).
    """
    use_tls = RVS_TLS
    retry_s = 2
    tls_fallback_tried = False
    global _last_diag_voice
    while True:
        scheme = "wss" if use_tls else "ws"
        url = f"{scheme}://{RVS_HOST}:{RVS_PORT}/ws?token={RVS_TOKEN}"
        masked = url.replace(RVS_TOKEN, "***") if RVS_TOKEN else url
        try:
            logger.info("Verbinde zu RVS: %s", masked)
            async with websockets.connect(url, ping_interval=20, ping_timeout=10, max_size=50 * 1024 * 1024) as ws:
                logger.info("RVS verbunden")
                retry_s = 2
                tls_fallback_tried = False
                # Status broadcast: first 'loading', then 'ready' after a
                # successful load. Plus a config_request so we receive the
                # persisted Diagnostic config in case aria-bridge doesn't
                # send it on its own.
                async def _load_with_status():
                    try:
                        if runner.model is not None:
                            logger.info("Initial: broadcaste ready (Modell schon im RAM: %s)", runner.model_id)
                            await _broadcast_status(ws, "ready",
                                                    model=runner.model_id,
                                                    loadSeconds=runner.last_load_seconds)
                        else:
                            logger.info("Initial: broadcaste loading + lade Modell '%s'", runner.model_id)
                            await _broadcast_status(ws, "loading", model=runner.model_id)
                            await runner.ensure_loaded()
                            await _broadcast_status(ws, "ready",
                                                    model=runner.model_id,
                                                    loadSeconds=runner.last_load_seconds)
                        logger.info("Initial: sende config_request an aria-bridge")
                        await _send(ws, "config_request", {"service": "f5tts"})
                    except Exception as e:
                        logger.exception("Initial-Load crashed: %s", e)
                        try:
                            await _broadcast_status(ws, "error", error=str(e)[:200])
                        except Exception:
                            pass
                asyncio.create_task(_load_with_status())
                # Start the TTS worker for this connection
                worker = asyncio.create_task(_tts_worker(ws, runner))
                try:
                    async for raw in ws:
                        try:
                            msg = json.loads(raw)
                        except Exception:
                            continue
                        mtype = msg.get("type", "")
                        payload = msg.get("payload", {}) or {}
                        if mtype == "xtts_request":
                            await _tts_queue.put((
                                payload.get("text", ""),
                                payload.get("voice", "") or "",
                                payload.get("requestId", ""),
                                payload.get("messageId", ""),
                                payload.get("language", "de"),
                            ))
                        elif mtype == "voice_upload":
                            asyncio.create_task(handle_voice_upload(ws, payload))
                        elif mtype == "xtts_list_voices":
                            asyncio.create_task(handle_list_voices(ws))
                        elif mtype == "xtts_delete_voice":
                            asyncio.create_task(handle_delete_voice(ws, payload))
                        elif mtype == "voice_preload":
                            asyncio.create_task(handle_voice_preload(ws, payload, runner))
                        elif mtype == "stt_response":
                            # Reply to our internal transcription request
                            req_id = payload.get("requestId", "")
                            fut = _pending_stt.get(req_id)
                            if fut and not fut.done():
                                if payload.get("error"):
                                    fut.set_result(None)
                                else:
                                    fut.set_result(payload.get("text") or "")
                        elif mtype == "config":
                            # Update F5-TTS settings (model, cfg_strength, nfe)
                            async def _update_with_status(p):
                                # Check whether a model change is pending — if
                                # yes: 'loading' status first, then the update,
                                # then 'ready'.
                                old_model = (runner.model_id, runner.ckpt_file, runner.vocab_file)
                                new_model_id = (p.get("f5ttsModel") or runner.model_id,
                                                p.get("f5ttsCkptFile", runner.ckpt_file) or "",
                                                p.get("f5ttsVocabFile", runner.vocab_file) or "")
                                will_reload = old_model != new_model_id
                                if will_reload:
                                    await _broadcast_status(ws, "loading", model=new_model_id[0])
                                try:
                                    await runner.update_config(p)
                                    if will_reload:
                                        await _broadcast_status(ws, "ready",
                                                                model=runner.model_id,
                                                                loadSeconds=runner.last_load_seconds)
                                except Exception as e:
                                    if will_reload:
                                        await _broadcast_status(ws, "error", error=str(e)[:200])
                            asyncio.create_task(_update_with_status(payload))
                            # Voice preload on change
                            v = (payload.get("xttsVoice") or "").strip()
                            if v and v != _last_diag_voice:
                                _last_diag_voice = v
                                asyncio.create_task(handle_voice_preload(
                                    ws, {"voice": v, "source": "diagnostic"}, runner,
                                ))
                            elif not v:
                                _last_diag_voice = ""
                finally:
                    worker.cancel()
                    try:
                        await worker
                    except asyncio.CancelledError:
                        pass
        except Exception as e:
            logger.warning("Verbindung verloren: %s", e)
            if use_tls and RVS_TLS_FALLBACK and not tls_fallback_tried:
                logger.info("TLS fehlgeschlagen — Fallback auf ws://")
                use_tls = False
                tls_fallback_tried = True
                continue
        # Backoff before the next reconnect attempt (capped at 30s)
        await asyncio.sleep(min(retry_s, 30))
        retry_s = min(retry_s * 2, 30)
async def main() -> None:
    """Entry point: validate env, prepare the voices dir, run forever."""
    if not RVS_HOST:
        logger.error("RVS_HOST nicht gesetzt — Abbruch")
        sys.exit(1)
    VOICES_DIR.mkdir(parents=True, exist_ok=True)
    await run_loop(F5Runner())
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is a clean shutdown, not an error
        sys.exit(0)