/**
 * ARIA XTTS Bridge — connects an XTTS v2 server to the RVS.
 *
 * Receives xtts_request via RVS → renders audio through the XTTS API → streams it back
 * Receives voice_upload → stores a voice sample for cloning
 * Receives xtts_list_voices → lists the available voices
 * Receives xtts_delete_voice → removes a stored voice sample
 */

const WebSocket = require("ws");
const http = require("http");
const https = require("https");
const fs = require("fs");
const path = require("path");

const XTTS_API_URL = process.env.XTTS_API_URL || "http://xtts:8000";
const RVS_HOST = process.env.RVS_HOST || "";
const RVS_PORT = process.env.RVS_PORT || "443";
const RVS_TLS = process.env.RVS_TLS || "true";
const RVS_TLS_FALLBACK = process.env.RVS_TLS_FALLBACK || "true";
const RVS_TOKEN = process.env.RVS_TOKEN || "";
const VOICES_DIR = "/voices";

/**
 * Writes a message to stdout, prefixed with an ISO-8601 timestamp.
 * @param {string} msg - Text to log.
 */
function log(msg) {
  console.log(`[${new Date().toISOString()}] ${msg}`);
}

// ── RVS connection ──────────────────────────────────

// Currently open RVS socket, or null while disconnected.
let rvsWs = null;
// Reconnect backoff in seconds; doubled after every close (capped at 30), reset to 2 on open.
let retryDelay = 2;

/**
 * Opens the WebSocket connection to the RVS and installs all handlers:
 * keepalive, message dispatch, reconnect with exponential backoff, and the
 * optional downgrade from wss:// to ws:// when the TLS connection cannot be
 * established.
 *
 * Terminates the process with exit code 1 when RVS_HOST or RVS_TOKEN is unset.
 *
 * @param {boolean} [forcePlain] - When truthy, connect via ws:// even if RVS_TLS is "true".
 */
function connectRVS(forcePlain) {
  if (!RVS_HOST || !RVS_TOKEN) {
    log("RVS nicht konfiguriert — beende");
    process.exit(1);
  }

  const useTls = RVS_TLS === "true" && !forcePlain;
  const proto = useTls ? "wss" : "ws";
  // Encode the token so that characters such as "&", "#" or "+" cannot corrupt the query string.
  const url = `${proto}://${RVS_HOST}:${RVS_PORT}?token=${encodeURIComponent(RVS_TOKEN)}`;
  // The token is deliberately left out of the log line.
  log(`Verbinde zu RVS: ${proto}://${RVS_HOST}:${RVS_PORT}`);

  const ws = new WebSocket(url);
  let opened = false;
  let keepaliveTimer = null;

  // Without this, every reconnect would leave one more 25 s interval running forever.
  const stopKeepalive = () => {
    if (keepaliveTimer !== null) {
      clearInterval(keepaliveTimer);
      keepaliveTimer = null;
    }
  };

  ws.on("open", () => {
    log("RVS verbunden — warte auf TTS-Requests");
    opened = true;
    rvsWs = ws;
    retryDelay = 2;

    // Keepalive: protocol-level ping plus an application-level heartbeat message.
    keepaliveTimer = setInterval(() => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.ping();
        ws.send(JSON.stringify({ type: "heartbeat", timestamp: Date.now() }));
      }
    }, 25000);
  });

  ws.on("message", async (raw) => {
    try {
      const msg = JSON.parse(raw.toString());
      switch (msg.type) {
        case "xtts_request":
          await handleTTSRequest(msg.payload);
          break;
        case "voice_upload":
          await handleVoiceUpload(msg.payload);
          break;
        case "xtts_list_voices":
          await handleListVoices();
          break;
        case "xtts_delete_voice":
          await handleDeleteVoice(msg.payload);
          break;
        default:
          // Unknown message types are ignored.
          break;
      }
    } catch (err) {
      log(`Fehler: ${err.message}`);
    }
  });

  ws.on("close", () => {
    stopKeepalive();
    log("RVS Verbindung geschlossen");
    // Only clear the shared reference if it still points at this socket.
    if (rvsWs === ws) rvsWs = null;
    setTimeout(() => connectRVS(), Math.min(retryDelay * 1000, 30000));
    retryDelay = Math.min(retryDelay * 2, 30);
  });

  ws.on("error", (err) => {
    log(`RVS Fehler: ${err.message}`);
    // Downgrade to ws:// only when the TLS connection never came up. An error on
    // an already-open TLS connection is followed by "close", which reconnects
    // with TLS again instead of silently dropping encryption.
    if (!opened && useTls && RVS_TLS_FALLBACK === "true") {
      log("TLS fehlgeschlagen — Fallback auf ws://");
      stopKeepalive();
      // Detach the handlers so this socket's "close" does not schedule a second reconnect.
      ws.removeAllListeners();
      // An EventEmitter without an "error" listener throws on emit; keep a no-op
      // listener so a late error while tearing down cannot crash the process.
      ws.on("error", () => {});
      try {
        ws.close();
      } catch (_) {
        // Best effort: the socket is being discarded anyway.
      }
      connectRVS(true);
    }
  });
}

