diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs index 54c3591..21fd340 100644 --- a/src/subconscious/api.rs +++ b/src/subconscious/api.rs @@ -26,11 +26,14 @@ fn get_client() -> Result<&'static ApiClient, String> { })) } -/// Run an agent prompt through the direct API with tool support. -/// Returns the final text response after all tool calls are resolved. +/// Run agent prompts through the direct API with tool support. +/// For multi-step agents, each prompt is injected as a new user message +/// after the previous step's tool loop completes. The conversation +/// context carries forward naturally between steps. +/// Returns the final text response after all steps complete. pub async fn call_api_with_tools( agent: &str, - prompt: &str, + prompts: &[String], temperature: Option<f64>, log: &dyn Fn(&str), ) -> Result<String, String> { @@ -39,15 +42,18 @@ pub async fn call_api_with_tools( // Set up a UI channel — we drain reasoning tokens into the log let (ui_tx, mut ui_rx) = crate::agent::ui_channel::channel(); - // Build tool definitions — memory tools for graph operations + // Build tool definitions — memory and journal tools for graph operations let all_defs = tools::definitions(); let tool_defs: Vec<_> = all_defs.into_iter() - .filter(|d| d.function.name.starts_with("memory_")) + .filter(|d| d.function.name.starts_with("memory_") + || d.function.name.starts_with("journal_") + || d.function.name == "output") .collect(); let tracker = ProcessTracker::new(); - // Start with the prompt as a user message - let mut messages = vec![Message::user(prompt)]; + // Start with the first prompt as a user message + let mut messages = vec![Message::user(&prompts[0])]; + let mut next_prompt_idx = 1; // index of next prompt to inject let reasoning = crate::config::get().api_reasoning.clone(); let max_turns = 50; @@ -65,8 +71,15 @@ pub async fn call_api_with_tools( let msg_bytes: usize = messages.iter() .map(|m| m.content_text().len()) .sum(); - format!("API error on turn {} (~{}KB payload, 
{} messages): {}", - turn, msg_bytes / 1024, messages.len(), e) + let err_str = e.to_string(); + let hint = if err_str.contains("IncompleteMessage") || err_str.contains("connection closed") { + format!(" — likely exceeded model context window (~{}KB ≈ {}K tokens)", + msg_bytes / 1024, msg_bytes / 4096) + } else { + String::new() + }; + format!("API error on turn {} (~{}KB payload, {} messages): {}{}", + turn, msg_bytes / 1024, messages.len(), e, hint) })?; if let Some(u) = &usage { @@ -125,7 +138,9 @@ pub async fn call_api_with_tools( } }; - let output = if call.function.name.starts_with("memory_") { + let output = if call.function.name.starts_with("memory_") + || call.function.name.starts_with("journal_") + || call.function.name == "output" { let prov = format!("agent:{}", agent); match crate::agent::tools::memory::dispatch( &call.function.name, &args, Some(&prov), @@ -151,7 +166,7 @@ pub async fn call_api_with_tools( continue; } - // Text-only response — we're done + // Text-only response — step complete let text = msg.content_text().to_string(); if text.is_empty() && !has_content { log("empty response, retrying"); @@ -162,6 +177,17 @@ pub async fn call_api_with_tools( } log(&format!("\n=== RESPONSE ===\n\n{}", text)); + + // If there are more prompts, inject the next one and continue + if next_prompt_idx < prompts.len() { + messages.push(Message::assistant(&text)); + let next = &prompts[next_prompt_idx]; + next_prompt_idx += 1; + log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len())); + messages.push(Message::user(next)); + continue; + } + return Ok(text); } @@ -172,7 +198,7 @@ pub async fn call_api_with_tools( /// with its own tokio runtime. Safe to call from any context. 
pub fn call_api_with_tools_sync( agent: &str, - prompt: &str, + prompts: &[String], temperature: Option<f64>, log: &(dyn Fn(&str) + Sync), ) -> Result<String, String> { @@ -185,7 +211,7 @@ pub fn call_api_with_tools_sync( let prov = format!("agent:{}", agent); rt.block_on( crate::store::TASK_PROVENANCE.scope(prov, - call_api_with_tools(agent, prompt, temperature, log)) + call_api_with_tools(agent, prompts, temperature, log)) ) }).join().unwrap() }) diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index 3f5b0b7..198039a 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -29,7 +29,9 @@ use std::path::PathBuf; pub struct AgentDef { pub agent: String, pub query: String, - pub prompt: String, + /// Prompt steps — single-step agents have one entry, multi-step have several. + /// Steps are separated by `=== PROMPT ===` in the .agent file. + pub prompts: Vec<String>, pub model: String, pub schedule: String, pub tools: Vec<String>, @@ -67,16 +69,29 @@ struct AgentHeader { fn default_model() -> String { "sonnet".into() } -/// Parse an agent file: first line is JSON config, rest is the prompt. +/// Parse an agent file: first line is JSON config, rest is the prompt(s). +/// Multiple prompts are separated by `=== PROMPT ===` lines. 
fn parse_agent_file(content: &str) -> Option<AgentDef> { let (first_line, rest) = content.split_once('\n')?; let header: AgentHeader = serde_json::from_str(first_line.trim()).ok()?; // Skip optional blank line between header and prompt body - let prompt = rest.strip_prefix('\n').unwrap_or(rest); + let body = rest.strip_prefix('\n').unwrap_or(rest); + + // Split on === PROMPT === delimiter for multi-step agents + let prompts: Vec<String> = body + .split("\n=== PROMPT ===\n") + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + if prompts.is_empty() { + return None; + } + Some(AgentDef { agent: header.agent, query: header.query, - prompt: prompt.to_string(), + prompts, model: header.model, schedule: header.schedule, tools: header.tools, @@ -253,7 +268,7 @@ fn resolve( result_keys.push(key.clone()); } - text.push_str("Use `poc-memory render KEY` and `poc-memory query \"neighbors('KEY')\"` to explore further.\n"); + text.push_str("Use memory_render(KEY) and memory_links(KEY) to explore further.\n"); Some(Resolved { text, keys: result_keys }) } @@ -445,9 +460,25 @@ fn resolve( }) } + // input:KEY — read a named output file from the agent's output dir + _ if name.starts_with("input:") => { + let key = &name[6..]; + let dir = std::env::var("POC_AGENT_OUTPUT_DIR") + .map(std::path::PathBuf::from) + .unwrap_or_else(|_| crate::store::memory_dir().join("agent-output").join("default")); + let path = dir.join(key); + match std::fs::read_to_string(&path) { + Ok(text) => Some(Resolved { text, keys: vec![] }), + Err(_) => Some(Resolved { text: String::new(), keys: vec![] }), + } + } + // conversation — tail of the current session transcript (post-compaction) - "conversation" => { - let text = resolve_conversation(); + // conversation:N — same, but with an explicit byte budget + _ if name == "conversation" || name.starts_with("conversation:") => { + let max_bytes = name.strip_prefix("conversation:") + .and_then(|s| s.parse::<usize>().ok()); + let text = 
resolve_conversation(max_bytes); if text.is_empty() { None } else { Some(Resolved { text, keys: vec![] }) } } @@ -470,6 +501,24 @@ fn resolve( Some(Resolved { text, keys: vec![] }) } + // latest_journal — the most recent journal entry for the journal agent + "latest_journal" => { + let text = store.nodes.get("journal") + .map(|n| { + // Get the last entry (last ## section) + let content = &n.content; + content.rfind("\n## ") + .map(|pos| content[pos..].to_string()) + .unwrap_or_else(|| { + // Take the last 2000 chars if no ## found + let start = content.len().saturating_sub(2000); + content[start..].to_string() + }) + }) + .unwrap_or_else(|| "(no previous journal entry)".to_string()); + Some(Resolved { text, keys: vec!["journal".to_string()] }) + } + _ => None, } } @@ -477,7 +526,7 @@ fn resolve( /// Get the tail of the current session's conversation. /// Reads POC_SESSION_ID to find the transcript, extracts the last /// segment (post-compaction), returns the tail (~100K chars). -fn resolve_conversation() -> String { +fn resolve_conversation(budget: Option<usize>) -> String { let session_id = std::env::var("POC_SESSION_ID").unwrap_or_default(); if session_id.is_empty() { return String::new(); } @@ -502,12 +551,12 @@ fn resolve_conversation() -> String { }; let cfg = crate::config::get(); + let max_bytes = budget.unwrap_or_else(|| cfg.surface_conversation_bytes.unwrap_or(100_000)); let mut fragments: Vec<String> = Vec::new(); let mut total_bytes = 0; - const MAX_BYTES: usize = 200_000; for (role, content, ts) in iter { - if total_bytes >= MAX_BYTES { break; } + if total_bytes >= max_bytes { break; } let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name }; let formatted = if !ts.is_empty() { format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], content) @@ -695,19 +744,18 @@ pub fn run_agent( vec![] }; - // Substitute {agent_name} before resolving {{...}} placeholders, - // so agents can reference their own notes: {{node:subconscious-notes-{agent_name}}} - 
let template = def.prompt.replace("{agent_name}", &def.agent); - let (prompt, extra_keys) = resolve_placeholders(&template, store, &graph, &keys, count); - - // Identity and instructions are now pulled in via {{node:KEY}} placeholders. - // Agents should include {{node:core-personality}} and {{node:memory-instructions-core}} - // in their prompt templates. The resolve_placeholders call below handles this. - - // Merge query keys with any keys produced by placeholder resolution + // Resolve placeholders for all prompts. The conversation context + // carries forward between steps naturally via the LLM's message history. let mut all_keys = keys; - all_keys.extend(extra_keys); - Ok(super::prompts::AgentBatch { prompt, node_keys: all_keys }) + let mut prompts = Vec::new(); + for prompt_template in &def.prompts { + let template = prompt_template.replace("{agent_name}", &def.agent); + let (prompt, extra_keys) = resolve_placeholders(&template, store, &graph, &all_keys, count); + all_keys.extend(extra_keys); + prompts.push(prompt); + } + + Ok(super::prompts::AgentBatch { prompts, node_keys: all_keys }) } /// Convert a list of keys to ReplayItems with priority and graph metrics. 
diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index f749687..d088de7 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -226,7 +226,7 @@ fn generate_digest( // Load prompt from agent file; fall back to prompts dir let def = super::defs::get_def("digest"); let template = match &def { - Some(d) => d.prompt.clone(), + Some(d) => d.prompts.first().cloned().unwrap_or_default(), None => { let path = crate::config::get().prompts_dir.join("digest.md"); std::fs::read_to_string(&path) diff --git a/src/subconscious/knowledge.rs b/src/subconscious/knowledge.rs index dd6a89d..ac790f3 100644 --- a/src/subconscious/knowledge.rs +++ b/src/subconscious/knowledge.rs @@ -22,6 +22,8 @@ use std::path::PathBuf; pub struct AgentResult { pub output: String, pub node_keys: Vec<String>, + /// Directory containing output() files from the agent run. + pub output_dir: std::path::PathBuf, } /// Run a single agent and return the result (no action application — tools handle that). 
@@ -78,12 +80,16 @@ pub fn run_one_agent_with_keys( log(&format!("targeting: {}", keys.join(", "))); let graph = store.build_graph(); - let (prompt, extra_keys) = super::defs::resolve_placeholders( - &def.prompt, store, &graph, keys, count, - ); + let mut resolved_prompts = Vec::new(); let mut all_keys: Vec<String> = keys.to_vec(); - all_keys.extend(extra_keys); - let agent_batch = super::prompts::AgentBatch { prompt, node_keys: all_keys }; + for prompt_template in &def.prompts { + let (prompt, extra_keys) = super::defs::resolve_placeholders( + prompt_template, store, &graph, keys, count, + ); + all_keys.extend(extra_keys); + resolved_prompts.push(prompt); + } + let agent_batch = super::prompts::AgentBatch { prompts: resolved_prompts, node_keys: all_keys }; // Record visits eagerly so concurrent agents pick different seeds if !agent_batch.node_keys.is_empty() { @@ -130,43 +136,56 @@ fn run_one_agent_inner( _llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result<AgentResult, String> { - let prompt_kb = agent_batch.prompt.len() / 1024; let tools_desc = if def.tools.is_empty() { "no tools".into() } else { format!("{} tools", def.tools.len()) }; - log(&format!("prompt {}KB, model={}, {}, {} nodes", - prompt_kb, def.model, tools_desc, agent_batch.node_keys.len())); + let n_steps = agent_batch.prompts.len(); - // Guard: reject prompts that would exceed model context. - // Rough estimate: 1 token ≈ 4 bytes. Reserve 16K tokens for output. 
- let max_prompt_bytes = 800_000; // ~200K tokens, leaves room for output - if agent_batch.prompt.len() > max_prompt_bytes { - // Log the oversized prompt for debugging + for key in &agent_batch.node_keys { + log(&format!(" node: {}", key)); + } + + // Guard: reject oversized first prompt (later steps grow via conversation) + let max_prompt_bytes = 800_000; + let first_len = agent_batch.prompts[0].len(); + if first_len > max_prompt_bytes { + let prompt_kb = first_len / 1024; let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); fs::create_dir_all(&oversize_dir).ok(); let oversize_path = oversize_dir.join(format!("{}-{}.txt", agent_name, store::compact_timestamp())); let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); - fs::write(&oversize_path, format!("{}{}", header, agent_batch.prompt)).ok(); + fs::write(&oversize_path, format!("{}{}", header, &agent_batch.prompts[0])).ok(); log(&format!("oversized prompt logged to {}", oversize_path.display())); - return Err(format!( "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", prompt_kb, max_prompt_bytes / 1024, )); } - for key in &agent_batch.node_keys { - log(&format!(" node: {}", key)); + + // Output directory — use --state-dir if set, otherwise flat per-agent + let output_dir = std::env::var("POC_AGENT_OUTPUT_DIR") + .map(std::path::PathBuf::from) + .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name)); + fs::create_dir_all(&output_dir).ok(); + // Safe: agent runs single-threaded, env var read only by our dispatch code + unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); } + + log(&format!("{} step(s), {}KB initial, model={}, {}, {} nodes, output={}", + n_steps, first_len / 1024, def.model, tools_desc, + agent_batch.node_keys.len(), output_dir.display())); + + for (i, p) in agent_batch.prompts.iter().enumerate() { + 
log(&format!("=== PROMPT {}/{} ===\n\n{}", i + 1, n_steps, p)); } + log("\n=== CALLING LLM ==="); - log(&format!("=== PROMPT ===\n\n{}\n\n=== CALLING LLM ===", agent_batch.prompt)); - - let output = llm::call_for_def(def, &agent_batch.prompt, log)?; - + let output = llm::call_for_def_multi(def, &agent_batch.prompts, log)?; Ok(AgentResult { output, node_keys: agent_batch.node_keys, + output_dir, }) } diff --git a/src/subconscious/llm.rs b/src/subconscious/llm.rs index deee2ab..abbda16 100644 --- a/src/subconscious/llm.rs +++ b/src/subconscious/llm.rs @@ -21,16 +21,17 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String> } }; - super::api::call_api_with_tools_sync(caller, prompt, None, &log) + let prompts = vec![prompt.to_string()]; + super::api::call_api_with_tools_sync(caller, &prompts, None, &log) } -/// Call a model using an agent definition's configuration. -pub(crate) fn call_for_def( +/// Call a model using an agent definition's configuration (multi-step). +pub(crate) fn call_for_def_multi( def: &super::defs::AgentDef, - prompt: &str, + prompts: &[String], log: &(dyn Fn(&str) + Sync), ) -> Result<String, String> { - super::api::call_api_with_tools_sync(&def.agent, prompt, def.temperature, log) + super::api::call_api_with_tools_sync(&def.agent, prompts, def.temperature, log) } /// Parse a JSON response, handling markdown fences. diff --git a/src/subconscious/prompts.rs b/src/subconscious/prompts.rs index 201e5f8..635cc6e 100644 --- a/src/subconscious/prompts.rs +++ b/src/subconscious/prompts.rs @@ -13,7 +13,8 @@ use crate::neuro::{ /// and the keys of nodes selected for processing, so the caller can /// record visits after successful completion. pub struct AgentBatch { - pub prompt: String, + /// Prompt steps — single-step agents have one entry, multi-step have several. 
+ pub prompts: Vec<String>, pub node_keys: Vec<String>, } @@ -363,7 +364,8 @@ pub fn split_plan_prompt(store: &Store, key: &str) -> Result<String, String> { let graph = store.build_graph(); // Override the query — we have a specific key to split let keys = vec![key.to_string()]; - let (prompt, _) = super::defs::resolve_placeholders(&def.prompt, store, &graph, &keys, 1); + let template = def.prompts.first().ok_or_else(|| "split.agent has no prompts".to_string())?; + let (prompt, _) = super::defs::resolve_placeholders(template, store, &graph, &keys, 1); Ok(prompt) } @@ -384,7 +386,12 @@ pub fn split_extract_prompt(store: &Store, parent_key: &str, child_key: &str, ch pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> { if auto { let batch = agent_prompt(store, "replay", count)?; - println!("{}", batch.prompt); + for (i, p) in batch.prompts.iter().enumerate() { + if batch.prompts.len() > 1 { + println!("=== STEP {} ===\n", i + 1); + } + println!("{}", p); + } return Ok(()); }