#!/usr/bin/env python3 """knowledge-loop.py — fixed-point iteration over the knowledge graph. Runs observation → extractor → connector → challenger in sequence, applies results, recomputes spectral embedding, measures convergence. Convergence is structural, not behavioral: - Graph metrics (sigma, CC, community partition) stabilize - Inference depth is tracked; confidence threshold scales with depth - Rolling window smooths stochastic noise Usage: knowledge-loop.py # run until convergence knowledge-loop.py --max-cycles 10 # cap at 10 cycles knowledge-loop.py --batch-size 5 # agents process 5 items each knowledge-loop.py --window 5 # rolling average window knowledge-loop.py --max-depth 4 # max inference chain length knowledge-loop.py --dry-run # parse + report, don't apply """ import json import math import os import re import subprocess import sys from datetime import datetime from pathlib import Path MEMORY_DIR = Path.home() / ".claude" / "memory" AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results" AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True) SCRIPTS_DIR = Path(__file__).parent DEPTH_DB = AGENT_RESULTS_DIR / "node-depths.json" # Import the agent runners sys.path.insert(0, str(SCRIPTS_DIR)) from knowledge_agents import ( run_observation_extractor, run_extractor, run_connector, run_challenger, load_spectral_embedding, spectral_distance, poc_memory, ) # --------------------------------------------------------------------------- # Inference depth tracking # --------------------------------------------------------------------------- # Depth assignments by agent type: # depth 0 = raw observations (journal, conversations) # depth 1 = observation extractor (facts from conversations) # depth 2 = pattern extractor (patterns across knowledge nodes) # depth 3 = connector (cross-domain links between patterns) # Challenger refines existing nodes — preserves their depth. 
# Base inference depth each agent produces at; challenger inherits the
# depth of the node it refines (hence None).
AGENT_BASE_DEPTH = {
    "observation": 1,
    "extractor": 2,
    "connector": 3,
    "challenger": None,  # inherits from target
}


def load_depth_db() -> dict[str, int]:
    """Load the inference depth database (node key -> depth)."""
    if DEPTH_DB.exists():
        with open(DEPTH_DB) as f:
            return json.load(f)
    return {}


def save_depth_db(db: dict[str, int]):
    """Save the inference depth database."""
    with open(DEPTH_DB, "w") as f:
        json.dump(db, f, indent=2)


def get_node_depth(db: dict[str, int], key: str) -> int:
    """Get inference depth for a node. Unknown nodes assumed depth 0."""
    return db.get(key, 0)


def compute_action_depth(db: dict[str, int], action: dict, agent: str) -> int:
    """Compute the inference depth for a new action.

    For write_node: max(depth of sources) + 1, or agent base depth.
    For refine: same depth as the target node.
    For link: no depth (links don't have depth).
    """
    if action["type"] == "link":
        return -1  # links don't have depth
    if action["type"] == "refine":
        return get_node_depth(db, action["key"])

    # write_node: depth = max(source depths) + 1
    covers = action.get("covers", [])
    if covers:
        source_depths = [get_node_depth(db, k) for k in covers]
        return max(source_depths) + 1

    # No source info — use agent base depth (challenger's None falls
    # back to 2, a mid-chain default).
    base = AGENT_BASE_DEPTH.get(agent, 2)
    return base if base is not None else 2


def required_confidence(depth: int, base: float = 0.3) -> float:
    """Confidence threshold that scales with inference depth.

    required(depth) = 1 - (1 - base)^depth

        depth 0: 0.00 (raw data, no threshold)
        depth 1: 0.30 (observation extraction)
        depth 2: 0.51 (pattern extraction)
        depth 3: 0.66 (cross-domain connection)
        depth 4: 0.76
        depth 5: 0.83
    """
    if depth <= 0:
        return 0.0
    return 1.0 - (1.0 - base) ** depth


def use_bonus(use_count: int) -> float:
    """Confidence bonus from real-world use.

    Interior nodes that get retrieved during actual work earn empirical
    validation. Each use increases effective confidence, potentially
    clearing depth thresholds that were previously blocking.

        use_bonus(n) = 1 - 1/(1 + 0.15*n)

        0 uses:  +0.00
        1 use:   +0.13
        3 uses:  +0.31
        5 uses:  +0.43
        10 uses: +0.60
    """
    if use_count <= 0:
        return 0.0
    return 1.0 - 1.0 / (1.0 + 0.15 * use_count)


