extract jobkit-daemon library from poc-memory daemon

Create jobkit-daemon crate with generic daemon infrastructure:
- event_log: JSONL append with size-based rotation
- socket: Unix domain socket RPC client and server with signal handling
- status: JSON status file read/write

Migrate daemon.rs to use the library:
- Worker pool setup via Daemon::new()
- Socket loop + signal handling via Daemon::run()
- RPC handlers as registered closures
- Logging, status writing, send_rpc all delegate to library

Migrate tui.rs to use socket::send_rpc() instead of inline UnixStream.

daemon.rs: 1952 → 1806 lines (-146), old status_socket_loop removed.
tui.rs: socket boilerplate removed.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-03-14 02:40:30 -04:00
parent 35bc93c22b
commit 420a777eba
11 changed files with 696 additions and 293 deletions

View file

@@ -12,10 +12,9 @@
//
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus};
use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
@@ -23,74 +22,21 @@ use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
fn status_file() -> &'static str { "daemon-status.json" }
fn log_file() -> &'static str { "daemon.log" }
fn status_path() -> PathBuf {
crate::config::get().data_dir.join(status_file())
}
fn log_path() -> PathBuf {
crate::config::get().data_dir.join(log_file())
crate::config::get().data_dir.join("daemon.log")
}
// --- Logging ---
const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half
fn log_event(job: &str, event: &str, detail: &str) {
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
let line = if detail.is_empty() {
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event)
} else {
// Escape detail for JSON safety
let safe = detail.replace('\\', "\\\\").replace('"', "\\\"")
.replace('\n', "\\n");
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n",
ts, job, event, safe)
};
let path = log_path();
// Rotate if too large
if let Ok(meta) = fs::metadata(&path) {
if meta.len() > LOG_MAX_BYTES {
if let Ok(content) = fs::read_to_string(&path) {
let half = content.len() / 2;
// Find next newline after halfway point
if let Some(nl) = content[half..].find('\n') {
let _ = fs::write(&path, &content[half + nl + 1..]);
}
}
}
}
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
let _ = f.write_all(line.as_bytes());
}
jobkit_daemon::event_log::log(&crate::config::get().data_dir, job, event, detail);
}
// --- Job functions (direct, no subprocess) ---
/// Run a named job with logging, progress reporting, and error mapping.
fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
log_event(name, "started", "");
ctx.set_progress("starting");
let start = std::time::Instant::now();
match f() {
Ok(()) => {
let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
log_event(name, "completed", &duration);
ctx.set_result(&duration);
Ok(())
}
Err(e) => {
let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
let msg = format!("{}: {}", duration, e);
log_event(name, "failed", &msg);
Err(TaskError::Retry(msg))
}
}
jobkit_daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, f)
}
fn job_experience_mine(ctx: &ExecutionContext, path: &str, segment: Option<usize>) -> Result<(), TaskError> {
@@ -638,9 +584,7 @@ fn write_status(
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) {
let status = build_status(choir, last_daily, graph_health);
if let Ok(json) = serde_json::to_string_pretty(&status) {
let _ = fs::write(status_path(), json);
}
jobkit_daemon::status::write(&crate::config::get().data_dir, &status);
}
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
@@ -676,20 +620,20 @@ struct DaemonStatus {
// --- The daemon ---
pub fn run_daemon() -> Result<(), String> {
let choir = Choir::new();
let llm_concurrency = crate::config::get().llm_concurrency;
// Workers: 2 for long-running loops + llm_concurrency + 1 for non-LLM jobs
let n_workers = llm_concurrency + 3;
let names: Vec<String> = (0..n_workers).map(|i| format!("w{}", i)).collect();
let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect();
let config = crate::config::get();
let mut daemon = jobkit_daemon::Daemon::new(jobkit_daemon::DaemonConfig {
data_dir: config.data_dir.clone(),
resource_slots: config.llm_concurrency,
resource_name: "llm".to_string(),
extra_workers: 3,
});
let llm = ResourcePool::new("llm", llm_concurrency);
llm.bind(&choir);
let choir = Arc::clone(&daemon.choir);
let llm = Arc::clone(&daemon.resource);
// Recover last_daily from previous status file
let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
fs::read_to_string(status_path()).ok()
.and_then(|s| serde_json::from_str::<DaemonStatus>(&s).ok())
jobkit_daemon::status::load::<DaemonStatus>(&config.data_dir)
.and_then(|s| s.last_daily)
.and_then(|d| d.parse().ok())
));
@@ -1123,36 +1067,124 @@ pub fn run_daemon() -> Result<(), String> {
}
});
// Main thread: listen on status socket + wait for signals
let choir_main = Arc::clone(&choir);
let last_daily_main = Arc::clone(&last_daily);
let graph_health_main = Arc::clone(&graph_health);
status_socket_loop(&choir_main, &last_daily_main, &graph_health_main, &llm);
// Register RPC handlers
{
let last_daily_rpc = Arc::clone(&last_daily);
daemon.add_rpc_handler(move |cmd, _ctx| {
if cmd == "consolidate" {
*last_daily_rpc.lock().unwrap() = None;
log_event("rpc", "consolidate", "triggered via socket");
Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into())
} else {
None
}
});
}
daemon.add_rpc_handler(|cmd, _ctx| {
if !cmd.starts_with("record-hits ") { return None; }
let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
.unwrap_or("")
.split('\t')
.filter(|k| !k.is_empty())
.collect();
if keys.is_empty() {
return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into());
}
let n = keys.len();
match crate::counters::record_search_hits(&keys) {
Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)),
Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))),
}
});
{
let choir_rpc = Arc::clone(&choir);
let llm_rpc = Arc::clone(&llm);
daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(3, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay");
let count: usize = parts.get(2)
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let batch_size = 5;
let today = chrono::Local::now().format("%Y-%m-%d");
let ts = chrono::Local::now().format("%H%M%S");
let mut prev = None;
let mut spawned = 0;
let mut remaining = count;
let is_rename = *agent_type == "rename";
let is_split = *agent_type == "split";
if is_split {
let store = crate::store::Store::load().ok();
let candidates = store.as_ref()
.map(|s| super::prompts::split_candidates(s))
.unwrap_or_default();
let to_split: Vec<String> = candidates.into_iter()
.take(count)
.collect();
for key in &to_split {
let key = key.clone();
let task_name = format!("c-split-{}:{}", key, today);
choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.retries(1)
.init(move |ctx| {
job_split_one(ctx, key.clone())
})
.run();
spawned += 1;
}
remaining = 0;
}
while remaining > 0 {
let batch = remaining.min(batch_size);
let agent = agent_type.to_string();
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.retries(1)
.init(move |ctx| {
if is_rename {
job_rename_agent(ctx, batch)
} else {
job_consolidation_agent(ctx, &agent, batch)
}
});
if let Some(ref dep) = prev {
builder.depend_on(dep);
}
prev = Some(builder.run());
remaining -= batch;
spawned += 1;
}
log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count));
Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n",
count, agent_type, spawned))
});
}
// Main thread: socket server + signal handling
let last_daily_status = Arc::clone(&last_daily);
let graph_health_status = Arc::clone(&graph_health);
daemon.run(move |ctx| {
build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status)
});
log_event("daemon", "stopping", "");
eprintln!("Shutting down...");
// Clean up socket
let _ = fs::remove_file(status_sock_path());
log_event("daemon", "stopped", "");
// Exit immediately — PR_SET_PDEATHSIG on child processes ensures
// claude subprocesses get SIGTERM when we die.
std::process::exit(0)
}
fn send_rpc(cmd: &str) -> Option<String> {
use std::io::{Read as _, Write as _};
use std::os::unix::net::UnixStream;
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
stream.write_all(cmd.as_bytes()).ok()?;
stream.shutdown(std::net::Shutdown::Write).ok()?;
let mut buf = String::new();
stream.read_to_string(&mut buf).ok()?;
Some(buf)
jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}
pub fn rpc_consolidate() -> Result<(), String> {
@@ -1187,189 +1219,11 @@ pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
}
fn read_status_socket() -> Option<DaemonStatus> {
use std::io::Read as _;
use std::os::unix::net::UnixStream;
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
let mut buf = String::new();
stream.read_to_string(&mut buf).ok()?;
serde_json::from_str(&buf).ok()
let json = jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
serde_json::from_str(&json).ok()
}
fn status_sock_path() -> PathBuf {
crate::config::get().data_dir.join("daemon.sock")
}
/// Listen on a Unix domain socket for status requests.
/// Any connection gets the live status JSON written and closed.
/// Also handles SIGINT/SIGTERM for clean shutdown.
fn status_socket_loop(
choir: &Arc<Choir>,
last_daily: &Arc<Mutex<Option<chrono::NaiveDate>>>,
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
llm: &Arc<ResourcePool>,
) {
use std::io::{Read as _, Write as _};
use std::os::unix::net::UnixListener;
use std::sync::atomic::{AtomicBool, Ordering};
static STOP: AtomicBool = AtomicBool::new(false);
unsafe {
libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
}
let sock_path = status_sock_path();
let _ = fs::remove_file(&sock_path); // clean up stale socket
let listener = match UnixListener::bind(&sock_path) {
Ok(l) => l,
Err(e) => {
eprintln!("Warning: couldn't bind status socket {}: {}", sock_path.display(), e);
// Fall back to just waiting for signals
while !STOP.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(500));
}
return;
}
};
// Non-blocking so we can check STOP flag
listener.set_nonblocking(true).ok();
while !STOP.load(Ordering::Acquire) {
match listener.accept() {
Ok((mut stream, _)) => {
// Read command from client (with short timeout)
stream.set_read_timeout(Some(Duration::from_millis(100))).ok();
let mut cmd = String::new();
let _ = stream.read_to_string(&mut cmd);
let cmd = cmd.trim().to_string();
match cmd.as_str() {
"consolidate" => {
*last_daily.lock().unwrap() = None;
let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n");
log_event("rpc", "consolidate", "triggered via socket");
}
cmd if cmd.starts_with("record-hits ") => {
let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
.unwrap_or("")
.split('\t')
.filter(|k| !k.is_empty())
.collect();
if keys.is_empty() {
let _ = stream.write_all(b"{\"ok\":false,\"error\":\"no keys\"}\n");
} else {
let n = keys.len();
match crate::counters::record_search_hits(&keys) {
Ok(()) => {
let msg = format!("{{\"ok\":true,\"recorded\":{}}}\n", n);
let _ = stream.write_all(msg.as_bytes());
}
Err(e) => {
let msg = format!("{{\"ok\":false,\"error\":\"{}\"}}\n",
e.replace('"', "'"));
let _ = stream.write_all(msg.as_bytes());
}
}
}
}
cmd if cmd.starts_with("run-agent ") => {
let parts: Vec<&str> = cmd.splitn(3, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay");
let count: usize = parts.get(2)
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let batch_size = 5;
let today = chrono::Local::now().format("%Y-%m-%d");
let ts = chrono::Local::now().format("%H%M%S");
let mut prev = None;
let mut spawned = 0;
let mut remaining = count;
let is_rename = *agent_type == "rename";
let is_split = *agent_type == "split";
if is_split {
// Split: load candidates upfront, spawn independent
// parallel tasks — one per node, no dependencies.
let store = crate::store::Store::load().ok();
let candidates = store.as_ref()
.map(|s| super::prompts::split_candidates(s))
.unwrap_or_default();
let to_split: Vec<String> = candidates.into_iter()
.take(count)
.collect();
for key in &to_split {
let key = key.clone();
let task_name = format!("c-split-{}:{}", key, today);
choir.spawn(task_name)
.resource(llm)
.retries(1)
.init(move |ctx| {
job_split_one(ctx, key.clone())
})
.run();
spawned += 1;
}
remaining = 0;
}
while remaining > 0 {
let batch = remaining.min(batch_size);
let agent = agent_type.to_string();
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir.spawn(task_name)
.resource(llm)
.retries(1)
.init(move |ctx| {
if is_rename {
job_rename_agent(ctx, batch)
} else {
job_consolidation_agent(ctx, &agent, batch)
}
});
if let Some(ref dep) = prev {
builder.depend_on(dep);
}
prev = Some(builder.run());
remaining -= batch;
spawned += 1;
}
let msg = format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n",
count, agent_type, spawned);
let _ = stream.write_all(msg.as_bytes());
log_event("rpc", "run-agent",
&format!("{} x{}", agent_type, count));
}
_ => {
// Default: return status
let status = build_status(choir, *last_daily.lock().unwrap(), graph_health);
if let Ok(json) = serde_json::to_string_pretty(&status) {
let _ = stream.write_all(json.as_bytes());
}
}
}
// Connection closes when stream is dropped
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(100));
}
Err(_) => {
std::thread::sleep(Duration::from_millis(100));
}
}
}
extern "C" fn handle_signal(_: libc::c_int) {
STOP.store(true, std::sync::atomic::Ordering::Release);
}
}
// status_socket_loop has been replaced by daemon.run() in jobkit-daemon.
fn build_status(
choir: &Choir,

View file

@@ -22,8 +22,6 @@ use ratatui::{
DefaultTerminal, Frame,
};
use std::fs;
use std::io::Read as _;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::time::{Duration, Instant};
@@ -35,10 +33,6 @@ const AGENT_TYPES: &[&str] = &[
"apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", "split",
];
fn status_sock_path() -> PathBuf {
crate::config::get().data_dir.join("daemon.sock")
}
fn log_path() -> PathBuf {
crate::config::get().data_dir.join("daemon.log")
}
@@ -58,11 +52,8 @@ struct DaemonStatus {
}
fn fetch_status() -> Option<DaemonStatus> {
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
let mut buf = String::new();
stream.read_to_string(&mut buf).ok()?;
serde_json::from_str(&buf).ok()
let json = jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
serde_json::from_str(&json).ok()
}
#[derive(Clone)]
@@ -794,14 +785,7 @@ fn short_name(name: &str) -> String {
}
fn send_rpc(cmd: &str) -> Option<String> {
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
std::io::Write::write_all(&mut stream, cmd.as_bytes()).ok()?;
stream.shutdown(std::net::Shutdown::Write).ok()?;
let mut buf = String::new();
stream.read_to_string(&mut buf).ok()?;
Some(buf)
jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}
// --- Entry point ---