#!/usr/bin/env python3 """apply-consolidation.py — convert consolidation reports to actions. Reads consolidation agent reports, sends them to Sonnet to extract structured actions, then executes them (or shows dry-run). Usage: apply-consolidation.py # dry run (show what would happen) apply-consolidation.py --apply # execute actions apply-consolidation.py --report FILE # use specific report file """ import json import os import re import subprocess import sys import tempfile from datetime import datetime from pathlib import Path MEMORY_DIR = Path.home() / ".claude" / "memory" AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results" SCRIPTS_DIR = Path(__file__).parent def call_sonnet(prompt: str, timeout: int = 300) -> str: """Call Sonnet via the wrapper script.""" env = dict(os.environ) env.pop("CLAUDECODE", None) with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: f.write(prompt) prompt_file = f.name try: wrapper = str(SCRIPTS_DIR / "call-sonnet.sh") result = subprocess.run( [wrapper, prompt_file], capture_output=True, text=True, timeout=timeout, env=env, ) return result.stdout.strip() except subprocess.TimeoutExpired: return "Error: Sonnet call timed out" except Exception as e: return f"Error: {e}" finally: os.unlink(prompt_file) def find_latest_reports() -> list[Path]: """Find the most recent set of consolidation reports.""" reports = sorted(AGENT_RESULTS_DIR.glob("consolidation-*-*.md"), reverse=True) if not reports: return [] # Group by timestamp latest_ts = reports[0].stem.split('-')[-1] return [r for r in reports if r.stem.endswith(latest_ts)] def build_action_prompt(reports: list[Path]) -> str: """Build prompt for Sonnet to extract structured actions.""" report_text = "" for r in reports: report_text += f"\n{'='*60}\n" report_text += f"## Report: {r.stem}\n\n" report_text += r.read_text() return f"""You are converting consolidation analysis reports into structured actions. Read the reports below and extract CONCRETE, EXECUTABLE actions. Output ONLY a JSON array. Each action is an object with these fields: For adding cross-links: {{"action": "link", "source": "file.md#section", "target": "file.md#section", "reason": "brief explanation"}} For categorizing nodes: {{"action": "categorize", "key": "file.md#section", "category": "core|tech|obs|task", "reason": "brief"}} For things that need manual attention (splitting files, creating new files, editing content): {{"action": "manual", "priority": "high|medium|low", "description": "what needs to be done"}} Rules: - Only output actions that are safe and reversible - Links are the primary action — focus on those - Use exact file names and section slugs from the reports - For categorize: core=identity/relationship, tech=bcachefs/code, obs=experience, task=work item - For manual items: include enough detail that someone can act on them - Output 20-40 actions, prioritized by impact - DO NOT include actions for things that are merely suggestions or speculation - Focus on HIGH CONFIDENCE items from the reports {report_text} Output ONLY the JSON array, no markdown fences, no explanation. """ def parse_actions(response: str) -> list[dict]: """Parse Sonnet's JSON response into action list.""" # Strip any markdown fences response = re.sub(r'^```json\s*', '', response.strip()) response = re.sub(r'\s*```$', '', response.strip()) try: actions = json.loads(response) if isinstance(actions, list): return actions except json.JSONDecodeError: # Try to find JSON array in the response match = re.search(r'\[.*\]', response, re.DOTALL) if match: try: return json.loads(match.group()) except json.JSONDecodeError: pass print("Error: Could not parse Sonnet response as JSON") print(f"Response preview: {response[:500]}") return [] def dry_run(actions: list[dict]): """Show what would be done.""" links = [a for a in actions if a.get("action") == "link"] cats = [a for a in actions if a.get("action") == "categorize"] manual = [a for a in actions if a.get("action") == "manual"] print(f"\n{'='*60}") print(f"DRY RUN — {len(actions)} actions proposed") print(f"{'='*60}\n") if links: print(f"## Links to add ({len(links)})\n") for i, a in enumerate(links, 1): src = a.get("source", "?") tgt = a.get("target", "?") reason = a.get("reason", "") print(f" {i:2d}. {src}") print(f" → {tgt}") print(f" ({reason})") print() if cats: print(f"\n## Categories to set ({len(cats)})\n") for a in cats: key = a.get("key", "?") cat = a.get("category", "?") reason = a.get("reason", "") print(f" {key} → {cat} ({reason})") if manual: print(f"\n## Manual actions needed ({len(manual)})\n") for a in manual: prio = a.get("priority", "?") desc = a.get("description", "?") print(f" [{prio}] {desc}") print(f"\n{'='*60}") print(f"To apply: {sys.argv[0]} --apply") print(f"{'='*60}") def apply_actions(actions: list[dict]): """Execute the actions.""" links = [a for a in actions if a.get("action") == "link"] cats = [a for a in actions if a.get("action") == "categorize"] manual = [a for a in actions if a.get("action") == "manual"] applied = 0 skipped = 0 errors = 0 # Apply links via poc-memory if links: print(f"\nApplying {len(links)} links...") # Build a JSON file that apply-agent can process timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") links_data = { "type": "consolidation-apply", "timestamp": timestamp, "links": [] } for a in links: links_data["links"].append({ "source": a.get("source", ""), "target": a.get("target", ""), "reason": a.get("reason", ""), }) # Write as agent-results JSON for apply-agent out_path = AGENT_RESULTS_DIR / f"consolidation-apply-{timestamp}.json" with open(out_path, "w") as f: json.dump(links_data, f, indent=2) # Now apply each link directly for a in links: src = a.get("source", "") tgt = a.get("target", "") reason = a.get("reason", "") try: cmd = ["poc-memory", "link-add", src, tgt] if reason: cmd.append(reason) r = subprocess.run( cmd, capture_output=True, text=True, timeout=10 ) if r.returncode == 0: output = r.stdout.strip() print(f" {output}") applied += 1 else: err = r.stderr.strip() print(f" ? {src} → {tgt}: {err}") skipped += 1 except Exception as e: print(f" ! {src} → {tgt}: {e}") errors += 1 # Apply categorizations if cats: print(f"\nApplying {len(cats)} categorizations...") for a in cats: key = a.get("key", "") cat = a.get("category", "") try: r = subprocess.run( ["poc-memory", "categorize", key, cat], capture_output=True, text=True, timeout=10 ) if r.returncode == 0: print(f" + {key} → {cat}") applied += 1 else: print(f" ? {key} → {cat}: {r.stderr.strip()}") skipped += 1 except Exception as e: print(f" ! {key} → {cat}: {e}") errors += 1 # Report manual items if manual: print(f"\n## Manual actions (not auto-applied):\n") for a in manual: prio = a.get("priority", "?") desc = a.get("description", "?") print(f" [{prio}] {desc}") print(f"\n{'='*60}") print(f"Applied: {applied} Skipped: {skipped} Errors: {errors}") print(f"Manual items: {len(manual)}") print(f"{'='*60}") def main(): do_apply = "--apply" in sys.argv # Find reports specific = [a for a in sys.argv[1:] if a.startswith("--report")] if specific: # TODO: handle --report FILE reports = [] else: reports = find_latest_reports() if not reports: print("No consolidation reports found.") print("Run consolidation-agents.py first.") sys.exit(1) print(f"Found {len(reports)} reports:") for r in reports: print(f" {r.name}") # Send to Sonnet for action extraction print("\nExtracting actions from reports...") prompt = build_action_prompt(reports) print(f" Prompt: {len(prompt):,} chars") response = call_sonnet(prompt) if response.startswith("Error:"): print(f" {response}") sys.exit(1) actions = parse_actions(response) if not actions: print("No actions extracted.") sys.exit(1) print(f" {len(actions)} actions extracted") # Save actions timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") actions_path = AGENT_RESULTS_DIR / f"consolidation-actions-{timestamp}.json" with open(actions_path, "w") as f: json.dump(actions, f, indent=2) print(f" Saved: {actions_path}") if do_apply: apply_actions(actions) else: dry_run(actions) if __name__ == "__main__": main()