def get_use_counts() -> dict[str, int]:
    """Get use counts for all nodes from the store.

    Tolerates both list- and dict-shaped `dump-json` output. Best-effort:
    any failure (missing binary, timeout, bad JSON) yields an empty map,
    which simply disables use-based confidence boosting for the cycle.
    """
    try:
        dump = subprocess.run(
            ["poc-memory", "dump-json"],
            capture_output=True, text=True, timeout=30,
        )
        data = json.loads(dump.stdout)
        counts = {}
        nodes = data if isinstance(data, list) else data.get("nodes", data)
        if isinstance(nodes, dict):
            for key, node in nodes.items():
                if isinstance(node, dict):
                    counts[key] = node.get("uses", 0)
        elif isinstance(nodes, list):
            for node in nodes:
                if isinstance(node, dict):
                    counts[node.get("key", "")] = node.get("uses", 0)
        return counts
    except Exception:
        return {}


def effective_confidence(base_conf: float, use_count: int) -> float:
    """Compute effective confidence = base + use_bonus, capped at 1.0."""
    return min(1.0, base_conf + use_bonus(use_count))


# ---------------------------------------------------------------------------
# Action parsing — extract structured actions from agent markdown output
# ---------------------------------------------------------------------------

# WEIGHTS feed the convergence delta; VALUES are the confidence scores
# compared against depth thresholds.
CONFIDENCE_WEIGHTS = {"high": 1.0, "medium": 0.6, "low": 0.3}
CONFIDENCE_VALUES = {"high": 0.9, "medium": 0.6, "low": 0.3}


def parse_write_nodes(text: str) -> list[dict]:
    """Parse WRITE_NODE blocks from agent output.

    Block shape:  WRITE_NODE <key> ... END_NODE, with optional
    `CONFIDENCE: high|medium|low` and `COVERS: a, b` lines that are
    stripped out of the stored content.
    """
    actions = []
    pattern = r'WRITE_NODE\s+(\S+)\s*\n(.*?)END_NODE'
    for m in re.finditer(pattern, text, re.DOTALL):
        key = m.group(1)
        content = m.group(2).strip()

        # Look for CONFIDENCE line (case-insensitive); default "medium".
        conf_match = re.search(r'CONFIDENCE:\s*(high|medium|low)',
                               content, re.I)
        confidence = conf_match.group(1).lower() if conf_match else "medium"
        if conf_match:
            content = content[:conf_match.start()] + content[conf_match.end():]
            content = content.strip()

        # Look for COVERS line (comma-separated source node keys).
        covers_match = re.search(r'COVERS:\s*(.+)', content)
        covers = []
        if covers_match:
            covers = [c.strip() for c in covers_match.group(1).split(',')]
            content = (content[:covers_match.start()]
                       + content[covers_match.end():])
            content = content.strip()

        actions.append({
            "type": "write_node",
            "key": key,
            "content": content,
            "confidence": confidence,
            "covers": covers,
            "weight": CONFIDENCE_WEIGHTS.get(confidence, 0.5),
        })
    return actions


def parse_links(text: str) -> list[dict]:
    """Parse LINK directives (``LINK <source> <target>``) from agent output."""
    actions = []
    for m in re.finditer(r'^LINK\s+(\S+)\s+(\S+)', text, re.MULTILINE):
        actions.append({
            "type": "link",
            "source": m.group(1),
            "target": m.group(2),
            "weight": 0.3,  # links are cheap, low weight in delta
        })
    return actions


def parse_refines(text: str) -> list[dict]:
    """Parse REFINE blocks (``REFINE <key> ... END_REFINE``) from agent output."""
    actions = []
    pattern = r'REFINE\s+(\S+)\s*\n(.*?)END_REFINE'
    for m in re.finditer(pattern, text, re.DOTALL):
        key = m.group(1).strip('*').strip()  # strip markdown bold artifacts
        actions.append({
            "type": "refine",
            "key": key,
            "content": m.group(2).strip(),
            "weight": 0.7,  # refinements are meaningful
        })
    return actions


def parse_all_actions(text: str) -> list[dict]:
    """Parse all action types from agent output."""
    actions = []
    actions.extend(parse_write_nodes(text))
    actions.extend(parse_links(text))
    actions.extend(parse_refines(text))
    return actions


def count_no_ops(text: str) -> int:
    """Count NO_CONNECTION, AFFIRM, and NO_EXTRACTION verdicts (non-actions)."""
    no_conn = len(re.findall(r'\bNO_CONNECTION\b', text))
    affirm = len(re.findall(r'\bAFFIRM\b', text))
    no_extract = len(re.findall(r'\bNO_EXTRACTION\b', text))
    return no_conn + affirm + no_extract


