diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 39e52b5..a0f605e 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -185,7 +185,6 @@ pub struct Agent { pub changed: Arc, } - impl Agent { pub fn new( client: ApiClient, @@ -241,9 +240,46 @@ impl Agent { agent } + /// Create a lightweight agent forked from this one's context. + /// + /// The forked agent shares the same conversation prefix (system prompt, + /// personality, journal, entries) for KV cache sharing. The caller + /// appends the subconscious prompt as a user message and runs the turn. + pub fn fork(&self, tools: Vec) -> Self { + let tokenizer = tiktoken_rs::cl100k_base() + .expect("failed to load cl100k_base tokenizer"); + + Self { + client: self.client.clone(), + tools, + last_prompt_tokens: 0, + reasoning_effort: "none".to_string(), + temperature: self.temperature, + top_p: self.top_p, + top_k: self.top_k, + activities: Vec::new(), + next_activity_id: 0, + pending_yield: false, + pending_model_switch: None, + pending_dmn_pause: false, + conversation_log: None, + tokenizer, + context: self.context.clone(), + shared_context: context::shared_context_state(), + app_config: self.app_config.clone(), + prompt_file: self.prompt_file.clone(), + session_id: self.session_id.clone(), + generation: 0, + memory_scoring_in_flight: false, + memory_scores: Vec::new(), + active_tools: tools::shared_active_tools(), + changed: Arc::new(tokio::sync::Notify::new()), + } + } + /// Assemble the full message list for the API call from typed sources. /// System prompt + personality context + journal + conversation messages. - fn assemble_api_messages(&self) -> Vec { + pub fn assemble_api_messages(&self) -> Vec { let mut msgs = Vec::new(); msgs.push(Message::system(&self.context.system_prompt)); let ctx = self.context.render_context_message(); diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index b09e5ec..6250dd4 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -1,12 +1,11 @@ -// oneshot.rs — One-shot agent execution +// oneshot.rs — Autonomous agent execution // -// Runs an agent definition (from agents/*.agent files) through the API: -// build prompt → call LLM with tools → return result. Agents apply -// changes via tool calls during the LLM call — no action parsing needed. +// AutoAgent: wraps an Agent with a multi-step prompt sequence and an +// async run() method. Used for both oneshot CLI agents (from .agent +// files) and subconscious agents forked from the conscious agent. // -// This is distinct from the interactive agent loop in agent/mod.rs: -// oneshot agents run a fixed prompt sequence and exit, while the -// interactive agent has a turn loop with streaming and TUI. +// Also contains the legacy run_one_agent() pipeline and process +// management for spawned agent subprocesses. use crate::store::{self, Store}; use crate::subconscious::{defs, prompts}; @@ -18,6 +17,7 @@ use std::sync::OnceLock; use super::api::ApiClient; use super::api::types::*; use super::tools::{self as agent_tools}; +use super::Agent; // --------------------------------------------------------------------------- // API client — shared across oneshot agent runs @@ -35,6 +35,259 @@ fn get_client() -> Result<&'static ApiClient, String> { })) } +// --------------------------------------------------------------------------- +// AutoAgent — multi-step autonomous agent +// --------------------------------------------------------------------------- + +pub struct AutoStep { + pub prompt: String, + pub phase: String, +} + +/// An autonomous agent that runs a sequence of prompts with tool dispatch. +/// +/// For oneshot agents: created from the global API client with prompts +/// from .agent files. Messages start empty. +/// +/// For subconscious agents: created from a forked Agent whose context +/// provides the conversation prefix for KV cache sharing. Messages +/// start from the agent's assembled context. +pub struct AutoAgent { + pub name: String, + client: ApiClient, + tools: Vec, + messages: Vec, + steps: Vec, + next_step: usize, + sampling: super::api::SamplingParams, + priority: i32, + // Observable status + pub current_phase: String, + pub turn: usize, +} + +impl AutoAgent { + /// Create from the global API client with no initial context. + /// Used by oneshot CLI agents. + pub fn new( + name: String, + tools: Vec, + steps: Vec, + temperature: f32, + priority: i32, + ) -> Result { + let client = get_client()?.clone(); + let phase = steps.first().map(|s| s.phase.clone()).unwrap_or_default(); + Ok(Self { + name, + client, + tools, + messages: Vec::new(), + steps, + next_step: 0, + sampling: super::api::SamplingParams { + temperature, + top_p: 0.95, + top_k: 20, + }, + priority, + current_phase: phase, + turn: 0, + }) + } + + /// Fork from an existing agent for subconscious use. Clones the + /// agent's context (system + personality + journal + entries) so + /// the API sees the same token prefix → KV cache sharing. + pub fn from_agent( + name: String, + agent: &Agent, + tools: Vec, + steps: Vec, + priority: i32, + ) -> Self { + let forked = agent.fork(tools); + let phase = steps.first().map(|s| s.phase.clone()).unwrap_or_default(); + Self { + name, + client: forked.client_clone(), + tools: forked.tools.clone(), + messages: forked.assemble_api_messages(), + steps, + next_step: 0, + sampling: super::api::SamplingParams { + temperature: forked.temperature, + top_p: forked.top_p, + top_k: forked.top_k, + }, + priority, + current_phase: phase, + turn: 0, + } + } + + /// Run all steps to completion. Returns the final text response. + pub async fn run( + &mut self, + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, + log: &dyn Fn(&str), + ) -> Result { + // Inject first step prompt + if self.next_step < self.steps.len() { + self.messages.push(Message::user(&self.steps[self.next_step].prompt)); + self.next_step += 1; + } + + let reasoning = crate::config::get().api_reasoning.clone(); + let max_turns = 50 * self.steps.len().max(1); + + for _ in 0..max_turns { + self.turn += 1; + log(&format!("\n=== TURN {} ({} messages) ===\n", + self.turn, self.messages.len())); + + // API call with retries + let (msg, usage_opt) = self.api_call_with_retry(&reasoning, log).await?; + + if let Some(u) = &usage_opt { + log(&format!("tokens: {} prompt + {} completion", + u.prompt_tokens, u.completion_tokens)); + } + + let has_content = msg.content.is_some(); + let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); + + if has_tools { + self.dispatch_tools(&msg, log).await; + continue; + } + + // Text-only response — step complete + let text = msg.content_text().to_string(); + if text.is_empty() && !has_content { + log("empty response, retrying"); + self.messages.push(Message::user( + "[system] Your previous response was empty. \ + Please respond with text or use a tool." + )); + continue; + } + + log(&format!("\n=== RESPONSE ===\n\n{}", text)); + + // More steps? Check bail, inject next prompt. + if self.next_step < self.steps.len() { + if let Some(ref check) = bail_fn { + check(self.next_step)?; + } + self.current_phase = self.steps[self.next_step].phase.clone(); + self.messages.push(Message::assistant(&text)); + self.messages.push(Message::user(&self.steps[self.next_step].prompt)); + self.next_step += 1; + log(&format!("\n=== STEP {}/{} ===\n", + self.next_step, self.steps.len())); + continue; + } + + return Ok(text); + } + + Err(format!("{}: exceeded {} tool turns", self.name, max_turns)) + } + + async fn api_call_with_retry( + &self, + reasoning: &str, + log: &dyn Fn(&str), + ) -> Result<(Message, Option), String> { + let mut last_err = None; + for attempt in 0..5 { + match self.client.chat_completion_stream_temp( + &self.messages, + &self.tools, + reasoning, + self.sampling, + Some(self.priority), + ).await { + Ok((msg, usage)) => { + if let Some(ref e) = last_err { + log(&format!("succeeded after retry (previous error: {})", e)); + } + return Ok((msg, usage)); + } + Err(e) => { + let err_str = e.to_string(); + let is_transient = err_str.contains("IncompleteMessage") + || err_str.contains("connection closed") + || err_str.contains("connection reset") + || err_str.contains("timed out") + || err_str.contains("Connection refused"); + if is_transient && attempt < 4 { + log(&format!("transient error (attempt {}): {}, retrying...", + attempt + 1, err_str)); + tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; + last_err = Some(e); + continue; + } + let msg_bytes: usize = self.messages.iter() + .map(|m| m.content_text().len()) + .sum(); + return Err(format!( + "{}: API error on turn {} (~{}KB, {} messages, {} attempts): {}", + self.name, self.turn, msg_bytes / 1024, + self.messages.len(), attempt + 1, e)); + } + } + } + unreachable!() + } + + async fn dispatch_tools(&mut self, msg: &Message, log: &dyn Fn(&str)) { + // Push sanitized assistant message with tool calls + let mut sanitized = msg.clone(); + if let Some(ref mut calls) = sanitized.tool_calls { + for call in calls { + if serde_json::from_str::(&call.function.arguments).is_err() { + log(&format!("sanitizing malformed args for {}: {}", + call.function.name, &call.function.arguments)); + call.function.arguments = "{}".to_string(); + } + } + } + self.messages.push(sanitized); + + for call in msg.tool_calls.as_ref().unwrap() { + log(&format!("\nTOOL CALL: {}({})", + call.function.name, &call.function.arguments)); + + let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { + Ok(v) => v, + Err(_) => { + log(&format!("malformed tool call args: {}", &call.function.arguments)); + self.messages.push(Message::tool_result( + &call.id, + "Error: your tool call had malformed JSON arguments. \ + Please retry with valid JSON.", + )); + continue; + } + }; + + let output = agent_tools::dispatch(&call.function.name, &args).await; + + if std::env::var("POC_AGENT_VERBOSE").is_ok() { + log(&format!("TOOL RESULT ({} chars):\n{}", output.len(), output)); + } else { + let preview: String = output.lines().next() + .unwrap_or("").chars().take(100).collect(); + log(&format!("Result: {}", preview)); + } + + self.messages.push(Message::tool_result(&call.id, &output)); + } + } +} + // --------------------------------------------------------------------------- // Agent execution // --------------------------------------------------------------------------- @@ -176,14 +429,11 @@ pub fn run_one_agent( } // --------------------------------------------------------------------------- -// Multi-step API turn loop +// Compatibility wrappers — delegate to AutoAgent // --------------------------------------------------------------------------- /// Run agent prompts through the API with tool support. -/// For multi-step agents, each prompt is injected as a new user message -/// after the previous step's tool loop completes. The conversation -/// context carries forward naturally between steps. -/// Returns the final text response after all steps complete. +/// Convenience wrapper around AutoAgent for existing callers. pub async fn call_api_with_tools( agent: &str, prompts: &[String], @@ -194,159 +444,26 @@ pub async fn call_api_with_tools( bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, log: &dyn Fn(&str), ) -> Result { - let client = get_client()?; + let steps: Vec = prompts.iter().zip( + phases.iter().map(String::as_str) + .chain(std::iter::repeat("")) + ).map(|(prompt, phase)| AutoStep { + prompt: prompt.clone(), + phase: phase.to_string(), + }).collect(); - let first_phase = phases.first().map(|s| s.as_str()).unwrap_or(""); - let _provenance = std::cell::RefCell::new( - if first_phase.is_empty() { format!("agent:{}", agent) } - else { format!("agent:{}:{}", agent, first_phase) } - ); - - let mut messages = vec![Message::user(&prompts[0])]; - let mut next_prompt_idx = 1; - let reasoning = crate::config::get().api_reasoning.clone(); - - let max_turns = 50 * prompts.len(); - for turn in 0..max_turns { - log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len())); - - let mut last_err = None; - let mut msg_opt = None; - let mut usage_opt = None; - for attempt in 0..5 { - let sampling = super::api::SamplingParams { - temperature: temperature.unwrap_or(0.6), - top_p: 0.95, - top_k: 20, - }; - match client.chat_completion_stream_temp( - &messages, - tools, - &reasoning, - sampling, - Some(priority), - ).await { - Ok((msg, usage)) => { - msg_opt = Some(msg); - usage_opt = usage; - break; - } - Err(e) => { - let err_str = e.to_string(); - let is_transient = err_str.contains("IncompleteMessage") - || err_str.contains("connection closed") - || err_str.contains("connection reset") - || err_str.contains("timed out") - || err_str.contains("Connection refused"); - if is_transient && attempt < 4 { - log(&format!("transient error (attempt {}): {}, retrying...", - attempt + 1, err_str)); - tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; - last_err = Some(e); - continue; - } - let msg_bytes: usize = messages.iter() - .map(|m| m.content_text().len()) - .sum(); - return Err(format!( - "API error on turn {} (~{}KB payload, {} messages, {} attempts): {}", - turn, msg_bytes / 1024, messages.len(), attempt + 1, e)); - } - } - } - let msg = msg_opt.unwrap(); - if let Some(ref e) = last_err { - log(&format!("succeeded after retry (previous error: {})", e)); - } - - if let Some(u) = &usage_opt { - log(&format!("tokens: {} prompt + {} completion", - u.prompt_tokens, u.completion_tokens)); - } - - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); - - if has_tools { - let mut sanitized = msg.clone(); - if let Some(ref mut calls) = sanitized.tool_calls { - for call in calls { - if serde_json::from_str::(&call.function.arguments).is_err() { - log(&format!("sanitizing malformed args for {}: {}", - call.function.name, &call.function.arguments)); - call.function.arguments = "{}".to_string(); - } - } - } - messages.push(sanitized); - - for call in msg.tool_calls.as_ref().unwrap() { - log(&format!("\nTOOL CALL: {}({})", - call.function.name, - &call.function.arguments)); - - let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { - Ok(v) => v, - Err(_) => { - log(&format!("malformed tool call args: {}", &call.function.arguments)); - messages.push(Message::tool_result( - &call.id, - "Error: your tool call had malformed JSON arguments. Please retry with valid JSON.", - )); - continue; - } - }; - - let output = agent_tools::dispatch(&call.function.name, &args).await; - - if std::env::var("POC_AGENT_VERBOSE").is_ok() { - log(&format!("TOOL RESULT ({} chars):\n{}", output.len(), output)); - } else { - let preview: String = output.lines().next().unwrap_or("").chars().take(100).collect(); - log(&format!("Result: {}", preview)); - } - - messages.push(Message::tool_result(&call.id, &output)); - } - continue; - } - - // Text-only response — step complete - let text = msg.content_text().to_string(); - if text.is_empty() && !has_content { - log("empty response, retrying"); - messages.push(Message::user( - "[system] Your previous response was empty. Please respond with text or use a tool." - )); - continue; - } - - log(&format!("\n=== RESPONSE ===\n\n{}", text)); - - // If there are more prompts, check bail condition and inject the next one - if next_prompt_idx < prompts.len() { - if let Some(ref check) = bail_fn { - check(next_prompt_idx)?; - } - if let Some(phase) = phases.get(next_prompt_idx) { - *_provenance.borrow_mut() = format!("agent:{}:{}", agent, phase); - } - messages.push(Message::assistant(&text)); - let next = &prompts[next_prompt_idx]; - next_prompt_idx += 1; - log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len())); - messages.push(Message::user(next)); - continue; - } - - return Ok(text); - } - - Err(format!("agent exceeded {} tool turns", max_turns)) + let mut auto = AutoAgent::new( + agent.to_string(), + tools.to_vec(), + steps, + temperature.unwrap_or(0.6), + priority, + )?; + auto.run(bail_fn, log).await } -/// Synchronous wrapper — runs the async function on a dedicated thread -/// with its own tokio runtime. Safe to call from any context. +/// Synchronous wrapper — runs on a dedicated thread with its own +/// tokio runtime. Safe to call from any context. pub fn call_api_with_tools_sync( agent: &str, prompts: &[String], @@ -364,7 +481,7 @@ pub fn call_api_with_tools_sync( .build() .map_err(|e| format!("tokio runtime: {}", e))?; rt.block_on( - call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log) + call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log) ) }).join().unwrap() })