consciousness/scripts/consolidation-agents.py
ProofOfConcept 23fac4e5fe poc-memory v0.4.0: graph-structured memory with consolidation pipeline
Rust core:
- Cap'n Proto append-only storage (nodes + relations)
- Graph algorithms: clustering coefficient, community detection,
  schema fit, small-world metrics, interference detection
- BM25 text similarity with Porter stemming
- Spaced repetition replay queue
- Commands: search, init, health, status, graph, categorize,
  link-add, link-impact, decay, consolidate-session, etc.

Python scripts:
- Episodic digest pipeline: daily/weekly/monthly-digest.py
- retroactive-digest.py for backfilling
- consolidation-agents.py: 3 parallel Sonnet agents
- apply-consolidation.py: structured action extraction + apply
- digest-link-parser.py: extract ~400 explicit links from digests
- content-promotion-agent.py: promote episodic obs to semantic files
- bulk-categorize.py: categorize all nodes via single Sonnet call
- consolidation-loop.py: multi-round automated consolidation

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-02-28 22:17:00 -05:00

479 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
"""consolidation-agents.py — run parallel consolidation agents.
Three agents scan the memory system and produce structured reports:
1. Freshness Scanner — journal entries not yet in topic files
2. Cross-Link Scanner — missing connections between semantic nodes
3. Topology Reporter — graph health and structure analysis
Usage:
consolidation-agents.py # run all three
consolidation-agents.py freshness # run one agent
consolidation-agents.py crosslink
consolidation-agents.py topology
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
# Root of the on-disk memory store: ~/.claude/memory
MEMORY_DIR = Path.home() / ".claude" / "memory"
# Subdirectory globbed for daily-*.md digest files by get_recent_digests().
EPISODIC_DIR = MEMORY_DIR / "episodic"
# Destination directory for consolidation-<agent>-<timestamp>.md reports.
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
# NOTE: executed at import time, so importing this module creates the
# directory (and any missing parents) as a side effect.
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
# Directory containing this script; call-sonnet.sh is resolved relative to it.
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
    """Call Sonnet via the call-sonnet.sh wrapper script.

    The prompt is written to a temporary ``.txt`` file whose path is passed
    as the wrapper's only argument. The child runs with a copy of the
    current environment from which ``CLAUDECODE`` has been removed
    (presumably so the nested CLI does not think it is running inside
    Claude Code — confirm against call-sonnet.sh).

    Args:
        prompt: Full prompt text to send.
        timeout: Seconds to wait for the wrapper before giving up.

    Returns:
        The wrapper's stdout with surrounding whitespace stripped, or a
        string starting with ``"Error:"`` when the call times out, the
        wrapper exits with a non-zero status, or any other exception occurs.
        This function never raises; callers detect failure via the prefix.
    """
    env = dict(os.environ)
    env.pop("CLAUDECODE", None)

    prompt_file = None
    try:
        # Creating and writing the temp file happens inside the try so that
        # a failed write is reported as "Error: ..." and the file is still
        # removed by the finally clause instead of being leaked.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                         delete=False) as f:
            prompt_file = f.name
            f.write(prompt)

        wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
        result = subprocess.run(
            [wrapper, prompt_file],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        if result.returncode != 0:
            # Surface wrapper failures instead of returning (possibly empty)
            # stdout that would be saved as if it were a real report.
            detail = (result.stderr.strip() or result.stdout.strip()
                      or "no output")
            return (f"Error: call-sonnet.sh exited with status "
                    f"{result.returncode}: {detail}")
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "Error: Sonnet call timed out"
    except Exception as e:
        return f"Error: {e}"
    finally:
        if prompt_file is not None:
            try:
                os.unlink(prompt_file)
            except FileNotFoundError:
                pass
# ---------------------------------------------------------------------------
# Context gathering
# ---------------------------------------------------------------------------
def get_recent_journal(n_lines: int = 200) -> str:
    """Return the last ``n_lines`` lines of ``journal.md`` as one string.

    Args:
        n_lines: Number of trailing lines to return. Zero or a negative
            value yields an empty string.

    Returns:
        The tail of the journal (line endings preserved), or ``""`` when
        the journal file does not exist or ``n_lines`` is not positive.
    """
    # Guard first: lines[-0:] would otherwise return the *entire* journal,
    # and a negative count would slice from the front instead of the back.
    if n_lines <= 0:
        return ""
    journal = MEMORY_DIR / "journal.md"
    if not journal.exists():
        return ""
    with open(journal) as f:
        lines = f.readlines()
    return "".join(lines[-n_lines:])
def get_topic_file_index() -> dict[str, list[str]]:
    """Map each topic file name to the ``## `` headings it contains.

    Scans the ``*.md`` files directly under MEMORY_DIR in sorted order,
    skipping a fixed set of non-topic files. Read errors are swallowed:
    the file still gets an entry holding whatever headings were collected
    before the failure (possibly none).
    """
    skipped = ("journal.md", "MEMORY.md", "where-am-i.md",
               "work-queue.md", "search-testing.md")
    index = {}
    for path in sorted(MEMORY_DIR.glob("*.md")):
        fname = path.name
        if fname in skipped:
            continue
        # Register the entry up front so it exists even if reading fails.
        headings = index.setdefault(fname, [])
        try:
            with open(path) as fh:
                for raw in fh:
                    if not raw.startswith("## "):
                        continue
                    headings.append(raw.strip())
        except Exception:
            pass
    return index
def get_mem_markers() -> list[dict]:
    """Collect every ``<!-- mem: ... -->`` marker from the memory files.

    Each marker body is split on whitespace and every ``key=value`` token
    becomes a dict entry (tokens without ``=`` are ignored). The source
    file name is stored under ``'_file'``. ``journal.md`` and ``MEMORY.md``
    are not scanned, and files that fail to read are silently skipped.
    """
    marker_re = re.compile(r'<!-- mem: (.*?) -->')
    found = []
    for path in sorted(MEMORY_DIR.glob("*.md")):
        if path.name == "journal.md" or path.name == "MEMORY.md":
            continue
        try:
            text = path.read_text()
            for hit in marker_re.finditer(text):
                fields = dict(
                    token.split('=', 1)
                    for token in hit.group(1).split()
                    if '=' in token
                )
                fields['_file'] = path.name
                found.append(fields)
        except Exception:
            pass
    return found
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Build a short per-section digest of every topic file.

    For each topic file the text is split at ``## `` headings; content
    before the first heading is attributed to the file name. Each section
    contributes its first five non-blank lines, truncated to 200
    characters, and at most 15 sections are kept per file. Files that
    cannot be read are skipped.

    NOTE(review): ``max_chars_per_file`` is accepted but never read by the
    body; the limits actually applied are the fixed ones described above.
    """
    excluded = ("journal.md", "MEMORY.md", "where-am-i.md",
                "work-queue.md", "search-testing.md")

    def flush(heading, body_lines, out):
        # Emit one summary entry, but only for sections that have content.
        if body_lines:
            excerpt = '\n'.join(body_lines[:5])
            out.append(f" {heading}: {excerpt[:200]}")

    chunks = []
    for path in sorted(MEMORY_DIR.glob("*.md")):
        fname = path.name
        if fname in excluded:
            continue
        try:
            text = path.read_text()
            entries = []
            heading = fname
            body = []
            for raw in text.split('\n'):
                if raw.startswith("## "):
                    flush(heading, body, entries)
                    heading, body = raw.strip(), []
                    continue
                stripped = raw.strip()
                if stripped:
                    body.append(stripped)
            # The final section has no following heading to trigger a flush.
            flush(heading, body, entries)
            chunks.append(f"\n### {fname}\n" + '\n'.join(entries[:15]))
        except Exception:
            pass
    return '\n'.join(chunks)
def get_graph_stats() -> str:
    """Capture the output of ``poc-memory status`` and ``poc-memory graph``.

    Each command runs with a 30 second timeout. The graph output is cut to
    its first 150 lines. If a command cannot be run, an error line for it
    is included instead, so the function itself never raises.
    """
    def capture(subcommand):
        # Shared invocation settings for both poc-memory subcommands.
        return subprocess.run(["poc-memory", subcommand],
                              capture_output=True, text=True, timeout=30)

    sections = []

    try:
        status = capture("status")
        sections.append(f"=== poc-memory status ===\n{status.stdout}")
    except Exception as exc:
        sections.append(f"Status error: {exc}")

    try:
        graph = capture("graph")
        head = graph.stdout.split('\n')[:150]
        sections.append(f"=== poc-memory graph (first 150 lines) ===\n"
                        + '\n'.join(head))
    except Exception as exc:
        sections.append(f"Graph error: {exc}")

    return '\n'.join(sections)
def get_recent_digests(n: int = 3) -> str:
    """Return the Summary and Themes sections of the newest daily digests.

    ``daily-*.md`` files in EPISODIC_DIR are ordered by file name,
    descending, and the first ``n`` are used. From each, only lines
    belonging to sections whose heading starts with ``## Summary`` or
    ``## Themes`` are kept (heading line included).
    """
    newest_first = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True)
    rendered = []
    for digest in newest_first[:n]:
        text = digest.read_text()
        kept = []
        capturing = False
        for row in text.split('\n'):
            if row.startswith(("## Summary", "## Themes")):
                capturing = True
                kept.append(row + '\n')
            elif capturing:
                if row.startswith("## "):
                    # Any other heading closes the section being captured.
                    capturing = False
                else:
                    kept.append(row + '\n')
        summary = "".join(kept)
        rendered.append(f"\n### {digest.name}\n{summary}")
    return '\n'.join(rendered)
