jobkit-daemon in external repo

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-03-18 12:47:25 -04:00
parent 1629a2c4e3
commit c153daacd5
6 changed files with 5 additions and 350 deletions

View file

@ -1,12 +0,0 @@
# Manifest for jobkit-daemon: reusable daemon plumbing (JSONL event log,
# unix-socket RPC server, status file) layered on top of jobkit.
[package]
name = "jobkit-daemon"
# version and edition are inherited from the workspace manifest
version.workspace = true
edition.workspace = true
[dependencies]
# NOTE(review): git dependency is unpinned (no rev/tag) — builds track the
# default branch of the upstream repo; confirm this is intentional.
jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"   # timestamps for the event log
libc = "0.2"     # signal() bindings for the socket server
log = "0.4"

View file

@ -1,62 +0,0 @@
// JSONL event logging with size-based rotation
//
// Appends {"ts", "job", "event", "detail"} lines to daemon.log.
// Rotates by truncating to the last half when file exceeds 1MB.
// Rotation is intentionally simple — no external log infra needed.
use std::fs;
use std::io::Write;
use std::path::Path;
const LOG_MAX_BYTES: u64 = 1_000_000;
/// Location of the JSONL event log inside `data_dir`.
fn log_path(data_dir: &Path) -> std::path::PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push("daemon.log");
    path
}
/// Append a structured event to the daemon log.
///
/// Writes one JSONL record `{"ts","job","event"}` (plus `"detail"` when
/// non-empty) to `data_dir/daemon.log`, rotating first if the file is
/// over the size limit. Write failures are silently ignored (best-effort).
pub fn log(data_dir: &Path, job: &str, event: &str, detail: &str) {
    // Escape a value for embedding in a JSON string literal.
    //
    // Fix: the previous code escaped only `detail`, so a `job` or `event`
    // containing a quote or backslash corrupted the JSONL stream, and it
    // missed \r, \t and other control characters, which are invalid
    // unescaped inside JSON strings (RFC 8259 §7).
    fn esc(s: &str) -> String {
        let mut out = String::with_capacity(s.len());
        for c in s.chars() {
            match c {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                c => out.push(c),
            }
        }
        out
    }
    let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
    let line = if detail.is_empty() {
        format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n",
                ts, esc(job), esc(event))
    } else {
        format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n",
                ts, esc(job), esc(event), esc(detail))
    };
    let path = log_path(data_dir);
    rotate_if_needed(&path);
    if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
        let _ = f.write_all(line.as_bytes());
    }
}
/// Truncate the log to roughly its newest half once it exceeds LOG_MAX_BYTES.
///
/// Operates on raw bytes. Fixes two defects in the previous version, which
/// went through `String`: slicing at `content[half..]` panicked whenever the
/// byte midpoint fell inside a multi-byte UTF-8 character, and
/// `read_to_string` failed on any invalid UTF-8 in the log, silently
/// disabling rotation so the file grew without bound.
fn rotate_if_needed(path: &Path) {
    let len = match fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_) => return, // no file yet (or unreadable) — nothing to rotate
    };
    if len <= LOG_MAX_BYTES {
        return;
    }
    let content = match fs::read(path) {
        Ok(bytes) => bytes,
        Err(_) => return, // best-effort: skip rotation on read failure
    };
    let half = content.len() / 2;
    // Keep everything after the first newline past the midpoint, so the
    // retained tail starts on a whole JSONL record.
    if let Some(nl) = content[half..].iter().position(|&b| b == b'\n') {
        let _ = fs::write(path, &content[half + nl + 1..]);
    }
}
/// Read the last N log entries (for display).
///
/// Missing or unreadable log files yield an empty vector.
pub fn tail(data_dir: &Path, count: usize) -> Vec<String> {
    let content = fs::read_to_string(data_dir.join("daemon.log")).unwrap_or_default();
    let all: Vec<&str> = content.lines().collect();
    // Slice off everything before the last `count` lines (saturating, so
    // asking for more lines than exist returns them all).
    let start = all.len().saturating_sub(count);
    all[start..].iter().map(|line| line.to_string()).collect()
}

