consolidate: eliminate second LLM call, apply actions inline

The consolidation pipeline previously made a second Sonnet call to
extract structured JSON actions from agent reports. This was both
wasteful (extra LLM call per consolidation) and lossy (only extracted
links and manual items, ignoring WRITE_NODE/REFINE).

Now actions are parsed and applied inline after each agent runs, using
the same parse_all_actions() parser as the knowledge loop. The daemon
scheduler's separate apply phase is also removed.

Also deletes 8 superseded/orphaned prompt .md files (784 lines) that
have been replaced by .agent files.
This commit is contained in:
ProofOfConcept 2026-03-10 17:22:53 -04:00
parent 42d8e265da
commit f6ea659975
11 changed files with 119 additions and 1024 deletions

View file

@ -2,18 +2,21 @@
//
// consolidate_full() runs the full autonomous consolidation:
// 1. Plan: analyze metrics, allocate agents
// 2. Execute: run each agent (Sonnet calls), save reports
// 3. Apply: extract and apply actions from reports
// 2. Execute: run each agent, parse + apply actions inline
// 3. Graph maintenance (orphans, degree cap)
// 4. Digest: generate missing daily/weekly/monthly digests
// 5. Links: apply links extracted from digests
// 6. Summary: final metrics comparison
//
// apply_consolidation() processes consolidation reports independently.
// Actions are parsed directly from agent output using the same parser
// as the knowledge loop (WRITE_NODE, LINK, REFINE), eliminating the
// second LLM call that was previously needed.
use super::digest;
use super::llm::{call_sonnet, parse_json_response};
use super::llm::call_sonnet;
use super::knowledge;
use crate::neuro;
use crate::store::{self, Store, new_relation};
use crate::store::{self, Store};
/// Append a line to the log buffer.
@ -57,9 +60,10 @@ pub fn consolidate_full_with_progress(
// --- Step 2: Execute agents ---
log_line(&mut log_buf, "\n--- Step 2: Execute agents ---");
let mut reports: Vec<String> = Vec::new();
let mut agent_num = 0usize;
let mut agent_errors = 0usize;
let mut total_applied = 0usize;
let mut total_actions = 0usize;
// Build the list of (agent_type, batch_size) runs
let mut runs: Vec<(&str, usize)> = Vec::new();
@ -123,13 +127,24 @@ pub fn consolidate_full_with_progress(
}
};
// Store report as a node
// Store report as a node (for audit trail)
let ts = store::format_datetime(store::now_epoch())
.replace([':', '-', 'T'], "");
let report_key = format!("_consolidation-{}-{}", agent_type, ts);
store.upsert_provenance(&report_key, &response,
store::Provenance::AgentConsolidate).ok();
reports.push(report_key.clone());
// Parse and apply actions inline — same parser as knowledge loop
let actions = knowledge::parse_all_actions(&response);
let no_ops = knowledge::count_no_ops(&response);
let mut applied = 0;
for action in &actions {
if knowledge::apply_action(store, action, agent_type, &ts, 0) {
applied += 1;
}
}
total_actions += actions.len();
total_applied += applied;
// Record visits for successfully processed nodes
if !agent_batch.node_keys.is_empty() {
@ -138,36 +153,19 @@ pub fn consolidate_full_with_progress(
}
}
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key);
let msg = format!(" Done: {} actions ({} applied, {} no-ops) → {}",
actions.len(), applied, no_ops, report_key);
log_line(&mut log_buf, &msg);
on_progress(&msg);
println!("{}", msg);
}
log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors",
agent_num - agent_errors, agent_errors));
log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors, {} actions ({} applied)",
agent_num - agent_errors, agent_errors, total_actions, total_applied));
store.save()?;
// --- Step 3: Apply consolidation actions ---
log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---");
on_progress("applying actions");
println!("\n--- Applying consolidation actions ---");
*store = Store::load()?;
if reports.is_empty() {
log_line(&mut log_buf, " No reports to apply.");
} else {
match apply_consolidation(store, true, None) {
Ok(()) => log_line(&mut log_buf, " Applied."),
Err(e) => {
let msg = format!(" ERROR applying consolidation: {}", e);
log_line(&mut log_buf, &msg);
eprintln!("{}", msg);
}
}
}
// --- Step 3b: Link orphans ---
log_line(&mut log_buf, "\n--- Step 3b: Link orphans ---");
// --- Step 3: Link orphans ---
log_line(&mut log_buf, "\n--- Step 3: Link orphans ---");
on_progress("linking orphans");
println!("\n--- Linking orphan nodes ---");
*store = Store::load()?;
@ -175,8 +173,8 @@ pub fn consolidate_full_with_progress(
let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15);
log_line(&mut log_buf, &format!(" {} orphans, {} links added", lo_orphans, lo_added));
// --- Step 3c: Cap degree ---
log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---");
// --- Step 3b: Cap degree ---
log_line(&mut log_buf, "\n--- Step 3b: Cap degree ---");
on_progress("capping degree");
println!("\n--- Capping node degree ---");
*store = Store::load()?;
@ -244,166 +242,64 @@ pub fn consolidate_full_with_progress(
Ok(())
}
/// Find the most recent set of consolidation report keys from the store.
fn find_consolidation_reports(store: &Store) -> Vec<String> {
let mut keys: Vec<&String> = store.nodes.keys()
.filter(|k| k.starts_with("_consolidation-"))
.collect();
keys.sort();
keys.reverse();
if keys.is_empty() { return Vec::new(); }
// Group by timestamp (last segment after last '-')
let latest_ts = keys[0].rsplit('-').next().unwrap_or("").to_string();
keys.into_iter()
.filter(|k| k.ends_with(&latest_ts))
.cloned()
.collect()
}
fn build_consolidation_prompt(store: &Store, report_keys: &[String]) -> Result<String, String> {
let mut report_text = String::new();
for key in report_keys {
let content = store.nodes.get(key)
.map(|n| n.content.as_str())
.unwrap_or("");
report_text.push_str(&format!("\n{}\n## Report: {}\n\n{}\n",
"=".repeat(60), key, content));
}
super::prompts::load_prompt("consolidation", &[("{{REPORTS}}", &report_text)])
}
/// Run the full apply-consolidation pipeline.
/// Re-parse and apply actions from stored consolidation reports.
/// This is for manually re-processing reports — during normal consolidation,
/// actions are applied inline as each agent runs.
pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option<&str>) -> Result<(), String> {
let reports = if let Some(key) = report_key {
let reports: Vec<String> = if let Some(key) = report_key {
vec![key.to_string()]
} else {
find_consolidation_reports(store)
// Find the most recent batch of reports
let mut keys: Vec<&String> = store.nodes.keys()
.filter(|k| k.starts_with("_consolidation-") && !k.contains("-actions-") && !k.contains("-log-"))
.collect();
keys.sort();
keys.reverse();
if keys.is_empty() { return Ok(()); }
let latest_ts = keys[0].rsplit('-').next().unwrap_or("").to_string();
keys.into_iter()
.filter(|k| k.ends_with(&latest_ts))
.cloned()
.collect()
};
if reports.is_empty() {
println!("No consolidation reports found.");
println!("Run consolidation-agents first.");
return Ok(());
}
println!("Found {} reports:", reports.len());
for r in &reports {
println!(" {}", r);
let mut all_actions = Vec::new();
for key in &reports {
let content = store.nodes.get(key).map(|n| n.content.as_str()).unwrap_or("");
let actions = knowledge::parse_all_actions(content);
println!(" {}{} actions", key, actions.len());
all_actions.extend(actions);
}
println!("\nExtracting actions from reports...");
let prompt = build_consolidation_prompt(store, &reports)?;
println!(" Prompt: {} chars", prompt.len());
let response = call_sonnet("consolidate", &prompt)?;
let actions_value = parse_json_response(&response)?;
let actions = actions_value.as_array()
.ok_or("expected JSON array of actions")?;
println!(" {} actions extracted", actions.len());
// Store actions in the store
let timestamp = store::format_datetime(store::now_epoch())
.replace([':', '-'], "");
let actions_key = format!("_consolidation-actions-{}", timestamp);
let actions_json = serde_json::to_string_pretty(&actions_value).unwrap();
store.upsert_provenance(&actions_key, &actions_json,
store::Provenance::AgentConsolidate).ok();
println!(" Stored: {}", actions_key);
let link_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("link"))
.collect();
let manual_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("manual"))
.collect();
if !do_apply {
// Dry run
println!("\n{}", "=".repeat(60));
println!("DRY RUN — {} actions proposed", actions.len());
println!("{}\n", "=".repeat(60));
if !link_actions.is_empty() {
println!("## Links to add ({})\n", link_actions.len());
for (i, a) in link_actions.iter().enumerate() {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("?");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("?");
let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or("");
println!(" {:2}. {}{} ({})", i + 1, src, tgt, reason);
println!("\nDRY RUN — {} actions parsed", all_actions.len());
for action in &all_actions {
match &action.kind {
knowledge::ActionKind::Link { source, target } =>
println!(" LINK {}{}", source, target),
knowledge::ActionKind::WriteNode { key, .. } =>
println!(" WRITE {}", key),
knowledge::ActionKind::Refine { key, .. } =>
println!(" REFINE {}", key),
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions needed ({})\n", manual_actions.len());
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
}
}
println!("\n{}", "=".repeat(60));
println!("To apply: poc-memory apply-consolidation --apply");
println!("{}", "=".repeat(60));
println!("\nTo apply: poc-memory apply-consolidation --apply");
return Ok(());
}
// Apply
let mut applied = 0usize;
let mut skipped = 0usize;
if !link_actions.is_empty() {
println!("\nApplying {} links...", link_actions.len());
for a in &link_actions {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("");
if src.is_empty() || tgt.is_empty() { skipped += 1; continue; }
let source = match store.resolve_key(src) {
Ok(s) => s,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
let target = match store.resolve_key(tgt) {
Ok(t) => t,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
// Refine target to best-matching section
let source_content = store.nodes.get(&source)
.map(|n| n.content.as_str()).unwrap_or("");
let target = neuro::refine_target(store, source_content, &target);
let exists = store.relations.iter().any(|r|
r.source_key == source && r.target_key == target && !r.deleted
);
if exists { skipped += 1; continue; }
let source_uuid = match store.nodes.get(&source) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let target_uuid = match store.nodes.get(&target) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Auto,
0.5,
&source, &target,
);
if store.add_relation(rel).is_ok() {
println!(" + {}{}", source, target);
applied += 1;
}
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions (not auto-applied):\n");
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
let ts = store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], "");
let mut applied = 0;
for action in &all_actions {
if knowledge::apply_action(store, action, "consolidate", &ts, 0) {
applied += 1;
}
}
@ -411,9 +307,6 @@ pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option
store.save()?;
}
println!("\n{}", "=".repeat(60));
println!("Applied: {} Skipped: {} Manual: {}", applied, skipped, manual_actions.len());
println!("{}", "=".repeat(60));
println!("Applied: {}/{} actions", applied, all_actions.len());
Ok(())
}

View file

@ -149,6 +149,15 @@ fn job_consolidation_agent(
store.upsert_provenance(&report_key, &response,
crate::store::Provenance::AgentConsolidate).ok();
// Parse and apply actions inline
let actions = super::knowledge::parse_all_actions(&response);
let mut applied = 0;
for action in &actions {
if super::knowledge::apply_action(&mut store, action, &agent, &ts, 0) {
applied += 1;
}
}
// Record visits for successfully processed nodes
if !agent_batch.node_keys.is_empty() {
if let Err(e) = store.record_agent_visits(&agent_batch.node_keys, &agent) {
@ -156,7 +165,8 @@ fn job_consolidation_agent(
}
}
ctx.log_line(&format!("done: {} lines → {}", response.lines().count(), report_key));
ctx.log_line(&format!("done: {} actions ({} applied) → {}",
actions.len(), applied, report_key));
Ok(())
})
}
@ -455,16 +465,6 @@ fn job_split_one(
})
}
/// Apply consolidation actions from recent reports.
fn job_consolidation_apply(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "c-apply", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
ctx.log_line("applying consolidation actions");
super::consolidate::apply_consolidation(&mut store, true, None)
})
}
/// Link orphan nodes (CPU-heavy, no LLM).
fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "c-orphans", || {
@ -1174,31 +1174,23 @@ pub fn run_daemon() -> Result<(), String> {
prev_agent = Some(builder.run());
}
// Phase 2: Apply actions from agent reports
let mut apply = choir_sched.spawn(format!("c-apply:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |ctx| job_consolidation_apply(ctx));
if let Some(ref dep) = prev_agent {
apply.depend_on(dep);
}
let apply = apply.run();
// Phase 3: Link orphans (CPU-only, no LLM)
// Phase 2: Link orphans (CPU-only, no LLM)
let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
.retries(1)
.init(move |ctx| job_link_orphans(ctx));
orphans.depend_on(&apply);
if let Some(ref dep) = prev_agent {
orphans.depend_on(dep);
}
let orphans = orphans.run();
// Phase 4: Cap degree
// Phase 3: Cap degree
let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
.retries(1)
.init(move |ctx| job_cap_degree(ctx));
cap.depend_on(&orphans);
let cap = cap.run();
// Phase 5: Generate digests
// Phase 4: Generate digests
let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
.resource(&llm_sched)
.retries(1)
@ -1206,7 +1198,7 @@ pub fn run_daemon() -> Result<(), String> {
digest.depend_on(&cap);
let digest = digest.run();
// Phase 6: Apply digest links
// Phase 5: Apply digest links
let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
.retries(1)
.init(move |ctx| job_digest_links(ctx));

View file

@ -387,8 +387,14 @@ pub fn split_extract_prompt(store: &Store, parent_key: &str, child_key: &str, ch
])
}
/// Run agent consolidation on top-priority nodes
/// Show consolidation batch status or generate an agent prompt.
pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> {
if auto {
let batch = agent_prompt(store, "replay", count)?;
println!("{}", batch.prompt);
return Ok(());
}
let graph = store.build_graph();
let items = replay_queue(store, count);
@ -397,46 +403,34 @@ pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<()
return Ok(());
}
let nodes_section = format_nodes_section(store, &items, &graph);
if auto {
let prompt = load_prompt("replay", &[("{{NODES}}", &nodes_section)])?;
println!("{}", prompt);
} else {
// Interactive: show what needs attention and available agent types
println!("Consolidation batch ({} nodes):\n", items.len());
for item in &items {
let node_type = store.nodes.get(&item.key)
.map(|n| if matches!(n.node_type, crate::store::NodeType::EpisodicSession) { "episodic" } else { "semantic" })
.unwrap_or("?");
println!(" [{:.3}] {} (cc={:.3}, interval={}d, type={})",
item.priority, item.key, item.cc, item.interval_days, node_type);
}
// Also show interference pairs
let pairs = detect_interference(store, &graph, 0.6);
if !pairs.is_empty() {
println!("\nInterfering pairs ({}):", pairs.len());
for (a, b, sim) in pairs.iter().take(5) {
println!(" [{:.3}] {}{}", sim, a, b);
}
}
println!("\nAgent prompts:");
println!(" --auto Generate replay agent prompt");
println!(" --agent replay Replay agent (schema assimilation)");
println!(" --agent linker Linker agent (relational binding)");
println!(" --agent separator Separator agent (pattern separation)");
println!(" --agent transfer Transfer agent (CLS episodic→semantic)");
println!(" --agent health Health agent (synaptic homeostasis)");
println!("Consolidation batch ({} nodes):\n", items.len());
for item in &items {
let node_type = store.nodes.get(&item.key)
.map(|n| if matches!(n.node_type, crate::store::NodeType::EpisodicSession) { "episodic" } else { "semantic" })
.unwrap_or("?");
println!(" [{:.3}] {} (cc={:.3}, interval={}d, type={})",
item.priority, item.key, item.cc, item.interval_days, node_type);
}
let pairs = detect_interference(store, &graph, 0.6);
if !pairs.is_empty() {
println!("\nInterfering pairs ({}):", pairs.len());
for (a, b, sim) in pairs.iter().take(5) {
println!(" [{:.3}] {}{}", sim, a, b);
}
}
println!("\nAgent prompts:");
println!(" --auto Generate replay agent prompt");
println!(" --agent replay Replay agent (schema assimilation)");
println!(" --agent linker Linker agent (relational binding)");
println!(" --agent separator Separator agent (pattern separation)");
println!(" --agent transfer Transfer agent (CLS episodic→semantic)");
println!(" --agent health Health agent (synaptic homeostasis)");
Ok(())
}
/// Generate a specific agent prompt with filled-in data.
/// Returns an AgentBatch with the prompt text and the keys of nodes
/// selected for processing (for visit tracking on success).
pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<AgentBatch, String> {
let def = super::defs::get_def(agent)
.ok_or_else(|| format!("Unknown agent: {}", agent))?;