#!/usr/bin/env python3 """consolidation-agents.py — run parallel consolidation agents. Three agents scan the memory system and produce structured reports: 1. Freshness Scanner — journal entries not yet in topic files 2. Cross-Link Scanner — missing connections between semantic nodes 3. Topology Reporter — graph health and structure analysis Usage: consolidation-agents.py # run all three consolidation-agents.py freshness # run one agent consolidation-agents.py crosslink consolidation-agents.py topology """ import json import os import re import subprocess import sys import tempfile from concurrent.futures import ProcessPoolExecutor, as_completed from datetime import datetime from pathlib import Path MEMORY_DIR = Path.home() / ".claude" / "memory" EPISODIC_DIR = MEMORY_DIR / "episodic" AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results" AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True) SCRIPTS_DIR = Path(__file__).parent def call_sonnet(prompt: str, timeout: int = 600) -> str: """Call Sonnet via the wrapper script.""" env = dict(os.environ) env.pop("CLAUDECODE", None) with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: f.write(prompt) prompt_file = f.name try: wrapper = str(SCRIPTS_DIR / "call-sonnet.sh") result = subprocess.run( [wrapper, prompt_file], capture_output=True, text=True, timeout=timeout, env=env, ) return result.stdout.strip() except subprocess.TimeoutExpired: return "Error: Sonnet call timed out" except Exception as e: return f"Error: {e}" finally: os.unlink(prompt_file) # --------------------------------------------------------------------------- # Context gathering # --------------------------------------------------------------------------- def get_recent_journal(n_lines: int = 200) -> str: """Get last N lines of journal.""" journal = MEMORY_DIR / "journal.md" if not journal.exists(): return "" with open(journal) as f: lines = f.readlines() return "".join(lines[-n_lines:]) def get_topic_file_index() -> dict[str, list[str]]: """Build index of topic files and their section headers.""" index = {} for md in sorted(MEMORY_DIR.glob("*.md")): name = md.name if name in ("journal.md", "MEMORY.md", "where-am-i.md", "work-queue.md", "search-testing.md"): continue sections = [] try: with open(md) as f: for line in f: if line.startswith("## "): sections.append(line.strip()) except Exception: pass index[name] = sections return index def get_mem_markers() -> list[dict]: """Extract all markers from memory files.""" markers = [] for md in sorted(MEMORY_DIR.glob("*.md")): if md.name in ("journal.md", "MEMORY.md"): continue try: content = md.read_text() for match in re.finditer( r'', content): attrs = {} for part in match.group(1).split(): if '=' in part: k, v = part.split('=', 1) attrs[k] = v attrs['_file'] = md.name markers.append(attrs) except Exception: pass return markers def get_topic_summaries(max_chars_per_file: int = 500) -> str: """Get first N chars of each topic file for cross-link scanning.""" parts = [] for md in sorted(MEMORY_DIR.glob("*.md")): name = md.name if name in ("journal.md", "MEMORY.md", "where-am-i.md", "work-queue.md", "search-testing.md"): continue try: content = md.read_text() # Get sections and first paragraph of each sections = [] current_section = name current_content = [] for line in content.split('\n'): if line.startswith("## "): if current_content: text = '\n'.join(current_content[:5]) sections.append(f" {current_section}: {text[:200]}") current_section = line.strip() current_content = [] elif line.strip(): current_content.append(line.strip()) if current_content: text = '\n'.join(current_content[:5]) sections.append(f" {current_section}: {text[:200]}") parts.append(f"\n### {name}\n" + '\n'.join(sections[:15])) except Exception: pass return '\n'.join(parts) def get_graph_stats() -> str: """Run poc-memory status and graph commands.""" parts = [] try: r = subprocess.run(["poc-memory", "status"], capture_output=True, text=True, timeout=30) parts.append(f"=== poc-memory status ===\n{r.stdout}") except Exception as e: parts.append(f"Status error: {e}") try: r = subprocess.run(["poc-memory", "graph"], capture_output=True, text=True, timeout=30) # Take first 150 lines lines = r.stdout.split('\n')[:150] parts.append(f"=== poc-memory graph (first 150 lines) ===\n" + '\n'.join(lines)) except Exception as e: parts.append(f"Graph error: {e}") return '\n'.join(parts) def get_recent_digests(n: int = 3) -> str: """Get the most recent daily digests.""" digest_files = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True) parts = [] for f in digest_files[:n]: content = f.read_text() # Just the summary and themes sections summary = "" in_section = False for line in content.split('\n'): if line.startswith("## Summary") or line.startswith("## Themes"): in_section = True summary += line + '\n' elif line.startswith("## ") and in_section: in_section = False elif in_section: summary += line + '\n' parts.append(f"\n### {f.name}\n{summary}") return '\n'.join(parts) def get_work_queue() -> str: """Read work queue.""" wq = MEMORY_DIR / "work-queue.md" if wq.exists(): return wq.read_text() return "(no work queue found)" # --------------------------------------------------------------------------- # Agent prompts # --------------------------------------------------------------------------- def build_freshness_prompt() -> str: journal = get_recent_journal(200) topic_index = get_topic_file_index() digests = get_recent_digests(3) work_queue = get_work_queue() topic_list = "" for fname, sections in topic_index.items(): topic_list += f"\n {fname}:\n" for s in sections[:10]: topic_list += f" {s}\n" return f"""You are the Freshness Scanner for ProofOfConcept's memory system. Your job: identify what's NEW (in journal/digests but not yet in topic files) and what's STALE (in work queue or topic files but outdated). ## Recent journal entries (last 200 lines) {journal} ## Recent daily digests {digests} ## Topic file index (file → section headers) {topic_list} ## Work queue {work_queue} ## Instructions 1. For each substantive insight, experience, or discovery in the journal: - Check if a matching topic file section exists - If not, note it as UNPROMOTED with a suggested destination file 2. For each work queue Active item: - If it looks done or stale (>7 days old, mentioned as completed), flag it 3. For recent digest themes: - Check if the cross-links they suggest actually exist in the topic index - Flag any that are missing Output a structured report: ### UNPROMOTED JOURNAL ENTRIES (For each: journal entry summary, timestamp, suggested destination file#section) ### STALE WORK QUEUE ITEMS (For each: item text, evidence it's stale) ### MISSING DIGEST LINKS (For each: suggested link from digest, whether the target exists) ### FRESHNESS OBSERVATIONS (Anything else notable about the state of the memory) Be selective. Focus on the 10-15 most important items, not exhaustive lists. """ def build_crosslink_prompt() -> str: markers = get_mem_markers() summaries = get_topic_summaries() marker_text = "" for m in markers: f = m.get('_file', '?') mid = m.get('id', '?') links = m.get('links', '') marker_text += f" {f}#{mid} → links={links}\n" return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system. Your job: find MISSING connections between topic files. ## Existing links (from markers) {marker_text} ## Topic file content summaries {summaries} ## Instructions 1. For each topic file, check if concepts it discusses have dedicated sections in OTHER files that aren't linked. 2. Look for thematic connections that should exist: - Files about the same concept from different angles - Files that reference each other's content without formal links - Clusters of related files that should be connected 3. Identify island nodes — files or sections with very few connections. 4. Look for redundancy — files covering the same ground that should be merged or cross-referenced. Output a structured report: ### MISSING LINKS (high confidence) (For each: source file#section → target file#section, evidence/reasoning) ### SUGGESTED CONNECTIONS (medium confidence) (For each: file A ↔ file B, why they should be connected) ### ISLAND NODES (Files/sections with few or no connections that need integration) ### REDUNDANCY CANDIDATES (Files/sections covering similar ground that might benefit from merging) Focus on the 15-20 highest-value connections. Quality over quantity. """ def build_topology_prompt() -> str: stats = get_graph_stats() topic_index = get_topic_file_index() file_sizes = "" for md in sorted(MEMORY_DIR.glob("*.md")): if md.name in ("journal.md", "MEMORY.md"): continue try: lines = len(md.read_text().split('\n')) file_sizes += f" {md.name}: {lines} lines\n" except Exception: pass return f"""You are the Topology Reporter for ProofOfConcept's memory system. Your job: analyze the health and structure of the memory graph. ## Graph statistics {stats} ## File sizes {file_sizes} ## Instructions Analyze the graph structure and report on: 1. **Overall health**: Is the graph well-connected or fragmented? Hub dominance? Star vs web topology? 2. **Community structure**: Are the 342 communities sensible? Are there communities that should be merged or split? 3. **Size distribution**: Are some files too large (should be split)? Are some too small (should be merged)? 4. **Balance**: Is the system over-indexed on any one topic? Are there gaps where important topics have thin coverage? 5. **Integration quality**: How well are episodic entries (daily/weekly digests) connected to semantic files? Is the episodic↔semantic bridge working? Output a structured report: ### GRAPH HEALTH (Overall statistics, distribution, trends) ### STRUCTURAL OBSERVATIONS (Hub nodes, clusters, gaps, web vs star assessment) ### SIZE RECOMMENDATIONS (Files that are too large to split, too small to merge) ### COVERAGE GAPS (Important topics with thin coverage) ### INTEGRATION ASSESSMENT (How well episodic and semantic layers connect) Be specific and actionable. What should be done to improve the graph? """ # --------------------------------------------------------------------------- # Run agents # --------------------------------------------------------------------------- def run_agent(name: str, prompt: str) -> tuple[str, str]: """Run a single agent, return (name, report).""" print(f" [{name}] Starting... ({len(prompt):,} chars)") report = call_sonnet(prompt) print(f" [{name}] Done ({len(report):,} chars)") return name, report def run_all(agents: list[str] | None = None): """Run specified agents (or all) in parallel.""" all_agents = { "freshness": build_freshness_prompt, "crosslink": build_crosslink_prompt, "topology": build_topology_prompt, } if agents is None: agents = list(all_agents.keys()) print(f"Running {len(agents)} consolidation agents...") timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") # Build prompts prompts = {} for name in agents: if name not in all_agents: print(f" Unknown agent: {name}") continue prompts[name] = all_agents[name]() # Run in parallel results = {} with ProcessPoolExecutor(max_workers=3) as executor: futures = { executor.submit(run_agent, name, prompt): name for name, prompt in prompts.items() } for future in as_completed(futures): name, report = future.result() results[name] = report # Save reports for name, report in results.items(): if report.startswith("Error:"): print(f" [{name}] FAILED: {report}") continue out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md" with open(out_path, "w") as f: f.write(f"# Consolidation Report: {name}\n") f.write(f"*Generated {timestamp}*\n\n") f.write(report) print(f" [{name}] Saved: {out_path}") # Print combined summary print(f"\n{'='*60}") print(f"Consolidation reports ready ({len(results)} agents)") print(f"{'='*60}\n") for name in agents: if name in results and not results[name].startswith("Error:"): # Print first 20 lines of each report lines = results[name].split('\n')[:25] print(f"\n--- {name.upper()} (preview) ---") print('\n'.join(lines)) if len(results[name].split('\n')) > 25: print(f" ... ({len(results[name].split(chr(10)))} total lines)") print() return results def main(): agents = None if len(sys.argv) > 1: agents = sys.argv[1:] run_all(agents) if __name__ == "__main__": main()