agents: multi-step agent support
Split agent prompts on the `=== PROMPT ===` delimiter. Each step runs as
a new user message in the same LLM conversation, so context carries
forward naturally between steps. Single-step agents are unchanged.
- AgentDef.prompt -> AgentDef.prompts: Vec<String>
- AgentBatch.prompt -> AgentBatch.prompts: Vec<String>
- API layer injects next prompt after each text response
- {{conversation:N}} parameterized byte budget for conversation context
Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
parent
baf208281d
commit
77d1d39f3f
6 changed files with 166 additions and 65 deletions
|
|
@ -26,11 +26,14 @@ fn get_client() -> Result<&'static ApiClient, String> {
|
|||
}))
|
||||
}
|
||||
|
||||
/// Run an agent prompt through the direct API with tool support.
|
||||
/// Returns the final text response after all tool calls are resolved.
|
||||
/// Run agent prompts through the direct API with tool support.
|
||||
/// For multi-step agents, each prompt is injected as a new user message
|
||||
/// after the previous step's tool loop completes. The conversation
|
||||
/// context carries forward naturally between steps.
|
||||
/// Returns the final text response after all steps complete.
|
||||
pub async fn call_api_with_tools(
|
||||
agent: &str,
|
||||
prompt: &str,
|
||||
prompts: &[String],
|
||||
temperature: Option<f32>,
|
||||
log: &dyn Fn(&str),
|
||||
) -> Result<String, String> {
|
||||
|
|
@ -39,15 +42,18 @@ pub async fn call_api_with_tools(
|
|||
// Set up a UI channel — we drain reasoning tokens into the log
|
||||
let (ui_tx, mut ui_rx) = crate::agent::ui_channel::channel();
|
||||
|
||||
// Build tool definitions — memory tools for graph operations
|
||||
// Build tool definitions — memory and journal tools for graph operations
|
||||
let all_defs = tools::definitions();
|
||||
let tool_defs: Vec<ToolDef> = all_defs.into_iter()
|
||||
.filter(|d| d.function.name.starts_with("memory_"))
|
||||
.filter(|d| d.function.name.starts_with("memory_")
|
||||
|| d.function.name.starts_with("journal_")
|
||||
|| d.function.name == "output")
|
||||
.collect();
|
||||
let tracker = ProcessTracker::new();
|
||||
|
||||
// Start with the prompt as a user message
|
||||
let mut messages = vec![Message::user(prompt)];
|
||||
// Start with the first prompt as a user message
|
||||
let mut messages = vec![Message::user(&prompts[0])];
|
||||
let mut next_prompt_idx = 1; // index of next prompt to inject
|
||||
let reasoning = crate::config::get().api_reasoning.clone();
|
||||
|
||||
let max_turns = 50;
|
||||
|
|
@ -65,8 +71,15 @@ pub async fn call_api_with_tools(
|
|||
let msg_bytes: usize = messages.iter()
|
||||
.map(|m| m.content_text().len())
|
||||
.sum();
|
||||
format!("API error on turn {} (~{}KB payload, {} messages): {}",
|
||||
turn, msg_bytes / 1024, messages.len(), e)
|
||||
let err_str = e.to_string();
|
||||
let hint = if err_str.contains("IncompleteMessage") || err_str.contains("connection closed") {
|
||||
format!(" — likely exceeded model context window (~{}KB ≈ {}K tokens)",
|
||||
msg_bytes / 1024, msg_bytes / 4096)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
format!("API error on turn {} (~{}KB payload, {} messages): {}{}",
|
||||
turn, msg_bytes / 1024, messages.len(), e, hint)
|
||||
})?;
|
||||
|
||||
if let Some(u) = &usage {
|
||||
|
|
@ -125,7 +138,9 @@ pub async fn call_api_with_tools(
|
|||
}
|
||||
};
|
||||
|
||||
let output = if call.function.name.starts_with("memory_") {
|
||||
let output = if call.function.name.starts_with("memory_")
|
||||
|| call.function.name.starts_with("journal_")
|
||||
|| call.function.name == "output" {
|
||||
let prov = format!("agent:{}", agent);
|
||||
match crate::agent::tools::memory::dispatch(
|
||||
&call.function.name, &args, Some(&prov),
|
||||
|
|
@ -151,7 +166,7 @@ pub async fn call_api_with_tools(
|
|||
continue;
|
||||
}
|
||||
|
||||
// Text-only response — we're done
|
||||
// Text-only response — step complete
|
||||
let text = msg.content_text().to_string();
|
||||
if text.is_empty() && !has_content {
|
||||
log("empty response, retrying");
|
||||
|
|
@ -162,6 +177,17 @@ pub async fn call_api_with_tools(
|
|||
}
|
||||
|
||||
log(&format!("\n=== RESPONSE ===\n\n{}", text));
|
||||
|
||||
// If there are more prompts, inject the next one and continue
|
||||
if next_prompt_idx < prompts.len() {
|
||||
messages.push(Message::assistant(&text));
|
||||
let next = &prompts[next_prompt_idx];
|
||||
next_prompt_idx += 1;
|
||||
log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len()));
|
||||
messages.push(Message::user(next));
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(text);
|
||||
}
|
||||
|
||||
|
|
@ -172,7 +198,7 @@ pub async fn call_api_with_tools(
|
|||
/// with its own tokio runtime. Safe to call from any context.
|
||||
pub fn call_api_with_tools_sync(
|
||||
agent: &str,
|
||||
prompt: &str,
|
||||
prompts: &[String],
|
||||
temperature: Option<f32>,
|
||||
log: &(dyn Fn(&str) + Sync),
|
||||
) -> Result<String, String> {
|
||||
|
|
@ -185,7 +211,7 @@ pub fn call_api_with_tools_sync(
|
|||
let prov = format!("agent:{}", agent);
|
||||
rt.block_on(
|
||||
crate::store::TASK_PROVENANCE.scope(prov,
|
||||
call_api_with_tools(agent, prompt, temperature, log))
|
||||
call_api_with_tools(agent, prompts, temperature, log))
|
||||
)
|
||||
}).join().unwrap()
|
||||
})
|
||||
|
|
|
|||
|
|
@ -29,7 +29,9 @@ use std::path::PathBuf;
|
|||
pub struct AgentDef {
|
||||
pub agent: String,
|
||||
pub query: String,
|
||||
pub prompt: String,
|
||||
/// Prompt steps — single-step agents have one entry, multi-step have several.
|
||||
/// Steps are separated by `=== PROMPT ===` in the .agent file.
|
||||
pub prompts: Vec<String>,
|
||||
pub model: String,
|
||||
pub schedule: String,
|
||||
pub tools: Vec<String>,
|
||||
|
|
@ -67,16 +69,29 @@ struct AgentHeader {
|
|||
|
||||
fn default_model() -> String { "sonnet".into() }
|
||||
|
||||
/// Parse an agent file: first line is JSON config, rest is the prompt.
|
||||
/// Parse an agent file: first line is JSON config, rest is the prompt(s).
|
||||
/// Multiple prompts are separated by `=== PROMPT ===` lines.
|
||||
fn parse_agent_file(content: &str) -> Option<AgentDef> {
|
||||
let (first_line, rest) = content.split_once('\n')?;
|
||||
let header: AgentHeader = serde_json::from_str(first_line.trim()).ok()?;
|
||||
// Skip optional blank line between header and prompt body
|
||||
let prompt = rest.strip_prefix('\n').unwrap_or(rest);
|
||||
let body = rest.strip_prefix('\n').unwrap_or(rest);
|
||||
|
||||
// Split on === PROMPT === delimiter for multi-step agents
|
||||
let prompts: Vec<String> = body
|
||||
.split("\n=== PROMPT ===\n")
|
||||
.map(|s| s.trim().to_string())
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
if prompts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(AgentDef {
|
||||
agent: header.agent,
|
||||
query: header.query,
|
||||
prompt: prompt.to_string(),
|
||||
prompts,
|
||||
model: header.model,
|
||||
schedule: header.schedule,
|
||||
tools: header.tools,
|
||||
|
|
@ -253,7 +268,7 @@ fn resolve(
|
|||
result_keys.push(key.clone());
|
||||
}
|
||||
|
||||
text.push_str("Use `poc-memory render KEY` and `poc-memory query \"neighbors('KEY')\"` to explore further.\n");
|
||||
text.push_str("Use memory_render(KEY) and memory_links(KEY) to explore further.\n");
|
||||
|
||||
Some(Resolved { text, keys: result_keys })
|
||||
}
|
||||
|
|
@ -445,9 +460,25 @@ fn resolve(
|
|||
})
|
||||
}
|
||||
|
||||
// input:KEY — read a named output file from the agent's output dir
|
||||
_ if name.starts_with("input:") => {
|
||||
let key = &name[6..];
|
||||
let dir = std::env::var("POC_AGENT_OUTPUT_DIR")
|
||||
.map(std::path::PathBuf::from)
|
||||
.unwrap_or_else(|_| crate::store::memory_dir().join("agent-output").join("default"));
|
||||
let path = dir.join(key);
|
||||
match std::fs::read_to_string(&path) {
|
||||
Ok(text) => Some(Resolved { text, keys: vec![] }),
|
||||
Err(_) => Some(Resolved { text: String::new(), keys: vec![] }),
|
||||
}
|
||||
}
|
||||
|
||||
// conversation — tail of the current session transcript (post-compaction)
|
||||
"conversation" => {
|
||||
let text = resolve_conversation();
|
||||
// conversation:N — same, but with an explicit byte budget
|
||||
_ if name == "conversation" || name.starts_with("conversation:") => {
|
||||
let max_bytes = name.strip_prefix("conversation:")
|
||||
.and_then(|s| s.parse::<usize>().ok());
|
||||
let text = resolve_conversation(max_bytes);
|
||||
if text.is_empty() { None }
|
||||
else { Some(Resolved { text, keys: vec![] }) }
|
||||
}
|
||||
|
|
@ -470,6 +501,24 @@ fn resolve(
|
|||
Some(Resolved { text, keys: vec![] })
|
||||
}
|
||||
|
||||
// latest_journal — the most recent journal entry for the journal agent
|
||||
"latest_journal" => {
|
||||
let text = store.nodes.get("journal")
|
||||
.map(|n| {
|
||||
// Get the last entry (last ## section)
|
||||
let content = &n.content;
|
||||
content.rfind("\n## ")
|
||||
.map(|pos| content[pos..].to_string())
|
||||
.unwrap_or_else(|| {
|
||||
// Take the last 2000 chars if no ## found
|
||||
let start = content.len().saturating_sub(2000);
|
||||
content[start..].to_string()
|
||||
})
|
||||
})
|
||||
.unwrap_or_else(|| "(no previous journal entry)".to_string());
|
||||
Some(Resolved { text, keys: vec!["journal".to_string()] })
|
||||
}
|
||||
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
|
@ -477,7 +526,7 @@ fn resolve(
|
|||
/// Get the tail of the current session's conversation.
|
||||
/// Reads POC_SESSION_ID to find the transcript, extracts the last
|
||||
/// segment (post-compaction), returns the tail (~100K chars).
|
||||
fn resolve_conversation() -> String {
|
||||
fn resolve_conversation(budget: Option<usize>) -> String {
|
||||
let session_id = std::env::var("POC_SESSION_ID").unwrap_or_default();
|
||||
if session_id.is_empty() { return String::new(); }
|
||||
|
||||
|
|
@ -502,12 +551,12 @@ fn resolve_conversation() -> String {
|
|||
};
|
||||
|
||||
let cfg = crate::config::get();
|
||||
let max_bytes = budget.unwrap_or_else(|| cfg.surface_conversation_bytes.unwrap_or(100_000));
|
||||
let mut fragments: Vec<String> = Vec::new();
|
||||
let mut total_bytes = 0;
|
||||
const MAX_BYTES: usize = 200_000;
|
||||
|
||||
for (role, content, ts) in iter {
|
||||
if total_bytes >= MAX_BYTES { break; }
|
||||
if total_bytes >= max_bytes { break; }
|
||||
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
|
||||
let formatted = if !ts.is_empty() {
|
||||
format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], content)
|
||||
|
|
@ -695,19 +744,18 @@ pub fn run_agent(
|
|||
vec![]
|
||||
};
|
||||
|
||||
// Substitute {agent_name} before resolving {{...}} placeholders,
|
||||
// so agents can reference their own notes: {{node:subconscious-notes-{agent_name}}}
|
||||
let template = def.prompt.replace("{agent_name}", &def.agent);
|
||||
let (prompt, extra_keys) = resolve_placeholders(&template, store, &graph, &keys, count);
|
||||
|
||||
// Identity and instructions are now pulled in via {{node:KEY}} placeholders.
|
||||
// Agents should include {{node:core-personality}} and {{node:memory-instructions-core}}
|
||||
// in their prompt templates. The resolve_placeholders call below handles this.
|
||||
|
||||
// Merge query keys with any keys produced by placeholder resolution
|
||||
// Resolve placeholders for all prompts. The conversation context
|
||||
// carries forward between steps naturally via the LLM's message history.
|
||||
let mut all_keys = keys;
|
||||
let mut prompts = Vec::new();
|
||||
for prompt_template in &def.prompts {
|
||||
let template = prompt_template.replace("{agent_name}", &def.agent);
|
||||
let (prompt, extra_keys) = resolve_placeholders(&template, store, &graph, &all_keys, count);
|
||||
all_keys.extend(extra_keys);
|
||||
Ok(super::prompts::AgentBatch { prompt, node_keys: all_keys })
|
||||
prompts.push(prompt);
|
||||
}
|
||||
|
||||
Ok(super::prompts::AgentBatch { prompts, node_keys: all_keys })
|
||||
}
|
||||
|
||||
/// Convert a list of keys to ReplayItems with priority and graph metrics.
|
||||
|
|
|
|||
|
|
@ -226,7 +226,7 @@ fn generate_digest(
|
|||
// Load prompt from agent file; fall back to prompts dir
|
||||
let def = super::defs::get_def("digest");
|
||||
let template = match &def {
|
||||
Some(d) => d.prompt.clone(),
|
||||
Some(d) => d.prompts.first().cloned().unwrap_or_default(),
|
||||
None => {
|
||||
let path = crate::config::get().prompts_dir.join("digest.md");
|
||||
std::fs::read_to_string(&path)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ use std::path::PathBuf;
|
|||
pub struct AgentResult {
|
||||
pub output: String,
|
||||
pub node_keys: Vec<String>,
|
||||
/// Directory containing output() files from the agent run.
|
||||
pub output_dir: std::path::PathBuf,
|
||||
}
|
||||
|
||||
/// Run a single agent and return the result (no action application — tools handle that).
|
||||
|
|
@ -78,12 +80,16 @@ pub fn run_one_agent_with_keys(
|
|||
|
||||
log(&format!("targeting: {}", keys.join(", ")));
|
||||
let graph = store.build_graph();
|
||||
let (prompt, extra_keys) = super::defs::resolve_placeholders(
|
||||
&def.prompt, store, &graph, keys, count,
|
||||
);
|
||||
let mut resolved_prompts = Vec::new();
|
||||
let mut all_keys: Vec<String> = keys.to_vec();
|
||||
for prompt_template in &def.prompts {
|
||||
let (prompt, extra_keys) = super::defs::resolve_placeholders(
|
||||
prompt_template, store, &graph, keys, count,
|
||||
);
|
||||
all_keys.extend(extra_keys);
|
||||
let agent_batch = super::prompts::AgentBatch { prompt, node_keys: all_keys };
|
||||
resolved_prompts.push(prompt);
|
||||
}
|
||||
let agent_batch = super::prompts::AgentBatch { prompts: resolved_prompts, node_keys: all_keys };
|
||||
|
||||
// Record visits eagerly so concurrent agents pick different seeds
|
||||
if !agent_batch.node_keys.is_empty() {
|
||||
|
|
@ -130,43 +136,56 @@ fn run_one_agent_inner(
|
|||
_llm_tag: &str,
|
||||
log: &(dyn Fn(&str) + Sync),
|
||||
) -> Result<AgentResult, String> {
|
||||
let prompt_kb = agent_batch.prompt.len() / 1024;
|
||||
let tools_desc = if def.tools.is_empty() { "no tools".into() }
|
||||
else { format!("{} tools", def.tools.len()) };
|
||||
log(&format!("prompt {}KB, model={}, {}, {} nodes",
|
||||
prompt_kb, def.model, tools_desc, agent_batch.node_keys.len()));
|
||||
let n_steps = agent_batch.prompts.len();
|
||||
|
||||
// Guard: reject prompts that would exceed model context.
|
||||
// Rough estimate: 1 token ≈ 4 bytes. Reserve 16K tokens for output.
|
||||
let max_prompt_bytes = 800_000; // ~200K tokens, leaves room for output
|
||||
if agent_batch.prompt.len() > max_prompt_bytes {
|
||||
// Log the oversized prompt for debugging
|
||||
for key in &agent_batch.node_keys {
|
||||
log(&format!(" node: {}", key));
|
||||
}
|
||||
|
||||
// Guard: reject oversized first prompt (later steps grow via conversation)
|
||||
let max_prompt_bytes = 800_000;
|
||||
let first_len = agent_batch.prompts[0].len();
|
||||
if first_len > max_prompt_bytes {
|
||||
let prompt_kb = first_len / 1024;
|
||||
let oversize_dir = store::memory_dir().join("llm-logs").join("oversized");
|
||||
fs::create_dir_all(&oversize_dir).ok();
|
||||
let oversize_path = oversize_dir.join(format!("{}-{}.txt",
|
||||
agent_name, store::compact_timestamp()));
|
||||
let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n",
|
||||
agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys);
|
||||
fs::write(&oversize_path, format!("{}{}", header, agent_batch.prompt)).ok();
|
||||
fs::write(&oversize_path, format!("{}{}", header, &agent_batch.prompts[0])).ok();
|
||||
log(&format!("oversized prompt logged to {}", oversize_path.display()));
|
||||
|
||||
return Err(format!(
|
||||
"prompt too large: {}KB (max {}KB) — seed nodes may be oversized",
|
||||
prompt_kb, max_prompt_bytes / 1024,
|
||||
));
|
||||
}
|
||||
for key in &agent_batch.node_keys {
|
||||
log(&format!(" node: {}", key));
|
||||
|
||||
// Output directory — use --state-dir if set, otherwise flat per-agent
|
||||
let output_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
|
||||
.map(std::path::PathBuf::from)
|
||||
.unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
|
||||
fs::create_dir_all(&output_dir).ok();
|
||||
// Safe: agent runs single-threaded, env var read only by our dispatch code
|
||||
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
|
||||
|
||||
log(&format!("{} step(s), {}KB initial, model={}, {}, {} nodes, output={}",
|
||||
n_steps, first_len / 1024, def.model, tools_desc,
|
||||
agent_batch.node_keys.len(), output_dir.display()));
|
||||
|
||||
for (i, p) in agent_batch.prompts.iter().enumerate() {
|
||||
log(&format!("=== PROMPT {}/{} ===\n\n{}", i + 1, n_steps, p));
|
||||
}
|
||||
log("\n=== CALLING LLM ===");
|
||||
|
||||
log(&format!("=== PROMPT ===\n\n{}\n\n=== CALLING LLM ===", agent_batch.prompt));
|
||||
|
||||
let output = llm::call_for_def(def, &agent_batch.prompt, log)?;
|
||||
|
||||
let output = llm::call_for_def_multi(def, &agent_batch.prompts, log)?;
|
||||
|
||||
Ok(AgentResult {
|
||||
output,
|
||||
node_keys: agent_batch.node_keys,
|
||||
output_dir,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,16 +21,17 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String>
|
|||
}
|
||||
};
|
||||
|
||||
super::api::call_api_with_tools_sync(caller, prompt, None, &log)
|
||||
let prompts = vec![prompt.to_string()];
|
||||
super::api::call_api_with_tools_sync(caller, &prompts, None, &log)
|
||||
}
|
||||
|
||||
/// Call a model using an agent definition's configuration.
|
||||
pub(crate) fn call_for_def(
|
||||
/// Call a model using an agent definition's configuration (multi-step).
|
||||
pub(crate) fn call_for_def_multi(
|
||||
def: &super::defs::AgentDef,
|
||||
prompt: &str,
|
||||
prompts: &[String],
|
||||
log: &(dyn Fn(&str) + Sync),
|
||||
) -> Result<String, String> {
|
||||
super::api::call_api_with_tools_sync(&def.agent, prompt, def.temperature, log)
|
||||
super::api::call_api_with_tools_sync(&def.agent, prompts, def.temperature, log)
|
||||
}
|
||||
|
||||
/// Parse a JSON response, handling markdown fences.
|
||||
|
|
|
|||
|
|
@ -13,7 +13,8 @@ use crate::neuro::{
|
|||
/// and the keys of nodes selected for processing, so the caller can
|
||||
/// record visits after successful completion.
|
||||
pub struct AgentBatch {
|
||||
pub prompt: String,
|
||||
/// Prompt steps — single-step agents have one entry, multi-step have several.
|
||||
pub prompts: Vec<String>,
|
||||
pub node_keys: Vec<String>,
|
||||
}
|
||||
|
||||
|
|
@ -363,7 +364,8 @@ pub fn split_plan_prompt(store: &Store, key: &str) -> Result<String, String> {
|
|||
let graph = store.build_graph();
|
||||
// Override the query — we have a specific key to split
|
||||
let keys = vec![key.to_string()];
|
||||
let (prompt, _) = super::defs::resolve_placeholders(&def.prompt, store, &graph, &keys, 1);
|
||||
let template = def.prompts.first().ok_or_else(|| "split.agent has no prompts".to_string())?;
|
||||
let (prompt, _) = super::defs::resolve_placeholders(template, store, &graph, &keys, 1);
|
||||
Ok(prompt)
|
||||
}
|
||||
|
||||
|
|
@ -384,7 +386,12 @@ pub fn split_extract_prompt(store: &Store, parent_key: &str, child_key: &str, ch
|
|||
pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> {
|
||||
if auto {
|
||||
let batch = agent_prompt(store, "replay", count)?;
|
||||
println!("{}", batch.prompt);
|
||||
for (i, p) in batch.prompts.iter().enumerate() {
|
||||
if batch.prompts.len() > 1 {
|
||||
println!("=== STEP {} ===\n", i + 1);
|
||||
}
|
||||
println!("{}", p);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue