// Consolidation scoring, replay queues, interference detection, and // graph health metrics. Pure analysis — no store mutations. use crate::store::{Store, now_epoch}; use crate::graph::{self, Graph}; use crate::spectral::{self, SpectralEmbedding, SpectralPosition}; use std::collections::HashMap; const SECS_PER_DAY: f64 = 86400.0; /// Consolidation priority: how urgently a node needs attention. /// /// With spectral data: /// priority = spectral_displacement × overdue × emotion /// Without: /// priority = (1 - cc) × overdue × emotion /// /// Spectral displacement is the outlier_score clamped and normalized — /// it measures how far a node sits from its community center in the /// eigenspace. This is a global signal (considers all graph structure) /// vs CC which is local (only immediate neighbors). pub fn consolidation_priority( store: &Store, key: &str, graph: &Graph, spectral_outlier: Option, ) -> f64 { let node = match store.nodes.get(key) { Some(n) => n, None => return 0.0, }; // Integration factor: how poorly integrated is this node? let displacement = if let Some(outlier) = spectral_outlier { // outlier_score = dist_to_center / median_dist_in_community // 1.0 = typical position, >2 = unusual, >5 = extreme outlier // Use log scale for dynamic range: the difference between // outlier=5 and outlier=10 matters less than 1 vs 2. (outlier / 3.0).min(3.0) } else { let cc = graph.clustering_coefficient(key) as f64; 1.0 - cc }; // Spaced repetition: how overdue is this node for replay? 
let interval_secs = node.spaced_repetition_interval as f64 * SECS_PER_DAY; let time_since_replay = if node.last_replayed > 0 { (now_epoch() - node.last_replayed).max(0) as f64 } else { interval_secs * 3.0 }; let overdue_ratio = (time_since_replay / interval_secs).min(5.0); // Emotional intensity: higher emotion = higher priority let emotion_factor = 1.0 + (node.emotion as f64 / 10.0); displacement * overdue_ratio * emotion_factor } /// Item in the replay queue pub struct ReplayItem { pub key: String, pub priority: f64, pub interval_days: u32, pub emotion: f32, pub cc: f32, /// Spectral classification: "bridge", "outlier", "core", "peripheral" pub classification: &'static str, /// Raw spectral outlier score (distance / median) pub outlier_score: f64, } /// Generate the replay queue: nodes ordered by consolidation priority. /// Automatically loads spectral embedding if available. pub fn replay_queue(store: &Store, count: usize) -> Vec { let graph = store.build_graph(); let emb = spectral::load_embedding().ok(); replay_queue_with_graph(store, count, &graph, emb.as_ref()) } /// Generate the replay queue using pre-built graph and optional spectral data. 
pub fn replay_queue_with_graph( store: &Store, count: usize, graph: &Graph, emb: Option<&SpectralEmbedding>, ) -> Vec { // Build spectral position map if embedding is available let positions: HashMap = if let Some(emb) = emb { let communities = graph.communities().clone(); spectral::analyze_positions(emb, &communities) .into_iter() .map(|p| (p.key.clone(), p)) .collect() } else { HashMap::new() }; let mut items: Vec = store.nodes.iter() .map(|(key, node)| { let pos = positions.get(key); let outlier_score = pos.map(|p| p.outlier_score).unwrap_or(0.0); let classification = pos .map(spectral::classify_position) .unwrap_or("unknown"); let priority = consolidation_priority( store, key, graph, pos.map(|p| p.outlier_score), ); ReplayItem { key: key.clone(), priority, interval_days: node.spaced_repetition_interval, emotion: node.emotion, cc: graph.clustering_coefficient(key), classification, outlier_score, } }) .collect(); items.sort_by(|a, b| b.priority.total_cmp(&a.priority)); items.truncate(count); items } /// Detect interfering memory pairs: high text similarity but different communities pub fn detect_interference( store: &Store, graph: &Graph, threshold: f32, ) -> Vec<(String, String, f32)> { use crate::similarity; let communities = graph.communities(); // Only compare nodes within a reasonable set — take the most active ones let mut docs: Vec<(String, String)> = store.nodes.iter() .filter(|(_, n)| n.content.len() > 50) // skip tiny nodes .map(|(k, n)| (k.clone(), n.content.clone())) .collect(); // For large stores, sample to keep pairwise comparison feasible if docs.len() > 200 { docs.sort_by(|a, b| b.1.len().cmp(&a.1.len())); docs.truncate(200); } let similar = similarity::pairwise_similar(&docs, threshold); // Filter to pairs in different communities similar.into_iter() .filter(|(a, b, _)| { let ca = communities.get(a); let cb = communities.get(b); match (ca, cb) { (Some(a), Some(b)) => a != b, _ => true, // if community unknown, flag it } }) .collect() } /// Agent 
/// Agent allocation from the control loop.
///
/// Agent types and counts are data-driven — add agents by adding
/// entries to the counts map.
#[derive(Default)]
pub struct ConsolidationPlan {
    /// agent_name → run count
    pub counts: std::collections::HashMap<String, usize>,
    // Whether a one-off "health" audit run is prepended to the plan.
    pub run_health: bool,
    // Human-readable explanations of each allocation decision.
    pub rationale: Vec<String>,
}

impl ConsolidationPlan {
    /// Planned run count for `agent` (0 if the agent is not in the plan).
    pub fn count(&self, agent: &str) -> usize {
        self.counts.get(agent).copied().unwrap_or(0)
    }

    /// Overwrite the run count for `agent`.
    pub fn set(&mut self, agent: &str, count: usize) {
        self.counts.insert(agent.to_string(), count);
    }

    /// Increase the run count for `agent` by `count`, inserting at 0 first.
    pub fn add(&mut self, agent: &str, count: usize) {
        *self.counts.entry(agent.to_string()).or_default() += count;
    }

    /// Total number of agent runs, including the optional health run.
    pub fn total(&self) -> usize {
        self.counts.values().sum::<usize>() + if self.run_health { 1 } else { 0 }
    }

    /// Expand the plan into a flat list of (agent_name, batch_size) runs.
    /// Interleaves agent types so different types alternate.
    pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(String, usize)> {
        // Guard: batch_size == 0 would make the chunking loop below spin
        // forever (remaining.min(0) never decrements remaining).
        let batch_size = batch_size.max(1);

        let mut runs = Vec::new();
        if self.run_health {
            // Health run carries no batch; 0 is its sentinel size.
            runs.push(("health".to_string(), 0));
        }

        // Sort by count descending so high-volume agents interleave well;
        // tie-break on name so output is deterministic across HashMap orders.
        let mut types: Vec<(&String, &usize)> = self.counts.iter()
            .filter(|(_, c)| **c > 0)
            .collect();
        types.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));

        // Split each agent's total into batch_size-sized chunks.
        let mut queues: Vec<Vec<(String, usize)>> = types.iter().map(|(name, count)| {
            let mut q = Vec::new();
            let mut remaining = **count;
            while remaining > 0 {
                let batch = remaining.min(batch_size);
                q.push((name.to_string(), batch));
                remaining -= batch;
            }
            q
        }).collect();

        // Round-robin interleave
        loop {
            let mut added = false;
            for q in &mut queues {
                if !q.is_empty() {
                    // remove(0) is O(len) but queues are tiny; avoids the
                    // clone the first()/remove pair would need.
                    runs.push(q.remove(0));
                    added = true;
                }
            }
            if !added { break; }
        }
        runs
    }
}

