Track agent child processes, reap on completion
spawn_agent returns Child handle + log_path. AgentCycleState stores the Child, polls with try_wait() on each trigger to detect completion. No more filesystem scanning to track agent lifecycle. AgentSnapshot (Clone) sent to TUI for display. AgentInfo holds the Child handle and stays in the state. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
54ea7824d8
commit
9ac50bd999
5 changed files with 71 additions and 32 deletions
|
|
@ -243,7 +243,7 @@ impl Agent {
|
|||
} else {
|
||||
self.push_message(Message::user(user_input));
|
||||
}
|
||||
let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.agents.clone()));
|
||||
let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots()));
|
||||
|
||||
let mut overflow_retries: u32 = 0;
|
||||
let mut empty_retries: u32 = 0;
|
||||
|
|
|
|||
|
|
@ -349,7 +349,7 @@ pub struct App {
|
|||
/// Agent screen: viewing log for selected agent.
|
||||
agent_log_view: bool,
|
||||
/// Agent state from last cycle update.
|
||||
agent_state: Vec<crate::subconscious::hook::AgentInfo>,
|
||||
agent_state: Vec<crate::subconscious::hook::AgentSnapshot>,
|
||||
}
|
||||
|
||||
/// Overlay screens toggled by F-keys.
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ pub enum UiMessage {
|
|||
ContextInfoUpdate(ContextInfo),
|
||||
|
||||
/// Agent cycle state update — refreshes the F2 agents screen.
|
||||
AgentUpdate(Vec<crate::subconscious::hook::AgentInfo>),
|
||||
AgentUpdate(Vec<crate::subconscious::hook::AgentSnapshot>),
|
||||
}
|
||||
|
||||
/// Sender that fans out to both the TUI (mpsc) and observers (broadcast).
|
||||
|
|
|
|||
|
|
@ -135,13 +135,32 @@ pub struct AgentCycleOutput {
|
|||
}
|
||||
|
||||
/// Per-agent runtime state visible to the TUI.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AgentInfo {
|
||||
pub name: &'static str,
|
||||
pub pid: Option<u32>,
|
||||
pub phase: Option<String>,
|
||||
/// Path to the most recent agent log file.
|
||||
pub log_path: Option<std::path::PathBuf>,
|
||||
child: Option<std::process::Child>,
|
||||
}
|
||||
|
||||
/// Snapshot of agent state for sending to TUI (no Child handle).
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AgentSnapshot {
|
||||
pub name: &'static str,
|
||||
pub pid: Option<u32>,
|
||||
pub phase: Option<String>,
|
||||
pub log_path: Option<std::path::PathBuf>,
|
||||
}
|
||||
|
||||
impl AgentInfo {
|
||||
fn snapshot(&self) -> AgentSnapshot {
|
||||
AgentSnapshot {
|
||||
name: self.name,
|
||||
pid: self.pid,
|
||||
phase: self.phase.clone(),
|
||||
log_path: self.log_path.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Persistent state for the agent orchestration cycle.
|
||||
|
|
@ -166,7 +185,7 @@ impl AgentCycleState {
|
|||
.create(true).append(true).open(log_path).ok();
|
||||
|
||||
let agents = AGENT_CYCLE_NAMES.iter()
|
||||
.map(|&name| AgentInfo { name, pid: None, phase: None, log_path: None })
|
||||
.map(|&name| AgentInfo { name, pid: None, phase: None, log_path: None, child: None })
|
||||
.collect();
|
||||
|
||||
AgentCycleState {
|
||||
|
|
@ -187,20 +206,43 @@ impl AgentCycleState {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_agent(&mut self, name: &str, pid: Option<u32>, phase: Option<String>,
|
||||
log_path: Option<std::path::PathBuf>) {
|
||||
fn agent_spawned(&mut self, name: &str, phase: &str,
|
||||
result: crate::agents::knowledge::SpawnResult) {
|
||||
if let Some(agent) = self.agents.iter_mut().find(|a| a.name == name) {
|
||||
agent.pid = pid;
|
||||
agent.phase = phase;
|
||||
agent.log_path = log_path;
|
||||
agent.pid = Some(result.child.id());
|
||||
agent.phase = Some(phase.to_string());
|
||||
agent.log_path = Some(result.log_path);
|
||||
agent.child = Some(result.child);
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if any spawned agents have completed. Reap them.
|
||||
fn poll_children(&mut self) {
|
||||
for agent in &mut self.agents {
|
||||
if let Some(ref mut child) = agent.child {
|
||||
match child.try_wait() {
|
||||
Ok(Some(_status)) => {
|
||||
agent.pid = None;
|
||||
agent.phase = None;
|
||||
agent.child = None;
|
||||
// log_path stays — TUI can still view the log
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn snapshots(&self) -> Vec<AgentSnapshot> {
|
||||
self.agents.iter().map(|a| a.snapshot()).collect()
|
||||
}
|
||||
|
||||
/// Run all agent cycles. Call on each user message.
|
||||
pub fn trigger(&mut self, session: &HookSession) {
|
||||
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
|
||||
self.log(format_args!("\n=== {} agent_cycles ===\n", ts));
|
||||
|
||||
self.poll_children();
|
||||
cleanup_stale_files(&session.state_dir, Duration::from_secs(86400));
|
||||
|
||||
let (surfaced_keys, sleep_secs) = self.surface_observe_cycle(session);
|
||||
|
|
@ -305,12 +347,11 @@ impl AgentCycleState {
|
|||
if transcript.size > 0 {
|
||||
fs::write(&offset_path, transcript.size.to_string()).ok();
|
||||
}
|
||||
let spawned = crate::agents::knowledge::spawn_agent(
|
||||
"surface-observe", &state_dir, &session.session_id);
|
||||
self.update_agent("surface-observe",
|
||||
spawned.as_ref().map(|s| s.pid), Some("surface".into()),
|
||||
spawned.as_ref().map(|s| s.log_path.clone()));
|
||||
self.log(format_args!("spawned agent {:?}\n", spawned.as_ref().map(|s| s.pid)));
|
||||
if let Some(result) = crate::agents::knowledge::spawn_agent(
|
||||
"surface-observe", &state_dir, &session.session_id) {
|
||||
self.log(format_args!("spawned surface-observe pid {}\n", result.child.id()));
|
||||
self.agent_spawned("surface-observe", "surface", result);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait if agent is significantly behind
|
||||
|
|
@ -374,12 +415,11 @@ impl AgentCycleState {
|
|||
}
|
||||
|
||||
fs::write(&offset_path, transcript.size.to_string()).ok();
|
||||
let spawned = crate::agents::knowledge::spawn_agent(
|
||||
"reflect", &state_dir, &session.session_id);
|
||||
self.update_agent("reflect",
|
||||
spawned.as_ref().map(|s| s.pid), Some("step-0".into()),
|
||||
spawned.as_ref().map(|s| s.log_path.clone()));
|
||||
self.log(format_args!("reflect: spawned {:?}\n", spawned.as_ref().map(|s| s.pid)));
|
||||
if let Some(result) = crate::agents::knowledge::spawn_agent(
|
||||
"reflect", &state_dir, &session.session_id) {
|
||||
self.log(format_args!("reflect: spawned pid {}\n", result.child.id()));
|
||||
self.agent_spawned("reflect", "step-0", result);
|
||||
}
|
||||
|
||||
reflection
|
||||
}
|
||||
|
|
@ -405,12 +445,11 @@ impl AgentCycleState {
|
|||
}
|
||||
|
||||
fs::write(&offset_path, transcript.size.to_string()).ok();
|
||||
let spawned = crate::agents::knowledge::spawn_agent(
|
||||
"journal", &state_dir, &session.session_id);
|
||||
self.update_agent("journal",
|
||||
spawned.as_ref().map(|s| s.pid), Some("step-0".into()),
|
||||
spawned.as_ref().map(|s| s.log_path.clone()));
|
||||
self.log(format_args!("journal: spawned {:?}\n", spawned.as_ref().map(|s| s.pid)));
|
||||
if let Some(result) = crate::agents::knowledge::spawn_agent(
|
||||
"journal", &state_dir, &session.session_id) {
|
||||
self.log(format_args!("journal: spawned pid {}\n", result.child.id()));
|
||||
self.agent_spawned("journal", "step-0", result);
|
||||
}
|
||||
}
|
||||
} // end impl AgentCycleState (cycle methods)
|
||||
|
||||
|
|
|
|||
|
|
@ -252,9 +252,9 @@ pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(St
|
|||
|
||||
/// Spawn an agent asynchronously. Writes the pid file before returning
|
||||
/// so the caller immediately sees the agent as running.
|
||||
/// Spawn result: pid and path to the agent's log file.
|
||||
/// Spawn result: child process handle and log path.
|
||||
pub struct SpawnResult {
|
||||
pub pid: u32,
|
||||
pub child: std::process::Child,
|
||||
pub log_path: PathBuf,
|
||||
}
|
||||
|
||||
|
|
@ -287,7 +287,7 @@ pub fn spawn_agent(
|
|||
let pid = child.id();
|
||||
let pid_path = state_dir.join(format!("pid-{}", pid));
|
||||
fs::write(&pid_path, first_phase).ok();
|
||||
Some(SpawnResult { pid, log_path })
|
||||
Some(SpawnResult { child, log_path })
|
||||
}
|
||||
|
||||
fn run_one_agent_inner(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue