From a1fb3fe557946899a1ee813b384fac7f501573b6 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 4 Apr 2026 17:33:36 -0400 Subject: [PATCH] oneshot: simplify API surface Kill 5 wrapper functions (run_and_apply chain, run_one_agent_excluded), drop dead llm_tag parameter, clean up scan_pid_files parsing. Public API: run_one_agent, run_one_agent_with_keys, spawn_agent, scan_pid_files. Co-Authored-By: Proof of Concept --- src/agent/oneshot.rs | 340 ++++++++++++-------------------- src/cli/agent.rs | 6 +- src/subconscious/consolidate.rs | 4 +- src/subconscious/daemon.rs | 10 +- 4 files changed, 136 insertions(+), 224 deletions(-) diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 4455ef5..67a7497 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -27,17 +27,14 @@ impl Drop for PidGuard { let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst); if !ptr.is_null() { unsafe { libc::unlink(ptr); } - // Reclaim the leaked CString unsafe { drop(std::ffi::CString::from_raw(ptr)); } } } } -/// Register signal handlers to clean up pid file on SIGTERM/SIGINT. fn register_pid_cleanup(pid_path: &std::path::Path) { let c_path = std::ffi::CString::new(pid_path.to_string_lossy().as_bytes()) .expect("pid path contains null"); - // Leak the CString so the signal handler can access it let old = PID_CPATH.swap(c_path.into_raw(), Ordering::SeqCst); if !old.is_null() { unsafe { drop(std::ffi::CString::from_raw(old)); } @@ -52,7 +49,6 @@ extern "C" fn pid_cleanup_handler(sig: libc::c_int) { let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst); if !ptr.is_null() { unsafe { libc::unlink(ptr); } - // Don't free — we're in a signal handler, just leak it } unsafe { libc::signal(sig, libc::SIG_DFL); @@ -69,114 +65,141 @@ pub struct AgentResult { pub output: String, pub node_keys: Vec, /// Directory containing output() files from the agent run. - pub state_dir: std::path::PathBuf, -} - -/// Run a single agent and return the result (no action application — tools handle that). -pub fn run_and_apply( - store: &mut Store, - agent_name: &str, - batch_size: usize, - llm_tag: &str, -) -> Result<(), String> { - run_and_apply_with_log(store, agent_name, batch_size, llm_tag, &|_| {}) -} - -pub fn run_and_apply_with_log( - store: &mut Store, - agent_name: &str, - batch_size: usize, - llm_tag: &str, - log: &(dyn Fn(&str) + Sync), -) -> Result<(), String> { - run_and_apply_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default()) -} - -/// Like run_and_apply_with_log but with an in-flight exclusion set. -pub fn run_and_apply_excluded( - store: &mut Store, - agent_name: &str, - batch_size: usize, - llm_tag: &str, - log: &(dyn Fn(&str) + Sync), - exclude: &std::collections::HashSet, -) -> Result<(), String> { - let _result = run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, exclude)?; - Ok(()) -} - -/// Run an agent with explicit target keys, bypassing the agent's query. -pub fn run_one_agent_with_keys( - store: &mut Store, - agent_name: &str, - keys: &[String], - count: usize, - llm_tag: &str, - log: &(dyn Fn(&str) + Sync), -) -> Result { - let def = defs::get_def(agent_name) - .ok_or_else(|| format!("no .agent file for {}", agent_name))?; - - let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?; - - log(&format!("targeting: {}", keys.join(", "))); - let graph = store.build_graph(); - let mut resolved_steps = Vec::new(); - let mut all_keys: Vec = keys.to_vec(); - for step in &def.steps { - let (prompt, extra_keys) = defs::resolve_placeholders( - &step.prompt, store, &graph, keys, count, - ); - all_keys.extend(extra_keys); - resolved_steps.push(prompts::ResolvedStep { - prompt, - phase: step.phase.clone(), - }); - } - let agent_batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; - - // Record visits eagerly so concurrent agents pick different seeds - if !agent_batch.node_keys.is_empty() { - store.record_agent_visits(&agent_batch.node_keys, agent_name).ok(); - } - - run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log) + pub state_dir: PathBuf, } +/// Run an agent. If keys are provided, use them directly (bypassing the +/// agent's query). Otherwise, run the query to select target nodes. pub fn run_one_agent( store: &mut Store, agent_name: &str, - batch_size: usize, - llm_tag: &str, + count: usize, + keys: Option<&[String]>, log: &(dyn Fn(&str) + Sync), -) -> Result { - run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default()) -} - -/// Like run_one_agent but excludes nodes currently being worked on by other agents. -pub fn run_one_agent_excluded( - store: &mut Store, - agent_name: &str, - batch_size: usize, - llm_tag: &str, - log: &(dyn Fn(&str) + Sync), - exclude: &std::collections::HashSet, ) -> Result { let def = defs::get_def(agent_name) .ok_or_else(|| format!("no .agent file for {}", agent_name))?; - // Set up output dir and write pid file BEFORE prompt building let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?; - log("building prompt"); - let effective_count = def.count.unwrap_or(batch_size); - let agent_batch = defs::run_agent(store, &def, effective_count, exclude)?; + // Build prompt batch — either from explicit keys or the agent's query + let agent_batch = if let Some(keys) = keys { + log(&format!("targeting: {}", keys.join(", "))); + let graph = store.build_graph(); + let mut resolved_steps = Vec::new(); + let mut all_keys: Vec = keys.to_vec(); + for step in &def.steps { + let (prompt, extra_keys) = defs::resolve_placeholders( + &step.prompt, store, &graph, keys, count, + ); + all_keys.extend(extra_keys); + resolved_steps.push(prompts::ResolvedStep { + prompt, + phase: step.phase.clone(), + }); + } + let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; + if !batch.node_keys.is_empty() { + store.record_agent_visits(&batch.node_keys, agent_name).ok(); + } + batch + } else { + log("building prompt"); + let effective_count = def.count.unwrap_or(count); + defs::run_agent(store, &def, effective_count, &Default::default())? + }; - run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log) + // Filter tools based on agent def + let all_tools = super::tools::memory_and_journal_tools(); + let effective_tools: Vec = if def.tools.is_empty() { + all_tools.to_vec() + } else { + all_tools.into_iter() + .filter(|t| def.tools.iter().any(|w| w == &t.name)) + .collect() + }; + let tools_desc = effective_tools.iter().map(|t| t.name).collect::>().join(", "); + let n_steps = agent_batch.steps.len(); + + for key in &agent_batch.node_keys { + log(&format!(" node: {}", key)); + } + + // Guard: reject oversized first prompt + let max_prompt_bytes = 800_000; + let first_len = agent_batch.steps[0].prompt.len(); + if first_len > max_prompt_bytes { + let prompt_kb = first_len / 1024; + let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); + fs::create_dir_all(&oversize_dir).ok(); + let oversize_path = oversize_dir.join(format!("{}-{}.txt", + agent_name, store::compact_timestamp())); + let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", + agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); + fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); + log(&format!("oversized prompt logged to {}", oversize_path.display())); + return Err(format!( + "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", + prompt_kb, max_prompt_bytes / 1024, + )); + } + + let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); + log(&format!("{} step(s) {:?}, {}KB initial, {}, {} nodes, output={}", + n_steps, phases, first_len / 1024, tools_desc, + agent_batch.node_keys.len(), state_dir.display())); + + let prompts: Vec = agent_batch.steps.iter() + .map(|s| s.prompt.clone()).collect(); + let step_phases: Vec = agent_batch.steps.iter() + .map(|s| s.phase.clone()).collect(); + let step_phases_for_bail = step_phases.clone(); + + if std::env::var("POC_AGENT_VERBOSE").is_ok() { + for (i, s) in agent_batch.steps.iter().enumerate() { + log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt)); + } + } + log("\n=== CALLING LLM ==="); + + // Bail check: if the agent defines a bail script, run it between steps. + let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name)); + let state_dir_for_bail = state_dir.clone(); + let pid_path_for_bail = pid_path.clone(); + let bail_fn = move |step_idx: usize| -> Result<(), String> { + if step_idx < step_phases_for_bail.len() { + fs::write(&pid_path_for_bail, &step_phases_for_bail[step_idx]).ok(); + } + if let Some(ref script) = bail_script { + let status = std::process::Command::new(script) + .arg(&pid_path_for_bail) + .current_dir(&state_dir_for_bail) + .status() + .map_err(|e| format!("bail script {:?} failed: {}", script, e))?; + if !status.success() { + return Err(format!("bailed at step {}: {:?} exited {}", + step_idx + 1, script.file_name().unwrap_or_default(), + status.code().unwrap_or(-1))); + } + } + Ok(()) + }; + + let output = crate::subconscious::api::call_api_with_tools_sync( + agent_name, &prompts, &step_phases, def.temperature, def.priority, + &effective_tools, Some(&bail_fn), log)?; + + Ok(AgentResult { + output, + node_keys: agent_batch.node_keys, + state_dir, + }) } -/// Set up agent state dir, write initial pid file, register cleanup handlers. -/// Returns (state_dir, pid_path, guard). The guard removes the pid file on drop. +// --------------------------------------------------------------------------- +// PID file management +// --------------------------------------------------------------------------- + fn setup_agent_state( agent_name: &str, def: &defs::AgentDef, @@ -188,7 +211,6 @@ fn setup_agent_state( .map_err(|e| format!("create state dir: {}", e))?; unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); } - // Clean up stale pid files from dead processes scan_pid_files(&state_dir, 0); let pid = std::process::id(); @@ -198,9 +220,7 @@ fn setup_agent_state( .unwrap_or("step-0"); fs::write(&pid_path, first_phase).ok(); - // Register for cleanup on signals and normal exit register_pid_cleanup(&pid_path); - Ok((state_dir, pid_path, PidGuard)) } @@ -212,11 +232,8 @@ pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(St for entry in entries.flatten() { let name = entry.file_name(); let name_str = name.to_string_lossy(); - if !name_str.starts_with("pid-") { continue; } - let pid: u32 = name_str.strip_prefix("pid-") - .and_then(|s| s.parse().ok()) - .unwrap_or(0); - if pid == 0 { continue; } + let Some(pid_str) = name_str.strip_prefix("pid-") else { continue }; + let Ok(pid) = pid_str.parse::() else { continue }; if unsafe { libc::kill(pid as i32, 0) } != 0 { fs::remove_file(entry.path()).ok(); @@ -243,8 +260,10 @@ pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(St live } -/// Spawn an agent asynchronously. Writes the pid file before returning -/// so the caller immediately sees the agent as running. +// --------------------------------------------------------------------------- +// Subprocess spawning +// --------------------------------------------------------------------------- + pub struct SpawnResult { pub child: std::process::Child, pub log_path: PathBuf, @@ -281,110 +300,3 @@ pub fn spawn_agent( fs::write(&pid_path, first_phase).ok(); Some(SpawnResult { child, log_path }) } - -fn run_one_agent_inner( - _store: &mut Store, - agent_name: &str, - def: &defs::AgentDef, - agent_batch: prompts::AgentBatch, - state_dir: std::path::PathBuf, - pid_path: std::path::PathBuf, - _llm_tag: &str, - log: &(dyn Fn(&str) + Sync), -) -> Result { - // Filter tools based on agent def specification - let all_tools = super::tools::memory_and_journal_tools(); - let effective_tools: Vec = if def.tools.is_empty() { - all_tools.to_vec() - } else { - all_tools.into_iter() - .filter(|t| def.tools.iter().any(|w| w == &t.name)) - .collect() - }; - let tools_desc = effective_tools.iter().map(|t| t.name).collect::>().join(", "); - let n_steps = agent_batch.steps.len(); - - for key in &agent_batch.node_keys { - log(&format!(" node: {}", key)); - } - - // Guard: reject oversized first prompt (later steps grow via conversation) - let max_prompt_bytes = 800_000; - let first_len = agent_batch.steps[0].prompt.len(); - if first_len > max_prompt_bytes { - let prompt_kb = first_len / 1024; - let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); - fs::create_dir_all(&oversize_dir).ok(); - let oversize_path = oversize_dir.join(format!("{}-{}.txt", - agent_name, store::compact_timestamp())); - let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", - agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); - fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); - log(&format!("oversized prompt logged to {}", oversize_path.display())); - return Err(format!( - "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", - prompt_kb, max_prompt_bytes / 1024, - )); - } - - let write_pid = |phase: &str| { - fs::write(&pid_path, phase).ok(); - }; - - let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); - log(&format!("{} step(s) {:?}, {}KB initial, {}, {} nodes, output={}", - n_steps, phases, first_len / 1024, tools_desc, - agent_batch.node_keys.len(), state_dir.display())); - - let prompts: Vec = agent_batch.steps.iter() - .map(|s| s.prompt.clone()).collect(); - let step_phases: Vec = agent_batch.steps.iter() - .map(|s| s.phase.clone()).collect(); - let step_phases_for_bail = step_phases.clone(); - - if std::env::var("POC_AGENT_VERBOSE").is_ok() { - for (i, s) in agent_batch.steps.iter().enumerate() { - log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt)); - } - } - log("\n=== CALLING LLM ==="); - - // Bail check: if the agent defines a bail script, run it between steps. - // The script receives the pid file path as $1, cwd = state dir. - let bail_script = def.bail.as_ref().map(|name| { - let agents_dir = defs::agents_dir(); - agents_dir.join(name) - }); - let state_dir_for_bail = state_dir.clone(); - let pid_path_for_bail = pid_path.clone(); - let bail_fn = move |step_idx: usize| -> Result<(), String> { - // Update phase in pid file and provenance tracking - if step_idx < step_phases_for_bail.len() { - write_pid(&step_phases_for_bail[step_idx]); - } - // Run bail script if defined - if let Some(ref script) = bail_script { - let status = std::process::Command::new(script) - .arg(&pid_path_for_bail) - .current_dir(&state_dir_for_bail) - .status() - .map_err(|e| format!("bail script {:?} failed: {}", script, e))?; - if !status.success() { - return Err(format!("bailed at step {}: {:?} exited {}", - step_idx + 1, script.file_name().unwrap_or_default(), - status.code().unwrap_or(-1))); - } - } - Ok(()) - }; - - let output = crate::subconscious::api::call_api_with_tools_sync( - agent_name, &prompts, &step_phases, def.temperature, def.priority, - &effective_tools, Some(&bail_fn), log)?; - - Ok(AgentResult { - output, - node_keys: agent_batch.node_keys, - state_dir, - }) -} diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 3e38427..8c787c5 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -59,8 +59,8 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option for (i, key) in resolved_targets.iter().enumerate() { println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); if i > 0 { store = store::Store::load()?; } - if let Err(e) = crate::agent::oneshot::run_one_agent_with_keys( - &mut store, agent, &[key.clone()], count, "test", &log, + if let Err(e) = crate::agent::oneshot::run_one_agent( + &mut store, agent, count, Some(&[key.clone()]), &log, ) { println!("[{}] ERROR on {}: {}", agent, key, e); } @@ -80,7 +80,7 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option } else { // Local execution (--local, --debug, dry-run, or daemon unavailable) crate::agent::oneshot::run_one_agent( - &mut store, agent, count, "test", &log, + &mut store, agent, count, None, &log, )?; } Ok(()) diff --git a/src/subconscious/consolidate.rs b/src/subconscious/consolidate.rs index 35094bb..e2b6f72 100644 --- a/src/subconscious/consolidate.rs +++ b/src/subconscious/consolidate.rs @@ -74,8 +74,8 @@ pub fn consolidate_full_with_progress( *store = Store::load()?; } - match oneshot::run_and_apply(store, agent_type, *count, "consolidate") { - Ok(()) => { + match oneshot::run_one_agent(store, agent_type, *count, None, &|_| {}) { + Ok(_) => { let msg = " Done".to_string(); log_line(&mut log_buf, &msg); on_progress(&msg); diff --git a/src/subconscious/daemon.rs b/src/subconscious/daemon.rs index 6e33847..e793cbb 100644 --- a/src/subconscious/daemon.rs +++ b/src/subconscious/daemon.rs @@ -123,8 +123,8 @@ fn job_targeted_agent( ctx.log_line(msg); log_event(&job_name, "progress", msg); }; - crate::agent::oneshot::run_one_agent_with_keys( - &mut store, &agent, std::slice::from_ref(&key), 5, "daemon", &log, + crate::agent::oneshot::run_one_agent( + &mut store, &agent, 5, Some(std::slice::from_ref(&key)), &log, )?; ctx.log_line("done"); Ok(()) @@ -208,8 +208,8 @@ fn job_consolidation_agent( }; // Use run_one_agent_with_keys — we already selected seeds above, // no need to re-run the query. - let result = crate::agent::oneshot::run_one_agent_with_keys( - &mut store, &agent, &claimed_keys, batch, "consolidate", &log, + let result = crate::agent::oneshot::run_one_agent( + &mut store, &agent, batch, Some(&claimed_keys), &log, ).map(|_| ()); // Release all claimed keys (seeds + neighbors) @@ -239,7 +239,7 @@ fn job_rename_agent( ctx.log_line(format!("running rename agent (batch={})", batch)); let log = |msg: &str| ctx.log_line(msg); - let result = crate::agent::oneshot::run_one_agent(&mut store, "rename", batch, "consolidate", &log)?; + let result = crate::agent::oneshot::run_one_agent(&mut store, "rename", batch, None, &log)?; // Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE) let mut applied = 0;