View file

@ -1,147 +0,0 @@
// jobkit-daemon — generic daemon infrastructure on top of jobkit
//
// Extracts the reusable parts of a background job daemon:
// - JSONL event logging with size-based rotation
// - Unix domain socket RPC server with signal handling
// - Status file management
// - Worker pool setup from config
// - run_job() wrapper with logging and error mapping
//
// Application code registers job functions, RPC handlers, and
// long-running tasks. This crate handles the plumbing.
pub mod event_log;
pub mod socket;
pub mod status;
use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError};
use std::path::PathBuf;
use std::sync::Arc;
/// Daemon configuration.
pub struct DaemonConfig {
    /// Directory for status file, log file, and socket.
    pub data_dir: PathBuf,
    /// Number of LLM (or other gated resource) concurrent slots.
    pub resource_slots: usize,
    /// Name for the resource pool.
    pub resource_name: String,
    /// Extra workers beyond resource slots (for long-running loops + non-gated jobs).
    pub extra_workers: usize,
}
impl Default for DaemonConfig {
    /// Defaults: current directory, three gated slots named "llm",
    /// and three extra ungated workers.
    fn default() -> Self {
        DaemonConfig {
            data_dir: PathBuf::from("."),
            resource_slots: 3,
            resource_name: String::from("llm"),
            extra_workers: 3,
        }
    }
}
/// A running daemon instance.
pub struct Daemon {
    /// Shared job scheduler (from the jobkit crate).
    pub choir: Arc<Choir>,
    /// Gated resource pool (e.g. LLM slots) bound to the choir.
    pub resource: Arc<ResourcePool>,
    // Configuration captured at construction time.
    config: DaemonConfig,
    // Tried in registration order by the socket server's dispatch.
    rpc_handlers: Vec<RpcHandler>,
    // NOTE(review): presumably kept only so the worker handles stay alive
    // for the daemon's lifetime (never read; underscore silences warnings).
    _workers: Vec<jobkit::WorkerHandle>,
}
/// An RPC handler: given (command, context), return Some(response) to
/// claim the command or None to pass it to the next handler.
type RpcHandler = Box<dyn Fn(&str, &DaemonContext) -> Option<String> + Send + Sync>;
/// Context passed to RPC handlers and status builders.
pub struct DaemonContext {
    /// Shared job scheduler.
    pub choir: Arc<Choir>,
    /// Shared resource pool.
    pub resource: Arc<ResourcePool>,
    /// Daemon data directory (status file, log file, socket).
    pub data_dir: PathBuf,
}
impl Daemon {
    /// Create a new daemon with the given configuration.
    ///
    /// Spawns `resource_slots + extra_workers` workers named "w0", "w1", …
    /// and binds a resource pool of `resource_slots` slots to the choir.
    pub fn new(config: DaemonConfig) -> Self {
        let choir = Choir::new();
        let total = config.resource_slots + config.extra_workers;
        let mut workers = Vec::with_capacity(total);
        for idx in 0..total {
            workers.push(choir.add_worker(&format!("w{}", idx)));
        }
        let resource = ResourcePool::new(&config.resource_name, config.resource_slots);
        resource.bind(&choir);
        Daemon {
            choir,
            resource,
            config,
            rpc_handlers: Vec::new(),
            _workers: workers,
        }
    }
    /// Register an RPC handler. Called with (command_string, context).
    /// Return Some(response_json) to handle, None to pass to next handler.
    pub fn add_rpc_handler<F>(&mut self, handler: F)
    where
        F: Fn(&str, &DaemonContext) -> Option<String> + Send + Sync + 'static,
    {
        self.rpc_handlers.push(Box::new(handler));
    }
    /// Run the daemon main loop (socket server + signal handling).
    /// Blocks until SIGINT/SIGTERM.
    pub fn run<S, F>(&self, status_builder: F)
    where
        S: serde::Serialize,
        F: Fn(&DaemonContext) -> S + Send + Sync,
    {
        let context = DaemonContext {
            choir: Arc::clone(&self.choir),
            resource: Arc::clone(&self.resource),
            data_dir: self.config.data_dir.clone(),
        };
        let pid = std::process::id();
        event_log::log(&self.config.data_dir, "daemon", "started",
            &format!("pid {}", pid));
        eprintln!("daemon started (pid {})", pid);
        // Publish an initial status snapshot before serving requests.
        status::write(&self.config.data_dir, &status_builder(&context));
        socket::run_loop(
            &self.config.data_dir,
            &context,
            &self.rpc_handlers,
            &status_builder,
        );
    }
    /// Convenience: wrap a closure with logging, progress, and error mapping.
    pub fn run_job(
        data_dir: &std::path::Path,
        ctx: &ExecutionContext,
        name: &str,
        f: impl FnOnce() -> Result<(), String>,
    ) -> Result<(), TaskError> {
        event_log::log(data_dir, name, "started", "");
        ctx.set_progress("starting");
        let start = std::time::Instant::now();
        let outcome = f();
        let duration = format!("{:.1}s", start.elapsed().as_secs_f64());
        match outcome {
            Ok(()) => {
                event_log::log(data_dir, name, "completed", &duration);
                ctx.set_result(&duration);
                Ok(())
            }
            Err(e) => {
                let msg = format!("{}: {}", duration, e);
                event_log::log(data_dir, name, "failed", &msg);
                // Every failure is mapped to a retryable task error.
                Err(TaskError::Retry(msg))
            }
        }
    }
}

