#!/usr/bin/env python3
"""knowledge-agents.py — run the layer-2 knowledge production agents.

Four agents that produce new knowledge from the memory graph:
  1. Observation — mine raw conversations for unextracted knowledge
  2. Extractor   — find patterns in node clusters, write principle nodes
  3. Connector   — find cross-domain structural connections
  4. Challenger  — stress-test existing knowledge nodes

Usage:
  knowledge-agents.py                  # run all four
  knowledge-agents.py observation [N]  # mine N conversation fragments (default 5)
  knowledge-agents.py extractor [N]    # extract from N clusters (default 5)
  knowledge-agents.py connector [N]    # connect N cross-community pairs (default 5)
  knowledge-agents.py challenger [N]   # challenge N old nodes (default 5)

Output goes to ~/.claude/memory/agent-results/knowledge-{agent}-{timestamp}.md
"""

import itertools
import json
import os
import random
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path

MEMORY_DIR = Path.home() / ".claude" / "memory"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)

PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
SCRIPTS_DIR = Path(__file__).parent

# Bookkeeping nodes that are never used as knowledge input by any agent.
_SYSTEM_NODE_KEYS = frozenset(
    {"journal.md", "MEMORY.md", "where-am-i.md", "work-queue.md"})


def call_sonnet(prompt: str, timeout: int = 600) -> str:
    """Call Sonnet via the ``call-sonnet.sh`` wrapper script.

    The prompt is written to a temporary UTF-8 file whose path is passed as
    the wrapper's only argument; the file is removed afterwards in every case.
    ``CLAUDECODE`` is dropped from the child environment (presumably so the
    wrapper doesn't think it is nested inside Claude Code — NOTE(review):
    confirm).

    Returns the wrapper's stripped stdout. On timeout, on any failure to
    write the prompt or launch the wrapper, or on a non-zero exit that
    produced no stdout, returns a string starting with ``"Error:"`` instead
    of raising.
    """
    env = dict(os.environ)
    env.pop("CLAUDECODE", None)

    prompt_file = None
    try:
        # delete=False: the wrapper opens the file by name after we close it.
        # Explicit UTF-8 so non-ASCII prompt text doesn't depend on the locale.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                         delete=False, encoding='utf-8') as f:
            prompt_file = f.name
            f.write(prompt)
        result = subprocess.run(
            [str(SCRIPTS_DIR / "call-sonnet.sh"), prompt_file],
            capture_output=True, text=True, encoding='utf-8',
            errors='replace', timeout=timeout, env=env,
        )
    except subprocess.TimeoutExpired:
        return "Error: Sonnet call timed out"
    except Exception as e:
        return f"Error: {e}"
    finally:
        if prompt_file is not None:
            Path(prompt_file).unlink(missing_ok=True)

    output = result.stdout.strip()
    if result.returncode != 0 and not output:
        # Surface the failure rather than recording an empty response.
        detail = result.stderr.strip() or "no output"
        return (f"Error: call-sonnet.sh exited with status "
                f"{result.returncode}: {detail}")
    return output


def poc_memory(*args, timeout=30) -> str:
    """Run ``poc-memory <args...>`` and return its stripped stdout.

    Best-effort: if the command cannot be launched or times out, returns
    ``""`` so callers can treat "no output" uniformly.
    """
    try:
        result = subprocess.run(
            ["poc-memory", *args],
            capture_output=True, text=True, encoding='utf-8',
            errors='replace', timeout=timeout,
        )
    except (OSError, subprocess.SubprocessError):
        return ""
    return result.stdout.strip()


def render(key: str) -> str:
    """Return ``poc-memory render <key>`` output ("" on failure)."""
    return poc_memory("render", key)


def list_keys() -> list[str]:
    """Return every non-blank line of ``poc-memory list-keys``, stripped."""
    output = poc_memory("list-keys")
    return [k.strip() for k in output.split('\n') if k.strip()]


