// Consolidation pipeline: plan → agents → apply → digests → links // // consolidate_full() runs the full autonomous consolidation: // 1. Plan: analyze metrics, allocate agents // 2. Execute: run each agent (Sonnet calls), save reports // 3. Apply: extract and apply actions from reports // 4. Digest: generate missing daily/weekly/monthly digests // 5. Links: apply links extracted from digests // 6. Summary: final metrics comparison // // apply_consolidation() processes consolidation reports independently. use crate::digest; use crate::llm::{call_sonnet, parse_json_response}; use crate::neuro; use crate::store::{self, Store, new_relation}; /// Append a line to the log buffer. fn log_line(buf: &mut String, line: &str) { buf.push_str(line); buf.push('\n'); } /// Run the full autonomous consolidation pipeline with logging. /// If `on_progress` is provided, it's called at each significant step. pub fn consolidate_full(store: &mut Store) -> Result<(), String> { consolidate_full_with_progress(store, &|_| {}) } pub fn consolidate_full_with_progress( store: &mut Store, on_progress: &dyn Fn(&str), ) -> Result<(), String> { let start = std::time::Instant::now(); let log_key = format!("_consolidate-log-{}", store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], "")); let mut log_buf = String::new(); log_line(&mut log_buf, "=== CONSOLIDATE FULL ==="); log_line(&mut log_buf, &format!("Started: {}", store::format_datetime(store::now_epoch()))); log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); log_line(&mut log_buf, ""); // --- Step 1: Plan --- log_line(&mut log_buf, "--- Step 1: Plan ---"); on_progress("planning"); let plan = neuro::consolidation_plan(store); let plan_text = neuro::format_plan(&plan); log_line(&mut log_buf, &plan_text); println!("{}", plan_text); let total_agents = plan.replay_count + plan.linker_count + plan.separator_count + plan.transfer_count + if plan.run_health { 1 } else { 0 }; log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents)); // --- Step 2: Execute agents --- log_line(&mut log_buf, "\n--- Step 2: Execute agents ---"); let mut reports: Vec = Vec::new(); let mut agent_num = 0usize; let mut agent_errors = 0usize; // Build the list of (agent_type, batch_size) runs let mut runs: Vec<(&str, usize)> = Vec::new(); if plan.run_health { runs.push(("health", 0)); } if plan.replay_count > 0 { let batch = 5; let mut remaining = plan.replay_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("replay", this_batch)); remaining -= this_batch; } } if plan.linker_count > 0 { let batch = 5; let mut remaining = plan.linker_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("linker", this_batch)); remaining -= this_batch; } } if plan.separator_count > 0 { let batch = 5; let mut remaining = plan.separator_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("separator", this_batch)); remaining -= this_batch; } } if plan.transfer_count > 0 { let batch = 5; let mut remaining = plan.transfer_count; while remaining > 0 { let this_batch = remaining.min(batch); runs.push(("transfer", this_batch)); remaining -= this_batch; } } for (agent_type, count) in &runs { agent_num += 1; let label = if *count > 0 { format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count) } else { format!("[{}/{}] {}", agent_num, runs.len(), agent_type) }; log_line(&mut log_buf, &format!("\n{}", label)); on_progress(&label); println!("{}", label); // Reload store to pick up changes from previous agents if agent_num > 1 { *store = Store::load()?; } let prompt = match neuro::agent_prompt(store, agent_type, *count) { Ok(p) => p, Err(e) => { let msg = format!(" ERROR building prompt: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); agent_errors += 1; continue; } }; log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4)); let response = match call_sonnet("consolidate", &prompt) { Ok(r) => r, Err(e) => { let msg = format!(" ERROR from Sonnet: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); agent_errors += 1; continue; } }; // Store report as a node let ts = store::format_datetime(store::now_epoch()) .replace([':', '-', 'T'], ""); let report_key = format!("_consolidation-{}-{}", agent_type, ts); store.upsert_provenance(&report_key, &response, store::Provenance::AgentConsolidate).ok(); reports.push(report_key.clone()); let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key); log_line(&mut log_buf, &msg); on_progress(&msg); println!("{}", msg); } log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors", agent_num - agent_errors, agent_errors)); // --- Step 3: Apply consolidation actions --- log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---"); on_progress("applying actions"); println!("\n--- Applying consolidation actions ---"); *store = Store::load()?; if reports.is_empty() { log_line(&mut log_buf, " No reports to apply."); } else { match apply_consolidation(store, true, None) { Ok(()) => log_line(&mut log_buf, " Applied."), Err(e) => { let msg = format!(" ERROR applying consolidation: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); } } } // --- Step 3b: Link orphans --- log_line(&mut log_buf, "\n--- Step 3b: Link orphans ---"); on_progress("linking orphans"); println!("\n--- Linking orphan nodes ---"); *store = Store::load()?; let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15); log_line(&mut log_buf, &format!(" {} orphans, {} links added", lo_orphans, lo_added)); // --- Step 3c: Cap degree --- log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---"); on_progress("capping degree"); println!("\n--- Capping node degree ---"); *store = Store::load()?; match store.cap_degree(50) { Ok((hubs, pruned)) => { store.save()?; log_line(&mut log_buf, &format!(" {} hubs capped, {} edges pruned", hubs, pruned)); } Err(e) => log_line(&mut log_buf, &format!(" ERROR: {}", e)), } // --- Step 4: Digest auto --- log_line(&mut log_buf, "\n--- Step 4: Digest auto ---"); on_progress("generating digests"); println!("\n--- Generating missing digests ---"); *store = Store::load()?; match digest::digest_auto(store) { Ok(()) => log_line(&mut log_buf, " Digests done."), Err(e) => { let msg = format!(" ERROR in digest auto: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); } } // --- Step 5: Apply digest links --- log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---"); on_progress("applying digest links"); println!("\n--- Applying digest links ---"); *store = Store::load()?; let links = digest::parse_all_digest_links(store); let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links); store.save()?; log_line(&mut log_buf, &format!(" {} links applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); // --- Step 6: Summary --- let elapsed = start.elapsed(); log_line(&mut log_buf, "\n--- Summary ---"); log_line(&mut log_buf, &format!("Finished: {}", store::format_datetime(store::now_epoch()))); log_line(&mut log_buf, &format!("Duration: {:.0}s", elapsed.as_secs_f64())); *store = Store::load()?; log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); let summary = format!( "\n=== CONSOLIDATE FULL COMPLETE ===\n\ Duration: {:.0}s\n\ Agents: {} run, {} errors\n\ Nodes: {} Relations: {}\n", elapsed.as_secs_f64(), agent_num - agent_errors, agent_errors, store.nodes.len(), store.relations.len(), ); log_line(&mut log_buf, &summary); println!("{}", summary); // Store the log as a node store.upsert_provenance(&log_key, &log_buf, store::Provenance::AgentConsolidate).ok(); store.save()?; Ok(()) } /// Find the most recent set of consolidation report keys from the store. fn find_consolidation_reports(store: &Store) -> Vec { let mut keys: Vec<&String> = store.nodes.keys() .filter(|k| k.starts_with("_consolidation-")) .collect(); keys.sort(); keys.reverse(); if keys.is_empty() { return Vec::new(); } // Group by timestamp (last segment after last '-') let latest_ts = keys[0].rsplit('-').next().unwrap_or("").to_string(); keys.into_iter() .filter(|k| k.ends_with(&latest_ts)) .cloned() .collect() } fn build_consolidation_prompt(store: &Store, report_keys: &[String]) -> Result { let mut report_text = String::new(); for key in report_keys { let content = store.nodes.get(key) .map(|n| n.content.as_str()) .unwrap_or(""); report_text.push_str(&format!("\n{}\n## Report: {}\n\n{}\n", "=".repeat(60), key, content)); } neuro::load_prompt("consolidation", &[("{{REPORTS}}", &report_text)]) } /// Run the full apply-consolidation pipeline. pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option<&str>) -> Result<(), String> { let reports = if let Some(key) = report_key { vec![key.to_string()] } else { find_consolidation_reports(store) }; if reports.is_empty() { println!("No consolidation reports found."); println!("Run consolidation-agents first."); return Ok(()); } println!("Found {} reports:", reports.len()); for r in &reports { println!(" {}", r); } println!("\nExtracting actions from reports..."); let prompt = build_consolidation_prompt(store, &reports)?; println!(" Prompt: {} chars", prompt.len()); let response = call_sonnet("consolidate", &prompt)?; let actions_value = parse_json_response(&response)?; let actions = actions_value.as_array() .ok_or("expected JSON array of actions")?; println!(" {} actions extracted", actions.len()); // Store actions in the store let timestamp = store::format_datetime(store::now_epoch()) .replace([':', '-'], ""); let actions_key = format!("_consolidation-actions-{}", timestamp); let actions_json = serde_json::to_string_pretty(&actions_value).unwrap(); store.upsert_provenance(&actions_key, &actions_json, store::Provenance::AgentConsolidate).ok(); println!(" Stored: {}", actions_key); let link_actions: Vec<_> = actions.iter() .filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("link")) .collect(); let manual_actions: Vec<_> = actions.iter() .filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("manual")) .collect(); if !do_apply { // Dry run println!("\n{}", "=".repeat(60)); println!("DRY RUN — {} actions proposed", actions.len()); println!("{}\n", "=".repeat(60)); if !link_actions.is_empty() { println!("## Links to add ({})\n", link_actions.len()); for (i, a) in link_actions.iter().enumerate() { let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("?"); let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("?"); let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or(""); println!(" {:2}. {} → {} ({})", i + 1, src, tgt, reason); } } if !manual_actions.is_empty() { println!("\n## Manual actions needed ({})\n", manual_actions.len()); for a in &manual_actions { let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?"); let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?"); println!(" [{}] {}", prio, desc); } } println!("\n{}", "=".repeat(60)); println!("To apply: poc-memory apply-consolidation --apply"); println!("{}", "=".repeat(60)); return Ok(()); } // Apply let mut applied = 0usize; let mut skipped = 0usize; if !link_actions.is_empty() { println!("\nApplying {} links...", link_actions.len()); for a in &link_actions { let src = a.get("source").and_then(|v| v.as_str()).unwrap_or(""); let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or(""); if src.is_empty() || tgt.is_empty() { skipped += 1; continue; } let source = match store.resolve_key(src) { Ok(s) => s, Err(e) => { println!(" ? {} → {}: {}", src, tgt, e); skipped += 1; continue; } }; let target = match store.resolve_key(tgt) { Ok(t) => t, Err(e) => { println!(" ? {} → {}: {}", src, tgt, e); skipped += 1; continue; } }; // Refine target to best-matching section let source_content = store.nodes.get(&source) .map(|n| n.content.as_str()).unwrap_or(""); let target = neuro::refine_target(store, source_content, &target); let exists = store.relations.iter().any(|r| r.source_key == source && r.target_key == target && !r.deleted ); if exists { skipped += 1; continue; } let source_uuid = match store.nodes.get(&source) { Some(n) => n.uuid, None => { skipped += 1; continue; } }; let target_uuid = match store.nodes.get(&target) { Some(n) => n.uuid, None => { skipped += 1; continue; } }; let rel = new_relation( source_uuid, target_uuid, store::RelationType::Auto, 0.5, &source, &target, ); if store.add_relation(rel).is_ok() { println!(" + {} → {}", source, target); applied += 1; } } } if !manual_actions.is_empty() { println!("\n## Manual actions (not auto-applied):\n"); for a in &manual_actions { let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?"); let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?"); println!(" [{}] {}", prio, desc); } } if applied > 0 { store.save()?; } println!("\n{}", "=".repeat(60)); println!("Applied: {} Skipped: {} Manual: {}", applied, skipped, manual_actions.len()); println!("{}", "=".repeat(60)); Ok(()) }