From f4def8d03b480e14a529a67521d7c34220d3b78d Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Tue, 7 Apr 2026 13:27:59 -0400 Subject: [PATCH] Fix: reap stale agent pid files in poc-hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scan_pid_files was removed as dead code but it was actually needed by the hook path — the bug was that it was never wired in. Add reap_agent_pids() directly to poc-hook.rs and call it on every UserPromptSubmit. Kills timed-out agents (10min) and cleans up pid files for dead processes. Also remove dead subconscious/subconscious.rs (420 lines) — was forked to claude/agent_cycles.rs and never removed. Co-Authored-By: Proof of Concept --- src/claude/poc-hook.rs | 41 +++ src/subconscious/mod.rs | 1 - src/subconscious/subconscious.rs | 420 ------------------------------- 3 files changed, 41 insertions(+), 421 deletions(-) delete mode 100644 src/subconscious/subconscious.rs diff --git a/src/claude/poc-hook.rs b/src/claude/poc-hook.rs index 19b314c..415f575 100644 --- a/src/claude/poc-hook.rs +++ b/src/claude/poc-hook.rs @@ -83,6 +83,46 @@ fn check_notifications() { } } +/// Check for stale agent processes in a state dir. +/// Cleans up pid files for dead processes and kills timed-out ones. 
+/// Only `pid-<N>` files with `N > 0` are acted on: `kill(2)` reads pid 0 as the caller's
+/// process group and -1 as every signalable process. `timeout_secs == 0` disables the timeout.
+fn reap_agent_pids(state_dir: &std::path::Path, timeout_secs: u64) {
+    let Ok(entries) = fs::read_dir(state_dir) else { return };
+    for entry in entries.flatten() {
+        let name = entry.file_name();
+        let Some(pid) = name.to_str().and_then(|n| n.strip_prefix("pid-"))
+            .and_then(|s| s.parse::<i32>().ok()).filter(|&pid| pid > 0) else { continue };
+
+        // SAFETY: kill(2) takes no pointers; signal 0 only probes whether `pid` exists.
+        if unsafe { libc::kill(pid, 0) } != 0 {
+            // Probe failed, so the process is gone or not ours to signal: drop the pid file.
+            fs::remove_file(entry.path()).ok();
+            continue;
+        }
+
+        // Age is taken from the pid file's mtime; an unreadable or future mtime counts as 0.
+        let age_secs = entry.metadata().and_then(|m| m.modified()).ok()
+            .and_then(|mtime| mtime.elapsed().ok()).map_or(0, |age| age.as_secs());
+        if timeout_secs > 0 && age_secs > timeout_secs {
+            // SAFETY: as above; `pid > 0` guarantees a single target process.
+            unsafe { libc::kill(pid, libc::SIGTERM); }
+            fs::remove_file(entry.path()).ok();
+        }
+    }
+}
+
+/// Reap stale agents in every subdirectory of the memory dir's `agent-output` directory.
+fn reap_all_agents() {
+    let agent_output = poc_memory::store::memory_dir().join("agent-output");
+    let Ok(entries) = fs::read_dir(&agent_output) else { return };
+    for entry in entries.flatten() {
+        if entry.file_type().map_or(false, |t| t.is_dir()) {
+            reap_agent_pids(&entry.path(), 600); // 10 min timeout
+        }
+    }
+}
+
 /// Check if enough new conversation has accumulated to trigger an observation run.
fn maybe_trigger_observation(transcript: &PathBuf) { let cursor_file = poc_memory::store::memory_dir().join("observation-cursor"); @@ -189,6 +229,7 @@ fn main() { "UserPromptSubmit" => { signal_user(); check_notifications(); + reap_all_agents(); print!("{}", poc_memory::memory_search::run_hook(&input)); if let Some(ref t) = transcript { diff --git a/src/subconscious/mod.rs b/src/subconscious/mod.rs index a1c1fb2..a48620d 100644 --- a/src/subconscious/mod.rs +++ b/src/subconscious/mod.rs @@ -24,4 +24,3 @@ pub mod defs; pub mod digest; pub mod learn; pub mod prompts; -pub mod subconscious; diff --git a/src/subconscious/subconscious.rs b/src/subconscious/subconscious.rs deleted file mode 100644 index 5088806..0000000 --- a/src/subconscious/subconscious.rs +++ /dev/null @@ -1,420 +0,0 @@ -// agents.rs — Agent orchestration: lifecycle, state, cycle management -// -// Core agent cycle state — spawns and tracks surface-observe, journal, -// reflect agents. Used by both poc-agent (in-memory) and the Claude -// Code hook path (serialized to disk). - -use std::fs; -use std::fs::File; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::time::{Duration, Instant, SystemTime}; - -pub use crate::session::HookSession; - -/// Output from a single agent orchestration cycle. -#[derive(Default)] -pub struct AgentCycleOutput { - /// Memory node keys surfaced by surface-observe. - pub surfaced_keys: Vec, - /// Freeform reflection text from the reflect agent. - pub reflection: Option, - /// How long we slept waiting for observe to catch up, if at all. - pub sleep_secs: Option, -} - -/// Per-agent runtime state. -pub struct AgentInfo { - pub name: &'static str, - pub pid: Option, - pub phase: Option, - pub log_path: Option, - child: Option, -} - -/// Snapshot of agent state — serializable, sendable to TUI. 
-#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct AgentSnapshot { - pub name: String, - pub pid: Option, - pub phase: Option, - pub log_path: Option, -} - -impl AgentInfo { - fn snapshot(&self) -> AgentSnapshot { - AgentSnapshot { - name: self.name.to_string(), - pid: self.pid, - phase: self.phase.clone(), - log_path: self.log_path.clone(), - } - } -} - -/// Serializable state for persisting across Claude Code hook invocations. -#[derive(serde::Serialize, serde::Deserialize)] -pub struct SavedAgentState { - pub agents: Vec, -} - -impl SavedAgentState { - fn state_path(session_id: &str) -> PathBuf { - let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/sessions"); - fs::create_dir_all(&dir).ok(); - dir.join(format!("agent-state-{}.json", session_id)) - } - - pub fn load(session_id: &str) -> Self { - let path = Self::state_path(session_id); - let mut state: Self = fs::read_to_string(&path).ok() - .and_then(|s| serde_json::from_str(&s).ok()) - .unwrap_or(SavedAgentState { agents: Vec::new() }); - - for agent in &mut state.agents { - if let Some(pid) = agent.pid { - unsafe { - if libc::kill(pid as i32, 0) != 0 { - agent.pid = None; - agent.phase = None; - } - } - } - } - state - } - - pub fn save(&self, session_id: &str) { - let path = Self::state_path(session_id); - if let Ok(json) = serde_json::to_string(self) { - fs::write(path, json).ok(); - } - } -} - -/// Persistent state for the agent orchestration cycle. -/// Created once, `trigger()` called on each user message. -/// TUI reads snapshots for display. -/// -/// TODO: surface-observe, journal, reflect agents currently spawn child -/// processes (legacy from the Claude Code hook path). They should be -/// converted to async tasks using the ApiClient, like memory scoring. 
-pub struct AgentCycleState { - output_dir: PathBuf, - log_file: Option, - pub agents: Vec, - pub last_output: AgentCycleOutput, -} - -const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; - -impl AgentCycleState { - pub fn new(session_id: &str) -> Self { - let output_dir = crate::store::memory_dir().join("agent-output"); - let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); - fs::create_dir_all(&log_dir).ok(); - let log_path = log_dir.join(format!("hook-{}", session_id)); - let log_file = fs::OpenOptions::new() - .create(true).append(true).open(log_path).ok(); - - let agents = AGENT_CYCLE_NAMES.iter() - .map(|&name| AgentInfo { name, pid: None, phase: None, log_path: None, child: None }) - .collect(); - - AgentCycleState { - output_dir, - log_file, - agents, - last_output: AgentCycleOutput { - surfaced_keys: vec![], - reflection: None, - sleep_secs: None, - }, - } - } - - fn log(&mut self, msg: std::fmt::Arguments) { - if let Some(ref mut f) = self.log_file { - let _ = write!(f, "{}", msg); - } - } - - fn agent_running(&self, name: &str) -> bool { - self.agents.iter().any(|a| a.name == name && a.pid.is_some()) - } - - fn agent_spawned(&mut self, name: &str, phase: &str, - result: crate::agent::oneshot::SpawnResult) { - if let Some(agent) = self.agents.iter_mut().find(|a| a.name == name) { - agent.pid = Some(result.child.id()); - agent.phase = Some(phase.to_string()); - agent.log_path = Some(result.log_path); - agent.child = Some(result.child); - } - } - - /// Check if any agents have completed. Reap child handles, or - /// check pid liveness for restored-from-disk agents. 
- fn poll_children(&mut self) { - for agent in &mut self.agents { - if let Some(ref mut child) = agent.child { - if let Ok(Some(_)) = child.try_wait() { - agent.pid = None; - agent.phase = None; - agent.child = None; - } - } else if let Some(pid) = agent.pid { - unsafe { - if libc::kill(pid as i32, 0) != 0 { - agent.pid = None; - agent.phase = None; - } - } - } - } - } - - pub fn snapshots(&self, scoring_in_flight: bool, scored_count: usize) -> Vec { - let mut snaps: Vec = self.agents.iter().map(|a| a.snapshot()).collect(); - snaps.push(AgentSnapshot { - name: "memory-scoring".to_string(), - pid: None, - phase: if scoring_in_flight { - Some("scoring...".into()) - } else if scored_count == 0 { - None - } else { - Some(format!("{} scored", scored_count)) - }, - log_path: None, - }); - snaps - } - - /// Restore agent state from a saved snapshot (for Claude Code hook path). - pub fn restore(&mut self, saved: &SavedAgentState) { - for sa in &saved.agents { - if let Some(agent) = self.agents.iter_mut().find(|a| a.name == sa.name) { - agent.pid = sa.pid; - agent.phase = sa.phase.clone(); - agent.log_path = sa.log_path.clone(); - } - } - } - - /// Save current state for the Claude Code hook path. - pub fn save(&self, session_id: &str) { - let state = SavedAgentState { agents: self.snapshots(false, 0) }; - state.save(session_id); - } - - /// Run all agent cycles. Call on each user message. 
- pub fn trigger(&mut self, session: &HookSession) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - self.log(format_args!("\n=== {} agent_cycles ===\n", ts)); - - self.poll_children(); - cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); - - let (surfaced_keys, sleep_secs) = self.surface_observe_cycle(session); - let reflection = self.reflection_cycle(session); - self.journal_cycle(session); - - self.last_output = AgentCycleOutput { surfaced_keys, reflection, sleep_secs }; - } - - fn agent_dir(&self, name: &str) -> PathBuf { - let dir = self.output_dir.join(name); - fs::create_dir_all(&dir).ok(); - dir - } - - fn surface_observe_cycle(&mut self, session: &HookSession) -> (Vec, Option) { - let state_dir = self.agent_dir("surface-observe"); - let transcript = session.transcript(); - let offset_path = state_dir.join("transcript-offset"); - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - // Read surfaced keys - let mut surfaced_keys = Vec::new(); - let surface_path = state_dir.join("surface"); - if let Ok(content) = fs::read_to_string(&surface_path) { - let mut seen = session.seen(); - let seen_path = session.path("seen"); - for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { - if !seen.insert(key.to_string()) { - self.log(format_args!(" skip (seen): {}\n", key)); - continue; - } - surfaced_keys.push(key.to_string()); - if let Ok(mut f) = fs::OpenOptions::new() - .create(true).append(true).open(&seen_path) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - writeln!(f, "{}\t{}", ts, key).ok(); - } - self.log(format_args!(" surfaced: {}\n", key)); - } - fs::remove_file(&surface_path).ok(); - } - - // Spawn new agent if not already running - let running = self.agent_running("surface-observe"); - if running { - self.log(format_args!("surface-observe already running\n")); - } else { - if transcript.size > 0 { - fs::write(&offset_path, 
transcript.size.to_string()).ok(); - } - if let Some(result) = crate::agent::oneshot::spawn_agent( - "surface-observe", &state_dir, &session.session_id) { - self.log(format_args!("spawned surface-observe pid {}\n", result.child.id())); - self.agent_spawned("surface-observe", "surface", result); - } - } - - // Wait if agent is significantly behind - let mut sleep_secs = None; - let conversation_budget: u64 = 50_000; - - if running && transcript.size > 0 { - let behind = transcript.size.saturating_sub(last_offset); - - if behind > conversation_budget / 2 { - let sleep_start = Instant::now(); - self.log(format_args!("agent {}KB behind\n", behind / 1024)); - - for _ in 0..5 { - std::thread::sleep(std::time::Duration::from_secs(1)); - self.poll_children(); - if !self.agent_running("surface-observe") { break; } - } - - let secs = (Instant::now() - sleep_start).as_secs_f64(); - self.log(format_args!("slept {secs:.2}s\n")); - sleep_secs = Some(secs); - } - } - - (surfaced_keys, sleep_secs) - } - - fn reflection_cycle(&mut self, session: &HookSession) -> Option { - let state_dir = self.agent_dir("reflect"); - let offset_path = state_dir.join("transcript-offset"); - let transcript = session.transcript(); - - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - const REFLECTION_INTERVAL: u64 = 100_000; - if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { - return None; - } - - if self.agent_running("reflect") { - self.log(format_args!("reflect: already running\n")); - return None; - } - - // Copy walked nodes from surface-observe - let so_state = self.agent_dir("surface-observe"); - if let Ok(walked) = fs::read_to_string(so_state.join("walked")) { - fs::write(state_dir.join("walked"), &walked).ok(); - } - - // Read and consume pending reflection - let reflection = fs::read_to_string(state_dir.join("reflection")).ok() - .filter(|s| !s.trim().is_empty()); - if reflection.is_some() { - 
fs::remove_file(state_dir.join("reflection")).ok(); - self.log(format_args!("reflect: consumed reflection\n")); - } - - fs::write(&offset_path, transcript.size.to_string()).ok(); - if let Some(result) = crate::agent::oneshot::spawn_agent( - "reflect", &state_dir, &session.session_id) { - self.log(format_args!("reflect: spawned pid {}\n", result.child.id())); - self.agent_spawned("reflect", "step-0", result); - } - - reflection - } - - fn journal_cycle(&mut self, session: &HookSession) { - let state_dir = self.agent_dir("journal"); - let offset_path = state_dir.join("transcript-offset"); - let transcript = session.transcript(); - - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - const JOURNAL_INTERVAL: u64 = 20_000; - if transcript.size.saturating_sub(last_offset) < JOURNAL_INTERVAL { - return; - } - - if self.agent_running("journal") { - self.log(format_args!("journal: already running\n")); - return; - } - - fs::write(&offset_path, transcript.size.to_string()).ok(); - if let Some(result) = crate::agent::oneshot::spawn_agent( - "journal", &state_dir, &session.session_id) { - self.log(format_args!("journal: spawned pid {}\n", result.child.id())); - self.agent_spawned("journal", "step-0", result); - } - } -} - -/// Format agent cycle output for injection into a Claude Code session. 
-pub fn format_agent_output(output: &AgentCycleOutput) -> String { - let mut out = String::new(); - - if let Some(secs) = output.sleep_secs { - out.push_str(&format!("Slept {secs:.2}s to let observe catch up\n")); - } - - if !output.surfaced_keys.is_empty() { - if let Ok(store) = crate::store::Store::load() { - for key in &output.surfaced_keys { - if let Some(rendered) = crate::cli::node::render_node(&store, key) { - if !rendered.trim().is_empty() { - use std::fmt::Write as _; - writeln!(out, "--- {} (surfaced) ---", key).ok(); - write!(out, "{}", rendered).ok(); - } - } - } - } - } - - if let Some(ref reflection) = output.reflection { - use std::fmt::Write as _; - writeln!(out, "--- subconscious reflection ---").ok(); - write!(out, "{}", reflection.trim()).ok(); - } - - out -} - -fn cleanup_stale_files(dir: &Path, max_age: Duration) { - let entries = match fs::read_dir(dir) { - Ok(e) => e, - Err(_) => return, - }; - let cutoff = SystemTime::now() - max_age; - for entry in entries.flatten() { - if let Ok(meta) = entry.metadata() { - if let Ok(modified) = meta.modified() { - if modified < cutoff { - fs::remove_file(entry.path()).ok(); - } - } - } - } -}