""" Migration aus aria-data/brain-import/ → Vector-DB. Parst die mitgelieferten Markdown-Dateien (AGENT.md, USER.md, TOOLING.md) und zerlegt sie in atomare Memory-Punkte. Jeder Punkt bekommt: source = "import" migration_key = stabiler Identifier (z.B. "agent.md/rule-1") fuer Idempotenz pinned = True Beim Re-Run werden vorhandene Punkte mit gleicher migration_key entfernt und neu geschrieben. Mapping pro Datei: AGENT.md "Identitaet" → 1 Punkt type=identity "Persoenlichkeit" (Intro) → 1 Punkt type=identity "Kern-Eigenschaften" (Liste) → 1 Punkt pro Bullet type=identity "Tool-Freigaben" → 1 Punkt type=tool "Sicherheitsregeln" (Liste) → 1 Punkt pro Bullet type=rule "Arbeitsprinzipien" (Liste) → 1 Punkt pro Bullet type=rule "Dateien an Stefan zurueckgeben"→ 1 Punkt type=skill "Stimme" → 1 Punkt type=tool USER.md "Allgemein" (Liste) → 1 Punkt pro Bullet type=preference "Bestaetigung erforderlich" → 1 Punkt type=preference "Autonomes Arbeiten OK fuer" → 1 Punkt type=preference "Tools & Infrastruktur" → 1 Punkt type=preference TOOLING.md gesamter Inhalt → 1 Punkt type=tool, title="Tooling-Stack" BOOTSTRAP.md ist eine Variante von AGENT.md — wird (vorerst) ignoriert damit keine doppelten Punkte landen. """ from __future__ import annotations import logging import re from dataclasses import dataclass from pathlib import Path from typing import List, Optional from memory import Embedder, VectorStore, MemoryPoint from memory.vector_store import COLLECTION from qdrant_client.http import models as qm logger = logging.getLogger(__name__) @dataclass class _Block: title: str content: str def _split_h2(md: str) -> List[_Block]: """Zerlegt Markdown in H2-Bloecke. Inhalt vor dem ersten H2 wird verworfen.""" blocks: List[_Block] = [] current: Optional[_Block] = None for line in md.splitlines(): m = re.match(r"^##\s+(.+?)\s*$", line) if m and not line.startswith("### "): if current: blocks.append(current) current = _Block(title=m.group(1).strip(), content="") continue if current is not None: current.content += line + "\n" if current: blocks.append(current) return blocks def _split_h3(content: str) -> List[_Block]: """Zerlegt einen H2-Block in H3-Untersektionen + 'header'-Block davor.""" blocks: List[_Block] = [] header_lines: List[str] = [] current: Optional[_Block] = None for line in content.splitlines(): m = re.match(r"^###\s+(.+?)\s*$", line) if m: if current is None and header_lines: blocks.append(_Block(title="_intro", content="\n".join(header_lines).strip())) if current: blocks.append(current) current = _Block(title=m.group(1).strip(), content="") continue if current is None: header_lines.append(line) else: current.content += line + "\n" if current: blocks.append(current) elif header_lines: blocks.append(_Block(title="_intro", content="\n".join(header_lines).strip())) return blocks def _extract_bullets(content: str) -> List[tuple[str, str]]: """Findet "- **Title** — Body" oder "N. **Title** — Body" Bullets. Returns: Liste von (title, full_bullet_text). 
""" bullets: List[tuple[str, str]] = [] current_lines: List[str] = [] current_title: Optional[str] = None def flush(): if current_title and current_lines: bullets.append((current_title, "\n".join(current_lines).strip())) for line in content.splitlines(): m = re.match(r"^\s*(?:[-*]|\d+\.)\s+\*\*([^*]+?)\*\*\s*[—\-:]?\s*(.*)$", line) if m: flush() current_title = m.group(1).strip() current_lines = [line] continue # Folge-Zeilen mit Einrueckung gehoeren zum aktuellen Bullet if current_title and (line.startswith(" ") or line.startswith("\t") or not line.strip()): current_lines.append(line) continue if current_title and not re.match(r"^\s*(?:[-*]|\d+\.)\s+", line): current_lines.append(line) continue # Neuer Bullet ohne **Title** Format if re.match(r"^\s*(?:[-*]|\d+\.)\s+", line): flush() text = re.sub(r"^\s*(?:[-*]|\d+\.)\s+", "", line).strip() short_title = (text[:60] + "…") if len(text) > 60 else text bullets.append((short_title, line.strip())) current_title = None current_lines = [] flush() return bullets # ─── Pro Datei eine Parser-Funktion ────────────────────────────────── def _parse_agent_md(md: str, source_file: str) -> List[MemoryPoint]: points: List[MemoryPoint] = [] h2_blocks = _split_h2(md) for h2 in h2_blocks: title = h2.title content = h2.content.strip() if not content: continue if title.lower() == "identitaet" or title.lower() == "identität": points.append(_mk( type_="identity", title="ARIA — Identitaet", content=f"## {title}\n\n{content}", category="persoenlichkeit", migration_key=f"{source_file}/identity", )) elif title.lower() == "persoenlichkeit" or title.lower() == "persönlichkeit": # Intro-Absatz + Kern-Eigenschaften-Liste trennen sub = _split_h3(content) for s in sub: if s.title == "_intro" and s.content.strip(): points.append(_mk( type_="identity", title="Persoenlichkeit — Grundsatz", content=s.content.strip(), category="persoenlichkeit", migration_key=f"{source_file}/personality-intro", )) elif s.title.lower().startswith("kern"): for idx, (btitle, btext) in enumerate(_extract_bullets(s.content), 1): points.append(_mk( type_="identity", title=f"Eigenschaft: {btitle}", content=btext, category="persoenlichkeit", migration_key=f"{source_file}/personality-trait-{idx}", )) elif "sicherheitsregel" in title.lower(): for idx, (btitle, btext) in enumerate(_extract_bullets(content), 1): points.append(_mk( type_="rule", title=f"Sicherheit: {btitle}", content=btext, category="sicherheit", migration_key=f"{source_file}/security-{idx}", )) elif "arbeitsprinzipien" in title.lower() or "arbeitsprinzip" in title.lower(): for idx, (btitle, btext) in enumerate(_extract_bullets(content), 1): points.append(_mk( type_="rule", title=f"Prinzip: {btitle}", content=btext, category="arbeitsweise", migration_key=f"{source_file}/work-principle-{idx}", )) elif "tool-freigaben" in title.lower() or "tool freigaben" in title.lower(): points.append(_mk( type_="tool", title="Tool-Freigaben — Vollzugriff", content=content, category="infrastruktur", migration_key=f"{source_file}/tool-access", )) elif "dateien an stefan" in title.lower() or "dateien zurueckgeben" in title.lower() or "dateien zur" in title.lower(): points.append(_mk( type_="skill", title="Dateien an User zurueckgeben", content=content, category="ausgabe", migration_key=f"{source_file}/file-return-skill", )) elif title.lower() == "stimme": points.append(_mk( type_="tool", title="Stimme (F5-TTS)", content=content, category="infrastruktur", migration_key=f"{source_file}/voice", )) # Permanente Freigaben (in BOOTSTRAP) — als rule elif "freigaben" 
in title.lower(): points.append(_mk( type_="rule", title=title, content=content, category="freigaben", migration_key=f"{source_file}/permissions", )) else: # Unbekannter Block: als generischer fact ablegen, NICHT pinned logger.info("Unbekannter H2-Block '%s' in %s — als fact (unpinned)", title, source_file) points.append(_mk( type_="fact", title=f"{source_file}: {title}", content=content, pinned=False, migration_key=f"{source_file}/section-{title.lower().replace(' ', '-')}", )) return points def _parse_user_md(md: str, source_file: str) -> List[MemoryPoint]: points: List[MemoryPoint] = [] for h2 in _split_h2(md): title = h2.title content = h2.content.strip() if not content: continue # Template-Platzhalter herausfiltern: Beispiel-Zeilen mit if "" in content or "" in title: continue if title.lower() == "allgemein": for idx, (btitle, btext) in enumerate(_extract_bullets(content), 1): # Template-Platzhalter ueberspringen if "" in btext: continue points.append(_mk( type_="preference", title=f"User: {btitle}", content=btext, category="allgemein", migration_key=f"{source_file}/general-{idx}", )) else: cat_key = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-") or "allgemein" points.append(_mk( type_="preference", title=title, content=content, category=cat_key, migration_key=f"{source_file}/{cat_key}", )) return points def _parse_tooling_md(md: str, source_file: str) -> List[MemoryPoint]: md = md.strip() if not md: return [] return [_mk( type_="tool", title="Tooling-Stack (VM)", content=md, category="infrastruktur", migration_key=f"{source_file}/tooling-full", )] # ─── Helper ───────────────────────────────────────────────────────── def _mk( type_: str, title: str, content: str, migration_key: str, pinned: bool = True, category: str = "", ) -> MemoryPoint: p = MemoryPoint( id="", type=type_, title=title, content=content.strip(), pinned=pinned, category=category, source="import", tags=[], ) # migration_key wird ueber Payload-Index angesprochen — in to_payload manuell anhaengen setattr(p, "_migration_key", migration_key) return p # ─── Eintrittspunkt ───────────────────────────────────────────────── def run_migration( import_dir: Path, store: VectorStore, embedder: Embedder, ) -> dict: """Liest alle .md-Dateien aus import_dir, parst sie, schreibt in DB. Idempotent: vorhandene Punkte mit gleicher migration_key werden geloescht und neu geschrieben. Returns: {"created": int, "updated": int, "skipped": int, "files": [...]} """ if not import_dir.exists(): return {"created": 0, "updated": 0, "skipped": 0, "files": [], "error": f"{import_dir} nicht gefunden"} parsers = { "AGENT.md": _parse_agent_md, "BOOTSTRAP.md": _parse_agent_md, # gleicher Parser, ggf. 
ueberlappende Eintraege "USER.md": _parse_user_md, "USER.md.example": _parse_user_md, "TOOLING.md": _parse_tooling_md, "TOOLING.md.example": _parse_tooling_md, } # USER.md hat Vorrang vor USER.md.example file_priority = ["AGENT.md", "BOOTSTRAP.md", "USER.md", "USER.md.example", "TOOLING.md", "TOOLING.md.example"] seen_kinds: set[str] = set() # "USER" / "TOOLING" — nur einmal points: List[MemoryPoint] = [] processed_files: List[str] = [] for fname in file_priority: fp = import_dir / fname if not fp.exists(): continue kind = fname.split(".")[0] # "AGENT", "BOOTSTRAP", "USER", "TOOLING" # USER.md.example nur wenn USER.md fehlt if kind in ("USER", "TOOLING") and kind in seen_kinds: continue seen_kinds.add(kind) parser = parsers.get(fname) if not parser: continue try: md = fp.read_text(encoding="utf-8") file_points = parser(md, fname) points.extend(file_points) processed_files.append(f"{fname} ({len(file_points)})") logger.info("Migration: %s → %d Punkte", fname, len(file_points)) except Exception as exc: logger.exception("Migration: %s fehlgeschlagen", fname) processed_files.append(f"{fname} (FEHLER: {exc})") if not points: return {"created": 0, "updated": 0, "skipped": 0, "files": processed_files} # Erst alte Migration-Punkte mit gleicher migration_key loeschen migration_keys = [getattr(p, "_migration_key", None) for p in points] migration_keys = [k for k in migration_keys if k] if migration_keys: store.client.delete( collection_name=COLLECTION, points_selector=qm.FilterSelector(filter=qm.Filter(must=[ qm.FieldCondition(key="migration_key", match=qm.MatchAny(any=migration_keys)) ])), ) logger.info("Migration: %d alte Punkte mit gleicher migration_key entfernt", len(migration_keys)) # Embed in Batches texts = [p.content for p in points] vectors = embedder.embed_batch(texts) created = 0 for p, vec in zip(points, vectors): payload = p.to_payload() mkey = getattr(p, "_migration_key", None) if mkey: payload["migration_key"] = mkey from datetime import datetime, timezone import uuid as _uuid pid = str(_uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() payload["created_at"] = now payload["updated_at"] = now store.client.upsert( collection_name=COLLECTION, points=[qm.PointStruct(id=pid, vector=vec, payload=payload)], ) created += 1 return { "created": created, "files": processed_files, "import_dir": str(import_dir), }
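

# ─── Usage sketch ────────────────────────────────────────────────────
# A minimal, illustrative way to invoke run_migration() from the command
# line. The zero-argument VectorStore() and Embedder() constructors are
# assumptions; adapt them to however the `memory` package actually wires
# these dependencies up.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    target = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("aria-data/brain-import")
    result = run_migration(target, store=VectorStore(), embedder=Embedder())  # constructors assumed
    print(result)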