// oneshot.rs — Autonomous agent execution // // AutoAgent: wraps an Agent with a multi-step prompt sequence and an // async run() method. Used for both oneshot CLI agents (from .agent // files) and subconscious agents forked from the conscious agent. // // Also contains the legacy run_one_agent() pipeline and process // management for spawned agent subprocesses. use crate::store::{self, Store}; use crate::subconscious::{defs, prompts}; use std::collections::HashMap; use std::fs; use std::path::PathBuf; use super::context::AstNode; use super::tools::{self as agent_tools}; use super::Agent; // --------------------------------------------------------------------------- // Agent logging — shared by Mind and CLI paths // --------------------------------------------------------------------------- /// Stats from a single run. #[derive(Clone, Default, serde::Serialize, serde::Deserialize)] pub struct RunStats { pub messages: usize, pub tool_calls: usize, pub tool_failures: usize, pub tool_calls_by_type: HashMap, } /// Per-tool accumulated stats. #[derive(Clone, Default, serde::Serialize, serde::Deserialize)] pub struct ToolStats { pub last: usize, pub ewma: f64, pub total: usize, } /// Persisted stats for an agent (survives restarts). #[derive(Clone, Default, serde::Serialize, serde::Deserialize)] pub struct PersistedStats { pub runs: usize, pub last_stats: Option, /// Per-tool-type stats: last, ewma, total. pub by_tool: HashMap, /// Failed calls stats. pub failures: ToolStats, } fn stats_path() -> std::path::PathBuf { dirs::home_dir().unwrap_or_default() .join(".consciousness/agent-stats.json") } static AGENT_STATS: std::sync::OnceLock>> = std::sync::OnceLock::new(); fn stats_map() -> &'static std::sync::Mutex> { AGENT_STATS.get_or_init(|| { let map: HashMap = std::fs::read_to_string(stats_path()).ok() .and_then(|s| serde_json::from_str(&s).ok()) .unwrap_or_default(); std::sync::Mutex::new(map) }) } pub fn get_stats(name: &str) -> PersistedStats { stats_map().lock().ok() .and_then(|m| m.get(name).cloned()) .unwrap_or_default() } pub fn set_stats(name: &str, stats: PersistedStats) { if let Ok(mut map) = stats_map().lock() { map.insert(name.to_string(), stats); if let Ok(json) = serde_json::to_string_pretty(&*map) { let _ = std::fs::write(stats_path(), json); } } } /// Save agent conversation to JSON log file. /// Used by both mind-run agents and CLI-run agents. pub async fn save_agent_log(name: &str, agent: &std::sync::Arc) -> RunStats { let dir = dirs::home_dir().unwrap_or_default() .join(format!(".consciousness/logs/{}", name)); let ctx = agent.context.lock().await; let stats = compute_run_stats(ctx.conversation()); if std::fs::create_dir_all(&dir).is_ok() { let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S"); let path = dir.join(format!("{}.json", ts)); let mut context: Vec<&super::context::AstNode> = Vec::new(); for section in ctx.sections() { context.extend(section); } if let Ok(json) = serde_json::to_string_pretty(&context) { let _ = std::fs::write(&path, json); } } dbglog!("[agent] {} — {} msgs, {} tool calls", name, stats.messages, stats.tool_calls); stats } fn compute_run_stats(conversation: &[super::context::AstNode]) -> RunStats { use super::context::{AstNode, NodeBody}; let mut messages = 0usize; let mut tool_calls = 0usize; let mut tool_failures = 0usize; let mut by_type: HashMap = HashMap::new(); for node in conversation { if let AstNode::Branch { children, .. } = node { messages += 1; for child in children { if let AstNode::Leaf(leaf) = child { match leaf.body() { NodeBody::ToolCall { name, .. } => { tool_calls += 1; *by_type.entry(name.to_string()).or_default() += 1; } NodeBody::ToolResult(text) => { // Detect failures from error patterns in result let t = text.trim_start(); if t.starts_with("Error") || t.starts_with("error:") || t.starts_with("Failed") || t.contains("not found") { tool_failures += 1; } } _ => {} } } } } } RunStats { messages, tool_calls, tool_failures, tool_calls_by_type: by_type } } // --------------------------------------------------------------------------- // AutoAgent — multi-step autonomous agent // --------------------------------------------------------------------------- pub struct AutoStep { pub prompt: String, pub phase: String, } /// An autonomous agent that runs a sequence of prompts with tool dispatch. /// /// Persistent across runs — holds config, tools, steps, and inter-run /// state (walked keys). The conversation backend is ephemeral per run. pub struct AutoAgent { pub name: String, pub tools: Vec, pub steps: Vec, pub current_phase: String, pub turn: usize, pub enabled: bool, pub temperature: f32, pub priority: i32, } /// Per-run conversation backend — wraps a forked agent. struct Backend(std::sync::Arc); impl Backend { async fn push_node(&mut self, node: AstNode) { self.0.push_node(node).await; } } /// Resolve {{placeholder}} templates in subconscious agent prompts. fn resolve_prompt( template: &str, memory_keys: &[String], state: &std::collections::BTreeMap, recently_written: &[String], ) -> String { let cfg = crate::config::get(); let template = template.replace("{assistant_name}", &cfg.assistant_name); let mut result = String::with_capacity(template.len()); let mut rest = template.as_str(); while let Some(start) = rest.find("{{") { result.push_str(&rest[..start]); let after = &rest[start + 2..]; if let Some(end) = after.find("}}") { let name = after[..end].trim(); let replacement = if let Some(key) = name.strip_prefix("state:") { state.get(key).cloned().unwrap_or_else(|| "(not set)".to_string()) } else { match name { "seen_current" => format_key_list(memory_keys), "recently_written" => format_key_list(recently_written), _ => { result.push_str("{{"); result.push_str(&after[..end + 2]); rest = &after[end + 2..]; continue; } } }; result.push_str(&replacement); rest = &after[end + 2..]; } else { result.push_str("{{"); rest = after; } } result.push_str(rest); result } fn format_key_list(keys: &[String]) -> String { if keys.is_empty() { "(none)".to_string() } else { keys.iter().map(|k| format!("- {}", k)).collect::>().join("\n") } } impl AutoAgent { pub fn new( name: String, tools: Vec, steps: Vec, temperature: f32, priority: i32, ) -> Self { Self { name, tools, steps, current_phase: String::new(), turn: 0, enabled: true, temperature, priority, } } pub async fn run( &mut self, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result<(), String> { let config = crate::config::get(); let base_url = config.api_base_url.as_deref().unwrap_or(""); let api_key = config.api_key.as_deref().unwrap_or(""); let model = config.api_model.as_deref().unwrap_or(""); if base_url.is_empty() || model.is_empty() { return Err("API not configured (no base_url or model)".to_string()); } let client = super::api::ApiClient::new(base_url, api_key, model); // Load system prompt + identity from config let cli = crate::user::CliArgs::default(); let (app, _) = crate::config::load_app(&cli) .map_err(|e| format!("config: {}", e))?; let personality = crate::config::reload_for_model( &app, &app.prompts.other, ).await.map_err(|e| format!("config: {}", e))?; let agent = Agent::new( client, personality, app, String::new(), None, super::tools::ActiveTools::new(), super::tools::tools(), ).await; { let mut st = agent.state.lock().await; st.provenance = format!("standalone:{}", self.name); st.tools = self.tools.clone(); st.temperature = self.temperature; st.priority = Some(self.priority); } let mut backend = Backend(agent.clone()); let result = self.run_with_backend(&mut backend, bail_fn).await; save_agent_log(&self.name, &agent).await; result } /// Run using a pre-created agent Arc. The caller retains the Arc /// so the UI can lock it to read entries live. pub async fn run_shared( &mut self, agent: &std::sync::Arc, ) -> Result<(), String> { let mut backend = Backend(agent.clone()); self.run_with_backend(&mut backend, None).await } /// Run forked using a shared agent Arc. The UI can lock the same /// Arc to read entries live during the run. pub async fn run_forked_shared( &mut self, agent: &std::sync::Arc, memory_keys: &[String], state: &std::collections::BTreeMap, recently_written: &[String], ) -> Result<(), String> { let resolved_steps: Vec = self.steps.iter().map(|s| AutoStep { prompt: resolve_prompt(&s.prompt, memory_keys, state, recently_written), phase: s.phase.clone(), }).collect(); let orig_steps = std::mem::replace(&mut self.steps, resolved_steps); let mut backend = Backend(agent.clone()); let result = self.run_with_backend(&mut backend, None).await; self.steps = orig_steps; result } /// Update stats after a run completes. Called with the stats from save_agent_log. pub fn update_stats(&self, run_stats: RunStats) { const ALPHA: f64 = 0.3; let old = get_stats(&self.name); // Update per-tool stats let mut by_tool = old.by_tool; for (tool, count) in &run_stats.tool_calls_by_type { let entry = by_tool.entry(tool.clone()).or_default(); entry.last = *count; entry.ewma = ALPHA * (*count as f64) + (1.0 - ALPHA) * entry.ewma; entry.total += count; } // Update failure stats let failures = ToolStats { last: run_stats.tool_failures, ewma: ALPHA * (run_stats.tool_failures as f64) + (1.0 - ALPHA) * old.failures.ewma, total: old.failures.total + run_stats.tool_failures, }; let new = PersistedStats { runs: old.runs + 1, last_stats: Some(run_stats), by_tool, failures, }; set_stats(&self.name, new); } async fn run_with_backend( &mut self, backend: &mut Backend, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result<(), String> { dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len()); for (i, step) in self.steps.iter().enumerate() { self.turn = i + 1; self.current_phase = step.phase.clone(); if let Some(ref check) = bail_fn { check(i)?; } backend.push_node(AstNode::system_msg(&step.prompt)).await; Agent::turn(backend.0.clone()).await .map_err(|e| format!("{}: {}", self.name, e))?; } Ok(()) } } // --------------------------------------------------------------------------- // Agent execution // --------------------------------------------------------------------------- /// Result of running a single agent. pub struct AgentResult { pub node_keys: Vec, /// Directory containing output() files from the agent run. pub state_dir: PathBuf, } /// Run an agent. If keys are provided, use them directly (bypassing the /// agent's query). Otherwise, run the query to select target nodes. pub async fn run_one_agent( store: &mut Store, agent_name: &str, count: usize, keys: Option<&[String]>, ) -> Result { let def = defs::get_def(agent_name) .ok_or_else(|| format!("no .agent file for {}", agent_name))?; // State dir for agent output files let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR") .map(PathBuf::from) .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name)); fs::create_dir_all(&state_dir) .map_err(|e| format!("create state dir: {}", e))?; unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); } // Build prompt batch — either from explicit keys or the agent's query let agent_batch = if let Some(keys) = keys { dbglog!("[{}] targeting: {}", agent_name, keys.join(", ")); let mut resolved_steps = Vec::new(); let mut all_keys: Vec = keys.to_vec(); for step in &def.steps { let (prompt, extra_keys) = defs::resolve_placeholders( &step.prompt, keys, count, ).await; all_keys.extend(extra_keys); resolved_steps.push(prompts::ResolvedStep { prompt, phase: step.phase.clone(), }); } let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; if !batch.node_keys.is_empty() { store.record_agent_visits(&batch.node_keys, agent_name).ok(); } batch } else { let effective_count = def.count.unwrap_or(count); defs::run_agent(&def, effective_count, &Default::default()).await? }; // Base memory tools + extras from agent def (matching unconscious.rs pattern) let base_tools = super::tools::memory::memory_tools().to_vec(); let extra_tools = super::tools::memory::journal_tools().to_vec(); let mut effective_tools: Vec = if def.tools.is_empty() { let mut all = base_tools; all.extend(extra_tools); all } else { let mut tools = base_tools; for name in &def.tools { if let Some(t) = extra_tools.iter().find(|t| t.name == *name) { tools.push(t.clone()); } } tools }; effective_tools.push(super::tools::Tool { name: "output", description: "Produce a named output value for passing between steps.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Output name"},"value":{"type":"string","description":"Output value"}},"required":["key","value"]}"#, handler: std::sync::Arc::new(|_agent, v| Box::pin(async move { let key = v["key"].as_str() .ok_or_else(|| anyhow::anyhow!("output requires 'key'"))?; if key.starts_with("pid-") || key.contains('/') || key.contains("..") { anyhow::bail!("invalid output key: {}", key); } let value = v["value"].as_str() .ok_or_else(|| anyhow::anyhow!("output requires 'value'"))?; let dir = std::env::var("POC_AGENT_OUTPUT_DIR") .map_err(|_| anyhow::anyhow!("no output directory set"))?; let path = std::path::Path::new(&dir).join(key); std::fs::write(&path, value) .map_err(|e| anyhow::anyhow!("writing output {}: {}", path.display(), e))?; Ok(format!("{}: {}", key, value)) })), }); let n_steps = agent_batch.steps.len(); // Guard: reject oversized first prompt let max_prompt_bytes = 800_000; let first_len = agent_batch.steps[0].prompt.len(); if first_len > max_prompt_bytes { let prompt_kb = first_len / 1024; let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); fs::create_dir_all(&oversize_dir).ok(); let oversize_path = oversize_dir.join(format!("{}-{}.txt", agent_name, store::compact_timestamp())); let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); return Err(format!( "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", prompt_kb, max_prompt_bytes / 1024, )); } let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes", agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len()); let prompts: Vec = agent_batch.steps.iter() .map(|s| s.prompt.clone()).collect(); let step_phases: Vec = agent_batch.steps.iter() .map(|s| s.phase.clone()).collect(); // Bail check: if the agent defines a bail script, run it between steps. let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name)); let state_dir_for_bail = state_dir.clone(); // Find our own pid file so we can pass it to the bail script let our_pid = std::process::id(); let our_pid_file = format!("pid-{}", our_pid); let bail_fn = move |step_idx: usize| -> Result<(), String> { if let Some(ref script) = bail_script { let status = std::process::Command::new(script) .arg(&our_pid_file) .current_dir(&state_dir_for_bail) .status() .map_err(|e| format!("bail script {:?} failed: {}", script, e))?; if !status.success() { return Err(format!("bailed at step {}: {:?} exited {}", step_idx + 1, script.file_name().unwrap_or_default(), status.code().unwrap_or(-1))); } } Ok(()) }; call_api_with_tools_sync( agent_name, &prompts, &step_phases, def.temperature, def.priority, &effective_tools, Some(&bail_fn))?; Ok(AgentResult { node_keys: agent_batch.node_keys, state_dir, }) } // --------------------------------------------------------------------------- // Compatibility wrappers — delegate to AutoAgent // --------------------------------------------------------------------------- /// Run agent prompts through the API with tool support. /// Convenience wrapper around AutoAgent for existing callers. pub async fn call_api_with_tools( agent: &str, prompts: &[String], phases: &[String], temperature: Option, priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result<(), String> { let steps: Vec = prompts.iter().zip( phases.iter().map(String::as_str) .chain(std::iter::repeat("")) ).map(|(prompt, phase)| AutoStep { prompt: prompt.clone(), phase: phase.to_string(), }).collect(); let mut auto = AutoAgent::new( agent.to_string(), tools.to_vec(), steps, temperature.unwrap_or(0.6), priority, ); auto.run(bail_fn).await } /// Synchronous wrapper — runs on a dedicated thread with its own /// tokio runtime. Safe to call from any context. pub fn call_api_with_tools_sync( agent: &str, prompts: &[String], phases: &[String], temperature: Option, priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result<(), String> { std::thread::scope(|s| { s.spawn(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|e| format!("tokio runtime: {}", e))?; rt.block_on( call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn) ) }).join().unwrap() }) } // --------------------------------------------------------------------------- // Process management — PID tracking and subprocess spawning // --------------------------------------------------------------------------- pub struct SpawnResult { pub child: std::process::Child, pub log_path: PathBuf, } pub fn spawn_agent( agent_name: &str, state_dir: &std::path::Path, session_id: &str, ) -> Option { let def = defs::get_def(agent_name)?; let first_phase = def.steps.first() .map(|s| s.phase.as_str()) .unwrap_or("step-0"); let log_dir = dirs::home_dir().unwrap_or_default() .join(format!(".consciousness/logs/{}", agent_name)); fs::create_dir_all(&log_dir).ok(); let log_path = log_dir.join(format!("{}.log", store::compact_timestamp())); let agent_log = fs::File::create(&log_path) .unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()); let child = std::process::Command::new("poc-memory") .args(["agent", "run", agent_name, "--count", "1", "--local", "--state-dir", &state_dir.to_string_lossy()]) .env("POC_SESSION_ID", session_id) .stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap())) .stderr(agent_log) .spawn() .ok()?; let pid = child.id(); let pid_path = state_dir.join(format!("pid-{}", pid)); fs::write(&pid_path, first_phase).ok(); Some(SpawnResult { child, log_path }) }