consciousness/jobkit-daemon/src/socket.rs
ProofOfConcept 420a777eba extract jobkit-daemon library from poc-memory daemon
Create jobkit-daemon crate with generic daemon infrastructure:
- event_log: JSONL append with size-based rotation
- socket: Unix domain socket RPC client and server with signal handling
- status: JSON status file read/write

Migrate daemon.rs to use the library:
- Worker pool setup via Daemon::new()
- Socket loop + signal handling via Daemon::run()
- RPC handlers as registered closures
- Logging, status writing, send_rpc all delegate to library

Migrate tui.rs to use socket::send_rpc() instead of inline UnixStream.

daemon.rs: 1952 → 1806 lines (-146), old status_socket_loop removed.
tui.rs: socket boilerplate removed.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-03-14 02:40:30 -04:00

99 lines
3.2 KiB
Rust

// Unix domain socket RPC server (with signal handling) and client
//
// Non-blocking accept loop, checks STOP flag between accepts.
// Dispatches commands through registered handlers; falls back
// to returning status JSON if no handler matches.
use super::{DaemonContext, RpcHandler};
use std::io::{Read, Write};
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
/// Shutdown flag: set by the signal handler, polled by `run_loop`.
static STOP: AtomicBool = AtomicBool::new(false);

/// Signal handler installed for SIGINT and SIGTERM by `run_loop`.
///
/// It only flips the `STOP` flag; the signal number itself is ignored.
extern "C" fn handle_signal(_signum: libc::c_int) {
    STOP.store(true, Ordering::Release);
}
/// Run the daemon's RPC accept loop until SIGINT or SIGTERM is received.
///
/// Installs signal handlers that set the module-level `STOP` flag, binds a
/// Unix domain socket at `data_dir/daemon.sock` (removing any stale file
/// first) and then polls for incoming connections.
///
/// For each connection the request is read as text and trimmed, then offered
/// to every entry of `handlers` in order; the first handler that returns
/// `Some(response)` has its response written back. When no handler matches,
/// the value produced by `status_builder(ctx)` is written back as
/// pretty-printed JSON.
///
/// If the socket cannot be bound, a warning is printed to stderr and the
/// function simply waits for a stop signal without serving any RPC.
///
/// All per-connection I/O errors are ignored (best effort). The socket file
/// is removed again before returning from the accept loop.
pub fn run_loop<S, F>(
    data_dir: &Path,
    ctx: &DaemonContext,
    handlers: &[RpcHandler],
    status_builder: &F,
) where
    S: serde::Serialize,
    F: Fn(&DaemonContext) -> S,
{
    // How long to back off when no client is waiting (or accept failed).
    const ACCEPT_POLL: Duration = Duration::from_millis(100);
    // Upper bound on how long a single read from a client may stall.
    const READ_TIMEOUT: Duration = Duration::from_millis(100);

    // SAFETY: `handle_signal` is an `extern "C"` function taking a single
    // `c_int`, and its body only performs an atomic store.
    unsafe {
        libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
        libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
    }

    let sock_path = data_dir.join("daemon.sock");
    // A leftover socket file from a previous run would make bind() fail.
    let _ = std::fs::remove_file(&sock_path);

    let listener = match UnixListener::bind(&sock_path) {
        Ok(l) => l,
        Err(e) => {
            eprintln!("Warning: couldn't bind socket {}: {}", sock_path.display(), e);
            // No socket to serve; just idle until asked to stop.
            while !STOP.load(Ordering::Acquire) {
                std::thread::sleep(Duration::from_millis(500));
            }
            return;
        }
    };

    // Non-blocking accept lets the loop re-check STOP between polls.
    listener.set_nonblocking(true).ok();

    while !STOP.load(Ordering::Acquire) {
        let mut stream = match listener.accept() {
            Ok((stream, _)) => stream,
            // `WouldBlock` means no client is waiting; any other accept error
            // is handled the same way: back off briefly, then re-check STOP.
            Err(_) => {
                std::thread::sleep(ACCEPT_POLL);
                continue;
            }
        };

        // On some platforms (BSD-derived, e.g. macOS) the accepted socket
        // inherits the listener's non-blocking mode, which would make the
        // read below fail immediately with `WouldBlock` instead of honouring
        // the timeout. Force blocking mode so behaviour is the same everywhere.
        stream.set_nonblocking(false).ok();
        stream.set_read_timeout(Some(READ_TIMEOUT)).ok();

        // Read until the client closes its write side or the timeout hits;
        // whatever arrived before an error is still used as the command.
        let mut cmd = String::new();
        let _ = stream.read_to_string(&mut cmd);
        let cmd = cmd.trim().to_string();

        // Registered handlers get first refusal, in registration order.
        match handlers.iter().find_map(|handler| handler(&cmd, ctx)) {
            Some(response) => {
                let _ = stream.write_all(response.as_bytes());
            }
            // Default: return status JSON.
            None => {
                let status = status_builder(ctx);
                if let Ok(json) = serde_json::to_string_pretty(&status) {
                    let _ = stream.write_all(json.as_bytes());
                }
            }
        }
    }

    let _ = std::fs::remove_file(&sock_path);
}
/// Send a single RPC command to a running daemon and collect its reply.
///
/// Connects to `data_dir/daemon.sock`, writes `cmd`, closes the write half so
/// the peer sees end-of-input, then reads the response until the peer closes
/// the connection.
///
/// Returns `None` if connecting, writing, shutting down the write half, or
/// reading the reply fails; otherwise `Some` with the full reply text.
pub fn send_rpc(data_dir: &Path, cmd: &str) -> Option<String> {
    let mut conn = std::os::unix::net::UnixStream::connect(data_dir.join("daemon.sock")).ok()?;

    if conn.write_all(cmd.as_bytes()).is_err() {
        return None;
    }
    // Half-close: signals end of request while keeping the read side open.
    if conn.shutdown(std::net::Shutdown::Write).is_err() {
        return None;
    }

    let mut reply = String::new();
    match conn.read_to_string(&mut reply) {
        Ok(_) => Some(reply),
        Err(_) => None,
    }
}