consolidation: data-driven agent plan, drop transfer/connector/replay

Replace per-field ConsolidationPlan struct with HashMap<String, usize>
counts map. Agent types are no longer hardcoded in the struct — add
agents by adding entries to the map.

Active agents: linker, organize, distill, separator, split.
Removed: transfer (redundant with distill), connector (rethink later),
replay (not needed for current graph work).

Elo-based budget allocation now iterates the map instead of indexing
a fixed array. Status display and TUI adapted to show dynamic agent
lists.

memory-instructions-core v13: added protected nodes section — agents
must not rewrite core-personality, core-personality-detail, or
memory-instructions-core. They may add links but not modify content.
High-value neighbors should be treated with care.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-20 14:02:28 -04:00
parent d6c26e27fe
commit d20baafe9d
5 changed files with 116 additions and 224 deletions

View file

@@ -46,9 +46,7 @@ pub fn consolidate_full_with_progress(
log_line(&mut log_buf, &plan_text); log_line(&mut log_buf, &plan_text);
println!("{}", plan_text); println!("{}", plan_text);
let total_agents = plan.replay_count + plan.linker_count let total_agents = plan.total();
+ plan.separator_count + plan.transfer_count
+ if plan.run_health { 1 } else { 0 };
log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents)); log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents));
// --- Step 2: Execute agents --- // --- Step 2: Execute agents ---

View file

@@ -552,13 +552,7 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
sigma: snap.sigma, sigma: snap.sigma,
episodic_ratio, episodic_ratio,
interference: 0, interference: 0,
plan_replay: plan.replay_count, plan_counts: plan.counts,
plan_linker: plan.linker_count,
plan_separator: plan.separator_count,
plan_transfer: plan.transfer_count,
plan_organize: plan.organize_count,
plan_connector: plan.connector_count,
plan_distill: plan.distill_count,
plan_rationale: plan.rationale, plan_rationale: plan.rationale,
computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
} }
@@ -680,14 +674,8 @@ pub struct GraphHealth {
pub episodic_ratio: f32, // episodic/total nodes (target <0.4) pub episodic_ratio: f32, // episodic/total nodes (target <0.4)
pub interference: usize, // interfering pairs (target <50) pub interference: usize, // interfering pairs (target <50)
// Consolidation work estimate from plan // Consolidation work estimate from plan
pub plan_replay: usize,
pub plan_linker: usize,
pub plan_separator: usize,
pub plan_transfer: usize,
pub plan_organize: usize,
pub plan_connector: usize,
#[serde(default)] #[serde(default)]
pub plan_distill: usize, pub plan_counts: std::collections::HashMap<String, usize>,
pub plan_rationale: Vec<String>, pub plan_rationale: Vec<String>,
pub computed_at: String, pub computed_at: String,
} }
@@ -1042,22 +1030,18 @@ pub fn run_daemon() -> Result<(), String> {
// Use cached graph health plan (from consolidation_plan_quick). // Use cached graph health plan (from consolidation_plan_quick).
let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above
let plan = crate::neuro::ConsolidationPlan { let plan = crate::neuro::ConsolidationPlan {
replay_count: h.plan_replay, counts: h.plan_counts.clone(),
linker_count: h.plan_linker,
separator_count: h.plan_separator,
transfer_count: h.plan_transfer,
organize_count: h.plan_organize,
connector_count: h.plan_connector,
distill_count: h.plan_distill,
run_health: true, run_health: true,
rationale: Vec::new(), rationale: Vec::new(),
}; };
let runs = plan.to_agent_runs(5); let runs = plan.to_agent_runs(5);
let summary: Vec<String> = h.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
log_event("scheduler", "consolidation-plan", log_event("scheduler", "consolidation-plan",
&format!("{} agents ({}r {}l {}s {}t {}d)", &format!("{} agents ({})", runs.len(), summary.join(" ")));
runs.len(), h.plan_replay, h.plan_linker,
h.plan_separator, h.plan_transfer, h.plan_distill));
// Phase 1: Agent runs — sequential within type, parallel across types. // Phase 1: Agent runs — sequential within type, parallel across types.
// Same-type agents chain (they may touch overlapping graph regions), // Same-type agents chain (they may touch overlapping graph regions),
@@ -1076,10 +1060,10 @@ pub fn run_daemon() -> Result<(), String> {
.init(move |ctx| { .init(move |ctx| {
job_consolidation_agent(ctx, &agent, b, &in_flight_clone) job_consolidation_agent(ctx, &agent, b, &in_flight_clone)
}); });
if let Some(dep) = prev_by_type.get(*agent_type) { if let Some(dep) = prev_by_type.get(agent_type.as_str()) {
builder.depend_on(dep); builder.depend_on(dep);
} }
prev_by_type.insert(agent_type.to_string(), builder.run()); prev_by_type.insert(agent_type.clone(), builder.run());
} }
// Orphans phase depends on all agent type chains completing // Orphans phase depends on all agent type chains completing
let prev_agent = prev_by_type.into_values().last(); let prev_agent = prev_by_type.into_values().last();
@@ -1501,9 +1485,13 @@ pub fn show_status() -> Result<(), String> {
indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0,
gh.sigma); gh.sigma);
let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + gh.plan_distill + 1; let plan_total: usize = gh.plan_counts.values().sum::<usize>() + 1;
eprintln!(" consolidation plan: {} agents ({}r {}l {}s {}t {}d +health)", let plan_summary: Vec<String> = gh.plan_counts.iter()
total, gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer, gh.plan_distill); .filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
eprintln!(" consolidation plan: {} agents ({} +health)",
plan_total, plan_summary.join(" "));
} }
eprintln!(); eprintln!();