# ---------------------------------------------------------------------------
# Action application
# ---------------------------------------------------------------------------

def stamp_content(content: str, agent: str, timestamp: str, depth: int) -> str:
    """Prepend provenance metadata to node content.

    BUG FIX: the previous version built ``stamp = (f"\\n")`` — an empty
    stamp that silently discarded all three provenance parameters (the
    original markup was likely lost in transit). Emit the provenance as
    an HTML comment so it is preserved in the node without rendering in
    markdown views.
    """
    stamp = (f"<!-- provenance: agent={agent} "
             f"cycle={timestamp} depth={depth} -->\n")
    return stamp + content


def apply_action(action: dict, dry_run: bool = False, agent: str = "unknown",
                 timestamp: str = "", depth: int = 0) -> bool:
    """Apply a single action to the graph. Returns True if applied.

    In dry-run mode nothing touches the store and every action counts as
    applied. Each subprocess failure is swallowed and reported as False
    (best-effort semantics; the loop treats it as "not a new action").
    """
    if dry_run:
        return True

    if action["type"] == "write_node":
        try:
            content = stamp_content(action["content"], agent, timestamp, depth)
            result = subprocess.run(
                ["poc-memory", "write", action["key"]],
                input=content, capture_output=True, text=True, timeout=15,
            )
            return result.returncode == 0
        except Exception:
            return False

    elif action["type"] == "link":
        try:
            result = subprocess.run(
                ["poc-memory", "link-add", action["source"], action["target"]],
                capture_output=True, text=True, timeout=10,
            )
            if "already exists" in result.stdout:
                return False  # not a new action
            return result.returncode == 0
        except Exception:
            return False

    elif action["type"] == "refine":
        try:
            content = stamp_content(action["content"], agent, timestamp, depth)
            result = subprocess.run(
                ["poc-memory", "write", action["key"]],
                input=content, capture_output=True, text=True, timeout=15,
            )
            return result.returncode == 0
        except Exception:
            return False

    return False


# ---------------------------------------------------------------------------
# Graph-structural convergence metrics
# ---------------------------------------------------------------------------

def get_graph_metrics() -> dict:
    """Get current graph structural metrics by scraping `poc-memory` output.

    Keys (each present only if its pattern matched): nodes, edges,
    communities, cc, sigma.
    """
    metrics = {}

    # Status: node/edge counts
    status = poc_memory("status")
    m = re.search(r'Nodes:\s*(\d+)\s+Relations:\s*(\d+)', status)
    if m:
        metrics["nodes"] = int(m.group(1))
        metrics["edges"] = int(m.group(2))
    m = re.search(r'Communities:\s*(\d+)', status)
    if m:
        metrics["communities"] = int(m.group(1))

    # Health: CC, sigma
    health = poc_memory("health")
    m = re.search(r'Clustering coefficient.*?:\s*([\d.]+)', health)
    if m:
        metrics["cc"] = float(m.group(1))
    m = re.search(r'Small-world.*?:\s*([\d.]+)', health)
    if m:
        metrics["sigma"] = float(m.group(1))

    return metrics


def metric_stability(history: list[dict], key: str, window: int) -> float:
    """Compute coefficient of variation of a metric over recent cycles.

    Returns CV (std/mean). Lower = more stable.
    0.0 = perfectly stable, >0.1 = still changing significantly.
    Returns inf when there is not enough history to judge.
    """
    if len(history) < window:
        return float('inf')
    values = []
    for h in history[-window:]:
        metrics = h.get("graph_metrics_after", {})
        if key in metrics:
            values.append(metrics[key])
    if not values or len(values) < 2:
        return float('inf')
    mean = sum(values) / len(values)
    if mean == 0:
        return 0.0
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return (variance ** 0.5) / abs(mean)


# ---------------------------------------------------------------------------
# Spectral tightening measurement
# ---------------------------------------------------------------------------

