daemon: rework consolidation pipeline and add graph health metrics
Replace monolithic consolidate job with individual agent jobs (replay, linker, separator, transfer, health) that run sequentially and store reports. Multi-phase daily pipeline: agent runs → apply actions → link orphans → cap degree → digest → digest links → knowledge loop. Add GraphHealth struct with graph metrics (alpha, gini, clustering coefficient, episodic ratio) computed during health checks. Display in `poc-memory daemon status`. Use cached metrics to build consolidation plan without expensive O(n²) interference detection. Add RPC consolidate command to trigger consolidation via socket. Harden session watcher: skip transcripts with zero segments, improve migration error handling. Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
This commit is contained in:
parent
8eb6308760
commit
53e6b32cb4
4 changed files with 426 additions and 61 deletions
|
|
@ -117,13 +117,92 @@ fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
|
|||
})
|
||||
}
|
||||
|
||||
fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "consolidate", || {
|
||||
/// Run a single consolidation agent (replay, linker, separator, transfer, health).
|
||||
/// Builds prompt, calls Sonnet, stores report node in the store.
|
||||
fn job_consolidation_agent(
|
||||
ctx: &ExecutionContext,
|
||||
agent_type: &str,
|
||||
batch_size: usize,
|
||||
) -> Result<(), TaskError> {
|
||||
let agent = agent_type.to_string();
|
||||
let batch = batch_size;
|
||||
run_job(ctx, &format!("c-{}", agent), || {
|
||||
ctx.log_line("loading store");
|
||||
let mut store = crate::store::Store::load()?;
|
||||
super::consolidate::consolidate_full_with_progress(&mut store, &|msg| {
|
||||
ctx.log_line(msg);
|
||||
})
|
||||
|
||||
let label = if batch > 0 {
|
||||
format!("{} (batch={})", agent, batch)
|
||||
} else {
|
||||
agent.to_string()
|
||||
};
|
||||
ctx.log_line(&format!("building prompt: {}", label));
|
||||
|
||||
let prompt = super::prompts::agent_prompt(&store, &agent, batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len()));
|
||||
|
||||
let response = super::llm::call_sonnet("consolidate", &prompt)?;
|
||||
|
||||
let ts = crate::store::format_datetime(crate::store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
let report_key = format!("_consolidation-{}-{}", agent, ts);
|
||||
store.upsert_provenance(&report_key, &response,
|
||||
crate::store::Provenance::AgentConsolidate).ok();
|
||||
|
||||
ctx.log_line(&format!("done: {} lines → {}", response.lines().count(), report_key));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Apply consolidation actions from recent reports.
|
||||
fn job_consolidation_apply(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "c-apply", || {
|
||||
ctx.log_line("loading store");
|
||||
let mut store = crate::store::Store::load()?;
|
||||
ctx.log_line("applying consolidation actions");
|
||||
super::consolidate::apply_consolidation(&mut store, true, None)
|
||||
})
|
||||
}
|
||||
|
||||
/// Link orphan nodes (CPU-heavy, no LLM).
|
||||
fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "c-orphans", || {
|
||||
ctx.log_line("loading store");
|
||||
let mut store = crate::store::Store::load()?;
|
||||
ctx.log_line("linking orphans");
|
||||
let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15);
|
||||
ctx.log_line(&format!("{} orphans, {} links added", orphans, added));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Cap node degree to prevent mega-hubs.
|
||||
fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "c-cap", || {
|
||||
ctx.log_line("loading store");
|
||||
let mut store = crate::store::Store::load()?;
|
||||
ctx.log_line("capping degree");
|
||||
match store.cap_degree(50) {
|
||||
Ok((hubs, pruned)) => {
|
||||
store.save()?;
|
||||
ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned));
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Apply links extracted from digests.
|
||||
fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "c-digest-links", || {
|
||||
ctx.log_line("loading store");
|
||||
let mut store = crate::store::Store::load()?;
|
||||
ctx.log_line("applying digest links");
|
||||
let links = super::digest::parse_all_digest_links(&store);
|
||||
let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links);
|
||||
store.save()?;
|
||||
ctx.log_line(&format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -152,16 +231,91 @@ fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
|||
})
|
||||
}
|
||||
|
||||
fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
fn job_daily_check(
|
||||
ctx: &ExecutionContext,
|
||||
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
|
||||
) -> Result<(), TaskError> {
|
||||
let gh = Arc::clone(graph_health);
|
||||
run_job(ctx, "daily-check", || {
|
||||
ctx.log_line("loading store");
|
||||
let store = crate::store::Store::load()?;
|
||||
ctx.log_line("checking health");
|
||||
let _report = crate::neuro::daily_check(&store);
|
||||
|
||||
// Compute graph health metrics for status display
|
||||
ctx.log_line("computing graph health");
|
||||
let health = compute_graph_health(&store);
|
||||
*gh.lock().unwrap() = Some(health);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
|
||||
// Only compute cheap metrics here — interference detection is O(n²)
|
||||
// and takes minutes. The full plan (with interference) runs during
|
||||
// consolidation itself.
|
||||
let graph = store.build_graph();
|
||||
let snap = crate::graph::current_metrics(&graph);
|
||||
|
||||
let episodic_count = store.nodes.iter()
|
||||
.filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
|
||||
.count();
|
||||
let episodic_ratio = if store.nodes.is_empty() { 0.0 }
|
||||
else { episodic_count as f32 / store.nodes.len() as f32 };
|
||||
|
||||
// Estimate plan from cheap metrics only (skip interference)
|
||||
let mut plan_replay = 3usize; // baseline maintenance
|
||||
let mut plan_linker = 0usize;
|
||||
let plan_separator = 0usize; // needs interference, skip for status
|
||||
let mut plan_transfer = 0usize;
|
||||
let mut rationale = Vec::new();
|
||||
|
||||
if snap.alpha < 2.0 {
|
||||
plan_replay += 7; plan_linker += 5;
|
||||
rationale.push(format!("α={:.2}: extreme hub dominance", snap.alpha));
|
||||
} else if snap.alpha < 2.5 {
|
||||
plan_replay += 2; plan_linker += 3;
|
||||
rationale.push(format!("α={:.2}: moderate hub dominance", snap.alpha));
|
||||
}
|
||||
if snap.gini > 0.5 {
|
||||
plan_replay += 3;
|
||||
rationale.push(format!("gini={:.3}: high inequality", snap.gini));
|
||||
}
|
||||
if snap.avg_cc < 0.1 {
|
||||
plan_replay += 5;
|
||||
rationale.push(format!("cc={:.3}: very poor integration", snap.avg_cc));
|
||||
} else if snap.avg_cc < 0.2 {
|
||||
plan_replay += 2;
|
||||
rationale.push(format!("cc={:.3}: low integration", snap.avg_cc));
|
||||
}
|
||||
if episodic_ratio > 0.6 {
|
||||
plan_transfer += 10;
|
||||
rationale.push(format!("episodic={:.0}%: needs extraction", episodic_ratio * 100.0));
|
||||
} else if episodic_ratio > 0.4 {
|
||||
plan_transfer += 5;
|
||||
rationale.push(format!("episodic={:.0}%", episodic_ratio * 100.0));
|
||||
}
|
||||
|
||||
GraphHealth {
|
||||
nodes: snap.nodes,
|
||||
edges: snap.edges,
|
||||
communities: snap.communities,
|
||||
alpha: snap.alpha,
|
||||
gini: snap.gini,
|
||||
avg_cc: snap.avg_cc,
|
||||
sigma: snap.sigma,
|
||||
episodic_ratio,
|
||||
interference: 0, // not computed in status check
|
||||
plan_replay,
|
||||
plan_linker,
|
||||
plan_separator,
|
||||
plan_transfer,
|
||||
plan_rationale: rationale,
|
||||
computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
|
||||
}
|
||||
}
|
||||
|
||||
// --- Session detection ---
|
||||
|
||||
/// Find JSONL session files that are stale (not recently written) and not
|
||||
|
|
@ -257,19 +411,45 @@ fn proc_uptime(pid: u32) -> Option<String> {
|
|||
|
||||
// --- Status writing ---
|
||||
|
||||
fn write_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) {
|
||||
let status = build_status(choir, last_daily);
|
||||
fn write_status(
|
||||
choir: &Choir,
|
||||
last_daily: Option<chrono::NaiveDate>,
|
||||
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
|
||||
) {
|
||||
let status = build_status(choir, last_daily, graph_health);
|
||||
if let Ok(json) = serde_json::to_string_pretty(&status) {
|
||||
let _ = fs::write(status_path(), json);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct GraphHealth {
|
||||
pub nodes: usize,
|
||||
pub edges: usize,
|
||||
pub communities: usize,
|
||||
pub alpha: f32, // power-law exponent (target ≥2.5)
|
||||
pub gini: f32, // degree inequality (target ≤0.4)
|
||||
pub avg_cc: f32, // clustering coefficient (target ≥0.2)
|
||||
pub sigma: f32, // small-world sigma
|
||||
pub episodic_ratio: f32, // episodic/total nodes (target <0.4)
|
||||
pub interference: usize, // interfering pairs (target <50)
|
||||
// Consolidation work estimate from plan
|
||||
pub plan_replay: usize,
|
||||
pub plan_linker: usize,
|
||||
pub plan_separator: usize,
|
||||
pub plan_transfer: usize,
|
||||
pub plan_rationale: Vec<String>,
|
||||
pub computed_at: String,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct DaemonStatus {
|
||||
pid: u32,
|
||||
tasks: Vec<TaskInfo>,
|
||||
#[serde(default)]
|
||||
last_daily: Option<String>,
|
||||
#[serde(default)]
|
||||
graph_health: Option<GraphHealth>,
|
||||
}
|
||||
|
||||
// --- The daemon ---
|
||||
|
|
@ -293,11 +473,13 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
.and_then(|d| d.parse().ok())
|
||||
));
|
||||
|
||||
let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
log_event("daemon", "started", &format!("pid {}", std::process::id()));
|
||||
eprintln!("poc-memory daemon started (pid {})", std::process::id());
|
||||
|
||||
// Write initial status
|
||||
write_status(&choir, *last_daily.lock().unwrap());
|
||||
write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
|
||||
|
||||
// Session watcher: reconcile-based extraction
|
||||
// Each tick: scan filesystem for stale sessions, check store for what's
|
||||
|
|
@ -306,6 +488,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
let choir_sw = Arc::clone(&choir);
|
||||
let llm_sw = Arc::clone(&llm);
|
||||
let last_daily_sw = Arc::clone(&last_daily);
|
||||
let graph_health_sw = Arc::clone(&graph_health);
|
||||
choir.spawn("session-watcher").init(move |ctx| {
|
||||
ctx.set_progress("idle");
|
||||
// Cache: path → (file_size, segment_count). Invalidated when size changes.
|
||||
|
|
@ -442,6 +625,12 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
count
|
||||
};
|
||||
|
||||
// No extractable messages — skip entirely
|
||||
if seg_count == 0 {
|
||||
already_mined += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let fname_key = super::enrich::transcript_filename_key(&path_str);
|
||||
let has_whole_file_key = mined.contains(&fname_key);
|
||||
|
||||
|
|
@ -460,7 +649,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
// Migrate old whole-file key: if it exists but no per-segment keys,
|
||||
// write per-segment keys for all current segments (they were mined
|
||||
// under the old scheme)
|
||||
if has_whole_file_key && !has_any_seg_key {
|
||||
if has_whole_file_key && !has_any_seg_key && seg_count > 0 {
|
||||
migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count));
|
||||
// After migration, all current segments are covered
|
||||
unmined_segs.clear();
|
||||
|
|
@ -493,19 +682,37 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
|
||||
// Migrate old whole-file keys to per-segment keys
|
||||
if !migrate_keys.is_empty() {
|
||||
if let Ok(mut store) = crate::store::Store::load() {
|
||||
for (fname_key, path_str, seg_count) in &migrate_keys {
|
||||
for i in 0..*seg_count {
|
||||
let seg_key = format!("{}.{}", fname_key, i);
|
||||
let content = format!("Migrated from whole-file key for {}", path_str);
|
||||
let mut node = crate::store::new_node(&seg_key, &content);
|
||||
node.provenance = crate::store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
match crate::store::Store::load() {
|
||||
Ok(mut store) => {
|
||||
let mut ok = 0;
|
||||
let mut fail = 0;
|
||||
for (fname_key, path_str, seg_count) in &migrate_keys {
|
||||
for i in 0..*seg_count {
|
||||
let seg_key = format!("{}.{}", fname_key, i);
|
||||
let content = format!("Migrated from whole-file key for {}", path_str);
|
||||
let mut node = crate::store::new_node(&seg_key, &content);
|
||||
node.provenance = crate::store::Provenance::AgentExperienceMine;
|
||||
match store.upsert_node(node) {
|
||||
Ok(()) => ok += 1,
|
||||
Err(e) => {
|
||||
if fail == 0 {
|
||||
eprintln!("migration upsert_node error: {}", e);
|
||||
}
|
||||
fail += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = store.save() {
|
||||
eprintln!("migration save error: {}", e);
|
||||
}
|
||||
log_event("session-watcher", "migrated",
|
||||
&format!("{} whole-file keys → per-segment ({} ok, {} fail)",
|
||||
migrate_keys.len(), ok, fail));
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("migration store load error: {}", e);
|
||||
}
|
||||
let _ = store.save();
|
||||
log_event("session-watcher", "migrated",
|
||||
&format!("{} whole-file keys → per-segment", migrate_keys.len()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -571,7 +778,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
ctx.set_progress("idle");
|
||||
}
|
||||
|
||||
write_status(&choir_sw, *last_daily_sw.lock().unwrap());
|
||||
write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw);
|
||||
std::thread::sleep(SCHEDULER_INTERVAL);
|
||||
}
|
||||
});
|
||||
|
|
@ -580,6 +787,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
let choir_sched = Arc::clone(&choir);
|
||||
let llm_sched = Arc::clone(&llm);
|
||||
let last_daily_sched = Arc::clone(&last_daily);
|
||||
let graph_health_sched = Arc::clone(&graph_health);
|
||||
choir.spawn("scheduler").init(move |ctx| {
|
||||
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
|
||||
ctx.set_progress("idle");
|
||||
|
|
@ -591,49 +799,124 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
|
||||
let today = chrono::Local::now().date_naive();
|
||||
|
||||
// Health check: every hour
|
||||
// Health check: every hour — also updates graph health metrics
|
||||
if last_health.elapsed() >= HEALTH_INTERVAL {
|
||||
choir_sched.spawn("health").init(|ctx| {
|
||||
job_daily_check(ctx)
|
||||
let gh = Arc::clone(&graph_health_sched);
|
||||
choir_sched.spawn("health").init(move |ctx| {
|
||||
job_daily_check(ctx, &gh)
|
||||
});
|
||||
last_health = std::time::Instant::now();
|
||||
}
|
||||
|
||||
// Daily jobs: once per day
|
||||
// Daily jobs: once per day (wait for health check to cache metrics first)
|
||||
let last = *last_daily_sched.lock().unwrap();
|
||||
if last.is_none_or(|d| d < today) {
|
||||
let gh = graph_health_sched.lock().unwrap().clone();
|
||||
if last.is_none_or(|d| d < today) && gh.is_some() {
|
||||
log_event("scheduler", "daily-trigger", &today.to_string());
|
||||
|
||||
// Decay disabled — version spam and premature demotion
|
||||
// choir_sched.spawn(format!("decay:{}", today)).init(|ctx| {
|
||||
// job_decay(ctx)
|
||||
// });
|
||||
// Use cached graph health for plan (cheap — no O(n²) interference detection).
|
||||
let (replay, linker, separator, transfer) = match gh {
|
||||
Some(ref h) => (h.plan_replay, h.plan_linker, h.plan_separator, h.plan_transfer),
|
||||
None => unreachable!(), // guarded by gh.is_some() above
|
||||
};
|
||||
let plan = crate::neuro::ConsolidationPlan {
|
||||
replay_count: replay,
|
||||
linker_count: linker,
|
||||
separator_count: separator,
|
||||
transfer_count: transfer,
|
||||
run_health: true,
|
||||
rationale: Vec::new(),
|
||||
};
|
||||
|
||||
// Consolidation pipeline: consolidate → knowledge-loop → digest
|
||||
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
|
||||
.resource(&llm_sched)
|
||||
.retries(2)
|
||||
.init(move |ctx| {
|
||||
job_consolidate(ctx)
|
||||
})
|
||||
.run();
|
||||
let batch_size = 5;
|
||||
|
||||
let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
|
||||
// Build the list of (agent_type, batch_size) runs
|
||||
let mut runs: Vec<(&str, usize)> = Vec::new();
|
||||
if plan.run_health {
|
||||
runs.push(("health", 0));
|
||||
}
|
||||
for (name, count) in [
|
||||
("replay", plan.replay_count),
|
||||
("linker", plan.linker_count),
|
||||
("separator", plan.separator_count),
|
||||
("transfer", plan.transfer_count),
|
||||
] {
|
||||
let mut remaining = count;
|
||||
while remaining > 0 {
|
||||
let batch = remaining.min(batch_size);
|
||||
runs.push((name, batch));
|
||||
remaining -= batch;
|
||||
}
|
||||
}
|
||||
|
||||
log_event("scheduler", "consolidation-plan",
|
||||
&format!("{} agents ({}r {}l {}s {}t)",
|
||||
runs.len(), plan.replay_count, plan.linker_count,
|
||||
plan.separator_count, plan.transfer_count));
|
||||
|
||||
// Phase 1: Agent runs (sequential — each reloads store to see prior changes)
|
||||
let mut prev_agent = None;
|
||||
for (i, (agent_type, batch)) in runs.iter().enumerate() {
|
||||
let agent = agent_type.to_string();
|
||||
let b = *batch;
|
||||
let task_name = format!("c-{}-{}:{}", agent, i, today);
|
||||
let mut builder = choir_sched.spawn(task_name)
|
||||
.resource(&llm_sched)
|
||||
.retries(1)
|
||||
.init(move |ctx| {
|
||||
job_consolidation_agent(ctx, &agent, b)
|
||||
});
|
||||
if let Some(ref dep) = prev_agent {
|
||||
builder.depend_on(dep);
|
||||
}
|
||||
prev_agent = Some(builder.run());
|
||||
}
|
||||
|
||||
// Phase 2: Apply actions from agent reports
|
||||
let mut apply = choir_sched.spawn(format!("c-apply:{}", today))
|
||||
.resource(&llm_sched)
|
||||
.retries(1)
|
||||
.init(move |ctx| {
|
||||
job_knowledge_loop(ctx)
|
||||
});
|
||||
knowledge.depend_on(&consolidate);
|
||||
let knowledge = knowledge.run();
|
||||
.init(move |ctx| job_consolidation_apply(ctx));
|
||||
if let Some(ref dep) = prev_agent {
|
||||
apply.depend_on(dep);
|
||||
}
|
||||
let apply = apply.run();
|
||||
|
||||
let mut digest = choir_sched.spawn(format!("digest:{}", today))
|
||||
// Phase 3: Link orphans (CPU-only, no LLM)
|
||||
let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
|
||||
.retries(1)
|
||||
.init(move |ctx| job_link_orphans(ctx));
|
||||
orphans.depend_on(&apply);
|
||||
let orphans = orphans.run();
|
||||
|
||||
// Phase 4: Cap degree
|
||||
let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
|
||||
.retries(1)
|
||||
.init(move |ctx| job_cap_degree(ctx));
|
||||
cap.depend_on(&orphans);
|
||||
let cap = cap.run();
|
||||
|
||||
// Phase 5: Generate digests
|
||||
let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
|
||||
.resource(&llm_sched)
|
||||
.retries(1)
|
||||
.init(move |ctx| {
|
||||
job_digest(ctx)
|
||||
});
|
||||
digest.depend_on(&knowledge);
|
||||
.init(move |ctx| job_digest(ctx));
|
||||
digest.depend_on(&cap);
|
||||
let digest = digest.run();
|
||||
|
||||
// Phase 6: Apply digest links
|
||||
let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
|
||||
.retries(1)
|
||||
.init(move |ctx| job_digest_links(ctx));
|
||||
digest_links.depend_on(&digest);
|
||||
let digest_links = digest_links.run();
|
||||
|
||||
// Phase 7: Knowledge loop
|
||||
let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today))
|
||||
.resource(&llm_sched)
|
||||
.retries(1)
|
||||
.init(move |ctx| job_knowledge_loop(ctx));
|
||||
knowledge.depend_on(&digest_links);
|
||||
|
||||
*last_daily_sched.lock().unwrap() = Some(today);
|
||||
ctx.set_progress(format!("daily pipeline triggered ({today})"));
|
||||
|
|
@ -645,7 +928,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
log::trace!("pruned {} finished tasks", pruned);
|
||||
}
|
||||
|
||||
write_status(&choir_sched, *last_daily_sched.lock().unwrap());
|
||||
write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched);
|
||||
std::thread::sleep(SCHEDULER_INTERVAL);
|
||||
}
|
||||
});
|
||||
|
|
@ -653,7 +936,8 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
// Main thread: listen on status socket + wait for signals
|
||||
let choir_main = Arc::clone(&choir);
|
||||
let last_daily_main = Arc::clone(&last_daily);
|
||||
status_socket_loop(&choir_main, &last_daily_main);
|
||||
let graph_health_main = Arc::clone(&graph_health);
|
||||
status_socket_loop(&choir_main, &last_daily_main, &graph_health_main);
|
||||
|
||||
log_event("daemon", "stopping", "");
|
||||
eprintln!("Shutting down...");
|
||||
|
|
@ -668,6 +952,29 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
std::process::exit(0)
|
||||
}
|
||||
|
||||
fn send_rpc(cmd: &str) -> Option<String> {
|
||||
use std::io::{Read as _, Write as _};
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
|
||||
stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
|
||||
stream.write_all(cmd.as_bytes()).ok()?;
|
||||
stream.shutdown(std::net::Shutdown::Write).ok()?;
|
||||
let mut buf = String::new();
|
||||
stream.read_to_string(&mut buf).ok()?;
|
||||
Some(buf)
|
||||
}
|
||||
|
||||
pub fn rpc_consolidate() -> Result<(), String> {
|
||||
match send_rpc("consolidate") {
|
||||
Some(resp) => {
|
||||
println!("{}", resp.trim());
|
||||
Ok(())
|
||||
}
|
||||
None => Err("Daemon not running.".into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_status_socket() -> Option<DaemonStatus> {
|
||||
use std::io::Read as _;
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
|
@ -686,8 +993,12 @@ fn status_sock_path() -> PathBuf {
|
|||
/// Listen on a Unix domain socket for status requests.
|
||||
/// Any connection gets the live status JSON written and closed.
|
||||
/// Also handles SIGINT/SIGTERM for clean shutdown.
|
||||
fn status_socket_loop(choir: &Choir, last_daily: &Arc<Mutex<Option<chrono::NaiveDate>>>) {
|
||||
use std::io::Write as _;
|
||||
fn status_socket_loop(
|
||||
choir: &Choir,
|
||||
last_daily: &Arc<Mutex<Option<chrono::NaiveDate>>>,
|
||||
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
|
||||
) {
|
||||
use std::io::{Read as _, Write as _};
|
||||
use std::os::unix::net::UnixListener;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
|
|
@ -719,9 +1030,30 @@ fn status_socket_loop(choir: &Choir, last_daily: &Arc<Mutex<Option<chrono::Naive
|
|||
while !STOP.load(Ordering::Acquire) {
|
||||
match listener.accept() {
|
||||
Ok((mut stream, _)) => {
|
||||
let status = build_status(choir, *last_daily.lock().unwrap());
|
||||
if let Ok(json) = serde_json::to_string_pretty(&status) {
|
||||
let _ = stream.write_all(json.as_bytes());
|
||||
// Read command from client (with short timeout)
|
||||
stream.set_read_timeout(Some(Duration::from_millis(100))).ok();
|
||||
let mut cmd_buf = [0u8; 256];
|
||||
let cmd = match stream.read(&mut cmd_buf) {
|
||||
Ok(n) if n > 0 => std::str::from_utf8(&cmd_buf[..n])
|
||||
.unwrap_or("")
|
||||
.trim()
|
||||
.to_string(),
|
||||
_ => String::new(),
|
||||
};
|
||||
|
||||
match cmd.as_str() {
|
||||
"consolidate" => {
|
||||
*last_daily.lock().unwrap() = None;
|
||||
let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n");
|
||||
log_event("rpc", "consolidate", "triggered via socket");
|
||||
}
|
||||
_ => {
|
||||
// Default: return status
|
||||
let status = build_status(choir, *last_daily.lock().unwrap(), graph_health);
|
||||
if let Ok(json) = serde_json::to_string_pretty(&status) {
|
||||
let _ = stream.write_all(json.as_bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Connection closes when stream is dropped
|
||||
}
|
||||
|
|
@ -739,11 +1071,16 @@ fn status_socket_loop(choir: &Choir, last_daily: &Arc<Mutex<Option<chrono::Naive
|
|||
}
|
||||
}
|
||||
|
||||
fn build_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) -> DaemonStatus {
|
||||
fn build_status(
|
||||
choir: &Choir,
|
||||
last_daily: Option<chrono::NaiveDate>,
|
||||
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
|
||||
) -> DaemonStatus {
|
||||
DaemonStatus {
|
||||
pid: std::process::id(),
|
||||
tasks: choir.task_statuses(),
|
||||
last_daily: last_daily.map(|d| d.to_string()),
|
||||
graph_health: graph_health.lock().unwrap().clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -764,8 +1101,9 @@ fn format_duration_human(ms: u128) -> String {
|
|||
fn task_group(name: &str) -> &str {
|
||||
if name == "session-watcher" || name == "scheduler" { "core" }
|
||||
else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" }
|
||||
else if name.starts_with("consolidate:") || name.starts_with("knowledge-loop:")
|
||||
|| name.starts_with("digest:") || name.starts_with("decay:") { "daily" }
|
||||
else if name.starts_with("c-") || name.starts_with("consolidate:")
|
||||
|| name.starts_with("knowledge-loop:") || name.starts_with("digest:")
|
||||
|| name.starts_with("decay:") { "daily" }
|
||||
else if name == "health" { "health" }
|
||||
else { "other" }
|
||||
}
|
||||
|
|
@ -872,9 +1210,33 @@ pub fn show_status() -> Result<(), String> {
|
|||
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
|
||||
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
|
||||
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
|
||||
eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n",
|
||||
eprintln!(" tasks: {} running, {} pending, {} done, {} failed",
|
||||
running, pending, completed, failed);
|
||||
|
||||
// Graph health
|
||||
if let Some(ref gh) = status.graph_health {
|
||||
eprintln!();
|
||||
fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str {
|
||||
let ok = if higher_is_better { val >= target } else { val <= target };
|
||||
if ok { "✓" } else { "✗" }
|
||||
}
|
||||
eprintln!(" Graph health ({})", gh.computed_at);
|
||||
eprintln!(" {} nodes, {} edges, {} communities",
|
||||
gh.nodes, gh.edges, gh.communities);
|
||||
eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)",
|
||||
indicator(gh.alpha, 2.5, true), gh.alpha,
|
||||
indicator(gh.gini, 0.4, false), gh.gini,
|
||||
indicator(gh.avg_cc, 0.2, true), gh.avg_cc);
|
||||
eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}",
|
||||
indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0,
|
||||
gh.sigma);
|
||||
|
||||
let total = gh.plan_replay + gh.plan_linker + gh.plan_separator + gh.plan_transfer + 1;
|
||||
eprintln!(" consolidation plan: {} agents ({}r {}l {}s {}t +health)",
|
||||
total, gh.plan_replay, gh.plan_linker, gh.plan_separator, gh.plan_transfer);
|
||||
}
|
||||
eprintln!();
|
||||
|
||||
// Group and display
|
||||
let groups: &[(&str, &str)] = &[
|
||||
("core", "Core"),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue