From 53e6b32cb42cbe4a23d054cbccf7ed98ace597f0 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 9 Mar 2026 17:02:01 -0400 Subject: [PATCH] daemon: rework consolidation pipeline and add graph health metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace monolithic consolidate job with individual agent jobs (replay, linker, separator, transfer, health) that run sequentially and store reports. Multi-phase daily pipeline: agent runs → apply actions → link orphans → cap degree → digest → digest links → knowledge loop. Add GraphHealth struct with graph metrics (alpha, gini, clustering coefficient, episodic ratio) computed during health checks. Display in `poc-memory daemon status`. Use cached metrics to build consolidation plan without expensive O(n²) interference detection. Add RPC consolidate command to trigger consolidation via socket. Harden session watcher: skip transcripts with zero segments, improve migration error handling. Co-Authored-By: ProofOfConcept --- poc-memory/src/agents/daemon.rs | 484 ++++++++++++++++++++++++++++---- poc-memory/src/main.rs | 1 + poc-memory/src/neuro/mod.rs | 1 + poc-memory/src/neuro/scoring.rs | 1 + 4 files changed, 426 insertions(+), 61 deletions(-) diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index b49fb67..1a81f4e 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -117,13 +117,92 @@ fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { }) } -fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> { - run_job(ctx, "consolidate", || { +/// Run a single consolidation agent (replay, linker, separator, transfer, health). +/// Builds prompt, calls Sonnet, stores report node in the store. +fn job_consolidation_agent( + ctx: &ExecutionContext, + agent_type: &str, + batch_size: usize, +) -> Result<(), TaskError> { + let agent = agent_type.to_string(); + let batch = batch_size; + run_job(ctx, &format!("c-{}", agent), || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; - super::consolidate::consolidate_full_with_progress(&mut store, &|msg| { - ctx.log_line(msg); - }) + + let label = if batch > 0 { + format!("{} (batch={})", agent, batch) + } else { + agent.to_string() + }; + ctx.log_line(&format!("building prompt: {}", label)); + + let prompt = super::prompts::agent_prompt(&store, &agent, batch)?; + ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len())); + + let response = super::llm::call_sonnet("consolidate", &prompt)?; + + let ts = crate::store::format_datetime(crate::store::now_epoch()) + .replace([':', '-', 'T'], ""); + let report_key = format!("_consolidation-{}-{}", agent, ts); + store.upsert_provenance(&report_key, &response, + crate::store::Provenance::AgentConsolidate).ok(); + + ctx.log_line(&format!("done: {} lines → {}", response.lines().count(), report_key)); + Ok(()) + }) +} + +/// Apply consolidation actions from recent reports. +fn job_consolidation_apply(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-apply", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("applying consolidation actions"); + super::consolidate::apply_consolidation(&mut store, true, None) + }) +} + +/// Link orphan nodes (CPU-heavy, no LLM). +fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-orphans", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("linking orphans"); + let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15); + ctx.log_line(&format!("{} orphans, {} links added", orphans, added)); + Ok(()) + }) +} + +/// Cap node degree to prevent mega-hubs. +fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-cap", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("capping degree"); + match store.cap_degree(50) { + Ok((hubs, pruned)) => { + store.save()?; + ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned)); + Ok(()) + } + Err(e) => Err(e), + } + }) +} + +/// Apply links extracted from digests. +fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-digest-links", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("applying digest links"); + let links = super::digest::parse_all_digest_links(&store); + let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links); + store.save()?; + ctx.log_line(&format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); + Ok(()) }) } @@ -152,16 +231,91 @@ fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { }) } -fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { +fn job_daily_check( + ctx: &ExecutionContext, + graph_health: &Arc>>, +) -> Result<(), TaskError> { + let gh = Arc::clone(graph_health); run_job(ctx, "daily-check", || { ctx.log_line("loading store"); let store = crate::store::Store::load()?; ctx.log_line("checking health"); let _report = crate::neuro::daily_check(&store); + + // Compute graph health metrics for status display + ctx.log_line("computing graph health"); + let health = compute_graph_health(&store); + *gh.lock().unwrap() = Some(health); + Ok(()) }) } +fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { + // Only compute cheap metrics here — interference detection is O(n²) + // and takes minutes. The full plan (with interference) runs during + // consolidation itself. + let graph = store.build_graph(); + let snap = crate::graph::current_metrics(&graph); + + let episodic_count = store.nodes.iter() + .filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession)) + .count(); + let episodic_ratio = if store.nodes.is_empty() { 0.0 } + else { episodic_count as f32 / store.nodes.len() as f32 }; + + // Estimate plan from cheap metrics only (skip interference) + let mut plan_replay = 3usize; // baseline maintenance + let mut plan_linker = 0usize; + let plan_separator = 0usize; // needs interference, skip for status + let mut plan_transfer = 0usize; + let mut rationale = Vec::new(); + + if snap.alpha < 2.0 { + plan_replay += 7; plan_linker += 5; + rationale.push(format!("α={:.2}: extreme hub dominance", snap.alpha)); + } else if snap.alpha < 2.5 { + plan_replay += 2; plan_linker += 3; + rationale.push(format!("α={:.2}: moderate hub dominance", snap.alpha)); + } + if snap.gini > 0.5 { + plan_replay += 3; + rationale.push(format!("gini={:.3}: high inequality", snap.gini)); + } + if snap.avg_cc < 0.1 { + plan_replay += 5; + rationale.push(format!("cc={:.3}: very poor integration", snap.avg_cc)); + } else if snap.avg_cc < 0.2 { + plan_replay += 2; + rationale.push(format!("cc={:.3}: low integration", snap.avg_cc)); + } + if episodic_ratio > 0.6 { + plan_transfer += 10; + rationale.push(format!("episodic={:.0}%: needs extraction", episodic_ratio * 100.0)); + } else if episodic_ratio > 0.4 { + plan_transfer += 5; + rationale.push(format!("episodic={:.0}%", episodic_ratio * 100.0)); + } + + GraphHealth { + nodes: snap.nodes, + edges: snap.edges, + communities: snap.communities, + alpha: snap.alpha, + gini: snap.gini, + avg_cc: snap.avg_cc, + sigma: snap.sigma, + episodic_ratio, + interference: 0, // not computed in status check + plan_replay, + plan_linker, + plan_separator, + plan_transfer, + plan_rationale: rationale, + computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), + } +} + // --- Session detection --- /// Find JSONL session files that are stale (not recently written) and not @@ -257,19 +411,45 @@ fn proc_uptime(pid: u32) -> Option { // --- Status writing --- -fn write_status(choir: &Choir, last_daily: Option) { - let status = build_status(choir, last_daily); +fn write_status( + choir: &Choir, + last_daily: Option, + graph_health: &Arc>>, +) { + let status = build_status(choir, last_daily, graph_health); if let Ok(json) = serde_json::to_string_pretty(&status) { let _ = fs::write(status_path(), json); } } +#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct GraphHealth { + pub nodes: usize, + pub edges: usize, + pub communities: usize, + pub alpha: f32, // power-law exponent (target ≥2.5) + pub gini: f32, // degree inequality (target ≤0.4) + pub avg_cc: f32, // clustering coefficient (target ≥0.2) + pub sigma: f32, // small-world sigma + pub episodic_ratio: f32, // episodic/total nodes (target <0.4) + pub interference: usize, // interfering pairs (target <50) + // Consolidation work estimate from plan + pub plan_replay: usize, + pub plan_linker: usize, + pub plan_separator: usize, + pub plan_transfer: usize, + pub plan_rationale: Vec, + pub computed_at: String, +} + #[derive(serde::Serialize, serde::Deserialize)] struct DaemonStatus { pid: u32, tasks: Vec, #[serde(default)] last_daily: Option, + #[serde(default)] + graph_health: Option, } // --- The daemon --- @@ -293,11 +473,13 @@ pub fn run_daemon() -> Result<(), String> { .and_then(|d| d.parse().ok()) )); + let graph_health: Arc>> = Arc::new(Mutex::new(None)); + log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); // Write initial status - write_status(&choir, *last_daily.lock().unwrap()); + write_status(&choir, *last_daily.lock().unwrap(), &graph_health); // Session watcher: reconcile-based extraction // Each tick: scan filesystem for stale sessions, check store for what's @@ -306,6 +488,7 @@ pub fn run_daemon() -> Result<(), String> { let choir_sw = Arc::clone(&choir); let llm_sw = Arc::clone(&llm); let last_daily_sw = Arc::clone(&last_daily); + let graph_health_sw = Arc::clone(&graph_health); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); // Cache: path → (file_size, segment_count). Invalidated when size changes. @@ -442,6 +625,12 @@ pub fn run_daemon() -> Result<(), String> { count }; + // No extractable messages — skip entirely + if seg_count == 0 { + already_mined += 1; + continue; + } + let fname_key = super::enrich::transcript_filename_key(&path_str); let has_whole_file_key = mined.contains(&fname_key); @@ -460,7 +649,7 @@ pub fn run_daemon() -> Result<(), String> { // Migrate old whole-file key: if it exists but no per-segment keys, // write per-segment keys for all current segments (they were mined // under the old scheme) - if has_whole_file_key && !has_any_seg_key { + if has_whole_file_key && !has_any_seg_key && seg_count > 0 { migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count)); // After migration, all current segments are covered unmined_segs.clear(); @@ -493,19 +682,37 @@ pub fn run_daemon() -> Result<(), String> { // Migrate old whole-file keys to per-segment keys if !migrate_keys.is_empty() { - if let Ok(mut store) = crate::store::Store::load() { - for (fname_key, path_str, seg_count) in &migrate_keys { - for i in 0..*seg_count { - let seg_key = format!("{}.{}", fname_key, i); - let content = format!("Migrated from whole-file key for {}", path_str); - let mut node = crate::store::new_node(&seg_key, &content); - node.provenance = crate::store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(node); + match crate::store::Store::load() { + Ok(mut store) => { + let mut ok = 0; + let mut fail = 0; + for (fname_key, path_str, seg_count) in &migrate_keys { + for i in 0..*seg_count { + let seg_key = format!("{}.{}", fname_key, i); + let content = format!("Migrated from whole-file key for {}", path_str); + let mut node = crate::store::new_node(&seg_key, &content); + node.provenance = crate::store::Provenance::AgentExperienceMine; + match store.upsert_node(node) { + Ok(()) => ok += 1, + Err(e) => { + if fail == 0 { + eprintln!("migration upsert_node error: {}", e); + } + fail += 1; + } + } + } } + if let Err(e) = store.save() { + eprintln!("migration save error: {}", e); + } + log_event("session-watcher", "migrated", + &format!("{} whole-file keys → per-segment ({} ok, {} fail)", + migrate_keys.len(), ok, fail)); + } + Err(e) => { + eprintln!("migration store load error: {}", e); } - let _ = store.save(); - log_event("session-watcher", "migrated", - &format!("{} whole-file keys → per-segment", migrate_keys.len())); } } @@ -571,7 +778,7 @@ pub fn run_daemon() -> Result<(), String> { ctx.set_progress("idle"); } - write_status(&choir_sw, *last_daily_sw.lock().unwrap()); + write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw); std::thread::sleep(SCHEDULER_INTERVAL); } }); @@ -580,6 +787,7 @@ pub fn run_daemon() -> Result<(), String> { let choir_sched = Arc::clone(&choir); let llm_sched = Arc::clone(&llm); let last_daily_sched = Arc::clone(&last_daily); + let graph_health_sched = Arc::clone(&graph_health); choir.spawn("scheduler").init(move |ctx| { let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; ctx.set_progress("idle"); @@ -591,49 +799,124 @@ pub fn run_daemon() -> Result<(), String> { let today = chrono::Local::now().date_naive(); - // Health check: every hour + // Health check: every hour — also updates graph health metrics if last_health.elapsed() >= HEALTH_INTERVAL { - choir_sched.spawn("health").init(|ctx| { - job_daily_check(ctx) + let gh = Arc::clone(&graph_health_sched); + choir_sched.spawn("health").init(move |ctx| { + job_daily_check(ctx, &gh) }); last_health = std::time::Instant::now(); } - // Daily jobs: once per day + // Daily jobs: once per day (wait for health check to cache metrics first) let last = *last_daily_sched.lock().unwrap(); - if last.is_none_or(|d| d < today) { + let gh = graph_health_sched.lock().unwrap().clone(); + if last.is_none_or(|d| d < today) && gh.is_some() { log_event("scheduler", "daily-trigger", &today.to_string()); - // Decay disabled — version spam and premature demotion - // choir_sched.spawn(format!("decay:{}", today)).init(|ctx| { - // job_decay(ctx) - // }); + // Use cached graph health for plan (cheap — no O(n²) interference detection). + let (replay, linker, separator, transfer) = match gh { + Some(ref h) => (h.plan_replay, h.plan_linker, h.plan_separator, h.plan_transfer), + None => unreachable!(), // guarded by gh.is_some() above + }; + let plan = crate::neuro::ConsolidationPlan { + replay_count: replay, + linker_count: linker, + separator_count: separator, + transfer_count: transfer, + run_health: true, + rationale: Vec::new(), + }; - // Consolidation pipeline: consolidate → knowledge-loop → digest - let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) - .resource(&llm_sched) - .retries(2) - .init(move |ctx| { - job_consolidate(ctx) - }) - .run(); + let batch_size = 5; - let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today)) + // Build the list of (agent_type, batch_size) runs + let mut runs: Vec<(&str, usize)> = Vec::new(); + if plan.run_health { + runs.push(("health", 0)); + } + for (name, count) in [ + ("replay", plan.replay_count), + ("linker", plan.linker_count), + ("separator", plan.separator_count), + ("transfer", plan.transfer_count), + ] { + let mut remaining = count; + while remaining > 0 { + let batch = remaining.min(batch_size); + runs.push((name, batch)); + remaining -= batch; + } + } + + log_event("scheduler", "consolidation-plan", + &format!("{} agents ({}r {}l {}s {}t)", + runs.len(), plan.replay_count, plan.linker_count, + plan.separator_count, plan.transfer_count)); + + // Phase 1: Agent runs (sequential — each reloads store to see prior changes) + let mut prev_agent = None; + for (i, (agent_type, batch)) in runs.iter().enumerate() { + let agent = agent_type.to_string(); + let b = *batch; + let task_name = format!("c-{}-{}:{}", agent, i, today); + let mut builder = choir_sched.spawn(task_name) + .resource(&llm_sched) + .retries(1) + .init(move |ctx| { + job_consolidation_agent(ctx, &agent, b) + }); + if let Some(ref dep) = prev_agent { + builder.depend_on(dep); + } + prev_agent = Some(builder.run()); + } + + // Phase 2: Apply actions from agent reports + let mut apply = choir_sched.spawn(format!("c-apply:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |ctx| { - job_knowledge_loop(ctx) - }); - knowledge.depend_on(&consolidate); - let knowledge = knowledge.run(); + .init(move |ctx| job_consolidation_apply(ctx)); + if let Some(ref dep) = prev_agent { + apply.depend_on(dep); + } + let apply = apply.run(); - let mut digest = choir_sched.spawn(format!("digest:{}", today)) + // Phase 3: Link orphans (CPU-only, no LLM) + let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today)) + .retries(1) + .init(move |ctx| job_link_orphans(ctx)); + orphans.depend_on(&apply); + let orphans = orphans.run(); + + // Phase 4: Cap degree + let mut cap = choir_sched.spawn(format!("c-cap:{}", today)) + .retries(1) + .init(move |ctx| job_cap_degree(ctx)); + cap.depend_on(&orphans); + let cap = cap.run(); + + // Phase 5: Generate digests + let mut digest = choir_sched.spawn(format!("c-digest:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |ctx| { - job_digest(ctx) - }); - digest.depend_on(&knowledge); + .init(move |ctx| job_digest(ctx)); + digest.depend_on(&cap); + let digest = digest.run(); + + // Phase 6: Apply digest links + let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today)) + .retries(1) + .init(move |ctx| job_digest_links(ctx)); + digest_links.depend_on(&digest); + let digest_links = digest_links.run(); + + // Phase 7: Knowledge loop + let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today)) + .resource(&llm_sched) + .retries(1) + .init(move |ctx| job_knowledge_loop(ctx)); + knowledge.depend_on(&digest_links); *last_daily_sched.lock().unwrap() = Some(today); ctx.set_progress(format!("daily pipeline triggered ({today})")); @@ -645,7 +928,7 @@ pub fn run_daemon() -> Result<(), String> { log::trace!("pruned {} finished tasks", pruned); } - write_status(&choir_sched, *last_daily_sched.lock().unwrap()); + write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched); std::thread::sleep(SCHEDULER_INTERVAL); } }); @@ -653,7 +936,8 @@ pub fn run_daemon() -> Result<(), String> { // Main thread: listen on status socket + wait for signals let choir_main = Arc::clone(&choir); let last_daily_main = Arc::clone(&last_daily); - status_socket_loop(&choir_main, &last_daily_main); + let graph_health_main = Arc::clone(&graph_health); + status_socket_loop(&choir_main, &last_daily_main, &graph_health_main); log_event("daemon", "stopping", ""); eprintln!("Shutting down..."); @@ -668,6 +952,29 @@ pub fn run_daemon() -> Result<(), String> { std::process::exit(0) } +fn send_rpc(cmd: &str) -> Option { + use std::io::{Read as _, Write as _}; + use std::os::unix::net::UnixStream; + + let mut stream = UnixStream::connect(status_sock_path()).ok()?; + stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); + stream.write_all(cmd.as_bytes()).ok()?; + stream.shutdown(std::net::Shutdown::Write).ok()?; + let mut buf = String::new(); + stream.read_to_string(&mut buf).ok()?; + Some(buf) +} + +pub fn rpc_consolidate() -> Result<(), String> { + match send_rpc("consolidate") { + Some(resp) => { + println!("{}", resp.trim()); + Ok(()) + } + None => Err("Daemon not running.".into()), + } +} + fn read_status_socket() -> Option { use std::io::Read as _; use std::os::unix::net::UnixStream; @@ -686,8 +993,12 @@ fn status_sock_path() -> PathBuf { /// Listen on a Unix domain socket for status requests. /// Any connection gets the live status JSON written and closed. /// Also handles SIGINT/SIGTERM for clean shutdown. -fn status_socket_loop(choir: &Choir, last_daily: &Arc>>) { - use std::io::Write as _; +fn status_socket_loop( + choir: &Choir, + last_daily: &Arc>>, + graph_health: &Arc>>, +) { + use std::io::{Read as _, Write as _}; use std::os::unix::net::UnixListener; use std::sync::atomic::{AtomicBool, Ordering}; @@ -719,9 +1030,30 @@ fn status_socket_loop(choir: &Choir, last_daily: &Arc { - let status = build_status(choir, *last_daily.lock().unwrap()); - if let Ok(json) = serde_json::to_string_pretty(&status) { - let _ = stream.write_all(json.as_bytes()); + // Read command from client (with short timeout) + stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); + let mut cmd_buf = [0u8; 256]; + let cmd = match stream.read(&mut cmd_buf) { + Ok(n) if n > 0 => std::str::from_utf8(&cmd_buf[..n]) + .unwrap_or("") + .trim() + .to_string(), + _ => String::new(), + }; + + match cmd.as_str() { + "consolidate" => { + *last_daily.lock().unwrap() = None; + let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n"); + log_event("rpc", "consolidate", "triggered via socket"); + } + _ => { + // Default: return status + let status = build_status(choir, *last_daily.lock().unwrap(), graph_health); + if let Ok(json) = serde_json::to_string_pretty(&status) { + let _ = stream.write_all(json.as_bytes()); + } + } } // Connection closes when stream is dropped } @@ -739,11 +1071,16 @@ fn status_socket_loop(choir: &Choir, last_daily: &Arc) -> DaemonStatus { +fn build_status( + choir: &Choir, + last_daily: Option, + graph_health: &Arc>>, +) -> DaemonStatus { DaemonStatus { pid: std::process::id(), tasks: choir.task_statuses(), last_daily: last_daily.map(|d| d.to_string()), + graph_health: graph_health.lock().unwrap().clone(), } } @@ -764,8 +1101,9 @@ fn format_duration_human(ms: u128) -> String { fn task_group(name: &str) -> &str { if name == "session-watcher" || name == "scheduler" { "core" } else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" } - else if name.starts_with("consolidate:") || name.starts_with("knowledge-loop:") - || name.starts_with("digest:") || name.starts_with("decay:") { "daily" } + else if name.starts_with("c-") || name.starts_with("consolidate:") + || name.starts_with("knowledge-loop:") || name.starts_with("digest:") + || name.starts_with("decay:") { "daily" } else if name == "health" { "health" } else { "other" } } @@ -872,9 +1210,33 @@ pub fn show_status() -> Result<(), String> { let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); - eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n", + eprintln!(" tasks: {} running, {} pending, {} done, {} failed", running, pending, completed, failed); + // Graph health + if let Some(ref gh) = status.graph_health { + eprintln!(); + fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str { + let ok = if higher_is_better { val >= target } else { val <= target }; + if ok { "✓" } else { "✗" } + } + eprintln!(" Graph health ({})", gh.computed_at); + eprintln!(" {} nodes, {} edges, {} communities", + gh.nodes, gh.edges, gh.communities); + eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)", + indicator(gh.alpha, 2.5, true), gh.alpha, + indicator(gh.gini, 0.4, false), gh.gini, + indicator(gh.avg_cc, 0.2, true), gh.avg_cc); + eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}", + indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, + gh.sigma); + + let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + 1; + eprintln!(" consolidation plan: {} agents ({}r {}l {}s {}t +health)", + total, gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer); + } + eprintln!(); + // Group and display let groups: &[(&str, &str)] = &[ ("core", "Core"), diff --git a/poc-memory/src/main.rs b/poc-memory/src/main.rs index 57b8520..33387d8 100644 --- a/poc-memory/src/main.rs +++ b/poc-memory/src/main.rs @@ -2182,6 +2182,7 @@ fn cmd_daemon(sub: Option<&str>, args: &[String]) -> Result<(), String> { daemon::show_log(job, lines) } Some("install") => daemon::install_service(), + Some("consolidate") => daemon::rpc_consolidate(), Some(other) => Err(format!("unknown daemon subcommand: {}", other)), } } diff --git a/poc-memory/src/neuro/mod.rs b/poc-memory/src/neuro/mod.rs index 851f8b9..d0de186 100644 --- a/poc-memory/src/neuro/mod.rs +++ b/poc-memory/src/neuro/mod.rs @@ -9,6 +9,7 @@ mod rewrite; pub use scoring::{ ReplayItem, + ConsolidationPlan, consolidation_priority, replay_queue, replay_queue_with_graph, detect_interference, diff --git a/poc-memory/src/neuro/scoring.rs b/poc-memory/src/neuro/scoring.rs index 6c29d8c..38950e1 100644 --- a/poc-memory/src/neuro/scoring.rs +++ b/poc-memory/src/neuro/scoring.rs @@ -164,6 +164,7 @@ pub fn detect_interference( } /// Agent allocation from the control loop +#[derive(Default)] pub struct ConsolidationPlan { pub replay_count: usize, pub linker_count: usize,