diff --git a/src/daemon.rs b/src/daemon.rs index 0e0b190..b582201 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -12,7 +12,7 @@ // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. -use jobkit::{Choir, ResourcePool, TaskError, TaskInfo, TaskStatus}; +use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus}; use std::collections::HashSet; use std::fs; use std::io::Write; @@ -75,15 +75,17 @@ fn log_event(job: &str, event: &str, detail: &str) { // --- Job functions (direct, no subprocess) --- -/// Run a named job with logging and error mapping. -fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { +/// Run a named job with logging, progress reporting, and error mapping. +fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { log_event(name, "started", ""); + ctx.set_progress("starting"); let start = std::time::Instant::now(); match f() { Ok(()) => { let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); log_event(name, "completed", &duration); + ctx.set_result(&duration); Ok(()) } Err(e) => { @@ -95,75 +97,81 @@ fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), Tas } } -fn job_experience_mine(path: &str) -> Result<(), TaskError> { +fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { let path = path.to_string(); - run_job(&format!("experience-mine {}", path), || { + run_job(ctx, &format!("experience-mine {}", path), || { + ctx.set_progress("loading store"); let mut store = crate::store::Store::load()?; + ctx.set_progress("mining"); let count = crate::enrich::experience_mine(&mut store, &path)?; - eprintln!("experience-mine: {} new entries from {}", count, - std::path::Path::new(&path).file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| path.clone())); + ctx.set_progress(&format!("{} entries mined", count)); Ok(()) }) } -fn job_fact_mine(path: &str) -> Result<(), TaskError> { +fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { let path = path.to_string(); - run_job(&format!("fact-mine {}", path), || { + run_job(ctx, &format!("fact-mine {}", path), || { + ctx.set_progress("mining facts"); let p = std::path::Path::new(&path); let count = crate::fact_mine::mine_and_store(p)?; - eprintln!("fact-mine: {} facts from {}", count, - p.file_name().map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| path.clone())); + ctx.set_progress(&format!("{} facts stored", count)); Ok(()) }) } -fn job_decay() -> Result<(), TaskError> { - run_job("decay", || { +fn job_decay(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "decay", || { + ctx.set_progress("loading store"); let mut store = crate::store::Store::load()?; + ctx.set_progress("decaying"); let (decayed, pruned) = store.decay(); store.save()?; - eprintln!("decay: {} decayed, {} pruned", decayed, pruned); + ctx.set_progress(&format!("{} decayed, {} pruned", decayed, pruned)); Ok(()) }) } -fn job_consolidate() -> Result<(), TaskError> { - run_job("consolidate", || { +fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "consolidate", || { + ctx.set_progress("loading store"); let mut store = crate::store::Store::load()?; + ctx.set_progress("consolidating"); crate::consolidate::consolidate_full(&mut store) }) } -fn job_knowledge_loop() -> Result<(), TaskError> { - run_job("knowledge-loop", || { +fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "knowledge-loop", || { let config = crate::knowledge::KnowledgeLoopConfig { max_cycles: 100, batch_size: 5, ..Default::default() }; + ctx.set_progress("running agents"); let results = crate::knowledge::run_knowledge_loop(&config)?; - eprintln!("knowledge-loop: {} cycles, {} actions", + ctx.set_progress(&format!("{} cycles, {} actions", results.len(), - results.iter().map(|r| r.total_applied).sum::()); + results.iter().map(|r| r.total_applied).sum::())); Ok(()) }) } -fn job_digest() -> Result<(), TaskError> { - run_job("digest", || { +fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "digest", || { + ctx.set_progress("loading store"); let mut store = crate::store::Store::load()?; + ctx.set_progress("generating digests"); crate::digest::digest_auto(&mut store) }) } -fn job_daily_check() -> Result<(), TaskError> { - run_job("daily-check", || { +fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "daily-check", || { + ctx.set_progress("loading store"); let store = crate::store::Store::load()?; - let report = crate::neuro::daily_check(&store); - eprint!("{}", report); + ctx.set_progress("checking health"); + let _report = crate::neuro::daily_check(&store); Ok(()) }) } @@ -299,11 +307,13 @@ pub fn run_daemon() -> Result<(), String> { let choir_sw = Arc::clone(&choir); let llm_sw = Arc::clone(&llm); choir.spawn("session-watcher").init(move |ctx| { + ctx.set_progress("idle"); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } + ctx.set_progress("scanning"); // What's currently running/pending? (avoid spawning duplicates) let active: HashSet = choir_sw.task_statuses().iter() .filter(|t| !t.status.is_finished()) @@ -353,8 +363,8 @@ pub fn run_daemon() -> Result<(), String> { let extract = choir_sw.spawn(task_name) .resource(&llm_sw) .retries(2) - .init(move |_ctx| { - job_experience_mine(&path) + .init(move |ctx| { + job_experience_mine(ctx, &path) }) .run(); @@ -365,8 +375,8 @@ pub fn run_daemon() -> Result<(), String> { let mut fm = choir_sw.spawn(fact_task) .resource(&llm_sw) .retries(1) - .init(move |_ctx| { - job_fact_mine(&path2) + .init(move |ctx| { + job_fact_mine(ctx, &path2) }); fm.depend_on(&extract); } @@ -377,6 +387,9 @@ pub fn run_daemon() -> Result<(), String> { log_event("session-watcher", "tick", &format!("{} stale, {} mined, {} open, {} queued", total_stale, already_mined, still_open, queued)); + ctx.set_progress(&format!("{} queued, {} open", queued, still_open)); + } else { + ctx.set_progress("idle"); } write_status(&choir_sw); @@ -390,6 +403,7 @@ pub fn run_daemon() -> Result<(), String> { choir.spawn("scheduler").init(move |ctx| { let mut last_daily = None::; let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; + ctx.set_progress("idle"); loop { if ctx.is_cancelled() { @@ -400,8 +414,8 @@ pub fn run_daemon() -> Result<(), String> { // Health check: every hour if last_health.elapsed() >= HEALTH_INTERVAL { - choir_sched.spawn("health").init(|_ctx| { - job_daily_check() + choir_sched.spawn("health").init(|ctx| { + job_daily_check(ctx) }); last_health = std::time::Instant::now(); } @@ -411,24 +425,24 @@ pub fn run_daemon() -> Result<(), String> { log_event("scheduler", "daily-trigger", &today.to_string()); // Decay (no API calls, fast) - choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| { - job_decay() + choir_sched.spawn(format!("decay:{}", today)).init(|ctx| { + job_decay(ctx) }); // Consolidation pipeline: consolidate → knowledge-loop → digest let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) .resource(&llm_sched) .retries(2) - .init(move |_ctx| { - job_consolidate() + .init(move |ctx| { + job_consolidate(ctx) }) .run(); let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |_ctx| { - job_knowledge_loop() + .init(move |ctx| { + job_knowledge_loop(ctx) }); knowledge.depend_on(&consolidate); let knowledge = knowledge.run(); @@ -436,12 +450,13 @@ pub fn run_daemon() -> Result<(), String> { let mut digest = choir_sched.spawn(format!("digest:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |_ctx| { - job_digest() + .init(move |ctx| { + job_digest(ctx) }); digest.depend_on(&knowledge); last_daily = Some(today); + ctx.set_progress(&format!("daily pipeline triggered ({})", today)); } // Prune finished tasks from registry @@ -604,7 +619,8 @@ pub fn show_status() -> Result<(), String> { } else { String::new() }; - eprintln!(" {} {}{}", status_symbol(t), t.name, elapsed); + let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); + eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); } } let mut parts = Vec::new(); @@ -644,18 +660,22 @@ pub fn show_status() -> Result<(), String> { String::new() }; - let err_msg = t.result.as_ref() - .and_then(|r| r.error.as_ref()) - .map(|e| { - let short = if e.len() > 60 { &e[..60] } else { e }; - format!(" err: {}", short) - }) - .unwrap_or_default(); + let detail = if matches!(t.status, TaskStatus::Running) { + t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default() + } else { + t.result.as_ref() + .and_then(|r| r.error.as_ref()) + .map(|e| { + let short = if e.len() > 60 { &e[..60] } else { e }; + format!(" err: {}", short) + }) + .unwrap_or_default() + }; if duration.is_empty() { - eprintln!(" {} {:30}{}{}", sym, t.name, retry, err_msg); + eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail); } else { - eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, err_msg); + eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); } } eprintln!();