feat(brain): Multi-Threading via per-request project_id + per-project queue
Erster Schritt zum echten Multi-Threading fuer ARIA-Projekte. Kein globaler active_project-State mehr — jeder /chat-Request sagt selbst welche Buehne (project_id im Body). Verschiedene Projekte laufen parallel, gleiches Projekt queued via asyncio.Lock. Backend: - ChatIn.project_id: Client bestimmt pro Request wohin. Bridge routet. - /chat: async, holt per-Projekt asyncio.Lock. Requests fuers gleiche Projekt reihen sich in _project_pending ein, warten am Lock. Requests fuer verschiedene Projekte laufen echt parallel. - Neuer /projects/queue-status endpoint: pro Kontext (inkl. Hauptchat unter __main__): busy True/False + queue_size. Fuers UI-Status-Dots. - Agent.chat() nimmt project_id + pending_queue Params. Kein projects_mod.get_active() mehr im Hot-Path. Queue-Aware Prompting: - Wenn nach dem aktuellen Turn weitere Nachrichten in der Queue liegen, wird der System-Prompt um ein QUEUE-Segment erweitert mit Instruktion: „Bevor Du den aktuellen Task loesst, pruef die Queue — widerspricht/ annuliert eine spaetere Nachricht? Dann Skip-Antwort statt Doppelarbeit." - Beispiel: Task 'titelleiste rot' + Queue-Tail 'doch nicht, blau' → ARIA skipt rot, blau kommt als naechste Anfrage sauber durch. - Kein extra LLM-Call — reine Prompt-Injection. Project-Tools: - project_enter/exit sind jetzt UI-Signale (App wechselt Ansicht via project_changed event), aendern KEINEN Brain-State mehr. Der aktuelle Turn bleibt in seinem Chat-Kontext. - project_list zeigt keinen "AKTIV"-Marker mehr (nicht mehr sinnvoll). - projects_mod.set_active/get_active bleiben als Legacy-Helpers (kein Aufruf mehr aus dem Hot-Path). Bridge: - send_to_core packt project_id in den /chat-Body. - User-Backup-Eintrag tag't project_id sauber, keine Brain-Query mehr. Naechste Schritte (kommende Commits): - App: Focus-One-View mit Drawer + Status-Dots + OS-Push - Diagnostic: Dashboard-Stack mit Karten - Voice-Router: 30s-Sticky + Meta-Command-Interception im wakeword.ts Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+66
-29
@@ -836,10 +836,14 @@ META_TOOLS = [
|
||||
"function": {
|
||||
"name": "project_enter",
|
||||
"description": (
|
||||
"Wechselt in ein bestehendes Projekt. Fuzzy-Match auf Namen — "
|
||||
"'Spotify' findet das Projekt 'Spotify-Setup'. Nach dem Eintritt "
|
||||
"tagged jeder neue Turn die project_id. Bei sehr alten Projekten: "
|
||||
"vorher project_summary aufrufen damit Du Stefan abholst."
|
||||
"Signalisiert der App/Diagnostic 'wechsel zu diesem Projekt'. Fuzzy-"
|
||||
"Match auf Namen — 'Spotify' findet das Projekt 'Spotify-Setup'. "
|
||||
"Der AKTUELLE Turn bleibt aber in seinem Chat-Kontext — wir haben "
|
||||
"Multi-Threading, kein globales 'aktives Projekt' mehr. Wenn Stefan "
|
||||
"im Hauptchat sagt 'lass uns in Spotify weiter machen': "
|
||||
"project_enter aufrufen (App wechselt Ansicht), aber Deine Antwort "
|
||||
"geht trotzdem im Hauptchat raus. Bei sehr alten Projekten vorher "
|
||||
"project_summary aufrufen damit Du Stefan abholst."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
@@ -855,8 +859,10 @@ META_TOOLS = [
|
||||
"function": {
|
||||
"name": "project_exit",
|
||||
"description": (
|
||||
"Verlässt das aktuelle Projekt — zurück zum Hauptthread. Nutze "
|
||||
"wenn Stefan sagt 'Projekt Ende', 'zurück zum Hauptchat' o.ä."
|
||||
"Signalisiert der App/Diagnostic 'wechsel zurueck zum Hauptchat'. "
|
||||
"Nutze wenn Stefan sagt 'Projekt Ende' oder 'zurueck zum Hauptchat' "
|
||||
"waehrend er visuell in einem Projekt ist. Der aktuelle Turn bleibt "
|
||||
"in seinem Chat-Kontext — Multi-Threading."
|
||||
),
|
||||
"parameters": {"type": "object", "properties": {}},
|
||||
},
|
||||
@@ -1051,7 +1057,21 @@ class Agent:
|
||||
|
||||
MAX_TOOL_ITERATIONS = 8 # Schutz vor Endlos-Loops
|
||||
|
||||
def chat(self, user_message: str, source: str = "") -> str:
|
||||
def chat(self, user_message: str, source: str = "",
|
||||
project_id: Optional[str] = None,
|
||||
pending_queue: Optional[list[str]] = None) -> str:
|
||||
"""Verarbeitet eine User-Nachricht — pro Request project_id explizit
|
||||
angegeben (leer = Hauptchat). Kein globaler active_project-State mehr —
|
||||
so laufen parallele /chat-Requests fuer verschiedene Projekte echt
|
||||
parallel (Multi-Threading-Architektur seit 06/2026).
|
||||
|
||||
pending_queue: Liste weiterer User-Nachrichten die in DIESEM Projekt
|
||||
NACH dem aktuellen Turn warten. ARIA sieht sie im System-Prompt und
|
||||
soll pruefen ob eine spaetere Nachricht den aktuellen Task
|
||||
korrigiert / annuliert (dann Skip-Antwort statt Ausfuehren).
|
||||
|
||||
Wenn project_id=None (Backward-Compat fuer Aufrufer die den Param nicht
|
||||
setzen): wird als Hauptchat behandelt."""
|
||||
user_message = (user_message or "").strip()
|
||||
if not user_message:
|
||||
raise ValueError("Leere Nachricht")
|
||||
@@ -1059,9 +1079,8 @@ class Agent:
|
||||
# Events vom letzten Turn weglassen
|
||||
self._pending_events = []
|
||||
|
||||
# Aktives Projekt (leer = Hauptthread) — bestimmt das Tagging der
|
||||
# neuen Turns + das Conversation-Window-Filter fuer den LLM-Prompt.
|
||||
active_project_id = projects_mod.get_active()
|
||||
# Projekt-Kontext pro Request statt aus globalem State
|
||||
active_project_id = (project_id or "").strip()
|
||||
active_project = projects_mod.get_project(active_project_id) if active_project_id else None
|
||||
|
||||
# Fast-Path: einfache "reines Steuern"-Commands ueberspringen Claude komplett.
|
||||
@@ -1127,6 +1146,28 @@ class Agent:
|
||||
oauth_callback_host=oauth_host,
|
||||
oauth_callback_port=oauth_port,
|
||||
oauth_callback_tls=oauth_tls)
|
||||
# Queue-Aware Prompting: wenn nach diesem Turn weitere Nachrichten
|
||||
# in der Warteschlange liegen, muss ARIA pruefen ob eine spaetere die
|
||||
# aktuelle Aufgabe korrigiert/annuliert (→ Skip statt Doppelarbeit).
|
||||
if pending_queue:
|
||||
queue_lines = "\n".join(f" - {m[:280]}" for m in pending_queue[:5])
|
||||
more_hint = ""
|
||||
if len(pending_queue) > 5:
|
||||
more_hint = f"\n ... und {len(pending_queue) - 5} weitere"
|
||||
system_prompt += (
|
||||
f"\n\n## QUEUE — NACH DIESEM TASK WARTEN\n"
|
||||
f"{queue_lines}{more_hint}\n"
|
||||
f"\nBEVOR DU DEN AKTUELLEN TASK LOESST:\n"
|
||||
f" 1. Pruefe die Queue oben — widerspricht/annuliert eine der spaeteren "
|
||||
f"Nachrichten den aktuellen Task?\n"
|
||||
f" 2. Wenn ja: antworte ganz kurz 'Task ubersprungen — wird durch spaetere "
|
||||
f"Nachricht korrigiert' und mach KEINE Aktion. Der spaetere Task laeuft dann "
|
||||
f"ganz normal als naechste Anfrage durch.\n"
|
||||
f" 3. Wenn nein / unabhaengige Ergaenzung: Task normal loesen.\n"
|
||||
f"Beispiel: aktueller Task 'titelleiste rot', Queue enthaelt "
|
||||
f"'doch nicht, mach sie blau' → skip, blau kommt als naechste Anfrage."
|
||||
)
|
||||
|
||||
# Aktuelle Projekt-Bühne als System-Hinweis ergaenzen, damit Claude
|
||||
# weiss in welchem Kontext sie spricht und ihre project_* Tools korrekt
|
||||
# einsetzt (z.B. bei „Projekt Ende" project_exit aufruft).
|
||||
@@ -1217,19 +1258,17 @@ class Agent:
|
||||
err_text = f"[Fehler: {exc}]"
|
||||
logger.error("chat() Exception — schreibe Error-Marker als Assistant-Turn: %s", exc)
|
||||
try:
|
||||
# Aktive Projekt-ID NEU lesen — kann sich waehrend des Tool-Loops
|
||||
# geaendert haben (project_enter/exit als Tool-Call).
|
||||
# Turn-Kontext bleibt gleich — es gibt keinen globalen Wechsel
|
||||
# mehr, jeder Request laeuft in seinem eigenen project_id-Kontext.
|
||||
self.conversation.add("assistant", err_text,
|
||||
project_id=projects_mod.get_active())
|
||||
project_id=active_project_id)
|
||||
except Exception as add_exc:
|
||||
logger.warning("Konnte Error-Marker nicht persistieren: %s", add_exc)
|
||||
raise
|
||||
|
||||
# 7. Assistant-Turn (final reply) in die Conversation
|
||||
# NEU lesen — wenn der LLM project_enter/exit gerufen hat, ist der
|
||||
# Final-Reply schon im neuen Projekt-Kontext.
|
||||
self.conversation.add("assistant", final_reply,
|
||||
project_id=projects_mod.get_active())
|
||||
project_id=active_project_id)
|
||||
return final_reply
|
||||
|
||||
# ── Tool-Dispatcher ───────────────────────────────────────
|
||||
@@ -1804,7 +1843,10 @@ class Agent:
|
||||
"project": p,
|
||||
"action": "created",
|
||||
})
|
||||
return f"OK — Projekt '{p['name']}' angelegt (id={p['id']}) und aktiv. Alle weiteren Turns gehen jetzt da rein bis Du project_exit oder project_enter aufrufst."
|
||||
return (f"OK — Projekt '{p['name']}' angelegt (id={p['id']}). App/Diagnostic "
|
||||
f"kriegen ein project_changed-Event und koennen dahin wechseln. "
|
||||
f"Kommender Turn bleibt aber im aktuellen Chat-Kontext — "
|
||||
f"Multi-Threading, jeder Chat ist eigenstaendig.")
|
||||
if name == "project_enter":
|
||||
pname = (arguments.get("name") or "").strip()
|
||||
if not pname:
|
||||
@@ -1812,7 +1854,6 @@ class Agent:
|
||||
p = projects_mod.find_project(pname)
|
||||
if not p:
|
||||
return f"Kein Projekt '{pname}' gefunden. Nutze project_list zum Aufzaehlen oder project_create wenn's neu sein soll."
|
||||
projects_mod.set_active(p["id"])
|
||||
self._pending_events.append({
|
||||
"type": "project_changed",
|
||||
"project": p,
|
||||
@@ -1822,31 +1863,27 @@ class Agent:
|
||||
hint = ""
|
||||
if turn_count > 0:
|
||||
hint = " Wenn Stefan nach dem Stand fragt: project_summary aufrufen."
|
||||
return f"OK — in Projekt '{p['name']}' eingestiegen (id={p['id']}, {turn_count} bisherige Turns).{hint}"
|
||||
return (f"OK — App/Diagnostic wird zum Projekt '{p['name']}' "
|
||||
f"(id={p['id']}, {turn_count} bisherige Turns) umschalten. "
|
||||
f"Der aktuelle Turn bleibt aber im aktuellen Chat-Kontext.{hint}")
|
||||
if name == "project_exit":
|
||||
active_id = projects_mod.get_active()
|
||||
if not active_id:
|
||||
return "Es ist gerade kein Projekt aktiv — bereits im Hauptthread."
|
||||
p = projects_mod.get_project(active_id)
|
||||
projects_mod.set_active("")
|
||||
self._pending_events.append({
|
||||
"type": "project_changed",
|
||||
"project": p,
|
||||
"project": None,
|
||||
"action": "exited",
|
||||
})
|
||||
return f"OK — Projekt '{p['name'] if p else active_id}' verlassen. Zurueck im Hauptthread."
|
||||
return ("OK — App/Diagnostic bekommt Signal 'zurueck zum Hauptchat'. "
|
||||
"Der aktuelle Turn bleibt aber im aktuellen Chat-Kontext.")
|
||||
if name == "project_list":
|
||||
items = projects_mod.list_projects()
|
||||
if not items:
|
||||
return "(keine Projekte angelegt)"
|
||||
active_id = projects_mod.get_active()
|
||||
lines = []
|
||||
for p in items:
|
||||
marker = " ← AKTIV" if p["id"] == active_id else ""
|
||||
status_lbl = p.get("status", "active")
|
||||
lines.append(
|
||||
f"- {p['name']} (id={p['id']}, {p.get('turn_count', 0)} Turns, "
|
||||
f"status={status_lbl}){marker}"
|
||||
f"status={status_lbl})"
|
||||
)
|
||||
return "Projekte:\n" + "\n".join(lines)
|
||||
if name == "project_summary":
|
||||
|
||||
+119
-26
@@ -607,6 +607,11 @@ def memory_import_bootstrap(body: BootstrapBundle):
|
||||
class ChatIn(BaseModel):
|
||||
message: str
|
||||
source: str = "" # "app" / "diagnostic" / "stt" — optional
|
||||
# Multi-Threading: Client bestimmt pro Request welches Projekt (leer = Hauptchat).
|
||||
# Kein globaler active_project-State mehr im Brain — parallele Requests fuer
|
||||
# verschiedene Projekte laufen echt parallel, nur Requests fuers gleiche
|
||||
# Projekt queuen (per-Projekt-Lock).
|
||||
project_id: str = ""
|
||||
|
||||
|
||||
class ChatOut(BaseModel):
|
||||
@@ -614,36 +619,124 @@ class ChatOut(BaseModel):
|
||||
turns: int
|
||||
distilling: bool
|
||||
events: list = Field(default_factory=list)
|
||||
# Aktive Projekt-ID NACH dem Turn (kann durch project_enter/exit-Tools
|
||||
# waehrend des Turns gewechselt haben). Bridge gibt das an die Chat-
|
||||
# Bubble-Broadcasts weiter damit App + Diagnostic die Nachricht zum
|
||||
# richtigen Projekt-Block sortieren koennen.
|
||||
# Echo der project_id die dieser Turn hatte. Bridge nutzt sie damit die
|
||||
# ausgehende Chat-Bubble sauber getaggt in der richtigen Thread-Bahn der
|
||||
# UI landet.
|
||||
project_id: str = ""
|
||||
|
||||
|
||||
@app.post("/chat", response_model=ChatOut)
|
||||
def chat(body: ChatIn, background: BackgroundTasks):
|
||||
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
|
||||
im Hintergrund nachdem die Response rausging."""
|
||||
a = agent()
|
||||
try:
|
||||
reply = a.chat(body.message, source=body.source)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(400, str(exc))
|
||||
except RuntimeError as exc:
|
||||
logger.error("chat fehlgeschlagen: %s", exc)
|
||||
raise HTTPException(502, str(exc))
|
||||
# Per-Projekt async-Locks fuer Queue-Behavior: Requests fuers gleiche Projekt
|
||||
# warten aufeinander (queue), Requests fuer verschiedene Projekte laufen echt
|
||||
# parallel. Hauptchat = Lock unter key "" (leerer String).
|
||||
_project_locks: dict[str, asyncio.Lock] = {}
|
||||
_project_locks_meta_lock = asyncio.Lock()
|
||||
# Pro Projekt eine Liste noch-nicht-verarbeiteter Requests. Wird beim Enqueue
|
||||
# ergaenzt, beim Fertig-Werden gepoppt. Ermoeglicht Queue-Aware-Prompting:
|
||||
# waehrend ARIA an Task N arbeitet, sieht sie N+1..N+k als System-Prompt-Hinweis
|
||||
# und kann entscheiden ob eine spaetere Nachricht die aktuelle korrigiert/
|
||||
# annuliert → dann Skip-Antwort statt Ausfuehren.
|
||||
_project_pending: dict[str, list[dict]] = {}
|
||||
|
||||
needs_distill = a.conversation.needs_distill()
|
||||
if needs_distill:
|
||||
background.add_task(a.distill_old_turns)
|
||||
return ChatOut(
|
||||
reply=reply,
|
||||
turns=len(a.conversation.turns),
|
||||
distilling=needs_distill,
|
||||
events=a.pop_events(),
|
||||
project_id=projects_mod.get_active(),
|
||||
)
|
||||
|
||||
async def _get_project_lock(project_id: str) -> asyncio.Lock:
|
||||
"""Holt (oder erzeugt) den asyncio.Lock fuer ein bestimmtes Projekt.
|
||||
Nutzt _project_locks_meta_lock zur Vermeidung von Race Conditions
|
||||
beim ersten-Zugriff pro Projekt."""
|
||||
async with _project_locks_meta_lock:
|
||||
lock = _project_locks.get(project_id)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
_project_locks[project_id] = lock
|
||||
return lock
|
||||
|
||||
|
||||
def _project_queue_snapshot() -> dict:
|
||||
"""Snapshot fuer /projects/queue-status: welche Projekte arbeiten gerade,
|
||||
wieviele wait-in-queue haben, welche sind idle."""
|
||||
out = {}
|
||||
# Zeige nur Kontexte mit Aktivitaet — locked oder pending
|
||||
seen: set = set()
|
||||
for pid, lock in _project_locks.items():
|
||||
pending = len(_project_pending.get(pid, []))
|
||||
is_busy = lock.locked()
|
||||
# busy: gerade in Verarbeitung. queue: N weitere warten dahinter.
|
||||
# Der Busy-Request zaehlt NICHT in queue (er ist ja aus pending schon "raus").
|
||||
out[pid or "__main__"] = {
|
||||
"busy": is_busy,
|
||||
"queue_size": max(0, pending - (1 if is_busy else 0)),
|
||||
}
|
||||
seen.add(pid)
|
||||
for pid, pend in _project_pending.items():
|
||||
if pid in seen:
|
||||
continue
|
||||
out[pid or "__main__"] = {"busy": False, "queue_size": len(pend)}
|
||||
return out
|
||||
|
||||
|
||||
@app.post("/chat", response_model=ChatOut)
|
||||
async def chat(body: ChatIn, background: BackgroundTasks):
|
||||
"""Hauptpfad. Antwort kommt synchron. Memory-Destillat laeuft
|
||||
im Hintergrund nachdem die Response rausging.
|
||||
|
||||
Multi-Threading: Requests fuers gleiche Projekt (project_id gleich)
|
||||
laufen serialisiert durch den per-Projekt-Lock — Queue-Behavior.
|
||||
Verschiedene Projekte laufen parallel."""
|
||||
pid = (body.project_id or "").strip()
|
||||
lock = await _get_project_lock(pid)
|
||||
# Vor dem Lock in die Pending-Liste, damit die verlaufende Task sehen kann
|
||||
# was NACH ihr in der Warteschlange steht (Queue-Aware Prompting).
|
||||
import uuid as _uuid
|
||||
req_id = _uuid.uuid4().hex
|
||||
_project_pending.setdefault(pid, []).append({
|
||||
"id": req_id, "message": body.message, "source": body.source,
|
||||
})
|
||||
try:
|
||||
async with lock:
|
||||
# Snapshot: was liegt NACH mir in der Queue?
|
||||
after_me = [
|
||||
e["message"] for e in _project_pending.get(pid, [])
|
||||
if e["id"] != req_id
|
||||
]
|
||||
a = agent()
|
||||
try:
|
||||
# Sync-Aufruf im Executor damit wir den Event-Loop nicht blocken —
|
||||
# chat() macht HTTP-Calls (Proxy) die 30-60s dauern koennen.
|
||||
loop = asyncio.get_running_loop()
|
||||
reply = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: a.chat(
|
||||
body.message, source=body.source, project_id=pid,
|
||||
pending_queue=after_me,
|
||||
),
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(400, str(exc))
|
||||
except RuntimeError as exc:
|
||||
logger.error("chat fehlgeschlagen: %s", exc)
|
||||
raise HTTPException(502, str(exc))
|
||||
|
||||
needs_distill = a.conversation.needs_distill()
|
||||
if needs_distill:
|
||||
background.add_task(a.distill_old_turns)
|
||||
return ChatOut(
|
||||
reply=reply,
|
||||
turns=len(a.conversation.turns),
|
||||
distilling=needs_distill,
|
||||
events=a.pop_events(),
|
||||
project_id=pid,
|
||||
)
|
||||
finally:
|
||||
_project_pending[pid] = [
|
||||
e for e in _project_pending.get(pid, []) if e["id"] != req_id
|
||||
]
|
||||
|
||||
|
||||
@app.get("/projects/queue-status")
|
||||
def projects_queue_status():
|
||||
"""Snapshot: fuer jeden Projekt-Kontext (inkl. Hauptchat unter __main__)
|
||||
- busy: True wenn gerade ein Request in Verarbeitung
|
||||
- queue_size: wieviele weitere warten dahinter"""
|
||||
return {"contexts": _project_queue_snapshot()}
|
||||
|
||||
|
||||
# ── Projekte ────────────────────────────────────────────────────────
|
||||
|
||||
@@ -1522,24 +1522,21 @@ class ARIABridge:
|
||||
"""
|
||||
brain_url = os.environ.get("BRAIN_URL", "http://aria-brain:8080")
|
||||
url = f"{brain_url}/chat"
|
||||
payload = json.dumps({"message": text, "source": source}).encode("utf-8")
|
||||
# project_id kommt jetzt IM /chat-Body an das Brain (Multi-Threading:
|
||||
# per-Request-Routing statt globaler active_project-State).
|
||||
payload = json.dumps({
|
||||
"message": text, "source": source,
|
||||
"project_id": project_id or "",
|
||||
}).encode("utf-8")
|
||||
logger.info("[brain] chat ← %s '%s' project=%s", source, text[:80], project_id or "(main)")
|
||||
|
||||
# User-Nachricht in chat_backup.jsonl loggen — wird beim App-Reconnect
|
||||
# / Diagnostic-Reload als History-Quelle gelesen. clientMsgId speichern
|
||||
# damit die App beim chat_history_response ihre lokale Bubble
|
||||
# dedupen kann (sonst verschwindet sie nach Offline→Online-Race).
|
||||
# project_id: pre-turn-State (was App geschickt hat). Wenn leer, vom
|
||||
# Brain-Status nachholen — z.B. bei Trigger-Replies oder Diagnostic-Send.
|
||||
entry: dict = {"role": "user", "text": text, "source": source}
|
||||
if client_msg_id:
|
||||
entry["clientMsgId"] = client_msg_id
|
||||
if not project_id:
|
||||
try:
|
||||
with urllib.request.urlopen(f"{brain_url}/projects/status", timeout=3) as r:
|
||||
project_id = (json.loads(r.read()).get("active_id") or "")
|
||||
except Exception:
|
||||
pass
|
||||
if project_id:
|
||||
entry["project_id"] = project_id
|
||||
self._append_chat_backup(entry)
|
||||
|
||||
Reference in New Issue
Block a user