# consciousness/scripts/knowledge_loop.py
#!/usr/bin/env python3
"""knowledge-loop.py — fixed-point iteration over the knowledge graph.
Runs observation extractor → extractor → connector → challenger in sequence,
applies results, recomputes spectral embedding, measures convergence.
Convergence is structural, not behavioral:
- Graph metrics (sigma, CC, community partition) stabilize
- Inference depth is tracked; confidence threshold scales with depth
- Rolling window smooths stochastic noise
Usage:
knowledge-loop.py # run until convergence
knowledge-loop.py --max-cycles 10 # cap at 10 cycles
knowledge-loop.py --batch-size 5 # agents process 5 items each
knowledge-loop.py --window 5 # rolling average window
knowledge-loop.py --max-depth 4 # max inference chain length
knowledge-loop.py --dry-run # parse + report, don't apply
"""
import itertools
import json
import math
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
# Filesystem layout: all loop state lives under ~/.claude/memory.
MEMORY_DIR = Path.home() / ".claude" / "memory"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)  # created eagerly at import time
SCRIPTS_DIR = Path(__file__).parent
DEPTH_DB = AGENT_RESULTS_DIR / "node-depths.json"  # persisted node-key -> inference-depth map
# Import the agent runners
sys.path.insert(0, str(SCRIPTS_DIR))
from knowledge_agents import (
run_observation_extractor, run_extractor, run_connector, run_challenger,
load_spectral_embedding, spectral_distance, poc_memory,
)
# ---------------------------------------------------------------------------
# Inference depth tracking
# ---------------------------------------------------------------------------
# Depth assignments by agent type:
# depth 0 = raw observations (journal, conversations)
# depth 1 = observation extractor (facts from conversations)
# depth 2 = pattern extractor (patterns across knowledge nodes)
# depth 3 = connector (cross-domain links between patterns)
# Challenger refines existing nodes — preserves their depth.
# Fallback inference depth per agent type, used by compute_action_depth
# when a new node lists no source ("covers") keys.
AGENT_BASE_DEPTH = {
    "observation": 1,
    "extractor": 2,
    "connector": 3,
    "challenger": None,  # inherits from target
}
def load_depth_db() -> dict[str, int]:
    """Load the persisted inference-depth database (empty dict if absent)."""
    if not DEPTH_DB.exists():
        return {}
    with open(DEPTH_DB) as fh:
        return json.load(fh)
def save_depth_db(db: dict[str, int]):
    """Persist the inference-depth database as pretty-printed JSON."""
    DEPTH_DB.write_text(json.dumps(db, indent=2))
def get_node_depth(db: dict[str, int], key: str) -> int:
    """Return the recorded inference depth for *key*; unknown nodes are depth 0."""
    try:
        return db[key]
    except KeyError:
        return 0
def compute_action_depth(db: dict[str, int], action: dict,
                         agent: str) -> int:
    """Compute the inference depth for a new action.

    write_node: max(depth of source nodes) + 1, or the agent's base depth
    when no sources are listed.
    refine: same depth as the node being refined.
    link: -1 (links carry no depth).
    """
    kind = action["type"]
    if kind == "link":
        return -1  # links don't have depth
    if kind == "refine":
        return get_node_depth(db, action["key"])
    # write_node: one level above the deepest source
    sources = action.get("covers", [])
    if sources:
        return 1 + max(get_node_depth(db, k) for k in sources)
    # No source info — fall back to the agent's base depth
    fallback = AGENT_BASE_DEPTH.get(agent, 2)
    return 2 if fallback is None else fallback
def required_confidence(depth: int, base: float = 0.3) -> float:
    """Confidence threshold that scales with inference depth.

    required(depth) = 1 - (1 - base)^depth
      depth 0: 0.00 (raw data, no threshold)
      depth 1: 0.30 (observation extraction)
      depth 2: 0.51 (pattern extraction)
      depth 3: 0.66 (cross-domain connection)
      depth 4: 0.76
      depth 5: 0.83
    """
    return 0.0 if depth <= 0 else 1.0 - (1.0 - base) ** depth
