neuro: unify consolidation planning, fix threshold drift

The daemon's compute_graph_health had a duplicated copy of the
consolidation planning thresholds that had drifted from the canonical
version (α<2.0 → +7 replay in daemon vs +10 in neuro).

Split consolidation_plan into _inner(store, detect_interference) so
the daemon can call consolidation_plan_quick (skips O(n²) interference)
while using the same threshold logic.
This commit is contained in:
ProofOfConcept 2026-03-10 17:55:08 -04:00
parent 945865f594
commit 8ba58ce9cd
3 changed files with 36 additions and 64 deletions

View file

@ -492,9 +492,6 @@ fn job_daily_check(
}
fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
// Only compute cheap metrics here — interference detection is O(n²)
// and takes minutes. The full plan (with interference) runs during
// consolidation itself.
let graph = store.build_graph();
let snap = crate::graph::current_metrics(&graph);
@ -504,38 +501,8 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
let episodic_ratio = if store.nodes.is_empty() { 0.0 }
else { episodic_count as f32 / store.nodes.len() as f32 };
// Estimate plan from cheap metrics only (skip interference)
let mut plan_replay = 3usize; // baseline maintenance
let mut plan_linker = 0usize;
let plan_separator = 0usize; // needs interference, skip for status
let mut plan_transfer = 0usize;
let mut rationale = Vec::new();
if snap.alpha < 2.0 {
plan_replay += 7; plan_linker += 5;
rationale.push(format!("α={:.2}: extreme hub dominance", snap.alpha));
} else if snap.alpha < 2.5 {
plan_replay += 2; plan_linker += 3;
rationale.push(format!("α={:.2}: moderate hub dominance", snap.alpha));
}
if snap.gini > 0.5 {
plan_replay += 3;
rationale.push(format!("gini={:.3}: high inequality", snap.gini));
}
if snap.avg_cc < 0.1 {
plan_replay += 5;
rationale.push(format!("cc={:.3}: very poor integration", snap.avg_cc));
} else if snap.avg_cc < 0.2 {
plan_replay += 2;
rationale.push(format!("cc={:.3}: low integration", snap.avg_cc));
}
if episodic_ratio > 0.6 {
plan_transfer += 10;
rationale.push(format!("episodic={:.0}%: needs extraction", episodic_ratio * 100.0));
} else if episodic_ratio > 0.4 {
plan_transfer += 5;
rationale.push(format!("episodic={:.0}%", episodic_ratio * 100.0));
}
// Use the same planning logic as consolidation (skip O(n²) interference)
let plan = crate::neuro::consolidation_plan_quick(store);
GraphHealth {
nodes: snap.nodes,
@ -546,12 +513,12 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
avg_cc: snap.avg_cc,
sigma: snap.sigma,
episodic_ratio,
interference: 0, // not computed in status check
plan_replay,
plan_linker,
plan_separator,
plan_transfer,
plan_rationale: rationale,
interference: 0,
plan_replay: plan.replay_count,
plan_linker: plan.linker_count,
plan_separator: plan.separator_count,
plan_transfer: plan.transfer_count,
plan_rationale: plan.rationale,
computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
}
}
@ -1054,27 +1021,22 @@ pub fn run_daemon() -> Result<(), String> {
if last.is_none_or(|d| d < today) && gh.is_some() {
log_event("scheduler", "daily-trigger", &today.to_string());
// Use cached graph health for plan (cheap — no O(n²) interference detection).
let (replay, linker, separator, transfer) = match gh {
Some(ref h) => (h.plan_replay, h.plan_linker, h.plan_separator, h.plan_transfer),
None => unreachable!(), // guarded by gh.is_some() above
};
// Use cached graph health plan (from consolidation_plan_quick).
let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above
let plan = crate::neuro::ConsolidationPlan {
replay_count: replay,
linker_count: linker,
separator_count: separator,
transfer_count: transfer,
replay_count: h.plan_replay,
linker_count: h.plan_linker,
separator_count: h.plan_separator,
transfer_count: h.plan_transfer,
run_health: true,
rationale: Vec::new(),
};
let batch_size = 5;
let runs = plan.to_agent_runs(batch_size);
let runs = plan.to_agent_runs(5);
log_event("scheduler", "consolidation-plan",
&format!("{} agents ({}r {}l {}s {}t)",
runs.len(), plan.replay_count, plan.linker_count,
plan.separator_count, plan.transfer_count));
runs.len(), h.plan_replay, h.plan_linker,
h.plan_separator, h.plan_transfer));
// Phase 1: Agent runs (sequential — each reloads store to see prior changes)
let mut prev_agent = None;