// ── TTS request handler ─────────────────────────────
// ── TTS queue ───────────────────────────────────────
// XTTS requests are processed sequentially so that streams do not overlap.
// Without a queue, parallel requests would stream in parallel → the app would
// receive interleaved PCM chunks from two renders → sounds like chaos.
// Tail of a promise chain used as a FIFO: each request is appended behind the
// previous one so that only one render streams at a time.
let ttsQueue = Promise.resolve();

/**
 * Enqueues a TTS request behind all previously queued requests.
 *
 * @param {object} payload - Payload of an "xtts_request" message.
 * @returns {Promise<void>} Settles once this request has been processed. Never
 *   rejects: failures are logged so that one bad request cannot block the queue.
 */
function handleTTSRequest(payload) {
  ttsQueue = ttsQueue
    .then(() => _runTTSRequest(payload))
    .catch((err) => {
      log(`TTS-Queue Fehler: ${err.message}`);
    });
  return ttsQueue;
}

/**
 * Maps a client-supplied voice name onto the restricted character set used for
 * file names inside VOICES_DIR (anything outside [a-zA-Z0-9_-] becomes "_").
 * Used by TTS requests, upload and delete so all three resolve the same file
 * and a name can never escape VOICES_DIR.
 *
 * @param {string} name - Voice name as sent by the client.
 * @returns {string} File-system-safe voice name (without extension).
 */
function sanitizeVoiceName(name) {
  return String(name).replace(/[^a-zA-Z0-9_-]/g, "_");
}

/**
 * Strips Markdown markup and normalizes whitespace so the text can be spoken.
 *
 * @param {string} text - Raw message text, possibly containing Markdown.
 * @returns {string} Cleaned text.
 */
