Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
- .woodpecker.yaml: image paths -> library/autojanet-{agent,dispatcher}
- .woodpecker.yaml: secret names RS_HARBOR_USER / RS_HARBOR_PASS (global)
- container/Dockerfile: restore COPY skills/, skills/ populated from opencode config
- skills/: 84 opencode skills bundled into image
- k8s/manifests: update image refs to library/
509 lines
18 KiB
Python
509 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Deterministic parser for Karpathy-pattern LLM wikis.
|
|
|
|
Detects the three-layer pattern (raw sources + wiki markdown + schema),
|
|
extracts structure from markdown files, resolves wikilinks, and derives
|
|
categories from index.md section headings.
|
|
|
|
Usage:
|
|
python parse-knowledge-base.py <wiki-directory>
|
|
|
|
Output:
|
|
Writes scan-manifest.json to <wiki-directory>/.understand-anything/intermediate/
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Regex patterns
|
|
# ---------------------------------------------------------------------------
|
|
# [[target]] or [[target|display]]; group 1 is the target, group 2 the
# optional display text.
WIKILINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|([^\]]+))?\]\]")
# Leading '---' fenced block at the very start of the text; group 1 is the
# raw text between the fences.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
# Code fence followed by an optional language tag. The tag may contain
# '+', '#' and '-' so that "c++", "c#" and "objective-c" are captured whole
# instead of being cut at the first non-word character.
CODE_BLOCK_RE = re.compile(r"```([\w+#-]*)")
# ATX heading: group 1 is the run of '#' (its length is the level), group 2
# the heading text.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
# Level-2 heading only ('##' followed by whitespace); group 1 is its text.
INDEX_SECTION_RE = re.compile(r"^##\s+(.+)$", re.MULTILINE)

