daemon: Unix socket for live status, simplify status display

The daemon now listens on daemon.sock — clients connect and get the
live status JSON immediately. `poc-memory daemon status` uses the
socket, so elapsed times and progress are always current. Falls back
to "Daemon not running" if socket connect fails.

Also: consolidate_full_with_progress() callback for per-step reporting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-05 22:19:58 -05:00
parent b6c70c7734
commit 0c15002797

View file

@ -265,12 +265,7 @@ fn proc_uptime(pid: u32) -> Option<String> {
// --- Status writing --- // --- Status writing ---
fn write_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) { fn write_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) {
let statuses = choir.task_statuses(); let status = build_status(choir, last_daily);
let status = DaemonStatus {
pid: std::process::id(),
tasks: statuses,
last_daily: last_daily.map(|d| d.to_string()),
};
if let Ok(json) = serde_json::to_string_pretty(&status) { if let Ok(json) = serde_json::to_string_pretty(&status) {
let _ = fs::write(status_path(), json); let _ = fs::write(status_path(), json);
} }
@ -484,9 +479,10 @@ pub fn run_daemon() -> Result<(), String> {
} }
}); });
// Main thread: wait for Ctrl-C // Main thread: listen on status socket + wait for signals
let choir_main = Arc::clone(&choir); let choir_main = Arc::clone(&choir);
ctrlc_wait(); let last_daily_main = Arc::clone(&last_daily);
status_socket_loop(&choir_main, &last_daily_main);
log_event("daemon", "stopping", ""); log_event("daemon", "stopping", "");
eprintln!("Shutting down..."); eprintln!("Shutting down...");
@ -497,14 +493,37 @@ pub fn run_daemon() -> Result<(), String> {
log_event(&info.name, "cancelling", ""); log_event(&info.name, "cancelling", "");
} }
} }
// Workers will shut down when their handles are dropped
// Clean up socket
let _ = fs::remove_file(status_sock_path());
log_event("daemon", "stopped", ""); log_event("daemon", "stopped", "");
Ok(()) Ok(())
} }
fn ctrlc_wait() { fn read_status_socket() -> Option<DaemonStatus> {
use std::io::Read as _;
use std::os::unix::net::UnixStream;
let mut stream = UnixStream::connect(status_sock_path()).ok()?;
stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
let mut buf = String::new();
stream.read_to_string(&mut buf).ok()?;
serde_json::from_str(&buf).ok()
}
fn status_sock_path() -> PathBuf {
crate::config::get().data_dir.join("daemon.sock")
}
/// Listen on a Unix domain socket for status requests.
/// Any connection gets the live status JSON written and closed.
/// Also handles SIGINT/SIGTERM for clean shutdown.
fn status_socket_loop(choir: &Choir, last_daily: &Arc<Mutex<Option<chrono::NaiveDate>>>) {
use std::io::Write as _;
use std::os::unix::net::UnixListener;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
static STOP: AtomicBool = AtomicBool::new(false); static STOP: AtomicBool = AtomicBool::new(false);
unsafe { unsafe {
@ -512,12 +531,52 @@ fn ctrlc_wait() {
libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t); libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
} }
let sock_path = status_sock_path();
let _ = fs::remove_file(&sock_path); // clean up stale socket
let listener = match UnixListener::bind(&sock_path) {
Ok(l) => l,
Err(e) => {
eprintln!("Warning: couldn't bind status socket {}: {}", sock_path.display(), e);
// Fall back to just waiting for signals
while !STOP.load(Ordering::Acquire) { while !STOP.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(500)); std::thread::sleep(Duration::from_millis(500));
} }
return;
}
};
// Non-blocking so we can check STOP flag
listener.set_nonblocking(true).ok();
while !STOP.load(Ordering::Acquire) {
match listener.accept() {
Ok((mut stream, _)) => {
let status = build_status(choir, *last_daily.lock().unwrap());
if let Ok(json) = serde_json::to_string_pretty(&status) {
let _ = stream.write_all(json.as_bytes());
}
// Connection closes when stream is dropped
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(100));
}
Err(_) => {
std::thread::sleep(Duration::from_millis(100));
}
}
}
extern "C" fn handle_signal(_: libc::c_int) { extern "C" fn handle_signal(_: libc::c_int) {
STOP.store(true, Ordering::Release); STOP.store(true, std::sync::atomic::Ordering::Release);
}
}
fn build_status(choir: &Choir, last_daily: Option<chrono::NaiveDate>) -> DaemonStatus {
DaemonStatus {
pid: std::process::id(),
tasks: choir.task_statuses(),
last_daily: last_daily.map(|d| d.to_string()),
} }
} }
@ -574,39 +633,19 @@ fn status_symbol(t: &TaskInfo) -> &'static str {
} }
pub fn show_status() -> Result<(), String> { pub fn show_status() -> Result<(), String> {
let path = status_path(); let status = match read_status_socket() {
if !path.exists() { Some(s) => s,
eprintln!("No daemon status file found. Is the daemon running?"); None => {
eprintln!("Daemon not running.");
return Ok(()); return Ok(());
} }
let content = fs::read_to_string(&path)
.map_err(|e| format!("read status: {}", e))?;
let status: DaemonStatus = serde_json::from_str(&content)
.map_err(|e| format!("parse status: {}", e))?;
let alive = Path::new(&format!("/proc/{}", status.pid)).exists();
let state = if alive { "running" } else { "NOT RUNNING" };
// Show uptime from /proc/<pid>/stat start time
let uptime_str = if alive {
proc_uptime(status.pid).unwrap_or_default()
} else {
String::new()
}; };
let uptime_str = proc_uptime(status.pid).unwrap_or_default();
if uptime_str.is_empty() { if uptime_str.is_empty() {
eprintln!("poc-memory daemon pid={} {}", status.pid, state); eprintln!("poc-memory daemon pid={}", status.pid);
} else { } else {
eprintln!("poc-memory daemon pid={} {} uptime {}", status.pid, state, uptime_str); eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str);
}
// Status file age
if let Ok(meta) = fs::metadata(&path) {
if let Ok(modified) = meta.modified() {
let age = std::time::SystemTime::now().duration_since(modified).unwrap_or_default();
eprintln!(" status updated {}s ago", age.as_secs());
}
} }
if status.tasks.is_empty() { if status.tasks.is_empty() {