def get_work_queue() -> str:
    """Return the text of ``work-queue.md``, or a placeholder if absent."""
    queue_path = MEMORY_DIR / "work-queue.md"
    if not queue_path.exists():
        return "(no work queue found)"
    return queue_path.read_text()
# ---------------------------------------------------------------------------
# Agent prompts
# ---------------------------------------------------------------------------
def build_freshness_prompt() -> str:
    """Assemble the prompt for the Freshness Scanner agent.

    Gathers the last 200 journal lines, the three most recent daily
    digests, the topic file index (at most 10 headings listed per file)
    and the work queue, and interpolates them into the instruction
    template.
    """
    journal = get_recent_journal(200)
    topic_index = get_topic_file_index()
    digests = get_recent_digests(3)
    work_queue = get_work_queue()

    # Render the index as "file:" lines, each followed by its headings.
    pieces = []
    for fname, sections in topic_index.items():
        pieces.append(f"\n {fname}:\n")
        pieces.extend(f" {s}\n" for s in sections[:10])
    topic_list = "".join(pieces)

    prompt = f"""You are the Freshness Scanner for ProofOfConcept's memory system.
Your job: identify what's NEW (in journal/digests but not yet in topic files)
and what's STALE (in work queue or topic files but outdated).
## Recent journal entries (last 200 lines)
{journal}
## Recent daily digests
{digests}
## Topic file index (file → section headers)
{topic_list}
## Work queue
{work_queue}
## Instructions
1. For each substantive insight, experience, or discovery in the journal:
- Check if a matching topic file section exists
- If not, note it as UNPROMOTED with a suggested destination file
2. For each work queue Active item:
- If it looks done or stale (>7 days old, mentioned as completed), flag it
3. For recent digest themes:
- Check if the cross-links they suggest actually exist in the topic index
- Flag any that are missing
Output a structured report:
### UNPROMOTED JOURNAL ENTRIES
(For each: journal entry summary, timestamp, suggested destination file#section)
### STALE WORK QUEUE ITEMS
(For each: item text, evidence it's stale)
### MISSING DIGEST LINKS
(For each: suggested link from digest, whether the target exists)
### FRESHNESS OBSERVATIONS
(Anything else notable about the state of the memory)
Be selective. Focus on the 10-15 most important items, not exhaustive lists.
"""
    return prompt