# Files that are part of wiki infrastructure, not content articles.
# Names are lowercase; callers compare against the lowercased file name.
INFRA_FILES = {"index.md", "log.md", "claude.md", "agents.md", "soul.md"}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Detection: is this a Karpathy-pattern wiki?
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def detect_format(root: Path) -> dict:
    """Probe *root* for the Karpathy three-layer wiki layout.

    Returns a dict of boolean signals (``has_index``, ``has_log``,
    ``has_raw``, ``has_schema``), the markdown file count under the wiki
    root (``md_count``), the chosen ``wiki_root`` as a string, and the
    verdict in ``detected`` / ``format``.
    """
    wiki_dir = root / "wiki"

    def _present(filename: str) -> bool:
        # A marker file counts whether it sits in root or in root/wiki.
        return (root / filename).is_file() or (wiki_dir / filename).is_file()

    has_index = _present("index.md")
    has_log = _present("log.md")
    has_raw = (root / "raw").is_dir()
    has_schema = _present("CLAUDE.md") or _present("AGENTS.md")

    # The wiki lives in root/wiki when that directory exists, else in root.
    wiki_root = wiki_dir if wiki_dir.is_dir() else root
    md_count = sum(1 for _ in wiki_root.rglob("*.md"))

    # Primary signal: an index plus a meaningful number of markdown files.
    detected = has_index and md_count >= 3

    return {
        "has_index": has_index,
        "has_log": has_log,
        "has_raw": has_raw,
        "has_schema": has_schema,
        "md_count": md_count,
        "wiki_root": str(wiki_root),
        "detected": detected,
        "format": "karpathy" if detected else "unknown",
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown extraction helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_frontmatter(text: str) -> dict:
    """Read the leading frontmatter block into a flat ``{key: value}`` dict.

    This is a line-based split on the first ':' of each line, not a YAML
    parser: lines without a colon are ignored, values are returned as
    strings with surrounding whitespace and quote characters removed, and a
    repeated key keeps its last value. Returns ``{}`` when the text has no
    frontmatter.
    """
    match = FRONTMATTER_RE.match(text)
    if match is None:
        return {}

    parsed = {}
    for raw_line in match.group(1).split("\n"):
        name, colon, value = raw_line.partition(":")
        if not colon:
            continue  # no "key: value" shape on this line
        parsed[name.strip()] = value.strip().strip('"').strip("'")
    return parsed
|
|
|
|
|
|
def extract_wikilinks(text: str) -> list[dict]:
    """Return every ``[[target]]`` / ``[[target|display]]`` link in *text*.

    Each entry is ``{"target": ..., "display": ...}`` in document order;
    ``display`` is ``None`` when the link has no ``|display`` part.
    """
    found = []
    for match in WIKILINK_RE.finditer(text):
        target, display = match.group(1), match.group(2)
        entry = {"target": target.strip(), "display": None}
        if display:
            entry["display"] = display.strip()
        found.append(entry)
    return found
|
|
|
|
|
|
def extract_headings(text: str) -> list[dict]:
    """List the markdown headings in *text* as ``{"level", "text"}`` dicts.

    ``level`` is the number of leading '#' characters (1-6) and ``text`` is
    the heading text with surrounding whitespace removed.
    """
    found = []
    for match in HEADING_RE.finditer(text):
        hashes, title = match.groups()
        found.append({"level": len(hashes), "text": title.strip()})
    return found
|
|
|
|
|
|
def extract_code_blocks(text: str) -> list[str]:
    """Return the language tags found on code fences, in document order.

    Fences without a language tag (including closing fences) are skipped.
    """
    languages = []
    for tag in CODE_BLOCK_RE.findall(text):
        if tag:
            languages.append(tag)
    return languages
|
|
|
|
|
|
def extract_first_paragraph(text: str) -> str:
    """Return a short summary: the first paragraph after frontmatter and H1.

    The paragraph following the first "# " heading that has one is used; if
    no such heading yields a paragraph, the first paragraph of the document
    is used instead. Blockquote lines, horizontal rules and headings are
    not part of a paragraph. Results longer than 200 characters are cut to
    197 characters plus "...". Returns "" when nothing is found.
    """
    body = FRONTMATTER_RE.sub("", text).strip()
    if not body:
        return ""
    rows = body.split("\n")

    def _shorten(paragraph: str) -> str:
        """Apply the 200-character cap."""
        if len(paragraph) > 200:
            return paragraph[:197] + "..."
        return paragraph

    def _first_paragraph(candidates: list[str]) -> str:
        """Join the lines of the first paragraph found in *candidates*."""
        collected: list[str] = []
        for raw in candidates:
            row = raw.strip()
            if not row:
                if collected:
                    break  # blank line closes the paragraph
                continue  # still skipping leading blank lines
            if row.startswith(">"):
                continue  # blockquote
            if re.match(r"^[-*_]{3,}\s*$", row):
                continue  # horizontal rule
            if row.startswith("#"):
                if collected:
                    break  # next heading closes the paragraph
                continue  # heading before any paragraph text
            collected.append(row)
        return " ".join(collected)

    # Preferred: the paragraph that follows an H1.
    for position, row in enumerate(rows):
        if not row.strip().startswith("# "):
            continue
        after_h1 = _first_paragraph(rows[position + 1:])
        if after_h1:
            return _shorten(after_h1)

    # Fallback: no H1 produced a paragraph, so start from the top.
    return _shorten(_first_paragraph(rows))
|
|
|
|
|
|
def extract_h1(text: str) -> str:
    """Return the text of the first level-1 heading, or "" if there is none.

    The leading frontmatter block is removed before searching so that a
    YAML comment line inside it (``# note``) is not mistaken for the page
    title. The heading text is returned as written, with only surrounding
    whitespace removed.
    """
    body = FRONTMATTER_RE.sub("", text)
    for m in HEADING_RE.finditer(body):
        if len(m.group(1)) == 1:
            return m.group(2).strip()
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Index.md parsing — categories come from section headings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_index(index_path: Path) -> list[dict]:
    """Read categories out of index.md.

    Every ``## `` heading opens a category; wikilink targets on the lines
    that follow (until the next ``## `` heading) are collected as its
    articles. Links that appear before the first ``## `` heading are
    ignored. Returns a list of ``{"name": str, "articles": list[str]}`` in
    file order, or ``[]`` when the file does not exist.
    """
    if not index_path.is_file():
        return []

    sections: list[dict] = []
    active = None
    contents = index_path.read_text(encoding="utf-8", errors="replace")

    for row in contents.split("\n"):
        heading = INDEX_SECTION_RE.match(row)
        if heading is not None:
            # A new "## " heading starts a fresh category.
            active = {"name": heading.group(1).strip(), "articles": []}
            sections.append(active)
        elif active is not None:
            active["articles"].extend(
                link.group(1).strip() for link in WIKILINK_RE.finditer(row)
            )

    return sections
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Log.md parsing — extract operation timeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_log(log_path: Path) -> list[dict]:
    """Read the operation timeline out of log.md.

    Recognised entries are headings of the form
    ``## [YYYY-MM-DD] operation | title``. Returns one
    ``{"date", "operation", "title"}`` dict per entry in file order, or
    ``[]`` when the file does not exist.
    """
    if not log_path.is_file():
        return []

    entry_pattern = re.compile(
        r"^##\s+\[(\d{4}-\d{2}-\d{2})\]\s+(\w+)\s*\|\s*(.+)$", re.MULTILINE
    )
    contents = log_path.read_text(encoding="utf-8", errors="replace")
    return [
        {"date": date, "operation": operation, "title": title.strip()}
        for date, operation, title in entry_pattern.findall(contents)
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_name_to_stem_map(wiki_root: Path) -> dict[str, str]:
    """Build a case-insensitive map from page name to relative stem path.

    Keys are lowercase; values are the page's path relative to *wiki_root*
    without the ``.md`` suffix, using '/' separators
    (e.g. ``"decisions/decision-foo"``).

    Two kinds of key are registered for every markdown file:

    * the full relative stem, which always maps to that file;
    * the bare basename, but only when exactly one file has that basename,
      so an ambiguous short name never silently resolves to the wrong page.

    A full relative stem takes precedence over a basename alias. This
    matters for pages directly in the wiki root, whose full stem *is* their
    basename: ``foo.md`` stays reachable as ``foo`` even when
    ``sub/foo.md`` also exists.
    """
    full_stems: dict[str, str] = {}
    stems_by_basename: dict[str, list[str]] = {}

    # Sorted so the resulting dict order does not depend on directory
    # listing order.
    for md_file in sorted(wiki_root.rglob("*.md")):
        stem = md_file.relative_to(wiki_root).with_suffix("").as_posix()
        full_stems[stem.lower()] = stem
        stems_by_basename.setdefault(md_file.stem.lower(), []).append(stem)

    # Basename aliases: only the unambiguous ones.
    name_map: dict[str, str] = {
        basename: stems[0]
        for basename, stems in stems_by_basename.items()
        if len(stems) == 1
    }
    # Full paths are applied last so an alias can never replace or remove one.
    name_map.update(full_stems)
    return name_map
|
|
|
|
|
|
def resolve_wikilink(target: str, name_map: dict[str, str], node_ids: set[str] | None = None) -> str | None:
|
|
"""Resolve a wikilink target to an article node ID.
|
|
|
|
If node_ids is provided, only resolve to IDs that exist in the set.
|
|
"""
|
|
key = target.lower().strip()
|
|
# Skip targets that are clearly not page names (shell flags, etc.)
|
|
if key.startswith("-"):
|
|
return None
|
|
stem = name_map.get(key)
|
|
if stem:
|
|
candidate = f"article:{stem}"
|
|
# If we have a node set, verify the target exists
|
|
if node_ids is not None and candidate not in node_ids:
|
|
return None
|
|
return candidate
|
|
# Try without directory prefix
|
|
for stored_key, stored_stem in name_map.items():
|
|
if stored_key.endswith("/" + key) or stored_key == key:
|
|
candidate = f"article:{stored_stem}"
|
|
if node_ids is not None and candidate not in node_ids:
|
|
return None
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def _is_root_infra(rel: Path) -> bool:
    """Return True for an infrastructure file directly in the wiki root.

    Only the root level is filtered: ``index.md`` is infrastructure, while
    ``concepts/index.md`` is ordinary content.
    """
    return rel.parent == Path(".") and rel.name.lower() in INFRA_FILES


def _split_frontmatter_tags(raw: str) -> list[str]:
    """Split a frontmatter ``tags`` value into clean tag names.

    Accepts both ``a, b`` and the flow-list spelling ``[a, "b"]``; the
    surrounding brackets and per-tag quotes are removed.
    """
    inner = raw.strip()
    if inner.startswith("[") and inner.endswith("]"):
        inner = inner[1:-1]
    cleaned = (part.strip().strip("\"'").strip() for part in inner.split(","))
    return [tag for tag in cleaned if tag]


def _link_complexity(link_count: int) -> str:
    """Classify an article by the number of wikilinks it contains."""
    if link_count > 15:
        return "complex"
    if link_count > 5:
        return "moderate"
    return "simple"


def _dedupe_edges(edges: list[dict]) -> list[dict]:
    """Drop repeated (source, target, type) edges, keeping first occurrences."""
    seen: set[tuple[str, str, str]] = set()
    unique = []
    for edge in edges:
        key = (edge["source"], edge["target"], edge["type"])
        if key not in seen:
            seen.add(key)
            unique.append(edge)
    return unique


def parse_wiki(root: Path) -> dict:
    """Parse a Karpathy-pattern wiki and produce the scan manifest.

    Args:
        root: Directory containing the wiki (optionally in a ``wiki/``
            subdirectory) and, optionally, a ``raw/`` sources directory.

    Returns:
        A dict with keys ``format``, ``stats``, ``categories``,
        ``logEntries``, ``nodes``, ``edges`` and ``warnings`` (capped at 50
        entries). Nodes are articles (one per content markdown file), topics
        (one per ``##`` section of index.md) and sources (one per file under
        ``raw/``). Edges are ``related`` (article wikilinks) and
        ``categorized_under`` (article -> topic).

    Exits the process with status 1, after printing a JSON error to stderr,
    when *root* is not detected as a Karpathy-pattern wiki.
    """
    detection = detect_format(root)
    if not detection["detected"]:
        print(json.dumps({"error": "Not a Karpathy-pattern wiki", "detection": detection}),
              file=sys.stderr)
        sys.exit(1)

    wiki_root = Path(detection["wiki_root"])
    raw_root = root / "raw"

    # Build name resolution map
    name_map = build_name_to_stem_map(wiki_root)

    # index.md / log.md may live in the wiki root or one level up in root.
    index_path = wiki_root / "index.md"
    if not index_path.is_file():
        index_path = root / "index.md"
    log_path = wiki_root / "log.md"
    if not log_path.is_file():
        log_path = root / "log.md"

    categories = parse_index(index_path)
    log_entries = parse_log(log_path)

    # Category lookup: lowercase wikilink target -> category name
    category_lookup: dict[str, str] = {}
    for cat in categories:
        for article_target in cat["articles"]:
            category_lookup[article_target.lower()] = cat["name"]

    # --- Collect content files once; IDs are needed before edges are built ---
    article_files: list[tuple[Path, Path]] = []
    for md_file in sorted(wiki_root.rglob("*.md")):
        rel = md_file.relative_to(wiki_root)
        # A directory whose name ends in ".md" matches the glob but cannot
        # be read as an article.
        if not md_file.is_file() or _is_root_infra(rel):
            continue
        article_files.append((md_file, rel))
    article_ids: set[str] = {
        f"article:{rel.with_suffix('').as_posix()}" for _, rel in article_files
    }

    # --- Build article nodes ---
    nodes = []
    edges = []
    warnings = []
    stats = {"articles": 0, "sources": 0, "topics": 0, "wikilinks": 0, "unresolved": 0}

    for md_file, rel in article_files:
        stem = rel.with_suffix("").as_posix()
        basename = md_file.stem
        node_id = f"article:{stem}"

        text = md_file.read_text(encoding="utf-8", errors="replace")
        h1 = extract_h1(text)
        frontmatter = extract_frontmatter(text)
        wikilinks = extract_wikilinks(text)
        summary = extract_first_paragraph(text)

        # Category from index.md: try the bare name first, then the full stem.
        category = (
            category_lookup.get(basename.lower(), "")
            or category_lookup.get(stem.lower(), "")
        )

        # Tags (deduplicated): category, containing directory, frontmatter.
        tag_set: set[str] = set()
        if category:
            tag_set.add(category.lower())
        if rel.parent != Path("."):
            # '/' separators on every platform, matching the node IDs.
            tag_set.add(rel.parent.as_posix())
        tag_set.update(_split_frontmatter_tags(frontmatter.get("tags", "")))
        tags = sorted(tag_set)

        wl_count = len(wikilinks)
        nodes.append({
            "id": node_id,
            "type": "article",
            "name": h1 or basename,
            "filePath": str(rel),
            "summary": summary or f"Wiki article: {h1 or basename}",
            "tags": tags,
            "complexity": _link_complexity(wl_count),
            "knowledgeMeta": {
                "wikilinks": [wl["target"] for wl in wikilinks],
                **({"category": category} if category else {}),
                "content": text[:3000],  # First 3000 chars for LLM analysis
            },
        })
        stats["articles"] += 1
        stats["wikilinks"] += wl_count

        # Edges from wikilinks (resolved against known article IDs).
        # Self-links resolve but produce neither an edge nor a warning.
        for wl in wikilinks:
            target_id = resolve_wikilink(wl["target"], name_map, article_ids)
            if target_id is None:
                warnings.append(f"Unresolved wikilink: [[{wl['target']}]] in {rel}")
                stats["unresolved"] += 1
            elif target_id != node_id:
                edges.append({
                    "source": node_id,
                    "target": target_id,
                    "type": "related",
                    "direction": "forward",
                    "weight": 0.7,
                })

    # --- Build topic nodes from index.md categories ---
    for cat in categories:
        topic_id = f"topic:{cat['name'].lower().replace(' ', '-')}"
        nodes.append({
            "id": topic_id,
            "type": "topic",
            "name": cat["name"],
            "summary": f"Category from index: {cat['name']} ({len(cat['articles'])} articles)",
            "tags": ["category"],
            "complexity": "simple",
        })
        stats["topics"] += 1

        # categorized_under edges (only resolve to known article nodes)
        for article_target in cat["articles"]:
            article_id = resolve_wikilink(article_target, name_map, article_ids)
            if article_id:
                edges.append({
                    "source": article_id,
                    "target": topic_id,
                    "type": "categorized_under",
                    "direction": "forward",
                    "weight": 0.6,
                })

    # --- Build source nodes from raw/ ---
    if raw_root.is_dir():
        used_source_ids: set[str] = set()
        for raw_file in sorted(raw_root.rglob("*")):
            if not raw_file.is_file() or raw_file.name.startswith("."):
                continue
            rel_raw = raw_file.relative_to(root)
            rel_in_raw = raw_file.relative_to(raw_root)
            ext = raw_file.suffix.lower()
            size_kb = raw_file.stat().st_size / 1024
            source_id = f"source:{rel_in_raw.with_suffix('').as_posix()}"
            if source_id in used_source_ids:
                # Files that differ only by extension (paper.pdf, paper.txt)
                # would share an ID; later ones keep their extension.
                source_id = f"source:{rel_in_raw.as_posix()}"
            used_source_ids.add(source_id)
            nodes.append({
                "id": source_id,
                "type": "source",
                "name": raw_file.name,
                "filePath": str(rel_raw),
                "summary": f"Raw source ({ext or 'unknown'}, {size_kb:.0f} KB)",
                "tags": ["raw", ext.lstrip(".") or "unknown"],
                "complexity": "simple",
            })
            stats["sources"] += 1

    # --- Deduplicate edges (before backlinks, so each linking page is
    # listed once even if it links to the same target several times) ---
    deduped_edges = _dedupe_edges(edges)

    # --- Compute backlinks ---
    backlink_map: dict[str, list[str]] = {}
    for edge in deduped_edges:
        if edge["type"] == "related":
            backlink_map.setdefault(edge["target"], []).append(edge["source"])
    for node in nodes:
        if node["type"] == "article" and "knowledgeMeta" in node:
            node["knowledgeMeta"]["backlinks"] = backlink_map.get(node["id"], [])

    return {
        "format": "karpathy",
        "stats": stats,
        "categories": [{"name": c["name"], "count": len(c["articles"])} for c in categories],
        "logEntries": len(log_entries),
        "nodes": nodes,
        "edges": deduped_edges,
        "warnings": warnings[:50],  # Cap warnings
    }
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Expects the wiki directory as the first argument, runs ``parse_wiki``
    on it, writes the manifest to
    ``<wiki-directory>/.understand-anything/intermediate/scan-manifest.json``
    and reports a one-line summary on stderr. Exits with status 1 on a
    missing argument or when the argument is not a directory.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: parse-knowledge-base.py <wiki-directory>", file=sys.stderr)
        sys.exit(1)

    root = Path(cli_args[0]).resolve()
    if not root.is_dir():
        print(f"Error: {root} is not a directory", file=sys.stderr)
        sys.exit(1)

    manifest = parse_wiki(root)

    # Write output
    out_path = root / ".understand-anything" / "intermediate" / "scan-manifest.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(manifest, indent=2)
    out_path.write_text(serialized, encoding="utf-8")

    # Report to stderr
    s = manifest["stats"]
    report = (
        f"[parse] Karpathy wiki: {s['articles']} articles, {s['sources']} sources, "
        f"{s['topics']} topics, {s['wikilinks']} wikilinks "
        f"({s['unresolved']} unresolved)"
    )
    print(report, file=sys.stderr)
    print(f"[parse] Output: {out_path}", file=sys.stderr)


if __name__ == "__main__":
    main()
|