def measure_spectral_tightening(
    embedding_before: dict,
    embedding_after: dict,
    actions: list[dict],
) -> float:
    """Measure how much new nodes tightened their source clusters.

    For each applied write_node with >= 2 covered sources, compare the
    average pairwise spectral distance among the sources before vs after
    the cycle; positive = sources moved closer together.
    """
    if not embedding_before or not embedding_after:
        return 0.0

    write_actions = [a for a in actions
                     if a["type"] == "write_node" and a.get("covers")]
    if not write_actions:
        return 0.0

    total_tightening = 0.0
    count = 0
    for action in write_actions:
        covers = action["covers"]
        if len(covers) < 2:
            continue

        dists_before = []
        for i in range(len(covers)):
            for j in range(i + 1, len(covers)):
                d = spectral_distance(embedding_before, covers[i], covers[j])
                if d < float('inf'):
                    dists_before.append(d)

        dists_after = []
        for i in range(len(covers)):
            for j in range(i + 1, len(covers)):
                d = spectral_distance(embedding_after, covers[i], covers[j])
                if d < float('inf'):
                    dists_after.append(d)

        if dists_before and dists_after:
            avg_before = sum(dists_before) / len(dists_before)
            avg_after = sum(dists_after) / len(dists_after)
            total_tightening += (avg_before - avg_after)
            count += 1

    return total_tightening / count if count > 0 else 0.0


# ---------------------------------------------------------------------------
# The loop
# ---------------------------------------------------------------------------

def run_cycle(cycle_num: int, batch_size: int, dry_run: bool,
              max_depth: int, depth_db: dict) -> dict:
    """Run one full cycle: observation → extractor → connector → challenger.

    Mutates `depth_db` in place for newly written/refined nodes, saves it
    (unless dry-run), recomputes the spectral embedding when anything was
    applied, and returns + persists a per-cycle result record.
    """
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    print(f"\n{'='*60}")
    print(f"CYCLE {cycle_num} — {timestamp}")
    print(f"{'='*60}")

    # Snapshot state before
    embedding_before = load_spectral_embedding()
    metrics_before = get_graph_metrics()
    print(f" Before: {metrics_before}")

    all_actions = []
    all_no_ops = 0
    depth_rejected = 0
    agent_results = {}

    # Load use counts for confidence boosting
    use_counts = get_use_counts()
    used_nodes = sum(1 for v in use_counts.values() if v > 0)
    print(f" Nodes with use marks: {used_nodes}")

    # Run agents sequentially (each changes the graph for the next)
    for agent_name, agent_fn in [
        ("observation", lambda: run_observation_extractor(batch_size)),
        ("extractor", lambda: run_extractor(batch_size)),
        ("connector", lambda: run_connector(batch_size)),
        ("challenger", lambda: run_challenger(batch_size)),
    ]:
        print(f"\n --- {agent_name} (n={batch_size}) ---")
        output = agent_fn()

        # Save raw output for audit
        outfile = AGENT_RESULTS_DIR / f"knowledge-{agent_name}-{timestamp}.md"
        outfile.write_text(
            f"# {agent_name.title()} Agent Results — {timestamp}\n\n"
            f"{output}\n"
        )

        # Parse actions
        actions = parse_all_actions(output)
        no_ops = count_no_ops(output)
        all_no_ops += no_ops
        print(f" Actions: {len(actions)} No-ops: {no_ops}")

        # Apply actions with depth checking
        applied = 0
        for a in actions:
            depth = compute_action_depth(depth_db, a, agent_name)
            a["depth"] = depth
            kind = a["type"]

            if kind == "write_node":
                conf_val = CONFIDENCE_VALUES.get(a["confidence"], 0.5)
                req = required_confidence(depth)

                # Boost confidence based on source nodes' real-world use
                source_keys = a.get("covers", [])
                source_uses = [use_counts.get(k, 0) for k in source_keys]
                avg_uses = (sum(source_uses) / len(source_uses)
                            if source_uses else 0)
                eff_conf = effective_confidence(conf_val, int(avg_uses))
                meets = eff_conf >= req
                use_note = (f" use_boost={eff_conf-conf_val:+.2f}"
                            if avg_uses > 0 else "")
                status = "OK" if meets else "REJECTED(depth)"
                print(f" WRITE {a['key']} depth={depth} "
                      f"conf={a['confidence']}({conf_val:.2f}) "
                      f"eff={eff_conf:.2f} req={req:.2f}"
                      f"{use_note} {status}")
                if not meets:
                    a["applied"] = False
                    a["rejected_reason"] = "depth_threshold"
                    depth_rejected += 1
                    continue
                if depth > max_depth:
                    print(f" REJECTED: depth {depth} > "
                          f"max {max_depth}")
                    a["applied"] = False
                    a["rejected_reason"] = "max_depth"
                    depth_rejected += 1
                    continue
            elif kind == "link":
                print(f" LINK {a['source']} → {a['target']}")
            elif kind == "refine":
                target_uses = use_counts.get(a["key"], 0)
                use_note = (f" uses={target_uses}"
                            if target_uses > 0 else "")
                print(f" REFINE {a['key']} depth={depth}"
                      f"{use_note}")

            if apply_action(a, dry_run=dry_run, agent=agent_name,
                            timestamp=timestamp, depth=depth):
                applied += 1
                a["applied"] = True
                # Record depth for new nodes
                if kind in ("write_node", "refine"):
                    depth_db[a["key"]] = depth
            else:
                a["applied"] = False

        print(f" Applied: {applied}/{len(actions)}")
        agent_results[agent_name] = {
            "actions": len(actions),
            "applied": applied,
            "no_ops": no_ops,
        }
        all_actions.extend(actions)

    # Save updated depth DB.
    # BUG FIX: previously saved unconditionally, so --dry-run persisted
    # depth mutations (apply_action returns True in dry-run) despite the
    # advertised "parse + report, don't apply" contract.
    if not dry_run:
        save_depth_db(depth_db)

    # Recompute spectral embedding
    if not dry_run and any(a.get("applied") for a in all_actions):
        print(f"\n Recomputing spectral embedding...")
        try:
            subprocess.run(
                ["poc-memory", "spectral-save"],
                capture_output=True, text=True, timeout=60,
            )
        except Exception as e:
            print(f" Warning: spectral-save failed: {e}")

    # Measure spectral tightening
    embedding_after = load_spectral_embedding()
    tightening = measure_spectral_tightening(
        embedding_before, embedding_after, all_actions
    )

    # Get metrics after
    metrics_after = get_graph_metrics()

    # Compute weighted delta
    applied_actions = [a for a in all_actions if a.get("applied")]
    weighted_delta = sum(a.get("weight", 0.5) for a in applied_actions)
    total_applied = sum(r["applied"] for r in agent_results.values())
    total_actions = sum(r["actions"] for r in agent_results.values())

    # Depth distribution of applied actions
    depth_dist = {}
    for a in applied_actions:
        d = a.get("depth", -1)
        depth_dist[d] = depth_dist.get(d, 0) + 1

    print(f"\n CYCLE {cycle_num} SUMMARY")
    print(f" Total actions: {total_actions} parsed, "
          f"{total_applied} applied, {depth_rejected} depth-rejected")
    print(f" No-ops: {all_no_ops}")
    print(f" Weighted delta: {weighted_delta:.2f}")
    print(f" Spectral tightening: {tightening:+.4f}")
    print(f" Depth distribution: {depth_dist}")
    print(f" After: {metrics_after}")

    result = {
        "cycle": cycle_num,
        "timestamp": timestamp,
        "agents": agent_results,
        "total_actions": total_actions,
        "total_applied": total_applied,
        "total_no_ops": all_no_ops,
        "depth_rejected": depth_rejected,
        "weighted_delta": weighted_delta,
        "spectral_tightening": tightening,
        "depth_distribution": depth_dist,
        "graph_metrics_before": metrics_before,
        "graph_metrics_after": metrics_after,
        "dry_run": dry_run,
    }
    result_path = (AGENT_RESULTS_DIR /
                   f"knowledge-cycle-{cycle_num}-{timestamp}.json")
    with open(result_path, "w") as f:
        json.dump(result, f, indent=2)

    return result