def use_bonus(use_count: int) -> float:
    """Confidence bonus earned from real-world retrieval of a node.

    use_bonus(n) = 1 - 1/(1 + 0.15*n)
      0 uses: +0.00   1 use: +0.13   3 uses: +0.31
      5 uses: +0.43   10 uses: +0.60
    Nodes retrieved during actual work earn empirical validation that can
    clear depth thresholds which previously blocked them.
    """
    if use_count <= 0:
        return 0.0
    return 1.0 - 1.0 / (1.0 + 0.15 * use_count)
def get_use_counts() -> dict[str, int]:
    """Best-effort fetch of per-node use counts from the memory store.

    Shells out to `poc-memory dump-json`; any failure (missing binary,
    timeout, bad JSON) yields an empty dict rather than an error.
    """
    try:
        proc = subprocess.run(
            ["poc-memory", "dump-json"],
            capture_output=True, text=True, timeout=30,
        )
        payload = json.loads(proc.stdout)
        # The dump may be a bare list of nodes or a {"nodes": ...} wrapper.
        nodes = payload if isinstance(payload, list) else payload.get("nodes", payload)
        counts: dict[str, int] = {}
        if isinstance(nodes, dict):
            for key, node in nodes.items():
                if isinstance(node, dict):
                    counts[key] = node.get("uses", 0)
        elif isinstance(nodes, list):
            counts = {n.get("key", ""): n.get("uses", 0)
                      for n in nodes if isinstance(n, dict)}
        return counts
    except Exception:
        return {}
def effective_confidence(base_conf: float, use_count: int) -> float:
    """Effective confidence = base confidence + use bonus, capped at 1.0."""
    boosted = base_conf + use_bonus(use_count)
    return 1.0 if boosted > 1.0 else boosted
# ---------------------------------------------------------------------------
# Action parsing — extract structured actions from agent markdown output
# ---------------------------------------------------------------------------
# Contribution of each applied action to the per-cycle "weighted delta".
CONFIDENCE_WEIGHTS = {"high": 1.0, "medium": 0.6, "low": 0.3}
# Numeric confidence compared against the depth-scaled threshold.
CONFIDENCE_VALUES = {"high": 0.9, "medium": 0.6, "low": 0.3}
def parse_write_nodes(text: str) -> list[dict]:
    """Parse WRITE_NODE ... END_NODE blocks from agent markdown output.

    Optional CONFIDENCE: and COVERS: lines inside a block are extracted
    into structured fields and removed from the stored content.
    """
    parsed: list[dict] = []
    block_re = r'WRITE_NODE\s+(\S+)\s*\n(.*?)END_NODE'
    for match in re.finditer(block_re, text, re.DOTALL):
        node_key = match.group(1)
        body = match.group(2).strip()
        # Pull out the CONFIDENCE line (defaults to "medium")
        confidence = "medium"
        conf_m = re.search(r'CONFIDENCE:\s*(high|medium|low)', body, re.I)
        if conf_m:
            confidence = conf_m.group(1).lower()
            body = (body[:conf_m.start()] + body[conf_m.end():]).strip()
        # Pull out the COVERS line (comma-separated source keys)
        covers: list[str] = []
        covers_m = re.search(r'COVERS:\s*(.+)', body)
        if covers_m:
            covers = [c.strip() for c in covers_m.group(1).split(',')]
            body = (body[:covers_m.start()] + body[covers_m.end():]).strip()
        parsed.append({
            "type": "write_node",
            "key": node_key,
            "content": body,
            "confidence": confidence,
            "covers": covers,
            "weight": CONFIDENCE_WEIGHTS.get(confidence, 0.5),
        })
    return parsed
def parse_links(text: str) -> list[dict]:
    """Parse line-anchored LINK <source> <target> directives from agent output."""
    return [
        {
            "type": "link",
            "source": m.group(1),
            "target": m.group(2),
            "weight": 0.3,  # links are cheap, low weight in delta
        }
        for m in re.finditer(r'^LINK\s+(\S+)\s+(\S+)', text, re.MULTILINE)
    ]
