"use strict"; const { WebSocketServer } = require("ws"); const http = require("http"); const fs = require("fs"); const path = require("path"); // ── Konfiguration aus Umgebungsvariablen ──────────────────────────── const PORT = parseInt(process.env.PORT || "3000", 10); const MAX_SESSIONS = parseInt(process.env.MAX_SESSIONS || "10", 10); const UPDATES_DIR = process.env.UPDATES_DIR || "/updates"; // Kein Polling — APK wird manuell per git pull bereitgestellt // Erlaubte Nachrichtentypen — alles andere wird verworfen const ALLOWED_TYPES = new Set([ "chat", "audio", "file", "location", "mode", "log", "event", "heartbeat", "file_request", "file_response", "file_saved", "stt_result", "config", "tts_request", "xtts_request", "xtts_response", "xtts_list_voices", "xtts_voices_list", "voice_upload", "xtts_voice_saved", "update_check", "update_available", "update_download", "update_data", "agent_activity", "cancel_request", "audio_pcm", "file_from_aria", "container_restart", "file_list_request", "file_list_response", "file_delete_request", "file_deleted", "xtts_export_voice", "xtts_voice_exported", "xtts_import_voice", "xtts_voice_imported", "skill_created", "trigger_created", "memory_saved", "location_update", "location_tracking", "chat_history_request", "chat_history_response", "chat_cleared", "delete_message_request", "chat_message_deleted", "brain_request", "brain_response", "app_log", "file_delete_batch_request", "file_delete_batch_response", "file_zip_request", "file_zip_response", "xtts_delete_voice", "voice_preload", "voice_ready", "stt_request", "stt_response", "service_status", "config_request", "flux_request", "flux_response", "agent_stream", "oauth_callback", ]); // Token-Raum: token -> { clients: Set } const rooms = new Map(); // ── Hilfsfunktionen ───────────────────────────────────────────────── function timestamp() { return new Date().toISOString(); } function log(msg) { console.log(`[${timestamp()}] ${msg}`); } // Leere Räume und tote Clients aufräumen function cleanupRooms() { for (const [token, room] of rooms) { // Tote Clients entfernen for (const client of room.clients) { if (client.readyState > 1) room.clients.delete(client); } // Raum löschen wenn leer if (room.clients.size === 0) { rooms.delete(token); log(`Leerer Raum entfernt: ${token.slice(0, 8)}...`); } } } // ── HTTP + WebSocket Server (hybrid) ──────────────────────────────── // // Der gleiche Port handelt jetzt sowohl WebSocket-Upgrades (App, Bridges, // Diagnostic) als auch normale HTTP-Requests (OAuth-Callbacks von Spotify, // Google etc.). TLS-Termination passiert wie bisher vor dem RVS-Container // (Caddy/Nginx); RVS selber bleibt plain HTTP. Wichtig fuer OAuth: aus // Provider-Sicht ist die Callback-URL `https://{RVS_HOST}:{PORT_oeffentlich} // /oauth/callback/{service}` — RVS schnappt den ?code=..&state=.., broadcastet // als WS-Message `oauth_callback` und antwortet dem Browser mit einer // schoenen "Tab schliessen"-Seite. // // maxPayload 100MB: TTS-Streaming + Voice-Upload (WAV als base64) + // audio_pcm Chunks koennen die ws-Library Default 1MB ueberschreiten. // Plus: file_request/file_response fuer Re-Download von Anhaengen. // 40 MB MP4 → ~53 MB base64 → vorher mit 50 MB Limit zerschossen // (Code 1009 message too big, Bridge crashed im cleanup). 100 MB // deckt bis ~70 MB binaer ab; groessere Files werden Bridge-seitig // abgewiesen (siehe file_request-Handler) bevor die WS abreisst. const httpServer = http.createServer(handleHttpRequest); const wss = new WebSocketServer({ noServer: true, maxPayload: 100 * 1024 * 1024 }); // HTTP-Upgrade-Pfad → an WebSocket-Server reichen httpServer.on("upgrade", (req, socket, head) => { wss.handleUpgrade(req, socket, head, (ws) => { wss.emit("connection", ws, req); }); }); httpServer.listen(PORT, () => { log(`RVS läuft auf Port ${PORT} (HTTP + WS) | Max Sessions: ${MAX_SESSIONS}`); // Beim Start pruefen ob eine APK da ist const apkInfo = getLatestAPK(); if (apkInfo) log(`APK bereit: v${apkInfo.version} (${(fs.statSync(apkInfo.path).size / 1024 / 1024).toFixed(1)}MB)`); }); // ── HTTP Route-Handler ────────────────────────────────────────────── function handleHttpRequest(req, res) { try { const url = new URL(req.url, `http://${req.headers.host || "localhost"}`); const pathname = url.pathname; // OAuth-Callback: GET /oauth/callback/{service}?code=...&state=...&error=... // Pattern fuer Spotify, Google, Strava, GitHub, ... — alle OAuth2 Auth-Code-Flow. // Wir broadcasten an alle Raeume (App ist nicht im selben Raum wie Bridge, // aber Bridge schon — sie picks-up und forwardet ans Brain). const oauthMatch = pathname.match(/^\/oauth\/callback\/([a-zA-Z0-9_-]+)\/?$/); if (req.method === "GET" && oauthMatch) { const service = oauthMatch[1]; const code = url.searchParams.get("code") || ""; const state = url.searchParams.get("state") || ""; const err = url.searchParams.get("error") || ""; const errDesc = url.searchParams.get("error_description") || ""; log(`OAuth-Callback: service=${service} code=${code.slice(0, 8)}... state=${state.slice(0, 8)}... err=${err}`); const payload = { service, code, state }; if (err) { payload.error = err; if (errDesc) payload.errorDescription = errDesc; } // An alle Clients in allen Raeumen broadcasten — Bridge picks-up. const msg = JSON.stringify({ type: "oauth_callback", payload, timestamp: Date.now(), }); let receivers = 0; for (const [, room] of rooms) { for (const client of room.clients) { if (client.readyState === 1) { try { client.send(msg); receivers++; } catch (_) {} } } } log(`OAuth-Callback gebroadcastet an ${receivers} Client(s)`); // Browser-Antwort: schoene HTML-Seite (auch bei Error) const ok = !err; const title = ok ? "OAuth erfolgreich" : "OAuth fehlgeschlagen"; const bodyColor = ok ? "#34C759" : "#FF3B30"; const icon = ok ? "✅" : "❌"; const subtitle = ok ? "Du kannst dieses Tab schliessen — ARIA hat den Zugang erhalten." : `Fehler: ${escapeHtml(err)} ${errDesc ? "— " + escapeHtml(errDesc) : ""}`; const html = ` ${title} — ${escapeHtml(service)}
${icon}
${title}
${escapeHtml(service)}
${subtitle}
Du kannst zur ARIA-App zurueckkehren.
`; res.writeHead(ok ? 200 : 400, { "Content-Type": "text/html; charset=utf-8", "Cache-Control": "no-store", }); res.end(html); return; } // Health-Endpoint if (req.method === "GET" && pathname === "/health") { res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, rooms: rooms.size })); return; } // Default: 404 res.writeHead(404, { "Content-Type": "text/plain" }); res.end("Not Found\n"); } catch (e) { log(`HTTP handler error: ${e.message}`); try { res.writeHead(500).end("Internal Server Error"); } catch (_) {} } } function escapeHtml(s) { return String(s || "").replace(/[&<>"']/g, (c) => ({ "&": "&", "<": "<", ">": ">", '"': """, "'": "'" }[c])); } wss.on("connection", (ws, req) => { // Token aus URL-Query lesen: ws://host:port/?token=abc123 const url = new URL(req.url, `http://${req.headers.host}`); let token = url.searchParams.get("token"); // Wenn kein Token in der URL, auf erste Nachricht warten if (!token) { ws.once("message", (raw) => { try { const msg = JSON.parse(raw); if (msg.token) { registerClient(ws, msg.token); } else { ws.close(4000, "Kein Token angegeben"); } } catch { ws.close(4000, "Ungültige erste Nachricht — Token erwartet"); } }); return; } registerClient(ws, token); }); function registerClient(ws, token) { // Maximale Anzahl aktiver Sessions prüfen if (!rooms.has(token) && rooms.size >= MAX_SESSIONS) { ws.close(4002, "Maximale Anzahl aktiver Sessions erreicht"); log(`Abgelehnt: Session-Limit (${MAX_SESSIONS}) erreicht`); return; } // Raum erstellen oder betreten if (!rooms.has(token)) { rooms.set(token, { clients: new Set() }); log(`Neuer Raum: ${token.slice(0, 8)}...`); } const room = rooms.get(token); room.clients.add(ws); ws._token = token; log(`Client verbunden: ${token.slice(0, 8)}... (${room.clients.size} im Raum)`); // Nachrichten an alle anderen Clients im selben Raum weiterleiten ws.on("message", (raw) => { let msg; try { msg = JSON.parse(raw); } catch { return; // Keine gültige JSON-Nachricht — ignorieren } // Nur erlaubte Nachrichtentypen durchlassen if (!msg.type || !ALLOWED_TYPES.has(msg.type)) { return; } // Update-Check: direkt an den anfragenden Client antworten (nicht relay'en) if (msg.type === "update_check") { const clientVersion = msg.payload?.version || "0.0.0.0"; const apkInfo = getLatestAPK(); if (apkInfo && compareVersions(apkInfo.version, clientVersion) > 0) { ws.send(JSON.stringify({ type: "update_available", payload: { version: apkInfo.version, downloadUrl: `/update/latest.apk`, size: fs.statSync(apkInfo.path).size, }, timestamp: Date.now(), })); } return; } // Update-Download: APK als Base64 ueber WebSocket senden if (msg.type === "update_download") { const apkInfo = getLatestAPK(); if (!apkInfo) { ws.send(JSON.stringify({ type: "update_data", payload: { error: "Keine APK verfuegbar" }, timestamp: Date.now() })); return; } try { const data = fs.readFileSync(apkInfo.path); const base64 = data.toString("base64"); const sizeMB = (data.length / 1024 / 1024).toFixed(1); log(`APK sende: v${apkInfo.version} (${sizeMB}MB) an Client`); ws.send(JSON.stringify({ type: "update_data", payload: { version: apkInfo.version, base64, size: data.length, fileName: `ARIA-v${apkInfo.version}.apk`, }, timestamp: Date.now(), })); } catch (err) { ws.send(JSON.stringify({ type: "update_data", payload: { error: err.message }, timestamp: Date.now() })); } return; } // An alle anderen Clients im Raum weiterleiten for (const client of room.clients) { if (client !== ws && client.readyState === 1) { client.send(raw.toString()); } } }); ws.on("close", () => { room.clients.delete(ws); log(`Client getrennt: ${token.slice(0, 8)}... (${room.clients.size} verbleibend)`); if (room.clients.size === 0) { rooms.delete(token); log(`Raum geschlossen: ${token.slice(0, 8)}...`); } }); ws.on("error", (err) => { log(`Fehler: ${err.message}`); room.clients.delete(ws); }); } // ── Heartbeat — hält Verbindungen am Leben, räumt tote auf ────────── const HEARTBEAT_INTERVAL = 15_000; const heartbeat = setInterval(() => { for (const client of wss.clients) { if (client.isAlive === false) { log(`Toter Client entfernt (kein Pong)`); client.terminate(); continue; } client.isAlive = false; client.ping(); } }, HEARTBEAT_INTERVAL); wss.on("connection", (ws) => { ws.isAlive = true; ws.on("pong", () => { ws.isAlive = true; }); // App-seitiger Heartbeat (JSON) zaehlt auch als lebendig const origOnMessage = ws._events?.message; ws.on("message", (raw) => { try { const msg = JSON.parse(raw); if (msg.type === "heartbeat") ws.isAlive = true; } catch {} }); }); // Aufräumen alle 30 Sekunden (statt 60) const cleanup = setInterval(cleanupRooms, 30_000); wss.on("close", () => { clearInterval(heartbeat); clearInterval(cleanup); }); // ── Auto-Update: APK-Erkennung + Push ────────────────────────────── let latestVersion = null; function getLatestAPK() { try { if (!fs.existsSync(UPDATES_DIR)) return null; const files = fs.readdirSync(UPDATES_DIR) .filter(f => f.endsWith(".apk")) .map(f => { // ARIA-v0.0.2.3.apk oder ARIA-Cockpit-release.apk const match = f.match(/(\d+\.\d+\.\d+[\.\d]*)/); return { file: f, path: path.join(UPDATES_DIR, f), version: match ? match[1] : null }; }) .filter(f => f.version) .sort((a, b) => compareVersions(b.version, a.version)); // Neueste zuerst return files[0] || null; } catch { return null; } } function compareVersions(a, b) { const pa = a.split(".").map(Number); const pb = b.split(".").map(Number); for (let i = 0; i < Math.max(pa.length, pb.length); i++) { const diff = (pa[i] || 0) - (pb[i] || 0); if (diff !== 0) return diff; } return 0; } function notifyClientsAboutUpdate(apkInfo) { const msg = JSON.stringify({ type: "update_available", payload: { version: apkInfo.version, downloadUrl: `/update/latest.apk`, size: fs.statSync(apkInfo.path).size, }, timestamp: Date.now(), }); // An alle Clients in allen Rooms senden for (const [, room] of rooms) { for (const client of room.clients) { if (client.readyState === 1) { client.send(msg); } } } log(`Update-Benachrichtigung gesendet: v${apkInfo.version} (${rooms.size} Raum/Raeume)`); } // Kein Polling — Update-Check passiert on-demand (update_check Message von App) // ── Sauberes Herunterfahren ───────────────────────────────────────── process.on("SIGTERM", () => { log("SIGTERM empfangen — fahre herunter"); wss.close(() => process.exit(0)); }); process.on("SIGINT", () => { log("SIGINT empfangen — fahre herunter"); wss.close(() => process.exit(0)); });