def check_convergence(history: list[dict], window: int) -> bool:
    """Check structural convergence.

    The graph has converged when:
    1. Sigma (small-world coeff) is stable (CV < 0.05)
    2. CC (clustering coefficient) is stable (CV < 0.05)
    3. Community count is stable (CV < 0.10)
    4. Weighted delta is low (avg < 1.0 over window)

    All four must hold simultaneously.
    """
    if len(history) < window:
        return False

    sigma_cv = metric_stability(history, "sigma", window)
    cc_cv = metric_stability(history, "cc", window)
    comm_cv = metric_stability(history, "communities", window)
    recent = history[-window:]
    avg_delta = sum(r["weighted_delta"] for r in recent) / len(recent)

    print(f"\n Convergence check (last {window} cycles):")
    print(f" sigma CV: {sigma_cv:.4f} (< 0.05?)")
    print(f" CC CV: {cc_cv:.4f} (< 0.05?)")
    print(f" community CV: {comm_cv:.4f} (< 0.10?)")
    print(f" avg delta: {avg_delta:.2f} (< 1.00?)")

    structural = (sigma_cv < 0.05 and cc_cv < 0.05 and comm_cv < 0.10)
    behavioral = avg_delta < 1.0

    if structural and behavioral:
        print(f" → CONVERGED (structural + behavioral)")
        return True
    elif structural:
        print(f" → Structure stable, but agents still producing")
    elif behavioral:
        print(f" → Agents quiet, but structure still shifting")
    else:
        print(f" → Not converged")
    return False


