// oneshot.rs — Autonomous agent execution // // AutoAgent: wraps an Agent with a multi-step prompt sequence and an // async run() method. Used for both oneshot CLI agents (from .agent // files) and subconscious agents forked from the conscious agent. // // Also contains the legacy run_one_agent() pipeline and process // management for spawned agent subprocesses. use crate::store::{self, Store}; use crate::subconscious::{defs, prompts}; use std::fs; use std::path::PathBuf; use std::sync::OnceLock; use super::api::{ApiClient, Message, Usage}; use super::tools::{self as agent_tools}; use super::Agent; // --------------------------------------------------------------------------- // API client — shared across oneshot agent runs // --------------------------------------------------------------------------- static API_CLIENT: OnceLock = OnceLock::new(); fn get_client() -> Result<&'static ApiClient, String> { Ok(API_CLIENT.get_or_init(|| { let config = crate::config::get(); let base_url = config.api_base_url.as_deref().unwrap_or(""); let api_key = config.api_key.as_deref().unwrap_or(""); let model = config.api_model.as_deref().unwrap_or("qwen-2.5-27b"); ApiClient::new(base_url, api_key, model) })) } // --------------------------------------------------------------------------- // AutoAgent — multi-step autonomous agent // --------------------------------------------------------------------------- pub struct AutoStep { pub prompt: String, pub phase: String, } /// An autonomous agent that runs a sequence of prompts with tool dispatch. /// /// Persistent across runs — holds config, tools, steps, and inter-run /// state (walked keys). The conversation backend is ephemeral per run. pub struct AutoAgent { pub name: String, pub tools: Vec, pub steps: Vec, sampling: super::api::SamplingParams, priority: i32, /// Named outputs from the agent's output() tool calls. /// Collected per-run, read by Mind after completion. pub outputs: std::collections::BTreeMap, // Observable status pub current_phase: String, pub turn: usize, } /// Per-run conversation backend — created fresh by run() or run_forked(). enum Backend { Standalone { client: ApiClient, messages: Vec }, Forked(std::sync::Arc>), } impl Backend { async fn client(&self) -> ApiClient { match self { Backend::Standalone { client, .. } => client.clone(), Backend::Forked(agent) => agent.lock().await.client_clone(), } } async fn messages(&self) -> Vec { match self { Backend::Standalone { messages, .. } => messages.clone(), Backend::Forked(agent) => agent.lock().await.assemble_api_messages(), } } async fn push_message(&mut self, msg: Message) { match self { Backend::Standalone { messages, .. } => messages.push(msg), Backend::Forked(agent) => agent.lock().await.push_message(msg), } } async fn push_raw(&mut self, msg: Message) { match self { Backend::Standalone { messages, .. } => messages.push(msg), Backend::Forked(agent) => { agent.lock().await.push_message(msg); } } } } /// Resolve {{placeholder}} templates in subconscious agent prompts. fn resolve_prompt( template: &str, memory_keys: &[String], state: &std::collections::BTreeMap, ) -> String { let cfg = crate::config::get(); let template = template.replace("{assistant_name}", &cfg.assistant_name); let mut result = String::with_capacity(template.len()); let mut rest = template.as_str(); while let Some(start) = rest.find("{{") { result.push_str(&rest[..start]); let after = &rest[start + 2..]; if let Some(end) = after.find("}}") { let name = after[..end].trim(); let replacement = if let Some(key) = name.strip_prefix("state:") { state.get(key).cloned().unwrap_or_else(|| "(not set)".to_string()) } else { match name { "seen_current" => format_key_list(memory_keys), _ => { result.push_str("{{"); result.push_str(&after[..end + 2]); rest = &after[end + 2..]; continue; } } }; result.push_str(&replacement); rest = &after[end + 2..]; } else { result.push_str("{{"); rest = after; } } result.push_str(rest); result } fn format_key_list(keys: &[String]) -> String { if keys.is_empty() { "(none)".to_string() } else { keys.iter().map(|k| format!("- {}", k)).collect::>().join("\n") } } impl AutoAgent { pub fn new( name: String, tools: Vec, steps: Vec, temperature: f32, priority: i32, ) -> Self { Self { name, tools, steps, sampling: super::api::SamplingParams { temperature, top_p: 0.95, top_k: 20, }, priority, outputs: std::collections::BTreeMap::new(), current_phase: String::new(), turn: 0, } } /// Run standalone — creates a fresh message list from the global /// API client. Used by oneshot CLI agents. pub async fn run( &mut self, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result { let client = get_client()?.clone(); let mut backend = Backend::Standalone { client, messages: Vec::new(), }; self.run_with_backend(&mut backend, bail_fn).await } /// Run forked using a shared agent Arc. The UI can lock the same /// Arc to read entries live during the run. pub async fn run_forked_shared( &mut self, agent: &std::sync::Arc>, memory_keys: &[String], state: &std::collections::BTreeMap, ) -> Result { let resolved_steps: Vec = self.steps.iter().map(|s| AutoStep { prompt: resolve_prompt(&s.prompt, memory_keys, state), phase: s.phase.clone(), }).collect(); let orig_steps = std::mem::replace(&mut self.steps, resolved_steps); let mut backend = Backend::Forked(agent.clone()); let result = self.run_with_backend(&mut backend, None).await; self.steps = orig_steps; result } async fn run_with_backend( &mut self, backend: &mut Backend, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result { dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len()); self.turn = 0; self.outputs.clear(); self.current_phase = self.steps.first() .map(|s| s.phase.clone()).unwrap_or_default(); let mut next_step = 0; if next_step < self.steps.len() { backend.push_message( Message::user(&self.steps[next_step].prompt)).await; next_step += 1; } let reasoning = crate::config::get().api_reasoning.clone(); let max_turns = 50 * self.steps.len().max(1); for _ in 0..max_turns { self.turn += 1; let messages = backend.messages().await; let client = backend.client().await; dbglog!("[auto] {} turn {} ({} messages)", self.name, self.turn, messages.len()); let (msg, usage_opt) = Self::api_call_with_retry( &self.name, &client, &self.tools, &messages, &reasoning, self.sampling, self.priority).await?; if let Some(u) = &usage_opt { dbglog!("[auto] {} tokens: {} prompt + {} completion", self.name, u.prompt_tokens, u.completion_tokens); } let has_content = msg.content.is_some(); let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); if has_tools { self.dispatch_tools(backend, &msg).await; continue; } let text = msg.content_text().to_string(); if text.is_empty() && !has_content { dbglog!("[auto] {} empty response, retrying", self.name); backend.push_message(Message::user( "[system] Your previous response was empty. \ Please respond with text or use a tool." )).await; continue; } dbglog!("[auto] {} response: {}", self.name, &text[..text.len().min(200)]); backend.push_message(Message::assistant(&text)).await; if next_step < self.steps.len() { if let Some(ref check) = bail_fn { check(next_step)?; } self.current_phase = self.steps[next_step].phase.clone(); backend.push_message( Message::user(&self.steps[next_step].prompt)).await; next_step += 1; dbglog!("[auto] {} step {}/{}", self.name, next_step, self.steps.len()); continue; } return Ok(text); } Err(format!("{}: exceeded {} tool turns", self.name, max_turns)) } async fn api_call_with_retry( name: &str, client: &ApiClient, tools: &[agent_tools::Tool], messages: &[Message], reasoning: &str, sampling: super::api::SamplingParams, priority: i32, ) -> Result<(Message, Option), String> { let mut last_err = None; for attempt in 0..5 { match client.chat_completion_stream_temp( messages, tools, reasoning, sampling, Some(priority), ).await { Ok((msg, usage)) => { if let Some(ref e) = last_err { dbglog!("[auto] {} succeeded after retry (previous: {})", name, e); } return Ok((msg, usage)); } Err(e) => { let err_str = e.to_string(); let is_transient = err_str.contains("IncompleteMessage") || err_str.contains("connection closed") || err_str.contains("connection reset") || err_str.contains("timed out") || err_str.contains("Connection refused"); if is_transient && attempt < 4 { dbglog!("[auto] {} transient error (attempt {}): {}, retrying", name, attempt + 1, err_str); tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; last_err = Some(e); continue; } let msg_bytes: usize = messages.iter() .map(|m| m.content_text().len()).sum(); return Err(format!( "{}: API error (~{}KB, {} messages, {} attempts): {}", name, msg_bytes / 1024, messages.len(), attempt + 1, e)); } } } Err(format!("{}: all retry attempts exhausted", name)) } async fn dispatch_tools(&mut self, backend: &mut Backend, msg: &Message) { let mut sanitized = msg.clone(); if let Some(ref mut calls) = sanitized.tool_calls { for call in calls { if serde_json::from_str::(&call.function.arguments).is_err() { dbglog!("[auto] {} sanitizing malformed args for {}: {}", self.name, call.function.name, &call.function.arguments); call.function.arguments = "{}".to_string(); } } } backend.push_raw(sanitized).await; for call in msg.tool_calls.as_ref().unwrap() { dbglog!("[auto] {} tool: {}({})", self.name, call.function.name, &call.function.arguments); let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { Ok(v) => v, Err(_) => { backend.push_raw(Message::tool_result( &call.id, "Error: your tool call had malformed JSON arguments. \ Please retry with valid JSON.", )).await; continue; } }; // Intercept output() — store in-memory instead of filesystem let output = if call.function.name == "output" { let key = args["key"].as_str().unwrap_or(""); let value = args["value"].as_str().unwrap_or(""); if !key.is_empty() { self.outputs.insert(key.to_string(), value.to_string()); } format!("{}: {}", key, value) } else { let agent = match &*backend { Backend::Forked(a) => Some(a.clone()), _ => None, }; agent_tools::dispatch_with_agent(&call.function.name, &args, agent).await }; dbglog!("[auto] {} result: {} chars", self.name, output.len()); backend.push_raw(Message::tool_result(&call.id, &output)).await; } } } // --------------------------------------------------------------------------- // Agent execution // --------------------------------------------------------------------------- /// Result of running a single agent. pub struct AgentResult { pub output: String, pub node_keys: Vec, /// Directory containing output() files from the agent run. pub state_dir: PathBuf, } /// Run an agent. If keys are provided, use them directly (bypassing the /// agent's query). Otherwise, run the query to select target nodes. pub fn run_one_agent( store: &mut Store, agent_name: &str, count: usize, keys: Option<&[String]>, ) -> Result { let def = defs::get_def(agent_name) .ok_or_else(|| format!("no .agent file for {}", agent_name))?; // State dir for agent output files let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR") .map(PathBuf::from) .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name)); fs::create_dir_all(&state_dir) .map_err(|e| format!("create state dir: {}", e))?; unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); } // Build prompt batch — either from explicit keys or the agent's query let agent_batch = if let Some(keys) = keys { dbglog!("[{}] targeting: {}", agent_name, keys.join(", ")); let graph = store.build_graph(); let mut resolved_steps = Vec::new(); let mut all_keys: Vec = keys.to_vec(); for step in &def.steps { let (prompt, extra_keys) = defs::resolve_placeholders( &step.prompt, store, &graph, keys, count, ); all_keys.extend(extra_keys); resolved_steps.push(prompts::ResolvedStep { prompt, phase: step.phase.clone(), }); } let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; if !batch.node_keys.is_empty() { store.record_agent_visits(&batch.node_keys, agent_name).ok(); } batch } else { let effective_count = def.count.unwrap_or(count); defs::run_agent(store, &def, effective_count, &Default::default())? }; // Filter tools based on agent def let all_tools = super::tools::memory_and_journal_tools(); let effective_tools: Vec = if def.tools.is_empty() { all_tools.to_vec() } else { all_tools.into_iter() .filter(|t| def.tools.iter().any(|w| w == &t.name)) .collect() }; let n_steps = agent_batch.steps.len(); // Guard: reject oversized first prompt let max_prompt_bytes = 800_000; let first_len = agent_batch.steps[0].prompt.len(); if first_len > max_prompt_bytes { let prompt_kb = first_len / 1024; let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); fs::create_dir_all(&oversize_dir).ok(); let oversize_path = oversize_dir.join(format!("{}-{}.txt", agent_name, store::compact_timestamp())); let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); return Err(format!( "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", prompt_kb, max_prompt_bytes / 1024, )); } let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes", agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len()); let prompts: Vec = agent_batch.steps.iter() .map(|s| s.prompt.clone()).collect(); let step_phases: Vec = agent_batch.steps.iter() .map(|s| s.phase.clone()).collect(); // Bail check: if the agent defines a bail script, run it between steps. let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name)); let state_dir_for_bail = state_dir.clone(); // Find our own pid file so we can pass it to the bail script let our_pid = std::process::id(); let our_pid_file = format!("pid-{}", our_pid); let bail_fn = move |step_idx: usize| -> Result<(), String> { if let Some(ref script) = bail_script { let status = std::process::Command::new(script) .arg(&our_pid_file) .current_dir(&state_dir_for_bail) .status() .map_err(|e| format!("bail script {:?} failed: {}", script, e))?; if !status.success() { return Err(format!("bailed at step {}: {:?} exited {}", step_idx + 1, script.file_name().unwrap_or_default(), status.code().unwrap_or(-1))); } } Ok(()) }; let output = call_api_with_tools_sync( agent_name, &prompts, &step_phases, def.temperature, def.priority, &effective_tools, Some(&bail_fn))?; Ok(AgentResult { output, node_keys: agent_batch.node_keys, state_dir, }) } // --------------------------------------------------------------------------- // Compatibility wrappers — delegate to AutoAgent // --------------------------------------------------------------------------- /// Run agent prompts through the API with tool support. /// Convenience wrapper around AutoAgent for existing callers. pub async fn call_api_with_tools( agent: &str, prompts: &[String], phases: &[String], temperature: Option, priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result { let steps: Vec = prompts.iter().zip( phases.iter().map(String::as_str) .chain(std::iter::repeat("")) ).map(|(prompt, phase)| AutoStep { prompt: prompt.clone(), phase: phase.to_string(), }).collect(); let mut auto = AutoAgent::new( agent.to_string(), tools.to_vec(), steps, temperature.unwrap_or(0.6), priority, ); auto.run(bail_fn).await } /// Synchronous wrapper — runs on a dedicated thread with its own /// tokio runtime. Safe to call from any context. pub fn call_api_with_tools_sync( agent: &str, prompts: &[String], phases: &[String], temperature: Option, priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result { std::thread::scope(|s| { s.spawn(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|e| format!("tokio runtime: {}", e))?; rt.block_on( call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn) ) }).join().unwrap() }) } // --------------------------------------------------------------------------- // Process management — PID tracking and subprocess spawning // --------------------------------------------------------------------------- pub struct SpawnResult { pub child: std::process::Child, pub log_path: PathBuf, } pub fn spawn_agent( agent_name: &str, state_dir: &std::path::Path, session_id: &str, ) -> Option { let def = defs::get_def(agent_name)?; let first_phase = def.steps.first() .map(|s| s.phase.as_str()) .unwrap_or("step-0"); let log_dir = dirs::home_dir().unwrap_or_default() .join(format!(".consciousness/logs/{}", agent_name)); fs::create_dir_all(&log_dir).ok(); let log_path = log_dir.join(format!("{}.log", store::compact_timestamp())); let agent_log = fs::File::create(&log_path) .unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()); let child = std::process::Command::new("poc-memory") .args(["agent", "run", agent_name, "--count", "1", "--local", "--state-dir", &state_dir.to_string_lossy()]) .env("POC_SESSION_ID", session_id) .stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap())) .stderr(agent_log) .spawn() .ok()?; let pid = child.id(); let pid_path = state_dir.join(format!("pid-{}", pid)); fs::write(&pid_path, first_phase).ok(); Some(SpawnResult { child, log_path }) }