View file

@@ -180,8 +180,8 @@ pub fn cmd_evaluate_agents(matchups: usize, model: &str, dry_run: bool) -> Resul
let store = store::Store::load()?; let store = store::Store::load()?;
let agent_types: Vec<&str> = vec![ let agent_types: Vec<&str> = vec![
"linker", "organize", "replay", "connector", "linker", "organize", "distill", "separator",
"separator", "transfer", "distill", "rename", "split", "rename",
]; ];
// Load agent prompt files // Load agent prompt files

View file

@@ -163,44 +163,54 @@ pub fn detect_interference(
.collect() .collect()
} }
/// Agent allocation from the control loop /// Agent allocation from the control loop.
/// Agent types and counts are data-driven — add agents by adding
/// entries to the counts map.
#[derive(Default)] #[derive(Default)]
pub struct ConsolidationPlan { pub struct ConsolidationPlan {
pub replay_count: usize, /// agent_name → run count
pub linker_count: usize, pub counts: std::collections::HashMap<String, usize>,
pub separator_count: usize,
pub transfer_count: usize,
pub organize_count: usize,
pub connector_count: usize,
pub distill_count: usize,
pub run_health: bool, pub run_health: bool,
pub rationale: Vec<String>, pub rationale: Vec<String>,
} }
impl ConsolidationPlan { impl ConsolidationPlan {
pub fn count(&self, agent: &str) -> usize {
self.counts.get(agent).copied().unwrap_or(0)
}
pub fn set(&mut self, agent: &str, count: usize) {
self.counts.insert(agent.to_string(), count);
}
pub fn add(&mut self, agent: &str, count: usize) {
*self.counts.entry(agent.to_string()).or_default() += count;
}
pub fn total(&self) -> usize {
self.counts.values().sum::<usize>() + if self.run_health { 1 } else { 0 }
}
/// Expand the plan into a flat list of (agent_name, batch_size) runs. /// Expand the plan into a flat list of (agent_name, batch_size) runs.
pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(&'static str, usize)> { /// Interleaves agent types so different types alternate.
pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(String, usize)> {
let mut runs = Vec::new(); let mut runs = Vec::new();
if self.run_health { if self.run_health {
runs.push(("health", 0)); runs.push(("health".to_string(), 0));
} }
// Build per-type batch lists, then interleave so different agent
// types alternate rather than running all-replay-then-all-linker. // Sort by count descending so high-volume agents interleave well
let types: [(&str, usize); 7] = [ let mut types: Vec<(&String, &usize)> = self.counts.iter()
("linker", self.linker_count), .filter(|(_, c)| **c > 0)
("organize", self.organize_count), .collect();
("distill", self.distill_count), types.sort_by(|a, b| b.1.cmp(a.1));
("replay", self.replay_count),
("connector", self.connector_count), let mut queues: Vec<Vec<(String, usize)>> = types.iter().map(|(name, count)| {
("separator", self.separator_count),
("transfer", self.transfer_count),
];
let mut queues: Vec<Vec<(&str, usize)>> = types.iter().map(|(name, count)| {
let mut q = Vec::new(); let mut q = Vec::new();
let mut remaining = *count; let mut remaining = **count;
while remaining > 0 { while remaining > 0 {
let batch = remaining.min(batch_size); let batch = remaining.min(batch_size);
q.push((*name, batch)); q.push((name.to_string(), batch));
remaining -= batch; remaining -= batch;
} }
q q
@@ -211,7 +221,7 @@ impl ConsolidationPlan {
let mut added = false; let mut added = false;
for q in &mut queues { for q in &mut queues {
if let Some(run) = q.first() { if let Some(run) = q.first() {
runs.push(*run); runs.push(run.clone());
q.remove(0); q.remove(0);
added = true; added = true;
} }
@@ -253,146 +263,81 @@ fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> Consolidation
else { episodic_count as f32 / store.nodes.len() as f32 }; else { episodic_count as f32 / store.nodes.len() as f32 };
let mut plan = ConsolidationPlan { let mut plan = ConsolidationPlan {
replay_count: 0, counts: std::collections::HashMap::new(),
linker_count: 0,
separator_count: 0,
transfer_count: 0,
organize_count: 0,
connector_count: 0,
distill_count: 0,
run_health: true, run_health: true,
rationale: Vec::new(), rationale: Vec::new(),
}; };
// Active agent types
let agent_types = ["linker", "organize", "distill", "separator", "split"];
// Target: α ≥ 2.5 (healthy scale-free) // Target: α ≥ 2.5 (healthy scale-free)
if alpha < 2.0 { if alpha < 2.0 {
plan.replay_count += 50; plan.add("linker", 100);
plan.linker_count += 100;
plan.rationale.push(format!( plan.rationale.push(format!(
"α={:.2} (target ≥2.5): extreme hub dominance → 50 replay + 100 linker", "α={:.2} (target ≥2.5): extreme hub dominance → 100 linker", alpha));
alpha));
} else if alpha < 2.5 { } else if alpha < 2.5 {
plan.replay_count += 25; plan.add("linker", 50);
plan.linker_count += 50;
plan.rationale.push(format!( plan.rationale.push(format!(
"α={:.2} (target ≥2.5): moderate hub dominance → 25 replay + 50 linker", "α={:.2} (target ≥2.5): moderate hub dominance → 50 linker", alpha));
alpha));
} else { } else {
plan.replay_count += 10; plan.add("linker", 20);
plan.linker_count += 20;
plan.rationale.push(format!( plan.rationale.push(format!(
"α={:.2}: healthy — 10 replay + 20 linker for maintenance", alpha)); "α={:.2}: healthy — 20 linker for maintenance", alpha));
} }
// Target: Gini ≤ 0.4 // Target: Gini ≤ 0.4
// High Gini means degree inequality — most nodes under-connected.
// Linker fixes this by adding edges to low-degree nodes.
if gini > 0.5 { if gini > 0.5 {
plan.replay_count += 10; plan.add("linker", 50);
plan.linker_count += 50;
plan.rationale.push(format!( plan.rationale.push(format!(
"Gini={:.3} (target ≤0.4): high inequality → +10 replay + 50 linker", "Gini={:.3} (target ≤0.4): high inequality → +50 linker", gini));
gini));
} }
// Target: avg CC ≥ 0.2 // Interference: separator disambiguates confusable nodes
if avg_cc < 0.1 {
plan.replay_count += 5;
plan.rationale.push(format!(
"CC={:.3} (target ≥0.2): very poor integration → +5 replay",
avg_cc));
} else if avg_cc < 0.2 {
plan.replay_count += 2;
plan.rationale.push(format!(
"CC={:.3} (target ≥0.2): low integration → +2 replay",
avg_cc));
}
// Interference: >100 pairs is a lot, <10 is clean
if interference_count > 100 { if interference_count > 100 {
plan.separator_count += 10; plan.add("separator", 10);
plan.rationale.push(format!( plan.rationale.push(format!(
"Interference: {} pairs (target <50) → 10 separator", "Interference: {} pairs (target <50) → 10 separator", interference_count));
interference_count));
} else if interference_count > 20 { } else if interference_count > 20 {
plan.separator_count += 5; plan.add("separator", 5);
plan.rationale.push(format!( plan.rationale.push(format!(
"Interference: {} pairs (target <50) → 5 separator", "Interference: {} pairs → 5 separator", interference_count));
interference_count));
} else if interference_count > 0 { } else if interference_count > 0 {
plan.separator_count += interference_count.min(3); plan.add("separator", interference_count.min(3));
plan.rationale.push(format!(
"Interference: {} pairs → {} separator",
interference_count, plan.separator_count));
}
// Episodic → semantic transfer
if episodic_ratio > 0.6 {
plan.transfer_count += 10;
plan.rationale.push(format!(
"Episodic ratio: {:.0}% ({}/{}) → 10 transfer",
episodic_ratio * 100.0, episodic_count, store.nodes.len()));
} else if episodic_ratio > 0.4 {
plan.transfer_count += 5;
plan.rationale.push(format!(
"Episodic ratio: {:.0}% → 5 transfer",
episodic_ratio * 100.0));
} }
// Organize: proportional to linker — synthesizes what linker connects // Organize: proportional to linker — synthesizes what linker connects
plan.organize_count = plan.linker_count / 2; let linker = plan.count("linker");
plan.set("organize", linker / 2);
plan.rationale.push(format!( plan.rationale.push(format!(
"Organize: {} (half of linker count)", plan.organize_count)); "Organize: {} (half of linker count)", plan.count("organize")));
// Distill: core concept maintenance — at least as much as organize // Distill: core concept maintenance
// High gini means hubs need refinement; low alpha means hubs are overloaded let organize = plan.count("organize");
plan.distill_count = plan.organize_count; let mut distill = organize;
if gini > 0.4 { if gini > 0.4 { distill += 20; }
plan.distill_count += 20; if alpha < 2.0 { distill += 20; }
} plan.set("distill", distill);
if alpha < 2.0 {
plan.distill_count += 20;
}
plan.rationale.push(format!( plan.rationale.push(format!(
"Distill: {} (synthesize hub content)", plan.distill_count)); "Distill: {} (synthesize hub content)", plan.count("distill")));
// Connector: bridges fragmented communities // Split: handle oversized nodes
let community_count = graph.community_count(); plan.set("split", 5);
let nodes_per_community = if community_count > 0 {
store.nodes.len() / community_count
} else { 0 };
if nodes_per_community < 5 {
plan.connector_count += 20;
plan.rationale.push(format!(
"Communities fragmented ({} communities, {:.1} nodes/community) → 20 connector",
community_count, nodes_per_community));
} else if nodes_per_community < 10 {
plan.connector_count += 10;
plan.rationale.push(format!(
"Communities moderate ({:.1} nodes/community) → 10 connector",
nodes_per_community));
}
// Distribute agent budget using Elo ratings // Distribute agent budget using Elo ratings
let budget = crate::config::get().agent_budget; let budget = crate::config::get().agent_budget;
let elo_path = crate::config::get().data_dir.join("agent-elo.json"); let elo_path = crate::config::get().data_dir.join("agent-elo.json");
if let Ok(elo_json) = std::fs::read_to_string(&elo_path) { if let Ok(elo_json) = std::fs::read_to_string(&elo_path) {
if let Ok(ratings) = serde_json::from_str::<std::collections::HashMap<String, f64>>(&elo_json) { if let Ok(ratings) = serde_json::from_str::<std::collections::HashMap<String, f64>>(&elo_json) {
let types = [ let elos: Vec<f64> = agent_types.iter()
"replay", "linker", "separator", "transfer",
"organize", "connector", "distill",
];
let elos: Vec<f64> = types.iter()
.map(|t| ratings.get(*t).copied().unwrap_or(1000.0)) .map(|t| ratings.get(*t).copied().unwrap_or(1000.0))
.collect(); .collect();
let min_elo = elos.iter().copied().fold(f64::MAX, f64::min); let min_elo = elos.iter().copied().fold(f64::MAX, f64::min);
// Square the shifted ratings for unfair distribution —
// top agents get disproportionately more runs
let weights: Vec<f64> = elos.iter() let weights: Vec<f64> = elos.iter()
.map(|e| { .map(|e| {
let shifted = e - min_elo + 50.0; // lowest gets 50 let shifted = e - min_elo + 50.0;
shifted * shifted // square for power-law distribution shifted * shifted
}) })
.collect(); .collect();
let total_weight: f64 = weights.iter().sum(); let total_weight: f64 = weights.iter().sum();
@@ -401,30 +346,22 @@ fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> Consolidation
((w / total_weight * budget as f64).round() as usize).max(2) ((w / total_weight * budget as f64).round() as usize).max(2)
}; };
plan.replay_count = allocate(weights[0]); for (i, agent) in agent_types.iter().enumerate() {
plan.linker_count = allocate(weights[1]); plan.set(agent, allocate(weights[i]));
plan.separator_count = allocate(weights[2]); }
plan.transfer_count = allocate(weights[3]);
plan.organize_count = allocate(weights[4]);
plan.connector_count = allocate(weights[5]);
plan.distill_count = allocate(weights[6]);
let summary: Vec<String> = agent_types.iter()
.map(|a| format!("{}={}", a, plan.count(a)))
.collect();
plan.rationale.push(format!( plan.rationale.push(format!(
"Elo allocation (budget={}): replay={} linker={} separator={} transfer={} organize={} connector={} distill={}", "Elo allocation (budget={}): {}", budget, summary.join(" ")));
budget,
plan.replay_count, plan.linker_count, plan.separator_count,
plan.transfer_count, plan.organize_count, plan.connector_count, plan.distill_count));
} }
} else { } else {
// No Elo file — use budget with equal distribution // No Elo file — use budget with equal distribution
let per_type = budget / 7; let per_type = budget / agent_types.len();
plan.replay_count = per_type; for agent in &agent_types {
plan.linker_count = per_type; plan.set(agent, per_type);
plan.separator_count = per_type; }
plan.transfer_count = per_type;
plan.organize_count = per_type;
plan.connector_count = per_type;
plan.distill_count = per_type;
plan.rationale.push(format!( plan.rationale.push(format!(
"No Elo ratings — equal distribution ({} each, budget={})", per_type, budget)); "No Elo ratings — equal distribution ({} each, budget={})", per_type, budget));
} }
@@ -443,51 +380,19 @@ pub fn format_plan(plan: &ConsolidationPlan) -> String {
out.push_str("\nAgent allocation:\n"); out.push_str("\nAgent allocation:\n");
if plan.run_health { if plan.run_health {
out.push_str(" 1. health — system audit\n"); out.push_str(" 1. health — system audit\n");
} }
let mut step = 2; let mut step = 2;
if plan.replay_count > 0 { let mut sorted: Vec<_> = plan.counts.iter()
out.push_str(&format!(" {}. replay ×{:2} — schema assimilation + lateral linking\n", .filter(|(_, c)| **c > 0)
step, plan.replay_count)); .collect();
sorted.sort_by(|a, b| b.1.cmp(a.1));
for (agent, count) in &sorted {
out.push_str(&format!(" {}. {} ×{}\n", step, agent, count));
step += 1; step += 1;
} }
if plan.linker_count > 0 {
out.push_str(&format!(" {}. linker ×{:2} — relational binding from episodes\n",
step, plan.linker_count));
step += 1;
}
if plan.separator_count > 0 {
out.push_str(&format!(" {}. separator ×{} — pattern separation\n",
step, plan.separator_count));
step += 1;
}
if plan.transfer_count > 0 {
out.push_str(&format!(" {}. transfer ×{:2} — episodic→semantic extraction\n",
step, plan.transfer_count));
step += 1;
}
if plan.organize_count > 0 {
out.push_str(&format!(" {}. organize ×{:2} — hub creation + knowledge synthesis\n",
step, plan.organize_count));
step += 1;
}
if plan.connector_count > 0 {
out.push_str(&format!(" {}. connector ×{} — cross-cluster bridging\n",
step, plan.connector_count));
step += 1;
}
if plan.distill_count > 0 {
out.push_str(&format!(" {}. distill ×{:2} — hub content synthesis + refinement\n",
step, plan.distill_count));
}
let total = plan.replay_count + plan.linker_count
+ plan.separator_count + plan.transfer_count
+ plan.organize_count + plan.connector_count
+ plan.distill_count
+ if plan.run_health { 1 } else { 0 };
out.push_str(&format!("\nTotal agent runs: {}\n", total));
out.push_str(&format!("\nTotal agent runs: {}\n", plan.total()));
out out
} }

View file

@@ -29,8 +29,8 @@ const POLL_INTERVAL: Duration = Duration::from_secs(2);
// Agent types we know about, in display order // Agent types we know about, in display order
const AGENT_TYPES: &[&str] = &[ const AGENT_TYPES: &[&str] = &[
"health", "replay", "linker", "separator", "transfer", "health", "linker", "organize", "distill", "separator", "split",
"apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", "split", "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename",
]; ];
fn log_path() -> PathBuf { fn log_path() -> PathBuf {
@@ -536,17 +536,18 @@ fn render_health(frame: &mut Frame, gh: &GraphHealth, area: Rect) {
); );
// Plan // Plan
let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + gh.plan_distill + 1; let plan_total: usize = gh.plan_counts.values().sum::<usize>() + 1;
let plan_summary: Vec<String> = gh.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
let plan_line = Line::from(vec![ let plan_line = Line::from(vec![
Span::raw(" plan: "), Span::raw(" plan: "),
Span::styled( Span::styled(
format!("{}", total), format!("{}", plan_total),
Style::default().add_modifier(Modifier::BOLD), Style::default().add_modifier(Modifier::BOLD),
), ),
Span::raw(format!( Span::raw(format!(" agents ({} +health)", plan_summary.join(" "))),
" agents ({}r {}l {}s {}t {}d +health)",
gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer, gh.plan_distill
)),
]); ]);
frame.render_widget(Paragraph::new(plan_line), plan_area); frame.render_widget(Paragraph::new(plan_line), plan_area);
} }