Add store_helpers.py with shared helpers that call poc-memory commands (list-keys, render, journal-tail) instead of globbing ~/.claude/memory/*.md and parsing section headers. All 9 Python scripts updated: get_semantic_keys(), get_topic_file_index(), get_recent_journal(), parse_journal_entries(), read_journal_range(), collect_topic_stems(), and file preview rendering now go through the store. This completes the clean switch — no script reads archived markdown files.
422 lines
12 KiB
Python
Executable file
422 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""consolidation-agents.py — run parallel consolidation agents.
|
|
|
|
Three agents scan the memory system and produce structured reports:
|
|
1. Freshness Scanner — journal entries not yet in topic files
|
|
2. Cross-Link Scanner — missing connections between semantic nodes
|
|
3. Topology Reporter — graph health and structure analysis
|
|
|
|
Usage:
|
|
consolidation-agents.py # run all three
|
|
consolidation-agents.py freshness # run one agent
|
|
consolidation-agents.py crosslink
|
|
consolidation-agents.py topology
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Root of the on-disk memory store.
MEMORY_DIR = Path.home() / ".claude" / "memory"
# Daily/weekly digest files live here (read by get_recent_digests).
EPISODIC_DIR = MEMORY_DIR / "episodic"
# Consolidation reports are written here by run_all().
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
# Created eagerly at import time so report writes never hit a missing dir.
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# Directory containing this script and the call-sonnet.sh wrapper.
SCRIPTS_DIR = Path(__file__).parent
|
|
|
|
|
|
def call_sonnet(prompt: str, timeout: int = 600) -> str:
    """Call Sonnet via the call-sonnet.sh wrapper script.

    The prompt is handed over through a temp file to avoid argv length
    limits. Never raises: all failure modes are reported as a string
    starting with "Error:", which downstream code checks for.

    Args:
        prompt: full prompt text for the model.
        timeout: seconds to wait for the wrapper before giving up.

    Returns:
        The wrapper's stripped stdout, or an "Error: ..." string.
    """
    env = dict(os.environ)
    # Make sure the wrapper does not think it is inside a Claude Code session.
    env.pop("CLAUDECODE", None)

    # Explicit UTF-8: prompts contain non-ASCII (arrows, dashes) and must
    # not depend on the platform's default encoding.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                     encoding='utf-8', delete=False) as f:
        f.write(prompt)
        prompt_file = f.name

    try:
        wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
        result = subprocess.run(
            [wrapper, prompt_file],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        if result.returncode != 0:
            # Previously a failing wrapper returned its (often empty) stdout,
            # which callers could not distinguish from a successful reply.
            return (f"Error: wrapper exited {result.returncode}: "
                    f"{result.stderr.strip()}")
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "Error: Sonnet call timed out"
    except Exception as e:
        return f"Error: {e}"
    finally:
        # Always remove the temp prompt file, even on error paths.
        os.unlink(prompt_file)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Context gathering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_recent_journal(n_lines: int = 200) -> str:
    """Fetch recent journal entries from the store.

    ``n_lines`` is a legacy line budget; it is converted into an entry
    count using the rough heuristic of ~4 lines per entry.
    """
    from store_helpers import get_recent_journal as _get_journal

    entry_count = n_lines // 4
    if entry_count < 20:
        # Never ask the store for fewer than 20 entries.
        entry_count = 20
    return _get_journal(n=entry_count)
|
|
|
|
|
|
def get_topic_file_index() -> dict[str, list[str]]:
    """Return a mapping of topic file name -> section headers, store-backed."""
    import store_helpers
    return store_helpers.get_topic_file_index()
|
|
|
|
|
|
def get_mem_markers() -> list[dict]:
    """Get relations from the store (replaces mem marker parsing).

    Each non-empty line of the store's edge listing becomes one dict with
    the raw line under key ``"_raw"``; the prompt builders only need the
    raw text, so no further parsing is done here.

    Returns:
        One dict per non-empty relation line.
    """
    from store_helpers import get_relations

    raw = get_relations()
    # One marker per non-blank line; the walrus keeps the stripped text.
    return [{"_raw": stripped}
            for line in raw.split('\n')
            if (stripped := line.strip())]
|
|
|
|
|
|
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Return per-file topic summaries from the store, truncated per file."""
    import store_helpers
    return store_helpers.get_topic_summaries(max_chars_per_file)
|
|
|
|
|
|
def get_graph_stats() -> str:
    """Collect graph statistics by shelling out to ``poc-memory``.

    Returns one string combining ``poc-memory status`` output and the first
    150 lines of ``poc-memory graph``. Failures (missing binary, timeout)
    are embedded as text rather than raised, so callers always get a
    usable report.
    """
    def _run(args: list[str]) -> str:
        # Shared subprocess boilerplate; raises on timeout/missing binary,
        # which the callers below translate into error text.
        return subprocess.run(args, capture_output=True, text=True,
                              timeout=30).stdout

    parts = []
    try:
        parts.append(f"=== poc-memory status ===\n{_run(['poc-memory', 'status'])}")
    except Exception as e:
        parts.append(f"Status error: {e}")

    try:
        # Cap at 150 lines — the full graph dump can be very large.
        lines = _run(["poc-memory", "graph"]).split('\n')[:150]
        parts.append("=== poc-memory graph (first 150 lines) ===\n"
                     + '\n'.join(lines))
    except Exception as e:
        parts.append(f"Graph error: {e}")

    return '\n'.join(parts)
|
|
|
|
|
|
def get_recent_digests(n: int = 3) -> str:
    """Return Summary/Themes excerpts from the *n* most recent daily digests.

    Scans ``EPISODIC_DIR`` for ``daily-*.md`` files (newest first by name,
    since the names embed the date) and keeps only the "## Summary" and
    "## Themes" sections of each.

    Args:
        n: how many digest files to include.

    Returns:
        Concatenated "### <filename>" headers plus their kept sections.
    """
    digest_files = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True)
    parts = []
    for f in digest_files[:n]:
        kept: list[str] = []
        in_section = False
        for line in f.read_text().split('\n'):
            if line.startswith(("## Summary", "## Themes")):
                # A wanted section header starts (or restarts) capture.
                in_section = True
                kept.append(line)
            elif line.startswith("## "):
                # Any other section header ends capture.
                in_section = False
            elif in_section:
                kept.append(line)
        # Join once instead of quadratic `summary += line + '\n'`.
        summary = ''.join(line + '\n' for line in kept)
        parts.append(f"\n### {f.name}\n{summary}")
    return '\n'.join(parts)
|
|
|
|
|
|
def get_work_queue() -> str:
    """Return the work-queue file's text, or a placeholder when absent."""
    queue_path = MEMORY_DIR / "work-queue.md"
    if not queue_path.exists():
        return "(no work queue found)"
    return queue_path.read_text()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent prompts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_freshness_prompt() -> str:
    """Assemble the prompt for the Freshness Scanner agent.

    Gathers recent journal text, the topic-file index, recent digests and
    the work queue, then embeds them into a fixed instruction template.
    """
    journal = get_recent_journal(200)
    topic_index = get_topic_file_index()
    digests = get_recent_digests(3)
    work_queue = get_work_queue()

    # Render the index as "file:" lines with up to 10 section headers each,
    # capping per-file detail to keep the prompt size bounded.
    topic_list = ""
    for fname, sections in topic_index.items():
        topic_list += f"\n {fname}:\n"
        for s in sections[:10]:
            topic_list += f" {s}\n"

    return f"""You are the Freshness Scanner for ProofOfConcept's memory system.

Your job: identify what's NEW (in journal/digests but not yet in topic files)
and what's STALE (in work queue or topic files but outdated).

## Recent journal entries (last 200 lines)

{journal}

## Recent daily digests

{digests}

## Topic file index (file → section headers)

{topic_list}

## Work queue

{work_queue}

## Instructions

1. For each substantive insight, experience, or discovery in the journal:
   - Check if a matching topic file section exists
   - If not, note it as UNPROMOTED with a suggested destination file

2. For each work queue Active item:
   - If it looks done or stale (>7 days old, mentioned as completed), flag it

3. For recent digest themes:
   - Check if the cross-links they suggest actually exist in the topic index
   - Flag any that are missing

Output a structured report:

### UNPROMOTED JOURNAL ENTRIES
(For each: journal entry summary, timestamp, suggested destination file#section)

### STALE WORK QUEUE ITEMS
(For each: item text, evidence it's stale)

### MISSING DIGEST LINKS
(For each: suggested link from digest, whether the target exists)

### FRESHNESS OBSERVATIONS
(Anything else notable about the state of the memory)

Be selective. Focus on the 10-15 most important items, not exhaustive lists.
"""
|
|
|
|
|
|
def build_crosslink_prompt() -> str:
    """Assemble the prompt for the Cross-Link Scanner agent."""
    markers = get_mem_markers()
    summaries = get_topic_summaries()

    # Flatten marker dicts back into one raw relation line each;
    # get_mem_markers() stores the raw edge-listing line under "_raw".
    marker_text = ""
    for m in markers:
        marker_text += f" {m.get('_raw', '?')}\n"

    return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.

Your job: find MISSING connections between topic files.

## Existing relations (from the memory graph)

{marker_text}

## Topic file content summaries

{summaries}

## Instructions

1. For each topic file, check if concepts it discusses have dedicated
   sections in OTHER files that aren't linked.

2. Look for thematic connections that should exist:
   - Files about the same concept from different angles
   - Files that reference each other's content without formal links
   - Clusters of related files that should be connected

3. Identify island nodes — files or sections with very few connections.

4. Look for redundancy — files covering the same ground that should be
   merged or cross-referenced.

Output a structured report:

### MISSING LINKS (high confidence)
(For each: source file#section → target file#section, evidence/reasoning)

### SUGGESTED CONNECTIONS (medium confidence)
(For each: file A ↔ file B, why they should be connected)

### ISLAND NODES
(Files/sections with few or no connections that need integration)

### REDUNDANCY CANDIDATES
(Files/sections covering similar ground that might benefit from merging)

Focus on the 15-20 highest-value connections. Quality over quantity.
"""
|
|
|
|
|
|
def build_topology_prompt() -> str:
    """Assemble the prompt for the Topology Reporter agent.

    Combines graph statistics from ``poc-memory`` with per-file section
    counts from the store-backed topic index. (The original fetched the
    topic index twice — once via the module helper and again directly from
    store_helpers; one fetch suffices since both wrap the same call.)
    """
    stats = get_graph_stats()
    topic_index = get_topic_file_index()

    # Render "<file>: <n> sections" lines, sorted for stable prompt output.
    file_sizes = ""
    for fname in sorted(topic_index.keys()):
        n_sections = len(topic_index[fname])
        file_sizes += f" {fname}: {n_sections} sections\n"

    # NOTE(review): the "342 communities" figure in the template below is a
    # hardcoded snapshot — consider deriving it from `stats` instead.
    return f"""You are the Topology Reporter for ProofOfConcept's memory system.

Your job: analyze the health and structure of the memory graph.

## Graph statistics

{stats}

## File sizes

{file_sizes}

## Instructions

Analyze the graph structure and report on:

1. **Overall health**: Is the graph well-connected or fragmented?
   Hub dominance? Star vs web topology?

2. **Community structure**: Are the 342 communities sensible? Are there
   communities that should be merged or split?

3. **Size distribution**: Are some files too large (should be split)?
   Are some too small (should be merged)?

4. **Balance**: Is the system over-indexed on any one topic? Are there
   gaps where important topics have thin coverage?

5. **Integration quality**: How well are episodic entries (daily/weekly
   digests) connected to semantic files? Is the episodic↔semantic bridge
   working?

Output a structured report:

### GRAPH HEALTH
(Overall statistics, distribution, trends)

### STRUCTURAL OBSERVATIONS
(Hub nodes, clusters, gaps, web vs star assessment)

### SIZE RECOMMENDATIONS
(Files that are too large to split, too small to merge)

### COVERAGE GAPS
(Important topics with thin coverage)

### INTEGRATION ASSESSMENT
(How well episodic and semantic layers connect)

Be specific and actionable. What should be done to improve the graph?
"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run agents
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_agent(name: str, prompt: str) -> tuple[str, str]:
    """Execute one consolidation agent and collect its report.

    Prints progress markers before and after the (potentially long) model
    call so parallel runs stay observable, then returns the pair
    ``(name, report)`` for the result-collection loop.
    """
    print(f" [{name}] Starting... ({len(prompt):,} chars)")
    report = call_sonnet(prompt)
    print(f" [{name}] Done ({len(report):,} chars)")
    return name, report
|
|
|
|
|
|
def run_all(agents: list[str] | None = None):
    """Run the requested consolidation agents (default: all) in parallel.

    Builds each agent's prompt serially, fans the model calls out across a
    process pool, saves non-error reports under AGENT_RESULTS_DIR, and
    prints a short preview of each report.

    Args:
        agents: agent names to run ("freshness", "crosslink", "topology");
            None runs all three. Unknown names are reported and skipped.

    Returns:
        dict mapping agent name -> report text (error strings included).
    """
    all_agents = {
        "freshness": build_freshness_prompt,
        "crosslink": build_crosslink_prompt,
        "topology": build_topology_prompt,
    }

    if agents is None:
        agents = list(all_agents.keys())

    print(f"Running {len(agents)} consolidation agents...")
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")

    # Build prompts serially — reading the store is cheap compared with the
    # model calls below, so there is no need to parallelize this part.
    prompts = {}
    for name in agents:
        if name not in all_agents:
            print(f" Unknown agent: {name}")
            continue
        prompts[name] = all_agents[name]()

    # Fan out the slow model calls, one worker per agent (max 3 agents).
    results = {}
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(run_agent, name, prompt): name
            for name, prompt in prompts.items()
        }
        for future in as_completed(futures):
            name, report = future.result()
            results[name] = report

    # Persist successful reports; error reports are printed, not saved.
    for name, report in results.items():
        if report.startswith("Error:"):
            print(f" [{name}] FAILED: {report}")
            continue

        out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md"
        # Explicit UTF-8: reports may contain non-ASCII (arrows, dashes).
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(f"# Consolidation Report: {name}\n")
            f.write(f"*Generated {timestamp}*\n\n")
            f.write(report)
        print(f" [{name}] Saved: {out_path}")

    # Print combined summary
    print(f"\n{'='*60}")
    print(f"Consolidation reports ready ({len(results)} agents)")
    print(f"{'='*60}\n")

    for name in agents:
        report = results.get(name)
        if report is None or report.startswith("Error:"):
            continue
        # Split once and reuse (the original split the report three times,
        # once via the odd split(chr(10))); preview the first 25 lines.
        report_lines = report.split('\n')
        print(f"\n--- {name.upper()} (preview) ---")
        print('\n'.join(report_lines[:25]))
        if len(report_lines) > 25:
            print(f" ... ({len(report_lines)} total lines)")
        print()

    return results
|
|
|
|
|
|
def main():
    """CLI entry point: agent names come from argv; none means run all."""
    requested = sys.argv[1:]
    run_all(requested if requested else None)
|
|
|
|
|
|
# Script entry point — keeps `import consolidation_agents` side-effect free
# (aside from the AGENT_RESULTS_DIR mkdir at module load).
if __name__ == "__main__":
    main()
|