// consciousness/src/daemon.rs

// poc-memory daemon: background job orchestration for memory maintenance
//
// Replaces the fragile cron+shell approach with a single long-running process
// that owns all background memory work. Uses jobkit for worker pool, status
// tracking, retry, and cancellation.
//
// Architecture:
// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs
// - Session watcher task: detects ended Claude sessions, triggers extraction
// - Jobs shell out to existing poc-memory subcommands (Phase 1)
// - Status written to daemon-status.json for `poc-memory daemon status`
//
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ResourcePool, TaskError, TaskInfo, TaskStatus};
use std::collections::HashSet;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
const STATUS_FILE: &str = ".claude/memory/daemon-status.json";
const LOG_FILE: &str = ".claude/memory/daemon.log";
/// Resolve the current user's home directory from $HOME.
/// Panics if HOME is unset — without it the daemon cannot locate any state.
fn home_dir() -> PathBuf {
    let home = std::env::var("HOME").expect("HOME not set");
    PathBuf::from(home)
}
/// Absolute path of the daemon status JSON file under $HOME.
fn status_path() -> PathBuf {
    let mut p = home_dir();
    p.push(STATUS_FILE);
    p
}
/// Absolute path of the daemon's append-only JSON-lines log under $HOME.
fn log_path() -> PathBuf {
    let mut p = home_dir();
    p.push(LOG_FILE);
    p
}
/// Absolute path of the Claude projects directory (session transcripts).
fn projects_dir() -> PathBuf {
    let mut p = home_dir();
    p.push(".claude/projects");
    p
}
// --- Logging ---
const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half
/// Append one JSON line `{"ts":…,"job":…,"event":…[,"detail":…]}` to the
/// daemon log, rotating the file (keep roughly the newest half) once it
/// exceeds LOG_MAX_BYTES. Best-effort: all I/O errors are ignored.
fn log_event(job: &str, event: &str, detail: &str) {
    let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
    let line = if detail.is_empty() {
        format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event)
    } else {
        // Escape detail for JSON safety. \r and \t are escaped too: raw
        // control characters inside a JSON string make the line unparseable
        // by show_log's serde_json pass.
        let safe = detail.replace('\\', "\\\\").replace('"', "\\\"")
            .replace('\n', "\\n").replace('\r', "\\r").replace('\t', "\\t");
        format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n",
            ts, job, event, safe)
    };
    let path = log_path();
    // Rotate if too large.
    if let Ok(meta) = fs::metadata(&path) {
        if meta.len() > LOG_MAX_BYTES {
            // Work on raw bytes: slicing a &str at len/2 panics if that byte
            // offset falls inside a multi-byte UTF-8 character (details may
            // contain non-ASCII paths). Byte offset just past a b'\n' is
            // always a valid line start.
            if let Ok(bytes) = fs::read(&path) {
                let half = bytes.len() / 2;
                if let Some(nl) = bytes[half..].iter().position(|&b| b == b'\n') {
                    let _ = fs::write(&path, &bytes[half + nl + 1..]);
                }
            }
        }
    }
    if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
        let _ = f.write_all(line.as_bytes());
    }
}
// --- Shell out to poc-memory subcommands ---
/// Shell out to a `poc-memory` subcommand, logging start/completion/failure.
///
/// Returns `TaskError::Fatal` when the command cannot be spawned or when the
/// subcommand itself is unknown (retrying would never help); otherwise a
/// failure is `TaskError::Retry` so jobkit can re-run it.
fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
    let job = args.join(" ");
    log_event(&job, "started", "");
    let start = std::time::Instant::now();
    let output = Command::new("poc-memory")
        .args(args)
        .output()
        .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
    let elapsed = start.elapsed();
    let duration = format!("{:.1}s", elapsed.as_secs_f64());
    if output.status.success() {
        log_event(&job, "completed", &duration);
        Ok(())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        // Keep only the last ~500 bytes of stderr. Advance the cut point to
        // a char boundary first: `&stderr[len-500..]` panics if that offset
        // lands inside a multi-byte UTF-8 sequence.
        let stderr_short = if stderr.len() > 500 {
            let mut cut = stderr.len() - 500;
            while !stderr.is_char_boundary(cut) {
                cut += 1;
            }
            &stderr[cut..]
        } else {
            &stderr[..]
        };
        let msg = format!("{} exit {}: {}", duration, output.status, stderr_short.trim_end());
        log_event(&job, "failed", &msg);
        // An unknown subcommand is a programming error, not a transient
        // failure — don't burn retries on it.
        if output.status.code() == Some(1) && stderr.contains("Unknown command") {
            Err(TaskError::Fatal(msg))
        } else {
            Err(TaskError::Retry(msg))
        }
    }
}
// --- Session detection ---
/// Find JSONL session files that haven't been written to recently.
/// Only checks metadata (stat) — no file reads or subprocess calls.
/// The "is the file still open?" check is deferred to the reconcile loop
/// and runs only for sessions that survive the mined-key filter.
fn find_stale_sessions() -> Vec<PathBuf> {
    let now = SystemTime::now();
    let mut result = Vec::new();
    // A missing or unreadable projects dir simply yields no sessions.
    let Ok(project_dirs) = fs::read_dir(projects_dir()) else { return result };
    for project in project_dirs.flatten() {
        if !project.path().is_dir() {
            continue;
        }
        let Ok(entries) = fs::read_dir(project.path()) else { continue };
        for entry in entries.flatten() {
            let candidate = entry.path();
            if !candidate.extension().is_some_and(|x| x == "jsonl") {
                continue;
            }
            let Ok(meta) = candidate.metadata() else { continue };
            let Ok(mtime) = meta.modified() else { continue };
            // A future mtime (clock skew) reads as age zero, i.e. not stale.
            let age = now.duration_since(mtime).unwrap_or_default();
            if age.as_secs() >= SESSION_STALE_SECS {
                result.push(candidate);
            }
        }
    }
    result
}
/// Check if any *other* process has a file open by scanning /proc/*/fd/.
/// This is what `fuser` does internally, without the subprocess overhead.
fn is_file_open(path: &Path) -> bool {
    let Ok(target) = path.canonicalize() else { return false };
    let Ok(proc_entries) = fs::read_dir("/proc") else { return false };
    let self_pid = std::process::id().to_string();
    proc_entries
        .flatten()
        .filter(|entry| {
            // Keep only numeric /proc entries (pids), excluding ourselves.
            let name = entry.file_name();
            let name = name.to_string_lossy();
            name.chars().all(|c| c.is_ascii_digit()) && *name != self_pid
        })
        .any(|entry| {
            // Unreadable fd dirs (permissions, raced exits) count as "not open".
            let Ok(fds) = fs::read_dir(entry.path().join("fd")) else { return false };
            fds.flatten()
                .any(|fd| fs::read_link(fd.path()).map_or(false, |link| link == target))
        })
}
/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
/// Returns None on any read/parse failure (e.g. the process has exited).
fn proc_uptime(pid: u32) -> Option<String> {
    // /proc/<pid>/stat field 22 (1-indexed) is start time in clock ticks
    let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    // Fields after comm (which may contain spaces/parens) — find closing paren
    // via rfind so an embedded ')' in the comm name cannot confuse parsing.
    let after_comm = stat.get(stat.rfind(')')? + 2..)?;
    let fields: Vec<&str> = after_comm.split_whitespace().collect();
    // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19)
    let start_ticks: u64 = fields.get(19)?.parse().ok()?;
    // sysconf has no memory-safety preconditions; only the cast matters.
    // NOTE(review): a -1 error return wraps to u64::MAX via `as u64`, which
    // collapses start_ticks/ticks_per_sec to 0 rather than dividing by zero
    // — benign, but confirm this was intentional.
    let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64;
    // Boot time = wall-clock now minus system uptime (/proc/uptime, field 1).
    let boot_time_secs = {
        let uptime = fs::read_to_string("/proc/uptime").ok()?;
        let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?;
        let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
        now - sys_uptime as u64
    };
    // Absolute process start time, then its age relative to now.
    let start_secs = boot_time_secs + start_ticks / ticks_per_sec;
    let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
    let uptime = now.saturating_sub(start_secs);
    Some(format_duration_human(uptime as u128 * 1000))
}
// --- Status writing ---
/// Serialize the current task registry to the status JSON file.
/// Best-effort: serialization or write failures are silently dropped —
/// status reporting must never take the daemon down.
fn write_status(choir: &Choir) {
    let snapshot = DaemonStatus {
        pid: std::process::id(),
        tasks: choir.task_statuses(),
    };
    let Ok(json) = serde_json::to_string_pretty(&snapshot) else { return };
    let _ = fs::write(status_path(), json);
}
/// Snapshot persisted to daemon-status.json by `write_status` and parsed
/// back by `show_status`. Field names are part of the on-disk format.
#[derive(serde::Serialize, serde::Deserialize)]
struct DaemonStatus {
    /// Daemon process id; liveness is checked later via /proc/<pid>.
    pid: u32,
    /// Full jobkit task registry snapshot at write time.
    tasks: Vec<TaskInfo>,
}
// --- The daemon ---
/// Run the daemon in the foreground: spawn the worker pool, the
/// session-watcher and scheduler loops, then block until SIGINT/SIGTERM.
pub fn run_daemon() -> Result<(), String> {
    let choir = Choir::new();
    // Workers: 2 for long-running loops (scheduler, session-watcher),
    // plus 1 for the actual LLM job (pool capacity is 1).
    // Non-LLM jobs (decay, health) also need a worker, so 4 total.
    let names: Vec<String> = (0..4).map(|i| format!("w{}", i)).collect();
    let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect();
    // LLM API: 1 concurrent call to control token burn rate
    let llm = ResourcePool::new("llm", 1);
    llm.bind(&choir);
    log_event("daemon", "started", &format!("pid {}", std::process::id()));
    eprintln!("poc-memory daemon started (pid {})", std::process::id());
    // Write initial status
    write_status(&choir);
    // Session watcher: reconcile-based extraction
    // Each tick: scan filesystem for stale sessions, check store for what's
    // already mined, check task registry for what's in-flight, spawn the diff.
    // No persistent tracking state — the store is the source of truth.
    let choir_sw = Arc::clone(&choir);
    let llm_sw = Arc::clone(&llm);
    choir.spawn("session-watcher").init(move |ctx| {
        loop {
            if ctx.is_cancelled() {
                return Err(TaskError::Fatal("cancelled".into()));
            }
            // What's currently running/pending? (avoid spawning duplicates)
            let active: HashSet<String> = choir_sw.task_statuses().iter()
                .filter(|t| !t.status.is_finished())
                .map(|t| t.name.clone())
                .collect();
            let stale = find_stale_sessions();
            // Load mined transcript keys once for this tick
            let mined = crate::enrich::mined_transcript_keys();
            // Limit new tasks per tick — the resource pool gates execution,
            // but we don't need thousands of task objects in the registry.
            // The watcher ticks every 60s so backlog drains steadily.
            const MAX_NEW_PER_TICK: usize = 10;
            let mut queued = 0;
            let mut already_mined = 0;
            let mut still_open = 0;
            let total_stale = stale.len();
            for session in stale {
                if queued >= MAX_NEW_PER_TICK { break; }
                let filename = session.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_else(|| "unknown".into());
                let task_name = format!("extract:{}", filename);
                // Skip if already in-flight
                if active.contains(&task_name) { continue; }
                // Skip if already mined (filename key in store)
                let path_str = session.to_string_lossy().to_string();
                if crate::enrich::is_transcript_mined_with_keys(&mined, &path_str) {
                    already_mined += 1;
                    continue;
                }
                // Skip if file is still open (active Claude session, possibly idle)
                if is_file_open(&session) {
                    still_open += 1;
                    continue;
                }
                log_event("extract", "queued", &path_str);
                let path = path_str.clone();
                let extract = choir_sw.spawn(task_name)
                    .resource(&llm_sw)
                    .retries(2)
                    .init(move |_ctx| {
                        run_poc_memory(&["experience-mine", &path])
                    })
                    .run();
                // Chain fact-mine after experience-mine
                let fact_task = format!("fact-mine:{}", filename);
                if !active.contains(&fact_task) {
                    let path2 = path_str;
                    let mut fm = choir_sw.spawn(fact_task)
                        .resource(&llm_sw)
                        .retries(1)
                        .init(move |_ctx| {
                            run_poc_memory(&["fact-mine-store", &path2])
                        });
                    // NOTE(review): unlike `extract` above, `fm` is never
                    // `.run()` — confirm jobkit starts dependent builders on
                    // drop/depend_on, otherwise this task is silently lost.
                    fm.depend_on(&extract);
                }
                queued += 1;
            }
            if queued > 0 || still_open > 0 {
                log_event("session-watcher", "tick",
                    &format!("{} stale, {} mined, {} open, {} queued",
                        total_stale, already_mined, still_open, queued));
            }
            write_status(&choir_sw);
            std::thread::sleep(SCHEDULER_INTERVAL);
        }
    });
    // Scheduler: runs daily jobs based on filesystem state
    let choir_sched = Arc::clone(&choir);
    let llm_sched = Arc::clone(&llm);
    choir.spawn("scheduler").init(move |ctx| {
        let mut last_daily = None::<chrono::NaiveDate>;
        // Backdated one interval so the first health check fires immediately.
        // NOTE(review): `Instant - Duration` panics on underflow; on a host
        // up for less than HEALTH_INTERVAL this could abort at startup —
        // consider `checked_sub`. Confirm on target systems.
        let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
        loop {
            if ctx.is_cancelled() {
                return Err(TaskError::Fatal("cancelled".into()));
            }
            let today = chrono::Local::now().date_naive();
            // Health check: every hour
            if last_health.elapsed() >= HEALTH_INTERVAL {
                choir_sched.spawn("health").init(|_ctx| {
                    run_poc_memory(&["daily-check"])
                });
                last_health = std::time::Instant::now();
            }
            // Daily jobs: once per day (also fires on the first tick after start)
            if last_daily.is_none_or(|d| d < today) {
                log_event("scheduler", "daily-trigger", &today.to_string());
                // Decay (no API calls, fast)
                choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| {
                    run_poc_memory(&["decay"])
                });
                // Consolidation pipeline: consolidate → knowledge-loop → digest
                let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
                    .resource(&llm_sched)
                    .retries(2)
                    .init(move |_ctx| {
                        run_poc_memory(&["consolidate-full"])
                    })
                    .run();
                let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
                    .resource(&llm_sched)
                    .retries(1)
                    .init(move |_ctx| {
                        run_poc_memory(&["knowledge-loop", "--max-cycles", "100", "--batch-size", "5"])
                    });
                knowledge.depend_on(&consolidate);
                let knowledge = knowledge.run();
                let mut digest = choir_sched.spawn(format!("digest:{}", today))
                    .resource(&llm_sched)
                    .retries(1)
                    .init(move |_ctx| {
                        run_poc_memory(&["digest", "auto"])
                    });
                // NOTE(review): `digest` is never `.run()` (knowledge above
                // is) — same question as fact-mine; verify against jobkit.
                digest.depend_on(&knowledge);
                last_daily = Some(today);
            }
            // Prune finished tasks from registry
            let pruned = choir_sched.gc_finished();
            if pruned > 0 {
                log::trace!("pruned {} finished tasks", pruned);
            }
            write_status(&choir_sched);
            std::thread::sleep(SCHEDULER_INTERVAL);
        }
    });
    // Main thread: wait for Ctrl-C
    let choir_main = Arc::clone(&choir);
    ctrlc_wait();
    log_event("daemon", "stopping", "");
    eprintln!("Shutting down...");
    // Cancel all tasks
    // NOTE(review): this loop only *logs* "cancelling" — no cancel call is
    // visible here. Presumably actual cancellation/teardown happens when the
    // worker handles drop below; confirm against the Choir API.
    for info in choir_main.task_statuses() {
        if !info.status.is_finished() {
            log_event(&info.name, "cancelling", "");
        }
    }
    // Workers will shut down when their handles are dropped
    log_event("daemon", "stopped", "");
    Ok(())
}
/// Block the calling thread until SIGINT or SIGTERM arrives.
/// Installs minimal libc signal handlers that flip a static atomic flag,
/// then polls that flag every 500ms (so shutdown latency is ≤ ~500ms).
fn ctrlc_wait() {
    use std::sync::atomic::{AtomicBool, Ordering};
    static STOP: AtomicBool = AtomicBool::new(false);
    // SAFETY: the handler body is a single atomic store, which is
    // async-signal-safe; the fn-pointer-to-sighandler_t cast is the
    // standard libc::signal calling convention.
    unsafe {
        libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
        libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
    }
    while !STOP.load(Ordering::Acquire) {
        std::thread::sleep(Duration::from_millis(500));
    }
    // Release store pairs with the Acquire load in the poll loop above.
    extern "C" fn handle_signal(_: libc::c_int) {
        STOP.store(true, Ordering::Release);
    }
}
// --- Status display ---
/// Render a millisecond count as a compact human-readable string:
/// "NNNms" under a second, "N.Ns" under a minute, "NmNs" under an hour,
/// and "NhNm" beyond that.
fn format_duration_human(ms: u128) -> String {
    const SEC: u128 = 1_000;
    const MIN: u128 = 60 * SEC;
    const HOUR: u128 = 60 * MIN;
    match ms {
        0..SEC => format!("{}ms", ms),
        SEC..MIN => format!("{:.1}s", ms as f64 / 1000.0),
        MIN..HOUR => format!("{}m{}s", ms / MIN, (ms % MIN) / SEC),
        _ => format!("{}h{}m", ms / HOUR, (ms % HOUR) / MIN),
    }
}
/// Classify a task name into a display group for `show_status`:
/// the two resident loops are "core", per-session mining is "extract",
/// the daily pipeline jobs are "daily", the hourly check is "health".
fn task_group(name: &str) -> &str {
    match name {
        "session-watcher" | "scheduler" => "core",
        "health" => "health",
        n if n.starts_with("extract:") || n.starts_with("fact-mine:") => "extract",
        n if n.starts_with("consolidate:")
            || n.starts_with("knowledge-loop:")
            || n.starts_with("digest:")
            || n.starts_with("decay:") => "daily",
        _ => "other",
    }
}
/// Map a task's state to a short display symbol for `show_status`.
/// NOTE(review): the cancelled/Running/Completed/Failed arms contain empty
/// string literals here — these look like glyphs (emoji) lost in transit;
/// confirm against the original file before relying on the output.
fn status_symbol(t: &TaskInfo) -> &'static str {
    // Cancellation wins over whatever terminal status was recorded.
    if t.cancelled { return "" }
    match t.status {
        TaskStatus::Running => "",
        TaskStatus::Completed => "",
        TaskStatus::Failed => "",
        TaskStatus::Pending => "·",
    }
}
pub fn show_status() -> Result<(), String> {
let path = status_path();
if !path.exists() {
eprintln!("No daemon status file found. Is the daemon running?");
return Ok(());
}
let content = fs::read_to_string(&path)
.map_err(|e| format!("read status: {}", e))?;
let status: DaemonStatus = serde_json::from_str(&content)
.map_err(|e| format!("parse status: {}", e))?;
let alive = Path::new(&format!("/proc/{}", status.pid)).exists();
let state = if alive { "running" } else { "NOT RUNNING" };
// Show uptime from /proc/<pid>/stat start time
let uptime_str = if alive {
proc_uptime(status.pid).unwrap_or_default()
} else {
String::new()
};
if uptime_str.is_empty() {
eprintln!("poc-memory daemon pid={} {}", status.pid, state);
} else {
eprintln!("poc-memory daemon pid={} {} uptime {}", status.pid, state, uptime_str);
}
// Status file age
if let Ok(meta) = fs::metadata(&path) {
if let Ok(modified) = meta.modified() {
let age = std::time::SystemTime::now().duration_since(modified).unwrap_or_default();
eprintln!(" status updated {}s ago", age.as_secs());
}
}
if status.tasks.is_empty() {
eprintln!("\n No tasks");
return Ok(());
}
// Count by status
let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n",
running, pending, completed, failed);
// Group and display
let groups: &[(&str, &str)] = &[
("core", "Core"),
("daily", "Daily pipeline"),
("extract", "Session extraction"),
("health", "Health"),
("other", "Other"),
];
for (group_id, group_label) in groups {
let tasks: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| task_group(&t.name) == *group_id)
.collect();
if tasks.is_empty() { continue; }
// For extract group, show summary instead of individual tasks
if *group_id == "extract" {
let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" {} ({} total)", group_label, tasks.len());
if n_running > 0 {
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
eprintln!(" {} {}", status_symbol(t), t.name);
}
}
let mut parts = Vec::new();
if n_done > 0 { parts.push(format!("{} done", n_done)); }
if n_running > 0 { parts.push(format!("{} running", n_running)); }
if n_pending > 0 { parts.push(format!("{} queued", n_pending)); }
if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); }
eprintln!(" {}", parts.join(", "));
// Show recent failures
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
if let Some(ref r) = t.result {
if let Some(ref err) = r.error {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
}
}
eprintln!();
continue;
}
eprintln!(" {}", group_label);
for t in &tasks {
let sym = status_symbol(t);
let duration = t.result.as_ref()
.map(|r| format_duration_human(r.duration.as_millis()))
.unwrap_or_default();
let retry = if t.max_retries > 0 && t.retry_count > 0 {
format!(" retry {}/{}", t.retry_count, t.max_retries)
} else {
String::new()
};
let err_msg = t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default();
if duration.is_empty() {
eprintln!(" {} {:30}{}{}", sym, t.name, retry, err_msg);
} else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, err_msg);
}
}
eprintln!();
}
Ok(())
}
/// Print recent daemon log entries, newest last.
///
/// `job_filter` restricts output to one job name (matched against the raw
/// JSON line); `lines` caps how many entries are shown. JSON lines are
/// pretty-printed as "TIME JOB EVENT [DETAIL]"; unparseable lines are
/// echoed raw. Errors only if the log exists but cannot be read.
pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
    // Byte-safe truncation: `&s[..max]` panics when `max` lands inside a
    // multi-byte UTF-8 sequence, and details carry arbitrary file paths.
    fn truncate_str(s: &str, max: usize) -> &str {
        if s.len() <= max {
            return s;
        }
        let mut end = max;
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }
    let path = log_path();
    if !path.exists() {
        eprintln!("No daemon log found.");
        return Ok(());
    }
    let content = fs::read_to_string(&path)
        .map_err(|e| format!("read log: {}", e))?;
    // Scan newest-first so take(lines) keeps the most recent entries.
    let filtered: Vec<&str> = content.lines().rev()
        .filter(|line| {
            if let Some(job) = job_filter {
                line.contains(&format!("\"job\":\"{}\"", job))
            } else {
                true
            }
        })
        .take(lines)
        .collect();
    if filtered.is_empty() {
        eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default());
        return Ok(());
    }
    // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]"
    // (reversed back to chronological order).
    for line in filtered.into_iter().rev() {
        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
            let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
            let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
            let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
            let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
            // Shorten timestamp to just the HH:MM:SS portion. `get` returns
            // None (→ full ts) instead of panicking when 11..19 is out of
            // range or not on char boundaries.
            let time = ts.get(11..19).unwrap_or(ts);
            if detail.is_empty() {
                eprintln!(" {} {:20} {}", time, job, event);
            } else {
                // Truncate long details (file paths): prefer the basename.
                let short = if detail.len() > 60 {
                    let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail);
                    truncate_str(last, 60)
                } else {
                    detail
                };
                eprintln!(" {} {:20} {:12} {}", time, job, event, short);
            }
        } else {
            eprintln!("{}", line);
        }
    }
    Ok(())
}