diff --git a/src/cli/agent.rs b/src/cli/agent.rs index bc437d8..a23629e 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -92,11 +92,11 @@ pub fn cmd_consolidate_batch(count: usize, auto: bool, agent: Option) -> if let Some(agent_name) = agent { let batch = crate::agents::prompts::agent_prompt(&store, &agent_name, count)?; - for (i, p) in batch.prompts.iter().enumerate() { - if batch.prompts.len() > 1 { - println!("=== STEP {} ===\n", i + 1); + for (i, s) in batch.steps.iter().enumerate() { + if batch.steps.len() > 1 { + println!("=== STEP {} ({}) ===\n", i + 1, s.phase); } - println!("{}", p); + println!("{}", s.prompt); } Ok(()) } else { diff --git a/src/hippocampus/memory_search.rs b/src/hippocampus/memory_search.rs index 5e8e39c..0363e4a 100644 --- a/src/hippocampus/memory_search.rs +++ b/src/hippocampus/memory_search.rs @@ -129,159 +129,121 @@ fn mark_seen(dir: &Path, session_id: &str, key: &str, seen: &mut HashSet } } -fn surface_agent_cycle(session: &Session, out: &mut String, log_f: &mut File) { - let result_path = session.state_dir.join(format!("surface-result-{}", session.session_id)); - let pid_path = session.state_dir.join(format!("surface-pid-{}", session.session_id)); +/// Unified agent cycle — runs surface-observe agent with state dir. +/// Reads output files for surface results, spawns new agent when ready. +/// +/// Pipelining: if a running agent is past the surface phase, start +/// a new one so surface stays fresh. +fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) { + let state_dir = crate::store::memory_dir() + .join("agent-output") + .join("surface-observe"); + fs::create_dir_all(&state_dir).ok(); - let surface_timeout = crate::config::get() + let timeout = crate::config::get() .surface_timeout_secs - .unwrap_or(120) as u64; + .unwrap_or(300) as u64; - let agent_done = match fs::read_to_string(&pid_path) { - Ok(content) => { - let parts: Vec<&str> = content.split('\t').collect(); - let pid: u32 = parts.first().and_then(|s| s.trim().parse().ok()).unwrap_or(0); - let start_ts: u64 = parts.get(1).and_then(|s| s.trim().parse().ok()).unwrap_or(0); - if pid == 0 { true } - else { - let alive = unsafe { libc::kill(pid as i32, 0) == 0 }; - if !alive { true } - else if now_secs().saturating_sub(start_ts) > surface_timeout { - unsafe { libc::kill(pid as i32, libc::SIGTERM); } - true - } else { false } + // Scan pid files — find live agents and their phases + let mut any_in_surface = false; + let mut any_alive = false; + if let Ok(entries) = fs::read_dir(&state_dir) { + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if !name_str.starts_with("pid-") { continue; } + let pid: u32 = name_str.strip_prefix("pid-") + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + if pid == 0 { continue; } + + let alive = unsafe { libc::kill(pid as i32, 0) == 0 }; + if !alive { + let _ = writeln!(log_f, "cleanup stale pid-{}", pid); + fs::remove_file(entry.path()).ok(); + continue; } + + // Check for timeout + let phase_json = fs::read_to_string(entry.path()).unwrap_or_default(); + let started: u64 = phase_json.split("\"started\":") + .nth(1) + .and_then(|s| s.trim_start().split(|c: char| !c.is_ascii_digit()).next()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + if started > 0 && now_secs().saturating_sub(started) > timeout { + let _ = writeln!(log_f, "killing timed-out pid-{} ({}s)", pid, timeout); + unsafe { libc::kill(pid as i32, libc::SIGTERM); } + fs::remove_file(entry.path()).ok(); + continue; + } + + any_alive = true; + + let in_surface = phase_json.contains("\"phase\":\"surface\"") + || phase_json.contains("\"phase\":\"step-0\""); + if in_surface { + any_in_surface = true; + } + let _ = writeln!(log_f, "alive pid-{}: {}", pid, phase_json.trim()); } - Err(_) => true, - }; + } - let _ = writeln!(log_f, "agent_done {agent_done}"); - - if !agent_done { return; } - - if let Ok(result) = fs::read_to_string(&result_path) { - if !result.trim().is_empty() { - let tail_lines: Vec<&str> = result.lines().rev() - .filter(|l| !l.trim().is_empty()).take(8).collect(); - let has_new = tail_lines.iter().any(|l| l.starts_with("NEW RELEVANT MEMORIES:")); - let has_none = tail_lines.iter().any(|l| l.starts_with("NO NEW RELEVANT MEMORIES")); - - let _ = writeln!(log_f, "has_new {has_new} has_none {has_none}"); - - if has_new { - let after_marker = result.rsplit_once("NEW RELEVANT MEMORIES:") - .map(|(_, rest)| rest).unwrap_or(""); - let keys: Vec = after_marker.lines() - .map(|l| l.trim().trim_start_matches("- ").trim().to_string()) - .filter(|l| !l.is_empty() && !l.starts_with("```")).collect(); - - let _ = writeln!(log_f, "keys {:?}", keys); - - let Ok(store) = crate::store::Store::load() else { return; }; - let mut seen = session.seen(); - let seen_path = session.path("seen"); - for key in &keys { - if !seen.insert(key.clone()) { - let _ = writeln!(log_f, " skip (seen): {}", key); - continue; - } - if let Some(content) = crate::cli::node::render_node(&store, key) { - if !content.trim().is_empty() { - use std::fmt::Write as _; - writeln!(out, "--- {} (surfaced) ---", key).ok(); - write!(out, "{}", content).ok(); - let _ = writeln!(log_f, " rendered {}: {} bytes, out now {} bytes", key, content.len(), out.len()); - if let Ok(mut f) = fs::OpenOptions::new() - .create(true).append(true).open(&seen_path) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - writeln!(f, "{}\t{}", ts, key).ok(); - } - } + // Read surface output and inject into context + let surface_path = state_dir.join("surface"); + if let Ok(content) = fs::read_to_string(&surface_path) { + let Ok(store) = crate::store::Store::load() else { return; }; + let mut seen = session.seen(); + let seen_path = session.path("seen"); + for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if !seen.insert(key.to_string()) { + let _ = writeln!(log_f, " skip (seen): {}", key); + continue; + } + if let Some(rendered) = crate::cli::node::render_node(&store, key) { + if !rendered.trim().is_empty() { + use std::fmt::Write as _; + writeln!(out, "--- {} (surfaced) ---", key).ok(); + write!(out, "{}", rendered).ok(); + let _ = writeln!(log_f, " rendered {}: {} bytes", key, rendered.len()); + if let Ok(mut f) = fs::OpenOptions::new() + .create(true).append(true).open(&seen_path) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + writeln!(f, "{}\t{}", ts, key).ok(); } } - } else if !has_none { - let log_dir = crate::store::memory_dir().join("logs"); - fs::create_dir_all(&log_dir).ok(); - let log_path = log_dir.join("surface-errors.log"); - if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&log_path) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - let last = tail_lines.first().unwrap_or(&""); - let _ = writeln!(f, "[{}] unexpected surface output: {}", ts, last); - } } } + // Clear surface output after consuming + fs::remove_file(&surface_path).ok(); } - fs::remove_file(&result_path).ok(); - fs::remove_file(&pid_path).ok(); - if let Ok(output_file) = fs::File::create(&result_path) { - if let Ok(child) = Command::new("poc-memory") - .args(["agent", "run", "surface", "--count", "1", "--local"]) - .env("POC_SESSION_ID", &session.session_id) - .stdout(output_file) - .stderr(std::process::Stdio::null()) - .spawn() - { - let pid = child.id(); - let ts = now_secs(); - if let Ok(mut f) = fs::File::create(&pid_path) { - write!(f, "{}\t{}", pid, ts).ok(); - } - } - } -} - -const JOURNAL_INTERVAL_BYTES: u64 = 10_000; - -fn journal_agent_cycle(session: &Session, log_f: &mut File) { - let offset_path = session.path("journal-offset"); - let pid_path = session.path("journal-pid"); - - // Check if a previous run is still going - if let Ok(content) = fs::read_to_string(&pid_path) { - let pid: u32 = content.split('\t').next() - .and_then(|s| s.trim().parse().ok()).unwrap_or(0); - if pid != 0 && unsafe { libc::kill(pid as i32, 0) == 0 } { - let _ = writeln!(log_f, "journal: still running (pid {})", pid); - return; - } - } - fs::remove_file(&pid_path).ok(); - - // Check transcript size vs last run - let transcript_size = fs::metadata(&session.transcript_path) - .map(|m| m.len()).unwrap_or(0); - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()).unwrap_or(0); - - if transcript_size.saturating_sub(last_offset) < JOURNAL_INTERVAL_BYTES { + // Start a new agent if: + // - nothing running, OR + // - something running but past surface phase (pipelining) + if any_in_surface { + let _ = writeln!(log_f, "agent in surface phase, waiting"); return; } - let _ = writeln!(log_f, "journal: spawning (transcript {}, last {})", - transcript_size, last_offset); + if any_alive { + let _ = writeln!(log_f, "agent past surface, starting new (pipeline)"); + } - // Save current offset - fs::write(&offset_path, transcript_size.to_string()).ok(); - - // Spawn journal agent — it writes directly to the store via memory tools let log_dir = crate::store::memory_dir().join("logs"); fs::create_dir_all(&log_dir).ok(); - let journal_log = fs::File::create(log_dir.join("journal-agent.log")) + let agent_log = fs::File::create(log_dir.join("surface-observe.log")) .unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()); if let Ok(child) = Command::new("poc-memory") - .args(["agent", "run", "journal", "--count", "1", "--local"]) + .args(["agent", "run", "surface-observe", "--count", "1", "--local", + "--state-dir", &state_dir.to_string_lossy()]) .env("POC_SESSION_ID", &session.session_id) - .stdout(journal_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap())) - .stderr(journal_log) + .stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap())) + .stderr(agent_log) .spawn() { - let pid = child.id(); - let ts = now_secs(); - if let Ok(mut f) = fs::File::create(&pid_path) { - write!(f, "{}\t{}", pid, ts).ok(); - } + let _ = writeln!(log_f, "spawned pid {}", child.id()); } } @@ -361,8 +323,7 @@ fn hook(session: &Session) -> String { } else { let cfg = crate::config::get(); if cfg.surface_hooks.iter().any(|h| h == &session.hook_event) { - surface_agent_cycle(session, &mut out, &mut log_f); - journal_agent_cycle(session, &mut log_f); + surface_observe_cycle(session, &mut out, &mut log_f); } } diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs index 21fd340..4beef24 100644 --- a/src/subconscious/api.rs +++ b/src/subconscious/api.rs @@ -35,6 +35,7 @@ pub async fn call_api_with_tools( agent: &str, prompts: &[String], temperature: Option, + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, log: &dyn Fn(&str), ) -> Result { let client = get_client()?; @@ -178,8 +179,12 @@ pub async fn call_api_with_tools( log(&format!("\n=== RESPONSE ===\n\n{}", text)); - // If there are more prompts, inject the next one and continue + // If there are more prompts, check bail condition and inject the next one if next_prompt_idx < prompts.len() { + // Run bail check before continuing to next step + if let Some(ref check) = bail_fn { + check(next_prompt_idx)?; + } messages.push(Message::assistant(&text)); let next = &prompts[next_prompt_idx]; next_prompt_idx += 1; @@ -200,6 +205,7 @@ pub fn call_api_with_tools_sync( agent: &str, prompts: &[String], temperature: Option, + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, log: &(dyn Fn(&str) + Sync), ) -> Result { std::thread::scope(|s| { @@ -211,7 +217,7 @@ pub fn call_api_with_tools_sync( let prov = format!("agent:{}", agent); rt.block_on( crate::store::TASK_PROVENANCE.scope(prov, - call_api_with_tools(agent, prompts, temperature, log)) + call_api_with_tools(agent, prompts, temperature, bail_fn, log)) ) }).join().unwrap() }) diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index 198039a..fe4dd39 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -26,12 +26,20 @@ use std::path::PathBuf; /// Agent definition: config (from JSON header) + prompt (raw markdown body). #[derive(Clone, Debug)] +/// A single step in a multi-step agent. +pub struct AgentStep { + pub prompt: String, + /// Phase label for PID file tracking (e.g. "surface", "observe"). + /// Parsed from `=== PROMPT phase:name ===` or auto-generated as "step-N". + pub phase: String, +} + pub struct AgentDef { pub agent: String, pub query: String, - /// Prompt steps — single-step agents have one entry, multi-step have several. + /// Steps — single-step agents have one entry, multi-step have several. /// Steps are separated by `=== PROMPT ===` in the .agent file. - pub prompts: Vec, + pub steps: Vec, pub model: String, pub schedule: String, pub tools: Vec, @@ -70,28 +78,64 @@ struct AgentHeader { fn default_model() -> String { "sonnet".into() } /// Parse an agent file: first line is JSON config, rest is the prompt(s). -/// Multiple prompts are separated by `=== PROMPT ===` lines. +/// Multiple prompts are separated by `=== PROMPT [phase:name] ===` lines. fn parse_agent_file(content: &str) -> Option { let (first_line, rest) = content.split_once('\n')?; let header: AgentHeader = serde_json::from_str(first_line.trim()).ok()?; // Skip optional blank line between header and prompt body let body = rest.strip_prefix('\n').unwrap_or(rest); - // Split on === PROMPT === delimiter for multi-step agents - let prompts: Vec = body - .split("\n=== PROMPT ===\n") - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect(); + // Split on === PROMPT ... === lines, capturing the delimiter content + let mut steps: Vec = Vec::new(); + let mut current_prompt = String::new(); + let mut current_phase: Option = None; + let mut step_num = 0; - if prompts.is_empty() { + for line in body.lines() { + if line.starts_with("=== PROMPT") && line.ends_with("===") { + // Save previous step if any + let trimmed = current_prompt.trim().to_string(); + if !trimmed.is_empty() { + steps.push(AgentStep { + prompt: trimmed, + phase: current_phase.take() + .unwrap_or_else(|| format!("step-{}", step_num)), + }); + step_num += 1; + } + + // Parse delimiter: === PROMPT [phase:name] === + let inner = line.strip_prefix("=== PROMPT").unwrap() + .strip_suffix("===").unwrap().trim(); + current_phase = inner.strip_prefix("phase:") + .map(|s| s.trim().to_string()); + current_prompt.clear(); + } else { + if !current_prompt.is_empty() { + current_prompt.push('\n'); + } + current_prompt.push_str(line); + } + } + + // Save final step + let trimmed = current_prompt.trim().to_string(); + if !trimmed.is_empty() { + steps.push(AgentStep { + prompt: trimmed, + phase: current_phase.take() + .unwrap_or_else(|| format!("step-{}", step_num)), + }); + } + + if steps.is_empty() { return None; } Some(AgentDef { agent: header.agent, query: header.query, - prompts, + steps, model: header.model, schedule: header.schedule, tools: header.tools, @@ -744,18 +788,21 @@ pub fn run_agent( vec![] }; - // Resolve placeholders for all prompts. The conversation context + // Resolve placeholders for all steps. The conversation context // carries forward between steps naturally via the LLM's message history. let mut all_keys = keys; - let mut prompts = Vec::new(); - for prompt_template in &def.prompts { - let template = prompt_template.replace("{agent_name}", &def.agent); + let mut resolved_steps = Vec::new(); + for step in &def.steps { + let template = step.prompt.replace("{agent_name}", &def.agent); let (prompt, extra_keys) = resolve_placeholders(&template, store, &graph, &all_keys, count); all_keys.extend(extra_keys); - prompts.push(prompt); + resolved_steps.push(super::prompts::ResolvedStep { + prompt, + phase: step.phase.clone(), + }); } - Ok(super::prompts::AgentBatch { prompts, node_keys: all_keys }) + Ok(super::prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }) } /// Convert a list of keys to ReplayItems with priority and graph metrics. diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index d088de7..5791044 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -226,7 +226,7 @@ fn generate_digest( // Load prompt from agent file; fall back to prompts dir let def = super::defs::get_def("digest"); let template = match &def { - Some(d) => d.prompts.first().cloned().unwrap_or_default(), + Some(d) => d.steps.first().map(|s| s.prompt.clone()).unwrap_or_default(), None => { let path = crate::config::get().prompts_dir.join("digest.md"); std::fs::read_to_string(&path) diff --git a/src/subconscious/knowledge.rs b/src/subconscious/knowledge.rs index ac790f3..5945ce6 100644 --- a/src/subconscious/knowledge.rs +++ b/src/subconscious/knowledge.rs @@ -80,16 +80,19 @@ pub fn run_one_agent_with_keys( log(&format!("targeting: {}", keys.join(", "))); let graph = store.build_graph(); - let mut resolved_prompts = Vec::new(); + let mut resolved_steps = Vec::new(); let mut all_keys: Vec = keys.to_vec(); - for prompt_template in &def.prompts { + for step in &def.steps { let (prompt, extra_keys) = super::defs::resolve_placeholders( - prompt_template, store, &graph, keys, count, + &step.prompt, store, &graph, keys, count, ); all_keys.extend(extra_keys); - resolved_prompts.push(prompt); + resolved_steps.push(super::prompts::ResolvedStep { + prompt, + phase: step.phase.clone(), + }); } - let agent_batch = super::prompts::AgentBatch { prompts: resolved_prompts, node_keys: all_keys }; + let agent_batch = super::prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; // Record visits eagerly so concurrent agents pick different seeds if !agent_batch.node_keys.is_empty() { @@ -138,7 +141,7 @@ fn run_one_agent_inner( ) -> Result { let tools_desc = if def.tools.is_empty() { "no tools".into() } else { format!("{} tools", def.tools.len()) }; - let n_steps = agent_batch.prompts.len(); + let n_steps = agent_batch.steps.len(); for key in &agent_batch.node_keys { log(&format!(" node: {}", key)); @@ -146,7 +149,7 @@ fn run_one_agent_inner( // Guard: reject oversized first prompt (later steps grow via conversation) let max_prompt_bytes = 800_000; - let first_len = agent_batch.prompts[0].len(); + let first_len = agent_batch.steps[0].prompt.len(); if first_len > max_prompt_bytes { let prompt_kb = first_len / 1024; let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); @@ -155,7 +158,7 @@ fn run_one_agent_inner( agent_name, store::compact_timestamp())); let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); - fs::write(&oversize_path, format!("{}{}", header, &agent_batch.prompts[0])).ok(); + fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); log(&format!("oversized prompt logged to {}", oversize_path.display())); return Err(format!( "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", @@ -163,7 +166,7 @@ fn run_one_agent_inner( )); } - // Output directory — use --state-dir if set, otherwise flat per-agent + // Output/state directory — use --state-dir if set, otherwise flat per-agent let output_dir = std::env::var("POC_AGENT_OUTPUT_DIR") .map(std::path::PathBuf::from) .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name)); @@ -171,16 +174,70 @@ fn run_one_agent_inner( // Safe: agent runs single-threaded, env var read only by our dispatch code unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); } - log(&format!("{} step(s), {}KB initial, model={}, {}, {} nodes, output={}", - n_steps, first_len / 1024, def.model, tools_desc, + // Write PID file with initial phase + let pid = std::process::id(); + let pid_path = output_dir.join(format!("pid-{}", pid)); + let write_pid = |phase: &str| { + let json = format!("{{\"phase\":\"{}\",\"started\":{}}}", phase, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap().as_secs()); + fs::write(&pid_path, &json).ok(); + }; + write_pid(&agent_batch.steps[0].phase); + + let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); + log(&format!("{} step(s) {:?}, {}KB initial, model={}, {}, {} nodes, output={}", + n_steps, phases, first_len / 1024, def.model, tools_desc, agent_batch.node_keys.len(), output_dir.display())); - for (i, p) in agent_batch.prompts.iter().enumerate() { - log(&format!("=== PROMPT {}/{} ===\n\n{}", i + 1, n_steps, p)); + let prompts: Vec = agent_batch.steps.iter() + .map(|s| s.prompt.clone()).collect(); + let step_phases: Vec = agent_batch.steps.iter() + .map(|s| s.phase.clone()).collect(); + + for (i, s) in agent_batch.steps.iter().enumerate() { + log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt)); } log("\n=== CALLING LLM ==="); - let output = llm::call_for_def_multi(def, &agent_batch.prompts, log)?; + // Bail check: between steps, check for other pid files in the state dir. + // If another agent has started, bail — let it have the resources. + let output_dir_clone = output_dir.clone(); + let bail_fn = move |step_idx: usize| -> Result<(), String> { + if step_idx < step_phases.len() { + write_pid(&step_phases[step_idx]); + } + // After step 0 (surface), check for competing agents + if step_idx > 0 { + if let Ok(entries) = fs::read_dir(&output_dir_clone) { + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if !name_str.starts_with("pid-") { continue; } + let other_pid: u32 = name_str.strip_prefix("pid-") + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + if other_pid == pid || other_pid == 0 { continue; } + // Check if the other process is alive + if unsafe { libc::kill(other_pid as i32, 0) } == 0 { + log(&format!("bail: another agent running (pid {})", other_pid)); + return Err(format!("bailed at step {} — competing agent pid {}", + step_idx + 1, other_pid)); + } else { + // Dead process — clean up stale pid file + fs::remove_file(entry.path()).ok(); + } + } + } + } + Ok(()) + }; + + let output = llm::call_for_def_multi(def, &prompts, Some(&bail_fn), log)?; + + // Clean up PID file + fs::remove_file(&pid_path).ok(); Ok(AgentResult { output, diff --git a/src/subconscious/llm.rs b/src/subconscious/llm.rs index abbda16..3df13d7 100644 --- a/src/subconscious/llm.rs +++ b/src/subconscious/llm.rs @@ -22,16 +22,18 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result }; let prompts = vec![prompt.to_string()]; - super::api::call_api_with_tools_sync(caller, &prompts, None, &log) + super::api::call_api_with_tools_sync(caller, &prompts, None, None, &log) } /// Call a model using an agent definition's configuration (multi-step). +/// Optional bail_fn is called between steps — return Err to stop the pipeline. pub(crate) fn call_for_def_multi( def: &super::defs::AgentDef, prompts: &[String], + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, log: &(dyn Fn(&str) + Sync), ) -> Result { - super::api::call_api_with_tools_sync(&def.agent, prompts, def.temperature, log) + super::api::call_api_with_tools_sync(&def.agent, prompts, def.temperature, bail_fn, log) } /// Parse a JSON response, handling markdown fences. diff --git a/src/subconscious/prompts.rs b/src/subconscious/prompts.rs index 635cc6e..a31ff50 100644 --- a/src/subconscious/prompts.rs +++ b/src/subconscious/prompts.rs @@ -12,9 +12,14 @@ use crate::neuro::{ /// Result of building an agent prompt — includes both the prompt text /// and the keys of nodes selected for processing, so the caller can /// record visits after successful completion. +/// A resolved step ready for execution. +pub struct ResolvedStep { + pub prompt: String, + pub phase: String, +} + pub struct AgentBatch { - /// Prompt steps — single-step agents have one entry, multi-step have several. - pub prompts: Vec, + pub steps: Vec, pub node_keys: Vec, } @@ -364,7 +369,7 @@ pub fn split_plan_prompt(store: &Store, key: &str) -> Result { let graph = store.build_graph(); // Override the query — we have a specific key to split let keys = vec![key.to_string()]; - let template = def.prompts.first().ok_or_else(|| "split.agent has no prompts".to_string())?; + let template = def.steps.first().map(|s| &s.prompt).ok_or_else(|| "split.agent has no steps".to_string())?; let (prompt, _) = super::defs::resolve_placeholders(template, store, &graph, &keys, 1); Ok(prompt) } @@ -386,11 +391,11 @@ pub fn split_extract_prompt(store: &Store, parent_key: &str, child_key: &str, ch pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> { if auto { let batch = agent_prompt(store, "replay", count)?; - for (i, p) in batch.prompts.iter().enumerate() { - if batch.prompts.len() > 1 { - println!("=== STEP {} ===\n", i + 1); + for (i, s) in batch.steps.iter().enumerate() { + if batch.steps.len() > 1 { + println!("=== STEP {} ({}) ===\n", i + 1, s.phase); } - println!("{}", p); + println!("{}", s.prompt); } return Ok(()); }