// Consolidation pipeline: plan → agents → apply → digests → links // // consolidate_full() runs the full autonomous consolidation: // 1. Plan: analyze metrics, allocate agents // 2. Execute: run each agent, parse + apply actions inline // 3. Graph maintenance (orphans, degree cap) // 4. Digest: generate missing daily/weekly/monthly digests // 5. Links: apply links extracted from digests // 6. Summary: final metrics comparison // // Actions are parsed directly from agent output using the same parser // as the knowledge loop (WRITE_NODE, LINK, REFINE), eliminating the // second LLM call that was previously needed. use super::digest; use super::knowledge; use crate::neuro; use crate::store::{self, Store}; /// Append a line to the log buffer. fn log_line(buf: &mut String, line: &str) { buf.push_str(line); buf.push('\n'); } /// Run the full autonomous consolidation pipeline with logging. /// If `on_progress` is provided, it's called at each significant step. pub fn consolidate_full(store: &mut Store) -> Result<(), String> { consolidate_full_with_progress(store, &|_| {}) } pub fn consolidate_full_with_progress( store: &mut Store, on_progress: &dyn Fn(&str), ) -> Result<(), String> { let start = std::time::Instant::now(); let log_key = format!("_consolidate-log-{}", store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], "")); let mut log_buf = String::new(); log_line(&mut log_buf, "=== CONSOLIDATE FULL ==="); log_line(&mut log_buf, &format!("Started: {}", store::format_datetime(store::now_epoch()))); log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); log_line(&mut log_buf, ""); // --- Step 1: Plan --- log_line(&mut log_buf, "--- Step 1: Plan ---"); on_progress("planning"); let plan = neuro::consolidation_plan(store); let plan_text = neuro::format_plan(&plan); log_line(&mut log_buf, &plan_text); println!("{}", plan_text); let total_agents = plan.replay_count + plan.linker_count + plan.separator_count + plan.transfer_count + if plan.run_health { 1 } else { 0 }; log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents)); // --- Step 2: Execute agents --- log_line(&mut log_buf, "\n--- Step 2: Execute agents ---"); let mut agent_num = 0usize; let mut agent_errors = 0usize; let mut total_applied = 0usize; let mut total_actions = 0usize; // Build the list of (agent_type, batch_size) runs let mut runs: Vec<(&str, usize)> = Vec::new(); if plan.run_health { runs.push(("health", 0)); } let batch_size = 5; for (name, count) in [ ("replay", plan.replay_count), ("linker", plan.linker_count), ("separator", plan.separator_count), ("transfer", plan.transfer_count), ] { let mut remaining = count; while remaining > 0 { let batch = remaining.min(batch_size); runs.push((name, batch)); remaining -= batch; } } for (agent_type, count) in &runs { agent_num += 1; let label = if *count > 0 { format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count) } else { format!("[{}/{}] {}", agent_num, runs.len(), agent_type) }; log_line(&mut log_buf, &format!("\n{}", label)); on_progress(&label); println!("{}", label); // Reload store to pick up changes from previous agents if agent_num > 1 { *store = Store::load()?; } let result = match knowledge::run_one_agent(store, agent_type, *count, "consolidate") { Ok(r) => r, Err(e) => { let msg = format!(" ERROR: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); agent_errors += 1; continue; } }; let ts = store::format_datetime(store::now_epoch()) .replace([':', '-', 'T'], ""); let mut applied = 0; for action in &result.actions { if knowledge::apply_action(store, action, agent_type, &ts, 0) { applied += 1; } } total_actions += result.actions.len(); total_applied += applied; let msg = format!(" Done: {} actions ({} applied, {} no-ops)", result.actions.len(), applied, result.no_ops); log_line(&mut log_buf, &msg); on_progress(&msg); println!("{}", msg); } log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors, {} actions ({} applied)", agent_num - agent_errors, agent_errors, total_actions, total_applied)); store.save()?; // --- Step 3: Link orphans --- log_line(&mut log_buf, "\n--- Step 3: Link orphans ---"); on_progress("linking orphans"); println!("\n--- Linking orphan nodes ---"); *store = Store::load()?; let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15); log_line(&mut log_buf, &format!(" {} orphans, {} links added", lo_orphans, lo_added)); // --- Step 3b: Cap degree --- log_line(&mut log_buf, "\n--- Step 3b: Cap degree ---"); on_progress("capping degree"); println!("\n--- Capping node degree ---"); *store = Store::load()?; match store.cap_degree(50) { Ok((hubs, pruned)) => { store.save()?; log_line(&mut log_buf, &format!(" {} hubs capped, {} edges pruned", hubs, pruned)); } Err(e) => log_line(&mut log_buf, &format!(" ERROR: {}", e)), } // --- Step 4: Digest auto --- log_line(&mut log_buf, "\n--- Step 4: Digest auto ---"); on_progress("generating digests"); println!("\n--- Generating missing digests ---"); *store = Store::load()?; match digest::digest_auto(store) { Ok(()) => log_line(&mut log_buf, " Digests done."), Err(e) => { let msg = format!(" ERROR in digest auto: {}", e); log_line(&mut log_buf, &msg); eprintln!("{}", msg); } } // --- Step 5: Apply digest links --- log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---"); on_progress("applying digest links"); println!("\n--- Applying digest links ---"); *store = Store::load()?; let links = digest::parse_all_digest_links(store); let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links); store.save()?; log_line(&mut log_buf, &format!(" {} links applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); // --- Step 6: Summary --- let elapsed = start.elapsed(); log_line(&mut log_buf, "\n--- Summary ---"); log_line(&mut log_buf, &format!("Finished: {}", store::format_datetime(store::now_epoch()))); log_line(&mut log_buf, &format!("Duration: {:.0}s", elapsed.as_secs_f64())); *store = Store::load()?; log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); let summary = format!( "\n=== CONSOLIDATE FULL COMPLETE ===\n\ Duration: {:.0}s\n\ Agents: {} run, {} errors\n\ Nodes: {} Relations: {}\n", elapsed.as_secs_f64(), agent_num - agent_errors, agent_errors, store.nodes.len(), store.relations.len(), ); log_line(&mut log_buf, &summary); println!("{}", summary); // Store the log as a node store.upsert_provenance(&log_key, &log_buf, store::Provenance::AgentConsolidate).ok(); store.save()?; Ok(()) } /// Re-parse and apply actions from stored consolidation reports. /// This is for manually re-processing reports — during normal consolidation, /// actions are applied inline as each agent runs. pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option<&str>) -> Result<(), String> { let reports: Vec = if let Some(key) = report_key { vec![key.to_string()] } else { // Find the most recent batch of reports let mut keys: Vec<&String> = store.nodes.keys() .filter(|k| k.starts_with("_consolidation-") && !k.contains("-actions-") && !k.contains("-log-")) .collect(); keys.sort(); keys.reverse(); if keys.is_empty() { return Ok(()); } let latest_ts = keys[0].rsplit('-').next().unwrap_or("").to_string(); keys.into_iter() .filter(|k| k.ends_with(&latest_ts)) .cloned() .collect() }; if reports.is_empty() { println!("No consolidation reports found."); return Ok(()); } println!("Found {} reports:", reports.len()); let mut all_actions = Vec::new(); for key in &reports { let content = store.nodes.get(key).map(|n| n.content.as_str()).unwrap_or(""); let actions = knowledge::parse_all_actions(content); println!(" {} → {} actions", key, actions.len()); all_actions.extend(actions); } if !do_apply { println!("\nDRY RUN — {} actions parsed", all_actions.len()); for action in &all_actions { match &action.kind { knowledge::ActionKind::Link { source, target } => println!(" LINK {} → {}", source, target), knowledge::ActionKind::WriteNode { key, .. } => println!(" WRITE {}", key), knowledge::ActionKind::Refine { key, .. } => println!(" REFINE {}", key), } } println!("\nTo apply: poc-memory apply-consolidation --apply"); return Ok(()); } let ts = store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], ""); let mut applied = 0; for action in &all_actions { if knowledge::apply_action(store, action, "consolidate", &ts, 0) { applied += 1; } } if applied > 0 { store.save()?; } println!("Applied: {}/{} actions", applied, all_actions.len()); Ok(()) }