daemon status: add in-flight tasks, recent completions, and node history command

Show running/pending tasks with elapsed time, progress, and last 3
output lines. Show last 20 completed/failed jobs from daemon log.
Both displayed before the existing grouped task view.

Add 'poc-memory history KEY' command that replays the append-only node
log to show all versions of a key with version number, weight, timestamp,
and content preview. Useful for auditing what modified a node.
This commit is contained in:
ProofOfConcept 2026-03-06 21:38:33 -05:00
parent f4c4e1bb39
commit 851fc0d417
2 changed files with 134 additions and 0 deletions

View file

@ -627,6 +627,53 @@ fn status_symbol(t: &TaskInfo) -> &'static str {
} }
} }
/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…"
fn short_job_name(job: &str) -> String {
// Split "verb path" or just return as-is
if let Some((verb, path)) = job.split_once(' ') {
let file = path.rsplit('/').next().unwrap_or(path);
let file = file.strip_suffix(".jsonl").unwrap_or(file);
let short = if file.len() > 12 { &file[..12] } else { file };
format!("{} {}", verb, short)
} else {
job.to_string()
}
}
fn show_recent_completions(n: usize) {
let path = log_path();
let content = match fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => return,
};
let recent: Vec<&str> = content.lines().rev()
.filter(|line| {
line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"")
})
.take(n)
.collect();
if recent.is_empty() { return; }
eprintln!(" Recent:");
for line in recent.iter().rev() {
if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
let time = if ts.len() >= 19 { &ts[11..19] } else { ts };
let sym = if event == "completed" { "" } else { "" };
let name = short_job_name(job);
eprintln!(" {} {} {:30} {}", sym, time, name, detail);
}
}
eprintln!();
}
pub fn show_status() -> Result<(), String> { pub fn show_status() -> Result<(), String> {
let status = match read_status_socket() { let status = match read_status_socket() {
Some(s) => s, Some(s) => s,
@ -665,6 +712,41 @@ pub fn show_status() -> Result<(), String> {
("other", "Other"), ("other", "Other"),
]; ];
// In-flight tasks first (running + pending)
let in_flight: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending))
.collect();
if !in_flight.is_empty() {
eprintln!(" In flight:");
for t in &in_flight {
let sym = status_symbol(t);
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" {}", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref()
.filter(|p| *p != "idle")
.map(|p| format!(" {}", p))
.unwrap_or_default();
let name = short_job_name(&t.name);
eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress);
if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(3);
for line in &t.output_log[skip..] {
eprintln!("{}", line);
}
}
}
eprintln!();
}
// Recent completions from log file
show_recent_completions(20);
// Detailed group view only if there are failures worth showing
for (group_id, group_label) in groups { for (group_id, group_label) in groups {
let tasks: Vec<&TaskInfo> = status.tasks.iter() let tasks: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| task_group(&t.name) == *group_id) .filter(|t| task_group(&t.name) == *group_id)

View file

@ -107,6 +107,7 @@ fn main() {
"journal-ts-migrate" => cmd_journal_ts_migrate(), "journal-ts-migrate" => cmd_journal_ts_migrate(),
"load-context" => cmd_load_context(&args[2..]), "load-context" => cmd_load_context(&args[2..]),
"render" => cmd_render(&args[2..]), "render" => cmd_render(&args[2..]),
"history" => cmd_history(&args[2..]),
"write" => cmd_write(&args[2..]), "write" => cmd_write(&args[2..]),
"import" => cmd_import(&args[2..]), "import" => cmd_import(&args[2..]),
"export" => cmd_export(&args[2..]), "export" => cmd_export(&args[2..]),
@ -1547,6 +1548,57 @@ fn cmd_render(args: &[String]) -> Result<(), String> {
Ok(()) Ok(())
} }
fn cmd_history(args: &[String]) -> Result<(), String> {
if args.is_empty() {
return Err("Usage: poc-memory history KEY".into());
}
let key = args.join(" ");
// Replay the node log, collecting all versions of this key
let path = store::nodes_path();
if !path.exists() {
return Err("No node log found".into());
}
use std::io::BufReader;
let file = std::fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
let mut versions: Vec<store::Node> = Vec::new();
while let Ok(msg) = capnp::serialize::read_message(&mut reader, capnp::message::ReaderOptions::new()) {
let log = msg.get_root::<poc_memory::memory_capnp::node_log::Reader>()
.map_err(|e| format!("read log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = store::Node::from_capnp(node_reader)?;
if node.key == key {
versions.push(node);
}
}
}
if versions.is_empty() {
return Err(format!("No history found for '{}'", key));
}
eprintln!("{} versions of '{}':\n", versions.len(), key);
for (i, node) in versions.iter().enumerate() {
let preview: String = node.content.chars().take(200).collect();
let preview = preview.replace('\n', "\\n");
eprintln!(" v{} (w={:.3}, {}): {}",
node.version, node.weight, node.timestamp, preview);
}
// Show latest full content
if let Some(latest) = versions.last() {
eprintln!("\n--- Latest content (v{}) ---", latest.version);
print!("{}", latest.content);
}
Ok(())
}
fn cmd_write(args: &[String]) -> Result<(), String> { fn cmd_write(args: &[String]) -> Result<(), String> {
if args.is_empty() { if args.is_empty() {
return Err("Usage: poc-memory write KEY < content\n\ return Err("Usage: poc-memory write KEY < content\n\