// poc-memory daemon: background job orchestration for memory maintenance // // Replaces the fragile cron+shell approach with a single long-running process // that owns all background memory work. Uses jobkit for worker pool, status // tracking, retry, and cancellation. // // Architecture: // - Scheduler task: runs every 60s, scans filesystem state, spawns jobs // - Session watcher task: detects ended Claude sessions, triggers extraction // - Jobs shell out to existing poc-memory subcommands (Phase 1) // - Status written to daemon-status.json for `poc-memory daemon status` // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus}; use std::collections::{HashMap, HashSet}; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); fn status_file() -> &'static str { "daemon-status.json" } fn log_file() -> &'static str { "daemon.log" } fn status_path() -> PathBuf { crate::config::get().data_dir.join(status_file()) } fn log_path() -> PathBuf { crate::config::get().data_dir.join(log_file()) } fn projects_dir() -> PathBuf { crate::config::get().projects_dir.clone() } // --- Logging --- const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half fn log_event(job: &str, event: &str, detail: &str) { let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); let line = if detail.is_empty() { format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) } else { // Escape detail for JSON safety let safe = detail.replace('\\', "\\\\").replace('"', "\\\"") .replace('\n', "\\n"); format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", ts, job, event, safe) }; let path = log_path(); // 
Rotate if too large if let Ok(meta) = fs::metadata(&path) { if meta.len() > LOG_MAX_BYTES { if let Ok(content) = fs::read_to_string(&path) { let half = content.len() / 2; // Find next newline after halfway point if let Some(nl) = content[half..].find('\n') { let _ = fs::write(&path, &content[half + nl + 1..]); } } } } if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) { let _ = f.write_all(line.as_bytes()); } } // --- Job functions (direct, no subprocess) --- /// Run a named job with logging, progress reporting, and error mapping. fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { log_event(name, "started", ""); ctx.set_progress("starting"); let start = std::time::Instant::now(); match f() { Ok(()) => { let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); log_event(name, "completed", &duration); ctx.set_result(&duration); Ok(()) } Err(e) => { let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); let msg = format!("{}: {}", duration, e); log_event(name, "failed", &msg); Err(TaskError::Retry(msg)) } } } fn job_experience_mine(ctx: &ExecutionContext, path: &str, segment: Option) -> Result<(), TaskError> { let path = path.to_string(); run_job(ctx, &format!("experience-mine {}", path), || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("mining"); let count = crate::enrich::experience_mine(&mut store, &path, segment)?; ctx.log_line(format!("{count} entries mined")); Ok(()) }) } fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { let path = path.to_string(); run_job(ctx, &format!("fact-mine {}", path), || { ctx.log_line("mining facts"); let p = std::path::Path::new(&path); let progress = |msg: &str| { ctx.set_progress(msg); }; let count = crate::fact_mine::mine_and_store(p, Some(&progress))?; ctx.log_line(format!("{count} facts stored")); Ok(()) }) } fn job_consolidate(ctx: 
&ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "consolidate", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; crate::consolidate::consolidate_full_with_progress(&mut store, &|msg| { ctx.log_line(msg); }) }) } fn job_knowledge_loop(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "knowledge-loop", || { let config = crate::knowledge::KnowledgeLoopConfig { max_cycles: 100, batch_size: 5, ..Default::default() }; ctx.log_line("running agents"); let results = crate::knowledge::run_knowledge_loop(&config)?; ctx.log_line(format!("{} cycles, {} actions", results.len(), results.iter().map(|r| r.total_applied).sum::())); Ok(()) }) } fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "digest", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("generating digests"); crate::digest::digest_auto(&mut store) }) } fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "daily-check", || { ctx.log_line("loading store"); let store = crate::store::Store::load()?; ctx.log_line("checking health"); let _report = crate::neuro::daily_check(&store); Ok(()) }) } // --- Session detection --- /// Find JSONL session files that are stale (not recently written) and not /// held open by any process. /// Find JSONL session files that haven't been written to recently. /// Only checks metadata (stat), no file reads or subprocess calls. /// The fuser check (is file open?) is deferred to the reconcile loop, /// only for sessions that pass the mined-key filter. /// Minimum file size for a session to be worth mining. /// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive /// sessions are much larger. Skip anything too small to contain /// meaningful conversation. 
const MIN_SESSION_BYTES: u64 = 100_000; fn find_stale_sessions() -> Vec { let projects = projects_dir(); if !projects.exists() { return Vec::new(); } let mut stale = Vec::new(); let now = SystemTime::now(); let Ok(dirs) = fs::read_dir(&projects) else { return stale }; for dir_entry in dirs.filter_map(|e| e.ok()) { if !dir_entry.path().is_dir() { continue; } let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; for f in files.filter_map(|e| e.ok()) { let path = f.path(); if path.extension().map(|x| x == "jsonl").unwrap_or(false) { if let Ok(meta) = path.metadata() { // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) if meta.len() < MIN_SESSION_BYTES { continue; } if let Ok(mtime) = meta.modified() { let age = now.duration_since(mtime).unwrap_or_default(); if age.as_secs() >= SESSION_STALE_SECS { stale.push(path); } } } } } } stale } /// Check if any other process has a file open by scanning /proc/*/fd/. /// This is what `fuser` does internally, without the subprocess overhead. fn is_file_open(path: &Path) -> bool { let Ok(target) = path.canonicalize() else { return false }; let Ok(procs) = fs::read_dir("/proc") else { return false }; let my_pid = std::process::id().to_string(); for proc_entry in procs.filter_map(|e| e.ok()) { let name = proc_entry.file_name(); let name = name.to_string_lossy(); if !name.chars().all(|c| c.is_ascii_digit()) { continue; } if *name == my_pid { continue; } let fd_dir = proc_entry.path().join("fd"); let Ok(fds) = fs::read_dir(&fd_dir) else { continue }; for fd in fds.filter_map(|e| e.ok()) { if let Ok(link) = fs::read_link(fd.path()) { if link == target { return true; } } } } false } /// Get process uptime as human-readable string by reading /proc//stat. 
fn proc_uptime(pid: u32) -> Option { // /proc//stat field 22 (1-indexed) is start time in clock ticks let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?; // Fields after comm (which may contain spaces/parens) — find closing paren let after_comm = stat.get(stat.rfind(')')? + 2..)?; let fields: Vec<&str> = after_comm.split_whitespace().collect(); // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19) let start_ticks: u64 = fields.get(19)?.parse().ok()?; let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64; let boot_time_secs = { let uptime = fs::read_to_string("/proc/uptime").ok()?; let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?; let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); now - sys_uptime as u64 }; let start_secs = boot_time_secs + start_ticks / ticks_per_sec; let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); let uptime = now.saturating_sub(start_secs); Some(format_duration_human(uptime as u128 * 1000)) } // --- Status writing --- fn write_status(choir: &Choir, last_daily: Option) { let status = build_status(choir, last_daily); if let Ok(json) = serde_json::to_string_pretty(&status) { let _ = fs::write(status_path(), json); } } #[derive(serde::Serialize, serde::Deserialize)] struct DaemonStatus { pid: u32, tasks: Vec, #[serde(default)] last_daily: Option, } // --- The daemon --- pub fn run_daemon() -> Result<(), String> { let choir = Choir::new(); let llm_concurrency = crate::config::get().llm_concurrency; // Workers: 2 for long-running loops + llm_concurrency + 1 for non-LLM jobs let n_workers = llm_concurrency + 3; let names: Vec = (0..n_workers).map(|i| format!("w{}", i)).collect(); let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect(); let llm = ResourcePool::new("llm", llm_concurrency); llm.bind(&choir); // Recover last_daily from previous status file let last_daily: 
Arc>> = Arc::new(Mutex::new( fs::read_to_string(status_path()).ok() .and_then(|s| serde_json::from_str::(&s).ok()) .and_then(|s| s.last_daily) .and_then(|d| d.parse().ok()) )); log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); // Write initial status write_status(&choir, *last_daily.lock().unwrap()); // Session watcher: reconcile-based extraction // Each tick: scan filesystem for stale sessions, check store for what's // already mined, check task registry for what's in-flight, spawn the diff. // No persistent tracking state — the store is the source of truth. let choir_sw = Arc::clone(&choir); let llm_sw = Arc::clone(&llm); let last_daily_sw = Arc::clone(&last_daily); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); // Cache segment counts so we don't re-parse large files every tick let mut seg_cache: HashMap = HashMap::new(); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } ctx.set_progress("scanning"); // What's currently running/pending? (avoid spawning duplicates) let active: HashSet = choir_sw.task_statuses().iter() .filter(|t| !t.status.is_finished()) .map(|t| t.name.clone()) .collect(); let stale = find_stale_sessions(); // Load mined transcript keys once for this tick let mined = crate::enrich::mined_transcript_keys(); // Limit new tasks per tick — the resource pool gates execution, // but we don't need thousands of task objects in the registry. // The watcher ticks every 60s so backlog drains steadily. 
const MAX_NEW_PER_TICK: usize = 10; // Load fact-mined keys too let fact_keys: HashSet = { use crate::store::StoreView; let view = crate::store::AnyView::load().ok(); view.map(|v| { let mut keys = HashSet::new(); v.for_each_node(|key, _, _| { if key.starts_with("_facts-") { keys.insert(key.to_string()); } }); keys }).unwrap_or_default() }; let mut extract_queued = 0; let mut extract_remaining = 0; let mut fact_remaining = 0; let mut already_mined = 0; let mut still_open = 0; let total_stale = stale.len(); // Multi-segment files where all segments are done — write whole-file key let mut mark_transcript_done: Vec<(String, String, usize)> = Vec::new(); // Classify each session — now segment-aware // Each entry: (filename, path_str, segment_index) let mut needs_extract: Vec<(String, String, Option)> = Vec::new(); let mut needs_fact: Vec<(String, String)> = Vec::new(); for session in stale { let filename = session.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); let path_str = session.to_string_lossy().to_string(); // Check for old-style whole-file mined key let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str); if !experience_done { if is_file_open(&session) { still_open += 1; continue; } // Get segment count, using cache to avoid re-parsing large files let seg_count = if let Some(&cached) = seg_cache.get(&path_str) { cached } else { let messages = match crate::enrich::extract_conversation(&path_str) { Ok(m) => m, Err(_) => continue, }; let count = crate::enrich::split_on_compaction(messages).len(); seg_cache.insert(path_str.clone(), count); count }; if seg_count <= 1 { let task_name = format!("extract:{}", filename); if !active.contains(&task_name) { needs_extract.push((filename, path_str, None)); } } else { // Multi-segment — find unmined segments let fname_key = crate::enrich::transcript_filename_key(&path_str); let mut unmined = 0; for i in 0..seg_count { let seg_key = format!("{}.{}", 
fname_key, i); if mined.contains(&seg_key) { continue; } unmined += 1; let task_name = format!("extract:{}.{}", filename, i); if active.contains(&task_name) { continue; } needs_extract.push(( format!("{}.{}", filename, i), path_str.clone(), Some(i), )); } if unmined == 0 { // All segments done — write whole-file key so we skip next tick mark_transcript_done.push((fname_key, path_str.clone(), seg_count)); already_mined += 1; } } } else { let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); let fact_done = fact_keys.contains(&fact_key); if !fact_done { let task_name = format!("fact-mine:{}", filename); if !active.contains(&task_name) { needs_fact.push((filename, path_str)); } } else { already_mined += 1; } } } // Spawn experience-mine jobs (priority) for (task_label, path_str, segment) in &needs_extract { if extract_queued >= MAX_NEW_PER_TICK { extract_remaining += 1; continue; } let task_name = format!("extract:{}", task_label); log_event("extract", "queued", &task_name); let path = path_str.clone(); let seg = *segment; choir_sw.spawn(task_name) .resource(&llm_sw) .retries(2) .init(move |ctx| { job_experience_mine(ctx, &path, seg) }); extract_queued += 1; } // Only queue fact-mine when experience backlog is clear // Sort by file size so small transcripts drain first needs_fact.sort_by_key(|(_, path_str)| { fs::metadata(path_str).map(|m| m.len()).unwrap_or(u64::MAX) }); let mut fact_queued = 0; if needs_extract.len() == extract_queued { let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued); for (filename, path_str) in &needs_fact { if fact_queued >= fact_budget { fact_remaining += 1; continue; } let task_name = format!("fact-mine:{}", filename); log_event("fact-mine", "queued", path_str); let path = path_str.clone(); choir_sw.spawn(task_name) .resource(&llm_sw) .retries(1) .init(move |ctx| { job_fact_mine(ctx, &path) }); fact_queued += 1; } } else { fact_remaining = needs_fact.len(); } // Write whole-file keys for fully-mined 
multi-segment files if !mark_transcript_done.is_empty() { if let Ok(mut store) = crate::store::Store::load() { for (fname_key, path_str, seg_count) in &mark_transcript_done { let content = format!("All {} segments mined for {}", seg_count, path_str); let mut node = crate::store::new_node(fname_key, &content); node.provenance = crate::store::Provenance::AgentExperienceMine; let _ = store.upsert_node(node); seg_cache.remove(path_str); } let _ = store.save(); } } let extract_pending = extract_queued + extract_remaining; let fact_pending = fact_queued + fact_remaining; if extract_pending > 0 || fact_pending > 0 || still_open > 0 { log_event("session-watcher", "tick", &format!("{} stale, {} mined, {} extract, {} fact, {} open", total_stale, already_mined, extract_pending, fact_pending, still_open)); let mut parts = Vec::new(); if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); } if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); } if still_open > 0 { parts.push(format!("{} open", still_open)); } ctx.set_progress(parts.join(", ")); } else { ctx.set_progress("idle"); } write_status(&choir_sw, *last_daily_sw.lock().unwrap()); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); let llm_sched = Arc::clone(&llm); let last_daily_sched = Arc::clone(&last_daily); choir.spawn("scheduler").init(move |ctx| { let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; ctx.set_progress("idle"); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } let today = chrono::Local::now().date_naive(); // Health check: every hour if last_health.elapsed() >= HEALTH_INTERVAL { choir_sched.spawn("health").init(|ctx| { job_daily_check(ctx) }); last_health = std::time::Instant::now(); } // Daily jobs: once per day let last = *last_daily_sched.lock().unwrap(); if last.is_none_or(|d| d < today) { log_event("scheduler", "daily-trigger", 
&today.to_string()); // Decay disabled — version spam and premature demotion // choir_sched.spawn(format!("decay:{}", today)).init(|ctx| { // job_decay(ctx) // }); // Consolidation pipeline: consolidate → knowledge-loop → digest let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) .resource(&llm_sched) .retries(2) .init(move |ctx| { job_consolidate(ctx) }) .run(); let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today)) .resource(&llm_sched) .retries(1) .init(move |ctx| { job_knowledge_loop(ctx) }); knowledge.depend_on(&consolidate); let knowledge = knowledge.run(); let mut digest = choir_sched.spawn(format!("digest:{}", today)) .resource(&llm_sched) .retries(1) .init(move |ctx| { job_digest(ctx) }); digest.depend_on(&knowledge); *last_daily_sched.lock().unwrap() = Some(today); ctx.set_progress(format!("daily pipeline triggered ({today})")); } // Prune finished tasks from registry let pruned = choir_sched.gc_finished(); if pruned > 0 { log::trace!("pruned {} finished tasks", pruned); } write_status(&choir_sched, *last_daily_sched.lock().unwrap()); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Main thread: listen on status socket + wait for signals let choir_main = Arc::clone(&choir); let last_daily_main = Arc::clone(&last_daily); status_socket_loop(&choir_main, &last_daily_main); log_event("daemon", "stopping", ""); eprintln!("Shutting down..."); // Clean up socket let _ = fs::remove_file(status_sock_path()); log_event("daemon", "stopped", ""); // Exit immediately — PR_SET_PDEATHSIG on child processes ensures // claude subprocesses get SIGTERM when we die. 
std::process::exit(0) } fn read_status_socket() -> Option { use std::io::Read as _; use std::os::unix::net::UnixStream; let mut stream = UnixStream::connect(status_sock_path()).ok()?; stream.set_read_timeout(Some(Duration::from_secs(2))).ok(); let mut buf = String::new(); stream.read_to_string(&mut buf).ok()?; serde_json::from_str(&buf).ok() } fn status_sock_path() -> PathBuf { crate::config::get().data_dir.join("daemon.sock") } /// Listen on a Unix domain socket for status requests. /// Any connection gets the live status JSON written and closed. /// Also handles SIGINT/SIGTERM for clean shutdown. fn status_socket_loop(choir: &Choir, last_daily: &Arc>>) { use std::io::Write as _; use std::os::unix::net::UnixListener; use std::sync::atomic::{AtomicBool, Ordering}; static STOP: AtomicBool = AtomicBool::new(false); unsafe { libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t); libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); } let sock_path = status_sock_path(); let _ = fs::remove_file(&sock_path); // clean up stale socket let listener = match UnixListener::bind(&sock_path) { Ok(l) => l, Err(e) => { eprintln!("Warning: couldn't bind status socket {}: {}", sock_path.display(), e); // Fall back to just waiting for signals while !STOP.load(Ordering::Acquire) { std::thread::sleep(Duration::from_millis(500)); } return; } }; // Non-blocking so we can check STOP flag listener.set_nonblocking(true).ok(); while !STOP.load(Ordering::Acquire) { match listener.accept() { Ok((mut stream, _)) => { let status = build_status(choir, *last_daily.lock().unwrap()); if let Ok(json) = serde_json::to_string_pretty(&status) { let _ = stream.write_all(json.as_bytes()); } // Connection closes when stream is dropped } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::thread::sleep(Duration::from_millis(100)); } Err(_) => { std::thread::sleep(Duration::from_millis(100)); } } } extern "C" fn handle_signal(_: libc::c_int) { STOP.store(true, 
std::sync::atomic::Ordering::Release); } } fn build_status(choir: &Choir, last_daily: Option) -> DaemonStatus { DaemonStatus { pid: std::process::id(), tasks: choir.task_statuses(), last_daily: last_daily.map(|d| d.to_string()), } } // --- Status display --- fn format_duration_human(ms: u128) -> String { if ms < 1_000 { format!("{}ms", ms) } else if ms < 60_000 { format!("{:.1}s", ms as f64 / 1000.0) } else if ms < 3_600_000 { format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000) } else { format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000) } } fn task_group(name: &str) -> &str { if name == "session-watcher" || name == "scheduler" { "core" } else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" } else if name.starts_with("consolidate:") || name.starts_with("knowledge-loop:") || name.starts_with("digest:") || name.starts_with("decay:") { "daily" } else if name == "health" { "health" } else { "other" } } /// Compute elapsed time for a task, using absolute started_at if available. 
fn task_elapsed(t: &TaskInfo) -> Duration { if matches!(t.status, TaskStatus::Running) { if let Some(started) = t.started_at { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(); Duration::from_secs_f64((now - started).max(0.0)) } else { t.elapsed } } else { t.result.as_ref() .map(|r| r.duration) .unwrap_or(t.elapsed) } } fn status_symbol(t: &TaskInfo) -> &'static str { if t.cancelled { return "✗" } match t.status { TaskStatus::Running => "▶", TaskStatus::Completed => "✓", TaskStatus::Failed => "✗", TaskStatus::Pending => "·", } } /// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…" fn short_job_name(job: &str) -> String { // Split "verb path" or just return as-is if let Some((verb, path)) = job.split_once(' ') { let file = path.rsplit('/').next().unwrap_or(path); let file = file.strip_suffix(".jsonl").unwrap_or(file); let short = if file.len() > 12 { &file[..12] } else { file }; format!("{} {}", verb, short) } else { job.to_string() } } fn show_recent_completions(n: usize) { let path = log_path(); let content = match fs::read_to_string(&path) { Ok(c) => c, Err(_) => return, }; let recent: Vec<&str> = content.lines().rev() .filter(|line| { line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"") }) .take(n) .collect(); if recent.is_empty() { return; } eprintln!(" Recent:"); for line in recent.iter().rev() { if let Ok(obj) = serde_json::from_str::(line) { let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; let sym = if event == "completed" { "✓" } else { "✗" }; let name = short_job_name(job); eprintln!(" {} {} {:30} {}", sym, time, name, detail); } } eprintln!(); } pub 
fn show_status() -> Result<(), String> { let status = match read_status_socket() { Some(s) => s, None => { eprintln!("Daemon not running."); return Ok(()); } }; let uptime_str = proc_uptime(status.pid).unwrap_or_default(); if uptime_str.is_empty() { eprintln!("poc-memory daemon pid={}", status.pid); } else { eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str); } if status.tasks.is_empty() { eprintln!("\n No tasks"); return Ok(()); } // Count by status let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n", running, pending, completed, failed); // Group and display let groups: &[(&str, &str)] = &[ ("core", "Core"), ("daily", "Daily pipeline"), ("extract", "Session extraction"), ("health", "Health"), ("other", "Other"), ]; // In-flight tasks first (running + pending) let in_flight: Vec<&TaskInfo> = status.tasks.iter() .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) .collect(); if !in_flight.is_empty() { eprintln!(" In flight:"); for t in &in_flight { let sym = status_symbol(t); let e = task_elapsed(t); let elapsed = if !e.is_zero() { format!(" {}", format_duration_human(e.as_millis())) } else { String::new() }; let progress = t.progress.as_deref() .filter(|p| *p != "idle") .map(|p| format!(" {}", p)) .unwrap_or_default(); let name = short_job_name(&t.name); eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress); if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() { let skip = t.output_log.len().saturating_sub(3); for line in &t.output_log[skip..] 
{ eprintln!(" │ {}", line); } } } eprintln!(); } // Recent completions from log file show_recent_completions(20); // Detailed group view only if there are failures worth showing for (group_id, group_label) in groups { let tasks: Vec<&TaskInfo> = status.tasks.iter() .filter(|t| task_group(&t.name) == *group_id) .collect(); if tasks.is_empty() { continue; } // For extract group, show summary instead of individual tasks if *group_id == "extract" { let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); eprintln!(" {} ({} total)", group_label, tasks.len()); if n_running > 0 { for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { let e = task_elapsed(t); let elapsed = if !e.is_zero() { format!(" ({})", format_duration_human(e.as_millis())) } else { String::new() }; let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); if !t.output_log.is_empty() { let skip = t.output_log.len().saturating_sub(3); for line in &t.output_log[skip..] 
{ eprintln!(" │ {}", line); } } } } let mut parts = Vec::new(); if n_done > 0 { parts.push(format!("{} done", n_done)); } if n_running > 0 { parts.push(format!("{} running", n_running)); } if n_pending > 0 { parts.push(format!("{} queued", n_pending)); } if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); } eprintln!(" {}", parts.join(", ")); // Show recent failures for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { if let Some(ref r) = t.result { if let Some(ref err) = r.error { let short = if err.len() > 80 { &err[..80] } else { err }; eprintln!(" ✗ {}: {}", t.name, short); } } } eprintln!(); continue; } eprintln!(" {}", group_label); for t in &tasks { let sym = status_symbol(t); let e = task_elapsed(t); let duration = if !e.is_zero() { format_duration_human(e.as_millis()) } else { String::new() }; let retry = if t.max_retries > 0 && t.retry_count > 0 { format!(" retry {}/{}", t.retry_count, t.max_retries) } else { String::new() }; let detail = if matches!(t.status, TaskStatus::Failed) { t.result.as_ref() .and_then(|r| r.error.as_ref()) .map(|e| { let short = if e.len() > 60 { &e[..60] } else { e }; format!(" err: {}", short) }) .unwrap_or_default() } else { String::new() }; if duration.is_empty() { eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail); } else { eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); } // Show output log tail for running tasks if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() { let skip = t.output_log.len().saturating_sub(5); for line in &t.output_log[skip..] 
{ eprintln!(" │ {}", line); } } } eprintln!(); } Ok(()) } pub fn install_service() -> Result<(), String> { let exe = std::env::current_exe() .map_err(|e| format!("current_exe: {}", e))?; let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?; let unit_dir = PathBuf::from(&home).join(".config/systemd/user"); fs::create_dir_all(&unit_dir) .map_err(|e| format!("create {}: {}", unit_dir.display(), e))?; let unit = format!( r#"[Unit] Description=poc-memory daemon — background memory maintenance After=default.target [Service] Type=simple ExecStart={exe} daemon Restart=on-failure RestartSec=30 Environment=HOME={home} Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin [Install] WantedBy=default.target "#, exe = exe.display(), home = home); let unit_path = unit_dir.join("poc-memory.service"); fs::write(&unit_path, &unit) .map_err(|e| format!("write {}: {}", unit_path.display(), e))?; eprintln!("Wrote {}", unit_path.display()); let status = std::process::Command::new("systemctl") .args(["--user", "daemon-reload"]) .status() .map_err(|e| format!("systemctl daemon-reload: {}", e))?; if !status.success() { return Err("systemctl daemon-reload failed".into()); } let status = std::process::Command::new("systemctl") .args(["--user", "enable", "--now", "poc-memory"]) .status() .map_err(|e| format!("systemctl enable: {}", e))?; if !status.success() { return Err("systemctl enable --now failed".into()); } eprintln!("Service enabled and started"); // Install poc-daemon service install_notify_daemon(&unit_dir, &home)?; // Install memory-search + poc-hook into Claude settings install_hook()?; Ok(()) } /// Install the poc-daemon (notification/idle) systemd user service. 
/// Install and start the poc-daemon (notification/idle) systemd user service.
///
/// Best-effort when the binary is absent: prints a warning and returns Ok so
/// the rest of the install flow can continue.
fn install_notify_daemon(unit_dir: &Path, home: &str) -> Result<(), String> {
    let daemon_bin = PathBuf::from(home).join(".cargo/bin/poc-daemon");
    if !daemon_bin.exists() {
        eprintln!("Warning: poc-daemon not found at {} — skipping service install", daemon_bin.display());
        eprintln!("  Build with: cargo install --path .");
        return Ok(());
    }

    // Render the systemd user unit pointing at the installed binary.
    let unit_text = format!(
        r#"[Unit]
Description=poc-daemon — notification routing and idle management
After=default.target

[Service]
Type=simple
ExecStart={exe} daemon
Restart=on-failure
RestartSec=10
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin

[Install]
WantedBy=default.target
"#, exe = daemon_bin.display(), home = home);

    let unit_file = unit_dir.join("poc-daemon.service");
    fs::write(&unit_file, &unit_text)
        .map_err(|e| format!("write {}: {}", unit_file.display(), e))?;
    eprintln!("Wrote {}", unit_file.display());

    // Pick up the new unit file.
    match std::process::Command::new("systemctl")
        .args(["--user", "daemon-reload"])
        .status()
    {
        Ok(st) if st.success() => {}
        Ok(_) => return Err("systemctl daemon-reload failed".into()),
        Err(e) => return Err(format!("systemctl daemon-reload: {}", e)),
    }

    // Enable at login and start immediately.
    match std::process::Command::new("systemctl")
        .args(["--user", "enable", "--now", "poc-daemon"])
        .status()
    {
        Ok(st) if st.success() => {}
        Ok(_) => return Err("systemctl enable --now poc-daemon failed".into()),
        Err(e) => return Err(format!("systemctl enable: {}", e)),
    }

    eprintln!("poc-daemon service enabled and started");
    Ok(())
}

/// Install memory-search and poc-hook into Claude Code settings.json.
/// Public so `poc-memory init` can call it too.
/// /// Hook layout: /// UserPromptSubmit: memory-search (10s), poc-hook (5s) /// PostToolUse: poc-hook (5s) /// Stop: poc-hook (5s) pub fn install_hook() -> Result<(), String> { let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?; let exe = std::env::current_exe() .map_err(|e| format!("current_exe: {}", e))?; let settings_path = PathBuf::from(&home).join(".claude/settings.json"); let memory_search = exe.with_file_name("memory-search"); let poc_hook = exe.with_file_name("poc-hook"); let mut settings: serde_json::Value = if settings_path.exists() { let content = fs::read_to_string(&settings_path) .map_err(|e| format!("read settings: {}", e))?; serde_json::from_str(&content) .map_err(|e| format!("parse settings: {}", e))? } else { serde_json::json!({}) }; let obj = settings.as_object_mut().ok_or("settings not an object")?; let hooks_obj = obj.entry("hooks") .or_insert_with(|| serde_json::json!({})) .as_object_mut().ok_or("hooks not an object")?; let mut changed = false; // Helper: ensure a hook binary is present in an event's hook list let ensure_hook = |hooks_obj: &mut serde_json::Map, event: &str, binary: &Path, timeout: u32, changed: &mut bool| { if !binary.exists() { eprintln!("Warning: {} not found — skipping", binary.display()); return; } let cmd = binary.to_string_lossy().to_string(); let name = binary.file_name().unwrap().to_string_lossy().to_string(); let event_array = hooks_obj.entry(event) .or_insert_with(|| serde_json::json!([{"hooks": []}])) .as_array_mut().unwrap(); if event_array.is_empty() { event_array.push(serde_json::json!({"hooks": []})); } let inner = event_array[0] .as_object_mut().unwrap() .entry("hooks") .or_insert_with(|| serde_json::json!([])) .as_array_mut().unwrap(); // Remove legacy load-memory.sh let before = inner.len(); inner.retain(|h| { let c = h.get("command").and_then(|c| c.as_str()).unwrap_or(""); !c.contains("load-memory") }); if inner.len() < before { eprintln!("Removed load-memory.sh from {event}"); *changed = 
true; } let already = inner.iter().any(|h| { h.get("command").and_then(|c| c.as_str()) .is_some_and(|c| c.contains(&name)) }); if !already { inner.push(serde_json::json!({ "type": "command", "command": cmd, "timeout": timeout })); *changed = true; eprintln!("Installed {name} in {event}"); } }; // UserPromptSubmit: memory-search + poc-hook ensure_hook(hooks_obj, "UserPromptSubmit", &memory_search, 10, &mut changed); ensure_hook(hooks_obj, "UserPromptSubmit", &poc_hook, 5, &mut changed); // PostToolUse + Stop: poc-hook only ensure_hook(hooks_obj, "PostToolUse", &poc_hook, 5, &mut changed); ensure_hook(hooks_obj, "Stop", &poc_hook, 5, &mut changed); if changed { let json = serde_json::to_string_pretty(&settings) .map_err(|e| format!("serialize settings: {}", e))?; fs::write(&settings_path, json) .map_err(|e| format!("write settings: {}", e))?; eprintln!("Updated {}", settings_path.display()); } else { eprintln!("All hooks already installed in {}", settings_path.display()); } Ok(()) } pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { let path = log_path(); if !path.exists() { eprintln!("No daemon log found."); return Ok(()); } let content = fs::read_to_string(&path) .map_err(|e| format!("read log: {}", e))?; let filtered: Vec<&str> = content.lines().rev() .filter(|line| { if let Some(job) = job_filter { line.contains(&format!("\"job\":\"{}\"", job)) } else { true } }) .take(lines) .collect(); if filtered.is_empty() { eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default()); return Ok(()); } // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]" for line in filtered.into_iter().rev() { if let Ok(obj) = serde_json::from_str::(line) { let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); let detail = obj.get("detail").and_then(|v| 
v.as_str()).unwrap_or(""); // Shorten timestamp to just time portion let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; if detail.is_empty() { eprintln!(" {} {:20} {}", time, job, event); } else { // Truncate long details (file paths) let short = if detail.len() > 60 { let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail); if last.len() > 60 { &last[..60] } else { last } } else { detail }; eprintln!(" {} {:20} {:12} {}", time, job, event, short); } } else { eprintln!("{}", line); } } Ok(()) }