View file

@ -1,99 +0,0 @@
// Unix domain socket RPC server with signal handling
//
// Non-blocking accept loop, checks STOP flag between accepts.
// Dispatches commands through registered handlers; falls back
// to returning status JSON if no handler matches.
use super::{DaemonContext, RpcHandler};
use std::io::{Read, Write};
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
// Set when SIGINT/SIGTERM arrives; polled by run_loop between accepts.
static STOP: AtomicBool = AtomicBool::new(false);
// Signal handler: a single atomic store, which is async-signal-safe.
extern "C" fn handle_signal(_: libc::c_int) {
    STOP.store(true, Ordering::Release);
}
/// Serve RPC requests on `data_dir/daemon.sock` until SIGINT/SIGTERM.
///
/// Installs signal handlers, then runs a non-blocking accept loop that
/// re-checks the STOP flag roughly every 100ms. Each connection is read to
/// EOF, dispatched through `handlers` in order, and answered with the first
/// Some(..) response; if no handler claims the command, the current status
/// from `status_builder` is returned as pretty-printed JSON.
pub fn run_loop<S, F>(
    data_dir: &Path,
    ctx: &DaemonContext,
    handlers: &[RpcHandler],
    status_builder: &F,
) where
    S: serde::Serialize,
    F: Fn(&DaemonContext) -> S,
{
    // SAFETY: handle_signal only performs an atomic store, which is
    // async-signal-safe; libc::signal replaces any previous disposition.
    unsafe {
        libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
        libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
    }
    let sock_path = data_dir.join("daemon.sock");
    // Remove a stale socket left behind by a previous run, if any.
    let _ = std::fs::remove_file(&sock_path);
    let listener = match UnixListener::bind(&sock_path) {
        Ok(l) => l,
        Err(e) => {
            // Degrade gracefully: without the socket the daemon can't serve
            // RPCs, but it keeps running until signalled so background work
            // continues.
            eprintln!("Warning: couldn't bind socket {}: {}", sock_path.display(), e);
            while !STOP.load(Ordering::Acquire) {
                std::thread::sleep(Duration::from_millis(500));
            }
            return;
        }
    };
    // Non-blocking accepts let the loop notice STOP between connections.
    listener.set_nonblocking(true).ok();
    while !STOP.load(Ordering::Acquire) {
        match listener.accept() {
            Ok((mut stream, _)) => {
                // Short read timeout so a stalled client can't wedge the loop.
                stream.set_read_timeout(Some(Duration::from_millis(100))).ok();
                let mut cmd = String::new();
                let _ = stream.read_to_string(&mut cmd);
                let cmd = cmd.trim().to_string();
                // Try registered handlers first
                let mut handled = false;
                for handler in handlers {
                    if let Some(response) = handler(&cmd, ctx) {
                        let _ = stream.write_all(response.as_bytes());
                        handled = true;
                        break;
                    }
                }
                // Default: return status JSON
                if !handled {
                    let status = status_builder(ctx);
                    if let Ok(json) = serde_json::to_string_pretty(&status) {
                        let _ = stream.write_all(json.as_bytes());
                    }
                }
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection; sleep briefly before re-checking STOP.
                std::thread::sleep(Duration::from_millis(100));
            }
            Err(_) => {
                // Transient accept error: back off and retry.
                std::thread::sleep(Duration::from_millis(100));
            }
        }
    }
    // Best-effort removal of the socket file on shutdown.
    let _ = std::fs::remove_file(&sock_path);
}
/// Send an RPC command to a running daemon. Returns the response.
///
/// Returns None if the daemon's socket is absent or any I/O step fails.
pub fn send_rpc(data_dir: &Path, cmd: &str) -> Option<String> {
    use std::os::unix::net::UnixStream;
    let mut conn = match UnixStream::connect(data_dir.join("daemon.sock")) {
        Ok(c) => c,
        Err(_) => return None,
    };
    // Send the command, then half-close so the server sees EOF.
    conn.write_all(cmd.as_bytes()).ok()?;
    conn.shutdown(std::net::Shutdown::Write).ok()?;
    // Read the full response until the server closes its side.
    let mut response = String::new();
    conn.read_to_string(&mut response).ok()?;
    Some(response)
}

View file

@ -1,29 +0,0 @@
// Status file management
//
// Writes a JSON status snapshot to data_dir/daemon-status.json.
// Applications provide their own status struct (must impl Serialize).
use std::fs;
use std::path::Path;
/// Location of the JSON status snapshot inside `data_dir`.
fn status_path(data_dir: &Path) -> std::path::PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push("daemon-status.json");
    path
}
/// Write a status snapshot to the status file.
///
/// Best-effort: serialization or I/O failures are silently ignored.
pub fn write<S: serde::Serialize>(data_dir: &Path, status: &S) {
    match serde_json::to_string_pretty(status) {
        Ok(json) => {
            let _ = fs::write(status_path(data_dir), json);
        }
        Err(_) => {}
    }
}
/// Read the status file as a string.
///
/// Returns None when the file is missing or unreadable.
pub fn read(data_dir: &Path) -> Option<String> {
    fs::read_to_string(data_dir.join("daemon-status.json")).ok()
}
/// Read and deserialize the status file.
///
/// Returns None when the file is missing or its JSON doesn't match `S`.
pub fn load<S: serde::de::DeserializeOwned>(data_dir: &Path) -> Option<S> {
    read(data_dir).and_then(|raw| serde_json::from_str(&raw).ok())
}

View file

@ -20,7 +20,7 @@ rayon = "1"
peg = "0.8"
paste = "1"
jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" }
jobkit-daemon = { path = "../jobkit-daemon" }
jobkit-daemon = { git = "https://evilpiepirate.org/git/jobkit-daemon.git/" }
redb = "2"
log = "0.4"
ratatui = "0.29"
@ -53,3 +53,7 @@ path = "src/bin/merge-logs.rs"
[[bin]]
name = "diag-key"
path = "src/bin/diag-key.rs"
[[bin]]
name = "find-deleted"
path = "src/bin/find-deleted.rs"