def parse_refines(text: str) -> list[dict]:
    """Parse REFINE ... END_REFINE blocks from agent output."""
    found: list[dict] = []
    refine_re = r'REFINE\s+(\S+)\s*\n(.*?)END_REFINE'
    for m in re.finditer(refine_re, text, re.DOTALL):
        node_key = m.group(1).strip('*').strip()  # drop markdown bold artifacts
        found.append({
            "type": "refine",
            "key": node_key,
            "content": m.group(2).strip(),
            "weight": 0.7,  # refinements are meaningful
        })
    return found
def parse_all_actions(text: str) -> list[dict]:
    """Parse every action type (write_node, link, refine) from agent output."""
    return parse_write_nodes(text) + parse_links(text) + parse_refines(text)
def count_no_ops(text: str) -> int:
    """Count NO_CONNECTION, AFFIRM, and NO_EXTRACTION verdicts (non-actions)."""
    return sum(
        len(re.findall(rf'\b{verdict}\b', text))
        for verdict in ("NO_CONNECTION", "AFFIRM", "NO_EXTRACTION")
    )
# ---------------------------------------------------------------------------
# Action application
# ---------------------------------------------------------------------------
def stamp_content(content: str, agent: str, timestamp: str,
                  depth: int) -> str:
    """Prepend an HTML-comment provenance header to node content."""
    return (f"<!-- author: {agent} | created: {timestamp} "
            f"| depth: {depth} -->\n{content}")
def apply_action(action: dict, dry_run: bool = False,
                 agent: str = "unknown", timestamp: str = "",
                 depth: int = 0) -> bool:
    """Apply a single parsed action to the graph via the poc-memory CLI.

    Returns True if the action was applied (always True for dry runs).
    write_node and refine both write stamped content; link adds an edge.
    Any subprocess failure, unknown action type, or pre-existing link
    yields False.
    """
    if dry_run:
        return True
    kind = action["type"]
    try:
        if kind in ("write_node", "refine"):
            stamped = stamp_content(action["content"], agent,
                                    timestamp, depth)
            proc = subprocess.run(
                ["poc-memory", "write", action["key"]],
                input=stamped,
                capture_output=True, text=True, timeout=15,
            )
            return proc.returncode == 0
        if kind == "link":
            proc = subprocess.run(
                ["poc-memory", "link-add", action["source"],
                 action["target"]],
                capture_output=True, text=True, timeout=10,
            )
            if "already exists" in proc.stdout:
                return False  # duplicate edge — not a new action
            return proc.returncode == 0
    except Exception:
        return False
    return False
# ---------------------------------------------------------------------------
# Graph-structural convergence metrics
# ---------------------------------------------------------------------------
def get_graph_metrics() -> dict:
    """Scrape structural metrics from `poc-memory status` / `poc-memory health`.

    Returns whichever of nodes/edges/communities/cc/sigma could be parsed;
    keys absent from the CLI output are simply omitted.
    """
    metrics: dict = {}
    status = poc_memory("status")
    counts = re.search(r'Nodes:\s*(\d+)\s+Relations:\s*(\d+)', status)
    if counts:
        metrics["nodes"] = int(counts.group(1))
        metrics["edges"] = int(counts.group(2))
    comm = re.search(r'Communities:\s*(\d+)', status)
    if comm:
        metrics["communities"] = int(comm.group(1))
    health = poc_memory("health")
    for field, pat in (("cc", r'Clustering coefficient.*?:\s*([\d.]+)'),
                       ("sigma", r'Small-world.*?:\s*([\d.]+)')):
        hit = re.search(pat, health)
        if hit:
            metrics[field] = float(hit.group(1))
    return metrics
