consciousness/poc-memory/src/agents/consolidate.rs

259 lines
9.1 KiB
Rust
Raw Normal View History

// Consolidation pipeline: plan → agents → apply → digests → links
//
// consolidate_full() runs the full autonomous consolidation:
// 1. Plan: analyze metrics, allocate agents
// 2. Execute: run each agent, parse + apply actions inline
// 3. Graph maintenance (orphans, degree cap)
// 4. Digest: generate missing daily/weekly/monthly digests
// 5. Links: apply links extracted from digests
// 6. Summary: final metrics comparison
//
// Actions are parsed directly from agent output using the same parser
// as the knowledge loop (WRITE_NODE, LINK, REFINE), eliminating the
// second LLM call that was previously needed.
use super::digest;
use super::knowledge;
use crate::neuro;
use crate::store::{self, Store};
/// Append `line` to the log buffer, followed by a single newline.
///
/// The text is copied verbatim; an empty `line` therefore produces a blank
/// line in the log.
fn log_line(buf: &mut String, line: &str) {
    *buf += line;
    *buf += "\n";
}
/// Run the full autonomous consolidation pipeline with logging.
///
/// Convenience wrapper around [`consolidate_full_with_progress`] that passes
/// a progress callback which ignores every message.
pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
    // No-op progress sink: callers of this entry point get no step updates.
    fn ignore_progress(_step: &str) {}

    consolidate_full_with_progress(store, &ignore_progress)
}
/// Run the full consolidation pipeline, reporting each step via `on_progress`.
///
/// Steps, in order:
/// 1. Plan: build a consolidation plan and print it.
/// 2. Execute: run each planned agent batch and apply its actions inline.
/// 3. Link orphans, then cap node degree.
/// 4. Generate missing digests.
/// 5. Apply links parsed from digests.
/// 6. Summary: print totals and store the run log as a node keyed
///    `_consolidate-log-<timestamp>`.
///
/// `on_progress` is called with a short message at the start of each step,
/// and with the label and result line of each agent run.
///
/// # Errors
///
/// Returns `Err` when reloading or saving the store fails. Failures of an
/// individual agent, of `cap_degree`, of digest generation, or of storing the
/// run log are reported (log and/or stderr) and do not abort the pipeline.
/// Note that an early `Err` return discards the in-memory log buffer.
pub fn consolidate_full_with_progress(
    store: &mut Store,
    on_progress: &dyn Fn(&str),
) -> Result<(), String> {
    let start = std::time::Instant::now();
    let log_key = format!("_consolidate-log-{}", store::compact_timestamp());
    let mut log_buf = String::new();

    log_line(&mut log_buf, "=== CONSOLIDATE FULL ===");
    log_line(&mut log_buf, &format!("Started: {}", store::format_datetime(store::now_epoch())));
    log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()));
    log_line(&mut log_buf, "");

    // --- Step 1: Plan ---
    log_line(&mut log_buf, "--- Step 1: Plan ---");
    on_progress("planning");
    let plan = neuro::consolidation_plan(store);
    let plan_text = neuro::format_plan(&plan);
    log_line(&mut log_buf, &plan_text);
    println!("{}", plan_text);

    let total_agents = plan.replay_count + plan.linker_count
        + plan.separator_count + plan.transfer_count
        + if plan.run_health { 1 } else { 0 };
    log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents));

    // --- Step 2: Execute agents ---
    log_line(&mut log_buf, "\n--- Step 2: Execute agents ---");
    let mut agent_num = 0usize;
    let mut agent_errors = 0usize;
    let mut total_applied = 0usize;
    let mut total_actions = 0usize;

    let batch_size = 5;
    let runs = plan.to_agent_runs(batch_size);

    for (agent_type, count) in &runs {
        agent_num += 1;
        let label = if *count > 0 {
            format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count)
        } else {
            format!("[{}/{}] {}", agent_num, runs.len(), agent_type)
        };

        log_line(&mut log_buf, &format!("\n{}", label));
        on_progress(&label);
        println!("{}", label);

        // Reload store to pick up changes from previous agents
        if agent_num > 1 {
            *store = Store::load()?;
        }

        // A failing agent is counted and logged; the remaining runs still execute.
        let (total, applied) = match knowledge::run_and_apply(store, agent_type, *count, "consolidate") {
            Ok(r) => r,
            Err(e) => {
                let msg = format!(" ERROR: {}", e);
                log_line(&mut log_buf, &msg);
                eprintln!("{}", msg);
                agent_errors += 1;
                continue;
            }
        };
        total_actions += total;
        total_applied += applied;

        let msg = format!(" Done: {} actions ({} applied)", total, applied);
        log_line(&mut log_buf, &msg);
        on_progress(&msg);
        println!("{}", msg);
    }

    log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors, {} actions ({} applied)",
        agent_num - agent_errors, agent_errors, total_actions, total_applied));
    store.save()?;

    // --- Step 3: Link orphans ---
    log_line(&mut log_buf, "\n--- Step 3: Link orphans ---");
    on_progress("linking orphans");
    println!("\n--- Linking orphan nodes ---");
    *store = Store::load()?;

    // NOTE(review): there is no save() between this call and the reload in
    // step 3b — confirm that link_orphans persists its own changes.
    let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15);
    log_line(&mut log_buf, &format!(" {} orphans, {} links added", lo_orphans, lo_added));

    // --- Step 3b: Cap degree ---
    log_line(&mut log_buf, "\n--- Step 3b: Cap degree ---");
    on_progress("capping degree");
    println!("\n--- Capping node degree ---");
    *store = Store::load()?;

    match store.cap_degree(50) {
        Ok((hubs, pruned)) => {
            store.save()?;
            log_line(&mut log_buf, &format!(" {} hubs capped, {} edges pruned", hubs, pruned));
        }
        Err(e) => {
            // Echo to stderr as well, like the other steps' error paths.
            let msg = format!(" ERROR: {}", e);
            log_line(&mut log_buf, &msg);
            eprintln!("{}", msg);
        }
    }

    // --- Step 4: Digest auto ---
    log_line(&mut log_buf, "\n--- Step 4: Digest auto ---");
    on_progress("generating digests");
    println!("\n--- Generating missing digests ---");
    *store = Store::load()?;

    match digest::digest_auto(store) {
        Ok(()) => log_line(&mut log_buf, " Digests done."),
        Err(e) => {
            let msg = format!(" ERROR in digest auto: {}", e);
            log_line(&mut log_buf, &msg);
            eprintln!("{}", msg);
        }
    }

    // --- Step 5: Apply digest links ---
    log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---");
    on_progress("applying digest links");
    println!("\n--- Applying digest links ---");
    *store = Store::load()?;

    let links = digest::parse_all_digest_links(store);
    let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links);
    store.save()?;
    log_line(&mut log_buf, &format!(" {} links applied, {} skipped, {} fallbacks",
        applied, skipped, fallbacks));

    // --- Step 6: Summary ---
    let elapsed = start.elapsed();
    log_line(&mut log_buf, "\n--- Summary ---");
    log_line(&mut log_buf, &format!("Finished: {}", store::format_datetime(store::now_epoch())));
    log_line(&mut log_buf, &format!("Duration: {:.0}s", elapsed.as_secs_f64()));
    *store = Store::load()?;
    log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()));

    let summary = format!(
        "\n=== CONSOLIDATE FULL COMPLETE ===\n\
         Duration: {:.0}s\n\
         Agents: {} run, {} errors\n\
         Nodes: {} Relations: {}\n",
        elapsed.as_secs_f64(),
        agent_num - agent_errors, agent_errors,
        store.nodes.len(), store.relations.len(),
    );
    log_line(&mut log_buf, &summary);
    println!("{}", summary);

    // Store the log as a node. This stays best-effort (the pipeline's work is
    // already done), but a failure is reported instead of silently dropped.
    if let Err(e) = store.upsert_provenance(&log_key, &log_buf, "consolidate:write") {
        eprintln!(" ERROR storing consolidation log {}: {}", log_key, e);
    }
    store.save()?;

    Ok(())
}
/// Re-parse and apply actions from stored consolidation reports.
/// This is for manually re-processing reports — during normal consolidation,
/// actions are applied inline as each agent runs.
///
/// * `report_key` — when `Some`, only that report node is processed. When
///   `None`, the most recent batch is used: every node whose key starts with
///   `_consolidation-` (excluding `-actions-` and `-log-` keys) and ends with
///   the greatest trailing `-`-separated segment found among those keys.
/// * `do_apply` — when `false`, the parsed actions are only listed (dry run);
///   when `true`, they are applied and the store is saved if at least one
///   action was applied.
///
/// # Errors
///
/// Returns `Err` if saving the store fails.
pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option<&str>) -> Result<(), String> {
    let reports: Vec<String> = match report_key {
        Some(key) => vec![key.to_string()],
        None => {
            // Find the most recent batch of reports
            let candidates: Vec<&String> = store.nodes.keys()
                .filter(|k| k.starts_with("_consolidation-") && !k.contains("-actions-") && !k.contains("-log-"))
                .collect();

            // Compare the trailing segment only, so the part of the key before
            // it cannot decide which batch counts as newest.
            // NOTE(review): assumes that segment is a timestamp that sorts
            // lexicographically — confirm against where report keys are built.
            let latest_ts = candidates.iter()
                .copied()
                .map(|k| k.rsplit('-').next().unwrap_or(""))
                .max();

            match latest_ts {
                Some(ts) => {
                    let mut batch: Vec<String> = candidates.iter()
                        .copied()
                        .filter(|k| k.ends_with(ts))
                        .cloned()
                        .collect();
                    // Keep the descending key order reports were processed in before.
                    batch.sort_unstable_by(|a, b| b.cmp(a));
                    batch
                }
                None => Vec::new(),
            }
        }
    };

    if reports.is_empty() {
        println!("No consolidation reports found.");
        return Ok(());
    }

    println!("Found {} reports:", reports.len());
    let mut all_actions = Vec::new();
    for key in &reports {
        // An explicitly requested key may not exist; say so rather than
        // reporting it as a report with zero actions.
        let content = match store.nodes.get(key) {
            Some(node) => node.content.as_str(),
            None => {
                eprintln!(" {}: report not found, skipping", key);
                continue;
            }
        };
        let actions = knowledge::parse_all_actions(content);
        println!(" {}: {} actions", key, actions.len());
        all_actions.extend(actions);
    }

    if !do_apply {
        println!("\nDRY RUN — {} actions parsed", all_actions.len());
        for action in &all_actions {
            match &action.kind {
                knowledge::ActionKind::Link { source, target } =>
                    println!(" LINK {} → {}", source, target),
                knowledge::ActionKind::WriteNode { key, .. } =>
                    println!(" WRITE {}", key),
                knowledge::ActionKind::Refine { key, .. } =>
                    println!(" REFINE {}", key),
                knowledge::ActionKind::Demote { key } =>
                    println!(" DEMOTE {}", key),
                knowledge::ActionKind::Delete { key } =>
                    println!(" DELETE {}", key),
            }
        }
        println!("\nTo apply: poc-memory apply-consolidation --apply");
        return Ok(());
    }

    let ts = store::compact_timestamp();
    let mut applied = 0;
    for action in &all_actions {
        if knowledge::apply_action(store, action, "consolidate", &ts, 0) {
            applied += 1;
        }
    }

    // Only touch the store on disk when something actually changed.
    if applied > 0 {
        store.save()?;
    }

    println!("Applied: {}/{} actions", applied, all_actions.len());
    Ok(())
}