Create jobkit-daemon crate with generic daemon infrastructure: - event_log: JSONL append with size-based rotation - socket: Unix domain socket RPC client and server with signal handling - status: JSON status file read/write Migrate daemon.rs to use the library: - Worker pool setup via Daemon::new() - Socket loop + signal handling via Daemon::run() - RPC handlers as registered closures - Logging, status writing, send_rpc all delegate to library Migrate tui.rs to use socket::send_rpc() instead of inline UnixStream. daemon.rs: 1952 → 1806 lines (-146), old status_socket_loop removed. tui.rs: socket boilerplate removed. Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
99 lines
3.2 KiB
Rust
99 lines
3.2 KiB
Rust
// Unix domain socket RPC server with signal handling
|
|
//
|
|
// Non-blocking accept loop, checks STOP flag between accepts.
|
|
// Dispatches commands through registered handlers; falls back
|
|
// to returning status JSON if no handler matches.
|
|
|
|
use super::{DaemonContext, RpcHandler};
|
|
use std::io::{Read, Write};
|
|
use std::os::unix::net::UnixListener;
|
|
use std::path::Path;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::time::Duration;
|
|
|
|
/// Shutdown flag: set to `true` by `handle_signal`, polled by the loops in `run_loop`.
static STOP: AtomicBool = AtomicBool::new(false);
|
|
|
|
/// Signal handler that `run_loop` installs for SIGINT and SIGTERM.
///
/// It does nothing but raise the `STOP` flag; the polling loops in
/// `run_loop` observe the flag and wind down on their own.
extern "C" fn handle_signal(_signum: libc::c_int) {
    STOP.store(true, Ordering::Release);
}
|
|
|
|
pub fn run_loop<S, F>(
|
|
data_dir: &Path,
|
|
ctx: &DaemonContext,
|
|
handlers: &[RpcHandler],
|
|
status_builder: &F,
|
|
) where
|
|
S: serde::Serialize,
|
|
F: Fn(&DaemonContext) -> S,
|
|
{
|
|
unsafe {
|
|
libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
|
|
libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
|
|
}
|
|
|
|
let sock_path = data_dir.join("daemon.sock");
|
|
let _ = std::fs::remove_file(&sock_path);
|
|
|
|
let listener = match UnixListener::bind(&sock_path) {
|
|
Ok(l) => l,
|
|
Err(e) => {
|
|
eprintln!("Warning: couldn't bind socket {}: {}", sock_path.display(), e);
|
|
while !STOP.load(Ordering::Acquire) {
|
|
std::thread::sleep(Duration::from_millis(500));
|
|
}
|
|
return;
|
|
}
|
|
};
|
|
|
|
listener.set_nonblocking(true).ok();
|
|
|
|
while !STOP.load(Ordering::Acquire) {
|
|
match listener.accept() {
|
|
Ok((mut stream, _)) => {
|
|
stream.set_read_timeout(Some(Duration::from_millis(100))).ok();
|
|
let mut cmd = String::new();
|
|
let _ = stream.read_to_string(&mut cmd);
|
|
let cmd = cmd.trim().to_string();
|
|
|
|
// Try registered handlers first
|
|
let mut handled = false;
|
|
for handler in handlers {
|
|
if let Some(response) = handler(&cmd, ctx) {
|
|
let _ = stream.write_all(response.as_bytes());
|
|
handled = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Default: return status JSON
|
|
if !handled {
|
|
let status = status_builder(ctx);
|
|
if let Ok(json) = serde_json::to_string_pretty(&status) {
|
|
let _ = stream.write_all(json.as_bytes());
|
|
}
|
|
}
|
|
}
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
|
std::thread::sleep(Duration::from_millis(100));
|
|
}
|
|
Err(_) => {
|
|
std::thread::sleep(Duration::from_millis(100));
|
|
}
|
|
}
|
|
}
|
|
|
|
let _ = std::fs::remove_file(&sock_path);
|
|
}
|
|
|
|
/// Sends one RPC command to the daemon whose socket lives in `data_dir`.
///
/// Connects to `daemon.sock` under `data_dir`, writes `cmd`, shuts down the
/// write half of the connection, and then reads the reply until the peer
/// closes its side.
///
/// Returns the reply text, or `None` if connecting, writing, shutting down,
/// or reading fails.
pub fn send_rpc(data_dir: &Path, cmd: &str) -> Option<String> {
    let mut conn = std::os::unix::net::UnixStream::connect(data_dir.join("daemon.sock")).ok()?;

    conn.write_all(cmd.as_bytes()).ok()?;
    // Closing the write half signals end-of-request to the peer.
    conn.shutdown(std::net::Shutdown::Write).ok()?;

    let mut reply = String::new();
    match conn.read_to_string(&mut reply) {
        Ok(_) => Some(reply),
        Err(_) => None,
    }
}
|