From 851fc0d41729cd876d185ff5b27b7038c39f6a9f Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 6 Mar 2026 21:38:33 -0500 Subject: [PATCH] daemon status: add in-flight tasks, recent completions, and node history command Show running/pending tasks with elapsed time, progress, and last 3 output lines. Show last 20 completed/failed jobs from daemon log. Both displayed before the existing grouped task view. Add 'poc-memory history KEY' command that replays the append-only node log to show all versions of a key with version number, weight, timestamp, and content preview. Useful for auditing what modified a node. --- src/daemon.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 52 ++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/src/daemon.rs b/src/daemon.rs index 8c35ab8..702a2ab 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -627,6 +627,53 @@ fn status_symbol(t: &TaskInfo) -> &'static str { } } +/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…" +fn short_job_name(job: &str) -> String { + // Split "verb path" or just return as-is + if let Some((verb, path)) = job.split_once(' ') { + let file = path.rsplit('/').next().unwrap_or(path); + let file = file.strip_suffix(".jsonl").unwrap_or(file); + let short = if file.len() > 12 { &file[..12] } else { file }; + format!("{} {}", verb, short) + } else { + job.to_string() + } +} + +fn show_recent_completions(n: usize) { + let path = log_path(); + let content = match fs::read_to_string(&path) { + Ok(c) => c, + Err(_) => return, + }; + + let recent: Vec<&str> = content.lines().rev() + .filter(|line| { + line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"") + }) + .take(n) + .collect(); + + if recent.is_empty() { return; } + + eprintln!(" Recent:"); + for line in recent.iter().rev() { + if let Ok(obj) = serde_json::from_str::(line) { + let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); + let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); + let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); + let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); + + let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; + let sym = if event == "completed" { "✓" } else { "✗" }; + let name = short_job_name(job); + + eprintln!(" {} {} {:30} {}", sym, time, name, detail); + } + } + eprintln!(); +} + pub fn show_status() -> Result<(), String> { let status = match read_status_socket() { Some(s) => s, @@ -665,6 +712,41 @@ pub fn show_status() -> Result<(), String> { ("other", "Other"), ]; + // In-flight tasks first (running + pending) + let in_flight: Vec<&TaskInfo> = status.tasks.iter() + .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) + .collect(); + + if !in_flight.is_empty() { + eprintln!(" In flight:"); + for t in &in_flight { + let sym = status_symbol(t); + let e = task_elapsed(t); + let elapsed = if !e.is_zero() { + format!(" {}", format_duration_human(e.as_millis())) + } else { + String::new() + }; + let progress = t.progress.as_deref() + .filter(|p| *p != "idle") + .map(|p| format!(" {}", p)) + .unwrap_or_default(); + let name = short_job_name(&t.name); + eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress); + if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() { + let skip = t.output_log.len().saturating_sub(3); + for line in &t.output_log[skip..] { + eprintln!(" │ {}", line); + } + } + } + eprintln!(); + } + + // Recent completions from log file + show_recent_completions(20); + + // Detailed group view only if there are failures worth showing for (group_id, group_label) in groups { let tasks: Vec<&TaskInfo> = status.tasks.iter() .filter(|t| task_group(&t.name) == *group_id) diff --git a/src/main.rs b/src/main.rs index 1f45309..a2f358e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -107,6 +107,7 @@ fn main() { "journal-ts-migrate" => cmd_journal_ts_migrate(), "load-context" => cmd_load_context(&args[2..]), "render" => cmd_render(&args[2..]), + "history" => cmd_history(&args[2..]), "write" => cmd_write(&args[2..]), "import" => cmd_import(&args[2..]), "export" => cmd_export(&args[2..]), @@ -1547,6 +1548,57 @@ fn cmd_render(args: &[String]) -> Result<(), String> { Ok(()) } +fn cmd_history(args: &[String]) -> Result<(), String> { + if args.is_empty() { + return Err("Usage: poc-memory history KEY".into()); + } + let key = args.join(" "); + + // Replay the node log, collecting all versions of this key + let path = store::nodes_path(); + if !path.exists() { + return Err("No node log found".into()); + } + + use std::io::BufReader; + let file = std::fs::File::open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let mut reader = BufReader::new(file); + + let mut versions: Vec = Vec::new(); + while let Ok(msg) = capnp::serialize::read_message(&mut reader, capnp::message::ReaderOptions::new()) { + let log = msg.get_root::() + .map_err(|e| format!("read log: {}", e))?; + for node_reader in log.get_nodes() + .map_err(|e| format!("get nodes: {}", e))? { + let node = store::Node::from_capnp(node_reader)?; + if node.key == key { + versions.push(node); + } + } + } + + if versions.is_empty() { + return Err(format!("No history found for '{}'", key)); + } + + eprintln!("{} versions of '{}':\n", versions.len(), key); + for (i, node) in versions.iter().enumerate() { + let preview: String = node.content.chars().take(200).collect(); + let preview = preview.replace('\n', "\\n"); + eprintln!(" v{} (w={:.3}, {}): {}", + node.version, node.weight, node.timestamp, preview); + } + + // Show latest full content + if let Some(latest) = versions.last() { + eprintln!("\n--- Latest content (v{}) ---", latest.version); + print!("{}", latest.content); + } + + Ok(()) +} + fn cmd_write(args: &[String]) -> Result<(), String> { if args.is_empty() { return Err("Usage: poc-memory write KEY < content\n\