def get_graph_topology() -> str:
    """Get graph topology summary for the {{TOPOLOGY}} template var.

    Concatenates ``poc-memory status`` with the first 80 lines of
    ``poc-memory graph``; either part is omitted when empty.
    """
    parts = []
    status = poc_memory("status")
    if status:
        parts.append(status)
    graph = poc_memory("graph")
    if graph:
        parts.append('\n'.join(graph.split('\n')[:80]))
    return '\n'.join(parts)


def load_spectral_embedding() -> dict:
    """Load MEMORY_DIR/spectral-embedding.json, or ``{}`` if it doesn't exist.

    Callers read node vectors from its ``"coords"`` mapping (key -> list of
    numbers).
    """
    path = MEMORY_DIR / "spectral-embedding.json"
    if not path.exists():
        return {}
    with open(path, encoding='utf-8') as f:
        return json.load(f)


def spectral_distance(embedding: dict, key_a: str, key_b: str) -> float:
    """Cosine distance (1 - cosine similarity) between two nodes.

    Returns ``inf`` when either node has no (or an empty) vector in
    ``embedding["coords"]``, or when either vector has zero norm.
    """
    coords = embedding.get("coords", {})
    va = coords.get(key_a)
    vb = coords.get(key_b)
    if not va or not vb:
        return float('inf')
    dot = sum(a * b for a, b in zip(va, vb))
    norm_a = sum(a * a for a in va) ** 0.5
    norm_b = sum(b * b for b in vb) ** 0.5
    if norm_a == 0 or norm_b == 0:
        return float('inf')
    return 1.0 - dot / (norm_a * norm_b)


# ---------------------------------------------------------------------------
# Observation extractor: mine raw conversations
# ---------------------------------------------------------------------------

SESSIONS_DIR = Path.home() / ".claude" / "projects" / "-home-kent-bcachefs-tools"

# Harness-injected blocks (non-greedy, may span lines).
# NOTE(review): only <system-reminder> is stripped; extend if other injected
# tags should be dropped as well.
_SYSTEM_TAG_RE = re.compile(r'<system-reminder>.*?</system-reminder>',
                            re.DOTALL)


def _strip_system_tags(text: str) -> str:
    """Remove <system-reminder>…</system-reminder> blocks, then strip whitespace."""
    return _SYSTEM_TAG_RE.sub('', text).strip()


def _jsonl_records(lines):
    """Yield each JSON object (dict) from an iterable of JSONL lines.

    Blank, malformed and non-object lines are skipped. A session that is
    still being written can end in a partial line, and one bad record should
    not abort the whole agent run.
    """
    for line in lines:
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict):
            yield obj


def _message_texts(obj: dict) -> list[str]:
    """Return the text pieces of a transcript record's ``message.content``.

    String content yields one piece. List content yields the ``text`` of each
    ``{"type": "text"}`` block; tool_use and other block types are dropped.
    Any other shape yields nothing.
    """
    msg = obj.get("message")
    if not isinstance(msg, dict):
        return []
    content = msg.get("content", "")
    if isinstance(content, str):
        return [content]
    if isinstance(content, list):
        return [b.get("text") or "" for b in content
                if isinstance(b, dict) and b.get("type") == "text"]
    return []


def extract_conversation_text(jsonl_path: Path, max_chars: int = 8000) -> str:
    """Extract human-readable dialogue from a conversation JSONL.

    Strips tool use, progress messages, queue operations, and system
    machinery. Keeps only:
      - Kent's messages (userType=external), excluding interrupt notices,
        when longer than 5 characters;
      - assistant text blocks (no tool_use), when longer than 10 characters.

    Each kept piece becomes ``**Kent:** …`` or ``**PoC:** …``, and pieces are
    joined by blank lines. Reading stops after the first record that pushes
    the running character total past *max_chars*, so the result can exceed it
    by up to one record.
    """
    fragments = []
    total = 0
    with open(jsonl_path, encoding='utf-8', errors='replace') as f:
        for obj in _jsonl_records(f):
            msg_type = obj.get("type", "")
            # Only Kent's actual messages, not queue operations or agent tasks
            if msg_type == "user" and obj.get("userType") == "external":
                speaker, min_len = "Kent", 5
            elif msg_type == "assistant":
                speaker, min_len = "PoC", 10
            else:
                continue

            for raw in _message_texts(obj):
                text = _strip_system_tags(raw)
                if speaker == "Kent" and text.startswith("[Request interrupted"):
                    continue
                if len(text) > min_len:
                    fragments.append(f"**{speaker}:** {text}")
                    total += len(text)

            if total > max_chars:
                break

    return "\n\n".join(fragments)


