consolidate-full: autonomous consolidation pipeline

New commands:
- `digest auto`: detect and generate missing daily/weekly/monthly
  digests bottom-up. Validates date format to skip non-date journal
  keys. Skips today (incomplete) and current week/month.
- `consolidate-full`: full autonomous pipeline:
  1. Plan (metrics → agent allocation)
  2. Execute agents (batched Sonnet calls, 5 nodes per batch)
  3. Apply consolidation actions
  4. Generate missing digests
  5. Apply digest links
  Logs everything to agent-results/consolidate-full.log

Fix: separator agent prompt was including all interference pairs
(1114 pairs = 1.3M chars) instead of truncating to batch size.

First successful run: 862s, 6/8 agents, +100 relations, 91 digest
links applied.
This commit is contained in:
ProofOfConcept 2026-03-01 07:14:03 -05:00
parent c7e7cfb7af
commit 6bc11e5fb6
3 changed files with 400 additions and 3 deletions

View file

@ -608,6 +608,393 @@ pub fn generate_monthly(store: &mut Store, month_arg: &str) -> Result<(), String
Ok(()) Ok(())
} }
// --- Digest auto: freshness detection + bottom-up generation ---
/// Scan the store for dates/weeks/months that need digests and generate them.
/// Works bottom-up: daily first, then weekly (needs dailies), then monthly
/// (needs weeklies). Skips today (incomplete day). Skips already-existing
/// digests.
pub fn digest_auto(store: &mut Store) -> Result<(), String> {
let today = capnp_store::today();
let epi = episodic_dir();
// --- Phase 1: find dates with journal entries but no daily digest ---
let date_re = Regex::new(r"^\d{4}-\d{2}-\d{2}").unwrap();
let mut dates: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
for key in store.nodes.keys() {
// Keys like: journal.md#j-2026-02-28t23-39-...
if let Some(rest) = key.strip_prefix("journal.md#j-") {
if rest.len() >= 10 && date_re.is_match(rest) {
dates.insert(rest[..10].to_string());
}
}
}
let mut daily_generated = 0u32;
let mut daily_skipped = 0u32;
let mut daily_dates_done: Vec<String> = Vec::new();
for date in &dates {
if date == &today {
continue; // don't digest an incomplete day
}
let path = epi.join(format!("daily-{}.md", date));
if path.exists() {
daily_skipped += 1;
daily_dates_done.push(date.clone());
continue;
}
println!("[auto] Missing daily digest for {}", date);
generate_daily(store, date)?;
daily_generated += 1;
daily_dates_done.push(date.clone());
}
println!("[auto] Daily: {} generated, {} already existed",
daily_generated, daily_skipped);
// --- Phase 2: find complete weeks needing weekly digests ---
// A week is "ready" if its Sunday is before today and at least one
// daily digest exists for it.
let mut weeks_seen: std::collections::BTreeMap<String, Vec<String>> = std::collections::BTreeMap::new();
for date in &daily_dates_done {
if let Ok((week_label, _week_dates)) = week_dates(date) {
weeks_seen.entry(week_label).or_default().push(date.clone());
}
}
let mut weekly_generated = 0u32;
let mut weekly_skipped = 0u32;
let mut weekly_labels_done: Vec<String> = Vec::new();
for (week_label, example_dates) in &weeks_seen {
// Check if this week is complete (Sunday has passed)
if let Ok((_, week_day_list)) = week_dates(example_dates.first().unwrap()) {
let sunday = week_day_list.last().unwrap();
if sunday >= &today {
continue; // week not over yet
}
}
let path = epi.join(format!("weekly-{}.md", week_label));
if path.exists() {
weekly_skipped += 1;
weekly_labels_done.push(week_label.clone());
continue;
}
// Check that at least some dailies exist for this week
let has_dailies = example_dates.iter().any(|d|
epi.join(format!("daily-{}.md", d)).exists()
);
if !has_dailies {
continue;
}
println!("[auto] Missing weekly digest for {}", week_label);
generate_weekly(store, example_dates.first().unwrap())?;
weekly_generated += 1;
weekly_labels_done.push(week_label.clone());
}
println!("[auto] Weekly: {} generated, {} already existed",
weekly_generated, weekly_skipped);
// --- Phase 3: find complete months needing monthly digests ---
// A month is "ready" if the month is before the current month and at
// least one weekly digest exists for it.
let (cur_y, cur_m, _, _, _, _) = capnp_store::epoch_to_local(capnp_store::now_epoch());
let mut months_seen: std::collections::BTreeSet<(i32, u32)> = std::collections::BTreeSet::new();
for date in &daily_dates_done {
let parts: Vec<&str> = date.split('-').collect();
if parts.len() >= 2 {
if let (Ok(y), Ok(m)) = (parts[0].parse::<i32>(), parts[1].parse::<u32>()) {
months_seen.insert((y, m));
}
}
}
let mut monthly_generated = 0u32;
let mut monthly_skipped = 0u32;
for (y, m) in &months_seen {
// Skip current month (still in progress)
if *y == cur_y && *m == cur_m {
continue;
}
// Skip future months
if *y > cur_y || (*y == cur_y && *m > cur_m) {
continue;
}
let label = format!("{}-{:02}", y, m);
let path = epi.join(format!("monthly-{}.md", label));
if path.exists() {
monthly_skipped += 1;
continue;
}
// Check that at least one weekly exists for this month
let week_labels = weeks_in_month(*y, *m);
let has_weeklies = week_labels.iter().any(|w|
epi.join(format!("weekly-{}.md", w)).exists()
);
if !has_weeklies {
continue;
}
println!("[auto] Missing monthly digest for {}", label);
generate_monthly(store, &label)?;
monthly_generated += 1;
}
println!("[auto] Monthly: {} generated, {} already existed",
monthly_generated, monthly_skipped);
let total = daily_generated + weekly_generated + monthly_generated;
if total == 0 {
println!("[auto] All digests up to date.");
} else {
println!("[auto] Generated {} total digests.", total);
}
Ok(())
}
// --- Consolidate full: plan → agents → apply → digests → links ---
/// Run the full autonomous consolidation pipeline with logging.
///
/// Steps:
/// 1. Plan: analyze metrics, allocate agents
/// 2. Execute: run each agent (Sonnet calls), save reports
/// 3. Apply: extract and apply actions from reports
/// 4. Digest: generate missing daily/weekly/monthly digests
/// 5. Links: apply links extracted from digests
/// 6. Summary: final metrics comparison
pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
let start = std::time::Instant::now();
let log_path = agent_results_dir().join("consolidate-full.log");
let mut log = LogWriter::new(&log_path)?;
log.write("=== CONSOLIDATE FULL ===")?;
log.write(&format!("Started: {}", capnp_store::format_datetime(capnp_store::now_epoch())))?;
log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?;
log.write("")?;
// --- Step 1: Plan ---
log.write("--- Step 1: Plan ---")?;
let plan = neuro::consolidation_plan(store);
let plan_text = neuro::format_plan(&plan);
log.write(&plan_text)?;
println!("{}", plan_text);
let total_agents = plan.replay_count + plan.linker_count
+ plan.separator_count + plan.transfer_count
+ if plan.run_health { 1 } else { 0 };
log.write(&format!("Total agents to run: {}", total_agents))?;
// --- Step 2: Execute agents ---
log.write("\n--- Step 2: Execute agents ---")?;
let mut reports: Vec<PathBuf> = Vec::new();
let mut agent_num = 0usize;
let mut agent_errors = 0usize;
// Build the list of (agent_type, batch_size) runs
let mut runs: Vec<(&str, usize)> = Vec::new();
if plan.run_health {
runs.push(("health", 0));
}
if plan.replay_count > 0 {
// Split replay into batches of ~5 nodes each
let batch = 5;
let mut remaining = plan.replay_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("replay", this_batch));
remaining -= this_batch;
}
}
if plan.linker_count > 0 {
let batch = 5;
let mut remaining = plan.linker_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("linker", this_batch));
remaining -= this_batch;
}
}
if plan.separator_count > 0 {
// Separator works on pairs, run in batches
let batch = 5;
let mut remaining = plan.separator_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("separator", this_batch));
remaining -= this_batch;
}
}
if plan.transfer_count > 0 {
let batch = 5;
let mut remaining = plan.transfer_count;
while remaining > 0 {
let this_batch = remaining.min(batch);
runs.push(("transfer", this_batch));
remaining -= this_batch;
}
}
for (agent_type, count) in &runs {
agent_num += 1;
let label = if *count > 0 {
format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count)
} else {
format!("[{}/{}] {}", agent_num, runs.len(), agent_type)
};
log.write(&format!("\n{}", label))?;
println!("{}", label);
// Reload store to pick up changes from previous agents
if agent_num > 1 {
*store = Store::load()?;
}
let prompt = match neuro::agent_prompt(store, agent_type, *count) {
Ok(p) => p,
Err(e) => {
let msg = format!(" ERROR building prompt: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
log.write(&format!(" Prompt: {} chars (~{} tokens)",
prompt.len(), prompt.len() / 4))?;
let response = match call_sonnet(&prompt, 300) {
Ok(r) => r,
Err(e) => {
let msg = format!(" ERROR from Sonnet: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
// Save report
let ts = capnp_store::format_datetime(capnp_store::now_epoch())
.replace([':', '-', 'T'], "");
let report_name = format!("consolidation-{}-{}.md", agent_type, ts);
let report_path = agent_results_dir().join(&report_name);
fs::write(&report_path, &response)
.map_err(|e| format!("write report: {}", e))?;
reports.push(report_path.clone());
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_name);
log.write(&msg)?;
println!("{}", msg);
}
log.write(&format!("\nAgents complete: {} run, {} errors",
agent_num - agent_errors, agent_errors))?;
// --- Step 3: Apply consolidation actions ---
log.write("\n--- Step 3: Apply consolidation actions ---")?;
println!("\n--- Applying consolidation actions ---");
*store = Store::load()?;
if reports.is_empty() {
log.write(" No reports to apply.")?;
} else {
match apply_consolidation(store, true, None) {
Ok(()) => log.write(" Applied.")?,
Err(e) => {
let msg = format!(" ERROR applying consolidation: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
}
}
}
// --- Step 4: Digest auto ---
log.write("\n--- Step 4: Digest auto ---")?;
println!("\n--- Generating missing digests ---");
*store = Store::load()?;
match digest_auto(store) {
Ok(()) => log.write(" Digests done.")?,
Err(e) => {
let msg = format!(" ERROR in digest auto: {}", e);
log.write(&msg)?;
eprintln!("{}", msg);
}
}
// --- Step 5: Apply digest links ---
log.write("\n--- Step 5: Apply digest links ---")?;
println!("\n--- Applying digest links ---");
*store = Store::load()?;
let links = parse_all_digest_links();
let (applied, skipped, fallbacks) = apply_digest_links(store, &links);
store.save()?;
log.write(&format!(" {} links applied, {} skipped, {} fallbacks",
applied, skipped, fallbacks))?;
// --- Step 6: Summary ---
let elapsed = start.elapsed();
log.write("\n--- Summary ---")?;
log.write(&format!("Finished: {}", capnp_store::format_datetime(capnp_store::now_epoch())))?;
log.write(&format!("Duration: {:.0}s", elapsed.as_secs_f64()))?;
*store = Store::load()?;
log.write(&format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()))?;
let summary = format!(
"\n=== CONSOLIDATE FULL COMPLETE ===\n\
Duration: {:.0}s\n\
Agents: {} run, {} errors\n\
Nodes: {} Relations: {}\n\
Log: {}\n",
elapsed.as_secs_f64(),
agent_num - agent_errors, agent_errors,
store.nodes.len(), store.relations.len(),
log_path.display(),
);
log.write(&summary)?;
println!("{}", summary);
Ok(())
}
/// Simple append-only log writer for consolidate-full.
struct LogWriter {
path: PathBuf,
}
impl LogWriter {
fn new(path: &Path) -> Result<Self, String> {
// Truncate on start
fs::write(path, "").map_err(|e| format!("create log: {}", e))?;
Ok(LogWriter { path: path.to_path_buf() })
}
fn write(&mut self, line: &str) -> Result<(), String> {
use std::io::Write;
let mut f = fs::OpenOptions::new()
.append(true)
.open(&self.path)
.map_err(|e| format!("open log: {}", e))?;
writeln!(f, "{}", line)
.map_err(|e| format!("write log: {}", e))
}
}
// --- Digest link parsing --- // --- Digest link parsing ---
// Replaces digest-link-parser.py: parses ## Links sections from digest // Replaces digest-link-parser.py: parses ## Links sections from digest
// files and applies them to the memory graph. // files and applies them to the memory graph.

View file

@ -86,6 +86,7 @@ fn main() {
"link-add" => cmd_link_add(&args[2..]), "link-add" => cmd_link_add(&args[2..]),
"link-impact" => cmd_link_impact(&args[2..]), "link-impact" => cmd_link_impact(&args[2..]),
"consolidate-session" => cmd_consolidate_session(), "consolidate-session" => cmd_consolidate_session(),
"consolidate-full" => cmd_consolidate_full(),
"daily-check" => cmd_daily_check(), "daily-check" => cmd_daily_check(),
"apply-agent" => cmd_apply_agent(&args[2..]), "apply-agent" => cmd_apply_agent(&args[2..]),
"digest" => cmd_digest(&args[2..]), "digest" => cmd_digest(&args[2..]),
@ -147,11 +148,13 @@ Commands:
Add a link between two nodes Add a link between two nodes
link-impact SOURCE TARGET Simulate adding an edge, report topology impact link-impact SOURCE TARGET Simulate adding an edge, report topology impact
consolidate-session Analyze metrics, plan agent allocation consolidate-session Analyze metrics, plan agent allocation
consolidate-full Autonomous: plan agents apply digests links
daily-check Brief metrics check (for cron/notifications) daily-check Brief metrics check (for cron/notifications)
apply-agent [--all] Import pending agent results into the graph apply-agent [--all] Import pending agent results into the graph
digest daily [DATE] Generate daily episodic digest (default: today) digest daily [DATE] Generate daily episodic digest (default: today)
digest weekly [DATE] Generate weekly digest (any date in target week) digest weekly [DATE] Generate weekly digest (any date in target week)
digest monthly [YYYY-MM] Generate monthly digest (default: current month) digest monthly [YYYY-MM] Generate monthly digest (default: current month)
digest auto Generate all missing digests (dailyweeklymonthly)
digest-links [--apply] Parse and apply links from digest files digest-links [--apply] Parse and apply links from digest files
journal-enrich JSONL TEXT [LINE] journal-enrich JSONL TEXT [LINE]
Enrich journal entry with conversation links Enrich journal entry with conversation links
@ -429,6 +432,11 @@ fn cmd_consolidate_session() -> Result<(), String> {
Ok(()) Ok(())
} }
fn cmd_consolidate_full() -> Result<(), String> {
let mut store = capnp_store::Store::load()?;
digest::consolidate_full(&mut store)
}
fn cmd_daily_check() -> Result<(), String> { fn cmd_daily_check() -> Result<(), String> {
let store = capnp_store::Store::load()?; let store = capnp_store::Store::load()?;
let report = neuro::daily_check(&store); let report = neuro::daily_check(&store);
@ -644,7 +652,7 @@ fn cmd_apply_agent(args: &[String]) -> Result<(), String> {
fn cmd_digest(args: &[String]) -> Result<(), String> { fn cmd_digest(args: &[String]) -> Result<(), String> {
if args.is_empty() { if args.is_empty() {
return Err("Usage: poc-memory digest daily|weekly|monthly [DATE]".into()); return Err("Usage: poc-memory digest daily|weekly|monthly|auto [DATE]".into());
} }
let mut store = capnp_store::Store::load()?; let mut store = capnp_store::Store::load()?;
@ -671,7 +679,8 @@ fn cmd_digest(args: &[String]) -> Result<(), String> {
let month = if date_arg.is_empty() { "" } else { date_arg }; let month = if date_arg.is_empty() { "" } else { date_arg };
digest::generate_monthly(&mut store, month) digest::generate_monthly(&mut store, month)
} }
_ => Err(format!("Unknown digest type: {}. Use: daily, weekly, monthly", args[0])), "auto" => digest::digest_auto(&mut store),
_ => Err(format!("Unknown digest type: {}. Use: daily, weekly, monthly, auto", args[0])),
} }
} }

View file

@ -495,7 +495,8 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)]) load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])
} }
"separator" => { "separator" => {
let pairs = detect_interference(store, &graph, 0.5); let mut pairs = detect_interference(store, &graph, 0.5);
pairs.truncate(count);
let pairs_section = format_pairs_section(&pairs, store, &graph); let pairs_section = format_pairs_section(&pairs, store, &graph);
load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)]) load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)])
} }