daemon: per-job output log, daily dedup, absolute timestamps

- Jobs report progress via ctx.log_line(), building a rolling output
  trail visible in `poc-memory daemon status` (last 5 lines per task).
- consolidate_full_with_progress() takes a callback, so each agent step
  ([1/7] health, [2/7] replay, etc.) shows up in the status display.
- Persist last_daily date in daemon-status.json so daily pipeline isn't
  re-triggered on daemon restart.
- Compute elapsed from absolute started_at timestamps instead of stale
  relative durations in the status file.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-05 22:16:17 -05:00
parent cc7943cb50
commit b6c70c7734
2 changed files with 98 additions and 35 deletions

View file

@ -23,7 +23,15 @@ fn log_line(buf: &mut String, line: &str) {
} }
/// Run the full autonomous consolidation pipeline with logging. /// Run the full autonomous consolidation pipeline with logging.
/// If `on_progress` is provided, it's called at each significant step.
pub fn consolidate_full(store: &mut Store) -> Result<(), String> { pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
consolidate_full_with_progress(store, &|_| {})
}
pub fn consolidate_full_with_progress(
store: &mut Store,
on_progress: &dyn Fn(&str),
) -> Result<(), String> {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let log_key = format!("_consolidate-log-{}", let log_key = format!("_consolidate-log-{}",
store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], "")); store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], ""));
@ -36,6 +44,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 1: Plan --- // --- Step 1: Plan ---
log_line(&mut log_buf, "--- Step 1: Plan ---"); log_line(&mut log_buf, "--- Step 1: Plan ---");
on_progress("planning");
let plan = neuro::consolidation_plan(store); let plan = neuro::consolidation_plan(store);
let plan_text = neuro::format_plan(&plan); let plan_text = neuro::format_plan(&plan);
log_line(&mut log_buf, &plan_text); log_line(&mut log_buf, &plan_text);
@ -103,6 +112,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
}; };
log_line(&mut log_buf, &format!("\n{}", label)); log_line(&mut log_buf, &format!("\n{}", label));
on_progress(&label);
println!("{}", label); println!("{}", label);
// Reload store to pick up changes from previous agents // Reload store to pick up changes from previous agents
@ -145,6 +155,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key); let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key);
log_line(&mut log_buf, &msg); log_line(&mut log_buf, &msg);
on_progress(&msg);
println!("{}", msg); println!("{}", msg);
} }
@ -153,6 +164,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 3: Apply consolidation actions --- // --- Step 3: Apply consolidation actions ---
log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---"); log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---");
on_progress("applying actions");
println!("\n--- Applying consolidation actions ---"); println!("\n--- Applying consolidation actions ---");
*store = Store::load()?; *store = Store::load()?;
@ -171,6 +183,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 3b: Link orphans --- // --- Step 3b: Link orphans ---
log_line(&mut log_buf, "\n--- Step 3b: Link orphans ---"); log_line(&mut log_buf, "\n--- Step 3b: Link orphans ---");
on_progress("linking orphans");
println!("\n--- Linking orphan nodes ---"); println!("\n--- Linking orphan nodes ---");
*store = Store::load()?; *store = Store::load()?;
@ -179,6 +192,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 3c: Cap degree --- // --- Step 3c: Cap degree ---
log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---"); log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---");
on_progress("capping degree");
println!("\n--- Capping node degree ---"); println!("\n--- Capping node degree ---");
*store = Store::load()?; *store = Store::load()?;
@ -192,6 +206,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 4: Digest auto --- // --- Step 4: Digest auto ---
log_line(&mut log_buf, "\n--- Step 4: Digest auto ---"); log_line(&mut log_buf, "\n--- Step 4: Digest auto ---");
on_progress("generating digests");
println!("\n--- Generating missing digests ---"); println!("\n--- Generating missing digests ---");
*store = Store::load()?; *store = Store::load()?;
@ -206,6 +221,7 @@ pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
// --- Step 5: Apply digest links --- // --- Step 5: Apply digest links ---
log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---"); log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---");
on_progress("applying digest links");
println!("\n--- Applying digest links ---"); println!("\n--- Applying digest links ---");
*store = Store::load()?; *store = Store::load()?;

View file

@ -17,7 +17,7 @@ use std::collections::HashSet;
use std::fs; use std::fs;
use std::io::Write; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SESSION_STALE_SECS: u64 = 600; // 10 minutes
@ -100,11 +100,11 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St
fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
let path = path.to_string(); let path = path.to_string();
run_job(ctx, &format!("experience-mine {}", path), || { run_job(ctx, &format!("experience-mine {}", path), || {
ctx.set_progress("loading store"); ctx.log_line("loading store");
let mut store = crate::store::Store::load()?; let mut store = crate::store::Store::load()?;
ctx.set_progress("mining"); ctx.log_line("mining");
let count = crate::enrich::experience_mine(&mut store, &path)?; let count = crate::enrich::experience_mine(&mut store, &path)?;
ctx.set_progress(&format!("{} entries mined", count)); ctx.log_line(&format!("{} entries mined", count));
Ok(()) Ok(())
}) })
} }
@ -112,32 +112,33 @@ fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskErr
fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
let path = path.to_string(); let path = path.to_string();
run_job(ctx, &format!("fact-mine {}", path), || { run_job(ctx, &format!("fact-mine {}", path), || {
ctx.set_progress("mining facts"); ctx.log_line("mining facts");
let p = std::path::Path::new(&path); let p = std::path::Path::new(&path);
let count = crate::fact_mine::mine_and_store(p)?; let count = crate::fact_mine::mine_and_store(p)?;
ctx.set_progress(&format!("{} facts stored", count)); ctx.log_line(&format!("{} facts stored", count));
Ok(()) Ok(())
}) })
} }
fn job_decay(ctx: &ExecutionContext) -> Result<(), TaskError> { fn job_decay(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "decay", || { run_job(ctx, "decay", || {
ctx.set_progress("loading store"); ctx.log_line("loading store");
let mut store = crate::store::Store::load()?; let mut store = crate::store::Store::load()?;
ctx.set_progress("decaying"); ctx.log_line("decaying");
let (decayed, pruned) = store.decay(); let (decayed, pruned) = store.decay();
store.save()?; store.save()?;
ctx.set_progress(&format!("{} decayed, {} pruned", decayed, pruned)); ctx.log_line(&format!("{} decayed, {} pruned", decayed, pruned));
Ok(()) Ok(())
}) })
} }
fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> { fn job_consolidate(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "consolidate", || { run_job(ctx, "consolidate", || {
ctx.set_progress("loading store"); ctx.log_line("loading store");
let mut store = crate::store::Store::load()?; let mut store = crate::store::Store::load()?;
ctx.set_progress("consolidating"); crate::consolidate::consolidate_full_with_progress(&mut store, &|msg| {
crate::consolidate::consolidate_full(&mut store) ctx.log_line(msg);
})
}) })
} }
@ -148,9 +149,9 @@ fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> {
batch_size: 5, batch_size: 5,
..Default::default() ..Default::default()
}; };
ctx.set_progress("running agents"); ctx.log_line("running agents");
let results = crate::knowledge::run_knowledge_loop(&config)?; let results = crate::knowledge::run_knowledge_loop(&config)?;
ctx.set_progress(&format!("{} cycles, {} actions", ctx.log_line(&format!("{} cycles, {} actions",
results.len(), results.len(),
results.iter().map(|r| r.total_applied).sum::<usize>())); results.iter().map(|r| r.total_applied).sum::<usize>()));
Ok(()) Ok(())
@ -159,18 +160,18 @@ fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> {
fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "digest", || { run_job(ctx, "digest", || {
ctx.set_progress("loading store"); ctx.log_line("loading store");
let mut store = crate::store::Store::load()?; let mut store = crate::store::Store::load()?;
ctx.set_progress("generating digests"); ctx.log_line("generating digests");
crate::digest::digest_auto(&mut store) crate::digest::digest_auto(&mut store)
}) })
} }
fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "daily-check", || { run_job(ctx, "daily-check", || {
ctx.set_progress("loading store"); ctx.log_line("loading store");
let store = crate::store::Store::load()?; let store = crate::store::Store::load()?;
ctx.set_progress("checking health"); ctx.log_line("checking health");
let _report = crate::neuro::daily_check(&store); let _report = crate::neuro::daily_check(&store);
Ok(()) Ok(())
}) })
@ -263,11 +264,12 @@ fn proc_uptime(pid: u32) -> Option<String> {
// --- Status writing --- // --- Status writing ---
fn write_status(choir: &Choir) { fn write_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) {
let statuses = choir.task_statuses(); let statuses = choir.task_statuses();
let status = DaemonStatus { let status = DaemonStatus {
pid: std::process::id(), pid: std::process::id(),
tasks: statuses, tasks: statuses,
last_daily: last_daily.map(|d| d.to_string()),
}; };
if let Ok(json) = serde_json::to_string_pretty(&status) { if let Ok(json) = serde_json::to_string_pretty(&status) {
let _ = fs::write(status_path(), json); let _ = fs::write(status_path(), json);
@ -278,6 +280,8 @@ fn write_status(choir: &Choir) {
struct DaemonStatus { struct DaemonStatus {
pid: u32, pid: u32,
tasks: Vec<TaskInfo>, tasks: Vec<TaskInfo>,
#[serde(default)]
last_daily: Option<String>,
} }
// --- The daemon --- // --- The daemon ---
@ -294,11 +298,19 @@ pub fn run_daemon() -> Result<(), String> {
let llm = ResourcePool::new("llm", 1); let llm = ResourcePool::new("llm", 1);
llm.bind(&choir); llm.bind(&choir);
// Recover last_daily from previous status file
let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
fs::read_to_string(status_path()).ok()
.and_then(|s| serde_json::from_str::<DaemonStatus>(&s).ok())
.and_then(|s| s.last_daily)
.and_then(|d| d.parse().ok())
));
log_event("daemon", "started", &format!("pid {}", std::process::id())); log_event("daemon", "started", &format!("pid {}", std::process::id()));
eprintln!("poc-memory daemon started (pid {})", std::process::id()); eprintln!("poc-memory daemon started (pid {})", std::process::id());
// Write initial status // Write initial status
write_status(&choir); write_status(&choir, *last_daily.lock().unwrap());
// Session watcher: reconcile-based extraction // Session watcher: reconcile-based extraction
// Each tick: scan filesystem for stale sessions, check store for what's // Each tick: scan filesystem for stale sessions, check store for what's
@ -306,6 +318,7 @@ pub fn run_daemon() -> Result<(), String> {
// No persistent tracking state — the store is the source of truth. // No persistent tracking state — the store is the source of truth.
let choir_sw = Arc::clone(&choir); let choir_sw = Arc::clone(&choir);
let llm_sw = Arc::clone(&llm); let llm_sw = Arc::clone(&llm);
let last_daily_sw = Arc::clone(&last_daily);
choir.spawn("session-watcher").init(move |ctx| { choir.spawn("session-watcher").init(move |ctx| {
ctx.set_progress("idle"); ctx.set_progress("idle");
loop { loop {
@ -392,7 +405,7 @@ pub fn run_daemon() -> Result<(), String> {
ctx.set_progress("idle"); ctx.set_progress("idle");
} }
write_status(&choir_sw); write_status(&choir_sw, *last_daily_sw.lock().unwrap());
std::thread::sleep(SCHEDULER_INTERVAL); std::thread::sleep(SCHEDULER_INTERVAL);
} }
}); });
@ -400,8 +413,8 @@ pub fn run_daemon() -> Result<(), String> {
// Scheduler: runs daily jobs based on filesystem state // Scheduler: runs daily jobs based on filesystem state
let choir_sched = Arc::clone(&choir); let choir_sched = Arc::clone(&choir);
let llm_sched = Arc::clone(&llm); let llm_sched = Arc::clone(&llm);
let last_daily_sched = Arc::clone(&last_daily);
choir.spawn("scheduler").init(move |ctx| { choir.spawn("scheduler").init(move |ctx| {
let mut last_daily = None::<chrono::NaiveDate>;
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
ctx.set_progress("idle"); ctx.set_progress("idle");
@ -421,7 +434,8 @@ pub fn run_daemon() -> Result<(), String> {
} }
// Daily jobs: once per day // Daily jobs: once per day
if last_daily.is_none_or(|d| d < today) { let last = *last_daily_sched.lock().unwrap();
if last.is_none_or(|d| d < today) {
log_event("scheduler", "daily-trigger", &today.to_string()); log_event("scheduler", "daily-trigger", &today.to_string());
// Decay (no API calls, fast) // Decay (no API calls, fast)
@ -455,7 +469,7 @@ pub fn run_daemon() -> Result<(), String> {
}); });
digest.depend_on(&knowledge); digest.depend_on(&knowledge);
last_daily = Some(today); *last_daily_sched.lock().unwrap() = Some(today);
ctx.set_progress(&format!("daily pipeline triggered ({})", today)); ctx.set_progress(&format!("daily pipeline triggered ({})", today));
} }
@ -465,7 +479,7 @@ pub fn run_daemon() -> Result<(), String> {
log::trace!("pruned {} finished tasks", pruned); log::trace!("pruned {} finished tasks", pruned);
} }
write_status(&choir_sched); write_status(&choir_sched, *last_daily_sched.lock().unwrap());
std::thread::sleep(SCHEDULER_INTERVAL); std::thread::sleep(SCHEDULER_INTERVAL);
} }
}); });
@ -530,6 +544,25 @@ fn task_group(name: &str) -> &str {
else { "other" } else { "other" }
} }
/// Elapsed time for a task.
///
/// For a task that is currently `Running`, prefer computing the duration
/// from the absolute `started_at` epoch timestamp (seconds since the Unix
/// epoch) so the value stays fresh even when the status file is stale;
/// fall back to the recorded relative `elapsed` when no start time exists.
/// For finished tasks, report the result's recorded duration, again
/// falling back to `elapsed` when no result is available.
fn task_elapsed(t: &TaskInfo) -> Duration {
    if !matches!(t.status, TaskStatus::Running) {
        // Finished (or never started): use the completed result's duration.
        return match t.result.as_ref() {
            Some(r) => r.duration,
            None => t.elapsed,
        };
    }
    match t.started_at {
        Some(started) => {
            // Wall-clock "now" as fractional seconds since the Unix epoch;
            // a clock before the epoch degrades gracefully to zero.
            let now_secs = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs_f64();
            // Clamp at zero in case of clock skew making `started` > now.
            Duration::from_secs_f64((now_secs - started).max(0.0))
        }
        None => t.elapsed,
    }
}
fn status_symbol(t: &TaskInfo) -> &'static str { fn status_symbol(t: &TaskInfo) -> &'static str {
if t.cancelled { return "" } if t.cancelled { return "" }
match t.status { match t.status {
@ -614,13 +647,20 @@ pub fn show_status() -> Result<(), String> {
if n_running > 0 { if n_running > 0 {
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
let elapsed = if !t.elapsed.is_zero() { let e = task_elapsed(t);
format!(" ({})", format_duration_human(t.elapsed.as_millis())) let elapsed = if !e.is_zero() {
format!(" ({})", format_duration_human(e.as_millis()))
} else { } else {
String::new() String::new()
}; };
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
if !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(3);
for line in &t.output_log[skip..] {
eprintln!("{}", line);
}
}
} }
} }
let mut parts = Vec::new(); let mut parts = Vec::new();
@ -646,12 +686,11 @@ pub fn show_status() -> Result<(), String> {
eprintln!(" {}", group_label); eprintln!(" {}", group_label);
for t in &tasks { for t in &tasks {
let sym = status_symbol(t); let sym = status_symbol(t);
let duration = if matches!(t.status, TaskStatus::Running) && !t.elapsed.is_zero() { let e = task_elapsed(t);
format_duration_human(t.elapsed.as_millis()) let duration = if !e.is_zero() {
format_duration_human(e.as_millis())
} else { } else {
t.result.as_ref() String::new()
.map(|r| format_duration_human(r.duration.as_millis()))
.unwrap_or_default()
}; };
let retry = if t.max_retries > 0 && t.retry_count > 0 { let retry = if t.max_retries > 0 && t.retry_count > 0 {
@ -660,9 +699,7 @@ pub fn show_status() -> Result<(), String> {
String::new() String::new()
}; };
let detail = if matches!(t.status, TaskStatus::Running) { let detail = if matches!(t.status, TaskStatus::Failed) {
t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default()
} else {
t.result.as_ref() t.result.as_ref()
.and_then(|r| r.error.as_ref()) .and_then(|r| r.error.as_ref())
.map(|e| { .map(|e| {
@ -670,6 +707,8 @@ pub fn show_status() -> Result<(), String> {
format!(" err: {}", short) format!(" err: {}", short)
}) })
.unwrap_or_default() .unwrap_or_default()
} else {
String::new()
}; };
if duration.is_empty() { if duration.is_empty() {
@ -677,6 +716,14 @@ pub fn show_status() -> Result<(), String> {
} else { } else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail);
} }
// Show output log tail for running tasks
if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(5);
for line in &t.output_log[skip..] {
eprintln!("{}", line);
}
}
} }
eprintln!(); eprintln!();
} }