def count_dialogue_turns(jsonl_path: Path) -> int:
    """Count short user messages (proxy for back-and-forth dialogue).

    Long messages (>500 chars) are usually plan pastes or system prompts.
    Short messages are actual conversation turns. A message counts when its
    text (list blocks joined by spaces) is 6–499 characters long and is
    neither an interrupt notice nor an "Implement the following" prompt.
    """
    count = 0
    with open(jsonl_path, encoding='utf-8', errors='replace') as f:
        for obj in _jsonl_records(f):
            if obj.get("type") != "user" or obj.get("userType") != "external":
                continue
            text = " ".join(_message_texts(obj)).strip()
            # Short messages = real dialogue turns
            # Skip interrupts and command-like messages
            if (5 < len(text) < 500
                    and not text.startswith(("[Request interrupted",
                                             "Implement the following"))):
                count += 1
    return count


def select_conversation_fragments(n: int = 5) -> list[tuple[str, str]]:
    """Select conversation fragments for the observation extractor.

    Returns list of (session_id, text) tuples.

    Prefers sessions with lots of back-and-forth dialogue (many user
    messages), not single-prompt implementation sessions. Only files larger
    than 50KB with at least 10 dialogue turns are considered. The 3*n richest
    are shuffled, up to 2*n of them are extracted, and the first n whose text
    exceeds 500 characters are returned.
    """
    if not SESSIONS_DIR.exists():
        return []

    # Filter to files with actual content (>50KB)
    jsonl_files = [f for f in SESSIONS_DIR.glob("*.jsonl")
                   if f.stat().st_size > 50_000]

    # Score by dialogue turns; at least 10 short exchanges = real dialogue
    scored = []
    for f in jsonl_files:
        turns = count_dialogue_turns(f)
        if turns >= 10:
            scored.append((turns, f))

    # Sort by dialogue richness, then shuffle top candidates for variety
    scored.sort(key=lambda x: -x[0])
    top = scored[:n * 3]
    random.shuffle(top)

    fragments = []
    for _, f in top[:n * 2]:
        text = extract_conversation_text(f)
        if len(text) > 500:
            fragments.append((f.stem, text))
            if len(fragments) >= n:
                break
    return fragments


def _read_prompt(filename: str) -> str:
    """Return the UTF-8 text of prompt template *filename* in PROMPTS_DIR."""
    return (PROMPTS_DIR / filename).read_text(encoding='utf-8')


def _render_nodes(keys) -> list[str]:
    """Render each key as "### key\\ncontent", skipping keys that render empty."""
    sections = []
    for key in keys:
        content = render(key)
        if content:
            sections.append(f"### {key}\n{content}")
    return sections


def run_observation_extractor(n: int = 5) -> str:
    """Run the observation extractor on N conversation fragments.

    Makes one Sonnet call per fragment and returns the responses as
    "## Session: …" sections separated by horizontal rules.
    """
    template = _read_prompt("observation-extractor.md")
    topology = get_graph_topology()
    fragments = select_conversation_fragments(n)

    results = []
    for i, (session_id, text) in enumerate(fragments):
        print(f"  Observation extractor {i+1}/{len(fragments)}: "
              f"session {session_id[:12]}... ({len(text)} chars)")

        prompt = template.replace("{{TOPOLOGY}}", topology)
        prompt = prompt.replace("{{CONVERSATIONS}}",
                                f"### Session {session_id}\n\n{text}")

        response = call_sonnet(prompt)
        results.append(f"## Session: {session_id}\n\n{response}")

    return "\n\n---\n\n".join(results)


# ---------------------------------------------------------------------------
# Extractor: find patterns in clusters
# ---------------------------------------------------------------------------

def select_extractor_clusters(n: int = 5) -> list[list[str]]:
    """Select node clusters for the extractor agent.

    Uses spectral embedding to find groups of nearby semantic nodes (not
    journal entries) that might share an unextracted pattern.

    Greedy: up to n times, pick a random unused seed and take it plus its 4
    nearest unused nodes by cosine distance. Nodes are never reused across
    clusters. Stops early when fewer than 5 unused nodes remain. A seed with
    no usable vector yields a single-node cluster.
    """
    embedding = load_spectral_embedding()
    coords = embedding.get("coords", {})

    # Filter to semantic nodes only (skip journal, system files)
    semantic_keys = [k for k in coords
                     if not k.startswith("journal.md#")
                     and k not in _SYSTEM_NODE_KEYS]
    if not semantic_keys:
        return []

    cluster_size = 5
    used: set[str] = set()
    clusters = []

    for _ in range(n):
        available = [k for k in semantic_keys if k not in used]
        if len(available) < cluster_size:
            break

        # A random seed lets repeated runs cover different parts of the graph.
        # NOTE(review): seeding by degree (well-connected nodes first) was
        # intended at some point but is not implemented.
        seed = random.choice(available)

        # Find nearest neighbors in spectral space
        distances = []
        for k in available:
            if k == seed:
                continue
            d = spectral_distance(embedding, seed, k)
            if d < float('inf'):
                distances.append((d, k))
        distances.sort()

        cluster = [seed] + [k for _, k in distances[:cluster_size - 1]]
        used.update(cluster)
        clusters.append(cluster)

    return clusters


def run_extractor(n: int = 5) -> str:
    """Run the extractor agent on N clusters.

    Clusters whose nodes all render empty are skipped. Returns one
    "## Cluster …" section per Sonnet call.
    """
    template = _read_prompt("extractor.md")
    topology = get_graph_topology()
    clusters = select_extractor_clusters(n)

    results = []
    for i, cluster in enumerate(clusters):
        print(f"  Extractor cluster {i+1}/{len(clusters)}: {len(cluster)} nodes")

        node_texts = _render_nodes(cluster)
        if not node_texts:
            continue

        prompt = template.replace("{{TOPOLOGY}}", topology)
        prompt = prompt.replace("{{NODES}}", "\n\n".join(node_texts))

        response = call_sonnet(prompt)
        results.append(f"## Cluster {i+1}: {', '.join(cluster[:3])}...\n\n"
                       f"**Source nodes:** {cluster}\n\n{response}")

    return "\n\n---\n\n".join(results)


# ---------------------------------------------------------------------------
# Connector: cross-domain links
# ---------------------------------------------------------------------------

def get_neighbor_set(key: str) -> set[str]:
    """Get the set of neighbor keys for a node.

    Takes the first whitespace-separated field of each non-blank line of
    ``poc-memory neighbors <key>``.
    """
    output = poc_memory("neighbors", key)
    return {line.strip().split()[0] for line in output.split('\n')
            if line.strip()}


def _domain(key: str) -> str:
    """Coarse domain of a key: the part before '#', else the key minus its extension."""
    return key.split('#', 1)[0] if '#' in key else key.rsplit('.', 1)[0]


def select_connector_pairs(n: int = 5) -> list[tuple[list[str], list[str]]]:
    """Select cross-domain node pairs for the connector agent.

    Finds nodes that are close in spectral space (structurally similar) but
    unlinked in the graph (different domains). These are non-obvious
    structural analogies — the most valuable connections to surface.

    Each returned pair is (a_nodes, b_nodes). Each list holds the anchor key
    followed by whichever of two arbitrary graph neighbours also appear in
    the embedding. No anchor appears in more than one pair.
    """
    embedding = load_spectral_embedding()
    coords = embedding.get("coords", {})

    # Filter to semantic nodes (skip journal, system, daily/weekly)
    skip_prefixes = ("journal.md#", "daily-", "weekly-", "monthly-",
                     "all-sessions")
    skip_exact = _SYSTEM_NODE_KEYS | {"work-state"}
    semantic = [k for k in coords
                if not k.startswith(skip_prefixes) and k not in skip_exact]
    if len(semantic) < 10:
        return []

    # Sample up to 300 nodes for tractable pairwise comparison
    random.shuffle(semantic)
    sample = semantic[:300]
    domains = {k: _domain(k) for k in sample}  # hoisted out of the O(n^2) loop

    # Compute all pairwise spectral distances
    candidates = []
    for ka, kb in itertools.combinations(sample, 2):
        # Skip same-file pairs (same domain, boring)
        if domains[ka] == domains[kb]:
            continue
        d = spectral_distance(embedding, ka, kb)
        if d < float('inf'):
            candidates.append((d, ka, kb))
    candidates.sort()

    # get_neighbor_set spawns a subprocess. The same key recurs across many
    # candidates, so cache the result per key.
    neighbor_cache: dict[str, set[str]] = {}

    def neighbors_of(key: str) -> set[str]:
        if key not in neighbor_cache:
            neighbor_cache[key] = get_neighbor_set(key)
        return neighbor_cache[key]

    # Take spectrally-close cross-domain pairs that are UNLINKED in the graph
    pairs = []
    used: set[str] = set()
    for _, ka, kb in candidates:
        if ka in used or kb in used:
            continue

        neighbors_a = neighbors_of(ka)
        if kb in neighbors_a:
            continue  # already linked

        used.add(ka)
        used.add(kb)

        # Gather small neighborhoods for context
        a_neighbors = [k for k in list(neighbors_a)[:2] if k in coords]
        b_neighbors = [k for k in list(neighbors_of(kb))[:2] if k in coords]
        pairs.append(([ka] + a_neighbors, [kb] + b_neighbors))

        if len(pairs) >= n:
            break

    return pairs


def run_connector(n: int = 5) -> str:
    """Run the connector agent on N cross-community pairs.

    Pairs where either side renders entirely empty are skipped. Returns one
    "## Pair …" section per Sonnet call.
    """
    template = _read_prompt("connector.md")
    topology = get_graph_topology()
    pairs = select_connector_pairs(n)

    results = []
    for i, (a_nodes, b_nodes) in enumerate(pairs):
        print(f"  Connector pair {i+1}/{len(pairs)}")

        a_texts = _render_nodes(a_nodes)
        b_texts = _render_nodes(b_nodes)
        if not a_texts or not b_texts:
            continue

        prompt = template.replace("{{TOPOLOGY}}", topology)
        prompt = prompt.replace("{{COMMUNITY_A}}", "\n\n".join(a_texts))
        prompt = prompt.replace("{{COMMUNITY_B}}", "\n\n".join(b_texts))

        response = call_sonnet(prompt)
        results.append(f"## Pair {i+1}: {a_nodes[0]} <-> {b_nodes[0]}\n\n"
                       f"{response}")

    return "\n\n---\n\n".join(results)


# ---------------------------------------------------------------------------
# Challenger: stress-test existing knowledge
# ---------------------------------------------------------------------------

def select_challenger_targets(n: int = 5) -> list[str]:
    """Select nodes for the challenger agent.

    Prefers nodes that make claims (skills, self-model, patterns, ...): they
    come first, in list-keys order, followed by the remaining non-journal,
    non-digest topic nodes. Returns the first n.
    TODO: also prefer older / high-degree (influential) nodes.
    """
    keys = list_keys()

    # Filter to knowledge nodes that make claims
    target_prefixes = ("skills", "patterns", "self-model", "code-review",
                       "stuck-toolkit", "memory-architecture",
                       "differentiation", "inner-life")
    claim_nodes = [k for k in keys if k.startswith(target_prefixes)]

    # Also include old topic nodes
    skip_prefixes = ("journal.md#", "daily-", "weekly-", "monthly-")
    semantic = [k for k in keys
                if not k.startswith(skip_prefixes)
                and k not in _SYSTEM_NODE_KEYS]

    # dict.fromkeys dedups while keeping claim nodes first. A set() union
    # would drop that preference and reorder unpredictably between runs.
    candidates = list(dict.fromkeys(claim_nodes + semantic))
    return candidates[:n]


def _recent_journal() -> str:
    """Return ``poc-journal tail 10`` output, or "" if it can't be run."""
    try:
        return subprocess.run(
            ["poc-journal", "tail", "10"],
            capture_output=True, text=True, encoding='utf-8',
            errors='replace', timeout=15,
        ).stdout.strip()
    except (OSError, subprocess.SubprocessError):
        return ""


def run_challenger(n: int = 5) -> str:
    """Run the challenger agent on N target nodes.

    Context for each target is its full rendering, up to 5 neighbours
    (each truncated to 1000 chars), and the recent journal tail (truncated
    to 3000 chars). Targets that render empty are skipped.
    """
    template = _read_prompt("challenger.md")
    topology = get_graph_topology()
    targets = select_challenger_targets(n)
    # Recent journal entries (contradicting evidence) don't depend on the
    # target, so fetch them once rather than once per target.
    recent = _recent_journal()

    results = []
    for i, target_key in enumerate(targets):
        print(f"  Challenger target {i+1}/{len(targets)}: {target_key}")

        target_content = render(target_key)
        if not target_content:
            continue

        # Get context: neighbors (in poc-memory's order) + recent journal
        neighbors = poc_memory("neighbors", target_key)
        neighbor_keys = [line.strip().split()[0]
                         for line in neighbors.split('\n')
                         if line.strip()][:5]

        context_texts = [f"### {target_key}\n{target_content}"]
        for nk in neighbor_keys:
            nc = render(nk)
            if nc:
                context_texts.append(f"### {nk}\n{nc[:1000]}")
        if recent:
            context_texts.append(f"### Recent journal entries\n{recent[:3000]}")

        prompt = template.replace("{{TOPOLOGY}}", topology)
        prompt = prompt.replace("{{TARGETS}}",
                                f"### {target_key}\n{target_content}")
        prompt = prompt.replace("{{CONTEXT}}", "\n\n".join(context_texts))

        response = call_sonnet(prompt)
        results.append(f"## Target: {target_key}\n\n{response}")

    return "\n\n---\n\n".join(results)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Command-line entry point: ``knowledge-agents.py [AGENT [N]]``.

    With no arguments every agent runs with N=5. Each agent's output is
    written to AGENT_RESULTS_DIR/knowledge-{agent}-{timestamp}.md, with one
    shared timestamp per invocation. Exits with status 1 (message on stderr)
    for an unknown agent or an N that is not a non-negative integer.
    """
    agents = {
        "observation": run_observation_extractor,
        "extractor": run_extractor,
        "connector": run_connector,
        "challenger": run_challenger,
    }

    if len(sys.argv) < 2:
        to_run = list(agents)
    else:
        name = sys.argv[1]
        if name not in agents:
            print(f"Unknown agent: {name}", file=sys.stderr)
            print(f"Available: {', '.join(agents)}", file=sys.stderr)
            sys.exit(1)
        to_run = [name]

    try:
        n = int(sys.argv[2]) if len(sys.argv) > 2 else 5
        if n < 0:
            raise ValueError(n)
    except ValueError:
        print(f"N must be a non-negative integer, got {sys.argv[2]!r}",
              file=sys.stderr)
        sys.exit(1)

    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")

    for name in to_run:
        print(f"\n=== Running {name} agent (n={n}) ===")
        result = agents[name](n)

        outfile = AGENT_RESULTS_DIR / f"knowledge-{name}-{timestamp}.md"
        outfile.write_text(f"# {name.title()} Agent Results — {timestamp}\n\n"
                           f"{result}\n", encoding='utf-8')
        print(f"  Output: {outfile}")


if __name__ == "__main__":
    main()