diff --git a/src/daemon.rs b/src/daemon.rs index e72e54c..0e0b190 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -17,7 +17,6 @@ use std::collections::HashSet; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; -use std::process::Command; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -74,41 +73,101 @@ fn log_event(job: &str, event: &str, detail: &str) { } } -// --- Shell out to poc-memory subcommands --- +// --- Job functions (direct, no subprocess) --- -fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> { - let job = args.join(" "); - log_event(&job, "started", ""); +/// Run a named job with logging and error mapping. +fn run_job(name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { + log_event(name, "started", ""); let start = std::time::Instant::now(); - let output = Command::new("poc-memory") - .args(args) - .output() - .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?; - - let elapsed = start.elapsed(); - let duration = format!("{:.1}s", elapsed.as_secs_f64()); - - if output.status.success() { - log_event(&job, "completed", &duration); - Ok(()) - } else { - let stderr = String::from_utf8_lossy(&output.stderr); - let stderr_short = if stderr.len() > 500 { - &stderr[stderr.len()-500..] - } else { - &stderr - }; - let msg = format!("{} exit {}: {}", duration, output.status, stderr_short.trim_end()); - log_event(&job, "failed", &msg); - if output.status.code() == Some(1) && stderr.contains("Unknown command") { - Err(TaskError::Fatal(msg)) - } else { + match f() { + Ok(()) => { + let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); + log_event(name, "completed", &duration); + Ok(()) + } + Err(e) => { + let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); + let msg = format!("{}: {}", duration, e); + log_event(name, "failed", &msg); Err(TaskError::Retry(msg)) } } } +fn job_experience_mine(path: &str) -> Result<(), TaskError> { + let path = path.to_string(); + run_job(&format!("experience-mine {}", path), || { + let mut store = crate::store::Store::load()?; + let count = crate::enrich::experience_mine(&mut store, &path)?; + eprintln!("experience-mine: {} new entries from {}", count, + std::path::Path::new(&path).file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| path.clone())); + Ok(()) + }) +} + +fn job_fact_mine(path: &str) -> Result<(), TaskError> { + let path = path.to_string(); + run_job(&format!("fact-mine {}", path), || { + let p = std::path::Path::new(&path); + let count = crate::fact_mine::mine_and_store(p)?; + eprintln!("fact-mine: {} facts from {}", count, + p.file_name().map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| path.clone())); + Ok(()) + }) +} + +fn job_decay() -> Result<(), TaskError> { + run_job("decay", || { + let mut store = crate::store::Store::load()?; + let (decayed, pruned) = store.decay(); + store.save()?; + eprintln!("decay: {} decayed, {} pruned", decayed, pruned); + Ok(()) + }) +} + +fn job_consolidate() -> Result<(), TaskError> { + run_job("consolidate", || { + let mut store = crate::store::Store::load()?; + crate::consolidate::consolidate_full(&mut store) + }) +} + +fn job_knowledge_loop() -> Result<(), TaskError> { + run_job("knowledge-loop", || { + let config = crate::knowledge::KnowledgeLoopConfig { + max_cycles: 100, + batch_size: 5, + ..Default::default() + }; + let results = crate::knowledge::run_knowledge_loop(&config)?; + eprintln!("knowledge-loop: {} cycles, {} actions", + results.len(), + results.iter().map(|r| r.total_applied).sum::()); + Ok(()) + }) +} + +fn job_digest() -> Result<(), TaskError> { + run_job("digest", || { + let mut store = crate::store::Store::load()?; + crate::digest::digest_auto(&mut store) + }) +} + +fn job_daily_check() -> Result<(), TaskError> { + run_job("daily-check", || { + let store = crate::store::Store::load()?; + let report = crate::neuro::daily_check(&store); + eprint!("{}", report); + Ok(()) + }) +} + // --- Session detection --- /// Find JSONL session files that are stale (not recently written) and not @@ -295,7 +354,7 @@ pub fn run_daemon() -> Result<(), String> { .resource(&llm_sw) .retries(2) .init(move |_ctx| { - run_poc_memory(&["experience-mine", &path]) + job_experience_mine(&path) }) .run(); @@ -307,7 +366,7 @@ pub fn run_daemon() -> Result<(), String> { .resource(&llm_sw) .retries(1) .init(move |_ctx| { - run_poc_memory(&["fact-mine-store", &path2]) + job_fact_mine(&path2) }); fm.depend_on(&extract); } @@ -342,7 +401,7 @@ pub fn run_daemon() -> Result<(), String> { // Health check: every hour if last_health.elapsed() >= HEALTH_INTERVAL { choir_sched.spawn("health").init(|_ctx| { - run_poc_memory(&["daily-check"]) + job_daily_check() }); last_health = std::time::Instant::now(); } @@ -353,7 +412,7 @@ pub fn run_daemon() -> Result<(), String> { // Decay (no API calls, fast) choir_sched.spawn(format!("decay:{}", today)).init(|_ctx| { - run_poc_memory(&["decay"]) + job_decay() }); // Consolidation pipeline: consolidate → knowledge-loop → digest @@ -361,7 +420,7 @@ pub fn run_daemon() -> Result<(), String> { .resource(&llm_sched) .retries(2) .init(move |_ctx| { - run_poc_memory(&["consolidate-full"]) + job_consolidate() }) .run(); @@ -369,7 +428,7 @@ pub fn run_daemon() -> Result<(), String> { .resource(&llm_sched) .retries(1) .init(move |_ctx| { - run_poc_memory(&["knowledge-loop", "--max-cycles", "100", "--batch-size", "5"]) + job_knowledge_loop() }); knowledge.depend_on(&consolidate); let knowledge = knowledge.run(); @@ -378,7 +437,7 @@ pub fn run_daemon() -> Result<(), String> { .resource(&llm_sched) .retries(1) .init(move |_ctx| { - run_poc_memory(&["digest", "auto"]) + job_digest() }); digest.depend_on(&knowledge);