daemon: inline job functions instead of shelling out to poc-memory

Replace run_poc_memory() subprocess calls with direct function calls
to the library. Each job (experience-mine, fact-mine, decay, consolidate,
knowledge-loop, digest, daily-check) now runs in-process, fixing the
orphaned subprocess problem on daemon shutdown.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-05 21:53:27 -05:00
parent 81d3ce93fe
commit cf5fe42a15

View file

@ -17,7 +17,6 @@ use std::collections::HashSet;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
@ -74,41 +73,101 @@ fn log_event(job: &str, event: &str, detail: &str) {
}
}
// --- Shell out to poc-memory subcommands ---
// --- Job functions (direct, no subprocess) ---
// NOTE(review): this span is a rendered diff without +/- markers. Lines of the
// removed run_poc_memory() and the added run_job() are interleaved below, so
// neither function appears contiguously as shown.
fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
// (removed) old subprocess runner: the joined args doubled as the log job name.
let job = args.join(" ");
log_event(&job, "started", "");
/// Run a named job with logging and error mapping.
fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
// (added) in-process runner: log start, time the closure, map errors to Retry.
log_event(name, "started", "");
let start = std::time::Instant::now();
// (removed) spawn `poc-memory <args>` and capture output; a failed spawn
// (binary missing / not executable) was classified Fatal.
let output = Command::new("poc-memory")
.args(args)
.output()
.map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
let elapsed = start.elapsed();
let duration = format!("{:.1}s", elapsed.as_secs_f64());
if output.status.success() {
log_event(&job, "completed", &duration);
Ok(())
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
// (removed) keep only the trailing 500 bytes of stderr for the log line.
// NOTE(review): this byte-offset slice could panic on a UTF-8 char boundary
// (str indexing is byte-based); moot now that the subprocess path is gone.
let stderr_short = if stderr.len() > 500 {
&stderr[stderr.len()-500..]
} else {
&stderr
};
let msg = format!("{} exit {}: {}", duration, output.status, stderr_short.trim_end());
log_event(&job, "failed", &msg);
// (removed) exit 1 + "Unknown command" meant a daemon/binary version mismatch
// and was classified Fatal (no retry); everything else fell through to Retry.
if output.status.code() == Some(1) && stderr.contains("Unknown command") {
Err(TaskError::Fatal(msg))
} else {
// (added) run_job body: run the closure, log outcome with elapsed duration.
match f() {
Ok(()) => {
let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
log_event(name, "completed", &duration);
Ok(())
}
Err(e) => {
let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
let msg = format!("{}: {}", duration, e);
log_event(name, "failed", &msg);
// NOTE(review): every in-process failure is now Retry; the old Fatal
// classification for unrecoverable errors has no analogue here — confirm
// that is intended for errors that can never succeed on retry.
Err(TaskError::Retry(msg))
}
}
}
}
/// Mine experience entries from the session transcript at `path`.
///
/// Loads the store, runs the experience miner over the file, and prints a
/// one-line summary to stderr. Failures are logged and mapped to a
/// retryable `TaskError` by `run_job`.
fn job_experience_mine(path: &str) -> Result<(), TaskError> {
    // The closure only borrows `path` for the duration of the run_job() call,
    // so the original owned copy (`path.to_string()`) was a redundant allocation.
    run_job(&format!("experience-mine {}", path), || {
        let mut store = crate::store::Store::load()?;
        let count = crate::enrich::experience_mine(&mut store, path)?;
        // Summarize with just the final path component; fall back to the
        // full path when there is no file name (e.g. a path ending in "..").
        let shown = std::path::Path::new(path)
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| path.to_string());
        eprintln!("experience-mine: {} new entries from {}", count, shown);
        Ok(())
    })
}
/// Mine facts from the session transcript at `path` and store them.
///
/// Delegates to `fact_mine::mine_and_store` and prints a one-line summary
/// to stderr; errors are logged and mapped to a retryable `TaskError`
/// by `run_job`.
fn job_fact_mine(path: &str) -> Result<(), TaskError> {
    // Borrow `path` directly: the closure runs before job_fact_mine returns,
    // so the original `path.to_string()` copy was unnecessary.
    run_job(&format!("fact-mine {}", path), || {
        let p = std::path::Path::new(path);
        let count = crate::fact_mine::mine_and_store(p)?;
        // Show just the file name in the summary, falling back to the full path.
        let shown = p
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| path.to_string());
        eprintln!("fact-mine: {} facts from {}", count, shown);
        Ok(())
    })
}
/// Age memory entries and drop those that have fully decayed, then write
/// the updated store back to disk.
fn job_decay() -> Result<(), TaskError> {
    run_job("decay", || {
        let mut store = crate::store::Store::load()?;
        let (aged, removed) = store.decay();
        store.save()?;
        eprintln!("decay: {} decayed, {} pruned", aged, removed);
        Ok(())
    })
}
/// Run the full consolidation pass over a freshly loaded memory store.
fn job_consolidate() -> Result<(), TaskError> {
    run_job("consolidate", || {
        crate::consolidate::consolidate_full(&mut crate::store::Store::load()?)
    })
}
/// Drive the knowledge loop under the daemon's fixed budget (at most 100
/// cycles, 5 items per batch) and report cycle/action totals to stderr.
fn job_knowledge_loop() -> Result<(), TaskError> {
    run_job("knowledge-loop", || {
        let cfg = crate::knowledge::KnowledgeLoopConfig {
            max_cycles: 100,
            batch_size: 5,
            ..Default::default()
        };
        let cycles = crate::knowledge::run_knowledge_loop(&cfg)?;
        let applied: usize = cycles.iter().map(|c| c.total_applied).sum();
        eprintln!("knowledge-loop: {} cycles, {} actions", cycles.len(), applied);
        Ok(())
    })
}
/// Produce the automatic digest over a freshly loaded memory store.
fn job_digest() -> Result<(), TaskError> {
    run_job("digest", || {
        crate::digest::digest_auto(&mut crate::store::Store::load()?)
    })
}
/// Generate the daily health report and emit it on stderr.
fn job_daily_check() -> Result<(), TaskError> {
    run_job("daily-check", || {
        let snapshot = crate::store::Store::load()?;
        // NOTE(review): eprint! (not eprintln!) preserved — the report
        // presumably carries its own trailing newline; confirm in neuro.
        eprint!("{}", crate::neuro::daily_check(&snapshot));
        Ok(())
    })
}
// --- Session detection ---
/// Find JSONL session files that are stale (not recently written) and not
@ -295,7 +354,7 @@ pub fn run_daemon() -> Result<(), String> {
.resource(&llm_sw)
.retries(2)
.init(move |_ctx| {
run_poc_memory(&["experience-mine", &path])
job_experience_mine(&path)
})
.run();
@ -307,7 +366,7 @@ pub fn run_daemon() -> Result<(), String> {
.resource(&llm_sw)
.retries(1)
.init(move |_ctx| {
run_poc_memory(&["fact-mine-store", &path2])
job_fact_mine(&path2)
});
fm.depend_on(&extract);
}
@ -342,7 +401,7 @@ pub fn run_daemon() -> Result<(), String> {
// Health check: every hour
if last_health.elapsed() >= HEALTH_INTERVAL {
choir_sched.spawn("health").init(|_ctx| {
run_poc_memory(&["daily-check"])
job_daily_check()
});
last_health = std::time::Instant::now();
}
@ -353,7 +412,7 @@ pub fn run_daemon() -> Result<(), String> {
// Decay (no API calls, fast)
choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| {
run_poc_memory(&["decay"])
job_decay()
});
// Consolidation pipeline: consolidate → knowledge-loop → digest
@ -361,7 +420,7 @@ pub fn run_daemon() -> Result<(), String> {
.resource(&llm_sched)
.retries(2)
.init(move |_ctx| {
run_poc_memory(&["consolidate-full"])
job_consolidate()
})
.run();
@ -369,7 +428,7 @@ pub fn run_daemon() -> Result<(), String> {
.resource(&llm_sched)
.retries(1)
.init(move |_ctx| {
run_poc_memory(&["knowledge-loop", "--max-cycles", "100", "--batch-size", "5"])
job_knowledge_loop()
});
knowledge.depend_on(&consolidate);
let knowledge = knowledge.run();
@ -378,7 +437,7 @@ pub fn run_daemon() -> Result<(), String> {
.resource(&llm_sched)
.retries(1)
.init(move |_ctx| {
run_poc_memory(&["digest", "auto"])
job_digest()
});
digest.depend_on(&knowledge);