View file

@ -13,7 +13,7 @@ pub use scoring::{
consolidation_priority,
replay_queue, replay_queue_with_graph,
detect_interference,
consolidation_plan, format_plan,
consolidation_plan, consolidation_plan_quick, format_plan,
daily_check,
};

View file

@ -203,18 +203,28 @@ impl ConsolidationPlan {
/// This is the control loop: metrics → error signal → agent allocation.
/// Target values are based on healthy small-world networks.
pub fn consolidation_plan(store: &Store) -> ConsolidationPlan {
consolidation_plan_inner(store, true)
}
/// Cheap version: skip O(n²) interference detection (for daemon status).
pub fn consolidation_plan_quick(store: &Store) -> ConsolidationPlan {
consolidation_plan_inner(store, false)
}
fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> ConsolidationPlan {
let graph = store.build_graph();
let alpha = graph.degree_power_law_exponent();
let gini = graph.degree_gini();
let avg_cc = graph.avg_clustering_coefficient();
let interference_pairs = detect_interference(store, &graph, 0.5);
let interference_count = interference_pairs.len();
let interference_count = if detect_interf {
detect_interference(store, &graph, 0.5).len()
} else {
0
};
// Count episodic vs semantic nodes
let episodic_count = store.nodes.iter()
.filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
.count();
let _semantic_count = store.nodes.len() - episodic_count;
let episodic_ratio = if store.nodes.is_empty() { 0.0 }
else { episodic_count as f32 / store.nodes.len() as f32 };
@ -223,7 +233,7 @@ pub fn consolidation_plan(store: &Store) -> ConsolidationPlan {
linker_count: 0,
separator_count: 0,
transfer_count: 0,
run_health: true, // always run health first
run_health: true,
rationale: Vec::new(),
};
@ -232,7 +242,7 @@ pub fn consolidation_plan(store: &Store) -> ConsolidationPlan {
plan.replay_count += 10;
plan.linker_count += 5;
plan.rationale.push(format!(
"α={:.2} (target ≥2.5): extreme hub dominance → 10 replay + 5 linker for lateral links",
"α={:.2} (target ≥2.5): extreme hub dominance → 10 replay + 5 linker",
alpha));
} else if alpha < 2.5 {
plan.replay_count += 5;
@ -250,7 +260,7 @@ pub fn consolidation_plan(store: &Store) -> ConsolidationPlan {
if gini > 0.5 {
plan.replay_count += 3;
plan.rationale.push(format!(
"Gini={:.3} (target ≤0.4): high inequality → +3 replay (lateral focus)",
"Gini={:.3} (target ≤0.4): high inequality → +3 replay",
gini));
}
@ -289,7 +299,7 @@ pub fn consolidation_plan(store: &Store) -> ConsolidationPlan {
if episodic_ratio > 0.6 {
plan.transfer_count += 10;
plan.rationale.push(format!(
"Episodic ratio: {:.0}% ({}/{}) → 10 transfer (knowledge extraction needed)",
"Episodic ratio: {:.0}% ({}/{}) → 10 transfer",
episodic_ratio * 100.0, episodic_count, store.nodes.len()));
} else if episodic_ratio > 0.4 {
plan.transfer_count += 5;