fix(bridge): 3-Schichten-Schutz gegen Bridge-Hangs + Chat-History in beide Boxen
Bridge hat seit 5+h still gehangen — Container Up, asyncio idle im selectors.select(), TCP-Verbindung zum RVS ESTABLISHED, aber keine Events mehr verarbeitet. Klassischer Fall: NAT-Tabelle/Firewall hat die TCP-Verbindung still gekillt (kein RST), Linux-Kernel mit Default- Keepalive (2h idle) hat's nicht gemerkt, und der ws.ping()-Future hat im Limbo gehangen ohne Exception zu werfen. Schicht 1 — TCP-Keepalive aufm Socket: SO_KEEPALIVE=1, TCP_KEEPIDLE=30s, TCP_KEEPINTVL=10s, TCP_KEEPCNT=3. Halb-tote Verbindungen werden in ~1 min mit ECONNRESET sichtbar statt nach 2h. Loest 80% der Faelle direkt. Schicht 2 — Asyncio-Watchdog (_rvs_heartbeat_watchdog): Separate Coroutine parallel zu _rvs_heartbeat. Letzterer markiert _last_heartbeat_ok nach jedem erfolgreichen pong. Watchdog checkt alle 20s: > 60s stale → ws.close() + transport.close() als Notausgang. Schuetzt gegen ws.ping()-Limbo. Schicht 3 — File-Based Liveness Thread: Separater OS-Thread (NICHT asyncio) — immun gegen asyncio-Hangs. Schreibt /shared/health/bridge_alive periodisch. Wenn _last_heartbeat_ok > 180s stale: os._exit(1), Docker restart_policy uebernimmt. Last-Resort wenn Schichten 1+2 versagen. Plus: chat_history-Render nach Reload bezog nur #chat-box, nicht #chat-box-fs (Vollbild). Wer im FS-Modus reloaded hat sah eine leere Box statt der History. Jetzt rendert der Handler in beide Boxen (gleicher Pattern wie addChat / addAriaFile). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+145
-1
@@ -20,7 +20,9 @@ import mimetypes
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import socket
|
||||
import ssl
|
||||
import threading
|
||||
import time
|
||||
import sys
|
||||
import tempfile
|
||||
@@ -48,6 +50,35 @@ logging.basicConfig(
|
||||
)
|
||||
logger = logging.getLogger("aria-bridge")
|
||||
|
||||
|
||||
# ── TCP-Keepalive Helper ────────────────────────────────────
|
||||
#
|
||||
# Aktiviert TCP-Level Keepalive auf einer websockets-Verbindung mit
|
||||
# aggressiven Intervallen: 30s idle bis erster Probe, 10s zwischen
|
||||
# Probes, 3 verfehlte → Verbindung tot. Das deckt den Fall ab dass
|
||||
# NAT-Tabellen-Verfall die TCP-Verbindung still kills ohne RST — Linux-
|
||||
# Default braeucht sonst 2 Stunden idle bis der Kernel selber probt.
|
||||
def _enable_tcp_keepalive(ws) -> None:
|
||||
try:
|
||||
sock = ws.transport.get_extra_info("socket")
|
||||
if sock is None:
|
||||
return
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
# Linux-spezifisch — TCP_KEEPIDLE/INTVL/CNT existieren auf macOS
|
||||
# mit anderem Namen; im Container ist Linux garantiert.
|
||||
for opt, val in (
|
||||
("TCP_KEEPIDLE", 30),
|
||||
("TCP_KEEPINTVL", 10),
|
||||
("TCP_KEEPCNT", 3),
|
||||
):
|
||||
const = getattr(socket, opt, None)
|
||||
if const is not None:
|
||||
sock.setsockopt(socket.IPPROTO_TCP, const, val)
|
||||
logger.info("[rvs] TCP-Keepalive aktiviert (idle=30s, intvl=10s, cnt=3)")
|
||||
except Exception as exc:
|
||||
logger.warning("[rvs] TCP-Keepalive konnte nicht aktiviert werden: %s", exc)
|
||||
|
||||
|
||||
# ── Konfiguration ───────────────────────────────────────────
|
||||
|
||||
VOICES_DIR = Path("/voices")
|
||||
@@ -1500,6 +1531,20 @@ class ARIABridge:
|
||||
retry_delay = 2
|
||||
logger.info("[rvs] Verbunden — warte auf App-Nachrichten")
|
||||
|
||||
# TCP-Keepalive auf dem unterliegenden Socket aktivieren —
|
||||
# damit NAT-Tabellen-Verfall oder "halb-tote" Verbindungen
|
||||
# (kein RST, kein FIN) innerhalb von ~1 Minute erkannt
|
||||
# werden statt nach Linux-Default (2h idle). Ohne das
|
||||
# hat die Bridge schon mal 5+h auf einer toten Connection
|
||||
# gehangen ohne dass irgendeine Exception kam.
|
||||
_enable_tcp_keepalive(ws)
|
||||
|
||||
# Heartbeat-Watchdog: jeden erfolgreichen Ping markieren wir
|
||||
# in _last_heartbeat_ok. Ein separater Watchdog killt die
|
||||
# WS-Verbindung wenn diese Marke > 60s stale ist — schuetzt
|
||||
# gegen den Fall dass ws.ping() selbst nie zurueckkommt.
|
||||
self._last_heartbeat_ok = time.monotonic()
|
||||
|
||||
# Aktuellen Modus broadcasten damit gerade verbundene Apps/Diagnostic
|
||||
# ihren UI-State sofort syncen koennen
|
||||
await self._broadcast_current_mode()
|
||||
@@ -1512,12 +1557,14 @@ class ARIABridge:
|
||||
|
||||
# Heartbeat senden (RVS erwartet Ping alle 30s)
|
||||
heartbeat_task = asyncio.create_task(self._rvs_heartbeat())
|
||||
watchdog_task = asyncio.create_task(self._rvs_heartbeat_watchdog())
|
||||
|
||||
try:
|
||||
async for raw_message in ws:
|
||||
await self._handle_rvs_message(raw_message)
|
||||
finally:
|
||||
heartbeat_task.cancel()
|
||||
watchdog_task.cancel()
|
||||
|
||||
except websockets.ConnectionClosed:
|
||||
logger.warning("[rvs] Verbindung verloren")
|
||||
@@ -1544,7 +1591,12 @@ class ARIABridge:
|
||||
retry_delay = min(retry_delay * 2, 30)
|
||||
|
||||
async def _rvs_heartbeat(self) -> None:
|
||||
"""Sendet Heartbeats + WebSocket Pings an den RVS damit die Verbindung offen bleibt."""
|
||||
"""Sendet Heartbeats + WebSocket Pings an den RVS damit die Verbindung offen bleibt.
|
||||
|
||||
Markiert nach jedem erfolgreichen Ping `_last_heartbeat_ok` —
|
||||
`_rvs_heartbeat_watchdog` schaut darauf und killt die Verbindung
|
||||
wenn die Marke stale ist (Fallback fuer den Fall dass ping() selbst
|
||||
in einer halb-toten TCP-Verbindung ewig blockt)."""
|
||||
while True:
|
||||
await asyncio.sleep(15)
|
||||
if self.ws_rvs:
|
||||
@@ -1552,6 +1604,8 @@ class ARIABridge:
|
||||
# WebSocket Protocol-Level Ping (haelt TCP-Verbindung am Leben)
|
||||
pong = await self.ws_rvs.ping()
|
||||
await asyncio.wait_for(pong, timeout=10)
|
||||
# Erfolgreicher Pong → Watchdog-Marke updaten
|
||||
self._last_heartbeat_ok = time.monotonic()
|
||||
except Exception:
|
||||
logger.warning("[rvs] Ping fehlgeschlagen — Verbindung tot, erzwinge Reconnect")
|
||||
try:
|
||||
@@ -1568,6 +1622,45 @@ class ARIABridge:
|
||||
except Exception:
|
||||
break
|
||||
|
||||
# Heartbeat-Watchdog: wenn der letzte erfolgreiche Ping > HEARTBEAT_STALE_SEC
|
||||
# her ist (z.B. weil ws.ping() im Limbo haengt), erzwingen wir ein hartes
|
||||
# Schliessen der Verbindung. Das wirft den `async for raw_message in ws`-
|
||||
# Loop aus, der Reconnect-Loop in connect_to_rvs greift dann.
|
||||
HEARTBEAT_STALE_SEC = 60.0
|
||||
HEARTBEAT_WATCHDOG_INTERVAL_SEC = 20.0
|
||||
|
||||
async def _rvs_heartbeat_watchdog(self) -> None:
|
||||
"""Independent watchdog der den Heartbeat-Status ueberwacht und
|
||||
bei staleness die WS-Verbindung haert killt. Wird parallel zu
|
||||
`_rvs_heartbeat` gestartet, ist aber unabhaengig davon — auch wenn
|
||||
die heartbeat-Coroutine in einem await ewig haengen wuerde, laeuft
|
||||
diese hier weiter (eigene Coroutine, eigener await-Slot)."""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(self.HEARTBEAT_WATCHDOG_INTERVAL_SEC)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
if not self.ws_rvs:
|
||||
return
|
||||
stale = time.monotonic() - getattr(self, "_last_heartbeat_ok", time.monotonic())
|
||||
if stale > self.HEARTBEAT_STALE_SEC:
|
||||
logger.error(
|
||||
"[rvs] Heartbeat stale (%.0fs > %.0fs) — erzwinge harten Reconnect",
|
||||
stale, self.HEARTBEAT_STALE_SEC,
|
||||
)
|
||||
ws = self.ws_rvs
|
||||
self.ws_rvs = None
|
||||
try:
|
||||
# close mit Reason — falls's hängt killen wir via Underlying-Transport
|
||||
await asyncio.wait_for(ws.close(code=1011, reason="heartbeat-stale"), timeout=3.0)
|
||||
except Exception:
|
||||
# Letzte Option: Transport direkt schliessen, das wirft den recv-Loop
|
||||
try:
|
||||
ws.transport.close() # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
|
||||
async def _send_chat_ack(self, client_msg_id: Optional[str]) -> None:
|
||||
"""Bestaetigt der App den Empfang einer chat/audio-Nachricht.
|
||||
App nutzt das fuer Delivery-Status (✓ = sent). Ohne ACK wuerde die
|
||||
@@ -3220,6 +3313,51 @@ class ARIABridge:
|
||||
self.running = False
|
||||
|
||||
|
||||
# ── File-Based Liveness Watchdog ─────────────────────────────
|
||||
#
|
||||
# Separater OS-Thread (NICHT asyncio) — schreibt periodisch eine
|
||||
# Liveness-Datei mit aktuellem Timestamp und prüft ob der asyncio-Loop
|
||||
# noch lebt. Wenn ueber LIVENESS_SELFKILL_SEC keine erfolgreiche Heart-
|
||||
# beat-Bestätigung vom RVS kam, killt der Watchdog den ganzen Prozess
|
||||
# (os._exit). Docker restart-Policy startet neu. Last-Resort fuer den
|
||||
# Fall dass weder TCP-Keepalive noch der asyncio-Heartbeat-Watchdog
|
||||
# greifen — z.B. wenn der event loop selbst korrumpiert ist.
|
||||
|
||||
LIVENESS_FILE = Path("/shared/health/bridge_alive")
|
||||
LIVENESS_CHECK_INTERVAL_SEC = 15
|
||||
LIVENESS_SELFKILL_SEC = 180 # 3 min — alle anderen Watchdogs (TCP-Keepalive
|
||||
# ~1 min, asyncio-Watchdog 60s) sollten vorher
|
||||
# greifen. Wenn nicht, ist der Prozess wirklich
|
||||
# kaputt.
|
||||
|
||||
|
||||
def _liveness_watchdog(bridge: "ARIABridge") -> None:
|
||||
try:
|
||||
LIVENESS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
while True:
|
||||
time.sleep(LIVENESS_CHECK_INTERVAL_SEC)
|
||||
# 1) Timestamp schreiben — externe Watcher koennen das pollen
|
||||
try:
|
||||
LIVENESS_FILE.write_text(str(int(time.time())))
|
||||
except Exception:
|
||||
pass
|
||||
# 2) Letzten heartbeat checken (wird vom asyncio-Loop gesetzt). Wenn
|
||||
# zu lange stale → Self-Kill. Docker-restart-Policy uebernimmt.
|
||||
last_ok = getattr(bridge, "_last_heartbeat_ok", None)
|
||||
if last_ok is None:
|
||||
continue # noch keine RVS-Verbindung gewesen, fair, kein Kill
|
||||
stale = time.monotonic() - last_ok
|
||||
if stale > LIVENESS_SELFKILL_SEC:
|
||||
sys.stderr.write(
|
||||
f"[liveness] heartbeat {int(stale)}s stale — Self-Kill "
|
||||
f"(Docker restart_policy uebernimmt)\n"
|
||||
)
|
||||
sys.stderr.flush()
|
||||
os._exit(1)
|
||||
|
||||
|
||||
# ── Hauptprogramm ────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -3243,6 +3381,12 @@ def main() -> None:
|
||||
logger.exception("Initialisierung fehlgeschlagen")
|
||||
sys.exit(1)
|
||||
|
||||
# Liveness-Watchdog als daemon-Thread starten (immune gegen asyncio-Hangs)
|
||||
threading.Thread(target=_liveness_watchdog, args=(bridge,),
|
||||
daemon=True, name="liveness-watchdog").start()
|
||||
logger.info("[liveness] Watchdog-Thread gestartet (selfkill nach %ds Heartbeat-Staleness)",
|
||||
LIVENESS_SELFKILL_SEC)
|
||||
|
||||
# Event-Loop starten
|
||||
try:
|
||||
asyncio.run(bridge.run())
|
||||
|
||||
+18
-12
@@ -1642,26 +1642,26 @@
|
||||
showDockerLogs(msg);
|
||||
return;
|
||||
}
|
||||
// Chat-History (nach F5 / Reconnect)
|
||||
// Chat-History (nach F5 / Reconnect) — IN BEIDE Boxen rendern.
|
||||
// Vorher: nur chatBox bekam die Replay, die Vollbild-Box blieb leer
|
||||
// → bei Reload aus dem FS-Modus sah es so aus als ob die letzten
|
||||
// Bubbles weg waeren. Live-addChat schreibt schon korrekt in beide,
|
||||
// der Reload-Pfad zog nicht mit.
|
||||
if (msg.type === 'chat_history') {
|
||||
chatBox.innerHTML = '';
|
||||
const boxes = [chatBox, document.getElementById('chat-box-fs')].filter(Boolean);
|
||||
for (const b of boxes) b.innerHTML = '';
|
||||
if (msg.messages && msg.messages.length > 0) {
|
||||
for (const m of msg.messages) {
|
||||
if (m.type === 'aria_file') {
|
||||
// ARIA-Datei-Bubble rekonstruieren (statt addAriaFile damit
|
||||
// kein Auto-Scroll-Race waehrend des Bulk-Loads)
|
||||
addAriaFile({ serverPath: m.serverPath, name: m.name, mimeType: m.mimeType, size: m.size });
|
||||
// ARIA-Datei-Bubble — addAriaFile schreibt selbst in beide Boxen
|
||||
addAriaFile({ serverPath: m.serverPath, name: m.name, mimeType: m.mimeType, size: m.size, deleted: m.deleted });
|
||||
continue;
|
||||
}
|
||||
const el = document.createElement('div');
|
||||
el.className = `chat-msg ${m.type}`;
|
||||
if (m.ts) el.dataset.ts = String(m.ts);
|
||||
// [FILE: ...]-Marker rausfiltern (gleicher Filter wie addChat)
|
||||
const cleaned = (m.text || '').replace(/\[FILE:\s*\/shared\/uploads\/[^\]]+\]/gi, '').replace(/\n{3,}/g, '\n\n').trim();
|
||||
const escaped = escapeHtml(cleaned);
|
||||
let linked = linkifyText(escaped);
|
||||
// /shared/uploads/-Bildpfade auch im History inline rendern
|
||||
// (gleicher Replace wie in addChat — sonst sieht man nach F5 nur Text-Pfade)
|
||||
linked = linked.replace(/\/shared\/uploads\/[^\s<"]+\.(jpg|jpeg|png|gif|webp|svg|bmp)/gi, (match) => {
|
||||
return `<a href="${match}" target="_blank">${match}</a><img src="${match}" class="chat-media" onclick="openLightbox('image','${match}')" onerror="this.style.display='none'">`;
|
||||
});
|
||||
@@ -1669,10 +1669,16 @@
|
||||
const trashBtn = m.ts
|
||||
? `<button class="bubble-trash" title="Diese Bubble loeschen" onclick="deleteDiagBubble(${m.ts})">🗑</button>`
|
||||
: '';
|
||||
el.innerHTML = `${trashBtn}${linked}<div class="meta">${escapeHtml(m.meta)} — ${time}</div>`;
|
||||
chatBox.appendChild(el);
|
||||
const innerHtml = `${trashBtn}${linked}<div class="meta">${escapeHtml(m.meta)} — ${time}</div>`;
|
||||
for (const b of boxes) {
|
||||
const el = document.createElement('div');
|
||||
el.className = `chat-msg ${m.type}`;
|
||||
if (m.ts) el.dataset.ts = String(m.ts);
|
||||
el.innerHTML = innerHtml;
|
||||
b.appendChild(el);
|
||||
}
|
||||
}
|
||||
chatBox.scrollTop = chatBox.scrollHeight;
|
||||
for (const b of boxes) b.scrollTop = b.scrollHeight;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user