def metric_stability(history: list[dict], key: str,
                     window: int) -> float:
    """Coefficient of variation (std/mean) of *key* over the last *window* cycles.

    Lower is more stable: 0.0 = perfectly stable, >0.1 = still changing
    significantly. Returns inf when there is too little history (or fewer
    than two cycles recorded the metric); returns 0.0 when the mean is 0.
    """
    if len(history) < window:
        return float('inf')
    values = [
        h["graph_metrics_after"][key]
        for h in history[-window:]
        if key in h.get("graph_metrics_after", {})
    ]
    if len(values) < 2:
        return float('inf')
    mean = sum(values) / len(values)
    if mean == 0:
        return 0.0
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return math.sqrt(variance) / abs(mean)
# ---------------------------------------------------------------------------
# Spectral tightening measurement
# ---------------------------------------------------------------------------
def _avg_pairwise_distance(embedding: dict, keys: list):
    """Mean finite spectral distance over all key pairs, or None if no pair is finite."""
    dists = [
        d for a, b in itertools.combinations(keys, 2)
        if (d := spectral_distance(embedding, a, b)) < float('inf')
    ]
    return sum(dists) / len(dists) if dists else None


def measure_spectral_tightening(
    embedding_before: dict,
    embedding_after: dict,
    actions: list[dict],
) -> float:
    """Measure how much new nodes tightened their source clusters.

    For each write_node action with >=2 source keys ("covers"), compare the
    average pairwise spectral distance of the sources before vs. after the
    cycle; positive values mean the sources moved closer together. Returns
    the mean tightening across such actions, or 0.0 when it can't be
    measured (missing embeddings, no qualifying actions).
    """
    if not embedding_before or not embedding_after:
        return 0.0
    total_tightening = 0.0
    count = 0
    for action in actions:
        if action["type"] != "write_node":
            continue
        covers = action.get("covers") or []
        if len(covers) < 2:
            continue  # need at least one pair to compare
        avg_before = _avg_pairwise_distance(embedding_before, covers)
        avg_after = _avg_pairwise_distance(embedding_after, covers)
        if avg_before is not None and avg_after is not None:
            total_tightening += (avg_before - avg_after)
            count += 1
    return total_tightening / count if count > 0 else 0.0
# ---------------------------------------------------------------------------
# The loop
# ---------------------------------------------------------------------------
def run_cycle(cycle_num: int, batch_size: int, dry_run: bool,
              max_depth: int, depth_db: dict) -> dict:
    """Run one full cycle: observation → extractor → connector → challenger.

    Agents run sequentially (each sees the graph as modified by the
    previous one). Raw agent output is archived under AGENT_RESULTS_DIR,
    parsed actions are applied subject to depth-scaled confidence
    thresholds (boosted by source-node use counts), the depth DB is
    updated in place and saved, and the spectral embedding is recomputed
    when anything was applied.

    Args:
        cycle_num: 1-based cycle index, used in logs and result filenames.
        batch_size: number of items each agent processes.
        dry_run: parse and report, but apply nothing to the graph.
        max_depth: maximum allowed inference-chain length for new nodes.
        depth_db: mutable node-key → depth map; mutated and persisted here.

    Returns:
        JSON-serializable per-cycle summary (also written to disk).
    """
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    print(f"\n{'='*60}")
    # FIX: restore the separator between cycle number and timestamp —
    # it had been lost, printing e.g. "CYCLE 320250101T...".
    print(f"CYCLE {cycle_num} — {timestamp}")
    print(f"{'='*60}")
    # Snapshot state before the agents run, for delta measurement
    embedding_before = load_spectral_embedding()
    metrics_before = get_graph_metrics()
    print(f" Before: {metrics_before}")
    all_actions = []
    all_no_ops = 0
    depth_rejected = 0
    agent_results = {}
    # Load use counts for confidence boosting
    use_counts = get_use_counts()
    used_nodes = sum(1 for v in use_counts.values() if v > 0)
    print(f" Nodes with use marks: {used_nodes}")
    # Run agents sequentially (each changes the graph for the next)
    for agent_name, agent_fn in [
        ("observation", lambda: run_observation_extractor(batch_size)),
        ("extractor", lambda: run_extractor(batch_size)),
        ("connector", lambda: run_connector(batch_size)),
        ("challenger", lambda: run_challenger(batch_size)),
    ]:
        print(f"\n --- {agent_name} (n={batch_size}) ---")
        output = agent_fn()
        # Archive raw output for later inspection
        outfile = AGENT_RESULTS_DIR / f"knowledge-{agent_name}-{timestamp}.md"
        outfile.write_text(
            f"# {agent_name.title()} Agent Results — {timestamp}\n\n"
            f"{output}\n"
        )
        # Parse actions
        actions = parse_all_actions(output)
        no_ops = count_no_ops(output)
        all_no_ops += no_ops
        print(f" Actions: {len(actions)} No-ops: {no_ops}")
        # Apply actions with depth checking
        applied = 0
        for a in actions:
            depth = compute_action_depth(depth_db, a, agent_name)
            a["depth"] = depth
            kind = a["type"]
            if kind == "write_node":
                conf_val = CONFIDENCE_VALUES.get(a["confidence"], 0.5)
                req = required_confidence(depth)
                # Boost confidence based on source nodes' real-world use
                source_keys = a.get("covers", [])
                source_uses = [use_counts.get(k, 0) for k in source_keys]
                avg_uses = (sum(source_uses) / len(source_uses)
                            if source_uses else 0)
                eff_conf = effective_confidence(conf_val, int(avg_uses))
                meets = eff_conf >= req
                use_note = (f" use_boost={eff_conf-conf_val:+.2f}"
                            if avg_uses > 0 else "")
                status = "OK" if meets else "REJECTED(depth)"
                print(f" WRITE {a['key']} depth={depth} "
                      f"conf={a['confidence']}({conf_val:.2f}) "
                      f"eff={eff_conf:.2f} req={req:.2f}"
                      f"{use_note} {status}")
                if not meets:
                    a["applied"] = False
                    a["rejected_reason"] = "depth_threshold"
                    depth_rejected += 1
                    continue
                if depth > max_depth:
                    print(f" REJECTED: depth {depth} > "
                          f"max {max_depth}")
                    a["applied"] = False
                    a["rejected_reason"] = "max_depth"
                    depth_rejected += 1
                    continue
            elif kind == "link":
                # FIX: restore the arrow between source and target —
                # it had been lost, printing the two keys run together.
                print(f" LINK {a['source']} → {a['target']}")
            elif kind == "refine":
                target_uses = use_counts.get(a["key"], 0)
                use_note = (f" uses={target_uses}"
                            if target_uses > 0 else "")
                print(f" REFINE {a['key']} depth={depth}"
                      f"{use_note}")
            if apply_action(a, dry_run=dry_run, agent=agent_name,
                            timestamp=timestamp, depth=depth):
                applied += 1
                a["applied"] = True
                # Record depth for new nodes
                if kind in ("write_node", "refine"):
                    depth_db[a["key"]] = depth
            else:
                a["applied"] = False
        print(f" Applied: {applied}/{len(actions)}")
        agent_results[agent_name] = {
            "actions": len(actions),
            "applied": applied,
            "no_ops": no_ops,
        }
        all_actions.extend(actions)
    # Save updated depth DB
    save_depth_db(depth_db)
    # Recompute spectral embedding (only if the graph actually changed)
    if not dry_run and any(a.get("applied") for a in all_actions):
        print("\n Recomputing spectral embedding...")
        try:
            subprocess.run(
                ["poc-memory", "spectral-save"],
                capture_output=True, text=True, timeout=60,
            )
        except Exception as e:
            print(f" Warning: spectral-save failed: {e}")
    # Measure spectral tightening
    embedding_after = load_spectral_embedding()
    tightening = measure_spectral_tightening(
        embedding_before, embedding_after, all_actions
    )
    # Get metrics after
    metrics_after = get_graph_metrics()
    # Weighted delta: confidence-weighted count of applied actions
    applied_actions = [a for a in all_actions if a.get("applied")]
    weighted_delta = sum(a.get("weight", 0.5) for a in applied_actions)
    total_applied = sum(r["applied"] for r in agent_results.values())
    total_actions = sum(r["actions"] for r in agent_results.values())
    # Depth distribution of applied actions
    depth_dist = {}
    for a in applied_actions:
        d = a.get("depth", -1)
        depth_dist[d] = depth_dist.get(d, 0) + 1
    print(f"\n CYCLE {cycle_num} SUMMARY")
    print(f" Total actions: {total_actions} parsed, "
          f"{total_applied} applied, {depth_rejected} depth-rejected")
    print(f" No-ops: {all_no_ops}")
    print(f" Weighted delta: {weighted_delta:.2f}")
    print(f" Spectral tightening: {tightening:+.4f}")
    print(f" Depth distribution: {depth_dist}")
    print(f" After: {metrics_after}")
    result = {
        "cycle": cycle_num,
        "timestamp": timestamp,
        "agents": agent_results,
        "total_actions": total_actions,
        "total_applied": total_applied,
        "total_no_ops": all_no_ops,
        "depth_rejected": depth_rejected,
        "weighted_delta": weighted_delta,
        "spectral_tightening": tightening,
        "depth_distribution": depth_dist,
        "graph_metrics_before": metrics_before,
        "graph_metrics_after": metrics_after,
        "dry_run": dry_run,
    }
    result_path = (AGENT_RESULTS_DIR /
                   f"knowledge-cycle-{cycle_num}-{timestamp}.json")
    with open(result_path, "w") as f:
        json.dump(result, f, indent=2)
    return result
