Rust core: - Cap'n Proto append-only storage (nodes + relations) - Graph algorithms: clustering coefficient, community detection, schema fit, small-world metrics, interference detection - BM25 text similarity with Porter stemming - Spaced repetition replay queue - Commands: search, init, health, status, graph, categorize, link-add, link-impact, decay, consolidate-session, etc. Python scripts: - Episodic digest pipeline: daily/weekly/monthly-digest.py - retroactive-digest.py for backfilling - consolidation-agents.py: 3 parallel Sonnet agents - apply-consolidation.py: structured action extraction + apply - digest-link-parser.py: extract ~400 explicit links from digests - content-promotion-agent.py: promote episodic obs to semantic files - bulk-categorize.py: categorize all nodes via single Sonnet call - consolidation-loop.py: multi-round automated consolidation Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
312 lines
9.9 KiB
Python
Executable file
312 lines
9.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""apply-consolidation.py — convert consolidation reports to actions.
|
|
|
|
Reads consolidation agent reports, sends them to Sonnet to extract
|
|
structured actions, then executes them (or shows dry-run).
|
|
|
|
Usage:
|
|
apply-consolidation.py # dry run (show what would happen)
|
|
apply-consolidation.py --apply # execute actions
|
|
apply-consolidation.py --report FILE # use specific report file
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
MEMORY_DIR = Path.home() / ".claude" / "memory"
|
|
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
|
|
SCRIPTS_DIR = Path(__file__).parent
|
|
|
|
|
|
def call_sonnet(prompt: str, timeout: int = 300) -> str:
|
|
"""Call Sonnet via the wrapper script."""
|
|
env = dict(os.environ)
|
|
env.pop("CLAUDECODE", None)
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
|
|
delete=False) as f:
|
|
f.write(prompt)
|
|
prompt_file = f.name
|
|
|
|
try:
|
|
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
|
|
result = subprocess.run(
|
|
[wrapper, prompt_file],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout,
|
|
env=env,
|
|
)
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
return "Error: Sonnet call timed out"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
finally:
|
|
os.unlink(prompt_file)
|
|
|
|
|
|
def find_latest_reports() -> list[Path]:
|
|
"""Find the most recent set of consolidation reports."""
|
|
reports = sorted(AGENT_RESULTS_DIR.glob("consolidation-*-*.md"),
|
|
reverse=True)
|
|
if not reports:
|
|
return []
|
|
|
|
# Group by timestamp
|
|
latest_ts = reports[0].stem.split('-')[-1]
|
|
return [r for r in reports if r.stem.endswith(latest_ts)]
|
|
|
|
|
|
def build_action_prompt(reports: list[Path]) -> str:
|
|
"""Build prompt for Sonnet to extract structured actions."""
|
|
report_text = ""
|
|
for r in reports:
|
|
report_text += f"\n{'='*60}\n"
|
|
report_text += f"## Report: {r.stem}\n\n"
|
|
report_text += r.read_text()
|
|
|
|
return f"""You are converting consolidation analysis reports into structured actions.
|
|
|
|
Read the reports below and extract CONCRETE, EXECUTABLE actions.
|
|
Output ONLY a JSON array. Each action is an object with these fields:
|
|
|
|
For adding cross-links:
|
|
{{"action": "link", "source": "file.md#section", "target": "file.md#section", "reason": "brief explanation"}}
|
|
|
|
For categorizing nodes:
|
|
{{"action": "categorize", "key": "file.md#section", "category": "core|tech|obs|task", "reason": "brief"}}
|
|
|
|
For things that need manual attention (splitting files, creating new files, editing content):
|
|
{{"action": "manual", "priority": "high|medium|low", "description": "what needs to be done"}}
|
|
|
|
Rules:
|
|
- Only output actions that are safe and reversible
|
|
- Links are the primary action — focus on those
|
|
- Use exact file names and section slugs from the reports
|
|
- For categorize: core=identity/relationship, tech=bcachefs/code, obs=experience, task=work item
|
|
- For manual items: include enough detail that someone can act on them
|
|
- Output 20-40 actions, prioritized by impact
|
|
- DO NOT include actions for things that are merely suggestions or speculation
|
|
- Focus on HIGH CONFIDENCE items from the reports
|
|
|
|
{report_text}
|
|
|
|
Output ONLY the JSON array, no markdown fences, no explanation.
|
|
"""
|
|
|
|
|
|
def parse_actions(response: str) -> list[dict]:
|
|
"""Parse Sonnet's JSON response into action list."""
|
|
# Strip any markdown fences
|
|
response = re.sub(r'^```json\s*', '', response.strip())
|
|
response = re.sub(r'\s*```$', '', response.strip())
|
|
|
|
try:
|
|
actions = json.loads(response)
|
|
if isinstance(actions, list):
|
|
return actions
|
|
except json.JSONDecodeError:
|
|
# Try to find JSON array in the response
|
|
match = re.search(r'\[.*\]', response, re.DOTALL)
|
|
if match:
|
|
try:
|
|
return json.loads(match.group())
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
print("Error: Could not parse Sonnet response as JSON")
|
|
print(f"Response preview: {response[:500]}")
|
|
return []
|
|
|
|
|
|
def dry_run(actions: list[dict]):
|
|
"""Show what would be done."""
|
|
links = [a for a in actions if a.get("action") == "link"]
|
|
cats = [a for a in actions if a.get("action") == "categorize"]
|
|
manual = [a for a in actions if a.get("action") == "manual"]
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"DRY RUN — {len(actions)} actions proposed")
|
|
print(f"{'='*60}\n")
|
|
|
|
if links:
|
|
print(f"## Links to add ({len(links)})\n")
|
|
for i, a in enumerate(links, 1):
|
|
src = a.get("source", "?")
|
|
tgt = a.get("target", "?")
|
|
reason = a.get("reason", "")
|
|
print(f" {i:2d}. {src}")
|
|
print(f" → {tgt}")
|
|
print(f" ({reason})")
|
|
print()
|
|
|
|
if cats:
|
|
print(f"\n## Categories to set ({len(cats)})\n")
|
|
for a in cats:
|
|
key = a.get("key", "?")
|
|
cat = a.get("category", "?")
|
|
reason = a.get("reason", "")
|
|
print(f" {key} → {cat} ({reason})")
|
|
|
|
if manual:
|
|
print(f"\n## Manual actions needed ({len(manual)})\n")
|
|
for a in manual:
|
|
prio = a.get("priority", "?")
|
|
desc = a.get("description", "?")
|
|
print(f" [{prio}] {desc}")
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"To apply: {sys.argv[0]} --apply")
|
|
print(f"{'='*60}")
|
|
|
|
|
|
def apply_actions(actions: list[dict]):
|
|
"""Execute the actions."""
|
|
links = [a for a in actions if a.get("action") == "link"]
|
|
cats = [a for a in actions if a.get("action") == "categorize"]
|
|
manual = [a for a in actions if a.get("action") == "manual"]
|
|
|
|
applied = 0
|
|
skipped = 0
|
|
errors = 0
|
|
|
|
# Apply links via poc-memory
|
|
if links:
|
|
print(f"\nApplying {len(links)} links...")
|
|
# Build a JSON file that apply-agent can process
|
|
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
|
|
links_data = {
|
|
"type": "consolidation-apply",
|
|
"timestamp": timestamp,
|
|
"links": []
|
|
}
|
|
for a in links:
|
|
links_data["links"].append({
|
|
"source": a.get("source", ""),
|
|
"target": a.get("target", ""),
|
|
"reason": a.get("reason", ""),
|
|
})
|
|
|
|
# Write as agent-results JSON for apply-agent
|
|
out_path = AGENT_RESULTS_DIR / f"consolidation-apply-{timestamp}.json"
|
|
with open(out_path, "w") as f:
|
|
json.dump(links_data, f, indent=2)
|
|
|
|
# Now apply each link directly
|
|
for a in links:
|
|
src = a.get("source", "")
|
|
tgt = a.get("target", "")
|
|
reason = a.get("reason", "")
|
|
try:
|
|
cmd = ["poc-memory", "link-add", src, tgt]
|
|
if reason:
|
|
cmd.append(reason)
|
|
r = subprocess.run(
|
|
cmd, capture_output=True, text=True, timeout=10
|
|
)
|
|
if r.returncode == 0:
|
|
output = r.stdout.strip()
|
|
print(f" {output}")
|
|
applied += 1
|
|
else:
|
|
err = r.stderr.strip()
|
|
print(f" ? {src} → {tgt}: {err}")
|
|
skipped += 1
|
|
except Exception as e:
|
|
print(f" ! {src} → {tgt}: {e}")
|
|
errors += 1
|
|
|
|
# Apply categorizations
|
|
if cats:
|
|
print(f"\nApplying {len(cats)} categorizations...")
|
|
for a in cats:
|
|
key = a.get("key", "")
|
|
cat = a.get("category", "")
|
|
try:
|
|
r = subprocess.run(
|
|
["poc-memory", "categorize", key, cat],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if r.returncode == 0:
|
|
print(f" + {key} → {cat}")
|
|
applied += 1
|
|
else:
|
|
print(f" ? {key} → {cat}: {r.stderr.strip()}")
|
|
skipped += 1
|
|
except Exception as e:
|
|
print(f" ! {key} → {cat}: {e}")
|
|
errors += 1
|
|
|
|
# Report manual items
|
|
if manual:
|
|
print(f"\n## Manual actions (not auto-applied):\n")
|
|
for a in manual:
|
|
prio = a.get("priority", "?")
|
|
desc = a.get("description", "?")
|
|
print(f" [{prio}] {desc}")
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"Applied: {applied} Skipped: {skipped} Errors: {errors}")
|
|
print(f"Manual items: {len(manual)}")
|
|
print(f"{'='*60}")
|
|
|
|
|
|
def main():
|
|
do_apply = "--apply" in sys.argv
|
|
|
|
# Find reports
|
|
specific = [a for a in sys.argv[1:] if a.startswith("--report")]
|
|
if specific:
|
|
# TODO: handle --report FILE
|
|
reports = []
|
|
else:
|
|
reports = find_latest_reports()
|
|
|
|
if not reports:
|
|
print("No consolidation reports found.")
|
|
print("Run consolidation-agents.py first.")
|
|
sys.exit(1)
|
|
|
|
print(f"Found {len(reports)} reports:")
|
|
for r in reports:
|
|
print(f" {r.name}")
|
|
|
|
# Send to Sonnet for action extraction
|
|
print("\nExtracting actions from reports...")
|
|
prompt = build_action_prompt(reports)
|
|
print(f" Prompt: {len(prompt):,} chars")
|
|
|
|
response = call_sonnet(prompt)
|
|
if response.startswith("Error:"):
|
|
print(f" {response}")
|
|
sys.exit(1)
|
|
|
|
actions = parse_actions(response)
|
|
if not actions:
|
|
print("No actions extracted.")
|
|
sys.exit(1)
|
|
|
|
print(f" {len(actions)} actions extracted")
|
|
|
|
# Save actions
|
|
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
|
|
actions_path = AGENT_RESULTS_DIR / f"consolidation-actions-{timestamp}.json"
|
|
with open(actions_path, "w") as f:
|
|
json.dump(actions, f, indent=2)
|
|
print(f" Saved: {actions_path}")
|
|
|
|
if do_apply:
|
|
apply_actions(actions)
|
|
else:
|
|
dry_run(actions)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|