diff --git a/jobkit-daemon/Cargo.toml b/jobkit-daemon/Cargo.toml deleted file mode 100644 index 5b6cb2f..0000000 --- a/jobkit-daemon/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "jobkit-daemon" -version.workspace = true -edition.workspace = true - -[dependencies] -jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" } -serde = { version = "1", features = ["derive"] } -serde_json = "1" -chrono = "0.4" -libc = "0.2" -log = "0.4" diff --git a/jobkit-daemon/src/event_log.rs b/jobkit-daemon/src/event_log.rs deleted file mode 100644 index a0afea2..0000000 --- a/jobkit-daemon/src/event_log.rs +++ /dev/null @@ -1,62 +0,0 @@ -// JSONL event logging with size-based rotation -// -// Appends {"ts", "job", "event", "detail"} lines to daemon.log. -// Rotates by truncating to the last half when file exceeds 1MB. -// Rotation is intentionally simple — no external log infra needed. - -use std::fs; -use std::io::Write; -use std::path::Path; - -const LOG_MAX_BYTES: u64 = 1_000_000; - -fn log_path(data_dir: &Path) -> std::path::PathBuf { - data_dir.join("daemon.log") -} - -/// Append a structured event to the daemon log. -pub fn log(data_dir: &Path, job: &str, event: &str, detail: &str) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - let line = if detail.is_empty() { - format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) - } else { - let safe = detail.replace('\\', "\\\\").replace('"', "\\\"") - .replace('\n', "\\n"); - format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", - ts, job, event, safe) - }; - - let path = log_path(data_dir); - rotate_if_needed(&path); - - if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) { - let _ = f.write_all(line.as_bytes()); - } -} - -fn rotate_if_needed(path: &Path) { - if let Ok(meta) = fs::metadata(path) { - if meta.len() > LOG_MAX_BYTES { - if let Ok(content) = fs::read_to_string(path) { - let half = content.len() / 2; - if let Some(nl) = content[half..].find('\n') { - let _ = fs::write(path, &content[half + nl + 1..]); - } - } - } - } -} - -/// Read the last N log entries (for display). -pub fn tail(data_dir: &Path, count: usize) -> Vec { - let path = log_path(data_dir); - let content = fs::read_to_string(path).unwrap_or_default(); - content.lines() - .rev() - .take(count) - .map(String::from) - .collect::>() - .into_iter() - .rev() - .collect() -} diff --git a/jobkit-daemon/src/lib.rs b/jobkit-daemon/src/lib.rs deleted file mode 100644 index d26fd11..0000000 --- a/jobkit-daemon/src/lib.rs +++ /dev/null @@ -1,147 +0,0 @@ -// jobkit-daemon — generic daemon infrastructure on top of jobkit -// -// Extracts the reusable parts of a background job daemon: -// - JSONL event logging with size-based rotation -// - Unix domain socket RPC server with signal handling -// - Status file management -// - Worker pool setup from config -// - run_job() wrapper with logging and error mapping -// -// Application code registers job functions, RPC handlers, and -// long-running tasks. This crate handles the plumbing. - -pub mod event_log; -pub mod socket; -pub mod status; - -use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError}; -use std::path::PathBuf; -use std::sync::Arc; - -/// Daemon configuration. -pub struct DaemonConfig { - /// Directory for status file, log file, and socket. - pub data_dir: PathBuf, - /// Number of LLM (or other gated resource) concurrent slots. - pub resource_slots: usize, - /// Name for the resource pool. - pub resource_name: String, - /// Extra workers beyond resource slots (for long-running loops + non-gated jobs). - pub extra_workers: usize, -} - -impl Default for DaemonConfig { - fn default() -> Self { - Self { - data_dir: PathBuf::from("."), - resource_slots: 3, - resource_name: "llm".to_string(), - extra_workers: 3, - } - } -} - -/// A running daemon instance. -pub struct Daemon { - pub choir: Arc, - pub resource: Arc, - config: DaemonConfig, - rpc_handlers: Vec, - _workers: Vec, -} - -type RpcHandler = Box Option + Send + Sync>; - -/// Context passed to RPC handlers and status builders. -pub struct DaemonContext { - pub choir: Arc, - pub resource: Arc, - pub data_dir: PathBuf, -} - -impl Daemon { - /// Create a new daemon with the given configuration. - pub fn new(config: DaemonConfig) -> Self { - let choir = Choir::new(); - let n_workers = config.resource_slots + config.extra_workers; - let workers: Vec<_> = (0..n_workers) - .map(|i| choir.add_worker(&format!("w{}", i))) - .collect(); - - let resource = ResourcePool::new(&config.resource_name, config.resource_slots); - resource.bind(&choir); - - Daemon { - choir, - resource, - config, - rpc_handlers: Vec::new(), - _workers: workers, - } - } - - /// Register an RPC handler. Called with (command_string, context). - /// Return Some(response_json) to handle, None to pass to next handler. - pub fn add_rpc_handler(&mut self, handler: F) - where - F: Fn(&str, &DaemonContext) -> Option + Send + Sync + 'static, - { - self.rpc_handlers.push(Box::new(handler)); - } - - /// Run the daemon main loop (socket server + signal handling). - /// Blocks until SIGINT/SIGTERM. - pub fn run(&self, status_builder: F) - where - S: serde::Serialize, - F: Fn(&DaemonContext) -> S + Send + Sync, - { - let ctx = DaemonContext { - choir: Arc::clone(&self.choir), - resource: Arc::clone(&self.resource), - data_dir: self.config.data_dir.clone(), - }; - - event_log::log(&self.config.data_dir, "daemon", "started", - &format!("pid {}", std::process::id())); - eprintln!("daemon started (pid {})", std::process::id()); - - // Write initial status - let initial = status_builder(&ctx); - status::write(&self.config.data_dir, &initial); - - socket::run_loop( - &self.config.data_dir, - &ctx, - &self.rpc_handlers, - &status_builder, - ); - } - - /// Convenience: wrap a closure with logging, progress, and error mapping. - pub fn run_job( - data_dir: &std::path::Path, - ctx: &ExecutionContext, - name: &str, - f: impl FnOnce() -> Result<(), String>, - ) -> Result<(), TaskError> { - event_log::log(data_dir, name, "started", ""); - ctx.set_progress("starting"); - let start = std::time::Instant::now(); - - match f() { - Ok(()) => { - let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); - event_log::log(data_dir, name, "completed", &duration); - ctx.set_result(&duration); - Ok(()) - } - Err(e) => { - let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); - let msg = format!("{}: {}", duration, e); - event_log::log(data_dir, name, "failed", &msg); - Err(TaskError::Retry(msg)) - } - } - } -} diff --git a/jobkit-daemon/src/socket.rs b/jobkit-daemon/src/socket.rs deleted file mode 100644 index 25b74b8..0000000 --- a/jobkit-daemon/src/socket.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Unix domain socket RPC server with signal handling -// -// Non-blocking accept loop, checks STOP flag between accepts. -// Dispatches commands through registered handlers; falls back -// to returning status JSON if no handler matches. - -use super::{DaemonContext, RpcHandler}; -use std::io::{Read, Write}; -use std::os::unix::net::UnixListener; -use std::path::Path; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; - -static STOP: AtomicBool = AtomicBool::new(false); - -extern "C" fn handle_signal(_: libc::c_int) { - STOP.store(true, Ordering::Release); -} - -pub fn run_loop( - data_dir: &Path, - ctx: &DaemonContext, - handlers: &[RpcHandler], - status_builder: &F, -) where - S: serde::Serialize, - F: Fn(&DaemonContext) -> S, -{ - unsafe { - libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t); - libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); - } - - let sock_path = data_dir.join("daemon.sock"); - let _ = std::fs::remove_file(&sock_path); - - let listener = match UnixListener::bind(&sock_path) { - Ok(l) => l, - Err(e) => { - eprintln!("Warning: couldn't bind socket {}: {}", sock_path.display(), e); - while !STOP.load(Ordering::Acquire) { - std::thread::sleep(Duration::from_millis(500)); - } - return; - } - }; - - listener.set_nonblocking(true).ok(); - - while !STOP.load(Ordering::Acquire) { - match listener.accept() { - Ok((mut stream, _)) => { - stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); - let mut cmd = String::new(); - let _ = stream.read_to_string(&mut cmd); - let cmd = cmd.trim().to_string(); - - // Try registered handlers first - let mut handled = false; - for handler in handlers { - if let Some(response) = handler(&cmd, ctx) { - let _ = stream.write_all(response.as_bytes()); - handled = true; - break; - } - } - - // Default: return status JSON - if !handled { - let status = status_builder(ctx); - if let Ok(json) = serde_json::to_string_pretty(&status) { - let _ = stream.write_all(json.as_bytes()); - } - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - std::thread::sleep(Duration::from_millis(100)); - } - Err(_) => { - std::thread::sleep(Duration::from_millis(100)); - } - } - } - - let _ = std::fs::remove_file(&sock_path); -} - -/// Send an RPC command to a running daemon. Returns the response. -pub fn send_rpc(data_dir: &Path, cmd: &str) -> Option { - use std::os::unix::net::UnixStream; - - let sock_path = data_dir.join("daemon.sock"); - let mut stream = UnixStream::connect(&sock_path).ok()?; - stream.write_all(cmd.as_bytes()).ok()?; - stream.shutdown(std::net::Shutdown::Write).ok()?; - let mut buf = String::new(); - stream.read_to_string(&mut buf).ok()?; - Some(buf) -} diff --git a/jobkit-daemon/src/status.rs b/jobkit-daemon/src/status.rs deleted file mode 100644 index 5c659ae..0000000 --- a/jobkit-daemon/src/status.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Status file management -// -// Writes a JSON status snapshot to data_dir/daemon-status.json. -// Applications provide their own status struct (must impl Serialize). - -use std::fs; -use std::path::Path; - -fn status_path(data_dir: &Path) -> std::path::PathBuf { - data_dir.join("daemon-status.json") -} - -/// Write a status snapshot to the status file. -pub fn write(data_dir: &Path, status: &S) { - if let Ok(json) = serde_json::to_string_pretty(status) { - let _ = fs::write(status_path(data_dir), json); - } -} - -/// Read the status file as a string. -pub fn read(data_dir: &Path) -> Option { - fs::read_to_string(status_path(data_dir)).ok() -} - -/// Read and deserialize the status file. -pub fn load(data_dir: &Path) -> Option { - let s = read(data_dir)?; - serde_json::from_str(&s).ok() -} diff --git a/poc-memory/Cargo.toml b/poc-memory/Cargo.toml index df00dd0..d8773af 100644 --- a/poc-memory/Cargo.toml +++ b/poc-memory/Cargo.toml @@ -20,7 +20,7 @@ rayon = "1" peg = "0.8" paste = "1" jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" } -jobkit-daemon = { path = "../jobkit-daemon" } +jobkit-daemon = { git = "https://evilpiepirate.org/git/jobkit-daemon.git/" } redb = "2" log = "0.4" ratatui = "0.29" @@ -53,3 +53,7 @@ path = "src/bin/merge-logs.rs" [[bin]] name = "diag-key" path = "src/bin/diag-key.rs" + +[[bin]] +name = "find-deleted" +path = "src/bin/find-deleted.rs"