From 5c641d9f8a206e82e8071971f1e7ca8ad1d284a1 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Tue, 3 Mar 2026 10:56:44 -0500 Subject: [PATCH] knowledge agents: extractor, connector, challenger, observation Four layer-2 agents that produce new knowledge from the memory graph: mine conversations, extract patterns from clusters, find cross-domain connections, stress-test existing nodes. Output to agent-results/. knowledge_loop.py runs them on a schedule with quality tracking. --- .gitignore | 1 + scripts/knowledge_agents.py | 609 ++++++++++++++++++++++++++++ scripts/knowledge_loop.py | 766 ++++++++++++++++++++++++++++++++++++ 3 files changed, 1376 insertions(+) create mode 100755 scripts/knowledge_agents.py create mode 100644 scripts/knowledge_loop.py diff --git a/.gitignore b/.gitignore index 2f7896d..9280984 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ target/ +__pycache__/ diff --git a/scripts/knowledge_agents.py b/scripts/knowledge_agents.py new file mode 100755 index 0000000..380103a --- /dev/null +++ b/scripts/knowledge_agents.py @@ -0,0 +1,609 @@ +#!/usr/bin/env python3 +"""knowledge-agents.py — run the layer-2 knowledge production agents. + +Four agents that produce new knowledge from the memory graph: +1. Observation — mine raw conversations for unextracted knowledge +2. Extractor — find patterns in node clusters, write principle nodes +3. Connector — find cross-domain structural connections +4. 
Challenger — stress-test existing knowledge nodes + +Usage: + knowledge-agents.py # run all four + knowledge-agents.py observation [N] # mine N conversation fragments (default 5) + knowledge-agents.py extractor [N] # extract from N clusters (default 5) + knowledge-agents.py connector [N] # connect N cross-community pairs (default 5) + knowledge-agents.py challenger [N] # challenge N old nodes (default 5) + +Output goes to ~/.claude/memory/agent-results/knowledge-{agent}-{timestamp}.md +""" + +import json +import os +import random +import re +import subprocess +import sys +import tempfile +from datetime import datetime +from pathlib import Path + +MEMORY_DIR = Path.home() / ".claude" / "memory" +AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results" +AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True) +PROMPTS_DIR = Path(__file__).parent.parent / "prompts" +SCRIPTS_DIR = Path(__file__).parent + + +def call_sonnet(prompt: str, timeout: int = 600) -> str: + """Call Sonnet via the wrapper script.""" + env = dict(os.environ) + env.pop("CLAUDECODE", None) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as f: + f.write(prompt) + prompt_file = f.name + + try: + wrapper = str(SCRIPTS_DIR / "call-sonnet.sh") + result = subprocess.run( + [wrapper, prompt_file], + capture_output=True, text=True, timeout=timeout, env=env, + ) + return result.stdout.strip() + except subprocess.TimeoutExpired: + return "Error: Sonnet call timed out" + except Exception as e: + return f"Error: {e}" + finally: + os.unlink(prompt_file) + + +def poc_memory(*args, timeout=30) -> str: + """Run a poc-memory command and return stdout.""" + try: + result = subprocess.run( + ["poc-memory"] + list(args), + capture_output=True, text=True, timeout=timeout + ) + return result.stdout.strip() + except Exception: + return "" + + +def render(key: str) -> str: + return poc_memory("render", key) + + +def list_keys() -> list[str]: + output = poc_memory("list-keys") + return [k.strip() for k in 
output.split('\n') if k.strip()] + + +def get_graph_topology() -> str: + """Get graph topology summary for the {{TOPOLOGY}} template var.""" + parts = [] + status = poc_memory("status") + if status: + parts.append(status) + graph = poc_memory("graph") + if graph: + lines = graph.split('\n')[:80] + parts.append('\n'.join(lines)) + return '\n'.join(parts) + + +def load_spectral_embedding() -> dict: + """Load the spectral embedding from disk.""" + path = MEMORY_DIR / "spectral-embedding.json" + if not path.exists(): + return {} + with open(path) as f: + return json.load(f) + + + +def spectral_distance(embedding: dict, key_a: str, key_b: str) -> float: + """Cosine distance between two nodes in spectral space.""" + coords = embedding.get("coords", {}) + va = coords.get(key_a) + vb = coords.get(key_b) + if not va or not vb: + return float('inf') + + dot = sum(a * b for a, b in zip(va, vb)) + norm_a = sum(a * a for a in va) ** 0.5 + norm_b = sum(b * b for b in vb) ** 0.5 + if norm_a == 0 or norm_b == 0: + return float('inf') + + cos_sim = dot / (norm_a * norm_b) + return 1.0 - cos_sim + + +# --------------------------------------------------------------------------- +# Observation extractor: mine raw conversations +# --------------------------------------------------------------------------- + +SESSIONS_DIR = Path.home() / ".claude" / "projects" / "-home-kent-bcachefs-tools" + + +def _strip_system_tags(text: str) -> str: + """Remove blocks from text.""" + return re.sub(r'.*?', '', text, + flags=re.DOTALL).strip() + + +def extract_conversation_text(jsonl_path: Path, max_chars: int = 8000) -> str: + """Extract human-readable dialogue from a conversation JSONL. + + Strips tool use, progress messages, queue operations, and system + machinery. Keeps only: Kent's messages (userType=external) and + assistant text blocks (no tool_use). 
+ """ + fragments = [] + total = 0 + + with open(jsonl_path) as f: + for line in f: + obj = json.loads(line) + msg_type = obj.get("type", "") + + # Only Kent's actual messages, not queue operations or agent tasks + if msg_type == "user" and obj.get("userType") == "external": + msg = obj.get("message", {}) + content = msg.get("content", "") + if isinstance(content, str): + text = _strip_system_tags(content) + if text.startswith("[Request interrupted"): + continue + if text and len(text) > 5: + fragments.append(f"**Kent:** {text}") + total += len(text) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text = _strip_system_tags(block["text"]) + if text and len(text) > 5: + fragments.append(f"**Kent:** {text}") + total += len(text) + + elif msg_type == "assistant": + msg = obj.get("message", {}) + content = msg.get("content", "") + if isinstance(content, str): + text = _strip_system_tags(content) + if text and len(text) > 10: + fragments.append(f"**PoC:** {text}") + total += len(text) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text = _strip_system_tags(block["text"]) + if text and len(text) > 10: + fragments.append(f"**PoC:** {text}") + total += len(text) + # skip tool_use blocks entirely + + if total > max_chars: + break + + return "\n\n".join(fragments) + + +def count_dialogue_turns(jsonl_path: Path) -> int: + """Count short user messages (proxy for back-and-forth dialogue). + + Long messages (>500 chars) are usually plan pastes or system prompts. + Short messages are actual conversation turns. 
+ """ + count = 0 + with open(jsonl_path) as f: + for line in f: + obj = json.loads(line) + if obj.get("type") == "user" and obj.get("userType") == "external": + msg = obj.get("message", {}) + content = msg.get("content", "") + if isinstance(content, str): + text = content.strip() + elif isinstance(content, list): + text = " ".join( + b.get("text", "") for b in content + if isinstance(b, dict) and b.get("type") == "text" + ).strip() + else: + text = "" + # Short messages = real dialogue turns + # Skip interrupts and command-like messages + if (5 < len(text) < 500 + and not text.startswith("[Request interrupted") + and not text.startswith("Implement the following")): + count += 1 + return count + + +def select_conversation_fragments(n: int = 5) -> list[tuple[str, str]]: + """Select conversation fragments for the observation extractor. + + Returns list of (session_id, text) tuples. + Prefers sessions with lots of back-and-forth dialogue (many user + messages), not single-prompt implementation sessions. 
+ """ + if not SESSIONS_DIR.exists(): + return [] + + jsonl_files = list(SESSIONS_DIR.glob("*.jsonl")) + if not jsonl_files: + return [] + + # Filter to files with actual content (>50KB) + jsonl_files = [f for f in jsonl_files if f.stat().st_size > 50_000] + + # Score by dialogue turns (short user messages = real conversation) + scored = [] + for f in jsonl_files: + user_count = count_dialogue_turns(f) + if user_count >= 10: # at least 10 short exchanges = real dialogue + scored.append((user_count, f)) + + # Sort by dialogue richness, then shuffle top candidates for variety + scored.sort(key=lambda x: -x[0]) + top = scored[:n * 3] + random.shuffle(top) + + fragments = [] + for _, f in top[:n * 2]: + session_id = f.stem + text = extract_conversation_text(f) + if text and len(text) > 500: + fragments.append((session_id, text)) + if len(fragments) >= n: + break + + return fragments + + +def run_observation_extractor(n: int = 5) -> str: + """Run the observation extractor on N conversation fragments.""" + template = (PROMPTS_DIR / "observation-extractor.md").read_text() + topology = get_graph_topology() + fragments = select_conversation_fragments(n) + + results = [] + for i, (session_id, text) in enumerate(fragments): + print(f" Observation extractor {i+1}/{len(fragments)}: " + f"session {session_id[:12]}... ({len(text)} chars)") + + prompt = template.replace("{{TOPOLOGY}}", topology) + prompt = prompt.replace("{{CONVERSATIONS}}", + f"### Session {session_id}\n\n{text}") + + response = call_sonnet(prompt) + results.append(f"## Session: {session_id}\n\n{response}") + + return "\n\n---\n\n".join(results) + + +# --------------------------------------------------------------------------- +# Extractor: find patterns in clusters +# --------------------------------------------------------------------------- + +def select_extractor_clusters(n: int = 5) -> list[list[str]]: + """Select node clusters for the extractor agent. 
+ + Uses spectral embedding to find groups of nearby semantic nodes + (not journal entries) that might share an unextracted pattern. + """ + embedding = load_spectral_embedding() + coords = embedding.get("coords", {}) + + # Filter to semantic nodes only (skip journal, system files) + semantic_keys = [k for k in coords.keys() + if not k.startswith("journal.md#") + and k not in ("journal.md", "MEMORY.md", + "where-am-i.md", "work-queue.md")] + + if not semantic_keys: + return [] + + # Simple greedy clustering: pick a seed, grab its N nearest neighbors + used = set() + clusters = [] + cluster_size = 5 + + # Sort by degree (prefer well-connected nodes as seeds) + graph_output = poc_memory("graph") + + for _ in range(n): + # Pick a random unused seed + available = [k for k in semantic_keys if k not in used] + if len(available) < cluster_size: + break + + seed = available[0] + + # Find nearest neighbors in spectral space + distances = [] + for k in available: + if k != seed: + d = spectral_distance(embedding, seed, k) + if d < float('inf'): + distances.append((d, k)) + distances.sort() + + cluster = [seed] + [k for _, k in distances[:cluster_size - 1]] + for k in cluster: + used.add(k) + clusters.append(cluster) + + return clusters + + +def run_extractor(n: int = 5) -> str: + """Run the extractor agent on N clusters.""" + template = (PROMPTS_DIR / "extractor.md").read_text() + topology = get_graph_topology() + clusters = select_extractor_clusters(n) + + results = [] + for i, cluster in enumerate(clusters): + print(f" Extractor cluster {i+1}/{len(clusters)}: {len(cluster)} nodes") + + # Render all nodes in the cluster + node_texts = [] + for key in cluster: + content = render(key) + if content: + node_texts.append(f"### {key}\n{content}") + + if not node_texts: + continue + + nodes_str = "\n\n".join(node_texts) + prompt = template.replace("{{TOPOLOGY}}", topology) + prompt = prompt.replace("{{NODES}}", nodes_str) + + response = call_sonnet(prompt) + results.append(f"## 
Cluster {i+1}: {', '.join(cluster[:3])}...\n\n" + f"**Source nodes:** {cluster}\n\n{response}") + + return "\n\n---\n\n".join(results) + + +# --------------------------------------------------------------------------- +# Connector: cross-domain links +# --------------------------------------------------------------------------- + +def get_neighbor_set(key: str) -> set[str]: + """Get the set of neighbor keys for a node.""" + output = poc_memory("neighbors", key) + return {line.strip().split()[0] + for line in output.split('\n') + if line.strip()} + + +def select_connector_pairs(n: int = 5) -> list[tuple[list[str], list[str]]]: + """Select cross-domain node pairs for the connector agent. + + Finds nodes that are close in spectral space (structurally similar) + but unlinked in the graph (different domains). These are non-obvious + structural analogies — the most valuable connections to surface. + """ + embedding = load_spectral_embedding() + coords = embedding.get("coords", {}) + + # Filter to semantic nodes (skip journal, system, daily/weekly) + skip_prefixes = ("journal.md#", "daily-", "weekly-", "monthly-", + "all-sessions") + skip_exact = {"journal.md", "MEMORY.md", "where-am-i.md", + "work-queue.md", "work-state"} + semantic = [k for k in coords + if not any(k.startswith(p) for p in skip_prefixes) + and k not in skip_exact] + + if len(semantic) < 10: + return [] + + # Sample up to 300 nodes for tractable pairwise comparison + random.shuffle(semantic) + sample = semantic[:300] + + # Compute all pairwise spectral distances + candidates = [] + for i in range(len(sample)): + for j in range(i + 1, len(sample)): + # Skip same-file pairs (same domain, boring) + pref_a = sample[i].split('#')[0] if '#' in sample[i] else sample[i].rsplit('.', 1)[0] + pref_b = sample[j].split('#')[0] if '#' in sample[j] else sample[j].rsplit('.', 1)[0] + if pref_a == pref_b: + continue + d = spectral_distance(embedding, sample[i], sample[j]) + if d < float('inf'): + candidates.append((d, 
sample[i], sample[j])) + + candidates.sort() + + # Take spectrally-close cross-domain pairs that are UNLINKED in the graph + pairs = [] + used = set() + for d, ka, kb in candidates: + if ka in used or kb in used: + continue + + # Check if they're already linked + neighbors_a = get_neighbor_set(ka) + if kb in neighbors_a: + continue + + used.add(ka) + used.add(kb) + + # Gather small neighborhoods for context + a_neighbors = [k for k in list(neighbors_a)[:2] if k in coords] + b_neighbors_set = get_neighbor_set(kb) + b_neighbors = [k for k in list(b_neighbors_set)[:2] if k in coords] + + a_nodes = [ka] + a_neighbors + b_nodes = [kb] + b_neighbors + pairs.append((a_nodes, b_nodes)) + + if len(pairs) >= n: + break + + return pairs + + +def run_connector(n: int = 5) -> str: + """Run the connector agent on N cross-community pairs.""" + template = (PROMPTS_DIR / "connector.md").read_text() + topology = get_graph_topology() + pairs = select_connector_pairs(n) + + results = [] + for i, (a_nodes, b_nodes) in enumerate(pairs): + print(f" Connector pair {i+1}/{len(pairs)}") + + a_texts = [] + for key in a_nodes: + content = render(key) + if content: + a_texts.append(f"### {key}\n{content}") + + b_texts = [] + for key in b_nodes: + content = render(key) + if content: + b_texts.append(f"### {key}\n{content}") + + if not a_texts or not b_texts: + continue + + prompt = template.replace("{{TOPOLOGY}}", topology) + prompt = prompt.replace("{{COMMUNITY_A}}", "\n\n".join(a_texts)) + prompt = prompt.replace("{{COMMUNITY_B}}", "\n\n".join(b_texts)) + + response = call_sonnet(prompt) + results.append(f"## Pair {i+1}: {a_nodes[0]} <-> {b_nodes[0]}\n\n" + f"{response}") + + return "\n\n---\n\n".join(results) + + +# --------------------------------------------------------------------------- +# Challenger: stress-test existing knowledge +# --------------------------------------------------------------------------- + +def select_challenger_targets(n: int = 5) -> list[str]: + """Select nodes 
for the challenger agent. + + Prefers: older nodes, high-degree nodes (influential), nodes that + make claims (skills, self-model, patterns). + """ + keys = list_keys() + + # Filter to knowledge nodes that make claims + target_prefixes = ("skills", "patterns", "self-model", "code-review", + "stuck-toolkit", "memory-architecture", + "differentiation", "inner-life") + candidates = [k for k in keys + if any(k.startswith(p) for p in target_prefixes)] + + # Also include old topic nodes + semantic = [k for k in keys + if not k.startswith("journal.md#") + and not k.startswith("daily-") + and not k.startswith("weekly-") + and not k.startswith("monthly-") + and k not in ("journal.md", "MEMORY.md", + "where-am-i.md", "work-queue.md")] + candidates = list(set(candidates + semantic)) + + # For now just take the first N (could sort by age/degree later) + return candidates[:n] + + +def run_challenger(n: int = 5) -> str: + """Run the challenger agent on N target nodes.""" + template = (PROMPTS_DIR / "challenger.md").read_text() + topology = get_graph_topology() + targets = select_challenger_targets(n) + + results = [] + for i, target_key in enumerate(targets): + print(f" Challenger target {i+1}/{len(targets)}: {target_key}") + + target_content = render(target_key) + if not target_content: + continue + + # Get context: neighbors + recent journal + neighbors = poc_memory("neighbors", target_key) + neighbor_keys = [line.strip().split()[0] + for line in neighbors.split('\n') + if line.strip()][:5] + + context_texts = [f"### {target_key}\n{target_content}"] + for nk in neighbor_keys: + nc = render(nk) + if nc: + context_texts.append(f"### {nk}\n{nc[:1000]}") + + # Add recent journal entries for contradicting evidence + try: + recent = subprocess.run( + ["poc-journal", "tail", "10"], + capture_output=True, text=True, timeout=15 + ).stdout.strip() + except Exception: + recent = "" + if recent: + context_texts.append(f"### Recent journal entries\n{recent[:3000]}") + + prompt = 
template.replace("{{TOPOLOGY}}", topology) + prompt = prompt.replace("{{TARGETS}}", + f"### {target_key}\n{target_content}") + prompt = prompt.replace("{{CONTEXT}}", "\n\n".join(context_texts)) + + response = call_sonnet(prompt) + results.append(f"## Target: {target_key}\n\n{response}") + + return "\n\n---\n\n".join(results) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + agents = { + "observation": run_observation_extractor, + "extractor": run_extractor, + "connector": run_connector, + "challenger": run_challenger, + } + + if len(sys.argv) < 2: + to_run = list(agents.keys()) + else: + name = sys.argv[1] + if name not in agents: + print(f"Unknown agent: {name}") + print(f"Available: {', '.join(agents.keys())}") + sys.exit(1) + to_run = [name] + + n = int(sys.argv[2]) if len(sys.argv) > 2 else 5 + timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") + + for name in to_run: + print(f"\n=== Running {name} agent (n={n}) ===") + result = agents[name](n) + + outfile = AGENT_RESULTS_DIR / f"knowledge-{name}-{timestamp}.md" + outfile.write_text(f"# {name.title()} Agent Results — {timestamp}\n\n" + f"{result}\n") + print(f" Output: {outfile}") + + +if __name__ == "__main__": + main() diff --git a/scripts/knowledge_loop.py b/scripts/knowledge_loop.py new file mode 100644 index 0000000..7e1c9a8 --- /dev/null +++ b/scripts/knowledge_loop.py @@ -0,0 +1,766 @@ +#!/usr/bin/env python3 +"""knowledge-loop.py — fixed-point iteration over the knowledge graph. + +Runs observation → extractor → connector → challenger in sequence, +applies results, recomputes spectral embedding, measures convergence. 
+ +Convergence is structural, not behavioral: +- Graph metrics (sigma, CC, community partition) stabilize +- Inference depth is tracked; confidence threshold scales with depth +- Rolling window smooths stochastic noise + +Usage: + knowledge-loop.py # run until convergence + knowledge-loop.py --max-cycles 10 # cap at 10 cycles + knowledge-loop.py --batch-size 5 # agents process 5 items each + knowledge-loop.py --window 5 # rolling average window + knowledge-loop.py --max-depth 4 # max inference chain length + knowledge-loop.py --dry-run # parse + report, don't apply +""" + +import json +import math +import os +import re +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +MEMORY_DIR = Path.home() / ".claude" / "memory" +AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results" +AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True) +SCRIPTS_DIR = Path(__file__).parent +DEPTH_DB = AGENT_RESULTS_DIR / "node-depths.json" + +# Import the agent runners +sys.path.insert(0, str(SCRIPTS_DIR)) +from knowledge_agents import ( + run_observation_extractor, run_extractor, run_connector, run_challenger, + load_spectral_embedding, spectral_distance, poc_memory, +) + + +# --------------------------------------------------------------------------- +# Inference depth tracking +# --------------------------------------------------------------------------- + +# Depth assignments by agent type: +# depth 0 = raw observations (journal, conversations) +# depth 1 = observation extractor (facts from conversations) +# depth 2 = pattern extractor (patterns across knowledge nodes) +# depth 3 = connector (cross-domain links between patterns) +# Challenger refines existing nodes — preserves their depth. 
+ +AGENT_BASE_DEPTH = { + "observation": 1, + "extractor": 2, + "connector": 3, + "challenger": None, # inherits from target +} + + +def load_depth_db() -> dict[str, int]: + """Load the inference depth database.""" + if DEPTH_DB.exists(): + with open(DEPTH_DB) as f: + return json.load(f) + return {} + + +def save_depth_db(db: dict[str, int]): + """Save the inference depth database.""" + with open(DEPTH_DB, "w") as f: + json.dump(db, f, indent=2) + + +def get_node_depth(db: dict[str, int], key: str) -> int: + """Get inference depth for a node. Unknown nodes assumed depth 0.""" + return db.get(key, 0) + + +def compute_action_depth(db: dict[str, int], action: dict, + agent: str) -> int: + """Compute the inference depth for a new action. + + For write_node: max(depth of sources) + 1, or agent base depth. + For refine: same depth as the target node. + For link: no depth (links don't have depth). + """ + if action["type"] == "link": + return -1 # links don't have depth + + if action["type"] == "refine": + return get_node_depth(db, action["key"]) + + # write_node: depth = max(source depths) + 1 + covers = action.get("covers", []) + if covers: + source_depths = [get_node_depth(db, k) for k in covers] + return max(source_depths) + 1 + + # No source info — use agent base depth + base = AGENT_BASE_DEPTH.get(agent, 2) + return base if base is not None else 2 + + +def required_confidence(depth: int, base: float = 0.3) -> float: + """Confidence threshold that scales with inference depth. + + required(depth) = 1 - (1 - base)^depth + + depth 0: 0.00 (raw data, no threshold) + depth 1: 0.30 (observation extraction) + depth 2: 0.51 (pattern extraction) + depth 3: 0.66 (cross-domain connection) + depth 4: 0.76 + depth 5: 0.83 + """ + if depth <= 0: + return 0.0 + return 1.0 - (1.0 - base) ** depth + + +def use_bonus(use_count: int) -> float: + """Confidence bonus from real-world use. + + Interior nodes that get retrieved during actual work + earn empirical validation. 
Each use increases effective + confidence, potentially clearing depth thresholds that + were previously blocking. + + use_bonus(n) = 1 - 1/(1 + 0.15*n) + 0 uses: +0.00 + 1 use: +0.13 + 3 uses: +0.31 + 5 uses: +0.43 + 10 uses: +0.60 + """ + if use_count <= 0: + return 0.0 + return 1.0 - 1.0 / (1.0 + 0.15 * use_count) + + +def get_use_counts() -> dict[str, int]: + """Get use counts for all nodes from the store.""" + try: + dump = subprocess.run( + ["poc-memory", "dump-json"], + capture_output=True, text=True, timeout=30, + ) + data = json.loads(dump.stdout) + counts = {} + nodes = data if isinstance(data, list) else data.get("nodes", data) + if isinstance(nodes, dict): + for key, node in nodes.items(): + if isinstance(node, dict): + counts[key] = node.get("uses", 0) + elif isinstance(nodes, list): + for node in nodes: + if isinstance(node, dict): + counts[node.get("key", "")] = node.get("uses", 0) + return counts + except Exception: + return {} + + +def effective_confidence(base_conf: float, use_count: int) -> float: + """Compute effective confidence = base + use_bonus, capped at 1.0.""" + return min(1.0, base_conf + use_bonus(use_count)) + + +# --------------------------------------------------------------------------- +# Action parsing — extract structured actions from agent markdown output +# --------------------------------------------------------------------------- + +CONFIDENCE_WEIGHTS = {"high": 1.0, "medium": 0.6, "low": 0.3} +CONFIDENCE_VALUES = {"high": 0.9, "medium": 0.6, "low": 0.3} + + +def parse_write_nodes(text: str) -> list[dict]: + """Parse WRITE_NODE blocks from agent output.""" + actions = [] + pattern = r'WRITE_NODE\s+(\S+)\s*\n(.*?)END_NODE' + for m in re.finditer(pattern, text, re.DOTALL): + key = m.group(1) + content = m.group(2).strip() + + # Look for CONFIDENCE line + conf_match = re.search(r'CONFIDENCE:\s*(high|medium|low)', content, re.I) + confidence = conf_match.group(1).lower() if conf_match else "medium" + if conf_match: + content = 
content[:conf_match.start()] + content[conf_match.end():] + content = content.strip() + + # Look for COVERS line + covers_match = re.search(r'COVERS:\s*(.+)', content) + covers = [] + if covers_match: + covers = [c.strip() for c in covers_match.group(1).split(',')] + content = content[:covers_match.start()] + content[covers_match.end():] + content = content.strip() + + actions.append({ + "type": "write_node", + "key": key, + "content": content, + "confidence": confidence, + "covers": covers, + "weight": CONFIDENCE_WEIGHTS.get(confidence, 0.5), + }) + return actions + + +def parse_links(text: str) -> list[dict]: + """Parse LINK directives from agent output.""" + actions = [] + for m in re.finditer(r'^LINK\s+(\S+)\s+(\S+)', text, re.MULTILINE): + actions.append({ + "type": "link", + "source": m.group(1), + "target": m.group(2), + "weight": 0.3, # links are cheap, low weight in delta + }) + return actions + + +def parse_refines(text: str) -> list[dict]: + """Parse REFINE blocks from agent output.""" + actions = [] + pattern = r'REFINE\s+(\S+)\s*\n(.*?)END_REFINE' + for m in re.finditer(pattern, text, re.DOTALL): + key = m.group(1).strip('*').strip() # strip markdown bold artifacts + actions.append({ + "type": "refine", + "key": key, + "content": m.group(2).strip(), + "weight": 0.7, # refinements are meaningful + }) + return actions + + +def parse_all_actions(text: str) -> list[dict]: + """Parse all action types from agent output.""" + actions = [] + actions.extend(parse_write_nodes(text)) + actions.extend(parse_links(text)) + actions.extend(parse_refines(text)) + return actions + + +def count_no_ops(text: str) -> int: + """Count NO_CONNECTION, AFFIRM, and NO_EXTRACTION verdicts (non-actions).""" + no_conn = len(re.findall(r'\bNO_CONNECTION\b', text)) + affirm = len(re.findall(r'\bAFFIRM\b', text)) + no_extract = len(re.findall(r'\bNO_EXTRACTION\b', text)) + return no_conn + affirm + no_extract + + +# 
--------------------------------------------------------------------------- +# Action application +# --------------------------------------------------------------------------- + +def stamp_content(content: str, agent: str, timestamp: str, + depth: int) -> str: + """Prepend provenance metadata to node content.""" + stamp = (f"\n") + return stamp + content + + +def apply_action(action: dict, dry_run: bool = False, + agent: str = "unknown", timestamp: str = "", + depth: int = 0) -> bool: + """Apply a single action to the graph. Returns True if applied.""" + if dry_run: + return True + + if action["type"] == "write_node": + try: + content = stamp_content(action["content"], agent, + timestamp, depth) + result = subprocess.run( + ["poc-memory", "write", action["key"]], + input=content, + capture_output=True, text=True, timeout=15, + ) + return result.returncode == 0 + except Exception: + return False + + elif action["type"] == "link": + try: + result = subprocess.run( + ["poc-memory", "link-add", action["source"], + action["target"]], + capture_output=True, text=True, timeout=10, + ) + if "already exists" in result.stdout: + return False # not a new action + return result.returncode == 0 + except Exception: + return False + + elif action["type"] == "refine": + try: + content = stamp_content(action["content"], agent, + timestamp, depth) + result = subprocess.run( + ["poc-memory", "write", action["key"]], + input=content, + capture_output=True, text=True, timeout=15, + ) + return result.returncode == 0 + except Exception: + return False + + return False + + +# --------------------------------------------------------------------------- +# Graph-structural convergence metrics +# --------------------------------------------------------------------------- + +def get_graph_metrics() -> dict: + """Get current graph structural metrics.""" + metrics = {} + + # Status: node/edge counts + status = poc_memory("status") + m = re.search(r'Nodes:\s*(\d+)\s+Relations:\s*(\d+)', status) 
+    if m:
+        metrics["nodes"] = int(m.group(1))
+        metrics["edges"] = int(m.group(2))
+    m = re.search(r'Communities:\s*(\d+)', status)
+    if m:
+        metrics["communities"] = int(m.group(1))
+
+    # Health: CC, sigma — scraped from `poc-memory health` text; NOTE(review): silently absent if the tool's labels ever change — confirm format.
+    health = poc_memory("health")
+    m = re.search(r'Clustering coefficient.*?:\s*([\d.]+)', health)
+    if m:
+        metrics["cc"] = float(m.group(1))
+    m = re.search(r'Small-world.*?:\s*([\d.]+)', health)
+    if m:
+        metrics["sigma"] = float(m.group(1))
+
+    return metrics
+
+
+def metric_stability(history: list[dict], key: str,
+                     window: int) -> float:
+    """Compute coefficient of variation of a metric over recent cycles.
+
+    Returns CV (population std / mean). Lower = more stable:
+    0.0 = perfectly stable, >0.1 = still changing, inf = too few samples.
+    """
+    if len(history) < window:
+        return float('inf')  # not enough cycles recorded yet to judge stability
+
+    values = []
+    for h in history[-window:]:
+        metrics = h.get("graph_metrics_after", {})
+        if key in metrics:
+            values.append(metrics[key])
+
+    if not values or len(values) < 2:
+        return float('inf')  # metric missing (or nearly so) in the window
+
+    mean = sum(values) / len(values)
+    if mean == 0:
+        return 0.0  # avoid div-by-zero; an all-zero series counts as stable
+    variance = sum((v - mean) ** 2 for v in values) / len(values)  # population variance (divide by N)
+    return (variance ** 0.5) / abs(mean)
+
+
+# ---------------------------------------------------------------------------
+# Spectral tightening measurement
+# ---------------------------------------------------------------------------
+
+def measure_spectral_tightening(
+    embedding_before: dict,
+    embedding_after: dict,
+    actions: list[dict],
+) -> float:
+    """Measure how much new nodes tightened their source clusters (mean pairwise spectral-distance drop per write; positive = tighter)."""
+    if not embedding_before or not embedding_after:
+        return 0.0  # no embedding snapshot on one side — nothing to compare
+
+    write_actions = [a for a in actions
+                     if a["type"] == "write_node" and a.get("covers")]
+    if not write_actions:
+        return 0.0
+
+    total_tightening = 0.0
+    count = 0
+
+    for action in write_actions:
+        covers = action["covers"]
+        if len(covers) < 2:
+            continue  # need at least one pair of source nodes to measure
+
+        dists_before = []
+        for i in range(len(covers)):
+            for j in range(i + 1, len(covers)):
+                d = spectral_distance(embedding_before,
+                                      covers[i], covers[j])
+                if d < float('inf'):  # inf = pair absent from this embedding; skip
+                    dists_before.append(d)
+
+        dists_after = []
+        for i in range(len(covers)):
+            for j in range(i + 1, len(covers)):
+                d = spectral_distance(embedding_after,
+                                      covers[i], covers[j])
+                if d < float('inf'):
+                    dists_after.append(d)
+
+        if dists_before and dists_after:
+            avg_before = sum(dists_before) / len(dists_before)
+            avg_after = sum(dists_after) / len(dists_after)
+            total_tightening += (avg_before - avg_after)  # positive = cluster tightened
+            count += 1
+
+    return total_tightening / count if count > 0 else 0.0  # mean over measurable writes
+
+
+# ---------------------------------------------------------------------------
+# The loop
+# ---------------------------------------------------------------------------
+
+def run_cycle(cycle_num: int, batch_size: int, dry_run: bool,
+              max_depth: int, depth_db: dict) -> dict:
+    """Run one full cycle: observation → extractor → connector → challenger."""
+    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
+    print(f"\n{'='*60}")
+    print(f"CYCLE {cycle_num} — {timestamp}")
+    print(f"{'='*60}")
+
+    # Snapshot state before, so tightening/deltas can be measured afterwards
+    embedding_before = load_spectral_embedding()
+    metrics_before = get_graph_metrics()
+    print(f" Before: {metrics_before}")
+
+    all_actions = []
+    all_no_ops = 0
+    depth_rejected = 0
+    agent_results = {}
+
+    # Load use counts for confidence boosting
+    use_counts = get_use_counts()
+    used_nodes = sum(1 for v in use_counts.values() if v > 0)
+    print(f" Nodes with use marks: {used_nodes}")
+
+    # Run agents sequentially (each changes the graph for the next)
+    for agent_name, agent_fn in [
+        ("observation", lambda: run_observation_extractor(batch_size)),
+        ("extractor", lambda: run_extractor(batch_size)),
+        ("connector", lambda: run_connector(batch_size)),
+        ("challenger", lambda: run_challenger(batch_size)),
+    ]:
+        print(f"\n --- {agent_name} (n={batch_size}) ---")
+        output = agent_fn()
+
+        # Save raw output for later inspection/audit
+        outfile = AGENT_RESULTS_DIR / f"knowledge-{agent_name}-{timestamp}.md"
+        outfile.write_text(
+            f"# {agent_name.title()} Agent Results — {timestamp}\n\n"
+            f"{output}\n"
+        )
+
+        # Parse structured actions out of the agent's free-text output
+        actions = parse_all_actions(output)
+        no_ops = count_no_ops(output)
+        all_no_ops += no_ops
+
+        print(f" Actions: {len(actions)} No-ops: {no_ops}")
+
+        # Apply actions with depth checking (deeper derivations need more confidence)
+        applied = 0
+        for a in actions:
+            depth = compute_action_depth(depth_db, a, agent_name)
+            a["depth"] = depth
+
+            kind = a["type"]
+            if kind == "write_node":
+                conf_val = CONFIDENCE_VALUES.get(a["confidence"], 0.5)  # 0.5 fallback for unknown labels
+                req = required_confidence(depth)
+
+                # Boost confidence based on source nodes' real-world use
+                source_keys = a.get("covers", [])
+                source_uses = [use_counts.get(k, 0) for k in source_keys]
+                avg_uses = (sum(source_uses) / len(source_uses)
+                            if source_uses else 0)
+                eff_conf = effective_confidence(conf_val, int(avg_uses))
+
+                meets = eff_conf >= req
+                use_note = (f" use_boost={eff_conf-conf_val:+.2f}"
+                            if avg_uses > 0 else "")
+                status = "OK" if meets else "REJECTED(depth)"
+                print(f" WRITE {a['key']} depth={depth} "
+                      f"conf={a['confidence']}({conf_val:.2f}) "
+                      f"eff={eff_conf:.2f} req={req:.2f}"
+                      f"{use_note} {status}")
+                if not meets:
+                    a["applied"] = False
+                    a["rejected_reason"] = "depth_threshold"
+                    depth_rejected += 1
+                    continue
+                if depth > max_depth:
+                    print(f" REJECTED: depth {depth} > "
+                          f"max {max_depth}")
+                    a["applied"] = False
+                    a["rejected_reason"] = "max_depth"
+                    depth_rejected += 1
+                    continue
+            elif kind == "link":
+                print(f" LINK {a['source']} → {a['target']}")  # links/refines are never depth-rejected
+            elif kind == "refine":
+                target_uses = use_counts.get(a["key"], 0)
+                use_note = (f" uses={target_uses}"
+                            if target_uses > 0 else "")
+                print(f" REFINE {a['key']} depth={depth}"
+                      f"{use_note}")
+
+            if apply_action(a, dry_run=dry_run, agent=agent_name,
+                            timestamp=timestamp, depth=depth):
+                applied += 1
+                a["applied"] = True
+                # Record depth for new nodes
+                if kind in ("write_node", "refine"):
+                    depth_db[a["key"]] = depth
+            else:
+                a["applied"] = False
+
+        print(f" Applied: {applied}/{len(actions)}")
+        agent_results[agent_name] = {
+            "actions": len(actions),
+            "applied": applied,
+            "no_ops": no_ops,
+        }
+        all_actions.extend(actions)
+
+    # Save updated depth DB (persists node depths across cycles/runs)
+    save_depth_db(depth_db)
+
+    # Recompute spectral embedding only if the graph actually changed
+    if not dry_run and any(a.get("applied") for a in all_actions):
+        print(f"\n Recomputing spectral embedding...")
+        try:
+            subprocess.run(
+                ["poc-memory", "spectral-save"],
+                capture_output=True, text=True, timeout=60,
+            )
+        except Exception as e:
+            print(f" Warning: spectral-save failed: {e}")  # best-effort: stale embedding just skews tightening
+
+    # Measure spectral tightening against the pre-cycle snapshot
+    embedding_after = load_spectral_embedding()
+    tightening = measure_spectral_tightening(
+        embedding_before, embedding_after, all_actions
+    )
+
+    # Get metrics after
+    metrics_after = get_graph_metrics()
+
+    # Compute weighted delta (sum of applied action weights; 0.5 default weight)
+    applied_actions = [a for a in all_actions if a.get("applied")]
+    weighted_delta = sum(a.get("weight", 0.5) for a in applied_actions)
+
+    total_applied = sum(r["applied"] for r in agent_results.values())
+    total_actions = sum(r["actions"] for r in agent_results.values())
+
+    # Depth distribution of applied actions (-1 = depth never assigned)
+    depth_dist = {}
+    for a in applied_actions:
+        d = a.get("depth", -1)
+        depth_dist[d] = depth_dist.get(d, 0) + 1
+
+    print(f"\n CYCLE {cycle_num} SUMMARY")
+    print(f" Total actions: {total_actions} parsed, "
+          f"{total_applied} applied, {depth_rejected} depth-rejected")
+    print(f" No-ops: {all_no_ops}")
+    print(f" Weighted delta: {weighted_delta:.2f}")
+    print(f" Spectral tightening: {tightening:+.4f}")
+    print(f" Depth distribution: {depth_dist}")
+    print(f" After: {metrics_after}")
+
+    result = {
+        "cycle": cycle_num,
+        "timestamp": timestamp,
+        "agents": agent_results,
+        "total_actions": total_actions,
+        "total_applied": total_applied,
+        "total_no_ops": all_no_ops,
+        "depth_rejected": depth_rejected,
+        "weighted_delta": weighted_delta,
+        "spectral_tightening": tightening,
+        "depth_distribution": depth_dist,
+        "graph_metrics_before": metrics_before,
+        "graph_metrics_after": metrics_after,
+        "dry_run": dry_run,
+    }
+
+    result_path = (AGENT_RESULTS_DIR /
+                   f"knowledge-cycle-{cycle_num}-{timestamp}.json")
+    with open(result_path, "w") as f:
+        json.dump(result, f, indent=2)  # NOTE: int depth keys become strings in JSON; in-memory history keeps ints
+
+    return result
+
+
+def check_convergence(history: list[dict], window: int) -> bool:
+    """Check structural convergence.
+
+    The graph has converged when:
+    1. Sigma (small-world coeff) is stable (CV < 0.05)
+    2. CC (clustering coefficient) is stable (CV < 0.05)
+    3. Community count is stable (CV < 0.10)
+    4. Weighted delta is low (avg < 1.0 over window)
+
+    All four must hold simultaneously.
+    """
+    if len(history) < window:
+        return False
+
+    sigma_cv = metric_stability(history, "sigma", window)
+    cc_cv = metric_stability(history, "cc", window)
+    comm_cv = metric_stability(history, "communities", window)
+
+    recent = history[-window:]
+    avg_delta = sum(r["weighted_delta"] for r in recent) / len(recent)
+
+    print(f"\n Convergence check (last {window} cycles):")
+    print(f" sigma CV: {sigma_cv:.4f} (< 0.05?)")
+    print(f" CC CV: {cc_cv:.4f} (< 0.05?)")
+    print(f" community CV: {comm_cv:.4f} (< 0.10?)")
+    print(f" avg delta: {avg_delta:.2f} (< 1.00?)")
+
+    structural = (sigma_cv < 0.05 and cc_cv < 0.05 and comm_cv < 0.10)  # inf CVs from missing metrics fail here
+    behavioral = avg_delta < 1.0
+
+    if structural and behavioral:
+        print(f" → CONVERGED (structural + behavioral)")
+        return True
+    elif structural:
+        print(f" → Structure stable, but agents still producing")
+    elif behavioral:
+        print(f" → Agents quiet, but structure still shifting")
+    else:
+        print(f" → Not converged")
+
+    return False
+
+
+def main() -> None:
+    # CLI defaults; overridable via the flags parsed below
+    max_cycles = 20
+    batch_size = 5
+    window = 5
+    max_depth = 4
+    dry_run = False
+
+    args = sys.argv[1:]
+    i = 0
+    while i < len(args):  # hand-rolled flag parser: each value flag consumes two tokens
+        if args[i] == "--max-cycles" and i + 1 < len(args):
+            max_cycles = int(args[i + 1]); i += 2
+        elif args[i] == "--batch-size" and i + 1 < len(args):
+            batch_size = int(args[i + 1]); i += 2
+        elif args[i] == "--window" and i + 1 < len(args):
+            window = int(args[i + 1]); i += 2
+        elif args[i] == "--max-depth" and i + 1 < len(args):
+            max_depth = int(args[i + 1]); i += 2
+        elif args[i] == "--dry-run":
+            dry_run = True; i += 1
+        else:
+            print(f"Unknown arg: {args[i]}"); sys.exit(1)
+
+    print(f"Knowledge Loop — fixed-point iteration")
+    print(f" max_cycles={max_cycles} batch_size={batch_size}")
+    print(f" window={window} max_depth={max_depth}")
+    print(f" dry_run={dry_run}")
+    print(f"\n Depth thresholds:")
+    for d in range(max_depth + 1):
+        print(f" depth {d}: confidence >= {required_confidence(d):.2f}")
+
+    # Load depth database (node key -> derivation depth, persisted across runs)
+    depth_db = load_depth_db()
+    print(f" Known node depths: {len(depth_db)}")
+
+    # Get initial graph state
+    status = poc_memory("status")
+    print(f"\nInitial state: {status}")
+
+    history = []
+    for cycle in range(1, max_cycles + 1):
+        result = run_cycle(cycle, batch_size, dry_run, max_depth,
+                           depth_db)
+        history.append(result)
+
+        if check_convergence(history, window):
+            print(f"\n CONVERGED after {cycle} cycles")
+            break
+    else:  # for/else: only reached when the loop was never broken by convergence
+        print(f"\n Reached max cycles ({max_cycles}) without "
+              f"convergence")
+
+    # Final summary
+    print(f"\n{'='*60}")
+    print(f"LOOP COMPLETE")
+    print(f"{'='*60}")
+    total_applied = sum(r["total_applied"] for r in history)
+    total_no_ops = sum(r["total_no_ops"] for r in history)
+    total_rejected = sum(r["depth_rejected"] for r in history)
+    avg_tightening = (
+        sum(r["spectral_tightening"] for r in history) / len(history)
+        if history else 0
+    )
+
+    # Aggregate depth distribution across all cycles (in-memory ints, not JSON keys)
+    total_depths = {}
+    for r in history:
+        for d, c in r.get("depth_distribution", {}).items():
+            total_depths[d] = total_depths.get(d, 0) + c
+
+    print(f" Cycles: {len(history)}")
+    print(f" Total actions applied: {total_applied}")
+    print(f" Total depth-rejected: {total_rejected}")
+    print(f" Total no-ops: {total_no_ops}")
+    print(f" Avg spectral tightening: {avg_tightening:+.4f}")
+    print(f" Depth distribution: {total_depths}")
+
+    if history:
+        first = history[0].get("graph_metrics_before", {})
+        last = history[-1].get("graph_metrics_after", {})
+        print(f" Nodes: {first.get('nodes','?')} → "
+              f"{last.get('nodes','?')}")
+        print(f" Edges: {first.get('edges','?')} → "
+              f"{last.get('edges','?')}")
+        print(f" CC: {first.get('cc','?')} → {last.get('cc','?')}")
+        print(f" Sigma: {first.get('sigma','?')} → "
+              f"{last.get('sigma','?')}")
+        print(f" Communities: {first.get('communities','?')} → "
+              f"{last.get('communities','?')}")
+
+    print(f"\nFinal state: {poc_memory('status')}")
+
+    # Save loop summary
+    ts = history[0]["timestamp"] if history else "empty"
+    summary_path = AGENT_RESULTS_DIR / f"knowledge-loop-{ts}.json"
+    with open(summary_path, "w") as f:
+        json.dump({
+            "cycles": len(history),
+            "converged": check_convergence(history, window)  # re-runs the check (prints diagnostics again)
+                if len(history) >= window else False,
+            "total_applied": total_applied,
+            "total_rejected": total_rejected,
+            "total_no_ops": total_no_ops,
+            "avg_tightening": avg_tightening,
+            "depth_distribution": total_depths,
+            "history": history,
+        }, f, indent=2)
+    print(f" Summary: {summary_path}")
+
+
+if __name__ == "__main__":
+    main()