From d20baafe9d2173b85dff136e338bd2baee7ecaf8 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Fri, 20 Mar 2026 14:02:28 -0400 Subject: [PATCH] consolidation: data-driven agent plan, drop transfer/connector/replay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace per-field ConsolidationPlan struct with HashMap counts map. Agent types are no longer hardcoded in the struct — add agents by adding entries to the map. Active agents: linker, organize, distill, separator, split. Removed: transfer (redundant with distill), connector (rethink later), replay (not needed for current graph work). Elo-based budget allocation now iterates the active-agent list instead of assigning seven hardcoded per-field counts. Status display and TUI adapted to show dynamic agent lists. memory-instructions-core v13: added protected nodes section — agents must not rewrite core-personality, core-personality-detail, or memory-instructions-core. They may add links but not modify content. High-value neighbors should be treated with care. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- poc-memory/src/agents/consolidate.rs | 4 +- poc-memory/src/agents/daemon.rs | 46 ++--- poc-memory/src/cli/agent.rs | 4 +- poc-memory/src/neuro/scoring.rs | 269 +++++++++------------------ poc-memory/src/tui.rs | 17 +- 5 files changed, 116 insertions(+), 224 deletions(-) diff --git a/poc-memory/src/agents/consolidate.rs b/poc-memory/src/agents/consolidate.rs index fca539c..2b40b6e 100644 --- a/poc-memory/src/agents/consolidate.rs +++ b/poc-memory/src/agents/consolidate.rs @@ -46,9 +46,7 @@ pub fn consolidate_full_with_progress( log_line(&mut log_buf, &plan_text); println!("{}", plan_text); - let total_agents = plan.replay_count + plan.linker_count - + plan.separator_count + plan.transfer_count - + if plan.run_health { 1 } else { 0 }; + let total_agents = plan.total(); log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents)); // --- Step 2: Execute agents --- diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 5137fe4..845c1d6 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -552,13 +552,7 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { sigma: snap.sigma, episodic_ratio, interference: 0, - plan_replay: plan.replay_count, - plan_linker: plan.linker_count, - plan_separator: plan.separator_count, - plan_transfer: plan.transfer_count, - plan_organize: plan.organize_count, - plan_connector: plan.connector_count, - plan_distill: plan.distill_count, + plan_counts: plan.counts, plan_rationale: plan.rationale, computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), } @@ -680,14 +674,8 @@ pub struct GraphHealth { pub episodic_ratio: f32, // episodic/total nodes (target <0.4) pub interference: usize, // interfering pairs (target <50) // Consolidation work estimate from plan - pub plan_replay: usize, - pub plan_linker: usize, - pub plan_separator: usize, - pub plan_transfer: usize, - pub 
plan_organize: usize, - pub plan_connector: usize, #[serde(default)] - pub plan_distill: usize, + pub plan_counts: std::collections::HashMap, pub plan_rationale: Vec, pub computed_at: String, } @@ -1042,22 +1030,18 @@ pub fn run_daemon() -> Result<(), String> { // Use cached graph health plan (from consolidation_plan_quick). let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above let plan = crate::neuro::ConsolidationPlan { - replay_count: h.plan_replay, - linker_count: h.plan_linker, - separator_count: h.plan_separator, - transfer_count: h.plan_transfer, - organize_count: h.plan_organize, - connector_count: h.plan_connector, - distill_count: h.plan_distill, + counts: h.plan_counts.clone(), run_health: true, rationale: Vec::new(), }; let runs = plan.to_agent_runs(5); + let summary: Vec = h.plan_counts.iter() + .filter(|(_, c)| **c > 0) + .map(|(a, c)| format!("{}{}", &a[..1], c)) + .collect(); log_event("scheduler", "consolidation-plan", - &format!("{} agents ({}r {}l {}s {}t {}d)", - runs.len(), h.plan_replay, h.plan_linker, - h.plan_separator, h.plan_transfer, h.plan_distill)); + &format!("{} agents ({})", runs.len(), summary.join(" "))); // Phase 1: Agent runs — sequential within type, parallel across types. 
// Same-type agents chain (they may touch overlapping graph regions), @@ -1076,10 +1060,10 @@ pub fn run_daemon() -> Result<(), String> { .init(move |ctx| { job_consolidation_agent(ctx, &agent, b, &in_flight_clone) }); - if let Some(dep) = prev_by_type.get(*agent_type) { + if let Some(dep) = prev_by_type.get(agent_type.as_str()) { builder.depend_on(dep); } - prev_by_type.insert(agent_type.to_string(), builder.run()); + prev_by_type.insert(agent_type.clone(), builder.run()); } // Orphans phase depends on all agent type chains completing let prev_agent = prev_by_type.into_values().last(); @@ -1501,9 +1485,13 @@ pub fn show_status() -> Result<(), String> { indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, gh.sigma); - let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + gh.plan_distill + 1; - eprintln!(" consolidation plan: {} agents ({}r {}l {}s {}t {}d +health)", - total, gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer, gh.plan_distill); + let plan_total: usize = gh.plan_counts.values().sum::() + 1; + let plan_summary: Vec = gh.plan_counts.iter() + .filter(|(_, c)| **c > 0) + .map(|(a, c)| format!("{}{}", &a[..1], c)) + .collect(); + eprintln!(" consolidation plan: {} agents ({} +health)", + plan_total, plan_summary.join(" ")); } eprintln!(); diff --git a/poc-memory/src/cli/agent.rs b/poc-memory/src/cli/agent.rs index 8a6354d..b5ad55b 100644 --- a/poc-memory/src/cli/agent.rs +++ b/poc-memory/src/cli/agent.rs @@ -180,8 +180,8 @@ pub fn cmd_evaluate_agents(matchups: usize, model: &str, dry_run: bool) -> Resul let store = store::Store::load()?; let agent_types: Vec<&str> = vec![ - "linker", "organize", "replay", "connector", - "separator", "transfer", "distill", "rename", + "linker", "organize", "distill", "separator", + "split", "rename", ]; // Load agent prompt files diff --git a/poc-memory/src/neuro/scoring.rs b/poc-memory/src/neuro/scoring.rs index e4c8885..e0a60d6 100644 --- 
a/poc-memory/src/neuro/scoring.rs +++ b/poc-memory/src/neuro/scoring.rs @@ -163,44 +163,54 @@ pub fn detect_interference( .collect() } -/// Agent allocation from the control loop +/// Agent allocation from the control loop. +/// Agent types and counts are data-driven — add agents by adding +/// entries to the counts map. #[derive(Default)] pub struct ConsolidationPlan { - pub replay_count: usize, - pub linker_count: usize, - pub separator_count: usize, - pub transfer_count: usize, - pub organize_count: usize, - pub connector_count: usize, - pub distill_count: usize, + /// agent_name → run count + pub counts: std::collections::HashMap, pub run_health: bool, pub rationale: Vec, } impl ConsolidationPlan { + pub fn count(&self, agent: &str) -> usize { + self.counts.get(agent).copied().unwrap_or(0) + } + + pub fn set(&mut self, agent: &str, count: usize) { + self.counts.insert(agent.to_string(), count); + } + + pub fn add(&mut self, agent: &str, count: usize) { + *self.counts.entry(agent.to_string()).or_default() += count; + } + + pub fn total(&self) -> usize { + self.counts.values().sum::() + if self.run_health { 1 } else { 0 } + } + /// Expand the plan into a flat list of (agent_name, batch_size) runs. - pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(&'static str, usize)> { + /// Interleaves agent types so different types alternate. + pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(String, usize)> { let mut runs = Vec::new(); if self.run_health { - runs.push(("health", 0)); + runs.push(("health".to_string(), 0)); } - // Build per-type batch lists, then interleave so different agent - // types alternate rather than running all-replay-then-all-linker. 
- let types: [(&str, usize); 7] = [ - ("linker", self.linker_count), - ("organize", self.organize_count), - ("distill", self.distill_count), - ("replay", self.replay_count), - ("connector", self.connector_count), - ("separator", self.separator_count), - ("transfer", self.transfer_count), - ]; - let mut queues: Vec> = types.iter().map(|(name, count)| { + + // Sort by count descending so high-volume agents interleave well + let mut types: Vec<(&String, &usize)> = self.counts.iter() + .filter(|(_, c)| **c > 0) + .collect(); + types.sort_by(|a, b| b.1.cmp(a.1)); + + let mut queues: Vec> = types.iter().map(|(name, count)| { let mut q = Vec::new(); - let mut remaining = *count; + let mut remaining = **count; while remaining > 0 { let batch = remaining.min(batch_size); - q.push((*name, batch)); + q.push((name.to_string(), batch)); remaining -= batch; } q @@ -211,7 +221,7 @@ impl ConsolidationPlan { let mut added = false; for q in &mut queues { if let Some(run) = q.first() { - runs.push(*run); + runs.push(run.clone()); q.remove(0); added = true; } @@ -253,146 +263,81 @@ fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> Consolidation else { episodic_count as f32 / store.nodes.len() as f32 }; let mut plan = ConsolidationPlan { - replay_count: 0, - linker_count: 0, - separator_count: 0, - transfer_count: 0, - organize_count: 0, - connector_count: 0, - distill_count: 0, + counts: std::collections::HashMap::new(), run_health: true, rationale: Vec::new(), }; + // Active agent types + let agent_types = ["linker", "organize", "distill", "separator", "split"]; + // Target: α ≥ 2.5 (healthy scale-free) if alpha < 2.0 { - plan.replay_count += 50; - plan.linker_count += 100; + plan.add("linker", 100); plan.rationale.push(format!( - "α={:.2} (target ≥2.5): extreme hub dominance → 50 replay + 100 linker", - alpha)); + "α={:.2} (target ≥2.5): extreme hub dominance → 100 linker", alpha)); } else if alpha < 2.5 { - plan.replay_count += 25; - plan.linker_count += 50; + 
plan.add("linker", 50); plan.rationale.push(format!( - "α={:.2} (target ≥2.5): moderate hub dominance → 25 replay + 50 linker", - alpha)); + "α={:.2} (target ≥2.5): moderate hub dominance → 50 linker", alpha)); } else { - plan.replay_count += 10; - plan.linker_count += 20; + plan.add("linker", 20); plan.rationale.push(format!( - "α={:.2}: healthy — 10 replay + 20 linker for maintenance", alpha)); + "α={:.2}: healthy — 20 linker for maintenance", alpha)); } // Target: Gini ≤ 0.4 - // High Gini means degree inequality — most nodes under-connected. - // Linker fixes this by adding edges to low-degree nodes. if gini > 0.5 { - plan.replay_count += 10; - plan.linker_count += 50; + plan.add("linker", 50); plan.rationale.push(format!( - "Gini={:.3} (target ≤0.4): high inequality → +10 replay + 50 linker", - gini)); + "Gini={:.3} (target ≤0.4): high inequality → +50 linker", gini)); } - // Target: avg CC ≥ 0.2 - if avg_cc < 0.1 { - plan.replay_count += 5; - plan.rationale.push(format!( - "CC={:.3} (target ≥0.2): very poor integration → +5 replay", - avg_cc)); - } else if avg_cc < 0.2 { - plan.replay_count += 2; - plan.rationale.push(format!( - "CC={:.3} (target ≥0.2): low integration → +2 replay", - avg_cc)); - } - - // Interference: >100 pairs is a lot, <10 is clean + // Interference: separator disambiguates confusable nodes if interference_count > 100 { - plan.separator_count += 10; + plan.add("separator", 10); plan.rationale.push(format!( - "Interference: {} pairs (target <50) → 10 separator", - interference_count)); + "Interference: {} pairs (target <50) → 10 separator", interference_count)); } else if interference_count > 20 { - plan.separator_count += 5; + plan.add("separator", 5); plan.rationale.push(format!( - "Interference: {} pairs (target <50) → 5 separator", - interference_count)); + "Interference: {} pairs → 5 separator", interference_count)); } else if interference_count > 0 { - plan.separator_count += interference_count.min(3); - plan.rationale.push(format!( 
- "Interference: {} pairs → {} separator", - interference_count, plan.separator_count)); - } - - // Episodic → semantic transfer - if episodic_ratio > 0.6 { - plan.transfer_count += 10; - plan.rationale.push(format!( - "Episodic ratio: {:.0}% ({}/{}) → 10 transfer", - episodic_ratio * 100.0, episodic_count, store.nodes.len())); - } else if episodic_ratio > 0.4 { - plan.transfer_count += 5; - plan.rationale.push(format!( - "Episodic ratio: {:.0}% → 5 transfer", - episodic_ratio * 100.0)); + plan.add("separator", interference_count.min(3)); } // Organize: proportional to linker — synthesizes what linker connects - plan.organize_count = plan.linker_count / 2; + let linker = plan.count("linker"); + plan.set("organize", linker / 2); plan.rationale.push(format!( - "Organize: {} (half of linker count)", plan.organize_count)); + "Organize: {} (half of linker count)", plan.count("organize"))); - // Distill: core concept maintenance — at least as much as organize - // High gini means hubs need refinement; low alpha means hubs are overloaded - plan.distill_count = plan.organize_count; - if gini > 0.4 { - plan.distill_count += 20; - } - if alpha < 2.0 { - plan.distill_count += 20; - } + // Distill: core concept maintenance + let organize = plan.count("organize"); + let mut distill = organize; + if gini > 0.4 { distill += 20; } + if alpha < 2.0 { distill += 20; } + plan.set("distill", distill); plan.rationale.push(format!( - "Distill: {} (synthesize hub content)", plan.distill_count)); + "Distill: {} (synthesize hub content)", plan.count("distill"))); - // Connector: bridges fragmented communities - let community_count = graph.community_count(); - let nodes_per_community = if community_count > 0 { - store.nodes.len() / community_count - } else { 0 }; - if nodes_per_community < 5 { - plan.connector_count += 20; - plan.rationale.push(format!( - "Communities fragmented ({} communities, {:.1} nodes/community) → 20 connector", - community_count, nodes_per_community)); - } else if 
nodes_per_community < 10 { - plan.connector_count += 10; - plan.rationale.push(format!( - "Communities moderate ({:.1} nodes/community) → 10 connector", - nodes_per_community)); - } + // Split: handle oversized nodes + plan.set("split", 5); // Distribute agent budget using Elo ratings let budget = crate::config::get().agent_budget; let elo_path = crate::config::get().data_dir.join("agent-elo.json"); if let Ok(elo_json) = std::fs::read_to_string(&elo_path) { if let Ok(ratings) = serde_json::from_str::>(&elo_json) { - let types = [ - "replay", "linker", "separator", "transfer", - "organize", "connector", "distill", - ]; - let elos: Vec = types.iter() + let elos: Vec = agent_types.iter() .map(|t| ratings.get(*t).copied().unwrap_or(1000.0)) .collect(); let min_elo = elos.iter().copied().fold(f64::MAX, f64::min); - // Square the shifted ratings for unfair distribution — - // top agents get disproportionately more runs let weights: Vec = elos.iter() .map(|e| { - let shifted = e - min_elo + 50.0; // lowest gets 50 - shifted * shifted // square for power-law distribution + let shifted = e - min_elo + 50.0; + shifted * shifted }) .collect(); let total_weight: f64 = weights.iter().sum(); @@ -401,30 +346,22 @@ fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> Consolidation ((w / total_weight * budget as f64).round() as usize).max(2) }; - plan.replay_count = allocate(weights[0]); - plan.linker_count = allocate(weights[1]); - plan.separator_count = allocate(weights[2]); - plan.transfer_count = allocate(weights[3]); - plan.organize_count = allocate(weights[4]); - plan.connector_count = allocate(weights[5]); - plan.distill_count = allocate(weights[6]); + for (i, agent) in agent_types.iter().enumerate() { + plan.set(agent, allocate(weights[i])); + } + let summary: Vec = agent_types.iter() + .map(|a| format!("{}={}", a, plan.count(a))) + .collect(); plan.rationale.push(format!( - "Elo allocation (budget={}): replay={} linker={} separator={} transfer={} organize={} 
connector={} distill={}", - budget, - plan.replay_count, plan.linker_count, plan.separator_count, - plan.transfer_count, plan.organize_count, plan.connector_count, plan.distill_count)); + "Elo allocation (budget={}): {}", budget, summary.join(" "))); } } else { // No Elo file — use budget with equal distribution - let per_type = budget / 7; - plan.replay_count = per_type; - plan.linker_count = per_type; - plan.separator_count = per_type; - plan.transfer_count = per_type; - plan.organize_count = per_type; - plan.connector_count = per_type; - plan.distill_count = per_type; + let per_type = budget / agent_types.len(); + for agent in &agent_types { + plan.set(agent, per_type); + } plan.rationale.push(format!( "No Elo ratings — equal distribution ({} each, budget={})", per_type, budget)); } @@ -443,51 +380,19 @@ pub fn format_plan(plan: &ConsolidationPlan) -> String { out.push_str("\nAgent allocation:\n"); if plan.run_health { - out.push_str(" 1. health — system audit\n"); + out.push_str(" 1. health — system audit\n"); } let mut step = 2; - if plan.replay_count > 0 { - out.push_str(&format!(" {}. replay ×{:2} — schema assimilation + lateral linking\n", - step, plan.replay_count)); + let mut sorted: Vec<_> = plan.counts.iter() + .filter(|(_, c)| **c > 0) + .collect(); + sorted.sort_by(|a, b| b.1.cmp(a.1)); + for (agent, count) in &sorted { + out.push_str(&format!(" {}. {} ×{}\n", step, agent, count)); step += 1; } - if plan.linker_count > 0 { - out.push_str(&format!(" {}. linker ×{:2} — relational binding from episodes\n", - step, plan.linker_count)); - step += 1; - } - if plan.separator_count > 0 { - out.push_str(&format!(" {}. separator ×{} — pattern separation\n", - step, plan.separator_count)); - step += 1; - } - if plan.transfer_count > 0 { - out.push_str(&format!(" {}. transfer ×{:2} — episodic→semantic extraction\n", - step, plan.transfer_count)); - step += 1; - } - if plan.organize_count > 0 { - out.push_str(&format!(" {}. 
organize ×{:2} — hub creation + knowledge synthesis\n", - step, plan.organize_count)); - step += 1; - } - if plan.connector_count > 0 { - out.push_str(&format!(" {}. connector ×{} — cross-cluster bridging\n", - step, plan.connector_count)); - step += 1; - } - if plan.distill_count > 0 { - out.push_str(&format!(" {}. distill ×{:2} — hub content synthesis + refinement\n", - step, plan.distill_count)); - } - - let total = plan.replay_count + plan.linker_count - + plan.separator_count + plan.transfer_count - + plan.organize_count + plan.connector_count - + plan.distill_count - + if plan.run_health { 1 } else { 0 }; - out.push_str(&format!("\nTotal agent runs: {}\n", total)); + out.push_str(&format!("\nTotal agent runs: {}\n", plan.total())); out } diff --git a/poc-memory/src/tui.rs b/poc-memory/src/tui.rs index a7d947b..baf755c 100644 --- a/poc-memory/src/tui.rs +++ b/poc-memory/src/tui.rs @@ -29,8 +29,8 @@ const POLL_INTERVAL: Duration = Duration::from_secs(2); // Agent types we know about, in display order const AGENT_TYPES: &[&str] = &[ - "health", "replay", "linker", "separator", "transfer", - "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", "split", + "health", "linker", "organize", "distill", "separator", "split", + "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", ]; fn log_path() -> PathBuf { @@ -536,17 +536,18 @@ fn render_health(frame: &mut Frame, gh: &GraphHealth, area: Rect) { ); // Plan - let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + gh.plan_distill + 1; + let plan_total: usize = gh.plan_counts.values().sum::() + 1; + let plan_summary: Vec = gh.plan_counts.iter() + .filter(|(_, c)| **c > 0) + .map(|(a, c)| format!("{}{}", &a[..1], c)) + .collect(); let plan_line = Line::from(vec![ Span::raw(" plan: "), Span::styled( - format!("{}", total), + format!("{}", plan_total), Style::default().add_modifier(Modifier::BOLD), ), - Span::raw(format!( - " agents ({}r {}l {}s {}t 
{}d +health)", - gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer, gh.plan_distill - )), + Span::raw(format!(" agents ({} +health)", plan_summary.join(" "))), ]); frame.render_widget(Paragraph::new(plan_line), plan_area); }