From e37f819dd28da8ef844bb24c5bd05843fef30600 Mon Sep 17 00:00:00 2001
From: ProofOfConcept
Date: Thu, 5 Mar 2026 13:18:00 -0500
Subject: [PATCH] daemon: background job orchestration for memory maintenance
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replace fragile cron+shell approach with `poc-memory daemon` — a single
long-running process using jobkit for worker pool, status tracking,
retry, cancellation, and resource pools.

Jobs:
- session-watcher: detects ended Claude sessions, triggers extraction
- scheduler: runs daily decay, consolidation, knowledge loop, digests
- health: periodic graph metrics check
- All Sonnet API calls serialized through a ResourcePool(1)

Status queryable via `poc-memory daemon status`, structured log via
`poc-memory daemon log`. Phase 1: shells out to existing subcommands.

Co-Authored-By: ProofOfConcept
---
 Cargo.lock    |  32 ++++
 Cargo.toml    |   2 +
 src/daemon.rs | 498 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/main.rs   |  27 +++-
 4 files changed, 558 insertions(+), 1 deletion(-)
 create mode 100644 src/daemon.rs

diff --git a/Cargo.lock b/Cargo.lock
index bb4dade..63cfb12 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -755,6 +755,17 @@ version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
 
+[[package]]
+name = "jobkit"
+version = "0.1.0"
+dependencies = [
+ "crossbeam-deque",
+ "log",
+ "profiling",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "js-sys"
 version = "0.3.91"
@@ -1069,7 +1080,9 @@ dependencies = [
  "capnpc",
  "chrono",
  "faer",
+ "jobkit",
  "libc",
+ "log",
  "memmap2",
  "paste",
  "peg",
@@ -1125,6 +1138,25 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "profiling"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3eb8486b569e12e2c32ad3e204dbaba5e4b5b216e9367044f25f1dba42341773"
+dependencies = [
+ "profiling-procmacros",
+]
+
+[[package]]
+name = "profiling-procmacros"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52717f9a02b6965224f95ca2a81e2e0c5c43baacd28ca057577988930b6c3d5b"
+dependencies = [
+ "quote",
+ "syn 2.0.117",
+]
+
 [[package]]
 name = "ptr_meta"
 version = "0.1.4"
diff --git a/Cargo.toml b/Cargo.toml
index 59615a1..9b59315 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,8 @@ memmap2 = "0.9"
 rayon = "1"
 peg = "0.8"
 paste = "1"
+jobkit = { path = "/home/kent/jobkit" }
+log = "0.4"
 
 [build-dependencies]
 capnpc = "0.20"
diff --git a/src/daemon.rs b/src/daemon.rs
new file mode 100644
index 0000000..691949f
--- /dev/null
+++ b/src/daemon.rs
@@ -0,0 +1,498 @@
+// poc-memory daemon: background job orchestration for memory maintenance
+//
+// Replaces the fragile cron+shell approach with a single long-running process
+// that owns all background memory work. Uses jobkit for worker pool, status
+// tracking, retry, and cancellation.
+//
+// Architecture:
+// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs
+// - Session watcher task: detects ended Claude sessions, triggers extraction
+// - Jobs shell out to existing poc-memory subcommands (Phase 1)
+// - Status written to daemon-status.json for `poc-memory daemon status`
+//
+// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
+
+use jobkit::{Choir, ResourcePool, TaskError, TaskInfo};
+use std::collections::HashSet;
+use std::fs;
+use std::io::Write;
+use std::path::{Path, PathBuf};
+use std::process::Command;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime};
+
+/// A session transcript untouched for this long is a candidate for extraction.
+const SESSION_STALE_SECS: u64 = 600; // 10 minutes
+/// Sleep between passes of the session-watcher and scheduler loops.
+const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
+/// Minimum spacing between `daily-check` health jobs.
+const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
+/// Status snapshot location, relative to `$HOME`.
+const STATUS_FILE: &str = ".claude/memory/daemon-status.json";
+/// Event log location, relative to `$HOME`.
+const LOG_FILE: &str = ".claude/memory/daemon.log";
+
+/// Returns `$HOME` as a path.
+///
+/// # Panics
+/// Panics if `HOME` is not set; every path the daemon uses hangs off it.
+fn home_dir() -> PathBuf {
+    PathBuf::from(std::env::var("HOME").expect("HOME not set"))
+}
+
+fn status_path() -> PathBuf {
+    home_dir().join(STATUS_FILE)
+}
+
+fn log_path() -> PathBuf {
+    home_dir().join(LOG_FILE)
+}
+
+fn projects_dir() -> PathBuf {
+    home_dir().join(".claude/projects")
+}
+
+fn extracted_set_path() -> PathBuf {
+    home_dir().join(".claude/memory/extracted-sessions.json")
+}
+
+// --- Logging ---
+
+/// Renders `s` as a quoted JSON string literal with all required escaping.
+fn json_str(s: &str) -> String {
+    // Serializing a `&str` is not expected to fail; fall back to an empty
+    // literal rather than dropping the whole log line.
+    serde_json::to_string(s).unwrap_or_else(|_| "\"\"".to_string())
+}
+
+/// Appends one JSON object (one line) to the daemon log.
+///
+/// Best effort: if the log cannot be opened or written, the event is dropped.
+/// `detail` is omitted from the record when empty.
+fn log_event(job: &str, event: &str, detail: &str) {
+    let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
+    // Every field is escaped: `detail` often carries multi-line stderr, and a
+    // raw newline or backslash would corrupt the one-record-per-line format
+    // that `show_log` relies on.
+    let mut line = format!(
+        "{{\"ts\":{},\"job\":{},\"event\":{}",
+        json_str(&ts), json_str(job), json_str(event)
+    );
+    if !detail.is_empty() {
+        line.push_str(",\"detail\":");
+        line.push_str(&json_str(detail));
+    }
+    line.push_str("}\n");
+    if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(log_path()) {
+        let _ = f.write_all(line.as_bytes());
+    }
+}
+
+// --- Extracted sessions persistence ---
+
+/// Loads the set of already-queued session paths; a missing or unparsable
+/// file yields an empty set.
+fn load_extracted() -> HashSet<String> {
+    fs::read_to_string(extracted_set_path())
+        .ok()
+        .and_then(|s| serde_json::from_str(&s).ok())
+        .unwrap_or_default()
+}
+
+/// Persists the set of queued session paths (best effort, errors ignored).
+fn save_extracted(set: &HashSet<String>) {
+    if let Ok(json) = serde_json::to_string_pretty(set) {
+        let _ = fs::write(extracted_set_path(), json);
+    }
+}
+
+// --- Shell out to poc-memory subcommands ---
+
+/// Runs `poc-memory <args>` to completion, logging start and outcome.
+///
+/// A spawn failure or an "Unknown command" usage error is `Fatal`; any other
+/// non-zero exit is `Retry`.
+fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
+    let label = args.first().copied().unwrap_or("unknown");
+    log_event(label, "started", "");
+    let output = Command::new("poc-memory")
+        .args(args)
+        .output()
+        .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
+
+    if output.status.success() {
+        log_event(label, "completed", "");
+        Ok(())
+    } else {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        let msg = format!("exit {}: {}", output.status, stderr.trim_end());
+        log_event(label, "failed", &msg);
+        // Treat non-zero exit as retryable (API errors, transient failures)
+        // unless it looks like a usage error
+        if output.status.code() == Some(1) && stderr.contains("Unknown command") {
+            Err(TaskError::Fatal(msg))
+        } else {
+            Err(TaskError::Retry(msg))
+        }
+    }
+}
+
+/// Runs `python3 <script> <args>` to completion, logging under the script's
+/// file stem. Spawn failure is `Fatal`; a non-zero exit is `Retry`.
+fn run_python_script(script: &str, args: &[&str]) -> Result<(), TaskError> {
+    let label = Path::new(script).file_stem()
+        .map(|s| s.to_string_lossy().to_string())
+        .unwrap_or_else(|| "python".to_string());
+    log_event(&label, "started", "");
+
+    let output = Command::new("python3")
+        .arg(script)
+        .args(args)
+        .output()
+        .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
+
+    if output.status.success() {
+        log_event(&label, "completed", "");
+        Ok(())
+    } else {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        let msg = format!("exit {}: {}", output.status, stderr.trim_end());
+        log_event(&label, "failed", &msg);
+        Err(TaskError::Retry(msg))
+    }
+}
+
+// --- Session detection ---
+
+/// Scans `~/.claude/projects/*/` for `.jsonl` files that are not in
+/// `extracted`, have not been modified for `SESSION_STALE_SECS`, and are not
+/// held open by any process.
+fn find_ended_sessions(extracted: &HashSet<String>) -> Vec<PathBuf> {
+    let projects = projects_dir();
+    if !projects.exists() {
+        return Vec::new();
+    }
+
+    let mut ended = Vec::new();
+    let now = SystemTime::now();
+
+    let Ok(dirs) = fs::read_dir(&projects) else { return ended };
+    for dir_entry in dirs.filter_map(|e| e.ok()) {
+        if !dir_entry.path().is_dir() { continue; }
+        let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
+        for f in files.filter_map(|e| e.ok()) {
+            let path = f.path();
+            if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
+                let key = path.to_string_lossy().to_string();
+                if extracted.contains(&key) { continue; }
+
+                // Check staleness
+                if let Ok(meta) = path.metadata() {
+                    if let Ok(mtime) = meta.modified() {
+                        let age = now.duration_since(mtime).unwrap_or_default();
+                        if age.as_secs() >= SESSION_STALE_SECS {
+                            // Check no process has it open
+                            if !is_file_open(&path) {
+                                ended.push(path);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    ended
+}
+
+/// Asks `fuser` whether any process has `path` open. If `fuser` cannot be
+/// run, the file is reported as not open.
+fn is_file_open(path: &Path) -> bool {
+    Command::new("fuser")
+        .arg(path)
+        .output()
+        .map(|o| o.status.success()) // fuser exits 0 if file is open
+        .unwrap_or(false)
+}
+
+// --- Status writing ---
+
+/// Writes the current task table and pid to the status file (best effort).
+fn write_status(choir: &Choir) {
+    let statuses = choir.task_statuses();
+    let status = DaemonStatus {
+        pid: std::process::id(),
+        tasks: statuses,
+    };
+    if let Ok(json) = serde_json::to_string_pretty(&status) {
+        let _ = fs::write(status_path(), json);
+    }
+}
+
+/// On-disk shape of the status file read back by `show_status`.
+#[derive(serde::Serialize, serde::Deserialize)]
+struct DaemonStatus {
+    pid: u32,
+    tasks: Vec<TaskInfo>,
+}
+
+// --- The daemon ---
+
+/// Runs the daemon in the foreground until SIGINT or SIGTERM is received.
+pub fn run_daemon() -> Result<(), String> {
+    let choir = Choir::new();
+    // Three workers. The session-watcher and scheduler tasks below loop until
+    // cancelled, so each one pins a worker for the daemon's whole lifetime;
+    // with only two workers none would be left for the jobs they spawn.
+    // NOTE(review): assumes a jobkit worker runs one task at a time; confirm.
+    // Jobs are serialized through the Sonnet resource pool anyway.
+    let _w1 = choir.add_worker("watcher");
+    let _w2 = choir.add_worker("scheduler");
+    let _w3 = choir.add_worker("jobs");
+
+    // Sonnet API: one call at a time
+    let sonnet = ResourcePool::new("sonnet", 1);
+
+    log_event("daemon", "started", &format!("pid {}", std::process::id()));
+    eprintln!("poc-memory daemon started (pid {})", std::process::id());
+
+    // Write initial status
+    write_status(&choir);
+
+    // Session watcher: detect ended sessions, run extraction
+    let choir_sw = Arc::clone(&choir);
+    let sonnet_sw = Arc::clone(&sonnet);
+    choir.spawn("session-watcher").init(move |ctx| {
+        let mut extracted = load_extracted();
+        loop {
+            if ctx.is_cancelled() {
+                return Err(TaskError::Fatal("cancelled".into()));
+            }
+
+            let ended = find_ended_sessions(&extracted);
+            for session in ended {
+                let session_str = session.to_string_lossy().to_string();
+                log_event("extract", "session-ended", &session_str);
+
+                // Spawn extraction job
+                let s = Arc::clone(&sonnet_sw);
+                let path = session_str.clone();
+                choir_sw.spawn(format!("extract:{}", session.file_name()
+                    .map(|n| n.to_string_lossy().to_string())
+                    .unwrap_or_else(|| "unknown".into())))
+                    .retries(2)
+                    .init(move |_ctx| {
+                        let _slot = s.acquire();
+                        run_poc_memory(&["experience-mine", &path])
+                    });
+
+                // Recorded when the job is queued, not when it succeeds: a
+                // session whose extraction exhausts its retries is not picked
+                // up again on later passes.
+                extracted.insert(session_str);
+                save_extracted(&extracted);
+            }
+
+            write_status(&choir_sw);
+            std::thread::sleep(SCHEDULER_INTERVAL);
+        }
+    });
+
+    // Scheduler: runs daily jobs based on filesystem state
+    let choir_sched = Arc::clone(&choir);
+    let sonnet_sched = Arc::clone(&sonnet);
+    let knowledge_loop_script = home_dir()
+        .join("poc/memory/scripts/knowledge_loop.py")
+        .to_string_lossy()
+        .to_string();
+
+    choir.spawn("scheduler").init(move |ctx| {
+        let mut last_daily = None::<chrono::NaiveDate>;
+        // `None` = no health check yet, so the first pass runs one. Back-dating
+        // an `Instant` by subtraction may panic when the result cannot be
+        // represented, so the "never ran" state is modelled explicitly.
+        let mut last_health = None::<Instant>;
+
+        loop {
+            if ctx.is_cancelled() {
+                return Err(TaskError::Fatal("cancelled".into()));
+            }
+
+            let today = chrono::Local::now().date_naive();
+
+            // Health check: every hour
+            if last_health.is_none_or(|t| t.elapsed() >= HEALTH_INTERVAL) {
+                choir_sched.spawn("health").init(|_ctx| {
+                    run_poc_memory(&["daily-check"])
+                });
+                last_health = Some(Instant::now());
+            }
+
+            // Daily jobs: once per day
+            if last_daily.is_none_or(|d| d < today) {
+                log_event("scheduler", "daily-trigger", &today.to_string());
+
+                // Decay (no API calls, fast)
+                choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| {
+                    run_poc_memory(&["decay"])
+                });
+
+                // Consolidation pipeline: consolidate → knowledge-loop → digest
+                // These share the Sonnet pool for serialization
+                let s1 = Arc::clone(&sonnet_sched);
+                let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
+                    .retries(2)
+                    .init(move |_ctx| {
+                        let _slot = s1.acquire();
+                        run_poc_memory(&["consolidate-full"])
+                    })
+                    .run();
+
+                let s2 = Arc::clone(&sonnet_sched);
+                let kl_script = knowledge_loop_script.clone();
+                let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
+                    .retries(1)
+                    .init(move |_ctx| {
+                        let _slot = s2.acquire();
+                        run_python_script(&kl_script, &["--max-cycles", "100", "--batch-size", "5"])
+                    });
+                knowledge.depend_on(&consolidate);
+                let knowledge = knowledge.run();
+
+                let s3 = Arc::clone(&sonnet_sched);
+                let mut digest = choir_sched.spawn(format!("digest:{}", today))
+                    .retries(1)
+                    .init(move |_ctx| {
+                        let _slot = s3.acquire();
+                        run_poc_memory(&["digest", "auto"])
+                    });
+                digest.depend_on(&knowledge);
+
+                last_daily = Some(today);
+            }
+
+            // Prune finished tasks from registry
+            let pruned = choir_sched.gc_finished();
+            if pruned > 0 {
+                log::trace!("pruned {} finished tasks", pruned);
+            }
+
+            write_status(&choir_sched);
+            std::thread::sleep(SCHEDULER_INTERVAL);
+        }
+    });
+
+    // Main thread: wait for Ctrl-C
+    let choir_main = Arc::clone(&choir);
+    ctrlc_wait();
+
+    log_event("daemon", "stopping", "");
+    eprintln!("Shutting down...");
+
+    // Record which tasks were still unfinished at shutdown. This only logs;
+    // no cancellation request is issued here.
+    for info in choir_main.task_statuses() {
+        if !info.status.is_finished() {
+            log_event(&info.name, "cancelling", "");
+        }
+    }
+    // Workers will shut down when their handles are dropped
+
+    log_event("daemon", "stopped", "");
+    Ok(())
+}
+
+/// Blocks until SIGINT or SIGTERM has been delivered, polling a flag that the
+/// signal handler sets.
+fn ctrlc_wait() {
+    use std::sync::atomic::{AtomicBool, Ordering};
+    static STOP: AtomicBool = AtomicBool::new(false);
+
+    // SAFETY: `handle_signal` is an `extern "C" fn(c_int)` as `signal` expects,
+    // and its body is a single atomic store.
+    unsafe {
+        libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
+        libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
+    }
+
+    while !STOP.load(Ordering::Acquire) {
+        std::thread::sleep(Duration::from_millis(500));
+    }
+
+    extern "C" fn handle_signal(_: libc::c_int) {
+        STOP.store(true, Ordering::Release);
+    }
+}
+
+// --- Status display ---
+
+/// Prints the last status snapshot written by a daemon, flagging a dead pid.
+pub fn show_status() -> Result<(), String> {
+    let path = status_path();
+    if !path.exists() {
+        eprintln!("No daemon status file found. Is the daemon running?");
+        return Ok(());
+    }
+
+    let content = fs::read_to_string(&path)
+        .map_err(|e| format!("read status: {}", e))?;
+    let status: DaemonStatus = serde_json::from_str(&content)
+        .map_err(|e| format!("parse status: {}", e))?;
+
+    eprintln!("poc-memory daemon (pid {})", status.pid);
+
+    // Check if actually running
+    let alive = Path::new(&format!("/proc/{}", status.pid)).exists();
+    if !alive {
+        eprintln!(" WARNING: process not running");
+    }
+    eprintln!();
+
+    if status.tasks.is_empty() {
+        eprintln!(" No tasks");
+    } else {
+        for t in &status.tasks {
+            let retry_info = if t.max_retries > 0 {
+                format!(" (retry {}/{})", t.retry_count, t.max_retries)
+            } else {
+                String::new()
+            };
+            let cancel_info = if t.cancelled { " [CANCELLED]" } else { "" };
+            eprint!(" {:30} {:10}{}{}", t.name, t.status.to_string(), retry_info, cancel_info);
+            if let Some(ref result) = t.result {
+                if let Some(ref err) = result.error {
+                    eprint!(" err: {}", err);
+                }
+                eprint!(" ({}ms)", result.duration.as_millis());
+            }
+            eprintln!();
+        }
+    }
+
+    Ok(())
+}
+
+/// Prints the last `lines` log records, oldest first, optionally restricted
+/// to records whose `job` field equals `job_filter`.
+pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
+    let path = log_path();
+    if !path.exists() {
+        eprintln!("No daemon log found.");
+        return Ok(());
+    }
+
+    let content = fs::read_to_string(&path)
+        .map_err(|e| format!("read log: {}", e))?;
+
+    // Built once rather than per line, and escaped the same way `log_event`
+    // writes the field.
+    let needle = job_filter.map(|job| format!("\"job\":{}", json_str(job)));
+    let filtered: Vec<&str> = content.lines().rev()
+        .filter(|line| needle.as_deref().is_none_or(|n| line.contains(n)))
+        .take(lines)
+        .collect();
+
+    for line in filtered.into_iter().rev() {
+        eprintln!("{}", line);
+    }
+    Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
index 3685e51..667c5c1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -28,6 +28,7 @@ mod neuro;
 mod query;
 mod spectral;
 mod lookups;
+mod daemon;
 
 pub mod memory_capnp {
     include!(concat!(env!("OUT_DIR"), "/schema/memory_capnp.rs"));
@@ -130,6 +131,7 @@ fn main() {
         "query" => cmd_query(&args[2..]),
         "lookup-bump" => cmd_lookup_bump(&args[2..]),
         "lookups" => cmd_lookups(&args[2..]),
+        "daemon" => cmd_daemon(&args[2..]),
         _ => {
             eprintln!("Unknown command: {}", args[1]);
             usage();
@@ -211,7 +213,10 @@ Commands:
 Stages: sort F [asc], limit N, select F,F, count
 Ex: \"degree > 15 | sort degree | limit 10\"
 lookup-bump KEY [KEY...] Bump daily lookup counter for keys (fast, no store)
- lookups [DATE] Show daily lookup counts (default: today)");
+ lookups [DATE] Show daily lookup counts (default: today)
+ daemon Start background job daemon
+ daemon status Show daemon status
+ daemon log [JOB] [N] Show last N log lines (default 50, optional job filter)");
 }
 
 fn cmd_search(args: &[String]) -> Result<(), String> {
@@ -1808,3 +1813,23 @@ fn cmd_lookups(args: &[String]) -> Result<(), String> {
         resolved.iter().map(|(_, c)| *c as u64).sum::<u64>());
     Ok(())
 }
+
+fn cmd_daemon(args: &[String]) -> Result<(), String> {
+    if args.is_empty() {
+        return daemon::run_daemon();
+    }
+    match args[0].as_str() {
+        "status" => daemon::show_status(),
+        "log" => {
+            let job = args.get(1).map(|s| s.as_str());
+            let lines = args.get(2)
+                .and_then(|s| s.parse().ok())
+                .unwrap_or(50);
+            daemon::show_log(job, lines)
+        }
+        _ => {
+            eprintln!("Usage: poc-memory daemon [status|log [JOB] [LINES]]");
+            Err("unknown daemon subcommand".into())
+        }
+    }
+}