consciousness/src/consolidate.rs
Kent Overstreet f4364e299c replace libc date math with chrono, extract memory_subdir helper
- date_to_epoch, iso_week_info, weeks_in_month: replaced unsafe libc
  (mktime, strftime, localtime_r) with chrono NaiveDate and IsoWeek
- epoch_to_local: replaced unsafe libc localtime_r with chrono Local
- New util.rs with memory_subdir() helper: ensures subdir exists and
  propagates errors instead of silently ignoring them
- Removed three duplicate agent_results_dir() definitions across
  digest.rs, consolidate.rs, enrich.rs
- load_digest_files, parse_all_digest_links, find_consolidation_reports
  now return Result to properly propagate directory creation errors

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-03-03 17:23:43 -05:00

487 lines
17 KiB
Rust

// Consolidation pipeline: plan → agents → apply → digests → links
//
// consolidate_full() runs the full autonomous consolidation:
// 1. Plan: analyze metrics, allocate agents
// 2. Execute: run each agent (Sonnet calls), save reports
// 3. Apply: extract and apply actions from reports
// 4. Digest: generate missing daily/weekly/monthly digests
// 5. Links: apply links extracted from digests
// 6. Summary: final metrics comparison
//
// apply_consolidation() processes consolidation reports independently.
use crate::digest;
use crate::llm::{call_sonnet, parse_json_response};
use crate::neuro;
use crate::store::{self, Store, new_relation};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use crate::util::memory_subdir;
/// Simple append-only log writer for consolidate-full.
struct LogWriter {
path: PathBuf,
}
impl LogWriter {
fn new(path: &Path) -> Result<Self, String> {
fs::write(path, "").map_err(|e| format!("create log: {}", e))?;
Ok(LogWriter { path: path.to_path_buf() })
}
fn write(&mut self, line: &str) -> Result<(), String> {
let mut f = fs::OpenOptions::new()
.append(true)
.open(&self.path)
.map_err(|e| format!("open log: {}", e))?;
writeln!(f, "{}", line)
.map_err(|e| format!("write log: {}", e))
}
}
/// Run the full autonomous consolidation pipeline with logging.
pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
let start = std::time::Instant::now();
let log_path = memory_subdir("agent-results")?.join("consolidate-full.log");
let mut log = LogWriter::new(&log_path)?;
log.write("=== CONSOLIDATE FULL ===")?;
log.write(&format!("Started: {}", store::format_datetime(store::now_epoch())))?;
log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?;
log.write("")?;
// --- Step 1: Plan ---
log.write("--- Step 1: Plan ---")?;
let plan = neuro::consolidation_plan(store);
let plan_text = neuro::format_plan(&plan);
log.write(&plan_text)?;
println!("{}", plan_text);
let total_agents = plan.replay_count + plan.linker_count
+ plan.separator_count + plan.transfer_count
+ if plan.run_health { 1 } else { 0 };
log.write(&format!("Total agents to run: {}", total_agents))?;
// --- Step 2: Execute agents ---
log.write("\n--- Step 2: Execute agents ---")?;
let mut reports: Vec<PathBuf> = Vec::new();
let mut agent_num = 0usize;
let mut agent_errors = 0usize;
// Build the list of (agent_type, batch_size) runs
let mut runs: Vec<(&str, usize)> = Vec::new();
if plan.run_health {
runs.push(("health", 0));
}
if plan.replay_count > 0 {
let batch = 5;
let mut remaining = plan.replay_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("replay", this_batch));
remaining -= this_batch;
}
}
if plan.linker_count > 0 {
let batch = 5;
let mut remaining = plan.linker_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("linker", this_batch));
remaining -= this_batch;
}
}
if plan.separator_count > 0 {
let batch = 5;
let mut remaining = plan.separator_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("separator", this_batch));
remaining -= this_batch;
}
}
if plan.transfer_count > 0 {
let batch = 5;
let mut remaining = plan.transfer_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("transfer", this_batch));
remaining -= this_batch;
}
}
for (agent_type, count) in &runs {
agent_num += 1;
let label = if *count > 0 {
format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count)
} else {
format!("[{}/{}] {}", agent_num, runs.len(), agent_type)
};
log.write(&format!("\n{}", label))?;
println!("{}", label);
// Reload store to pick up changes from previous agents
if agent_num > 1 {
*store = Store::load()?;
}
let prompt = match neuro::agent_prompt(store, agent_type, *count) {
Ok(p) => p,
Err(e) => {
let msg = format!(" ERROR building prompt: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
log.write(&format!(" Prompt: {} chars (~{} tokens)",
prompt.len(), prompt.len() / 4))?;
let response = match call_sonnet(&prompt, 300) {
Ok(r) => r,
Err(e) => {
let msg = format!(" ERROR from Sonnet: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
// Save report
let ts = store::format_datetime(store::now_epoch())
.replace([':', '-', 'T'], "");
let report_name = format!("consolidation-{}-{}.md", agent_type, ts);
let report_path = memory_subdir("agent-results")?.join(&report_name);
fs::write(&report_path, &response)
.map_err(|e| format!("write report: {}", e))?;
reports.push(report_path.clone());
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_name);
log.write(&msg)?;
println!("{}", msg);
}
log.write(&format!("\nAgents complete: {} run, {} errors",
agent_num - agent_errors, agent_errors))?;
// --- Step 3: Apply consolidation actions ---
log.write("\n--- Step 3: Apply consolidation actions ---")?;
println!("\n--- Applying consolidation actions ---");
*store = Store::load()?;
if reports.is_empty() {
log.write(" No reports to apply.")?;
} else {
match apply_consolidation(store, true, None) {
Ok(()) => log.write(" Applied.")?,
Err(e) => {
let msg = format!(" ERROR applying consolidation: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
}
}
}
// --- Step 3b: Link orphans ---
log.write("\n--- Step 3b: Link orphans ---")?;
println!("\n--- Linking orphan nodes ---");
*store = Store::load()?;
let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15);
log.write(&format!(" {} orphans, {} links added", lo_orphans, lo_added))?;
// --- Step 3c: Cap degree ---
log.write("\n--- Step 3c: Cap degree ---")?;
println!("\n--- Capping node degree ---");
*store = Store::load()?;
match store.cap_degree(50) {
Ok((hubs, pruned)) => {
store.save()?;
log.write(&format!(" {} hubs capped, {} edges pruned", hubs, pruned))?;
}
Err(e) => log.write(&format!(" ERROR: {}", e))?,
}
// --- Step 4: Digest auto ---
log.write("\n--- Step 4: Digest auto ---")?;
println!("\n--- Generating missing digests ---");
*store = Store::load()?;
match digest::digest_auto(store) {
Ok(()) => log.write(" Digests done.")?,
Err(e) => {
let msg = format!(" ERROR in digest auto: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
}
}
// --- Step 5: Apply digest links ---
log.write("\n--- Step 5: Apply digest links ---")?;
println!("\n--- Applying digest links ---");
*store = Store::load()?;
let links = digest::parse_all_digest_links()?;
let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links);
store.save()?;
log.write(&format!(" {} links applied, {} skipped, {} fallbacks",
applied, skipped, fallbacks))?;
// --- Step 6: Summary ---
let elapsed = start.elapsed();
log.write("\n--- Summary ---")?;
log.write(&format!("Finished: {}", store::format_datetime(store::now_epoch())))?;
log.write(&format!("Duration: {:.0}s", elapsed.as_secs_f64()))?;
*store = Store::load()?;
log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?;
let summary = format!(
"\n=== CONSOLIDATE FULL COMPLETE ===\n\
Duration: {:.0}s\n\
Agents: {} run, {} errors\n\
Nodes: {} Relations: {}\n\
Log: {}\n",
elapsed.as_secs_f64(),
agent_num - agent_errors, agent_errors,
store.nodes.len(), store.relations.len(),
log_path.display(),
);
log.write(&summary)?;
println!("{}", summary);
Ok(())
}
/// Find the most recent set of consolidation reports.
fn find_consolidation_reports() -> Result<Vec<PathBuf>, String> {
let dir = memory_subdir("agent-results")?;
let mut reports: Vec<PathBuf> = fs::read_dir(&dir)
.map(|entries| {
entries.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
p.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("consolidation-") && n.ends_with(".md"))
.unwrap_or(false)
})
.collect()
})
.unwrap_or_default();
reports.sort();
reports.reverse();
if reports.is_empty() { return Ok(reports); }
// Group by timestamp (last segment of stem before .md)
let latest_ts = reports[0].file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.rsplit('-').next().unwrap_or("")
.to_string();
reports.retain(|r| {
r.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.ends_with(latest_ts.as_str())
});
Ok(reports)
}
fn build_consolidation_prompt(reports: &[PathBuf]) -> Result<String, String> {
let mut report_text = String::new();
for r in reports {
let content = fs::read_to_string(r)
.map_err(|e| format!("read {}: {}", r.display(), e))?;
report_text.push_str(&format!("\n{}\n## Report: {}\n\n{}\n",
"=".repeat(60),
r.file_stem().and_then(|s| s.to_str()).unwrap_or(""),
content));
}
neuro::load_prompt("consolidation", &[("{{REPORTS}}", &report_text)])
}
/// Run the full apply-consolidation pipeline.
pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_file: Option<&str>) -> Result<(), String> {
let reports = if let Some(path) = report_file {
vec![PathBuf::from(path)]
} else {
find_consolidation_reports()?
};
if reports.is_empty() {
println!("No consolidation reports found.");
println!("Run consolidation-agents first.");
return Ok(());
}
println!("Found {} reports:", reports.len());
for r in &reports {
println!(" {}", r.file_name().and_then(|s| s.to_str()).unwrap_or("?"));
}
println!("\nExtracting actions from reports...");
let prompt = build_consolidation_prompt(&reports)?;
println!(" Prompt: {} chars", prompt.len());
let response = call_sonnet(&prompt, 300)?;
let actions_value = parse_json_response(&response)?;
let actions = actions_value.as_array()
.ok_or("expected JSON array of actions")?;
println!(" {} actions extracted", actions.len());
// Save actions
let timestamp = store::format_datetime(store::now_epoch())
.replace([':', '-'], "");
let actions_path = memory_subdir("agent-results")?
.join(format!("consolidation-actions-{}.json", timestamp));
fs::write(&actions_path, serde_json::to_string_pretty(&actions_value).unwrap())
.map_err(|e| format!("write {}: {}", actions_path.display(), e))?;
println!(" Saved: {}", actions_path.display());
let link_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("link"))
.collect();
let cat_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("categorize"))
.collect();
let manual_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("manual"))
.collect();
if !do_apply {
// Dry run
println!("\n{}", "=".repeat(60));
println!("DRY RUN — {} actions proposed", actions.len());
println!("{}\n", "=".repeat(60));
if !link_actions.is_empty() {
println!("## Links to add ({})\n", link_actions.len());
for (i, a) in link_actions.iter().enumerate() {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("?");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("?");
let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or("");
println!(" {:2}. {}{} ({})", i + 1, src, tgt, reason);
}
}
if !cat_actions.is_empty() {
println!("\n## Categories to set ({})\n", cat_actions.len());
for a in &cat_actions {
let key = a.get("key").and_then(|v| v.as_str()).unwrap_or("?");
let cat = a.get("category").and_then(|v| v.as_str()).unwrap_or("?");
let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or("");
println!(" {}{} ({})", key, cat, reason);
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions needed ({})\n", manual_actions.len());
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
}
}
println!("\n{}", "=".repeat(60));
println!("To apply: poc-memory apply-consolidation --apply");
println!("{}", "=".repeat(60));
return Ok(());
}
// Apply
let mut applied = 0usize;
let mut skipped = 0usize;
if !link_actions.is_empty() {
println!("\nApplying {} links...", link_actions.len());
for a in &link_actions {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("");
if src.is_empty() || tgt.is_empty() { skipped += 1; continue; }
let source = match store.resolve_key(src) {
Ok(s) => s,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
let target = match store.resolve_key(tgt) {
Ok(t) => t,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
// Refine target to best-matching section
let source_content = store.nodes.get(&source)
.map(|n| n.content.as_str()).unwrap_or("");
let target = neuro::refine_target(store, source_content, &target);
let exists = store.relations.iter().any(|r|
r.source_key == source && r.target_key == target && !r.deleted
);
if exists { skipped += 1; continue; }
let source_uuid = match store.nodes.get(&source) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let target_uuid = match store.nodes.get(&target) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Auto,
0.5,
&source, &target,
);
if store.add_relation(rel).is_ok() {
println!(" + {}{}", source, target);
applied += 1;
}
}
}
if !cat_actions.is_empty() {
println!("\nApplying {} categorizations...", cat_actions.len());
for a in &cat_actions {
let key = a.get("key").and_then(|v| v.as_str()).unwrap_or("");
let cat = a.get("category").and_then(|v| v.as_str()).unwrap_or("");
if key.is_empty() || cat.is_empty() { continue; }
let resolved = match store.resolve_key(key) {
Ok(r) => r,
Err(_) => { println!(" ? {}{}: not found", key, cat); skipped += 1; continue; }
};
if store.categorize(&resolved, cat).is_ok() {
println!(" + {}{}", resolved, cat);
applied += 1;
} else {
skipped += 1;
}
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions (not auto-applied):\n");
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
}
}
if applied > 0 {
store.save()?;
}
println!("\n{}", "=".repeat(60));
println!("Applied: {} Skipped: {} Manual: {}", applied, skipped, manual_actions.len());
println!("{}", "=".repeat(60));
Ok(())
}