From 420a777ebadf710d7ed28f74806ad277a318fd06 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 14 Mar 2026 02:40:30 -0400 Subject: [PATCH] extract jobkit-daemon library from poc-memory daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create jobkit-daemon crate with generic daemon infrastructure: - event_log: JSONL append with size-based rotation - socket: Unix domain socket RPC client and server with signal handling - status: JSON status file read/write Migrate daemon.rs to use the library: - Worker pool setup via Daemon::new() - Socket loop + signal handling via Daemon::run() - RPC handlers as registered closures - Logging, status writing, send_rpc all delegate to library Migrate tui.rs to use socket::send_rpc() instead of inline UnixStream. daemon.rs: 1952 → 1806 lines (-146), old status_socket_loop removed. tui.rs: socket boilerplate removed. Co-Authored-By: Kent Overstreet --- Cargo.lock | 13 + Cargo.toml | 2 +- jobkit-daemon/Cargo.toml | 12 + jobkit-daemon/src/event_log.rs | 62 +++ jobkit-daemon/src/lib.rs | 147 +++++++ jobkit-daemon/src/socket.rs | 99 +++++ jobkit-daemon/src/status.rs | 29 ++ .../2026-03-14-daemon-jobkit-survey.md | 202 +++++++++ poc-memory/Cargo.toml | 1 + poc-memory/src/agents/daemon.rs | 400 ++++++------------ poc-memory/src/tui.rs | 22 +- 11 files changed, 696 insertions(+), 293 deletions(-) create mode 100644 jobkit-daemon/Cargo.toml create mode 100644 jobkit-daemon/src/event_log.rs create mode 100644 jobkit-daemon/src/lib.rs create mode 100644 jobkit-daemon/src/socket.rs create mode 100644 jobkit-daemon/src/status.rs create mode 100644 poc-memory/.claude/analysis/2026-03-14-daemon-jobkit-survey.md diff --git a/Cargo.lock b/Cargo.lock index 8555f26..5937877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1437,6 +1437,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jobkit-daemon" +version = "0.4.0" +dependencies = [ + "chrono", + "jobkit", + "libc", + "log", + "serde", + "serde_json", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -1883,6 +1895,7 @@ dependencies = [ "crossterm", "faer", "jobkit", + "jobkit-daemon", "libc", "log", "memmap2", diff --git a/Cargo.toml b/Cargo.toml index a223f69..93cb8ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["poc-memory", "poc-daemon"] +members = ["poc-memory", "poc-daemon", "jobkit-daemon"] resolver = "2" [workspace.package] diff --git a/jobkit-daemon/Cargo.toml b/jobkit-daemon/Cargo.toml new file mode 100644 index 0000000..5b6cb2f --- /dev/null +++ b/jobkit-daemon/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "jobkit-daemon" +version.workspace = true +edition.workspace = true + +[dependencies] +jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +chrono = "0.4" +libc = "0.2" +log = "0.4" diff --git a/jobkit-daemon/src/event_log.rs b/jobkit-daemon/src/event_log.rs new file mode 100644 index 0000000..a0afea2 --- /dev/null +++ b/jobkit-daemon/src/event_log.rs @@ -0,0 +1,62 @@ +// JSONL event logging with size-based rotation +// +// Appends {"ts", "job", "event", "detail"} lines to daemon.log. +// Rotates by truncating to the last half when file exceeds 1MB. +// Rotation is intentionally simple — no external log infra needed. + +use std::fs; +use std::io::Write; +use std::path::Path; + +const LOG_MAX_BYTES: u64 = 1_000_000; + +fn log_path(data_dir: &Path) -> std::path::PathBuf { + data_dir.join("daemon.log") +} + +/// Append a structured event to the daemon log. +pub fn log(data_dir: &Path, job: &str, event: &str, detail: &str) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + let line = if detail.is_empty() { + format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) + } else { + let safe = detail.replace('\\', "\\\\").replace('"', "\\\"") + .replace('\n', "\\n"); + format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", + ts, job, event, safe) + }; + + let path = log_path(data_dir); + rotate_if_needed(&path); + + if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) { + let _ = f.write_all(line.as_bytes()); + } +} + +fn rotate_if_needed(path: &Path) { + if let Ok(meta) = fs::metadata(path) { + if meta.len() > LOG_MAX_BYTES { + if let Ok(content) = fs::read_to_string(path) { + let half = content.len() / 2; + if let Some(nl) = content[half..].find('\n') { + let _ = fs::write(path, &content[half + nl + 1..]); + } + } + } + } +} + +/// Read the last N log entries (for display). +pub fn tail(data_dir: &Path, count: usize) -> Vec { + let path = log_path(data_dir); + let content = fs::read_to_string(path).unwrap_or_default(); + content.lines() + .rev() + .take(count) + .map(String::from) + .collect::>() + .into_iter() + .rev() + .collect() +} diff --git a/jobkit-daemon/src/lib.rs b/jobkit-daemon/src/lib.rs new file mode 100644 index 0000000..d26fd11 --- /dev/null +++ b/jobkit-daemon/src/lib.rs @@ -0,0 +1,147 @@ +// jobkit-daemon — generic daemon infrastructure on top of jobkit +// +// Extracts the reusable parts of a background job daemon: +// - JSONL event logging with size-based rotation +// - Unix domain socket RPC server with signal handling +// - Status file management +// - Worker pool setup from config +// - run_job() wrapper with logging and error mapping +// +// Application code registers job functions, RPC handlers, and +// long-running tasks. This crate handles the plumbing. + +pub mod event_log; +pub mod socket; +pub mod status; + +use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError}; +use std::path::PathBuf; +use std::sync::Arc; + +/// Daemon configuration. +pub struct DaemonConfig { + /// Directory for status file, log file, and socket. + pub data_dir: PathBuf, + /// Number of LLM (or other gated resource) concurrent slots. + pub resource_slots: usize, + /// Name for the resource pool. + pub resource_name: String, + /// Extra workers beyond resource slots (for long-running loops + non-gated jobs). + pub extra_workers: usize, +} + +impl Default for DaemonConfig { + fn default() -> Self { + Self { + data_dir: PathBuf::from("."), + resource_slots: 3, + resource_name: "llm".to_string(), + extra_workers: 3, + } + } +} + +/// A running daemon instance. +pub struct Daemon { + pub choir: Arc, + pub resource: Arc, + config: DaemonConfig, + rpc_handlers: Vec, + _workers: Vec, +} + +type RpcHandler = Box Option + Send + Sync>; + +/// Context passed to RPC handlers and status builders. +pub struct DaemonContext { + pub choir: Arc, + pub resource: Arc, + pub data_dir: PathBuf, +} + +impl Daemon { + /// Create a new daemon with the given configuration. + pub fn new(config: DaemonConfig) -> Self { + let choir = Choir::new(); + let n_workers = config.resource_slots + config.extra_workers; + let workers: Vec<_> = (0..n_workers) + .map(|i| choir.add_worker(&format!("w{}", i))) + .collect(); + + let resource = ResourcePool::new(&config.resource_name, config.resource_slots); + resource.bind(&choir); + + Daemon { + choir, + resource, + config, + rpc_handlers: Vec::new(), + _workers: workers, + } + } + + /// Register an RPC handler. Called with (command_string, context). + /// Return Some(response_json) to handle, None to pass to next handler. + pub fn add_rpc_handler(&mut self, handler: F) + where + F: Fn(&str, &DaemonContext) -> Option + Send + Sync + 'static, + { + self.rpc_handlers.push(Box::new(handler)); + } + + /// Run the daemon main loop (socket server + signal handling). + /// Blocks until SIGINT/SIGTERM. + pub fn run(&self, status_builder: F) + where + S: serde::Serialize, + F: Fn(&DaemonContext) -> S + Send + Sync, + { + let ctx = DaemonContext { + choir: Arc::clone(&self.choir), + resource: Arc::clone(&self.resource), + data_dir: self.config.data_dir.clone(), + }; + + event_log::log(&self.config.data_dir, "daemon", "started", + &format!("pid {}", std::process::id())); + eprintln!("daemon started (pid {})", std::process::id()); + + // Write initial status + let initial = status_builder(&ctx); + status::write(&self.config.data_dir, &initial); + + socket::run_loop( + &self.config.data_dir, + &ctx, + &self.rpc_handlers, + &status_builder, + ); + } + + /// Convenience: wrap a closure with logging, progress, and error mapping. + pub fn run_job( + data_dir: &std::path::Path, + ctx: &ExecutionContext, + name: &str, + f: impl FnOnce() -> Result<(), String>, + ) -> Result<(), TaskError> { + event_log::log(data_dir, name, "started", ""); + ctx.set_progress("starting"); + let start = std::time::Instant::now(); + + match f() { + Ok(()) => { + let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); + event_log::log(data_dir, name, "completed", &duration); + ctx.set_result(&duration); + Ok(()) + } + Err(e) => { + let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); + let msg = format!("{}: {}", duration, e); + event_log::log(data_dir, name, "failed", &msg); + Err(TaskError::Retry(msg)) + } + } + } +} diff --git a/jobkit-daemon/src/socket.rs b/jobkit-daemon/src/socket.rs new file mode 100644 index 0000000..25b74b8 --- /dev/null +++ b/jobkit-daemon/src/socket.rs @@ -0,0 +1,99 @@ +// Unix domain socket RPC server with signal handling +// +// Non-blocking accept loop, checks STOP flag between accepts. +// Dispatches commands through registered handlers; falls back +// to returning status JSON if no handler matches. + +use super::{DaemonContext, RpcHandler}; +use std::io::{Read, Write}; +use std::os::unix::net::UnixListener; +use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +static STOP: AtomicBool = AtomicBool::new(false); + +extern "C" fn handle_signal(_: libc::c_int) { + STOP.store(true, Ordering::Release); +} + +pub fn run_loop( + data_dir: &Path, + ctx: &DaemonContext, + handlers: &[RpcHandler], + status_builder: &F, +) where + S: serde::Serialize, + F: Fn(&DaemonContext) -> S, +{ + unsafe { + libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t); + libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); + } + + let sock_path = data_dir.join("daemon.sock"); + let _ = std::fs::remove_file(&sock_path); + + let listener = match UnixListener::bind(&sock_path) { + Ok(l) => l, + Err(e) => { + eprintln!("Warning: couldn't bind socket {}: {}", sock_path.display(), e); + while !STOP.load(Ordering::Acquire) { + std::thread::sleep(Duration::from_millis(500)); + } + return; + } + }; + + listener.set_nonblocking(true).ok(); + + while !STOP.load(Ordering::Acquire) { + match listener.accept() { + Ok((mut stream, _)) => { + stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); + let mut cmd = String::new(); + let _ = stream.read_to_string(&mut cmd); + let cmd = cmd.trim().to_string(); + + // Try registered handlers first + let mut handled = false; + for handler in handlers { + if let Some(response) = handler(&cmd, ctx) { + let _ = stream.write_all(response.as_bytes()); + handled = true; + break; + } + } + + // Default: return status JSON + if !handled { + let status = status_builder(ctx); + if let Ok(json) = serde_json::to_string_pretty(&status) { + let _ = stream.write_all(json.as_bytes()); + } + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(100)); + } + Err(_) => { + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + let _ = std::fs::remove_file(&sock_path); +} + +/// Send an RPC command to a running daemon. Returns the response. +pub fn send_rpc(data_dir: &Path, cmd: &str) -> Option { + use std::os::unix::net::UnixStream; + + let sock_path = data_dir.join("daemon.sock"); + let mut stream = UnixStream::connect(&sock_path).ok()?; + stream.write_all(cmd.as_bytes()).ok()?; + stream.shutdown(std::net::Shutdown::Write).ok()?; + let mut buf = String::new(); + stream.read_to_string(&mut buf).ok()?; + Some(buf) +} diff --git a/jobkit-daemon/src/status.rs b/jobkit-daemon/src/status.rs new file mode 100644 index 0000000..5c659ae --- /dev/null +++ b/jobkit-daemon/src/status.rs @@ -0,0 +1,29 @@ +// Status file management +// +// Writes a JSON status snapshot to data_dir/daemon-status.json. +// Applications provide their own status struct (must impl Serialize). + +use std::fs; +use std::path::Path; + +fn status_path(data_dir: &Path) -> std::path::PathBuf { + data_dir.join("daemon-status.json") +} + +/// Write a status snapshot to the status file. +pub fn write(data_dir: &Path, status: &S) { + if let Ok(json) = serde_json::to_string_pretty(status) { + let _ = fs::write(status_path(data_dir), json); + } +} + +/// Read the status file as a string. +pub fn read(data_dir: &Path) -> Option { + fs::read_to_string(status_path(data_dir)).ok() +} + +/// Read and deserialize the status file. +pub fn load(data_dir: &Path) -> Option { + let s = read(data_dir)?; + serde_json::from_str(&s).ok() +} diff --git a/poc-memory/.claude/analysis/2026-03-14-daemon-jobkit-survey.md b/poc-memory/.claude/analysis/2026-03-14-daemon-jobkit-survey.md new file mode 100644 index 0000000..0844439 --- /dev/null +++ b/poc-memory/.claude/analysis/2026-03-14-daemon-jobkit-survey.md @@ -0,0 +1,202 @@ +# Daemon & Jobkit Architecture Survey +_2026-03-14, autonomous survey while Kent debugs discard FIFO_ + +## Current state + +daemon.rs is 1952 lines mixing three concerns: +- ~400 lines: pure jobkit usage (spawn, depend_on, resource) +- ~600 lines: logging/monitoring (log_event, status, RPC) +- ~950 lines: job functions embedding business logic + +## What jobkit provides (good) + +- Worker pool with named workers +- Dependency graph: `depend_on()` for ordering +- Resource pools: `ResourcePool` for concurrency gating (LLM slots) +- Retry logic: `retries(N)` on `TaskError::Retry` +- Task status tracking: `choir.task_statuses()` → `Vec` +- Cancellation: `ctx.is_cancelled()` + +## What jobkit is missing + +### 1. Structured logging (PRIORITY) +- Currently dual-channel: `ctx.log_line()` (per-task) + `log_event()` (daemon JSONL) +- No log levels, no structured context, no correlation IDs +- Log rotation is naive (truncate at 1MB, keep second half) +- Need: observability hooks that both human TUI and AI can consume + +### 2. Metrics (NONE EXIST) +- No task duration histograms +- No worker utilization tracking +- No queue depth monitoring +- No success/failure rates by type +- No resource pool wait times + +### 3. Health monitoring +- No watchdog timers +- No health check hooks per job +- No alerting on threshold violations +- Health computed on-demand in daemon, not in jobkit + +### 4. RPC (ad-hoc in daemon, should be schematized) +- Unix socket with string matching: `match cmd.as_str()` +- No cap'n proto schema for daemon control +- No versioning, no validation, no streaming + +## Architecture problems + +### Tangled concerns +Job functions hardcode `log_event()` calls. Graph health is in daemon +but uses domain-specific metrics. Store loading happens inside jobs +(10 agent runs = 10 store loads). Not separable. + +### Magic numbers +- Workers = `llm_concurrency + 3` (line 682) +- 10 max new jobs per tick (line 770) +- 300/1800s backoff range (lines 721-722) +- 1MB log rotation (line 39) +- 60s scheduler interval (line 24) +None configurable. + +### Hardcoded pipeline DAG +Daily pipeline phases are `depend_on()` chains in Rust code (lines +1061-1109). Can't adjust without recompile. No visualization. No +conditional skipping of phases. + +### Task naming is fragile +Names used as both identifiers AND for parsing in TUI. Format varies +(colons, dashes, dates). `task_group()` splits on '-' to categorize — +brittle. + +### No persistent task queue +Restart loses all pending tasks. Session watcher handles this via +reconciliation (good), but scheduler uses `last_daily` date from file. + +## What works well + +1. **Reconciliation-based session discovery** — elegant, restart-resilient +2. **Resource pooling** — LLM concurrency decoupled from worker count +3. **Dependency-driven pipeline** — clean DAG via `depend_on()` +4. **Retry with backoff** — exponential 5min→30min, resets on success +5. **Graceful shutdown** — SIGINT/SIGTERM handled properly + +## Kent's design direction + +### Event stream, not log files +One pipeline, multiple consumers. TUI renders for humans, AI consumes +structured data. Same events, different renderers. Cap'n Proto streaming +subscription: `subscribe(filter) -> stream`. + +"No one ever thinks further ahead than log files with monitoring and +it's infuriating." — Kent + +### Extend jobkit, don't add a layer +jobkit already has the scheduling and dependency graph. Don't create a +new orchestration layer — add the missing pieces (logging, metrics, +health, RPC) to jobkit itself. + +### Cap'n Proto for everything +Standard RPC definitions for: +- Status queries (what's running, pending, failed) +- Control (start, stop, restart, queue) +- Event streaming (subscribe with filter) +- Health checks + +## The bigger picture: bcachefs as library + +Kent's monitoring system in bcachefs (event_inc/event_inc_trace + x-macro +counters) is the real monitoring infrastructure. 1-1 correspondence between +counters (cheap, always-on dashboard via `fs top`) and tracepoints (expensive +detail, only runs when enabled). The x-macro enforces this — can't have one +without the other. + +When the Rust conversion is complete, bcachefs becomes a library. At that +point, jobkit doesn't need its own monitoring — it uses the same counter/ +tracepoint infrastructure. One observability system for everything. + +**Implication for now:** jobkit monitoring just needs to be good enough. +JSON events, not typed. Don't over-engineer — the real infrastructure is +coming from the Rust conversion. + +## Extraction: jobkit-daemon library (designed with Kent) + +### Goes to jobkit-daemon (generic) +- JSONL event logging with size-based rotation +- Unix domain socket server + signal handling +- Status file writing (periodic JSON snapshot) +- `run_job()` wrapper (logging + progress + error mapping) +- Systemd service installation +- Worker pool setup from config +- Cap'n Proto RPC for control protocol + +### Stays in poc-memory (application) +- All job functions (experience-mine, fact-mine, consolidation, etc.) +- Session watcher, scheduler, RPC command handlers +- GraphHealth, consolidation plan logic + +### Interface design +- Cap'n Proto RPC for typed operations (submit, cancel, subscribe) +- JSON blob for status (inherently open-ended, every app has different + job types — typing this is the tracepoint mistake) +- Application registers: RPC handlers, long-running tasks, job functions +- ~50-100 lines of setup code, call `daemon.run()` + +## Plan of attack + +1. **Observability hooks in jobkit** — `on_task_start/progress/complete` + callbacks that consumers can subscribe to +2. **Structured event type** — typed events with task ID, name, duration, + result, metadata. Not strings. +3. **Metrics collection** — duration histograms, success rates, queue + depth. Built on the event stream. +4. **Cap'n Proto daemon RPC schema** — replace ad-hoc socket protocol +5. **TUI consumes event stream** — same data as AI consumer +6. **Extract monitoring from daemon.rs** — the 600 lines of logging/status + become generic, reusable infrastructure +7. **Declarative pipeline config** — DAG definition in config, not code + +## File reference + +- `src/agents/daemon.rs` — 1952 lines, all orchestration + - Job functions: 96-553 + - run_daemon(): 678-1143 + - Socket/RPC: 1145-1372 + - Status display: 1374-1682 +- `src/tui.rs` — 907 lines, polls status socket every 2s +- `schema/memory.capnp` — 125 lines, data only, no RPC definitions +- `src/config.rs` — configuration loading +- External: `jobkit` crate (git dependency) + +## Mistakes I made building this (learning notes) + +_Per Kent's instruction: note what went wrong and WHY._ + +1. **Dual logging channels** — I added `log_event()` because `ctx.log_line()` + wasn't enough, instead of fixing the underlying abstraction. Symptom: + can't find a failed job without searching two places. + +2. **Magic numbers** — I hardcoded constants because "I'll make them + configurable later." Later never came. Every magic number is a design + decision that should have been explicit. + +3. **1952-line file** — daemon.rs grew organically because each new feature + was "just one more function." Should have extracted when it passed 500 + lines. The pain of refactoring later is always worse than the pain of + organizing early. + +4. **Ad-hoc RPC** — String matching seemed fine for 2 commands. Now it's 4 + commands and growing, with implicit formats. Should have used cap'n proto + from the start — the schema IS the documentation. + +5. **No tests** — Zero tests in daemon code. "It's a daemon, how do you test + it?" is not an excuse. The job functions are pure-ish and testable. The + scheduler logic is testable with a clock abstraction. + +6. **Not using systemd** — There's a systemd service for the daemon. + I keep starting it manually with `poc-memory agent daemon start` and + accumulating multiple instances. Tonight: 4 concurrent daemons, 32 + cores pegged at 95%, load average 92. USE SYSTEMD. That's what it's for. + `systemctl --user start poc-memory-daemon`. ONE instance. Managed. + +Pattern: every shortcut was "just for now" and every "just for now" became +permanent. Kent's yelling was right every time. diff --git a/poc-memory/Cargo.toml b/poc-memory/Cargo.toml index 0364ef3..0158743 100644 --- a/poc-memory/Cargo.toml +++ b/poc-memory/Cargo.toml @@ -20,6 +20,7 @@ rayon = "1" peg = "0.8" paste = "1" jobkit = { git = "https://evilpiepirate.org/git/jobkit.git/" } +jobkit-daemon = { path = "../jobkit-daemon" } redb = "2" log = "0.4" ratatui = "0.29" diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index ca309a2..4cd2fb4 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -12,10 +12,9 @@ // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. -use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus}; +use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus}; use std::collections::{HashMap, HashSet}; use std::fs; -use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -23,74 +22,21 @@ use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); -fn status_file() -> &'static str { "daemon-status.json" } -fn log_file() -> &'static str { "daemon.log" } - -fn status_path() -> PathBuf { - crate::config::get().data_dir.join(status_file()) -} - fn log_path() -> PathBuf { - crate::config::get().data_dir.join(log_file()) + crate::config::get().data_dir.join("daemon.log") } // --- Logging --- -const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half - fn log_event(job: &str, event: &str, detail: &str) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - let line = if detail.is_empty() { - format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) - } else { - // Escape detail for JSON safety - let safe = detail.replace('\\', "\\\\").replace('"', "\\\"") - .replace('\n', "\\n"); - format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", - ts, job, event, safe) - }; - let path = log_path(); - - // Rotate if too large - if let Ok(meta) = fs::metadata(&path) { - if meta.len() > LOG_MAX_BYTES { - if let Ok(content) = fs::read_to_string(&path) { - let half = content.len() / 2; - // Find next newline after halfway point - if let Some(nl) = content[half..].find('\n') { - let _ = fs::write(&path, &content[half + nl + 1..]); - } - } - } - } - - if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) { - let _ = f.write_all(line.as_bytes()); - } + jobkit_daemon::event_log::log(&crate::config::get().data_dir, job, event, detail); } // --- Job functions (direct, no subprocess) --- /// Run a named job with logging, progress reporting, and error mapping. fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { - log_event(name, "started", ""); - ctx.set_progress("starting"); - let start = std::time::Instant::now(); - - match f() { - Ok(()) => { - let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); - log_event(name, "completed", &duration); - ctx.set_result(&duration); - Ok(()) - } - Err(e) => { - let duration = format!("{:.1}s", start.elapsed().as_secs_f64()); - let msg = format!("{}: {}", duration, e); - log_event(name, "failed", &msg); - Err(TaskError::Retry(msg)) - } - } + jobkit_daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, f) } fn job_experience_mine(ctx: &ExecutionContext, path: &str, segment: Option) -> Result<(), TaskError> { @@ -638,9 +584,7 @@ fn write_status( graph_health: &Arc>>, ) { let status = build_status(choir, last_daily, graph_health); - if let Ok(json) = serde_json::to_string_pretty(&status) { - let _ = fs::write(status_path(), json); - } + jobkit_daemon::status::write(&crate::config::get().data_dir, &status); } #[derive(Clone, Default, serde::Serialize, serde::Deserialize)] @@ -676,20 +620,20 @@ struct DaemonStatus { // --- The daemon --- pub fn run_daemon() -> Result<(), String> { - let choir = Choir::new(); - let llm_concurrency = crate::config::get().llm_concurrency; - // Workers: 2 for long-running loops + llm_concurrency + 1 for non-LLM jobs - let n_workers = llm_concurrency + 3; - let names: Vec = (0..n_workers).map(|i| format!("w{}", i)).collect(); - let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect(); + let config = crate::config::get(); + let mut daemon = jobkit_daemon::Daemon::new(jobkit_daemon::DaemonConfig { + data_dir: config.data_dir.clone(), + resource_slots: config.llm_concurrency, + resource_name: "llm".to_string(), + extra_workers: 3, + }); - let llm = ResourcePool::new("llm", llm_concurrency); - llm.bind(&choir); + let choir = Arc::clone(&daemon.choir); + let llm = Arc::clone(&daemon.resource); // Recover last_daily from previous status file let last_daily: Arc>> = Arc::new(Mutex::new( - fs::read_to_string(status_path()).ok() - .and_then(|s| serde_json::from_str::(&s).ok()) + jobkit_daemon::status::load::(&config.data_dir) .and_then(|s| s.last_daily) .and_then(|d| d.parse().ok()) )); @@ -1123,36 +1067,124 @@ pub fn run_daemon() -> Result<(), String> { } }); - // Main thread: listen on status socket + wait for signals - let choir_main = Arc::clone(&choir); - let last_daily_main = Arc::clone(&last_daily); - let graph_health_main = Arc::clone(&graph_health); - status_socket_loop(&choir_main, &last_daily_main, &graph_health_main, &llm); + // Register RPC handlers + { + let last_daily_rpc = Arc::clone(&last_daily); + daemon.add_rpc_handler(move |cmd, _ctx| { + if cmd == "consolidate" { + *last_daily_rpc.lock().unwrap() = None; + log_event("rpc", "consolidate", "triggered via socket"); + Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into()) + } else { + None + } + }); + } + + daemon.add_rpc_handler(|cmd, _ctx| { + if !cmd.starts_with("record-hits ") { return None; } + let keys: Vec<&str> = cmd.strip_prefix("record-hits ") + .unwrap_or("") + .split('\t') + .filter(|k| !k.is_empty()) + .collect(); + if keys.is_empty() { + return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into()); + } + let n = keys.len(); + match crate::counters::record_search_hits(&keys) { + Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)), + Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))), + } + }); + + { + let choir_rpc = Arc::clone(&choir); + let llm_rpc = Arc::clone(&llm); + daemon.add_rpc_handler(move |cmd, _ctx| { + if !cmd.starts_with("run-agent ") { return None; } + let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); + let agent_type = parts.get(1).unwrap_or(&"replay"); + let count: usize = parts.get(2) + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + let batch_size = 5; + let today = chrono::Local::now().format("%Y-%m-%d"); + let ts = chrono::Local::now().format("%H%M%S"); + let mut prev = None; + let mut spawned = 0; + let mut remaining = count; + + let is_rename = *agent_type == "rename"; + let is_split = *agent_type == "split"; + + if is_split { + let store = crate::store::Store::load().ok(); + let candidates = store.as_ref() + .map(|s| super::prompts::split_candidates(s)) + .unwrap_or_default(); + let to_split: Vec = candidates.into_iter() + .take(count) + .collect(); + for key in &to_split { + let key = key.clone(); + let task_name = format!("c-split-{}:{}", key, today); + choir_rpc.spawn(task_name) + .resource(&llm_rpc) + .retries(1) + .init(move |ctx| { + job_split_one(ctx, key.clone()) + }) + .run(); + spawned += 1; + } + remaining = 0; + } + + while remaining > 0 { + let batch = remaining.min(batch_size); + let agent = agent_type.to_string(); + let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); + let mut builder = choir_rpc.spawn(task_name) + .resource(&llm_rpc) + .retries(1) + .init(move |ctx| { + if is_rename { + job_rename_agent(ctx, batch) + } else { + job_consolidation_agent(ctx, &agent, batch) + } + }); + if let Some(ref dep) = prev { + builder.depend_on(dep); + } + prev = Some(builder.run()); + remaining -= batch; + spawned += 1; + } + + log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count)); + Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n", + count, agent_type, spawned)) + }); + } + + // Main thread: socket server + signal handling + let last_daily_status = Arc::clone(&last_daily); + let graph_health_status = Arc::clone(&graph_health); + daemon.run(move |ctx| { + build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status) + }); log_event("daemon", "stopping", ""); eprintln!("Shutting down..."); - // Clean up socket - let _ = fs::remove_file(status_sock_path()); - log_event("daemon", "stopped", ""); - - // Exit immediately — PR_SET_PDEATHSIG on child processes ensures - // claude subprocesses get SIGTERM when we die. std::process::exit(0) } fn send_rpc(cmd: &str) -> Option { - use std::io::{Read as _, Write as _}; - use std::os::unix::net::UnixStream; - - let mut stream = UnixStream::connect(status_sock_path()).ok()?; - stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); - stream.write_all(cmd.as_bytes()).ok()?; - stream.shutdown(std::net::Shutdown::Write).ok()?; - let mut buf = String::new(); - stream.read_to_string(&mut buf).ok()?; - Some(buf) + jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) } pub fn rpc_consolidate() -> Result<(), String> { @@ -1187,189 +1219,11 @@ pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> { } fn read_status_socket() -> Option { - use std::io::Read as _; - use std::os::unix::net::UnixStream; - - let mut stream = UnixStream::connect(status_sock_path()).ok()?; - stream.set_read_timeout(Some(Duration::from_secs(2))).ok(); - let mut buf = String::new(); - stream.read_to_string(&mut buf).ok()?; - serde_json::from_str(&buf).ok() + let json = jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, "")?; + serde_json::from_str(&json).ok() } -fn status_sock_path() -> PathBuf { - crate::config::get().data_dir.join("daemon.sock") -} - -/// Listen on a Unix domain socket for status requests. -/// Any connection gets the live status JSON written and closed. -/// Also handles SIGINT/SIGTERM for clean shutdown. -fn status_socket_loop( - choir: &Arc, - last_daily: &Arc>>, - graph_health: &Arc>>, - llm: &Arc, -) { - use std::io::{Read as _, Write as _}; - use std::os::unix::net::UnixListener; - use std::sync::atomic::{AtomicBool, Ordering}; - - static STOP: AtomicBool = AtomicBool::new(false); - - unsafe { - libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t); - libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); - } - - let sock_path = status_sock_path(); - let _ = fs::remove_file(&sock_path); // clean up stale socket - - let listener = match UnixListener::bind(&sock_path) { - Ok(l) => l, - Err(e) => { - eprintln!("Warning: couldn't bind status socket {}: {}", sock_path.display(), e); - // Fall back to just waiting for signals - while !STOP.load(Ordering::Acquire) { - std::thread::sleep(Duration::from_millis(500)); - } - return; - } - }; - - // Non-blocking so we can check STOP flag - listener.set_nonblocking(true).ok(); - - while !STOP.load(Ordering::Acquire) { - match listener.accept() { - Ok((mut stream, _)) => { - // Read command from client (with short timeout) - stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); - let mut cmd = String::new(); - let _ = stream.read_to_string(&mut cmd); - let cmd = cmd.trim().to_string(); - - match cmd.as_str() { - "consolidate" => { - *last_daily.lock().unwrap() = None; - let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n"); - log_event("rpc", "consolidate", "triggered via socket"); - } - cmd if cmd.starts_with("record-hits ") => { - let keys: Vec<&str> = cmd.strip_prefix("record-hits ") - .unwrap_or("") - .split('\t') - .filter(|k| !k.is_empty()) - .collect(); - if keys.is_empty() { - let _ = stream.write_all(b"{\"ok\":false,\"error\":\"no keys\"}\n"); - } else { - let n = keys.len(); - match crate::counters::record_search_hits(&keys) { - Ok(()) => { - let msg = format!("{{\"ok\":true,\"recorded\":{}}}\n", n); - let _ = stream.write_all(msg.as_bytes()); - } - Err(e) => { - let msg = format!("{{\"ok\":false,\"error\":\"{}\"}}\n", - e.replace('"', "'")); - let _ = stream.write_all(msg.as_bytes()); - } - } - } - } - cmd if cmd.starts_with("run-agent ") => { - let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); - let agent_type = parts.get(1).unwrap_or(&"replay"); - let count: usize = parts.get(2) - .and_then(|s| s.parse().ok()) - .unwrap_or(1); - let batch_size = 5; - - let today = chrono::Local::now().format("%Y-%m-%d"); - let ts = chrono::Local::now().format("%H%M%S"); - let mut prev = None; - let mut spawned = 0; - let mut remaining = count; - - let is_rename = *agent_type == "rename"; - let is_split = *agent_type == "split"; - - if is_split { - // Split: load candidates upfront, spawn independent - // parallel tasks — one per node, no dependencies. - let store = crate::store::Store::load().ok(); - let candidates = store.as_ref() - .map(|s| super::prompts::split_candidates(s)) - .unwrap_or_default(); - let to_split: Vec = candidates.into_iter() - .take(count) - .collect(); - for key in &to_split { - let key = key.clone(); - let task_name = format!("c-split-{}:{}", key, today); - choir.spawn(task_name) - .resource(llm) - .retries(1) - .init(move |ctx| { - job_split_one(ctx, key.clone()) - }) - .run(); - spawned += 1; - } - remaining = 0; - } - - while remaining > 0 { - let batch = remaining.min(batch_size); - let agent = agent_type.to_string(); - let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); - let mut builder = choir.spawn(task_name) - .resource(llm) - .retries(1) - .init(move |ctx| { - if is_rename { - job_rename_agent(ctx, batch) - } else { - job_consolidation_agent(ctx, &agent, batch) - } - }); - if let Some(ref dep) = prev { - builder.depend_on(dep); - } - prev = Some(builder.run()); - remaining -= batch; - spawned += 1; - } - - let msg = format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n", - count, agent_type, spawned); - let _ = stream.write_all(msg.as_bytes()); - log_event("rpc", "run-agent", - &format!("{} x{}", agent_type, count)); - } - _ => { - // Default: return status - let status = build_status(choir, *last_daily.lock().unwrap(), graph_health); - if let Ok(json) = serde_json::to_string_pretty(&status) { - let _ = stream.write_all(json.as_bytes()); - } - } - } - // Connection closes when stream is dropped - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - std::thread::sleep(Duration::from_millis(100)); - } - Err(_) => { - std::thread::sleep(Duration::from_millis(100)); - } - } - } - - extern "C" fn handle_signal(_: libc::c_int) { - STOP.store(true, std::sync::atomic::Ordering::Release); - } -} +// status_socket_loop has been replaced by daemon.run() in jobkit-daemon. fn build_status( choir: &Choir, diff --git a/poc-memory/src/tui.rs b/poc-memory/src/tui.rs index c5c0377..d3de45f 100644 --- a/poc-memory/src/tui.rs +++ b/poc-memory/src/tui.rs @@ -22,8 +22,6 @@ use ratatui::{ DefaultTerminal, Frame, }; use std::fs; -use std::io::Read as _; -use std::os::unix::net::UnixStream; use std::path::PathBuf; use std::time::{Duration, Instant}; @@ -35,10 +33,6 @@ const AGENT_TYPES: &[&str] = &[ "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", "split", ]; -fn status_sock_path() -> PathBuf { - crate::config::get().data_dir.join("daemon.sock") -} - fn log_path() -> PathBuf { crate::config::get().data_dir.join("daemon.log") } @@ -58,11 +52,8 @@ struct DaemonStatus { } fn fetch_status() -> Option { - let mut stream = UnixStream::connect(status_sock_path()).ok()?; - stream.set_read_timeout(Some(Duration::from_secs(2))).ok(); - let mut buf = String::new(); - stream.read_to_string(&mut buf).ok()?; - serde_json::from_str(&buf).ok() + let json = jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, "")?; + serde_json::from_str(&json).ok() } #[derive(Clone)] @@ -794,14 +785,7 @@ fn short_name(name: &str) -> String { } fn send_rpc(cmd: &str) -> Option { - let mut stream = UnixStream::connect(status_sock_path()).ok()?; - stream.set_write_timeout(Some(Duration::from_secs(2))).ok(); - stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); - std::io::Write::write_all(&mut stream, cmd.as_bytes()).ok()?; - stream.shutdown(std::net::Shutdown::Write).ok()?; - let mut buf = String::new(); - stream.read_to_string(&mut buf).ok()?; - Some(buf) + jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) } // --- Entry point ---