def check_convergence(history: list[dict], window: int) -> bool:
    """Check structural convergence over the trailing *window* cycles.

    Converged when ALL of:
      1. sigma (small-world coeff) stable: CV < 0.05
      2. CC (clustering coefficient) stable: CV < 0.05
      3. community count stable: CV < 0.10
      4. average weighted delta < 1.0
    Prints a diagnostic report either way.
    """
    if len(history) < window:
        return False
    sigma_cv = metric_stability(history, "sigma", window)
    cc_cv = metric_stability(history, "cc", window)
    comm_cv = metric_stability(history, "communities", window)
    recent = history[-window:]
    avg_delta = sum(r["weighted_delta"] for r in recent) / len(recent)
    print(f"\n Convergence check (last {window} cycles):")
    print(f" sigma CV: {sigma_cv:.4f} (< 0.05?)")
    print(f" CC CV: {cc_cv:.4f} (< 0.05?)")
    print(f" community CV: {comm_cv:.4f} (< 0.10?)")
    print(f" avg delta: {avg_delta:.2f} (< 1.00?)")
    structural = sigma_cv < 0.05 and cc_cv < 0.05 and comm_cv < 0.10
    behavioral = avg_delta < 1.0
    if structural and behavioral:
        print(" → CONVERGED (structural + behavioral)")
        return True
    if structural:
        print(" → Structure stable, but agents still producing")
    elif behavioral:
        print(" → Agents quiet, but structure still shifting")
    else:
        print(" → Not converged")
    return False
def _parse_args(argv: list[str]) -> tuple[int, int, int, int, bool]:
    """Parse CLI flags; exits with status 1 on an unrecognized argument.

    Returns (max_cycles, batch_size, window, max_depth, dry_run).
    """
    max_cycles = 20
    batch_size = 5
    window = 5
    max_depth = 4
    dry_run = False
    i = 0
    while i < len(argv):
        if argv[i] == "--max-cycles" and i + 1 < len(argv):
            max_cycles = int(argv[i + 1]); i += 2
        elif argv[i] == "--batch-size" and i + 1 < len(argv):
            batch_size = int(argv[i + 1]); i += 2
        elif argv[i] == "--window" and i + 1 < len(argv):
            window = int(argv[i + 1]); i += 2
        elif argv[i] == "--max-depth" and i + 1 < len(argv):
            max_depth = int(argv[i + 1]); i += 2
        elif argv[i] == "--dry-run":
            dry_run = True; i += 1
        else:
            print(f"Unknown arg: {argv[i]}"); sys.exit(1)
    return max_cycles, batch_size, window, max_depth, dry_run


