consciousness/scripts/consolidation-agents.py

#!/usr/bin/env python3
"""consolidation-agents.py — run parallel consolidation agents.
Three agents scan the memory system and produce structured reports:
1. Freshness Scanner - journal entries not yet in topic files
2. Cross-Link Scanner - missing connections between semantic nodes
3. Topology Reporter - graph health and structure analysis

Usage:
    consolidation-agents.py            # run all three
    consolidation-agents.py freshness  # run one agent
    consolidation-agents.py crosslink
    consolidation-agents.py topology
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
    """Call Sonnet via the wrapper script."""
    env = dict(os.environ)
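    # Unset CLAUDECODE so the wrapper runs as a standalone call rather than
    # appearing to be nested inside the current session (assumed intent).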
    env.pop("CLAUDECODE", None)
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                     delete=False) as f:
        f.write(prompt)
        prompt_file = f.name
    try:
        wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
        result = subprocess.run(
            [wrapper, prompt_file],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "Error: Sonnet call timed out"
    except Exception as e:
        return f"Error: {e}"
    finally:
        os.unlink(prompt_file)
# ---------------------------------------------------------------------------
# Context gathering
# ---------------------------------------------------------------------------
def get_recent_journal(n_lines: int = 200) -> str:
    """Get recent journal entries from the store."""
    from store_helpers import get_recent_journal as _get_journal
    # Rough heuristic: ~4 lines per entry, so n_lines // 4 entries (200 lines ≈ 50 entries)
    return _get_journal(n=max(20, n_lines // 4))
def get_topic_file_index() -> dict[str, list[str]]:
    """Build index of topic files and their section headers from the store."""
    from store_helpers import get_topic_file_index as _get_index
    return _get_index()
def get_mem_markers() -> list[dict]:
    """Get relations from the store (replaces mem marker parsing)."""
    from store_helpers import get_relations
    raw = get_relations()
    # Parse list-edges output into marker-like dicts
    markers = []
    for line in raw.split('\n'):
        line = line.strip()
        if not line:
            continue
        markers.append({"_raw": line})
    return markers
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Get topic file summaries from the store."""
    from store_helpers import get_topic_summaries as _get_summaries
    return _get_summaries(max_chars_per_file)
def get_graph_stats() -> str:
    """Run poc-memory status and graph commands."""
    parts = []
    try:
        r = subprocess.run(["poc-memory", "status"],
                           capture_output=True, text=True, timeout=30)
        parts.append(f"=== poc-memory status ===\n{r.stdout}")
    except Exception as e:
        parts.append(f"Status error: {e}")
    try:
        r = subprocess.run(["poc-memory", "graph"],
                           capture_output=True, text=True, timeout=30)
        # Take first 150 lines
        lines = r.stdout.split('\n')[:150]
        parts.append("=== poc-memory graph (first 150 lines) ===\n"
                     + '\n'.join(lines))
    except Exception as e:
        parts.append(f"Graph error: {e}")
    return '\n'.join(parts)
def get_recent_digests(n: int = 3) -> str:
    """Get the most recent daily digests."""
    digest_files = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True)
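    # Reverse lexical sort puts the newest digest first, assuming the
    # daily-*.md filenames embed a sortable date stamp.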
    parts = []
    for f in digest_files[:n]:
        content = f.read_text()
        # Just the summary and themes sections
        summary = ""
        in_section = False
        for line in content.split('\n'):
            if line.startswith("## Summary") or line.startswith("## Themes"):
                in_section = True
                summary += line + '\n'
            elif line.startswith("## ") and in_section:
                in_section = False
            elif in_section:
                summary += line + '\n'
        parts.append(f"\n### {f.name}\n{summary}")
    return '\n'.join(parts)
def get_work_queue() -> str:
    """Read work queue."""
    wq = MEMORY_DIR / "work-queue.md"
    if wq.exists():
        return wq.read_text()
    return "(no work queue found)"
# ---------------------------------------------------------------------------
# Agent prompts
# ---------------------------------------------------------------------------
def build_freshness_prompt() -> str:
    journal = get_recent_journal(200)
    topic_index = get_topic_file_index()
    digests = get_recent_digests(3)
    work_queue = get_work_queue()
    topic_list = ""
    for fname, sections in topic_index.items():
        topic_list += f"\n {fname}:\n"
        for s in sections[:10]:
            topic_list += f" {s}\n"
    return f"""You are the Freshness Scanner for ProofOfConcept's memory system.
Your job: identify what's NEW (in journal/digests but not yet in topic files)
and what's STALE (in work queue or topic files but outdated).
## Recent journal entries (last 200 lines)
{journal}
## Recent daily digests
{digests}
## Topic file index (file → section headers)
{topic_list}
## Work queue
{work_queue}
## Instructions
1. For each substantive insight, experience, or discovery in the journal:
- Check if a matching topic file section exists
- If not, note it as UNPROMOTED with a suggested destination file
2. For each work queue Active item:
- If it looks done or stale (>7 days old, mentioned as completed), flag it
3. For recent digest themes:
- Check if the cross-links they suggest actually exist in the topic index
- Flag any that are missing
Output a structured report:
### UNPROMOTED JOURNAL ENTRIES
(For each: journal entry summary, timestamp, suggested destination file#section)
### STALE WORK QUEUE ITEMS
(For each: item text, evidence it's stale)
### MISSING DIGEST LINKS
(For each: suggested link from digest, whether the target exists)
### FRESHNESS OBSERVATIONS
(Anything else notable about the state of the memory)
Be selective. Focus on the 10-15 most important items, not exhaustive lists.
"""
def build_crosslink_prompt() -> str:
    markers = get_mem_markers()
    summaries = get_topic_summaries()
    marker_text = ""
    for m in markers:
        marker_text += f" {m.get('_raw', '?')}\n"
    return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.
Your job: find MISSING connections between topic files.
## Existing relations (from the memory graph)
{marker_text}
## Topic file content summaries
{summaries}
## Instructions
1. For each topic file, check if concepts it discusses have dedicated
sections in OTHER files that aren't linked.
2. Look for thematic connections that should exist:
- Files about the same concept from different angles
- Files that reference each other's content without formal links
- Clusters of related files that should be connected
3. Identify island nodes: files or sections with very few connections.
4. Look for redundancy: files covering the same ground that should be
merged or cross-referenced.
Output a structured report:
### MISSING LINKS (high confidence)
(For each: source file#section → target file#section, evidence/reasoning)
### SUGGESTED CONNECTIONS (medium confidence)
(For each: file A ↔ file B, why they should be connected)
### ISLAND NODES
(Files/sections with few or no connections that need integration)
### REDUNDANCY CANDIDATES
(Files/sections covering similar ground that might benefit from merging)
Focus on the 15-20 highest-value connections. Quality over quantity.
"""
def build_topology_prompt() -> str:
    stats = get_graph_stats()
    # Section counts per file come from the store's topic index
    topic_index = get_topic_file_index()
    file_sizes = ""
    for fname in sorted(topic_index.keys()):
        n_sections = len(topic_index[fname])
        file_sizes += f" {fname}: {n_sections} sections\n"
    return f"""You are the Topology Reporter for ProofOfConcept's memory system.
Your job: analyze the health and structure of the memory graph.
## Graph statistics
{stats}
## File sizes
{file_sizes}
## Instructions
Analyze the graph structure and report on:
1. **Overall health**: Is the graph well-connected or fragmented?
Hub dominance? Star vs web topology?
2. **Community structure**: Are the 342 communities sensible? Are there
communities that should be merged or split?
3. **Size distribution**: Are some files too large (should be split)?
Are some too small (should be merged)?
4. **Balance**: Is the system over-indexed on any one topic? Are there
gaps where important topics have thin coverage?
5. **Integration quality**: How well are episodic entries (daily/weekly
digests) connected to semantic files? Is the episodic→semantic bridge
working?
Output a structured report:
### GRAPH HEALTH
(Overall statistics, distribution, trends)
### STRUCTURAL OBSERVATIONS
(Hub nodes, clusters, gaps, web vs star assessment)
### SIZE RECOMMENDATIONS
(Files that are too large and should be split, or too small and should be merged)
### COVERAGE GAPS
(Important topics with thin coverage)
### INTEGRATION ASSESSMENT
(How well episodic and semantic layers connect)
Be specific and actionable. What should be done to improve the graph?
"""
# ---------------------------------------------------------------------------
# Run agents
# ---------------------------------------------------------------------------
def run_agent(name: str, prompt: str) -> tuple[str, str]:
    """Run a single agent, return (name, report)."""
    print(f" [{name}] Starting... ({len(prompt):,} chars)")
    report = call_sonnet(prompt)
    print(f" [{name}] Done ({len(report):,} chars)")
    return name, report
def run_all(agents: list[str] | None = None):
    """Run specified agents (or all) in parallel."""
    all_agents = {
        "freshness": build_freshness_prompt,
        "crosslink": build_crosslink_prompt,
        "topology": build_topology_prompt,
    }
    if agents is None:
        agents = list(all_agents.keys())
    print(f"Running {len(agents)} consolidation agents...")
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
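    # The timestamp is embedded in each report filename below, so repeated
    # runs do not overwrite earlier reports.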
    # Build prompts
    prompts = {}
    for name in agents:
        if name not in all_agents:
            print(f" Unknown agent: {name}")
            continue
        prompts[name] = all_agents[name]()
    # Run in parallel
    results = {}
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(run_agent, name, prompt): name
            for name, prompt in prompts.items()
        }
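        # Collect each report as soon as its agent finishes, regardless of
        # submission order.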
        for future in as_completed(futures):
            name, report = future.result()
            results[name] = report
    # Save reports
    for name, report in results.items():
        if report.startswith("Error:"):
            print(f" [{name}] FAILED: {report}")
            continue
        out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md"
        with open(out_path, "w") as f:
            f.write(f"# Consolidation Report: {name}\n")
            f.write(f"*Generated {timestamp}*\n\n")
            f.write(report)
        print(f" [{name}] Saved: {out_path}")
    # Print combined summary
    print(f"\n{'='*60}")
    print(f"Consolidation reports ready ({len(results)} agents)")
    print(f"{'='*60}\n")
    for name in agents:
        if name in results and not results[name].startswith("Error:"):
            # Print the first 25 lines of each report as a preview
            lines = results[name].split('\n')[:25]
            print(f"\n--- {name.upper()} (preview) ---")
            print('\n'.join(lines))
            if len(results[name].split('\n')) > 25:
                print(f" ... ({len(results[name].split(chr(10)))} total lines)")
    print()
    return results
def main():
    agents = None
    if len(sys.argv) > 1:
        agents = sys.argv[1:]
    run_all(agents)

if __name__ == "__main__":
    main()