// Analyze metrics and decide how much each agent needs to run.
// (doc for `consolidation_plan`, defined below)
// This is the control loop: metrics → error signal → agent allocation.
// Target values are based on healthy small-world networks.
pub fn consolidation_plan(store: &Store) -> ConsolidationPlan { consolidation_plan_inner(store, true) } /// Cheap version: skip O(n²) interference detection (for daemon status). pub fn consolidation_plan_quick(store: &Store) -> ConsolidationPlan { consolidation_plan_inner(store, false) } fn consolidation_plan_inner(store: &Store, detect_interf: bool) -> ConsolidationPlan { let graph = store.build_graph(); let alpha = graph.degree_power_law_exponent(); let gini = graph.degree_gini(); let _avg_cc = graph.avg_clustering_coefficient(); let interference_count = if detect_interf { detect_interference(store, &graph, 0.5).len() } else { 0 }; let episodic_count = store.nodes.iter() .filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession)) .count(); let _episodic_ratio = if store.nodes.is_empty() { 0.0 } else { episodic_count as f32 / store.nodes.len() as f32 }; let mut plan = ConsolidationPlan { counts: std::collections::HashMap::new(), run_health: true, rationale: Vec::new(), }; // Active agent types from config let config = crate::config::get(); let agent_types: Vec<&str> = config.agent_types.iter().map(|s| s.as_str()).collect(); // Target: α ≥ 2.5 (healthy scale-free) if alpha < 2.0 { plan.add("linker", 100); plan.rationale.push(format!( "α={:.2} (target ≥2.5): extreme hub dominance → 100 linker", alpha)); } else if alpha < 2.5 { plan.add("linker", 50); plan.rationale.push(format!( "α={:.2} (target ≥2.5): moderate hub dominance → 50 linker", alpha)); } else { plan.add("linker", 20); plan.rationale.push(format!( "α={:.2}: healthy — 20 linker for maintenance", alpha)); } // Target: Gini ≤ 0.4 if gini > 0.5 { plan.add("linker", 50); plan.rationale.push(format!( "Gini={:.3} (target ≤0.4): high inequality → +50 linker", gini)); } // Interference: separator disambiguates confusable nodes if interference_count > 100 { plan.add("separator", 10); plan.rationale.push(format!( "Interference: {} pairs (target <50) → 10 separator", interference_count)); } else 
if interference_count > 20 { plan.add("separator", 5); plan.rationale.push(format!( "Interference: {} pairs → 5 separator", interference_count)); } else if interference_count > 0 { plan.add("separator", interference_count.min(3)); } // Organize: proportional to linker — synthesizes what linker connects let linker = plan.count("linker"); plan.set("organize", linker / 2); plan.rationale.push(format!( "Organize: {} (half of linker count)", plan.count("organize"))); // Distill: core concept maintenance let organize = plan.count("organize"); let mut distill = organize; if gini > 0.4 { distill += 20; } if alpha < 2.0 { distill += 20; } plan.set("distill", distill); plan.rationale.push(format!( "Distill: {} (synthesize hub content)", plan.count("distill"))); // Split: handle oversized nodes plan.set("split", 5); // Distribute agent budget using Elo ratings let budget = crate::config::get().agent_budget; let elo_path = crate::config::get().data_dir.join("agent-elo.json"); if let Ok(elo_json) = std::fs::read_to_string(&elo_path) { if let Ok(ratings) = serde_json::from_str::>(&elo_json) { let elos: Vec = agent_types.iter() .map(|t| ratings.get(*t).copied().unwrap_or(1000.0)) .collect(); let min_elo = elos.iter().copied().fold(f64::MAX, f64::min); let weights: Vec = elos.iter() .map(|e| { let shifted = e - min_elo + 50.0; shifted * shifted }) .collect(); let total_weight: f64 = weights.iter().sum(); let allocate = |w: f64| -> usize { ((w / total_weight * budget as f64).round() as usize).max(2) }; for (i, agent) in agent_types.iter().enumerate() { plan.set(agent, allocate(weights[i])); } let summary: Vec = agent_types.iter() .map(|a| format!("{}={}", a, plan.count(a))) .collect(); plan.rationale.push(format!( "Elo allocation (budget={}): {}", budget, summary.join(" "))); } } else { // No Elo file — use budget with equal distribution let per_type = budget / agent_types.len(); for agent in &agent_types { plan.set(agent, per_type); } plan.rationale.push(format!( "No Elo ratings — 
equal distribution ({} each, budget={})", per_type, budget)); } plan } /// Format the consolidation plan for display pub fn format_plan(plan: &ConsolidationPlan) -> String { let mut out = String::from("Consolidation Plan\n==================\n\n"); out.push_str("Analysis:\n"); for r in &plan.rationale { out.push_str(&format!(" • {}\n", r)); } out.push_str("\nAgent allocation:\n"); if plan.run_health { out.push_str(" 1. health — system audit\n"); } let mut step = 2; let mut sorted: Vec<_> = plan.counts.iter() .filter(|(_, c)| **c > 0) .collect(); sorted.sort_by(|a, b| b.1.cmp(a.1)); for (agent, count) in &sorted { out.push_str(&format!(" {}. {} ×{}\n", step, agent, count)); step += 1; } out.push_str(&format!("\nTotal agent runs: {}\n", plan.total())); out } /// Brief daily check: compare current metrics to last snapshot pub fn daily_check(store: &Store) -> String { let graph_obj = store.build_graph(); let snap = graph::current_metrics(&graph_obj); let history = graph::load_metrics_history(); let prev = history.last(); let mut out = String::from("Memory daily check\n"); // Current state out.push_str(&format!(" σ={:.1} α={:.2} gini={:.3} cc={:.4}\n", snap.sigma, snap.alpha, snap.gini, snap.avg_cc)); // Trend if let Some(p) = prev { let d_sigma = snap.sigma - p.sigma; let d_alpha = snap.alpha - p.alpha; let d_gini = snap.gini - p.gini; out.push_str(&format!(" Δσ={:+.1} Δα={:+.2} Δgini={:+.3}\n", d_sigma, d_alpha, d_gini)); // Assessment let mut issues = Vec::new(); if snap.alpha < 2.0 { issues.push("hub dominance critical"); } if snap.gini > 0.5 { issues.push("high inequality"); } if snap.avg_cc < 0.1 { issues.push("poor integration"); } if d_sigma < -5.0 { issues.push("σ declining"); } if d_alpha < -0.1 { issues.push("α declining"); } if d_gini > 0.02 { issues.push("inequality increasing"); } if issues.is_empty() { out.push_str(" Status: healthy\n"); } else { out.push_str(&format!(" Status: needs attention — {}\n", issues.join(", "))); out.push_str(" Run: poc-memory 
consolidate-session\n"); } } else { out.push_str(" (first snapshot, no trend data yet)\n"); } // Persist the snapshot graph::save_metrics_snapshot(&snap); out }