def main():
    """CLI entry point: parse flags, iterate cycles until convergence."""
    max_cycles = 20
    batch_size = 5
    window = 5
    max_depth = 4
    dry_run = False

    args = sys.argv[1:]
    i = 0
    while i < len(args):
        if args[i] == "--max-cycles" and i + 1 < len(args):
            max_cycles = int(args[i + 1]); i += 2
        elif args[i] == "--batch-size" and i + 1 < len(args):
            batch_size = int(args[i + 1]); i += 2
        elif args[i] == "--window" and i + 1 < len(args):
            window = int(args[i + 1]); i += 2
        elif args[i] == "--max-depth" and i + 1 < len(args):
            max_depth = int(args[i + 1]); i += 2
        elif args[i] == "--dry-run":
            dry_run = True; i += 1
        else:
            print(f"Unknown arg: {args[i]}"); sys.exit(1)

    print(f"Knowledge Loop — fixed-point iteration")
    print(f" max_cycles={max_cycles} batch_size={batch_size}")
    print(f" window={window} max_depth={max_depth}")
    print(f" dry_run={dry_run}")
    print(f"\n Depth thresholds:")
    for d in range(max_depth + 1):
        print(f" depth {d}: confidence >= {required_confidence(d):.2f}")

    # Load depth database
    depth_db = load_depth_db()
    print(f" Known node depths: {len(depth_db)}")

    # Get initial graph state
    status = poc_memory("status")
    print(f"\nInitial state: {status}")

    history = []
    for cycle in range(1, max_cycles + 1):
        result = run_cycle(cycle, batch_size, dry_run, max_depth, depth_db)
        history.append(result)
        if check_convergence(history, window):
            print(f"\n CONVERGED after {cycle} cycles")
            break
    else:
        # for/else: only runs when the loop was never broken out of
        print(f"\n Reached max cycles ({max_cycles}) without "
              f"convergence")

    # Final summary
    print(f"\n{'='*60}")
    print(f"LOOP COMPLETE")
    print(f"{'='*60}")
    total_applied = sum(r["total_applied"] for r in history)
    total_no_ops = sum(r["total_no_ops"] for r in history)
    total_rejected = sum(r["depth_rejected"] for r in history)
    avg_tightening = (
        sum(r["spectral_tightening"] for r in history) / len(history)
        if history else 0
    )

    # Aggregate depth distribution
    total_depths = {}
    for r in history:
        for d, c in r.get("depth_distribution", {}).items():
            total_depths[d] = total_depths.get(d, 0) + c

    print(f" Cycles: {len(history)}")
    print(f" Total actions applied: {total_applied}")
    print(f" Total depth-rejected: {total_rejected}")
    print(f" Total no-ops: {total_no_ops}")
    print(f" Avg spectral tightening: {avg_tightening:+.4f}")
    print(f" Depth distribution: {total_depths}")
    if history:
        first = history[0].get("graph_metrics_before", {})
        last = history[-1].get("graph_metrics_after", {})
        print(f" Nodes: {first.get('nodes','?')} → "
              f"{last.get('nodes','?')}")
        print(f" Edges: {first.get('edges','?')} → "
              f"{last.get('edges','?')}")
        print(f" CC: {first.get('cc','?')} → {last.get('cc','?')}")
        print(f" Sigma: {first.get('sigma','?')} → "
              f"{last.get('sigma','?')}")
        print(f" Communities: {first.get('communities','?')} → "
              f"{last.get('communities','?')}")

    print(f"\nFinal state: {poc_memory('status')}")

    # Save loop summary
    ts = history[0]["timestamp"] if history else "empty"
    summary_path = AGENT_RESULTS_DIR / f"knowledge-loop-{ts}.json"
    with open(summary_path, "w") as f:
        json.dump({
            "cycles": len(history),
            "converged": (check_convergence(history, window)
                          if len(history) >= window else False),
            "total_applied": total_applied,
            "total_rejected": total_rejected,
            "total_no_ops": total_no_ops,
            "avg_tightening": avg_tightening,
            "depth_distribution": total_depths,
            "history": history,
        }, f, indent=2)
    print(f" Summary: {summary_path}")


if __name__ == "__main__":
    main()