daemon: add progress reporting to all jobs

Jobs now call ctx.set_progress() at key stages (loading store, mining,
consolidating, etc.), visible in `poc-memory daemon status`. The
session-watcher and scheduler loops also report their state (idle,
scanning, queued counts).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-05 21:57:53 -05:00
parent cf5fe42a15
commit cc7943cb50

View file

@@ -12,7 +12,7 @@
//
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ResourcePool, TaskError, TaskInfo, TaskStatus};
use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus};
use std::collections::HashSet;
use std::fs;
use std::io::Write;
@@ -75,15 +75,17 @@ fn log_event(job: &str, event: &str, detail: &str) {
// --- Job functions (direct, no subprocess) ---
/// Run a named job with logging and error mapping.
fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
/// Run a named job with logging, progress reporting, and error mapping.
fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
log_event(name, "started", "");
ctx.set_progress("starting");
let start = std::time::Instant::now();
match f() {
Ok(()) => {
let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
log_event(name, "completed", &duration);
ctx.set_result(&duration);
Ok(())
}
Err(e) => {
@@ -95,75 +97,81 @@ fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), Tas
}
}
fn job_experience_mine(path: &str) -> Result<(), TaskError> {
fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
let path = path.to_string();
run_job(&format!("experience-mine {}", path), || {
run_job(ctx, &format!("experience-mine {}", path), || {
ctx.set_progress("loading store");
let mut store = crate::store::Store::load()?;
ctx.set_progress("mining");
let count = crate::enrich::experience_mine(&mut store, &path)?;
eprintln!("experience-mine: {} new entries from {}", count,
std::path::Path::new(&path).file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| path.clone()));
ctx.set_progress(&format!("{} entries mined", count));
Ok(())
})
}
fn job_fact_mine(path: &str) -> Result<(), TaskError> {
fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
let path = path.to_string();
run_job(&format!("fact-mine {}", path), || {
run_job(ctx, &format!("fact-mine {}", path), || {
ctx.set_progress("mining facts");
let p = std::path::Path::new(&path);
let count = crate::fact_mine::mine_and_store(p)?;
eprintln!("fact-mine: {} facts from {}", count,
p.file_name().map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| path.clone()));
ctx.set_progress(&format!("{} facts stored", count));
Ok(())
})
}
fn job_decay() -> Result<(), TaskError> {
run_job("decay", || {
fn job_decay(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "decay", || {
ctx.set_progress("loading store");
let mut store = crate::store::Store::load()?;
ctx.set_progress("decaying");
let (decayed, pruned) = store.decay();
store.save()?;
eprintln!("decay: {} decayed, {} pruned", decayed, pruned);
ctx.set_progress(&format!("{} decayed, {} pruned", decayed, pruned));
Ok(())
})
}
fn job_consolidate() -> Result<(), TaskError> {
run_job("consolidate", || {
fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "consolidate", || {
ctx.set_progress("loading store");
let mut store = crate::store::Store::load()?;
ctx.set_progress("consolidating");
crate::consolidate::consolidate_full(&mut store)
})
}
fn job_knowledge_loop() -> Result<(), TaskError> {
run_job("knowledge-loop", || {
fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "knowledge-loop", || {
let config = crate::knowledge::KnowledgeLoopConfig {
max_cycles: 100,
batch_size: 5,
..Default::default()
};
ctx.set_progress("running agents");
let results = crate::knowledge::run_knowledge_loop(&config)?;
eprintln!("knowledge-loop: {} cycles, {} actions",
ctx.set_progress(&format!("{} cycles, {} actions",
results.len(),
results.iter().map(|r| r.total_applied).sum::<usize>());
results.iter().map(|r| r.total_applied).sum::<usize>()));
Ok(())
})
}
fn job_digest() -> Result<(), TaskError> {
run_job("digest", || {
fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "digest", || {
ctx.set_progress("loading store");
let mut store = crate::store::Store::load()?;
ctx.set_progress("generating digests");
crate::digest::digest_auto(&mut store)
})
}
fn job_daily_check() -> Result<(), TaskError> {
run_job("daily-check", || {
fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "daily-check", || {
ctx.set_progress("loading store");
let store = crate::store::Store::load()?;
let report = crate::neuro::daily_check(&store);
eprint!("{}", report);
ctx.set_progress("checking health");
let _report = crate::neuro::daily_check(&store);
Ok(())
})
}
@@ -299,11 +307,13 @@ pub fn run_daemon() -> Result<(), String> {
let choir_sw = Arc::clone(&choir);
let llm_sw = Arc::clone(&llm);
choir.spawn("session-watcher").init(move |ctx| {
ctx.set_progress("idle");
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
ctx.set_progress("scanning");
// What's currently running/pending? (avoid spawning duplicates)
let active: HashSet<String> = choir_sw.task_statuses().iter()
.filter(|t| !t.status.is_finished())
@@ -353,8 +363,8 @@ pub fn run_daemon() -> Result<(), String> {
let extract = choir_sw.spawn(task_name)
.resource(&llm_sw)
.retries(2)
.init(move |_ctx| {
job_experience_mine(&path)
.init(move |ctx| {
job_experience_mine(ctx, &path)
})
.run();
@@ -365,8 +375,8 @@ pub fn run_daemon() -> Result<(), String> {
let mut fm = choir_sw.spawn(fact_task)
.resource(&llm_sw)
.retries(1)
.init(move |_ctx| {
job_fact_mine(&path2)
.init(move |ctx| {
job_fact_mine(ctx, &path2)
});
fm.depend_on(&extract);
}
@@ -377,6 +387,9 @@ pub fn run_daemon() -> Result<(), String> {
log_event("session-watcher", "tick",
&format!("{} stale, {} mined, {} open, {} queued",
total_stale, already_mined, still_open, queued));
ctx.set_progress(&format!("{} queued, {} open", queued, still_open));
} else {
ctx.set_progress("idle");
}
write_status(&choir_sw);
@@ -390,6 +403,7 @@ pub fn run_daemon() -> Result<(), String> {
choir.spawn("scheduler").init(move |ctx| {
let mut last_daily = None::<chrono::NaiveDate>;
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
ctx.set_progress("idle");
loop {
if ctx.is_cancelled() {
@@ -400,8 +414,8 @@ pub fn run_daemon() -> Result<(), String> {
// Health check: every hour
if last_health.elapsed() >= HEALTH_INTERVAL {
choir_sched.spawn("health").init(|_ctx| {
job_daily_check()
choir_sched.spawn("health").init(|ctx| {
job_daily_check(ctx)
});
last_health = std::time::Instant::now();
}
@@ -411,24 +425,24 @@ pub fn run_daemon() -> Result<(), String> {
log_event("scheduler", "daily-trigger", &today.to_string());
// Decay (no API calls, fast)
choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| {
job_decay()
choir_sched.spawn(format!("decay:{}", today)).init(|ctx| {
job_decay(ctx)
});
// Consolidation pipeline: consolidate → knowledge-loop → digest
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
.resource(&llm_sched)
.retries(2)
.init(move |_ctx| {
job_consolidate()
.init(move |ctx| {
job_consolidate(ctx)
})
.run();
let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |_ctx| {
job_knowledge_loop()
.init(move |ctx| {
job_knowledge_loop(ctx)
});
knowledge.depend_on(&consolidate);
let knowledge = knowledge.run();
@@ -436,12 +450,13 @@ pub fn run_daemon() -> Result<(), String> {
let mut digest = choir_sched.spawn(format!("digest:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |_ctx| {
job_digest()
.init(move |ctx| {
job_digest(ctx)
});
digest.depend_on(&knowledge);
last_daily = Some(today);
ctx.set_progress(&format!("daily pipeline triggered ({})", today));
}
// Prune finished tasks from registry
@@ -604,7 +619,8 @@ pub fn show_status() -> Result<(), String> {
} else {
String::new()
};
eprintln!(" {} {}{}", status_symbol(t), t.name, elapsed);
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
}
}
let mut parts = Vec::new();
@@ -644,18 +660,22 @@ pub fn show_status() -> Result<(), String> {
String::new()
};
let err_msg = t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default();
let detail = if matches!(t.status, TaskStatus::Running) {
t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default()
} else {
t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default()
};
if duration.is_empty() {
eprintln!(" {} {:30}{}{}", sym, t.name, retry, err_msg);
eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail);
} else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, err_msg);
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail);
}
}
eprintln!();