cleanup: kill dead code, fix signal handler safety

- Remove unused now_secs(), parse_json_response, any_alive, Regex import
- Signal handler: replace Mutex with AtomicPtr<c_char> for signal safety
  (Mutex::lock in a signal handler can deadlock if main thread holds it)
- PidGuard Drop reclaims the leaked CString; signal handler just unlinks
- scan_pid_files moved to knowledge.rs as pub helper
- setup_agent_state calls scan_pid_files to clean stale pids on startup

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-03-26 15:58:59 -04:00
parent 52703b4637
commit 5d803441c9
5 changed files with 223 additions and 215 deletions

View file

@ -13,6 +13,52 @@ use crate::store::{self, Store};
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicPtr, Ordering};
// Global pid path for signal handler cleanup — stored as a leaked CString
// so the signal handler can unlink it without allocation.
static PID_CPATH: AtomicPtr<libc::c_char> = AtomicPtr::new(std::ptr::null_mut());
/// RAII guard that removes the pid file on drop (normal exit, panic).
struct PidGuard;
impl Drop for PidGuard {
fn drop(&mut self) {
let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst);
if !ptr.is_null() {
unsafe { libc::unlink(ptr); }
// Reclaim the leaked CString
unsafe { drop(std::ffi::CString::from_raw(ptr)); }
}
}
}
/// Register signal handlers to clean up pid file on SIGTERM/SIGINT.
fn register_pid_cleanup(pid_path: &std::path::Path) {
let c_path = std::ffi::CString::new(pid_path.to_string_lossy().as_bytes())
.expect("pid path contains null");
// Leak the CString so the signal handler can access it
let old = PID_CPATH.swap(c_path.into_raw(), Ordering::SeqCst);
if !old.is_null() {
unsafe { drop(std::ffi::CString::from_raw(old)); }
}
unsafe {
libc::signal(libc::SIGTERM, pid_cleanup_handler as libc::sighandler_t);
libc::signal(libc::SIGINT, pid_cleanup_handler as libc::sighandler_t);
}
}
extern "C" fn pid_cleanup_handler(sig: libc::c_int) {
let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst);
if !ptr.is_null() {
unsafe { libc::unlink(ptr); }
// Don't free — we're in a signal handler, just leak it
}
unsafe {
libc::signal(sig, libc::SIG_DFL);
libc::raise(sig);
}
}
// ---------------------------------------------------------------------------
// Agent execution
@ -23,7 +69,7 @@ pub struct AgentResult {
pub output: String,
pub node_keys: Vec<String>,
/// Directory containing output() files from the agent run.
pub output_dir: std::path::PathBuf,
pub state_dir: std::path::PathBuf,
}
/// Run a single agent and return the result (no action application — tools handle that).
@ -78,6 +124,8 @@ pub fn run_one_agent_with_keys(
let def = super::defs::get_def(agent_name)
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?;
log(&format!("targeting: {}", keys.join(", ")));
let graph = store.build_graph();
let mut resolved_steps = Vec::new();
@ -99,7 +147,7 @@ pub fn run_one_agent_with_keys(
store.record_agent_visits(&agent_batch.node_keys, agent_name).ok();
}
run_one_agent_inner(store, agent_name, &def, agent_batch, llm_tag, log)
run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log)
}
pub fn run_one_agent(
@ -124,11 +172,116 @@ pub fn run_one_agent_excluded(
let def = super::defs::get_def(agent_name)
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
// Set up output dir and write pid file BEFORE prompt building
let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?;
log("building prompt");
let effective_count = def.count.unwrap_or(batch_size);
let agent_batch = super::defs::run_agent(store, &def, effective_count, exclude)?;
run_one_agent_inner(store, agent_name, &def, agent_batch, llm_tag, log)
run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log)
}
/// Set up agent state dir, write initial pid file, register cleanup handlers.
/// Returns (state_dir, pid_path, guard). The guard removes the pid file on drop.
fn setup_agent_state(
agent_name: &str,
def: &super::defs::AgentDef,
) -> Result<(PathBuf, PathBuf, PidGuard), String> {
let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
fs::create_dir_all(&state_dir)
.map_err(|e| format!("create state dir: {}", e))?;
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }
// Clean up stale pid files from dead processes
scan_pid_files(&state_dir, 0);
let pid = std::process::id();
let pid_path = state_dir.join(format!("pid-{}", pid));
let first_phase = def.steps.first()
.map(|s| s.phase.as_str())
.unwrap_or("step-0");
fs::write(&pid_path, first_phase).ok();
// Register for cleanup on signals and normal exit
register_pid_cleanup(&pid_path);
Ok((state_dir, pid_path, PidGuard))
}
/// Check for live agent processes in a state dir. Returns (phase, pid) pairs.
/// Cleans up stale pid files and kills timed-out processes.
pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(String, u32)> {
let mut live = Vec::new();
let Ok(entries) = fs::read_dir(state_dir) else { return live };
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if !name_str.starts_with("pid-") { continue; }
let pid: u32 = name_str.strip_prefix("pid-")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if pid == 0 { continue; }
if unsafe { libc::kill(pid as i32, 0) } != 0 {
fs::remove_file(entry.path()).ok();
continue;
}
if timeout_secs > 0 {
if let Ok(meta) = entry.metadata() {
if let Ok(modified) = meta.modified() {
if modified.elapsed().unwrap_or_default().as_secs() > timeout_secs {
unsafe { libc::kill(pid as i32, libc::SIGTERM); }
fs::remove_file(entry.path()).ok();
continue;
}
}
}
}
let phase = fs::read_to_string(entry.path())
.unwrap_or_default()
.trim().to_string();
live.push((phase, pid));
}
live
}
/// Spawn an agent asynchronously. Writes the pid file before returning
/// so the caller immediately sees the agent as running.
pub fn spawn_agent(
agent_name: &str,
state_dir: &std::path::Path,
session_id: &str,
) -> Option<u32> {
let def = super::defs::get_def(agent_name)?;
let first_phase = def.steps.first()
.map(|s| s.phase.as_str())
.unwrap_or("step-0");
let log_dir = store::memory_dir().join("logs");
fs::create_dir_all(&log_dir).ok();
let agent_log = fs::OpenOptions::new()
.create(true).append(true)
.open(log_dir.join(format!("{}.log", agent_name)))
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
let child = std::process::Command::new("poc-memory")
.args(["agent", "run", agent_name, "--count", "1", "--local",
"--state-dir", &state_dir.to_string_lossy()])
.env("POC_SESSION_ID", session_id)
.stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()))
.stderr(agent_log)
.spawn()
.ok()?;
let pid = child.id();
let pid_path = state_dir.join(format!("pid-{}", pid));
fs::write(&pid_path, first_phase).ok();
Some(pid)
}
fn run_one_agent_inner(
@ -136,6 +289,8 @@ fn run_one_agent_inner(
agent_name: &str,
def: &super::defs::AgentDef,
agent_batch: super::prompts::AgentBatch,
state_dir: std::path::PathBuf,
pid_path: std::path::PathBuf,
_llm_tag: &str,
log: &(dyn Fn(&str) + Sync),
) -> Result<AgentResult, String> {
@ -166,26 +321,14 @@ fn run_one_agent_inner(
));
}
// Output/state directory — use --state-dir if set, otherwise flat per-agent
let output_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
.map(std::path::PathBuf::from)
.unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
fs::create_dir_all(&output_dir).ok();
// Safe: agent runs single-threaded, env var read only by our dispatch code
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
// Write PID file — content is just the phase name
let pid = std::process::id();
let pid_path = output_dir.join(format!("pid-{}", pid));
let write_pid = |phase: &str| {
fs::write(&pid_path, phase).ok();
};
write_pid(&agent_batch.steps[0].phase);
let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
log(&format!("{} step(s) {:?}, {}KB initial, model={}, {}, {} nodes, output={}",
n_steps, phases, first_len / 1024, def.model, tools_desc,
agent_batch.node_keys.len(), output_dir.display()));
agent_batch.node_keys.len(), state_dir.display()));
let prompts: Vec<String> = agent_batch.steps.iter()
.map(|s| s.prompt.clone()).collect();
@ -204,7 +347,7 @@ fn run_one_agent_inner(
let agents_dir = super::defs::agents_dir();
agents_dir.join(name)
});
let output_dir_for_bail = output_dir.clone();
let state_dir_for_bail = state_dir.clone();
let pid_path_for_bail = pid_path.clone();
let bail_fn = move |step_idx: usize| -> Result<(), String> {
// Update phase
@ -215,7 +358,7 @@ fn run_one_agent_inner(
if let Some(ref script) = bail_script {
let status = std::process::Command::new(script)
.arg(&pid_path_for_bail)
.current_dir(&output_dir_for_bail)
.current_dir(&state_dir_for_bail)
.status()
.map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
if !status.success() {
@ -227,20 +370,12 @@ fn run_one_agent_inner(
Ok(())
};
let output = match llm::call_for_def_multi(def, &prompts, Some(&bail_fn), log) {
Ok(output) => output,
Err(e) => {
fs::remove_file(&pid_path).ok();
return Err(e);
}
};
fs::remove_file(&pid_path).ok();
let output = llm::call_for_def_multi(def, &prompts, Some(&bail_fn), log)?;
Ok(AgentResult {
output,
node_keys: agent_batch.node_keys,
output_dir,
state_dir,
})
}