diff --git a/src/consolidate.rs b/src/consolidate.rs index 8209776..cd851ea 100644 --- a/src/consolidate.rs +++ b/src/consolidate.rs @@ -23,7 +23,15 @@ fn log_line(buf: &mut String, line: &str) { } /// Run the full autonomous consolidation pipeline with logging. +/// If `on_progress` is provided, it's called at each significant step. pub fn consolidate_full(store: &mut Store) -> Result<(), String> { + consolidate_full_with_progress(store, &|_| {}) +} + +pub fn consolidate_full_with_progress( + store: &mut Store, + on_progress: &dyn Fn(&str), +) -> Result<(), String> { let start = std::time::Instant::now(); let log_key = format!("_consolidate-log-{}", store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], "")); @@ -36,6 +44,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 1: Plan --- log_line(&mut log_buf, "--- Step 1: Plan ---"); + on_progress("planning"); let plan = neuro::consolidation_plan(store); let plan_text = neuro::format_plan(&plan); log_line(&mut log_buf, &plan_text); @@ -103,6 +112,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { }; log_line(&mut log_buf, &format!("\n{}", label)); + on_progress(&label); println!("{}", label); // Reload store to pick up changes from previous agents @@ -145,6 +155,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key); log_line(&mut log_buf, &msg); + on_progress(&msg); println!("{}", msg); } @@ -153,6 +164,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 3: Apply consolidation actions --- log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---"); + on_progress("applying actions"); println!("\n--- Applying consolidation actions ---"); *store = Store::load()?; @@ -171,6 +183,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 3b: Link orphans --- log_line(&mut log_buf, "\n--- Step 
3b: Link orphans ---"); + on_progress("linking orphans"); println!("\n--- Linking orphan nodes ---"); *store = Store::load()?; @@ -179,6 +192,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 3c: Cap degree --- log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---"); + on_progress("capping degree"); println!("\n--- Capping node degree ---"); *store = Store::load()?; @@ -192,6 +206,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 4: Digest auto --- log_line(&mut log_buf, "\n--- Step 4: Digest auto ---"); + on_progress("generating digests"); println!("\n--- Generating missing digests ---"); *store = Store::load()?; @@ -206,6 +221,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> { // --- Step 5: Apply digest links --- log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---"); + on_progress("applying digest links"); println!("\n--- Applying digest links ---"); *store = Store::load()?; diff --git a/src/daemon.rs b/src/daemon.rs index b582201..37df3cf 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -17,7 +17,7 @@ use std::collections::HashSet; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes @@ -100,11 +100,11 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { let path = path.to_string(); run_job(ctx, &format!("experience-mine {}", path), || { - ctx.set_progress("loading store"); + ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; - ctx.set_progress("mining"); + ctx.log_line("mining"); let count = crate::enrich::experience_mine(&mut store, &path)?; - ctx.set_progress(&format!("{} entries mined", count)); + ctx.log_line(&format!("{} entries mined", count)); Ok(()) }) } @@ -112,32 
+112,33 @@ fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskErr fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { let path = path.to_string(); run_job(ctx, &format!("fact-mine {}", path), || { - ctx.set_progress("mining facts"); + ctx.log_line("mining facts"); let p = std::path::Path::new(&path); let count = crate::fact_mine::mine_and_store(p)?; - ctx.set_progress(&format!("{} facts stored", count)); + ctx.log_line(&format!("{} facts stored", count)); Ok(()) }) } fn job_decay(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "decay", || { - ctx.set_progress("loading store"); + ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; - ctx.set_progress("decaying"); + ctx.log_line("decaying"); let (decayed, pruned) = store.decay(); store.save()?; - ctx.set_progress(&format!("{} decayed, {} pruned", decayed, pruned)); + ctx.log_line(&format!("{} decayed, {} pruned", decayed, pruned)); Ok(()) }) } fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "consolidate", || { - ctx.set_progress("loading store"); + ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; - ctx.set_progress("consolidating"); - crate::consolidate::consolidate_full(&mut store) + crate::consolidate::consolidate_full_with_progress(&mut store, &|msg| { + ctx.log_line(msg); + }) }) } @@ -148,9 +149,9 @@ fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> { batch_size: 5, ..Default::default() }; - ctx.set_progress("running agents"); + ctx.log_line("running agents"); let results = crate::knowledge::run_knowledge_loop(&config)?; - ctx.set_progress(&format!("{} cycles, {} actions", + ctx.log_line(&format!("{} cycles, {} actions", results.len(), results.iter().map(|r| r.total_applied).sum::())); Ok(()) @@ -159,18 +160,18 @@ fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> { fn job_digest(ctx: &ExecutionContext) -> Result<(), 
TaskError> { run_job(ctx, "digest", || { - ctx.set_progress("loading store"); + ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; - ctx.set_progress("generating digests"); + ctx.log_line("generating digests"); crate::digest::digest_auto(&mut store) }) } fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "daily-check", || { - ctx.set_progress("loading store"); + ctx.log_line("loading store"); let store = crate::store::Store::load()?; - ctx.set_progress("checking health"); + ctx.log_line("checking health"); let _report = crate::neuro::daily_check(&store); Ok(()) }) @@ -263,11 +264,12 @@ fn proc_uptime(pid: u32) -> Option { // --- Status writing --- -fn write_status(choir: &Choir) { +fn write_status(choir: &Choir, last_daily: Option) { let statuses = choir.task_statuses(); let status = DaemonStatus { pid: std::process::id(), tasks: statuses, + last_daily: last_daily.map(|d| d.to_string()), }; if let Ok(json) = serde_json::to_string_pretty(&status) { let _ = fs::write(status_path(), json); @@ -278,6 +280,8 @@ struct DaemonStatus { pid: u32, tasks: Vec, + #[serde(default)] + last_daily: Option<String>, } // --- The daemon --- @@ -294,11 +298,19 @@ pub fn run_daemon() -> Result<(), String> { let llm = ResourcePool::new("llm", 1); llm.bind(&choir); + // Recover last_daily from previous status file + let last_daily: Arc>> = Arc::new(Mutex::new( + fs::read_to_string(status_path()).ok() + .and_then(|s| serde_json::from_str::<DaemonStatus>(&s).ok()) + .and_then(|s| s.last_daily) + .and_then(|d| d.parse().ok()) + )); + log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); // Write initial status - write_status(&choir); + write_status(&choir, *last_daily.lock().unwrap()); // Session watcher: reconcile-based extraction // Each tick: scan filesystem for stale sessions, check store for what's @@ -306,6 +318,7 @@ pub fn
run_daemon() -> Result<(), String> { // No persistent tracking state — the store is the source of truth. let choir_sw = Arc::clone(&choir); let llm_sw = Arc::clone(&llm); + let last_daily_sw = Arc::clone(&last_daily); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); loop { @@ -392,7 +405,7 @@ pub fn run_daemon() -> Result<(), String> { ctx.set_progress("idle"); } - write_status(&choir_sw); + write_status(&choir_sw, *last_daily_sw.lock().unwrap()); std::thread::sleep(SCHEDULER_INTERVAL); } }); @@ -400,8 +413,8 @@ pub fn run_daemon() -> Result<(), String> { // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); let llm_sched = Arc::clone(&llm); + let last_daily_sched = Arc::clone(&last_daily); choir.spawn("scheduler").init(move |ctx| { - let mut last_daily = None::; let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; ctx.set_progress("idle"); @@ -421,7 +434,8 @@ pub fn run_daemon() -> Result<(), String> { } // Daily jobs: once per day - if last_daily.is_none_or(|d| d < today) { + let last = *last_daily_sched.lock().unwrap(); + if last.is_none_or(|d| d < today) { log_event("scheduler", "daily-trigger", &today.to_string()); // Decay (no API calls, fast) @@ -455,7 +469,7 @@ pub fn run_daemon() -> Result<(), String> { }); digest.depend_on(&knowledge); - last_daily = Some(today); + *last_daily_sched.lock().unwrap() = Some(today); ctx.set_progress(&format!("daily pipeline triggered ({})", today)); } @@ -465,7 +479,7 @@ pub fn run_daemon() -> Result<(), String> { log::trace!("pruned {} finished tasks", pruned); } - write_status(&choir_sched); + write_status(&choir_sched, *last_daily_sched.lock().unwrap()); std::thread::sleep(SCHEDULER_INTERVAL); } }); @@ -530,6 +544,25 @@ fn task_group(name: &str) -> &str { else { "other" } } +/// Compute elapsed time for a task, using absolute started_at if available. 
+fn task_elapsed(t: &TaskInfo) -> Duration { + if matches!(t.status, TaskStatus::Running) { + if let Some(started) = t.started_at { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + Duration::from_secs_f64((now - started).max(0.0)) + } else { + t.elapsed + } + } else { + t.result.as_ref() + .map(|r| r.duration) + .unwrap_or(t.elapsed) + } +} + fn status_symbol(t: &TaskInfo) -> &'static str { if t.cancelled { return "✗" } match t.status { @@ -614,13 +647,20 @@ pub fn show_status() -> Result<(), String> { if n_running > 0 { for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { - let elapsed = if !t.elapsed.is_zero() { - format!(" ({})", format_duration_human(t.elapsed.as_millis())) + let e = task_elapsed(t); + let elapsed = if !e.is_zero() { + format!(" ({})", format_duration_human(e.as_millis())) } else { String::new() }; let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); + if !t.output_log.is_empty() { + let skip = t.output_log.len().saturating_sub(3); + for line in &t.output_log[skip..] 
{ + eprintln!(" │ {}", line); + } + } } } let mut parts = Vec::new(); @@ -646,12 +686,11 @@ pub fn show_status() -> Result<(), String> { eprintln!(" {}", group_label); for t in &tasks { let sym = status_symbol(t); - let duration = if matches!(t.status, TaskStatus::Running) && !t.elapsed.is_zero() { - format_duration_human(t.elapsed.as_millis()) + let e = task_elapsed(t); + let duration = if !e.is_zero() { + format_duration_human(e.as_millis()) } else { - t.result.as_ref() - .map(|r| format_duration_human(r.duration.as_millis())) - .unwrap_or_default() + String::new() }; let retry = if t.max_retries > 0 && t.retry_count > 0 { @@ -660,9 +699,7 @@ pub fn show_status() -> Result<(), String> { String::new() }; - let detail = if matches!(t.status, TaskStatus::Running) { - t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default() - } else { + let detail = if matches!(t.status, TaskStatus::Failed) { t.result.as_ref() .and_then(|r| r.error.as_ref()) .map(|e| { @@ -670,6 +707,8 @@ pub fn show_status() -> Result<(), String> { format!(" err: {}", short) }) .unwrap_or_default() + } else { + String::new() }; if duration.is_empty() { @@ -677,6 +716,14 @@ pub fn show_status() -> Result<(), String> { } else { eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); } + + // Show output log tail for running tasks + if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() { + let skip = t.output_log.len().saturating_sub(5); + for line in &t.output_log[skip..] { + eprintln!(" │ {}", line); + } + } } eprintln!(); }