consciousness/src/thalamus/supervisor.rs
Kent Overstreet 755a359078 supervisor: PID file to prevent duplicate daemon spawns
Multiple supervisor instances (Mind init + channel polling) could
both see no socket and start the same daemon. The socket hasn't
bound yet by the time the second check runs.

Write a PID file on spawn, check it in is_alive(). kill(pid, 0)
verifies the process is still running. Stale PID files are cleaned
up automatically.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-04-05 13:30:56 -04:00

232 lines
7.3 KiB
Rust

// supervisor.rs — Channel daemon lifecycle management
//
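// Reads ~/.consciousness/channels/channels.json5, starts/stops
// channel daemons as needed. A daemon counts as alive if its socket
// file exists and accepts a connection, or — while it is still starting
// up and has not bound the socket yet — if the process recorded in its
// PID file is still running.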
//
// File locking prevents multiple consciousness/claude-code instances
// from racing to start the same daemon.
use std::collections::BTreeMap;
use std::fs::File;
use std::path::PathBuf;
use std::process::Child;
use log::{info, warn, error};
/// Directory holding the channel config, daemon sockets, PID files and the
/// supervisor lock file.
///
/// Resolves to `.consciousness/channels` under the home directory. If the
/// home directory cannot be determined an empty base path is used, which
/// makes the result relative to the current working directory.
fn channels_dir() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_default();
    home.join(".consciousness/channels")
}
/// Path of the channel configuration file, `channels.json5`, inside the
/// channels directory.
fn config_path() -> PathBuf {
    let mut path = channels_dir();
    path.push("channels.json5");
    path
}
/// Path of the supervisor lock file, `.supervisor.lock`, inside the
/// channels directory.
fn lock_path() -> PathBuf {
    let mut path = channels_dir();
    path.push(".supervisor.lock");
    path
}
/// Configuration of a single channel daemon.
///
/// `Supervisor` keeps one entry per channel name; the whole map is read
/// from and written to channels.json5.
#[derive(serde::Deserialize, serde::Serialize, Clone)]
pub struct ChannelEntry {
/// Binary name (looked up in PATH)
pub binary: String,
/// Whether the channel is enabled. Defaults to `true` when the field is
/// missing from the config. `Supervisor::ensure_running` skips entries
/// that are not enabled.
#[serde(default = "default_true")]
pub enabled: bool,
/// Whether `Supervisor::ensure_running` should start the daemon when it
/// is not alive. Defaults to `true` when the field is missing from the
/// config.
#[serde(default = "default_true")]
pub autostart: bool,
}
/// Serde default for boolean config fields that are on unless the config
/// says otherwise.
fn default_true() -> bool {
    true
}
/// RAII file lock — prevents multiple instances from racing.
///
/// `acquire` takes an exclusive `flock` on the supervisor lock file and
/// stores the open file here; the guard has to be kept alive for as long
/// as the lock is needed, since closing the file gives the lock up.
struct SupervisorLock {
// Never read; held only so the locked file stays open while the guard lives.
_file: File,
}
impl SupervisorLock {
    /// Try to take the supervisor lock without blocking.
    ///
    /// Creates the channels directory (best effort) and the lock file if
    /// they do not exist yet, then requests an exclusive, non-blocking
    /// `flock` on the lock file.
    ///
    /// Returns `None` if the lock file cannot be opened or if `flock`
    /// fails — typically because another instance already holds the lock.
    fn acquire() -> Option<Self> {
        use std::os::unix::io::AsRawFd;

        // Best effort: errors are ignored here; a missing directory makes
        // the open below fail, which is reported as `None`.
        let _ = std::fs::create_dir_all(channels_dir());

        let file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            // The file's contents are irrelevant; be explicit that an
            // existing lock file is opened as-is rather than truncated.
            .truncate(false)
            .open(lock_path())
            .ok()?;

        // SAFETY: `file` stays open for the duration of the call, so the
        // raw descriptor is valid, and flock() does not access any
        // Rust-managed memory.
        let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };

        if ret == 0 {
            Some(Self { _file: file })
        } else {
            None
        }
    }
}
/// Manages channel daemon processes.
pub struct Supervisor {
// Channel name -> configuration, filled by `load_config` and `add_daemon`.
config: BTreeMap<String, ChannelEntry>,
// Channel name -> handle of the daemon process spawned by this instance
// (inserted by `start_one`, removed by `stop_one` / `ensure_running`).
children: BTreeMap<String, Child>,
}
impl Supervisor {
/// Create a supervisor with no channels configured and no daemons spawned.
/// Call `load_config` to populate the channel list.
pub fn new() -> Self {
    let config = BTreeMap::new();
    let children = BTreeMap::new();
    Self { config, children }
}
/// Load config from channels.json5.
///
/// On success the in-memory config is replaced by the file's contents.
/// If the file is missing, unreadable or fails to parse, the current
/// config is left untouched and the problem is logged.
///
/// NOTE(review): despite the `.json5` name the file is parsed with
/// `serde_json`, so it has to be strict JSON — JSON5 extensions such as
/// comments or trailing commas are reported as parse errors.
pub fn load_config(&mut self) {
    let path = config_path();

    let text = match std::fs::read_to_string(&path) {
        Ok(text) => text,
        // A missing file simply means nothing has been configured yet.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            info!("no channels.json5, no channels configured");
            return;
        }
        // Anything else (permissions, I/O error, invalid UTF-8) is a real
        // problem and must not be reported as "no config".
        Err(e) => {
            warn!("failed to read {}: {}", path.display(), e);
            return;
        }
    };

    match serde_json::from_str::<BTreeMap<String, ChannelEntry>>(&text) {
        Ok(cfg) => {
            info!("loaded {} channel configs", cfg.len());
            self.config = cfg;
        }
        Err(e) => warn!("failed to parse {}: {}", path.display(), e),
    }
}
/// Check if a daemon is in the config.
pub fn has_daemon(&self, name: &str) -> bool {
self.config.contains_key(name)
}
/// Add a daemon to the config and persist to channels.json5.
pub fn add_daemon(&mut self, name: &str, entry: ChannelEntry) {
self.config.insert(name.to_string(), entry);
let path = config_path();
if let Ok(json) = serde_json::to_string_pretty(&self.config) {
if let Err(e) = std::fs::write(&path, &json) {
error!("failed to write {}: {}", path.display(), e);
}
}
}
/// Check if a daemon is alive by testing its socket or PID file.
///
/// Returns `true` if `<name>.sock` in the channels directory accepts a
/// connection, or if `<name>.pid` names a process that still exists (the
/// daemon may have been spawned but not bound its socket yet).
///
/// Side effects: a socket file that cannot be connected to, and a PID
/// file whose process is gone (or whose PID is not a valid process id),
/// are deleted as stale.
///
/// NOTE: `kill(pid, 0)` also succeeds for a process that has exited but
/// has not been waited for yet, so an unreaped child still counts as alive.
fn is_alive(name: &str) -> bool {
    let dir = channels_dir();

    // Check socket first
    let sock = dir.join(format!("{}.sock", name));
    if sock.exists() {
        match std::os::unix::net::UnixStream::connect(&sock) {
            Ok(_) => return true,
            Err(_) => {
                let _ = std::fs::remove_file(&sock);
            }
        }
    }

    // Check PID file — daemon may still be starting up
    let pid_file = dir.join(format!("{}.pid", name));
    let recorded = std::fs::read_to_string(&pid_file)
        .ok()
        .and_then(|text| text.trim().parse::<i32>().ok());
    let pid = match recorded {
        Some(pid) => pid,
        // Missing or unparsable PID file: nothing to go on.
        None => return false,
    };

    // Only a strictly positive pid names a single process. kill(0, ..)
    // targets our own process group and negative values target a process
    // group or every process, which would make the probe succeed no
    // matter what happened to the daemon.
    if pid > 0 {
        // SAFETY: kill() with signal 0 performs only the existence and
        // permission check; no signal is delivered and no memory is touched.
        let exists = unsafe { libc::kill(pid, 0) } == 0;
        if exists {
            return true;
        }
    }

    // Process dead (or bogus pid), clean up stale PID file
    let _ = std::fs::remove_file(&pid_file);
    false
}
/// Ensure all configured autostart daemons are running.
/// Acquires file lock to prevent races with other instances.
pub fn ensure_running(&mut self) {
let _lock = match SupervisorLock::acquire() {
Some(l) => l,
None => {
info!("another instance is managing channels");
return;
}
};
let entries: Vec<(String, ChannelEntry)> = self.config.iter()
.filter(|(_, e)| e.enabled && e.autostart)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (name, entry) in entries {
if Self::is_alive(&name) {
continue;
}
// Check if we spawned it and it died unexpectedly
if let Some(child) = self.children.get_mut(&name) {
match child.try_wait() {
Ok(Some(status)) => {
warn!("channel {} exited unexpectedly ({}), restarting", name, status);
self.children.remove(&name);
}
Ok(None) => continue, // still starting up
Err(_) => { self.children.remove(&name); }
}
}
self.start_one(&name, &entry);
}
}
fn start_one(&mut self, name: &str, entry: &ChannelEntry) {
info!("starting channel daemon: {} ({})", name, entry.binary);
match std::process::Command::new(&entry.binary)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(child) => {
info!("channel {} started (pid {})", name, child.id());
let pid_file = channels_dir().join(format!("{}.pid", name));
let _ = std::fs::write(&pid_file, child.id().to_string());
self.children.insert(name.to_string(), child);
}
Err(e) => error!("failed to start channel {}: {}", name, e),
}
}
/// Stop a specific daemon.
///
/// Removes the daemon's socket file, and — if this instance spawned the
/// daemon — kills the child, waits for it and removes its PID file.
/// A daemon that was not spawned by this instance is not signalled.
pub fn stop_one(&mut self, name: &str) {
    let dir = channels_dir();
    let _ = std::fs::remove_file(dir.join(format!("{}.sock", name)));

    if let Some(mut child) = self.children.remove(name) {
        let pid = child.id();
        info!("stopping channel {} (pid {})", name, pid);
        let _ = child.kill();
        let _ = child.wait();

        // The pid is free for reuse once the child has been reaped; a
        // leftover PID file could make an unrelated process look like the
        // daemon. Only remove the file if it still names our child.
        let pid_file = dir.join(format!("{}.pid", name));
        let ours = std::fs::read_to_string(&pid_file)
            .map(|text| text.trim() == pid.to_string())
            .unwrap_or(false);
        if ours {
            let _ = std::fs::remove_file(&pid_file);
        }
    }
}
/// Stop every daemon this instance has spawned, in channel-name order.
pub fn stop_all(&mut self) {
    // Snapshot the names first: stop_one() removes entries from `children`.
    let names: Vec<String> = self.children.keys().map(|n| n.to_owned()).collect();
    names.iter().for_each(|n| self.stop_one(n));
}
/// Report the state of every configured channel.
///
/// Returns one `(name, enabled, alive)` tuple per config entry, in
/// channel-name order. `alive` comes from `is_alive`, so calling this may
/// delete stale socket and PID files as a side effect.
pub fn status(&self) -> Vec<(String, bool, bool)> {
    let mut report = Vec::with_capacity(self.config.len());
    for (name, entry) in &self.config {
        let alive = Self::is_alive(name);
        report.push((name.clone(), entry.enabled, alive));
    }
    report
}
}
impl Drop for Supervisor {
    /// Log which spawned daemons are being left behind; nothing is killed.
    fn drop(&mut self) {
        // Don't kill daemons on drop — they should outlive us
        for (name, child) in self.children.iter_mut() {
            // Stay quiet only about children that are known to have exited.
            let exited = matches!(child.try_wait(), Ok(Some(_)));
            if !exited {
                info!("leaving channel {} running (pid {})", name, child.id());
            }
        }
    }
}