def main():
    """Entry point: run cycles until convergence (or max_cycles), summarize."""
    max_cycles, batch_size, window, max_depth, dry_run = _parse_args(sys.argv[1:])
    print("Knowledge Loop — fixed-point iteration")
    print(f" max_cycles={max_cycles} batch_size={batch_size}")
    print(f" window={window} max_depth={max_depth}")
    print(f" dry_run={dry_run}")
    print("\n Depth thresholds:")
    for d in range(max_depth + 1):
        print(f" depth {d}: confidence >= {required_confidence(d):.2f}")
    # Load depth database
    depth_db = load_depth_db()
    print(f" Known node depths: {len(depth_db)}")
    # Get initial graph state
    status = poc_memory("status")
    print(f"\nInitial state: {status}")
    history = []
    for cycle in range(1, max_cycles + 1):
        result = run_cycle(cycle, batch_size, dry_run, max_depth,
                           depth_db)
        history.append(result)
        if check_convergence(history, window):
            print(f"\n CONVERGED after {cycle} cycles")
            break
    else:
        # for/else: only reached when the loop was never broken
        print(f"\n Reached max cycles ({max_cycles}) without "
              f"convergence")
    # Final summary
    print(f"\n{'='*60}")
    print("LOOP COMPLETE")
    print(f"{'='*60}")
    total_applied = sum(r["total_applied"] for r in history)
    total_no_ops = sum(r["total_no_ops"] for r in history)
    total_rejected = sum(r["depth_rejected"] for r in history)
    avg_tightening = (
        sum(r["spectral_tightening"] for r in history) / len(history)
        if history else 0
    )
    # Aggregate depth distribution across all cycles
    total_depths = {}
    for r in history:
        for d, c in r.get("depth_distribution", {}).items():
            total_depths[d] = total_depths.get(d, 0) + c
    print(f" Cycles: {len(history)}")
    print(f" Total actions applied: {total_applied}")
    print(f" Total depth-rejected: {total_rejected}")
    print(f" Total no-ops: {total_no_ops}")
    print(f" Avg spectral tightening: {avg_tightening:+.4f}")
    print(f" Depth distribution: {total_depths}")
    if history:
        first = history[0].get("graph_metrics_before", {})
        last = history[-1].get("graph_metrics_after", {})
        # FIX: restore the " → " separators in these before→after lines —
        # they had been lost, printing the two values run together.
        print(f" Nodes: {first.get('nodes','?')} → "
              f"{last.get('nodes','?')}")
        print(f" Edges: {first.get('edges','?')} → "
              f"{last.get('edges','?')}")
        print(f" CC: {first.get('cc','?')} → {last.get('cc','?')}")
        print(f" Sigma: {first.get('sigma','?')} → "
              f"{last.get('sigma','?')}")
        print(f" Communities: {first.get('communities','?')} → "
              f"{last.get('communities','?')}")
    print(f"\nFinal state: {poc_memory('status')}")
    # Save loop summary
    ts = history[0]["timestamp"] if history else "empty"
    summary_path = AGENT_RESULTS_DIR / f"knowledge-loop-{ts}.json"
    with open(summary_path, "w") as f:
        json.dump({
            "cycles": len(history),
            "converged": check_convergence(history, window)
                         if len(history) >= window else False,
            "total_applied": total_applied,
            "total_rejected": total_rejected,
            "total_no_ops": total_no_ops,
            "avg_tightening": avg_tightening,
            "depth_distribution": total_depths,
            "history": history,
        }, f, indent=2)
    print(f" Summary: {summary_path}")
# Script entry guard — run the loop only when executed directly.
if __name__ == "__main__":
    main()