diff --git a/src/subconscious/hook.rs b/src/subconscious/hook.rs index 8758254..9f1e2ab 100644 --- a/src/subconscious/hook.rs +++ b/src/subconscious/hook.rs @@ -134,26 +134,84 @@ pub struct AgentCycleOutput { pub sleep_secs: Option, } -/// Run all agent cycles: surface-observe, reflect, journal. -/// Returns surfaced memory keys and any reflection text. -/// Caller decides how to render and inject the output. +/// Per-agent runtime state visible to the TUI. +pub struct AgentInfo { + pub name: &'static str, + pub pid: Option, + pub phase: Option, + pub last_log: Option, +} + +/// Persistent state for the agent orchestration cycle. +/// Created once, `trigger()` called on each user message. +/// TUI reads `agents` and `last_output` for display. +pub struct AgentCycleState { + output_dir: std::path::PathBuf, + log_file: Option, + pub agents: Vec, + pub last_output: AgentCycleOutput, +} + +const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; + +impl AgentCycleState { + pub fn new(session_id: &str) -> Self { + let output_dir = crate::store::memory_dir().join("agent-output"); + let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); + fs::create_dir_all(&log_dir).ok(); + let log_path = log_dir.join(format!("hook-{}", session_id)); + let log_file = fs::OpenOptions::new() + .create(true).append(true).open(log_path).ok(); + + let agents = AGENT_CYCLE_NAMES.iter() + .map(|&name| AgentInfo { name, pid: None, phase: None, last_log: None }) + .collect(); + + AgentCycleState { + output_dir, + log_file, + agents, + last_output: AgentCycleOutput { + surfaced_keys: vec![], + reflection: None, + sleep_secs: None, + }, + } + } + + fn log(&mut self, msg: std::fmt::Arguments) { + if let Some(ref mut f) = self.log_file { + let _ = write!(f, "{}", msg); + } + } + + fn update_agent(&mut self, name: &str, pid: Option, phase: Option) { + if let Some(agent) = self.agents.iter_mut().find(|a| a.name == name) { + agent.pid = pid; + agent.phase = phase; + } + } + + /// Run all agent cycles. Call on each user message. + pub fn trigger(&mut self, session: &HookSession) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + self.log(format_args!("\n=== {} agent_cycles ===\n", ts)); + + cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); + + let (surfaced_keys, sleep_secs) = self.surface_observe_cycle(session); + let reflection = self.reflection_cycle(session); + self.journal_cycle(session); + + self.last_output = AgentCycleOutput { surfaced_keys, reflection, sleep_secs }; + } +} + +/// Standalone entry point for the Claude Code hook path. pub fn run_agent_cycles(session: &HookSession) -> AgentCycleOutput { - let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); - fs::create_dir_all(&log_dir).ok(); - let log_path = log_dir.join(format!("hook-{}", session.session_id)); - let Ok(mut log_f) = fs::OpenOptions::new().create(true).append(true).open(log_path) - else { return AgentCycleOutput { surfaced_keys: vec![], reflection: None, sleep_secs: None } }; - - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - let _ = writeln!(log_f, "\n=== {} agent_cycles ===", ts); - - cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); - - let (surfaced_keys, sleep_secs) = surface_observe_cycle(session, &mut log_f); - let reflection = reflection_cycle(session, &mut log_f); - journal_cycle(session, &mut log_f); - - AgentCycleOutput { surfaced_keys, reflection, sleep_secs } + let mut state = AgentCycleState::new(&session.session_id); + state.trigger(session); + state.last_output } /// Format agent cycle output for injection into a Claude Code session. @@ -187,172 +245,170 @@ pub fn format_agent_output(output: &AgentCycleOutput) -> String { out } -/// Surface-observe cycle: read surfaced keys, manage agent lifecycle. -/// Returns (surfaced keys, optional sleep duration). -fn surface_observe_cycle(session: &HookSession, log_f: &mut File) -> (Vec, Option) { - let state_dir = crate::store::memory_dir() - .join("agent-output") - .join("surface-observe"); - fs::create_dir_all(&state_dir).ok(); - - let transcript = session.transcript(); - let offset_path = state_dir.join("transcript-offset"); - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - let timeout = crate::config::get() - .surface_timeout_secs - .unwrap_or(300) as u64; - - let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); - for (phase, pid) in &live { - let _ = writeln!(log_f, "alive pid-{}: phase={}", pid, phase); +impl AgentCycleState { + fn agent_dir(&self, name: &str) -> std::path::PathBuf { + let dir = self.output_dir.join(name); + fs::create_dir_all(&dir).ok(); + dir } - // Read surfaced keys - let mut surfaced_keys = Vec::new(); - let surface_path = state_dir.join("surface"); - if let Ok(content) = fs::read_to_string(&surface_path) { - let mut seen = session.seen(); - let seen_path = session.path("seen"); - for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { - if !seen.insert(key.to_string()) { - let _ = writeln!(log_f, " skip (seen): {}", key); - continue; - } - surfaced_keys.push(key.to_string()); - if let Ok(mut f) = fs::OpenOptions::new() - .create(true).append(true).open(&seen_path) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - writeln!(f, "{}\t{}", ts, key).ok(); - } - let _ = writeln!(log_f, " surfaced: {}", key); + fn surface_observe_cycle(&mut self, session: &HookSession) -> (Vec, Option) { + let state_dir = self.agent_dir("surface-observe"); + let transcript = session.transcript(); + let offset_path = state_dir.join("transcript-offset"); + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + + let timeout = crate::config::get() + .surface_timeout_secs + .unwrap_or(300) as u64; + + let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); + if let Some((phase, pid)) = live.first() { + self.update_agent("surface-observe", Some(*pid), Some(phase.clone())); + self.log(format_args!("alive pid-{}: phase={}\n", pid, phase)); + } else { + self.update_agent("surface-observe", None, None); } - fs::remove_file(&surface_path).ok(); + + // Read surfaced keys + let mut surfaced_keys = Vec::new(); + let surface_path = state_dir.join("surface"); + if let Ok(content) = fs::read_to_string(&surface_path) { + let mut seen = session.seen(); + let seen_path = session.path("seen"); + for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if !seen.insert(key.to_string()) { + self.log(format_args!(" skip (seen): {}\n", key)); + continue; + } + surfaced_keys.push(key.to_string()); + if let Ok(mut f) = fs::OpenOptions::new() + .create(true).append(true).open(&seen_path) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + writeln!(f, "{}\t{}", ts, key).ok(); + } + self.log(format_args!(" surfaced: {}\n", key)); + } + fs::remove_file(&surface_path).ok(); + } + + // Spawn new agent if needed + let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); + let any_in_surface = live.iter().any(|(p, _)| p == "surface"); + + if any_in_surface { + self.log(format_args!("agent in surface phase, waiting\n")); + } else { + if transcript.size > 0 { + fs::write(&offset_path, transcript.size.to_string()).ok(); + } + let pid = crate::agents::knowledge::spawn_agent( + "surface-observe", &state_dir, &session.session_id); + self.update_agent("surface-observe", + pid, Some("surface".into())); + self.log(format_args!("spawned agent {:?}\n", pid)); + } + + // Wait if agent is significantly behind + let mut sleep_secs = None; + let conversation_budget: u64 = 50_000; + + if !live.is_empty() && transcript.size > 0 { + let behind = transcript.size.saturating_sub(last_offset); + + if behind > conversation_budget / 2 { + let sleep_start = Instant::now(); + self.log(format_args!("agent {}KB behind\n", behind / 1024)); + + for _ in 0..5 { + std::thread::sleep(std::time::Duration::from_secs(1)); + let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); + if still_live.is_empty() { break; } + } + + let secs = (Instant::now() - sleep_start).as_secs_f64(); + self.log(format_args!("slept {secs:.2}s\n")); + sleep_secs = Some(secs); + } + } + + (surfaced_keys, sleep_secs) } - // Spawn new agent if needed - let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); - let any_in_surface = live.iter().any(|(p, _)| p == "surface"); + fn reflection_cycle(&mut self, session: &HookSession) -> Option { + let state_dir = self.agent_dir("reflect"); + let offset_path = state_dir.join("transcript-offset"); + let transcript = session.transcript(); - if any_in_surface { - let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live); - } else { - if transcript.size > 0 { - fs::write(&offset_path, transcript.size.to_string()).ok(); + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + + const REFLECTION_INTERVAL: u64 = 100_000; + if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { + return None; } + + let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300); + if let Some((phase, pid)) = live.first() { + self.update_agent("reflect", Some(*pid), Some(phase.clone())); + self.log(format_args!("reflect: already running pid {}\n", pid)); + return None; + } + + // Copy walked nodes from surface-observe + let so_state = self.agent_dir("surface-observe"); + if let Ok(walked) = fs::read_to_string(so_state.join("walked")) { + fs::write(state_dir.join("walked"), &walked).ok(); + } + + // Read and consume pending reflection + let reflection = fs::read_to_string(state_dir.join("reflection")).ok() + .filter(|s| !s.trim().is_empty()); + if reflection.is_some() { + fs::remove_file(state_dir.join("reflection")).ok(); + self.log(format_args!("reflect: consumed reflection\n")); + } + + fs::write(&offset_path, transcript.size.to_string()).ok(); let pid = crate::agents::knowledge::spawn_agent( - "surface-observe", &state_dir, &session.session_id); - let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live); + "reflect", &state_dir, &session.session_id); + self.update_agent("reflect", pid, Some("step-0".into())); + self.log(format_args!("reflect: spawned {:?}\n", pid)); + + reflection } - // Wait if agent is significantly behind - let mut sleep_secs = None; - let conversation_budget: u64 = 50_000; + fn journal_cycle(&mut self, session: &HookSession) { + let state_dir = self.agent_dir("journal"); + let offset_path = state_dir.join("transcript-offset"); + let transcript = session.transcript(); - if !live.is_empty() && transcript.size > 0 { - let behind = transcript.size.saturating_sub(last_offset); + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); - if behind > conversation_budget / 2 { - let sleep_start = Instant::now(); - let _ = write!(log_f, "agent {}KB behind (budget {}KB)", - behind / 1024, conversation_budget / 1024); - - for _ in 0..5 { - std::thread::sleep(std::time::Duration::from_secs(1)); - let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); - if still_live.is_empty() { break; } - } - - let secs = (Instant::now() - sleep_start).as_secs_f64(); - let _ = writeln!(log_f, ", slept {secs:.2}s"); - sleep_secs = Some(secs); + const JOURNAL_INTERVAL: u64 = 20_000; + if transcript.size.saturating_sub(last_offset) < JOURNAL_INTERVAL { + return; } + + let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300); + if let Some((phase, pid)) = live.first() { + self.update_agent("journal", Some(*pid), Some(phase.clone())); + self.log(format_args!("journal: already running pid {}\n", pid)); + return; + } + + fs::write(&offset_path, transcript.size.to_string()).ok(); + let pid = crate::agents::knowledge::spawn_agent( + "journal", &state_dir, &session.session_id); + self.update_agent("journal", pid, Some("step-0".into())); + self.log(format_args!("journal: spawned {:?}\n", pid)); } - - (surfaced_keys, sleep_secs) -} - -/// Reflection cycle: spawn reflect agent, return any pending reflection. -fn reflection_cycle(session: &HookSession, log_f: &mut File) -> Option { - let state_dir = crate::store::memory_dir() - .join("agent-output") - .join("reflect"); - fs::create_dir_all(&state_dir).ok(); - - let offset_path = state_dir.join("transcript-offset"); - let transcript = session.transcript(); - - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - const REFLECTION_INTERVAL: u64 = 100_000; - if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { - return None; - } - - let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300); - if !live.is_empty() { - let _ = writeln!(log_f, "reflect: already running {:?}", live); - return None; - } - - // Copy walked nodes from surface-observe - let so_state = crate::store::memory_dir() - .join("agent-output") - .join("surface-observe"); - if let Ok(walked) = fs::read_to_string(so_state.join("walked")) { - fs::write(state_dir.join("walked"), &walked).ok(); - } - - // Read and consume pending reflection - let reflection = fs::read_to_string(state_dir.join("reflection")).ok() - .filter(|s| !s.trim().is_empty()); - if reflection.is_some() { - fs::remove_file(state_dir.join("reflection")).ok(); - let _ = writeln!(log_f, "reflect: consumed reflection"); - } - - fs::write(&offset_path, transcript.size.to_string()).ok(); - let pid = crate::agents::knowledge::spawn_agent( - "reflect", &state_dir, &session.session_id); - let _ = writeln!(log_f, "reflect: spawned {:?}", pid); - - reflection -} - -/// Journal cycle: fire and forget. -fn journal_cycle(session: &HookSession, log_f: &mut File) { - let state_dir = crate::store::memory_dir() - .join("agent-output") - .join("journal"); - fs::create_dir_all(&state_dir).ok(); - - let offset_path = state_dir.join("transcript-offset"); - let transcript = session.transcript(); - - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - const JOURNAL_INTERVAL: u64 = 20_000; - if transcript.size.saturating_sub(last_offset) < JOURNAL_INTERVAL { - return; - } - - let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300); - if !live.is_empty() { - let _ = writeln!(log_f, "journal: already running {:?}", live); - return; - } - - fs::write(&offset_path, transcript.size.to_string()).ok(); - let pid = crate::agents::knowledge::spawn_agent( - "journal", &state_dir, &session.session_id); - let _ = writeln!(log_f, "journal: spawned {:?}", pid); -} +} // end impl AgentCycleState (cycle methods) fn cleanup_stale_files(dir: &Path, max_age: Duration) { let entries = match fs::read_dir(dir) {