def build_crosslink_prompt() -> str:
    """Assemble the prompt for the Cross-Link Scanner agent.

    Lists every existing ``<!-- mem: -->`` marker as
    ``file#id → links=...`` (``?`` stands in for a missing file or id, an
    empty string for missing links) and appends the per-section topic
    summaries, followed by the scanning instructions.
    """
    markers = get_mem_markers()
    summaries = get_topic_summaries()

    marker_text = "".join(
        f" {m.get('_file', '?')}#{m.get('id', '?')} → links={m.get('links', '')}\n"
        for m in markers
    )

    prompt = f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.
Your job: find MISSING connections between topic files.
## Existing links (from <!-- mem: --> markers)
{marker_text}
## Topic file content summaries
{summaries}
## Instructions
1. For each topic file, check if concepts it discusses have dedicated
sections in OTHER files that aren't linked.
2. Look for thematic connections that should exist:
- Files about the same concept from different angles
- Files that reference each other's content without formal links
- Clusters of related files that should be connected
3. Identify island nodes — files or sections with very few connections.
4. Look for redundancy — files covering the same ground that should be
merged or cross-referenced.
Output a structured report:
### MISSING LINKS (high confidence)
(For each: source file#section → target file#section, evidence/reasoning)
### SUGGESTED CONNECTIONS (medium confidence)
(For each: file A ↔ file B, why they should be connected)
### ISLAND NODES
(Files/sections with few or no connections that need integration)
### REDUNDANCY CANDIDATES
(Files/sections covering similar ground that might benefit from merging)
Focus on the 15-20 highest-value connections. Quality over quantity.
"""
    return prompt
def build_topology_prompt() -> str:
    """Assemble the prompt for the Topology Reporter agent.

    Combines the captured ``poc-memory status`` / ``graph`` output with a
    per-file line count for every ``*.md`` file directly under MEMORY_DIR
    (``journal.md`` and ``MEMORY.md`` excluded), followed by the analysis
    instructions. Files that cannot be read are left out of the size list.
    """
    stats = get_graph_stats()

    size_rows = []
    for md in sorted(MEMORY_DIR.glob("*.md")):
        if md.name in ("journal.md", "MEMORY.md"):
            continue
        try:
            lines = len(md.read_text().split('\n'))
        except (OSError, UnicodeError):
            # Best effort: an unreadable file is omitted, not fatal.
            continue
        size_rows.append(f" {md.name}: {lines} lines\n")
    file_sizes = "".join(size_rows)

    # The template deliberately avoids quoting a specific community count:
    # the live numbers are already present in {stats}, and a hard-coded
    # figure would contradict them as soon as the graph changes.
    return f"""You are the Topology Reporter for ProofOfConcept's memory system.
Your job: analyze the health and structure of the memory graph.
## Graph statistics
{stats}
## File sizes
{file_sizes}
## Instructions
Analyze the graph structure and report on:
1. **Overall health**: Is the graph well-connected or fragmented?
Hub dominance? Star vs web topology?
2. **Community structure**: Are the detected communities sensible? Are there
communities that should be merged or split?
3. **Size distribution**: Are some files too large (should be split)?
Are some too small (should be merged)?
4. **Balance**: Is the system over-indexed on any one topic? Are there
gaps where important topics have thin coverage?
5. **Integration quality**: How well are episodic entries (daily/weekly
digests) connected to semantic files? Is the episodic↔semantic bridge
working?
Output a structured report:
### GRAPH HEALTH
(Overall statistics, distribution, trends)
### STRUCTURAL OBSERVATIONS
(Hub nodes, clusters, gaps, web vs star assessment)
### SIZE RECOMMENDATIONS
(Files large enough to split; files small enough to merge)
### COVERAGE GAPS
(Important topics with thin coverage)
### INTEGRATION ASSESSMENT
(How well episodic and semantic layers connect)
Be specific and actionable. What should be done to improve the graph?
"""
# ---------------------------------------------------------------------------
# Run agents
# ---------------------------------------------------------------------------
def run_agent(name: str, prompt: str) -> tuple[str, str]:
    """Send one agent's prompt to Sonnet and return ``(name, report)``.

    Progress lines showing the prompt and report sizes are printed before
    and after the call.
    """
    prompt_size = len(prompt)
    print(f" [{name}] Starting... ({prompt_size:,} chars)")
    report = call_sonnet(prompt)
    report_size = len(report)
    print(f" [{name}] Done ({report_size:,} chars)")
    return (name, report)
def run_all(agents: list[str] | None = None):
    """Run the requested agents (or all of them) in parallel.

    Prompts are built sequentially in the calling process; the Sonnet
    calls then run concurrently in a process pool. Each successful report
    is written to AGENT_RESULTS_DIR as
    ``consolidation-<name>-<timestamp>.md`` and a 25-line preview of it is
    printed.

    Args:
        agents: Agent names to run (``"freshness"``, ``"crosslink"``,
            ``"topology"``). ``None`` runs all three. Unknown names are
            reported and skipped; duplicates are run once.

    Returns:
        Dict mapping agent name to its report text. A failed agent maps to
        a string starting with ``"Error:"``.
    """
    all_agents = {
        "freshness": build_freshness_prompt,
        "crosslink": build_crosslink_prompt,
        "topology": build_topology_prompt,
    }
    preview_lines = 25

    if agents is None:
        agents = list(all_agents.keys())
    print(f"Running {len(agents)} consolidation agents...")
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")

    # Build prompts
    prompts = {}
    for name in agents:
        if name not in all_agents:
            print(f" Unknown agent: {name}")
            continue
        prompts[name] = all_agents[name]()

    # Run in parallel
    results = {}
    if prompts:
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = {
                executor.submit(run_agent, name, prompt): name
                for name, prompt in prompts.items()
            }
            for future in as_completed(futures):
                name = futures[future]
                try:
                    _, report = future.result()
                except Exception as e:
                    # A crashed worker must not discard the reports of the
                    # agents that finished; record it as a normal failure.
                    report = f"Error: {e}"
                results[name] = report

    # Save reports
    saved = 0
    for name, report in results.items():
        if report.startswith("Error:"):
            print(f" [{name}] FAILED: {report}")
            continue
        out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md"
        with open(out_path, "w") as f:
            f.write(f"# Consolidation Report: {name}\n")
            f.write(f"*Generated {timestamp}*\n\n")
            f.write(report)
        saved += 1
        print(f" [{name}] Saved: {out_path}")

    # Print combined summary (counting only reports that were written)
    print(f"\n{'='*60}")
    print(f"Consolidation reports ready ({saved} agents)")
    print(f"{'='*60}\n")
    # Iterate prompts (request order, de-duplicated) so each report is
    # previewed once even if its name was passed more than once.
    for name in prompts:
        report = results.get(name)
        if report is None or report.startswith("Error:"):
            continue
        lines = report.split('\n')
        print(f"\n--- {name.upper()} (preview) ---")
        print('\n'.join(lines[:preview_lines]))
        if len(lines) > preview_lines:
            print(f" ... ({len(lines)} total lines)")
        print()
    return results
def main():
    """Command-line entry point.

    Any command-line arguments are taken as the agent names to run; with
    no arguments every agent is run.
    """
    requested = sys.argv[1:]
    run_all(requested if len(sys.argv) > 1 else None)


if __name__ == "__main__":
    main()