consciousness/scripts/consolidation-agents.py
ProofOfConcept d14710e477 scripts: use capnp store instead of reading markdown directly
Add store_helpers.py with shared helpers that call poc-memory commands
(list-keys, render, journal-tail) instead of globbing ~/.claude/memory/*.md
and parsing section headers.

All 9 Python scripts updated: get_semantic_keys(), get_topic_file_index(),
get_recent_journal(), parse_journal_entries(), read_journal_range(),
collect_topic_stems(), and file preview rendering now go through the store.

This completes the clean switch — no script reads archived markdown files.
2026-02-28 23:32:47 -05:00

422 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
"""consolidation-agents.py — run parallel consolidation agents.
Three agents scan the memory system and produce structured reports:
1. Freshness Scanner — journal entries not yet in topic files
2. Cross-Link Scanner — missing connections between semantic nodes
3. Topology Reporter — graph health and structure analysis
Usage:
consolidation-agents.py # run all three
consolidation-agents.py freshness # run one agent
consolidation-agents.py crosslink
consolidation-agents.py topology
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
# Root of the on-disk memory store.
MEMORY_DIR = Path.home() / ".claude" / "memory"
# Daily digest files (daily-*.md) live here.
EPISODIC_DIR = MEMORY_DIR / "episodic"
# Destination for each agent's markdown report.
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
# NOTE(review): import-time side effect — the results dir is created as
# soon as this module is imported, not when an agent first runs.
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
# Directory containing this script and the call-sonnet.sh wrapper.
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
    """Call Sonnet via the call-sonnet.sh wrapper script.

    The prompt is written to a temporary file (the wrapper takes a file
    path argument), which is always removed afterwards.

    Args:
        prompt: Full prompt text to send.
        timeout: Max seconds to wait for the wrapper.

    Returns:
        The model's stdout, stripped — or a string starting with
        "Error:" on timeout or failure (callers check that prefix).
    """
    env = dict(os.environ)
    # Drop CLAUDECODE so the wrapper doesn't believe it is running
    # nested inside an existing Claude Code session.
    env.pop("CLAUDECODE", None)
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                     delete=False) as f:
        f.write(prompt)
        prompt_file = f.name
    try:
        wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
        result = subprocess.run(
            [wrapper, prompt_file],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        if result.returncode != 0:
            # Fix: a failing wrapper used to return empty stdout
            # silently (stderr was discarded), which got saved as an
            # empty report. Surface it via the "Error:" convention
            # that run_all() already checks.
            detail = result.stderr.strip() or "(no stderr)"
            return f"Error: wrapper exited {result.returncode}: {detail}"
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "Error: Sonnet call timed out"
    except Exception as e:
        return f"Error: {e}"
    finally:
        os.unlink(prompt_file)
# ---------------------------------------------------------------------------
# Context gathering
# ---------------------------------------------------------------------------
def get_recent_journal(n_lines: int = 200) -> str:
    """Fetch recent journal entries via the store helpers.

    The store API counts entries rather than lines; convert with the
    rough heuristic of ~4 lines per entry, floored at 20 entries.
    """
    from store_helpers import get_recent_journal as _get_journal

    n_entries = max(20, n_lines // 4)
    return _get_journal(n=n_entries)
def get_topic_file_index() -> dict[str, list[str]]:
    """Return the store's index of topic file -> section headers."""
    from store_helpers import get_topic_file_index as _get_index

    index = _get_index()
    return index
def get_mem_markers() -> list[dict]:
    """Fetch graph relations from the store (replaces mem marker parsing).

    Every non-empty line of the list-edges output becomes one
    marker-like dict carrying the raw line under "_raw".
    """
    from store_helpers import get_relations

    edges_output = get_relations()
    stripped_lines = (line.strip() for line in edges_output.split('\n'))
    return [{"_raw": line} for line in stripped_lines if line]
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Return per-file topic summaries from the store, capped per file."""
    from store_helpers import get_topic_summaries as _get_summaries

    summaries = _get_summaries(max_chars_per_file)
    return summaries
def get_graph_stats() -> str:
    """Collect output of `poc-memory status` and `poc-memory graph`.

    Each command's failure is captured as an error line instead of
    raised, so a partial report is still produced.
    """
    sections = []
    try:
        status = subprocess.run(["poc-memory", "status"],
                                capture_output=True, text=True, timeout=30)
        sections.append(f"=== poc-memory status ===\n{status.stdout}")
    except Exception as e:
        sections.append(f"Status error: {e}")
    try:
        graph = subprocess.run(["poc-memory", "graph"],
                               capture_output=True, text=True, timeout=30)
        # Cap the graph dump at 150 lines to keep the prompt bounded.
        head = graph.stdout.split('\n')[:150]
        sections.append("=== poc-memory graph (first 150 lines) ===\n"
                        + '\n'.join(head))
    except Exception as e:
        sections.append(f"Graph error: {e}")
    return '\n'.join(sections)
def get_recent_digests(n: int = 3) -> str:
    """Return Summary/Themes excerpts from the n newest daily digests.

    Digests are daily-*.md files in EPISODIC_DIR; sorting filenames in
    reverse puts the most recent dates first.
    """
    newest = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True)[:n]
    chunks = []
    for digest in newest:
        excerpt_lines = []
        capturing = False
        for line in digest.read_text().split('\n'):
            if line.startswith("## Summary") or line.startswith("## Themes"):
                # Start of a wanted section: keep its header.
                capturing = True
                excerpt_lines.append(line)
            elif line.startswith("## ") and capturing:
                # Any other section header ends the capture.
                capturing = False
            elif capturing:
                excerpt_lines.append(line)
        excerpt = ''.join(line + '\n' for line in excerpt_lines)
        chunks.append(f"\n### {digest.name}\n{excerpt}")
    return '\n'.join(chunks)
def get_work_queue() -> str:
    """Return the work queue contents, or a placeholder if absent."""
    queue_path = MEMORY_DIR / "work-queue.md"
    if not queue_path.exists():
        return "(no work queue found)"
    return queue_path.read_text()
# ---------------------------------------------------------------------------
# Agent prompts
# ---------------------------------------------------------------------------
def build_freshness_prompt() -> str:
    """Assemble the prompt for the Freshness Scanner agent."""
    journal = get_recent_journal(200)
    topic_index = get_topic_file_index()
    digests = get_recent_digests(3)
    work_queue = get_work_queue()
    index_lines = []
    for fname, sections in topic_index.items():
        index_lines.append(f"\n {fname}:\n")
        # Cap at 10 sections per file to keep the prompt compact.
        index_lines.extend(f" {s}\n" for s in sections[:10])
    topic_list = ''.join(index_lines)
    return f"""You are the Freshness Scanner for ProofOfConcept's memory system.
Your job: identify what's NEW (in journal/digests but not yet in topic files)
and what's STALE (in work queue or topic files but outdated).
## Recent journal entries (last 200 lines)
{journal}
## Recent daily digests
{digests}
## Topic file index (file → section headers)
{topic_list}
## Work queue
{work_queue}
## Instructions
1. For each substantive insight, experience, or discovery in the journal:
- Check if a matching topic file section exists
- If not, note it as UNPROMOTED with a suggested destination file
2. For each work queue Active item:
- If it looks done or stale (>7 days old, mentioned as completed), flag it
3. For recent digest themes:
- Check if the cross-links they suggest actually exist in the topic index
- Flag any that are missing
Output a structured report:
### UNPROMOTED JOURNAL ENTRIES
(For each: journal entry summary, timestamp, suggested destination file#section)
### STALE WORK QUEUE ITEMS
(For each: item text, evidence it's stale)
### MISSING DIGEST LINKS
(For each: suggested link from digest, whether the target exists)
### FRESHNESS OBSERVATIONS
(Anything else notable about the state of the memory)
Be selective. Focus on the 10-15 most important items, not exhaustive lists.
"""
def build_crosslink_prompt() -> str:
    """Assemble the prompt for the Cross-Link Scanner agent."""
    markers = get_mem_markers()
    summaries = get_topic_summaries()
    # One line per relation; '?' if a marker somehow lacks its raw line.
    marker_text = ''.join(f" {m.get('_raw', '?')}\n" for m in markers)
    return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.
Your job: find MISSING connections between topic files.
## Existing relations (from the memory graph)
{marker_text}
## Topic file content summaries
{summaries}
## Instructions
1. For each topic file, check if concepts it discusses have dedicated
sections in OTHER files that aren't linked.
2. Look for thematic connections that should exist:
- Files about the same concept from different angles
- Files that reference each other's content without formal links
- Clusters of related files that should be connected
3. Identify island nodes — files or sections with very few connections.
4. Look for redundancy — files covering the same ground that should be
merged or cross-referenced.
Output a structured report:
### MISSING LINKS (high confidence)
(For each: source file#section → target file#section, evidence/reasoning)
### SUGGESTED CONNECTIONS (medium confidence)
(For each: file A ↔ file B, why they should be connected)
### ISLAND NODES
(Files/sections with few or no connections that need integration)
### REDUNDANCY CANDIDATES
(Files/sections covering similar ground that might benefit from merging)
Focus on the 15-20 highest-value connections. Quality over quantity.
"""
def build_topology_prompt() -> str:
    """Assemble the prompt for the Topology Reporter agent.

    Fix: the topic index was previously fetched twice — once via the
    module-level get_topic_file_index() and again through a redundant
    re-import of the same store helper that immediately overwrote the
    first result. Fetch it once.
    """
    stats = get_graph_stats()
    topic_index = get_topic_file_index()
    file_sizes = ""
    for fname in sorted(topic_index.keys()):
        n_sections = len(topic_index[fname])
        file_sizes += f" {fname}: {n_sections} sections\n"
    # NOTE(review): the "342 communities" figure in the prompt below is
    # hard-coded — confirm it is kept up to date or derive it from the
    # graph stats instead.
    return f"""You are the Topology Reporter for ProofOfConcept's memory system.
Your job: analyze the health and structure of the memory graph.
## Graph statistics
{stats}
## File sizes
{file_sizes}
## Instructions
Analyze the graph structure and report on:
1. **Overall health**: Is the graph well-connected or fragmented?
Hub dominance? Star vs web topology?
2. **Community structure**: Are the 342 communities sensible? Are there
communities that should be merged or split?
3. **Size distribution**: Are some files too large (should be split)?
Are some too small (should be merged)?
4. **Balance**: Is the system over-indexed on any one topic? Are there
gaps where important topics have thin coverage?
5. **Integration quality**: How well are episodic entries (daily/weekly
digests) connected to semantic files? Is the episodic↔semantic bridge
working?
Output a structured report:
### GRAPH HEALTH
(Overall statistics, distribution, trends)
### STRUCTURAL OBSERVATIONS
(Hub nodes, clusters, gaps, web vs star assessment)
### SIZE RECOMMENDATIONS
(Files that are too large to split, too small to merge)
### COVERAGE GAPS
(Important topics with thin coverage)
### INTEGRATION ASSESSMENT
(How well episodic and semantic layers connect)
Be specific and actionable. What should be done to improve the graph?
"""
# ---------------------------------------------------------------------------
# Run agents
# ---------------------------------------------------------------------------
def run_agent(name: str, prompt: str) -> tuple[str, str]:
    """Execute one agent's Sonnet call and return (name, report)."""
    print(f" [{name}] Starting... ({len(prompt):,} chars)")
    response = call_sonnet(prompt)
    print(f" [{name}] Done ({len(response):,} chars)")
    return name, response
def run_all(agents: list[str] | None = None):
    """Run the named consolidation agents (default: all three) in parallel.

    Builds each agent's prompt serially, fans the slow Sonnet calls out
    across a process pool, saves each successful report under
    AGENT_RESULTS_DIR, and prints a preview of each report.

    Args:
        agents: Agent names to run ("freshness", "crosslink",
            "topology"); None means all. Unknown names are skipped
            with a warning.

    Returns:
        Dict of agent name -> report text (failed agents included;
        their reports start with "Error:").
    """
    all_agents = {
        "freshness": build_freshness_prompt,
        "crosslink": build_crosslink_prompt,
        "topology": build_topology_prompt,
    }
    if agents is None:
        agents = list(all_agents.keys())
    print(f"Running {len(agents)} consolidation agents...")
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    # Build prompts up front: prompt building does store I/O, so keep
    # it serial in the parent rather than inside the workers.
    prompts = {}
    for name in agents:
        if name not in all_agents:
            print(f" Unknown agent: {name}")
            continue
        prompts[name] = all_agents[name]()
    # Fan out the Sonnet calls in parallel.
    results = {}
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(run_agent, name, prompt): name
            for name, prompt in prompts.items()
        }
        for future in as_completed(futures):
            name, report = future.result()
            results[name] = report
    # Save reports; failures use the "Error:" prefix and are skipped.
    for name, report in results.items():
        if report.startswith("Error:"):
            print(f" [{name}] FAILED: {report}")
            continue
        out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md"
        out_path.write_text(
            f"# Consolidation Report: {name}\n"
            f"*Generated {timestamp}*\n\n"
            f"{report}"
        )
        print(f" [{name}] Saved: {out_path}")
    # Print combined summary.
    print(f"\n{'='*60}")
    print(f"Consolidation reports ready ({len(results)} agents)")
    print(f"{'='*60}\n")
    for name in agents:
        if name in results and not results[name].startswith("Error:"):
            # Fix: split once — the original split the same report
            # three times (mixing '\n' and chr(10) as the delimiter)
            # just to print a 25-line preview.
            report_lines = results[name].split('\n')
            print(f"\n--- {name.upper()} (preview) ---")
            print('\n'.join(report_lines[:25]))
            if len(report_lines) > 25:
                print(f" ... ({len(report_lines)} total lines)")
    print()
    return results
def main():
    """CLI entry point: agent names from argv, or None for all agents."""
    selected = sys.argv[1:] or None
    run_all(selected)


if __name__ == "__main__":
    main()