From 6bc11e5fb6ee4e95ab47c239c55c9acebc855e43 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sun, 1 Mar 2026 07:14:03 -0500 Subject: [PATCH] consolidate-full: autonomous consolidation pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New commands: - `digest auto`: detect and generate missing daily/weekly/monthly digests bottom-up. Validates date format to skip non-date journal keys. Skips today (incomplete) and current week/month. - `consolidate-full`: full autonomous pipeline: 1. Plan (metrics → agent allocation) 2. Execute agents (batched Sonnet calls, 5 nodes per batch) 3. Apply consolidation actions 4. Generate missing digests 5. Apply digest links Logs everything to agent-results/consolidate-full.log Fix: separator agent prompt was including all interference pairs (1114 pairs = 1.3M chars) instead of truncating to batch size. First successful run: 862s, 6/8 agents, +100 relations, 91 digest links applied. --- src/digest.rs | 387 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 13 +- src/neuro.rs | 3 +- 3 files changed, 400 insertions(+), 3 deletions(-) diff --git a/src/digest.rs b/src/digest.rs index 6b37017..7fe1559 100644 --- a/src/digest.rs +++ b/src/digest.rs @@ -608,6 +608,393 @@ pub fn generate_monthly(store: &mut Store, month_arg: &str) -> Result<(), String Ok(()) } +// --- Digest auto: freshness detection + bottom-up generation --- + +/// Scan the store for dates/weeks/months that need digests and generate them. +/// Works bottom-up: daily first, then weekly (needs dailies), then monthly +/// (needs weeklies). Skips today (incomplete day). Skips already-existing +/// digests. 
+pub fn digest_auto(store: &mut Store) -> Result<(), String> { + let today = capnp_store::today(); + let epi = episodic_dir(); + + // --- Phase 1: find dates with journal entries but no daily digest --- + let date_re = Regex::new(r"^\d{4}-\d{2}-\d{2}").unwrap(); + let mut dates: std::collections::BTreeSet = std::collections::BTreeSet::new(); + for key in store.nodes.keys() { + // Keys like: journal.md#j-2026-02-28t23-39-... + if let Some(rest) = key.strip_prefix("journal.md#j-") { + if rest.len() >= 10 && date_re.is_match(rest) { + dates.insert(rest[..10].to_string()); + } + } + } + + let mut daily_generated = 0u32; + let mut daily_skipped = 0u32; + let mut daily_dates_done: Vec = Vec::new(); + + for date in &dates { + if date == &today { + continue; // don't digest an incomplete day + } + let path = epi.join(format!("daily-{}.md", date)); + if path.exists() { + daily_skipped += 1; + daily_dates_done.push(date.clone()); + continue; + } + println!("[auto] Missing daily digest for {}", date); + generate_daily(store, date)?; + daily_generated += 1; + daily_dates_done.push(date.clone()); + } + + println!("[auto] Daily: {} generated, {} already existed", + daily_generated, daily_skipped); + + // --- Phase 2: find complete weeks needing weekly digests --- + // A week is "ready" if its Sunday is before today and at least one + // daily digest exists for it. 
+ + let mut weeks_seen: std::collections::BTreeMap> = std::collections::BTreeMap::new(); + for date in &daily_dates_done { + if let Ok((week_label, _week_dates)) = week_dates(date) { + weeks_seen.entry(week_label).or_default().push(date.clone()); + } + } + + let mut weekly_generated = 0u32; + let mut weekly_skipped = 0u32; + let mut weekly_labels_done: Vec = Vec::new(); + + for (week_label, example_dates) in &weeks_seen { + // Check if this week is complete (Sunday has passed) + if let Ok((_, week_day_list)) = week_dates(example_dates.first().unwrap()) { + let sunday = week_day_list.last().unwrap(); + if sunday >= &today { + continue; // week not over yet + } + } + + let path = epi.join(format!("weekly-{}.md", week_label)); + if path.exists() { + weekly_skipped += 1; + weekly_labels_done.push(week_label.clone()); + continue; + } + + // Check that at least some dailies exist for this week + let has_dailies = example_dates.iter().any(|d| + epi.join(format!("daily-{}.md", d)).exists() + ); + if !has_dailies { + continue; + } + + println!("[auto] Missing weekly digest for {}", week_label); + generate_weekly(store, example_dates.first().unwrap())?; + weekly_generated += 1; + weekly_labels_done.push(week_label.clone()); + } + + println!("[auto] Weekly: {} generated, {} already existed", + weekly_generated, weekly_skipped); + + // --- Phase 3: find complete months needing monthly digests --- + // A month is "ready" if the month is before the current month and at + // least one weekly digest exists for it. 
+ + let (cur_y, cur_m, _, _, _, _) = capnp_store::epoch_to_local(capnp_store::now_epoch()); + let mut months_seen: std::collections::BTreeSet<(i32, u32)> = std::collections::BTreeSet::new(); + + for date in &daily_dates_done { + let parts: Vec<&str> = date.split('-').collect(); + if parts.len() >= 2 { + if let (Ok(y), Ok(m)) = (parts[0].parse::(), parts[1].parse::()) { + months_seen.insert((y, m)); + } + } + } + + let mut monthly_generated = 0u32; + let mut monthly_skipped = 0u32; + + for (y, m) in &months_seen { + // Skip current month (still in progress) + if *y == cur_y && *m == cur_m { + continue; + } + // Skip future months + if *y > cur_y || (*y == cur_y && *m > cur_m) { + continue; + } + + let label = format!("{}-{:02}", y, m); + let path = epi.join(format!("monthly-{}.md", label)); + if path.exists() { + monthly_skipped += 1; + continue; + } + + // Check that at least one weekly exists for this month + let week_labels = weeks_in_month(*y, *m); + let has_weeklies = week_labels.iter().any(|w| + epi.join(format!("weekly-{}.md", w)).exists() + ); + if !has_weeklies { + continue; + } + + println!("[auto] Missing monthly digest for {}", label); + generate_monthly(store, &label)?; + monthly_generated += 1; + } + + println!("[auto] Monthly: {} generated, {} already existed", + monthly_generated, monthly_skipped); + + let total = daily_generated + weekly_generated + monthly_generated; + if total == 0 { + println!("[auto] All digests up to date."); + } else { + println!("[auto] Generated {} total digests.", total); + } + Ok(()) +} + +// --- Consolidate full: plan → agents → apply → digests → links --- + +/// Run the full autonomous consolidation pipeline with logging. +/// +/// Steps: +/// 1. Plan: analyze metrics, allocate agents +/// 2. Execute: run each agent (Sonnet calls), save reports +/// 3. Apply: extract and apply actions from reports +/// 4. Digest: generate missing daily/weekly/monthly digests +/// 5. Links: apply links extracted from digests +/// 6. 
Summary: final metrics comparison +pub fn consolidate_full(store: &mut Store) -> Result<(), String> { + let start = std::time::Instant::now(); + let log_path = agent_results_dir().join("consolidate-full.log"); + let mut log = LogWriter::new(&log_path)?; + + log.write("=== CONSOLIDATE FULL ===")?; + log.write(&format!("Started: {}", capnp_store::format_datetime(capnp_store::now_epoch())))?; + log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?; + log.write("")?; + + // --- Step 1: Plan --- + log.write("--- Step 1: Plan ---")?; + let plan = neuro::consolidation_plan(store); + let plan_text = neuro::format_plan(&plan); + log.write(&plan_text)?; + println!("{}", plan_text); + + let total_agents = plan.replay_count + plan.linker_count + + plan.separator_count + plan.transfer_count + + if plan.run_health { 1 } else { 0 }; + log.write(&format!("Total agents to run: {}", total_agents))?; + + // --- Step 2: Execute agents --- + log.write("\n--- Step 2: Execute agents ---")?; + let mut reports: Vec = Vec::new(); + let mut agent_num = 0usize; + let mut agent_errors = 0usize; + + // Build the list of (agent_type, batch_size) runs + let mut runs: Vec<(&str, usize)> = Vec::new(); + if plan.run_health { + runs.push(("health", 0)); + } + if plan.replay_count > 0 { + // Split replay into batches of ~5 nodes each + let batch = 5; + let mut remaining = plan.replay_count; + while remaining > 0 { + let this_batch = remaining.min(batch); + runs.push(("replay", this_batch)); + remaining -= this_batch; + } + } + if plan.linker_count > 0 { + let batch = 5; + let mut remaining = plan.linker_count; + while remaining > 0 { + let this_batch = remaining.min(batch); + runs.push(("linker", this_batch)); + remaining -= this_batch; + } + } + if plan.separator_count > 0 { + // Separator works on pairs, run in batches + let batch = 5; + let mut remaining = plan.separator_count; + while remaining > 0 { + let this_batch = remaining.min(batch); + 
runs.push(("separator", this_batch)); + remaining -= this_batch; + } + } + if plan.transfer_count > 0 { + let batch = 5; + let mut remaining = plan.transfer_count; + while remaining > 0 { + let this_batch = remaining.min(batch); + runs.push(("transfer", this_batch)); + remaining -= this_batch; + } + } + + for (agent_type, count) in &runs { + agent_num += 1; + let label = if *count > 0 { + format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count) + } else { + format!("[{}/{}] {}", agent_num, runs.len(), agent_type) + }; + + log.write(&format!("\n{}", label))?; + println!("{}", label); + + // Reload store to pick up changes from previous agents + if agent_num > 1 { + *store = Store::load()?; + } + + let prompt = match neuro::agent_prompt(store, agent_type, *count) { + Ok(p) => p, + Err(e) => { + let msg = format!(" ERROR building prompt: {}", e); + log.write(&msg)?; + eprintln!("{}", msg); + agent_errors += 1; + continue; + } + }; + + log.write(&format!(" Prompt: {} chars (~{} tokens)", + prompt.len(), prompt.len() / 4))?; + + let response = match call_sonnet(&prompt, 300) { + Ok(r) => r, + Err(e) => { + let msg = format!(" ERROR from Sonnet: {}", e); + log.write(&msg)?; + eprintln!("{}", msg); + agent_errors += 1; + continue; + } + }; + + // Save report + let ts = capnp_store::format_datetime(capnp_store::now_epoch()) + .replace([':', '-', 'T'], ""); + let report_name = format!("consolidation-{}-{}.md", agent_type, ts); + let report_path = agent_results_dir().join(&report_name); + fs::write(&report_path, &response) + .map_err(|e| format!("write report: {}", e))?; + reports.push(report_path.clone()); + + let msg = format!(" Done: {} lines → {}", response.lines().count(), report_name); + log.write(&msg)?; + println!("{}", msg); + } + + log.write(&format!("\nAgents complete: {} run, {} errors", + agent_num - agent_errors, agent_errors))?; + + // --- Step 3: Apply consolidation actions --- + log.write("\n--- Step 3: Apply consolidation actions ---")?; + 
println!("\n--- Applying consolidation actions ---"); + *store = Store::load()?; + + if reports.is_empty() { + log.write(" No reports to apply.")?; + } else { + match apply_consolidation(store, true, None) { + Ok(()) => log.write(" Applied.")?, + Err(e) => { + let msg = format!(" ERROR applying consolidation: {}", e); + log.write(&msg)?; + eprintln!("{}", msg); + } + } + } + + // --- Step 4: Digest auto --- + log.write("\n--- Step 4: Digest auto ---")?; + println!("\n--- Generating missing digests ---"); + *store = Store::load()?; + + match digest_auto(store) { + Ok(()) => log.write(" Digests done.")?, + Err(e) => { + let msg = format!(" ERROR in digest auto: {}", e); + log.write(&msg)?; + eprintln!("{}", msg); + } + } + + // --- Step 5: Apply digest links --- + log.write("\n--- Step 5: Apply digest links ---")?; + println!("\n--- Applying digest links ---"); + *store = Store::load()?; + + let links = parse_all_digest_links(); + let (applied, skipped, fallbacks) = apply_digest_links(store, &links); + store.save()?; + log.write(&format!(" {} links applied, {} skipped, {} fallbacks", + applied, skipped, fallbacks))?; + + // --- Step 6: Summary --- + let elapsed = start.elapsed(); + log.write("\n--- Summary ---")?; + log.write(&format!("Finished: {}", capnp_store::format_datetime(capnp_store::now_epoch())))?; + log.write(&format!("Duration: {:.0}s", elapsed.as_secs_f64()))?; + *store = Store::load()?; + log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?; + + let summary = format!( + "\n=== CONSOLIDATE FULL COMPLETE ===\n\ + Duration: {:.0}s\n\ + Agents: {} run, {} errors\n\ + Nodes: {} Relations: {}\n\ + Log: {}\n", + elapsed.as_secs_f64(), + agent_num - agent_errors, agent_errors, + store.nodes.len(), store.relations.len(), + log_path.display(), + ); + log.write(&summary)?; + println!("{}", summary); + + Ok(()) +} + +/// Simple append-only log writer for consolidate-full. 
/// Simple append-only log writer for consolidate-full.
///
/// `new` truncates (or creates) the file; `write` re-opens it in append
/// mode on every call, so log output survives even if the process dies
/// mid-run.
struct LogWriter {
    // Path of the log file; no file handle is kept open between writes.
    path: PathBuf,
}

impl LogWriter {
    /// Create a writer for `path`, truncating any previous run's log.
    fn new(path: &Path) -> Result<LogWriter, String> {
        // Truncate on start
        fs::write(path, "").map_err(|e| format!("create log: {}", e))?;
        Ok(LogWriter { path: path.to_path_buf() })
    }

    /// Append `line` plus a trailing newline to the log file.
    fn write(&mut self, line: &str) -> Result<(), String> {
        use std::io::Write;
        let mut f = fs::OpenOptions::new()
            .append(true)
            .open(&self.path)
            .map_err(|e| format!("open log: {}", e))?;
        writeln!(f, "{}", line)
            .map_err(|e| format!("write log: {}", e))
    }
}

// --- Digest link parsing ---
// Replaces digest-link-parser.py: parses ## Links sections from digest
// files and applies them to the memory graph.
cmd_consolidate_full() -> Result<(), String> { + let mut store = capnp_store::Store::load()?; + digest::consolidate_full(&mut store) +} + fn cmd_daily_check() -> Result<(), String> { let store = capnp_store::Store::load()?; let report = neuro::daily_check(&store); @@ -644,7 +652,7 @@ fn cmd_apply_agent(args: &[String]) -> Result<(), String> { fn cmd_digest(args: &[String]) -> Result<(), String> { if args.is_empty() { - return Err("Usage: poc-memory digest daily|weekly|monthly [DATE]".into()); + return Err("Usage: poc-memory digest daily|weekly|monthly|auto [DATE]".into()); } let mut store = capnp_store::Store::load()?; @@ -671,7 +679,8 @@ fn cmd_digest(args: &[String]) -> Result<(), String> { let month = if date_arg.is_empty() { "" } else { date_arg }; digest::generate_monthly(&mut store, month) } - _ => Err(format!("Unknown digest type: {}. Use: daily, weekly, monthly", args[0])), + "auto" => digest::digest_auto(&mut store), + _ => Err(format!("Unknown digest type: {}. Use: daily, weekly, monthly, auto", args[0])), } } diff --git a/src/neuro.rs b/src/neuro.rs index 6068c08..d9319c9 100644 --- a/src/neuro.rs +++ b/src/neuro.rs @@ -495,7 +495,8 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result { - let pairs = detect_interference(store, &graph, 0.5); + let mut pairs = detect_interference(store, &graph, 0.5); + pairs.truncate(count); let pairs_section = format_pairs_section(&pairs, store, &graph); load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)]) }