From 7927ad05ae7a41aa1bf72b39356e4c46cc1b2663 Mon Sep 17 00:00:00 2001 From: duffyduck Date: Thu, 2 Jul 2026 17:57:30 +0200 Subject: [PATCH] feat(brain): Multi-Threading via per-request project_id + per-project queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Erster Schritt zum echten Multi-Threading fuer ARIA-Projekte. Kein globaler active_project-State mehr — jeder /chat-Request sagt selbst welche Buehne (project_id im Body). Verschiedene Projekte laufen parallel, gleiches Projekt queued via asyncio.Lock. Backend: - ChatIn.project_id: Client bestimmt pro Request wohin. Bridge routet. - /chat: async, holt per-Projekt asyncio.Lock. Requests fuers gleiche Projekt reihen sich in _project_pending ein, warten am Lock. Requests fuer verschiedene Projekte laufen echt parallel. - Neuer /projects/queue-status endpoint: pro Kontext (inkl. Hauptchat unter __main__): busy True/False + queue_size. Fuers UI-Status-Dots. - Agent.chat() nimmt project_id + pending_queue Params. Kein projects_mod.get_active() mehr im Hot-Path. Queue-Aware Prompting: - Wenn nach dem aktuellen Turn weitere Nachrichten in der Queue liegen, wird der System-Prompt um ein QUEUE-Segment erweitert mit Instruktion: „Bevor Du den aktuellen Task loesst, pruef die Queue — widerspricht/ annuliert eine spaetere Nachricht? Dann Skip-Antwort statt Doppelarbeit." - Beispiel: Task 'titelleiste rot' + Queue-Tail 'doch nicht, blau' → ARIA skipt rot, blau kommt als naechste Anfrage sauber durch. - Kein extra LLM-Call — reine Prompt-Injection. Project-Tools: - project_enter/exit sind jetzt UI-Signale (App wechselt Ansicht via project_changed event), aendern KEINEN Brain-State mehr. Der aktuelle Turn bleibt in seinem Chat-Kontext. - project_list zeigt keinen "AKTIV"-Marker mehr (nicht mehr sinnvoll). - projects_mod.set_active/get_active bleiben als Legacy-Helpers (kein Aufruf mehr aus dem Hot-Path). Bridge: - send_to_core packt project_id in den /chat-Body. - User-Backup-Eintrag tag't project_id sauber, keine Brain-Query mehr. Naechste Schritte (kommende Commits): - App: Focus-One-View mit Drawer + Status-Dots + OS-Push - Diagnostic: Dashboard-Stack mit Karten - Voice-Router: 30s-Sticky + Meta-Command-Interception im wakeword.ts Co-Authored-By: Claude Opus 4.7 --- aria-brain/agent.py | 95 ++++++++++++++++++--------- aria-brain/main.py | 145 ++++++++++++++++++++++++++++++++++-------- bridge/aria_bridge.py | 15 ++--- 3 files changed, 191 insertions(+), 64 deletions(-) diff --git a/aria-brain/agent.py b/aria-brain/agent.py index fc65acc..61944ff 100644 --- a/aria-brain/agent.py +++ b/aria-brain/agent.py @@ -836,10 +836,14 @@ META_TOOLS = [ "function": { "name": "project_enter", "description": ( - "Wechselt in ein bestehendes Projekt. Fuzzy-Match auf Namen — " - "'Spotify' findet das Projekt 'Spotify-Setup'. Nach dem Eintritt " - "tagged jeder neue Turn die project_id. Bei sehr alten Projekten: " - "vorher project_summary aufrufen damit Du Stefan abholst." + "Signalisiert der App/Diagnostic 'wechsel zu diesem Projekt'. Fuzzy-" + "Match auf Namen — 'Spotify' findet das Projekt 'Spotify-Setup'. " + "Der AKTUELLE Turn bleibt aber in seinem Chat-Kontext — wir haben " + "Multi-Threading, kein globales 'aktives Projekt' mehr. Wenn Stefan " + "im Hauptchat sagt 'lass uns in Spotify weiter machen': " + "project_enter aufrufen (App wechselt Ansicht), aber Deine Antwort " + "geht trotzdem im Hauptchat raus. Bei sehr alten Projekten vorher " + "project_summary aufrufen damit Du Stefan abholst." ), "parameters": { "type": "object", @@ -855,8 +859,10 @@ META_TOOLS = [ "function": { "name": "project_exit", "description": ( - "Verlässt das aktuelle Projekt — zurück zum Hauptthread. Nutze " - "wenn Stefan sagt 'Projekt Ende', 'zurück zum Hauptchat' o.ä." + "Signalisiert der App/Diagnostic 'wechsel zurueck zum Hauptchat'. " + "Nutze wenn Stefan sagt 'Projekt Ende' oder 'zurueck zum Hauptchat' " + "waehrend er visuell in einem Projekt ist. Der aktuelle Turn bleibt " + "in seinem Chat-Kontext — Multi-Threading." ), "parameters": {"type": "object", "properties": {}}, }, @@ -1051,7 +1057,21 @@ class Agent: MAX_TOOL_ITERATIONS = 8 # Schutz vor Endlos-Loops - def chat(self, user_message: str, source: str = "") -> str: + def chat(self, user_message: str, source: str = "", + project_id: Optional[str] = None, + pending_queue: Optional[list[str]] = None) -> str: + """Verarbeitet eine User-Nachricht — pro Request project_id explizit + angegeben (leer = Hauptchat). Kein globaler active_project-State mehr — + so laufen parallele /chat-Requests fuer verschiedene Projekte echt + parallel (Multi-Threading-Architektur seit 06/2026). + + pending_queue: Liste weiterer User-Nachrichten die in DIESEM Projekt + NACH dem aktuellen Turn warten. ARIA sieht sie im System-Prompt und + soll pruefen ob eine spaetere Nachricht den aktuellen Task + korrigiert / annuliert (dann Skip-Antwort statt Ausfuehren). + + Wenn project_id=None (Backward-Compat fuer Aufrufer die den Param nicht + setzen): wird als Hauptchat behandelt.""" user_message = (user_message or "").strip() if not user_message: raise ValueError("Leere Nachricht") @@ -1059,9 +1079,8 @@ class Agent: # Events vom letzten Turn weglassen self._pending_events = [] - # Aktives Projekt (leer = Hauptthread) — bestimmt das Tagging der - # neuen Turns + das Conversation-Window-Filter fuer den LLM-Prompt. - active_project_id = projects_mod.get_active() + # Projekt-Kontext pro Request statt aus globalem State + active_project_id = (project_id or "").strip() active_project = projects_mod.get_project(active_project_id) if active_project_id else None # Fast-Path: einfache "reines Steuern"-Commands ueberspringen Claude komplett. @@ -1127,6 +1146,28 @@ class Agent: oauth_callback_host=oauth_host, oauth_callback_port=oauth_port, oauth_callback_tls=oauth_tls) + # Queue-Aware Prompting: wenn nach diesem Turn weitere Nachrichten + # in der Warteschlange liegen, muss ARIA pruefen ob eine spaetere die + # aktuelle Aufgabe korrigiert/annuliert (→ Skip statt Doppelarbeit). + if pending_queue: + queue_lines = "\n".join(f" - {m[:280]}" for m in pending_queue[:5]) + more_hint = "" + if len(pending_queue) > 5: + more_hint = f"\n ... und {len(pending_queue) - 5} weitere" + system_prompt += ( + f"\n\n## QUEUE — NACH DIESEM TASK WARTEN\n" + f"{queue_lines}{more_hint}\n" + f"\nBEVOR DU DEN AKTUELLEN TASK LOESST:\n" + f" 1. Pruefe die Queue oben — widerspricht/annuliert eine der spaeteren " + f"Nachrichten den aktuellen Task?\n" + f" 2. Wenn ja: antworte ganz kurz 'Task ubersprungen — wird durch spaetere " + f"Nachricht korrigiert' und mach KEINE Aktion. Der spaetere Task laeuft dann " + f"ganz normal als naechste Anfrage durch.\n" + f" 3. Wenn nein / unabhaengige Ergaenzung: Task normal loesen.\n" + f"Beispiel: aktueller Task 'titelleiste rot', Queue enthaelt " + f"'doch nicht, mach sie blau' → skip, blau kommt als naechste Anfrage." + ) + # Aktuelle Projekt-Bühne als System-Hinweis ergaenzen, damit Claude # weiss in welchem Kontext sie spricht und ihre project_* Tools korrekt # einsetzt (z.B. bei „Projekt Ende" project_exit aufruft). @@ -1217,19 +1258,17 @@ class Agent: err_text = f"[Fehler: {exc}]" logger.error("chat() Exception — schreibe Error-Marker als Assistant-Turn: %s", exc) try: - # Aktive Projekt-ID NEU lesen — kann sich waehrend des Tool-Loops - # geaendert haben (project_enter/exit als Tool-Call). + # Turn-Kontext bleibt gleich — es gibt keinen globalen Wechsel + # mehr, jeder Request laeuft in seinem eigenen project_id-Kontext. self.conversation.add("assistant", err_text, - project_id=projects_mod.get_active()) + project_id=active_project_id) except Exception as add_exc: logger.warning("Konnte Error-Marker nicht persistieren: %s", add_exc) raise # 7. Assistant-Turn (final reply) in die Conversation - # NEU lesen — wenn der LLM project_enter/exit gerufen hat, ist der - # Final-Reply schon im neuen Projekt-Kontext. self.conversation.add("assistant", final_reply, - project_id=projects_mod.get_active()) + project_id=active_project_id) return final_reply # ── Tool-Dispatcher ─────────────────────────────────────── @@ -1804,7 +1843,10 @@ class Agent: "project": p, "action": "created", }) - return f"OK — Projekt '{p['name']}' angelegt (id={p['id']}) und aktiv. Alle weiteren Turns gehen jetzt da rein bis Du project_exit oder project_enter aufrufst." + return (f"OK — Projekt '{p['name']}' angelegt (id={p['id']}). App/Diagnostic " + f"kriegen ein project_changed-Event und koennen dahin wechseln. " + f"Kommender Turn bleibt aber im aktuellen Chat-Kontext — " + f"Multi-Threading, jeder Chat ist eigenstaendig.") if name == "project_enter": pname = (arguments.get("name") or "").strip() if not pname: @@ -1812,7 +1854,6 @@ class Agent: p = projects_mod.find_project(pname) if not p: return f"Kein Projekt '{pname}' gefunden. Nutze project_list zum Aufzaehlen oder project_create wenn's neu sein soll." - projects_mod.set_active(p["id"]) self._pending_events.append({ "type": "project_changed", "project": p, @@ -1822,31 +1863,27 @@ class Agent: hint = "" if turn_count > 0: hint = " Wenn Stefan nach dem Stand fragt: project_summary aufrufen." - return f"OK — in Projekt '{p['name']}' eingestiegen (id={p['id']}, {turn_count} bisherige Turns).{hint}" + return (f"OK — App/Diagnostic wird zum Projekt '{p['name']}' " + f"(id={p['id']}, {turn_count} bisherige Turns) umschalten. " + f"Der aktuelle Turn bleibt aber im aktuellen Chat-Kontext.{hint}") if name == "project_exit": - active_id = projects_mod.get_active() - if not active_id: - return "Es ist gerade kein Projekt aktiv — bereits im Hauptthread." - p = projects_mod.get_project(active_id) - projects_mod.set_active("") self._pending_events.append({ "type": "project_changed", - "project": p, + "project": None, "action": "exited", }) - return f"OK — Projekt '{p['name'] if p else active_id}' verlassen. Zurueck im Hauptthread." + return ("OK — App/Diagnostic bekommt Signal 'zurueck zum Hauptchat'. " + "Der aktuelle Turn bleibt aber im aktuellen Chat-Kontext.") if name == "project_list": items = projects_mod.list_projects() if not items: return "(keine Projekte angelegt)" - active_id = projects_mod.get_active() lines = [] for p in items: - marker = " ← AKTIV" if p["id"] == active_id else "" status_lbl = p.get("status", "active") lines.append( f"- {p['name']} (id={p['id']}, {p.get('turn_count', 0)} Turns, " - f"status={status_lbl}){marker}" + f"status={status_lbl})" ) return "Projekte:\n" + "\n".join(lines) if name == "project_summary": diff --git a/aria-brain/main.py b/aria-brain/main.py index 620c795..b124f8a 100644 --- a/aria-brain/main.py +++ b/aria-brain/main.py @@ -607,6 +607,11 @@ def memory_import_bootstrap(body: BootstrapBundle): class ChatIn(BaseModel): message: str source: str = "" # "app" / "diagnostic" / "stt" — optional + # Multi-Threading: Client bestimmt pro Request welches Projekt (leer = Hauptchat). + # Kein globaler active_project-State mehr im Brain — parallele Requests fuer + # verschiedene Projekte laufen echt parallel, nur Requests fuers gleiche + # Projekt queuen (per-Projekt-Lock). + project_id: str = "" class ChatOut(BaseModel): @@ -614,36 +619,124 @@ class ChatOut(BaseModel): turns: int distilling: bool events: list = Field(default_factory=list) - # Aktive Projekt-ID NACH dem Turn (kann durch project_enter/exit-Tools - # waehrend des Turns gewechselt haben). Bridge gibt das an die Chat- - # Bubble-Broadcasts weiter damit App + Diagnostic die Nachricht zum - # richtigen Projekt-Block sortieren koennen. + # Echo der project_id die dieser Turn hatte. Bridge nutzt sie damit die + # ausgehende Chat-Bubble sauber getaggt in der richtigen Thread-Bahn der + # UI landet. project_id: str = "" -@app.post("/chat", response_model=ChatOut) -def chat(body: ChatIn, background: BackgroundTasks): - """Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft - im Hintergrund nachdem die Response rausging.""" - a = agent() - try: - reply = a.chat(body.message, source=body.source) - except ValueError as exc: - raise HTTPException(400, str(exc)) - except RuntimeError as exc: - logger.error("chat fehlgeschlagen: %s", exc) - raise HTTPException(502, str(exc)) +# Per-Projekt async-Locks fuer Queue-Behavior: Requests fuers gleiche Projekt +# warten aufeinander (queue), Requests fuer verschiedene Projekte laufen echt +# parallel. Hauptchat = Lock unter key "" (leerer String). +_project_locks: dict[str, asyncio.Lock] = {} +_project_locks_meta_lock = asyncio.Lock() +# Pro Projekt eine Liste noch-nicht-verarbeiteter Requests. Wird beim Enqueue +# ergaenzt, beim Fertig-Werden gepoppt. Ermoeglicht Queue-Aware-Prompting: +# waehrend ARIA an Task N arbeitet, sieht sie N+1..N+k als System-Prompt-Hinweis +# und kann entscheiden ob eine spaetere Nachricht die aktuelle korrigiert/ +# annuliert → dann Skip-Antwort statt Ausfuehren. +_project_pending: dict[str, list[dict]] = {} - needs_distill = a.conversation.needs_distill() - if needs_distill: - background.add_task(a.distill_old_turns) - return ChatOut( - reply=reply, - turns=len(a.conversation.turns), - distilling=needs_distill, - events=a.pop_events(), - project_id=projects_mod.get_active(), - ) + +async def _get_project_lock(project_id: str) -> asyncio.Lock: + """Holt (oder erzeugt) den asyncio.Lock fuer ein bestimmtes Projekt. + Nutzt _project_locks_meta_lock zur Vermeidung von Race Conditions + beim ersten-Zugriff pro Projekt.""" + async with _project_locks_meta_lock: + lock = _project_locks.get(project_id) + if lock is None: + lock = asyncio.Lock() + _project_locks[project_id] = lock + return lock + + +def _project_queue_snapshot() -> dict: + """Snapshot fuer /projects/queue-status: welche Projekte arbeiten gerade, + wieviele wait-in-queue haben, welche sind idle.""" + out = {} + # Zeige nur Kontexte mit Aktivitaet — locked oder pending + seen: set = set() + for pid, lock in _project_locks.items(): + pending = len(_project_pending.get(pid, [])) + is_busy = lock.locked() + # busy: gerade in Verarbeitung. queue: N weitere warten dahinter. + # Der Busy-Request zaehlt NICHT in queue (er ist ja aus pending schon "raus"). + out[pid or "__main__"] = { + "busy": is_busy, + "queue_size": max(0, pending - (1 if is_busy else 0)), + } + seen.add(pid) + for pid, pend in _project_pending.items(): + if pid in seen: + continue + out[pid or "__main__"] = {"busy": False, "queue_size": len(pend)} + return out + + +@app.post("/chat", response_model=ChatOut) +async def chat(body: ChatIn, background: BackgroundTasks): + """Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft + im Hintergrund nachdem die Response rausging. + + Multi-Threading: Requests fuers gleiche Projekt (project_id gleich) + laufen serialisiert durch den per-Projekt-Lock — Queue-Behavior. + Verschiedene Projekte laufen parallel.""" + pid = (body.project_id or "").strip() + lock = await _get_project_lock(pid) + # Vor dem Lock in die Pending-Liste, damit die verlaufende Task sehen kann + # was NACH ihr in der Warteschlange steht (Queue-Aware Prompting). + import uuid as _uuid + req_id = _uuid.uuid4().hex + _project_pending.setdefault(pid, []).append({ + "id": req_id, "message": body.message, "source": body.source, + }) + try: + async with lock: + # Snapshot: was liegt NACH mir in der Queue? + after_me = [ + e["message"] for e in _project_pending.get(pid, []) + if e["id"] != req_id + ] + a = agent() + try: + # Sync-Aufruf im Executor damit wir den Event-Loop nicht blocken — + # chat() macht HTTP-Calls (Proxy) die 30-60s dauern koennen. + loop = asyncio.get_running_loop() + reply = await loop.run_in_executor( + None, + lambda: a.chat( + body.message, source=body.source, project_id=pid, + pending_queue=after_me, + ), + ) + except ValueError as exc: + raise HTTPException(400, str(exc)) + except RuntimeError as exc: + logger.error("chat fehlgeschlagen: %s", exc) + raise HTTPException(502, str(exc)) + + needs_distill = a.conversation.needs_distill() + if needs_distill: + background.add_task(a.distill_old_turns) + return ChatOut( + reply=reply, + turns=len(a.conversation.turns), + distilling=needs_distill, + events=a.pop_events(), + project_id=pid, + ) + finally: + _project_pending[pid] = [ + e for e in _project_pending.get(pid, []) if e["id"] != req_id + ] + + +@app.get("/projects/queue-status") +def projects_queue_status(): + """Snapshot: fuer jeden Projekt-Kontext (inkl. Hauptchat unter __main__) + - busy: True wenn gerade ein Request in Verarbeitung + - queue_size: wieviele weitere warten dahinter""" + return {"contexts": _project_queue_snapshot()} # ── Projekte ──────────────────────────────────────────────────────── diff --git a/bridge/aria_bridge.py b/bridge/aria_bridge.py index d9951a9..4292361 100644 --- a/bridge/aria_bridge.py +++ b/bridge/aria_bridge.py @@ -1522,24 +1522,21 @@ class ARIABridge: """ brain_url = os.environ.get("BRAIN_URL", "http://aria-brain:8080") url = f"{brain_url}/chat" - payload = json.dumps({"message": text, "source": source}).encode("utf-8") + # project_id kommt jetzt IM /chat-Body an das Brain (Multi-Threading: + # per-Request-Routing statt globaler active_project-State). + payload = json.dumps({ + "message": text, "source": source, + "project_id": project_id or "", + }).encode("utf-8") logger.info("[brain] chat ← %s '%s' project=%s", source, text[:80], project_id or "(main)") # User-Nachricht in chat_backup.jsonl loggen — wird beim App-Reconnect # / Diagnostic-Reload als History-Quelle gelesen. clientMsgId speichern # damit die App beim chat_history_response ihre lokale Bubble # dedupen kann (sonst verschwindet sie nach Offline→Online-Race). - # project_id: pre-turn-State (was App geschickt hat). Wenn leer, vom - # Brain-Status nachholen — z.B. bei Trigger-Replies oder Diagnostic-Send. entry: dict = {"role": "user", "text": text, "source": source} if client_msg_id: entry["clientMsgId"] = client_msg_id - if not project_id: - try: - with urllib.request.urlopen(f"{brain_url}/projects/status", timeout=3) as r: - project_id = (json.loads(r.read()).get("active_id") or "") - except Exception: - pass if project_id: entry["project_id"] = project_id self._append_chat_backup(entry)