// Consolidation pipeline: plan → agents → apply → digests → links // // consolidate_full() runs the full autonomous consolidation: // 1. Plan: analyze metrics, allocate agents // 2. Execute: run each agent (Sonnet calls), save reports // 3. Apply: extract and apply actions from reports // 4. Digest: generate missing daily/weekly/monthly digests // 5. Links: apply links extracted from digests // 6. Summary: final metrics comparison // // apply_consolidation() processes consolidation reports independently. use crate::digest; use crate::llm::{call_sonnet, parse_json_response}; use crate::neuro; use crate::store::{self, Store, new_relation}; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; use crate::util::memory_subdir; /// Simple append-only log writer for consolidate-full. struct LogWriter { path: PathBuf, } impl LogWriter { fn new(path: &Path) -> Result { fs::write(path, "").map_err(|e| format!("create log: {}", e))?; Ok(LogWriter { path: path.to_path_buf() }) } fn write(&mut self, line: &str) -> Result<(), String> { let mut f = fs::OpenOptions::new() .append(true) .open(&self.path) .map_err(|e| format!("open log: {}", e))?; writeln!(f, "{}", line) .map_err(|e| format!("write log: {}", e)) } } /// Run the full autonomous consolidation pipeline with logging. pub fn consolidate_full(store: &mut Store) -> Result<(), String> { let start = std::time::Instant::now(); let log_path = memory_subdir("agent-results")?.join("consolidate-full.log"); let mut log = LogWriter::new(&log_path)?; log.write("=== CONSOLIDATE FULL ===")?; log.write(&format!("Started: {}", store::format_datetime(store::now_epoch())))?; log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?; log.write("")?; // --- Step 1: Plan --- log.write("--- Step 1: Plan ---")?; let plan = neuro::consolidation_plan(store); let plan_text = neuro::format_plan(&plan); log.write(&plan_text)?; println!("{}", plan_text); let total_agents = plan.replay_count + plan.linker_count + plan.separator_count + plan.transfer_count + if plan.run_health { 1 } else { 0 }; log.write(&format!("Total agents to run: {}", total_agents))?; // --- Step 2: Execute agents --- log.write("\n--- Step 2: Execute agents ---")?; let mut reports: Vec = Vec::new(); let mut agent_num = 0usize; let mut agent_errors = 0usize; // Build the list of (agent_type, batch_size) runs let mut runs: Vec<(&str, usize)> = Vec::new(); if plan.run_health { runs.push(("health", 0)); } if plan.replay_count > 0 { let batch = 5; let mut remaining = plan.replay_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("replay", this_batch)); remaining -= this_batch; } } if plan.linker_count > 0 { let batch = 5; let mut remaining = plan.linker_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("linker", this_batch)); remaining -= this_batch; } } if plan.separator_count > 0 { let batch = 5; let mut remaining = plan.separator_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("separator", this_batch)); remaining -= this_batch; } } if plan.transfer_count > 0 { let batch = 5; let mut remaining = plan.transfer_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("transfer", this_batch)); remaining -= this_batch; } } for (agent_type, count) in &runs { agent_num += 1; let label = if *count > 0 { format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count) } else { format!("[{}/{}] {}", agent_num, runs.len(), agent_type) }; log.write(&format!("\n{}", label))?; println!("{}", label); // Reload store to pick up changes from previous agents if agent_num > 1 { *store = Store::load()?; } let prompt = match neuro::agent_prompt(store, agent_type, *count) { Ok(p) => p, Err(e) => { let msg = format!(" ERROR building prompt: {}", e); log.write(&msg)?; eprintln!("{}", msg); agent_errors += 1; continue; } }; log.write(&format!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4))?; let response = match call_sonnet(&prompt, 300) { Ok(r) => r, Err(e) => { let msg = format!(" ERROR from Sonnet: {}", e); log.write(&msg)?; eprintln!("{}", msg); agent_errors += 1; continue; } }; // Save report let ts = store::format_datetime(store::now_epoch()) .replace([':', '-', 'T'], ""); let report_name = format!("consolidation-{}-{}.md", agent_type, ts); let report_path = memory_subdir("agent-results")?.join(&report_name); fs::write(&report_path, &response) .map_err(|e| format!("write report: {}", e))?; reports.push(report_path.clone()); let msg = format!(" Done: {} lines → {}", response.lines().count(), report_name); log.write(&msg)?; println!("{}", msg); } log.write(&format!("\nAgents complete: {} run, {} errors", agent_num - agent_errors, agent_errors))?; // --- Step 3: Apply consolidation actions --- log.write("\n--- Step 3: Apply consolidation actions ---")?; println!("\n--- Applying consolidation actions ---"); *store = Store::load()?; if reports.is_empty() { log.write(" No reports to apply.")?; } else { match apply_consolidation(store, true, None) { Ok(()) => log.write(" Applied.")?, Err(e) => { let msg = format!(" ERROR applying consolidation: {}", e); log.write(&msg)?; eprintln!("{}", msg); } } } // --- Step 3b: Link orphans --- log.write("\n--- Step 3b: Link orphans ---")?; println!("\n--- Linking orphan nodes ---"); *store = Store::load()?; let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15); log.write(&format!(" {} orphans, {} links added", lo_orphans, lo_added))?; // --- Step 3c: Cap degree --- log.write("\n--- Step 3c: Cap degree ---")?; println!("\n--- Capping node degree ---"); *store = Store::load()?; match store.cap_degree(50) { Ok((hubs, pruned)) => { store.save()?; log.write(&format!(" {} hubs capped, {} edges pruned", hubs, pruned))?; } Err(e) => log.write(&format!(" ERROR: {}", e))?, } // --- Step 4: Digest auto --- log.write("\n--- Step 4: Digest auto ---")?; println!("\n--- Generating missing digests ---"); *store = Store::load()?; match digest::digest_auto(store) { Ok(()) => log.write(" Digests done.")?, Err(e) => { let msg = format!(" ERROR in digest auto: {}", e); log.write(&msg)?; eprintln!("{}", msg); } } // --- Step 5: Apply digest links --- log.write("\n--- Step 5: Apply digest links ---")?; println!("\n--- Applying digest links ---"); *store = Store::load()?; let links = digest::parse_all_digest_links()?; let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links); store.save()?; log.write(&format!(" {} links applied, {} skipped, {} fallbacks", applied, skipped, fallbacks))?; // --- Step 6: Summary --- let elapsed = start.elapsed(); log.write("\n--- Summary ---")?; log.write(&format!("Finished: {}", store::format_datetime(store::now_epoch())))?; log.write(&format!("Duration: {:.0}s", elapsed.as_secs_f64()))?; *store = Store::load()?; log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?; let summary = format!( "\n=== CONSOLIDATE FULL COMPLETE ===\n\ Duration: {:.0}s\n\ Agents: {} run, {} errors\n\ Nodes: {} Relations: {}\n\ Log: {}\n", elapsed.as_secs_f64(), agent_num - agent_errors, agent_errors, store.nodes.len(), store.relations.len(), log_path.display(), ); log.write(&summary)?; println!("{}", summary); Ok(()) } /// Find the most recent set of consolidation reports. fn find_consolidation_reports() -> Result, String> { let dir = memory_subdir("agent-results")?; let mut reports: Vec = fs::read_dir(&dir) .map(|entries| { entries.filter_map(|e| e.ok()) .map(|e| e.path()) .filter(|p| { p.file_name() .and_then(|n| n.to_str()) .map(|n| n.starts_with("consolidation-") && n.ends_with(".md")) .unwrap_or(false) }) .collect() }) .unwrap_or_default(); reports.sort(); reports.reverse(); if reports.is_empty() { return Ok(reports); } // Group by timestamp (last segment of stem before .md) let latest_ts = reports[0].file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .rsplit('-').next().unwrap_or("") .to_string(); reports.retain(|r| { r.file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .ends_with(latest_ts.as_str()) }); Ok(reports) } fn build_consolidation_prompt(reports: &[PathBuf]) -> Result { let mut report_text = String::new(); for r in reports { let content = fs::read_to_string(r) .map_err(|e| format!("read {}: {}", r.display(), e))?; report_text.push_str(&format!("\n{}\n## Report: {}\n\n{}\n", "=".repeat(60), r.file_stem().and_then(|s| s.to_str()).unwrap_or(""), content)); } neuro::load_prompt("consolidation", &[("{{REPORTS}}", &report_text)]) } /// Run the full apply-consolidation pipeline. pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_file: Option<&str>) -> Result<(), String> { let reports = if let Some(path) = report_file { vec![PathBuf::from(path)] } else { find_consolidation_reports()? }; if reports.is_empty() { println!("No consolidation reports found."); println!("Run consolidation-agents first."); return Ok(()); } println!("Found {} reports:", reports.len()); for r in &reports { println!(" {}", r.file_name().and_then(|s| s.to_str()).unwrap_or("?")); } println!("\nExtracting actions from reports..."); let prompt = build_consolidation_prompt(&reports)?; println!(" Prompt: {} chars", prompt.len()); let response = call_sonnet(&prompt, 300)?; let actions_value = parse_json_response(&response)?; let actions = actions_value.as_array() .ok_or("expected JSON array of actions")?; println!(" {} actions extracted", actions.len()); // Save actions let timestamp = store::format_datetime(store::now_epoch()) .replace([':', '-'], ""); let actions_path = memory_subdir("agent-results")? .join(format!("consolidation-actions-{}.json", timestamp)); fs::write(&actions_path, serde_json::to_string_pretty(&actions_value).unwrap()) .map_err(|e| format!("write {}: {}", actions_path.display(), e))?; println!(" Saved: {}", actions_path.display()); let link_actions: Vec<_> = actions.iter() .filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("link")) .collect(); let cat_actions: Vec<_> = actions.iter() .filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("categorize")) .collect(); let manual_actions: Vec<_> = actions.iter() .filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("manual")) .collect(); if !do_apply { // Dry run println!("\n{}", "=".repeat(60)); println!("DRY RUN — {} actions proposed", actions.len()); println!("{}\n", "=".repeat(60)); if !link_actions.is_empty() { println!("## Links to add ({})\n", link_actions.len()); for (i, a) in link_actions.iter().enumerate() { let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("?"); let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("?"); let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or(""); println!(" {:2}. {} → {} ({})", i + 1, src, tgt, reason); } } if !cat_actions.is_empty() { println!("\n## Categories to set ({})\n", cat_actions.len()); for a in &cat_actions { let key = a.get("key").and_then(|v| v.as_str()).unwrap_or("?"); let cat = a.get("category").and_then(|v| v.as_str()).unwrap_or("?"); let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or(""); println!(" {} → {} ({})", key, cat, reason); } } if !manual_actions.is_empty() { println!("\n## Manual actions needed ({})\n", manual_actions.len()); for a in &manual_actions { let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?"); let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?"); println!(" [{}] {}", prio, desc); } } println!("\n{}", "=".repeat(60)); println!("To apply: poc-memory apply-consolidation --apply"); println!("{}", "=".repeat(60)); return Ok(()); } // Apply let mut applied = 0usize; let mut skipped = 0usize; if !link_actions.is_empty() { println!("\nApplying {} links...", link_actions.len()); for a in &link_actions { let src = a.get("source").and_then(|v| v.as_str()).unwrap_or(""); let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or(""); if src.is_empty() || tgt.is_empty() { skipped += 1; continue; } let source = match store.resolve_key(src) { Ok(s) => s, Err(e) => { println!(" ? {} → {}: {}", src, tgt, e); skipped += 1; continue; } }; let target = match store.resolve_key(tgt) { Ok(t) => t, Err(e) => { println!(" ? {} → {}: {}", src, tgt, e); skipped += 1; continue; } }; // Refine target to best-matching section let source_content = store.nodes.get(&source) .map(|n| n.content.as_str()).unwrap_or(""); let target = neuro::refine_target(store, source_content, &target); let exists = store.relations.iter().any(|r| r.source_key == source && r.target_key == target && !r.deleted ); if exists { skipped += 1; continue; } let source_uuid = match store.nodes.get(&source) { Some(n) => n.uuid, None => { skipped += 1; continue; } }; let target_uuid = match store.nodes.get(&target) { Some(n) => n.uuid, None => { skipped += 1; continue; } }; let rel = new_relation( source_uuid, target_uuid, store::RelationType::Auto, 0.5, &source, &target, ); if store.add_relation(rel).is_ok() { println!(" + {} → {}", source, target); applied += 1; } } } if !cat_actions.is_empty() { println!("\nApplying {} categorizations...", cat_actions.len()); for a in &cat_actions { let key = a.get("key").and_then(|v| v.as_str()).unwrap_or(""); let cat = a.get("category").and_then(|v| v.as_str()).unwrap_or(""); if key.is_empty() || cat.is_empty() { continue; } let resolved = match store.resolve_key(key) { Ok(r) => r, Err(_) => { println!(" ? {} → {}: not found", key, cat); skipped += 1; continue; } }; if store.categorize(&resolved, cat).is_ok() { println!(" + {} → {}", resolved, cat); applied += 1; } else { skipped += 1; } } } if !manual_actions.is_empty() { println!("\n## Manual actions (not auto-applied):\n"); for a in &manual_actions { let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?"); let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?"); println!(" [{}] {}", prio, desc); } } if applied > 0 { store.save()?; } println!("\n{}", "=".repeat(60)); println!("Applied: {} Skipped: {} Manual: {}", applied, skipped, manual_actions.len()); println!("{}", "=".repeat(60)); Ok(()) }