function cleanTextForTTS(text) {
  return text
    // Fenced code blocks must go first: the inline-code rule below would
    // otherwise eat one backtick of each fence, leaving the code in the text.
    .replace(/```[\s\S]*?```/g, "")
    .replace(/\*\*([^*]+)\*\*/g, "$1")
    .replace(/\*([^*]+)\*/g, "$1")
    .replace(/`([^`]+)`/g, "$1")
    .replace(/\[([^\]]+)\]\([^)]+\)/g, "$1")
    .replace(/#{1,6}\s*/g, "")
    .replace(/>\s*/g, "")
    .replace(/[-*]\s+/g, "")
    .replace(/\n{2,}/g, ". ")
    .replace(/\n/g, ", ")
    .replace(/\s{2,}/g, " ")
    // Straight and typographic double quotes, including the closing “ ” that
    // pair with the German opening „.
    .replace(/["“”„]/g, "")
    .replace(/\(\)/g, "")
    .trim();
}

/**
 * Renders one TTS request and streams the result to the RVS as "audio_pcm"
 * frames, followed by one empty frame with final=true. On failure an
 * "xtts_response" message carrying the error text is sent instead.
 *
 * @param {object} payload - Request payload.
 * @param {string} payload.text - Text to speak; the request is ignored when empty.
 * @param {string} [payload.voice] - Name of a stored voice sample; the default voice is used when absent or unknown.
 * @param {string} [payload.requestId] - Echoed back in every frame.
 * @param {string} [payload.messageId] - Echoed back in every frame.
 * @param {string} [payload.language] - Language code; defaults to "de".
 * @returns {Promise<void>}
 */
async function _runTTSRequest(payload) {
  const { text, voice, requestId, language, messageId } = payload || {};
  if (!text) return;
  if (typeof text !== "string") {
    log("TTS-Request: text ist kein String — ignoriert");
    return;
  }

  // Markdown cleanup (the sender is expected to clean up too; this is a safety net).
  const cleanText = cleanTextForTTS(text);

  log(`TTS-Request (streaming): "${cleanText.slice(0, 80)}..." (${cleanText.length} chars, voice: ${voice || "default"})`);

  try {
    // Sanitize exactly like upload/delete do, so the lookup hits the stored file
    // and "../" sequences cannot point outside VOICES_DIR.
    const voiceSample = voice ? path.join(VOICES_DIR, `${sanitizeVoiceName(voice)}.wav`) : null;
    const speakerWav = voiceSample && fs.existsSync(voiceSample) ? voiceSample : null;
    const lang = language || "de";

    let chunkIndex = 0;
    let pcmMeta = null;

    const sendFrame = (base64, meta, final) => {
      sendToRVS({
        type: "audio_pcm",
        payload: {
          requestId: requestId || "",
          messageId: messageId || "",
          base64,
          format: "pcm_s16le",
          sampleRate: meta.sampleRate,
          channels: meta.channels,
          voice: voice || "default",
          chunk: chunkIndex++,
          final,
        },
        timestamp: Date.now(),
      });
    };

    const onChunk = (pcmBase64, meta) => {
      if (!pcmMeta) pcmMeta = meta;
      sendFrame(pcmBase64, meta, false);
    };

    // /tts_stream gives real streaming (works in XTTS local mode).
    // If the server runs in apiManual/api mode it answers 400 → fall back to /tts_to_audio/.
    try {
      await streamXTTSAsPCM(cleanText, lang, speakerWav, onChunk);
    } catch (streamErr) {
      // Audio already delivered cannot be taken back; re-rendering from the
      // start would make the client play the beginning twice.
      if (chunkIndex > 0) throw streamErr;
      log(`/tts_stream fehlgeschlagen (${streamErr.message.slice(0, 100)}) — Fallback /tts_to_audio/`);
      await streamXTTSBatch(cleanText, lang, speakerWav, onChunk);
    }

    // Closing frame with the final flag so the app knows the stream is
    // complete and can write its cache.
    if (pcmMeta) {
      sendFrame("", pcmMeta, true);
    }
    log(`TTS komplett: ${chunkIndex} PCM-Frames gestreamt (${cleanText.length} chars)`);
  } catch (err) {
    log(`TTS Fehler: ${err.message}`);
    sendToRVS({
      type: "xtts_response",
      payload: { requestId, error: err.message },
      timestamp: Date.now(),
    });
  }
}

/**
 * Builds the request options for a call against the XTTS API and selects the
 * client module (http or https) matching the scheme of XTTS_API_URL.
 *
 * @param {string} requestPath - Path (plus query) relative to the API base URL, starting with "/".
 * @param {string} method - HTTP method.
 * @param {object} [headers] - Optional request headers.
 * @returns {{client: object, options: object}} Client module and options for client.request().
 */
function xttsRequestOptions(requestPath, method, headers) {
  const base = new URL(XTTS_API_URL);
  const isTls = base.protocol === "https:";
  const options = {
    hostname: base.hostname,
    port: base.port || (isTls ? 443 : 80),
    // Honor a path prefix in XTTS_API_URL, as the startup probe does.
    path: `${base.pathname.replace(/\/+$/, "")}${requestPath}`,
    method,
    timeout: 60000,
  };
  if (headers) options.headers = headers;
  return { client: isTls ? https : http, options };
}

/**
 * Collects the body of a non-200 response as text.
 *
 * @param {object} res - HTTP response stream.
 * @param {function(string): void} onDone - Called with the complete body.
 * @param {function(Error): void} onError - Called when the response stream fails.
 */
function readErrorBody(res, onDone, onError) {
  let body = "";
  res.on("data", (d) => {
    body += d.toString();
  });
  res.on("end", () => onDone(body));
  res.on("error", onError);
}

/**
 * Consumes a WAV response: reads channel count and sample rate from the first
 * 44 bytes, then forwards the remaining bytes as base64 chunks of 8192 bytes
 * (plus one shorter trailing chunk, if any).
 *
 * @param {object} res - HTTP response stream delivering WAV data.
 * @param {function(string, {sampleRate: number, channels: number}): void} onPcmChunk - Receives each chunk.
 * @param {function(): void} resolve - Called when the response has ended.
 * @param {function(Error): void} reject - Called when the response stream fails.
 */
function forwardWavAsPcm(res, onPcmChunk, resolve, reject) {
  const HEADER_BYTES = 44;
  // ~170 ms at 24 kHz s16 mono; an even size, so 16-bit samples are never split.
  const PCM_CHUNK_BYTES = 8192;

  // Defaults used until the header has been read.
  let sampleRate = 24000;
  let channels = 1;
  let headerParsed = false;
  let headerBuf = Buffer.alloc(0);
  // Bytes that did not fill a whole chunk yet; prepended to the next data event.
  let leftover = Buffer.alloc(0);

  res.on("data", (chunk) => {
    let data = chunk;

    // Consume the WAV header, which may arrive split across several data events.
    if (!headerParsed) {
      headerBuf = Buffer.concat([headerBuf, data]);
      if (headerBuf.length < HEADER_BYTES) return;
      channels = headerBuf.readUInt16LE(22);
      sampleRate = headerBuf.readUInt32LE(24);
      headerParsed = true;
      data = headerBuf.subarray(HEADER_BYTES);
    }

    let pending = Buffer.concat([leftover, data]);
    while (pending.length >= PCM_CHUNK_BYTES) {
      onPcmChunk(pending.subarray(0, PCM_CHUNK_BYTES).toString("base64"), { sampleRate, channels });
      pending = pending.subarray(PCM_CHUNK_BYTES);
    }
    leftover = pending;
  });

  res.on("end", () => {
    // Flush whatever is left.
    if (leftover.length > 0) {
      onPcmChunk(leftover.toString("base64"), { sampleRate, channels });
    }
    resolve();
  });

  res.on("error", reject);
}

/**
 * Calls /tts_stream — the real streaming endpoint of the daswer123 server —
 * with GET and query parameters. (Original author's note: the server answers
 * "allow: GET"; POST with a JSON body fails with 405. Some versions want
 * GET + query, others POST + JSON.)
 *
 * @param {string} text - Text to render.
 * @param {string} language - Language code; "de" when falsy.
 * @param {?string} speakerWav - Path of the voice sample, or null for the default voice.
 * @param {function(string, {sampleRate: number, channels: number}): void} onPcmChunk - Receives base64 PCM chunks.
 * @returns {Promise<void>} Resolves when the stream has ended; rejects on a
 *   non-200 status, a transport error or a 60 s socket timeout.
 */
function streamXTTSAsPCM(text, language, speakerWav, onPcmChunk) {
  return new Promise((resolve, reject) => {
    // Important: speaker_wav MUST be present as a query key (Pydantic required) —
    // even for the default voice, with an empty value. Otherwise: HTTP 422.
    // stream_chunk_size=250: large chunks = few chunk boundaries = few render
    // artifacts. daswer123 often produces glitches in words that straddle a
    // chunk boundary. The higher latency is acceptable.
    const qs = new URLSearchParams();
    qs.set("text", text);
    qs.set("language", language || "de");
    qs.set("speaker_wav", speakerWav || "");
    qs.set("stream_chunk_size", "250");

    const { client, options } = xttsRequestOptions(`/tts_stream?${qs.toString()}`, "GET");

    log(`TTS GET /tts_stream?text=${text.slice(0, 30)}... (voice=${speakerWav ? "custom" : "default"})`);

    const req = client.request(options, (res) => {
      if (res.statusCode !== 200) {
        readErrorBody(res, (body) => {
          log(`XTTS /tts_stream ${res.statusCode}: ${body.slice(0, 300)}`);
          reject(new Error(`XTTS HTTP ${res.statusCode}: ${body.slice(0, 200)}`));
        }, reject);
        return;
      }

      log(`TTS stream verbunden, empfange PCM...`);
      forwardWavAsPcm(res, onPcmChunk, resolve, reject);
    });

    req.on("error", reject);
    req.on("timeout", () => {
      req.destroy();
      reject(new Error("XTTS API Timeout (60s)"));
    });
    req.end();
  });
}

/**
 * Fallback: /tts_to_audio/ (POST JSON) — the server renders everything, then
 * responds. No real streaming, but a stable backup when /tts_stream does not
 * work. The response is chunked the same way as in streamXTTSAsPCM.
 *
 * @param {string} text - Text to render.
 * @param {string} language - Language code; "de" when falsy.
 * @param {?string} speakerWav - Path of the voice sample, or null for the default voice.
 * @param {function(string, {sampleRate: number, channels: number}): void} onPcmChunk - Receives base64 PCM chunks.
 * @returns {Promise<void>} Resolves when the response has ended; rejects on a
 *   non-200 status, a transport error or a 60 s socket timeout.
 */
function streamXTTSBatch(text, language, speakerWav, onPcmChunk) {
  return new Promise((resolve, reject) => {
    const body = JSON.stringify({
      text,
      language: language || "de",
      speaker_wav: speakerWav || "",
    });

    const { client, options } = xttsRequestOptions("/tts_to_audio/", "POST", {
      "Content-Type": "application/json",
      "Content-Length": Buffer.byteLength(body),
    });

    const req = client.request(options, (res) => {
      if (res.statusCode !== 200) {
        readErrorBody(res, (rb) => {
          reject(new Error(`XTTS Batch HTTP ${res.statusCode}: ${rb.slice(0, 200)}`));
        }, reject);
        return;
      }
      forwardWavAsPcm(res, onPcmChunk, resolve, reject);
    });

    req.on("error", reject);
    req.on("timeout", () => {
      req.destroy();
      reject(new Error("XTTS Batch Timeout (60s)"));
    });
    req.write(body);
    req.end();
  });
}

// ── Voice upload handler ────────────────────────────

/**
 * Stores an uploaded voice as VOICES_DIR/<sanitized name>.wav and confirms
 * with an "xtts_voice_saved" message. Invalid payloads and write errors are
 * logged; nothing is sent in those cases.
 *
 * @param {object} payload - Upload payload.
 * @param {string} payload.name - Voice name.
 * @param {Array<{base64: string}>} payload.samples - Non-empty list of base64-encoded samples.
 * @returns {Promise<void>}
 */
async function handleVoiceUpload(payload) {
  const { name, samples } = payload || {};
  if (!name || typeof name !== "string" || !Array.isArray(samples) || samples.length === 0) {
    log("Voice Upload: Ungueltige Daten");
    return;
  }

  log(`Voice Upload: "${name}" (${samples.length} Samples)`);

  try {
    // Join all samples into one buffer.
    // NOTE(review): the decoded bytes are concatenated verbatim. If every sample
    // is a complete WAV file (not a slice of one file), the result contains
    // embedded headers — confirm what the client sends.
    const combined = Buffer.concat(samples.map((s) => Buffer.from(s.base64, "base64")));

    // Store as .wav.
    fs.mkdirSync(VOICES_DIR, { recursive: true });
    const filePath = path.join(VOICES_DIR, `${sanitizeVoiceName(name)}.wav`);
    fs.writeFileSync(filePath, combined);

    log(`Voice gespeichert: ${filePath} (${(combined.length / 1024).toFixed(0)}KB)`);

    sendToRVS({
      type: "xtts_voice_saved",
      payload: { name, size: combined.length, path: filePath },
      timestamp: Date.now(),
    });
  } catch (err) {
    log(`Voice Upload Fehler: ${err.message}`);
  }
}

// ── Voice delete handler ────────────────────────────

/**
 * Deletes VOICES_DIR/<sanitized name>.wav if it exists, then sends the
 * refreshed voice list.
 *
 * @param {object} payload - Delete payload.
 * @param {string} payload.name - Voice name.
 * @returns {Promise<void>}
 */
async function handleDeleteVoice(payload) {
  const { name } = payload || {};
  if (!name || typeof name !== "string") {
    log("Voice Delete: ungueltiger Name");
    return;
  }

  const filePath = path.join(VOICES_DIR, `${sanitizeVoiceName(name)}.wav`);
  try {
    if (fs.existsSync(filePath)) {
      fs.unlinkSync(filePath);
      log(`Voice geloescht: ${filePath}`);
    } else {
      log(`Voice Delete: Datei existiert nicht (${filePath})`);
    }
    // Send the updated list.
    await handleListVoices();
  } catch (err) {
    log(`Voice Delete Fehler: ${err.message}`);
  }
}

// ── Voice list handler ──────────────────────────────

/**
 * Sends an "xtts_voices_list" message describing every .wav file in
 * VOICES_DIR (name without extension, file name, size in bytes).
 *
 * @returns {Promise<void>}
 */
async function handleListVoices() {
  try {
    const files = fs.existsSync(VOICES_DIR)
      ? fs.readdirSync(VOICES_DIR).filter((f) => f.endsWith(".wav"))
      : [];

    const voices = files.map((f) => ({
      name: path.basename(f, ".wav"),
      file: f,
      size: fs.statSync(path.join(VOICES_DIR, f)).size,
    }));

    log(`Stimmen: ${voices.length} verfuegbar`);

    sendToRVS({
      type: "xtts_voices_list",
      payload: { voices },
      timestamp: Date.now(),
    });
  } catch (err) {
    log(`Stimmen-Liste Fehler: ${err.message}`);
  }
}

// ── Send to RVS ─────────────────────────────────────

/**
 * Sends a message as JSON over the RVS socket. The message is dropped
 * silently when no socket is open.
 *
 * @param {object} msg - JSON-serializable message.
 */
function sendToRVS(msg) {
  if (rvsWs && rvsWs.readyState === WebSocket.OPEN) {
    rvsWs.send(JSON.stringify(msg));
  }
}

// ── Start ───────────────────────────────────────────

log("ARIA XTTS Bridge startet...");
log(`XTTS API: ${XTTS_API_URL}`);
log(`RVS: ${RVS_HOST}:${RVS_PORT}`);

/**
 * Polls XTTS_API_URL/docs until the server answers with any HTTP status, then
 * invokes the callback. After the attempts are used up the callback is invoked
 * anyway.
 *
 * @param {function(): void} callback - Invoked once XTTS is reachable or the attempts are exhausted.
 * @param {number} attempts - Remaining attempts; 10 s pause between them.
 */
function waitForXTTS(callback, attempts) {
  if (attempts <= 0) {
    log("XTTS API nicht erreichbar — starte trotzdem");
    callback();
    return;
  }

  // http.get() throws for https: URLs, so pick the module matching the scheme.
  const client = new URL(XTTS_API_URL).protocol === "https:" ? https : http;

  client
    .get(`${XTTS_API_URL}/docs`, (res) => {
      // Drain the body; an unconsumed response keeps its socket and memory occupied.
      res.resume();
      log(`XTTS API erreichbar (HTTP ${res.statusCode})`);
      callback();
    })
    .on("error", () => {
      log(`XTTS API noch nicht bereit — warte (${attempts} Versuche uebrig)...`);
      // 10 s instead of 5 s: loading the model takes a while.
      setTimeout(() => waitForXTTS(callback, attempts - 1), 10000);
    });
}

waitForXTTS(() => connectRVS(), 30); // wait at most 5 min