// poc-memory daemon: background job orchestration for memory maintenance // // Replaces the fragile cron+shell approach with a single long-running process // that owns all background memory work. Uses jobkit for worker pool, status // tracking, retry, and cancellation. // // Architecture: // - Scheduler task: runs every 60s, scans filesystem state, spawns jobs // - Session watcher task: detects ended Claude sessions, triggers extraction // - Jobs shell out to existing poc-memory subcommands (Phase 1) // - Status written to daemon-status.json for `poc-memory daemon status` // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. use jobkit::{Choir, ResourcePool, TaskError, TaskInfo}; use std::collections::HashSet; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::Arc; use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); const STATUS_FILE: &str = ".claude/memory/daemon-status.json"; const LOG_FILE: &str = ".claude/memory/daemon.log"; fn home_dir() -> PathBuf { PathBuf::from(std::env::var("HOME").expect("HOME not set")) } fn status_path() -> PathBuf { home_dir().join(STATUS_FILE) } fn log_path() -> PathBuf { home_dir().join(LOG_FILE) } fn projects_dir() -> PathBuf { home_dir().join(".claude/projects") } fn extracted_set_path() -> PathBuf { home_dir().join(".claude/memory/extracted-sessions.json") } // --- Logging --- fn log_event(job: &str, event: &str, detail: &str) { let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); let line = if detail.is_empty() { format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) } else { format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", ts, job, event, detail.replace('"', "\\\"")) }; if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(log_path()) { let _ = f.write_all(line.as_bytes()); } } // --- Extracted sessions persistence --- fn load_extracted() -> HashSet { fs::read_to_string(extracted_set_path()) .ok() .and_then(|s| serde_json::from_str(&s).ok()) .unwrap_or_default() } fn save_extracted(set: &HashSet) { if let Ok(json) = serde_json::to_string_pretty(set) { let _ = fs::write(extracted_set_path(), json); } } // --- Shell out to poc-memory subcommands --- fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> { log_event(args.first().unwrap_or(&"unknown"), "started", ""); let output = Command::new("poc-memory") .args(args) .output() .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?; if output.status.success() { log_event(args.first().unwrap_or(&"unknown"), "completed", ""); Ok(()) } else { let stderr = String::from_utf8_lossy(&output.stderr); let msg = format!("exit {}: {}", output.status, stderr.trim_end()); log_event(args.first().unwrap_or(&"unknown"), "failed", &msg); // Treat non-zero exit as retryable (API errors, transient failures) // unless it looks like a usage error if output.status.code() == Some(1) && stderr.contains("Unknown command") { Err(TaskError::Fatal(msg)) } else { Err(TaskError::Retry(msg)) } } } fn run_python_script(script: &str, args: &[&str]) -> Result<(), TaskError> { let label = Path::new(script).file_stem() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|| "python".to_string()); log_event(&label, "started", ""); let output = Command::new("python3") .arg(script) .args(args) .output() .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?; if output.status.success() { log_event(&label, "completed", ""); Ok(()) } else { let stderr = String::from_utf8_lossy(&output.stderr); let msg = format!("exit {}: {}", output.status, stderr.trim_end()); log_event(&label, "failed", &msg); Err(TaskError::Retry(msg)) } } // --- Session detection --- fn find_ended_sessions(extracted: &HashSet) -> Vec { let projects = projects_dir(); if !projects.exists() { return Vec::new(); } let mut ended = Vec::new(); let now = SystemTime::now(); let Ok(dirs) = fs::read_dir(&projects) else { return ended }; for dir_entry in dirs.filter_map(|e| e.ok()) { if !dir_entry.path().is_dir() { continue; } let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; for f in files.filter_map(|e| e.ok()) { let path = f.path(); if path.extension().map(|x| x == "jsonl").unwrap_or(false) { let key = path.to_string_lossy().to_string(); if extracted.contains(&key) { continue; } // Check staleness if let Ok(meta) = path.metadata() { if let Ok(mtime) = meta.modified() { let age = now.duration_since(mtime).unwrap_or_default(); if age.as_secs() >= SESSION_STALE_SECS { // Check no process has it open if !is_file_open(&path) { ended.push(path); } } } } } } } ended } fn is_file_open(path: &Path) -> bool { Command::new("fuser") .arg(path) .output() .map(|o| o.status.success()) // fuser exits 0 if file is open .unwrap_or(false) } // --- Status writing --- fn write_status(choir: &Choir) { let statuses = choir.task_statuses(); let status = DaemonStatus { pid: std::process::id(), tasks: statuses, }; if let Ok(json) = serde_json::to_string_pretty(&status) { let _ = fs::write(status_path(), json); } } #[derive(serde::Serialize, serde::Deserialize)] struct DaemonStatus { pid: u32, tasks: Vec, } // --- The daemon --- pub fn run_daemon() -> Result<(), String> { let choir = Choir::new(); // Two workers: one for the scheduler/watcher loops, one for actual jobs. // Jobs are serialized through the Sonnet resource pool anyway. let _w1 = choir.add_worker("scheduler"); let _w2 = choir.add_worker("jobs"); // Sonnet API: one call at a time let sonnet = ResourcePool::new("sonnet", 1); log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); // Write initial status write_status(&choir); // Session watcher: detect ended sessions, run extraction let choir_sw = Arc::clone(&choir); let sonnet_sw = Arc::clone(&sonnet); choir.spawn("session-watcher").init(move |ctx| { let mut extracted = load_extracted(); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } let ended = find_ended_sessions(&extracted); for session in ended { let session_str = session.to_string_lossy().to_string(); log_event("extract", "session-ended", &session_str); // Spawn extraction job let s = Arc::clone(&sonnet_sw); let path = session_str.clone(); choir_sw.spawn(format!("extract:{}", session.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()))) .retries(2) .init(move |_ctx| { let _slot = s.acquire(); run_poc_memory(&["experience-mine", &path]) }); extracted.insert(session_str); save_extracted(&extracted); } write_status(&choir_sw); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); let sonnet_sched = Arc::clone(&sonnet); let knowledge_loop_script = home_dir() .join("poc/memory/scripts/knowledge_loop.py") .to_string_lossy() .to_string(); choir.spawn("scheduler").init(move |ctx| { let mut last_daily = None::; let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } let today = chrono::Local::now().date_naive(); // Health check: every hour if last_health.elapsed() >= HEALTH_INTERVAL { choir_sched.spawn("health").init(|_ctx| { run_poc_memory(&["daily-check"]) }); last_health = std::time::Instant::now(); } // Daily jobs: once per day if last_daily.is_none_or(|d| d < today) { log_event("scheduler", "daily-trigger", &today.to_string()); // Decay (no API calls, fast) choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| { run_poc_memory(&["decay"]) }); // Consolidation pipeline: consolidate → knowledge-loop → digest // These share the Sonnet pool for serialization let s1 = Arc::clone(&sonnet_sched); let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) .retries(2) .init(move |_ctx| { let _slot = s1.acquire(); run_poc_memory(&["consolidate-full"]) }) .run(); let s2 = Arc::clone(&sonnet_sched); let kl_script = knowledge_loop_script.clone(); let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today)) .retries(1) .init(move |_ctx| { let _slot = s2.acquire(); run_python_script(&kl_script, &["--max-cycles", "100", "--batch-size", "5"]) }); knowledge.depend_on(&consolidate); let knowledge = knowledge.run(); let s3 = Arc::clone(&sonnet_sched); let mut digest = choir_sched.spawn(format!("digest:{}", today)) .retries(1) .init(move |_ctx| { let _slot = s3.acquire(); run_poc_memory(&["digest", "auto"]) }); digest.depend_on(&knowledge); last_daily = Some(today); } // Prune finished tasks from registry let pruned = choir_sched.gc_finished(); if pruned > 0 { log::trace!("pruned {} finished tasks", pruned); } write_status(&choir_sched); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Main thread: wait for Ctrl-C let choir_main = Arc::clone(&choir); ctrlc_wait(); log_event("daemon", "stopping", ""); eprintln!("Shutting down..."); // Cancel all tasks for info in choir_main.task_statuses() { if !info.status.is_finished() { log_event(&info.name, "cancelling", ""); } } // Workers will shut down when their handles are dropped log_event("daemon", "stopped", ""); Ok(()) } fn ctrlc_wait() { use std::sync::atomic::{AtomicBool, Ordering}; static STOP: AtomicBool = AtomicBool::new(false); unsafe { libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t); libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); } while !STOP.load(Ordering::Acquire) { std::thread::sleep(Duration::from_millis(500)); } extern "C" fn handle_signal(_: libc::c_int) { STOP.store(true, Ordering::Release); } } // --- Status display --- pub fn show_status() -> Result<(), String> { let path = status_path(); if !path.exists() { eprintln!("No daemon status file found. Is the daemon running?"); return Ok(()); } let content = fs::read_to_string(&path) .map_err(|e| format!("read status: {}", e))?; let status: DaemonStatus = serde_json::from_str(&content) .map_err(|e| format!("parse status: {}", e))?; eprintln!("poc-memory daemon (pid {})", status.pid); // Check if actually running let alive = Path::new(&format!("/proc/{}", status.pid)).exists(); if !alive { eprintln!(" WARNING: process not running"); } eprintln!(); if status.tasks.is_empty() { eprintln!(" No tasks"); } else { for t in &status.tasks { let retry_info = if t.max_retries > 0 { format!(" (retry {}/{})", t.retry_count, t.max_retries) } else { String::new() }; let cancel_info = if t.cancelled { " [CANCELLED]" } else { "" }; eprint!(" {:30} {:10}{}{}", t.name, t.status.to_string(), retry_info, cancel_info); if let Some(ref result) = t.result { if let Some(ref err) = result.error { eprint!(" err: {}", err); } eprint!(" ({}ms)", result.duration.as_millis()); } eprintln!(); } } Ok(()) } pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { let path = log_path(); if !path.exists() { eprintln!("No daemon log found."); return Ok(()); } let content = fs::read_to_string(&path) .map_err(|e| format!("read log: {}", e))?; let filtered: Vec<&str> = content.lines().rev() .filter(|line| { if let Some(job) = job_filter { line.contains(&format!("\"job\":\"{}\"", job)) } else { true } }) .take(lines) .collect(); for line in filtered.into_iter().rev() { eprintln!("{}", line); } Ok(()) }