consciousness/src/agent/oneshot.rs
Kent Overstreet 74f8952399 Fix: push all responses to forked agent entries
The final assistant response in run_with_backend wasn't being pushed
to the backend — only intermediate step responses were. This meant
the subconscious debug screen only showed the prompt, not the full
conversation.

Now push the assistant response immediately after receiving it, before
checking for next steps, and remove the duplicate push in the multi-step
path.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-07 17:38:23 -04:00

587 lines
22 KiB
Rust

// oneshot.rs — Autonomous agent execution
//
// AutoAgent: wraps an Agent with a multi-step prompt sequence and an
// async run() method. Used for both oneshot CLI agents (from .agent
// files) and subconscious agents forked from the conscious agent.
//
// Also contains the legacy run_one_agent() pipeline and process
// management for spawned agent subprocesses.
use crate::store::{self, Store};
use crate::subconscious::{defs, prompts};
use std::fs;
use std::path::PathBuf;
use std::sync::OnceLock;
use super::api::{ApiClient, Message, Usage};
use super::tools::{self as agent_tools};
use super::Agent;
// ---------------------------------------------------------------------------
// API client — shared across oneshot agent runs
// ---------------------------------------------------------------------------
/// Process-wide API client shared by every standalone oneshot run; built
/// lazily from config the first time `get_client` is called.
static API_CLIENT: OnceLock<ApiClient> = OnceLock::new();

/// Return the shared [`ApiClient`], constructing it on first use.
///
/// Missing config values fall back to an empty base URL, an empty API key
/// and the model name `"qwen-2.5-27b"`. The value is cached for the life of
/// the process, so later config changes are not picked up. This currently
/// never returns `Err`; the `Result` is part of the caller-facing signature.
fn get_client() -> Result<&'static ApiClient, String> {
    let shared = API_CLIENT.get_or_init(|| {
        let cfg = crate::config::get();
        ApiClient::new(
            cfg.api_base_url.as_deref().unwrap_or(""),
            cfg.api_key.as_deref().unwrap_or(""),
            cfg.api_model.as_deref().unwrap_or("qwen-2.5-27b"),
        )
    });
    Ok(shared)
}
// ---------------------------------------------------------------------------
// AutoAgent — multi-step autonomous agent
// ---------------------------------------------------------------------------
/// One prompt in an [`AutoAgent`]'s sequence.
pub struct AutoStep {
    /// Text sent to the model as a user message when this step begins.
    pub prompt: String,
    /// Label exposed as `AutoAgent::current_phase` while this step runs.
    pub phase: String,
}
/// Autonomous agent that works through a list of prompts, dispatching any
/// tool calls the model makes along the way.
///
/// The struct outlives individual runs: it holds the configuration (name,
/// tools, steps, sampling, priority) plus the observable state of the most
/// recent run. The conversation itself lives in a per-run backend.
pub struct AutoAgent {
    /// Agent name, used as the prefix of log lines and error messages.
    pub name: String,
    /// Tools offered to the model on every completion request.
    pub tools: Vec<agent_tools::Tool>,
    /// Prompt sequence; each step is sent once the previous one has received
    /// a plain-text answer.
    pub steps: Vec<AutoStep>,
    /// Sampling parameters passed to every completion request.
    sampling: super::api::SamplingParams,
    /// Request priority passed to every completion request.
    priority: i32,
    /// Named outputs from the agent's output() tool calls.
    /// Collected per-run, read by Mind after completion.
    pub outputs: std::collections::HashMap<String, String>,
    /// Phase label of the step currently being worked on (observable status).
    pub current_phase: String,
    /// Number of API round-trips made so far in the current run.
    pub turn: usize,
}
/// Where a single run's conversation lives. A fresh one is built by `run()`
/// or `run_forked_shared()` for every run.
enum Backend {
    /// Private message list plus its own client copy (oneshot CLI agents).
    Standalone { client: ApiClient, messages: Vec<Message> },
    /// Agent shared behind an async mutex; other holders of the same `Arc`
    /// (e.g. the UI) can lock it to observe the conversation.
    Forked(std::sync::Arc<tokio::sync::Mutex<Agent>>),
}
impl Backend {
    /// Owned copy of the API client this run should talk to.
    async fn client(&self) -> ApiClient {
        match self {
            Self::Standalone { client, .. } => client.clone(),
            Self::Forked(shared) => {
                let guard = shared.lock().await;
                guard.client_clone()
            }
        }
    }

    /// Snapshot of the message list to send with the next request. Forked
    /// runs ask the shared agent to assemble it.
    async fn messages(&self) -> Vec<Message> {
        match self {
            Self::Standalone { messages, .. } => messages.clone(),
            Self::Forked(shared) => {
                let guard = shared.lock().await;
                guard.assemble_api_messages()
            }
        }
    }

    /// Append a message via the agent's own `push_message` (forked) or
    /// directly onto the list (standalone).
    async fn push_message(&mut self, msg: Message) {
        match self {
            Self::Standalone { messages, .. } => messages.push(msg),
            Self::Forked(shared) => {
                let mut guard = shared.lock().await;
                guard.push_message(msg);
            }
        }
    }

    /// Append a message without going through `Agent::push_message`: for a
    /// forked run it is wrapped as a conversation entry and pushed straight
    /// onto `context.entries`. Standalone runs behave like `push_message`.
    async fn push_raw(&mut self, msg: Message) {
        match self {
            Self::Standalone { messages, .. } => messages.push(msg),
            Self::Forked(shared) => {
                let entry = super::context::ConversationEntry::Message(msg);
                let mut guard = shared.lock().await;
                guard.context.entries.push(entry);
            }
        }
    }
}
/// Expand templates in a subconscious agent prompt.
///
/// Processing order:
/// 1. Every `{assistant_name}` is replaced with the configured assistant name.
/// 2. `{{seen_current}}` and `{{walked}}` (whitespace inside the braces is
///    ignored) become bullet lists of `memory_keys` and `walked`
///    respectively, or `(none)` when empty.
///
/// Unknown `{{...}}` placeholders and unterminated `{{` are copied through
/// verbatim.
fn resolve_prompt(template: &str, memory_keys: &[String], walked: &[String]) -> String {
    let cfg = crate::config::get();
    let expanded = template.replace("{assistant_name}", &cfg.assistant_name);

    let mut out = String::with_capacity(expanded.len());
    let mut remaining = expanded.as_str();

    loop {
        let Some(open) = remaining.find("{{") else { break };
        out.push_str(&remaining[..open]);
        let tail = &remaining[open + 2..];

        match tail.find("}}") {
            None => {
                // Unterminated `{{`: keep it literally and continue scanning
                // right after it.
                out.push_str("{{");
                remaining = tail;
            }
            Some(close) => {
                match tail[..close].trim() {
                    "seen_current" => out.push_str(&format_key_list(memory_keys)),
                    "walked" => out.push_str(&format_key_list(walked)),
                    _ => {
                        // Unknown placeholder: reproduce `{{name}}` untouched.
                        out.push_str("{{");
                        out.push_str(&tail[..close + 2]);
                    }
                }
                remaining = &tail[close + 2..];
            }
        }
    }

    out.push_str(remaining);
    out
}
/// Render keys as a newline-separated `- key` bullet list, or `(none)` when
/// there are no keys.
fn format_key_list(keys: &[String]) -> String {
    if keys.is_empty() {
        return "(none)".to_string();
    }
    let mut rendered = String::new();
    for (idx, key) in keys.iter().enumerate() {
        if idx > 0 {
            rendered.push('\n');
        }
        rendered.push_str("- ");
        rendered.push_str(key);
    }
    rendered
}
impl AutoAgent {
    /// Create an agent from its name, tool set, prompt steps and sampling
    /// settings.
    ///
    /// Only the temperature is caller-chosen; `top_p` is fixed at 0.95 and
    /// `top_k` at 20. `outputs`, `current_phase` and `turn` start empty and
    /// are reset again at the start of every run.
    pub fn new(
        name: String,
        tools: Vec<agent_tools::Tool>,
        steps: Vec<AutoStep>,
        temperature: f32,
        priority: i32,
    ) -> Self {
        Self {
            name,
            tools,
            steps,
            sampling: super::api::SamplingParams {
                temperature,
                top_p: 0.95,
                top_k: 20,
            },
            priority,
            outputs: std::collections::HashMap::new(),
            current_phase: String::new(),
            turn: 0,
        }
    }

    /// Run standalone — creates a fresh message list from the global
    /// API client. Used by oneshot CLI agents.
    ///
    /// `bail_fn`, if given, is called with the index of the next step just
    /// before that step's prompt is sent; an `Err` from it aborts the run.
    ///
    /// # Errors
    /// Propagates errors from the shared client, the API, `bail_fn`, or the
    /// turn limit (see `run_with_backend`).
    pub async fn run(
        &mut self,
        bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    ) -> Result<String, String> {
        let client = get_client()?.clone();
        let mut backend = Backend::Standalone {
            client,
            messages: Vec::new(),
        };
        self.run_with_backend(&mut backend, bail_fn).await
    }

    /// Run forked using a shared agent Arc. The UI can lock the same
    /// Arc to read entries live during the run.
    ///
    /// Step prompts are template-expanded (`resolve_prompt`) with
    /// `memory_keys` and `walked` for this run only; the unexpanded steps
    /// are put back once the run finishes, whether it succeeded or not.
    /// NOTE(review): if this future is dropped mid-run, the restore line is
    /// never reached and `self.steps` keeps the expanded prompts.
    pub async fn run_forked_shared(
        &mut self,
        agent: &std::sync::Arc<tokio::sync::Mutex<Agent>>,
        memory_keys: &[String],
        walked: &[String],
    ) -> Result<String, String> {
        let resolved_steps: Vec<AutoStep> = self.steps.iter().map(|s| AutoStep {
            prompt: resolve_prompt(&s.prompt, memory_keys, walked),
            phase: s.phase.clone(),
        }).collect();
        let orig_steps = std::mem::replace(&mut self.steps, resolved_steps);
        let mut backend = Backend::Forked(agent.clone());
        let result = self.run_with_backend(&mut backend, None).await;
        self.steps = orig_steps;
        result
    }

    /// Drive the step sequence against `backend` and return the final
    /// plain-text answer.
    ///
    /// The first step's prompt is pushed as a user message; then each turn:
    /// - a reply carrying tool calls is dispatched (`dispatch_tools`) and the
    ///   loop continues;
    /// - a reply with empty text and no `content` at all gets a "[system]"
    ///   nudge asking for text or a tool call;
    /// - any other reply is pushed as the assistant message. If steps remain,
    ///   `bail_fn` is consulted and the next step's prompt is pushed;
    ///   otherwise the reply text is returned.
    ///
    /// # Errors
    /// Fails on API errors (after retries), on a `bail_fn` error, or after
    /// `50 * max(steps, 1)` turns without finishing.
    async fn run_with_backend(
        &mut self,
        backend: &mut Backend,
        bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    ) -> Result<String, String> {
        dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len());
        self.turn = 0;
        self.outputs.clear();
        self.current_phase = self.steps.first()
            .map(|s| s.phase.clone()).unwrap_or_default();
        let mut next_step = 0;

        if next_step < self.steps.len() {
            backend.push_message(
                Message::user(&self.steps[next_step].prompt)).await;
            next_step += 1;
        }

        let reasoning = crate::config::get().api_reasoning.clone();
        let max_turns = 50 * self.steps.len().max(1);

        for _ in 0..max_turns {
            self.turn += 1;
            let messages = backend.messages().await;
            let client = backend.client().await;
            dbglog!("[auto] {} turn {} ({} messages)",
                self.name, self.turn, messages.len());

            let (msg, usage_opt) = Self::api_call_with_retry(
                &self.name, &client, &self.tools, &messages,
                &reasoning, self.sampling, self.priority).await?;
            if let Some(u) = &usage_opt {
                dbglog!("[auto] {} tokens: {} prompt + {} completion",
                    self.name, u.prompt_tokens, u.completion_tokens);
            }

            let has_content = msg.content.is_some();
            let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty());

            if has_tools {
                self.dispatch_tools(backend, &msg).await;
                continue;
            }

            let text = msg.content_text().to_string();
            if text.is_empty() && !has_content {
                dbglog!("[auto] {} empty response, retrying", self.name);
                backend.push_message(Message::user(
                    "[system] Your previous response was empty. \
                     Please respond with text or use a tool."
                )).await;
                continue;
            }

            // Byte-slicing `text` directly would panic when the cut lands
            // inside a multi-byte UTF-8 character, so trim to a boundary.
            dbglog!("[auto] {} response: {}",
                self.name, Self::log_preview(&text, 200));

            // Record every final answer, including the last one, so forked
            // entries show the full conversation.
            backend.push_message(Message::assistant(&text)).await;

            if next_step < self.steps.len() {
                if let Some(ref check) = bail_fn {
                    check(next_step)?;
                }
                self.current_phase = self.steps[next_step].phase.clone();
                backend.push_message(
                    Message::user(&self.steps[next_step].prompt)).await;
                next_step += 1;
                dbglog!("[auto] {} step {}/{}",
                    self.name, next_step, self.steps.len());
                continue;
            }

            return Ok(text);
        }

        Err(format!("{}: exceeded {} tool turns", self.name, max_turns))
    }

    /// Longest prefix of `text` that is at most `max_bytes` long and ends on
    /// a UTF-8 character boundary, so it can be sliced without panicking.
    fn log_preview(text: &str, max_bytes: usize) -> &str {
        if text.len() <= max_bytes {
            return text;
        }
        let mut end = max_bytes;
        // Index 0 is always a boundary, so this terminates.
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    /// Issue one streaming completion request, retrying transient failures.
    ///
    /// An error counts as transient when its text contains one of
    /// `IncompleteMessage`, `connection closed`, `connection reset`,
    /// `timed out` or `Connection refused`. Those are retried up to 4 times
    /// with 2, 4, 8 and 16 second sleeps. Any other error, or a transient one
    /// on the fifth attempt, is returned with the approximate request size.
    async fn api_call_with_retry(
        name: &str,
        client: &ApiClient,
        tools: &[agent_tools::Tool],
        messages: &[Message],
        reasoning: &str,
        sampling: super::api::SamplingParams,
        priority: i32,
    ) -> Result<(Message, Option<Usage>), String> {
        let mut last_err = None;
        for attempt in 0..5 {
            match client.chat_completion_stream_temp(
                messages, tools, reasoning, sampling, Some(priority),
            ).await {
                Ok((msg, usage)) => {
                    if let Some(ref e) = last_err {
                        dbglog!("[auto] {} succeeded after retry (previous: {})",
                            name, e);
                    }
                    return Ok((msg, usage));
                }
                Err(e) => {
                    let err_str = e.to_string();
                    let is_transient = err_str.contains("IncompleteMessage")
                        || err_str.contains("connection closed")
                        || err_str.contains("connection reset")
                        || err_str.contains("timed out")
                        || err_str.contains("Connection refused");
                    if is_transient && attempt < 4 {
                        dbglog!("[auto] {} transient error (attempt {}): {}, retrying",
                            name, attempt + 1, err_str);
                        tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await;
                        last_err = Some(e);
                        continue;
                    }
                    let msg_bytes: usize = messages.iter()
                        .map(|m| m.content_text().len()).sum();
                    return Err(format!(
                        "{}: API error (~{}KB, {} messages, {} attempts): {}",
                        name, msg_bytes / 1024,
                        messages.len(), attempt + 1, e));
                }
            }
        }
        // Defensive fallback: the final attempt always returns above.
        Err(format!("{}: all retry attempts exhausted", name))
    }

    /// Record an assistant tool-call message and execute each call in order.
    ///
    /// The assistant message is pushed first, with any call whose arguments
    /// are not valid JSON rewritten to `{}`. Each call then gets one tool
    /// result:
    /// - malformed arguments produce an error asking the model to retry;
    /// - `output` is handled here, storing `value` under `key` in
    ///   `self.outputs` (when the key is non-empty) and echoing `key: value`;
    /// - every other tool goes through `agent_tools::dispatch`.
    ///
    /// Must only be called when `msg.tool_calls` is `Some`.
    async fn dispatch_tools(&mut self, backend: &mut Backend, msg: &Message) {
        let mut sanitized = msg.clone();
        if let Some(ref mut calls) = sanitized.tool_calls {
            for call in calls {
                if serde_json::from_str::<serde_json::Value>(&call.function.arguments).is_err() {
                    dbglog!("[auto] {} sanitizing malformed args for {}: {}",
                        self.name, call.function.name, &call.function.arguments);
                    call.function.arguments = "{}".to_string();
                }
            }
        }
        backend.push_raw(sanitized).await;

        for call in msg.tool_calls.as_ref().unwrap() {
            dbglog!("[auto] {} tool: {}({})",
                self.name, call.function.name, &call.function.arguments);

            let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
                Ok(v) => v,
                Err(_) => {
                    backend.push_raw(Message::tool_result(
                        &call.id,
                        "Error: your tool call had malformed JSON arguments. \
                         Please retry with valid JSON.",
                    )).await;
                    continue;
                }
            };

            // Intercept output() — store in-memory instead of filesystem
            let output = if call.function.name == "output" {
                let key = args["key"].as_str().unwrap_or("");
                let value = args["value"].as_str().unwrap_or("");
                if !key.is_empty() {
                    self.outputs.insert(key.to_string(), value.to_string());
                }
                format!("{}: {}", key, value)
            } else {
                agent_tools::dispatch(&call.function.name, &args).await
            };

            dbglog!("[auto] {} result: {} chars", self.name, output.len());
            backend.push_raw(Message::tool_result(&call.id, &output)).await;
        }
    }
}
// ---------------------------------------------------------------------------
// Agent execution
// ---------------------------------------------------------------------------
/// Outcome of one `run_one_agent` call.
pub struct AgentResult {
    /// Final plain-text answer from the last step.
    pub output: String,
    /// Node keys the batch targeted (explicit keys plus any extras pulled in
    /// while resolving placeholders, or the query's selection).
    pub node_keys: Vec<String>,
    /// State/output directory used for the run; bail scripts execute here.
    /// NOTE(review): AutoAgent now intercepts output() calls in memory, so
    /// confirm whether output files still land in this directory.
    pub state_dir: PathBuf,
}
/// Run an agent. If keys are provided, use them directly (bypassing the
/// agent's query). Otherwise, run the query to select target nodes.
///
/// Steps:
/// 1. Load the `.agent` definition for `agent_name`.
/// 2. Choose the state dir: `$POC_AGENT_OUTPUT_DIR` if set, otherwise
///    `<memory_dir>/agent-output/<agent_name>`. Create it and export it as
///    `POC_AGENT_OUTPUT_DIR`.
/// 3. Build the prompt batch. With explicit `keys`, each step's placeholders
///    are resolved against them and the visits are recorded (best effort).
///    Otherwise the agent's query runs with `def.count`, falling back to
///    `count`.
/// 4. Restrict tools to `def.tools`; an empty list means all memory/journal
///    tools.
/// 5. Refuse a first prompt larger than 800 000 bytes, dumping it under
///    `<memory_dir>/llm-logs/oversized/` for inspection.
/// 6. Run the steps synchronously. If `def.bail` names a script, it is run
///    from the state dir before each later step; a non-zero exit aborts.
///
/// # Errors
/// Returns `Err` if:
/// - the definition is missing;
/// - the state dir cannot be created;
/// - the batch has no steps;
/// - the first prompt is oversized;
/// - the query fails;
/// - a bail script stops the run;
/// - the API run fails.
pub fn run_one_agent(
    store: &mut Store,
    agent_name: &str,
    count: usize,
    keys: Option<&[String]>,
) -> Result<AgentResult, String> {
    let def = defs::get_def(agent_name)
        .ok_or_else(|| format!("no .agent file for {}", agent_name))?;

    // State dir for agent output files.
    // NOTE(review): the chosen dir is exported below, so a later call in the
    // same process will read it back here and reuse it, even for a different
    // agent. Confirm this is intended.
    let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
    fs::create_dir_all(&state_dir)
        .map_err(|e| format!("create state dir: {}", e))?;
    // SAFETY (unverified): `set_var` is only sound when no other thread is
    // reading or writing the process environment at the same time. Nothing
    // here enforces that; it relies on the caller — TODO confirm.
    unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }

    // Build prompt batch — either from explicit keys or the agent's query
    let agent_batch = if let Some(keys) = keys {
        dbglog!("[{}] targeting: {}", agent_name, keys.join(", "));
        let graph = store.build_graph();
        let mut resolved_steps = Vec::new();
        let mut all_keys: Vec<String> = keys.to_vec();
        for step in &def.steps {
            let (prompt, extra_keys) = defs::resolve_placeholders(
                &step.prompt, store, &graph, keys, count,
            );
            all_keys.extend(extra_keys);
            resolved_steps.push(prompts::ResolvedStep {
                prompt,
                phase: step.phase.clone(),
            });
        }
        let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys };
        if !batch.node_keys.is_empty() {
            // Best effort: failing to record visits does not abort the run.
            store.record_agent_visits(&batch.node_keys, agent_name).ok();
        }
        batch
    } else {
        let effective_count = def.count.unwrap_or(count);
        defs::run_agent(store, &def, effective_count, &Default::default())?
    };

    // Filter tools based on the agent def; an empty list means "all tools".
    let effective_tools: Vec<super::tools::Tool> = super::tools::memory_and_journal_tools()
        .into_iter()
        .filter(|t| def.tools.is_empty() || def.tools.iter().any(|w| w == &t.name))
        .collect();

    let n_steps = agent_batch.steps.len();

    // Guard: with no steps there is nothing to send, and indexing the first
    // step would panic.
    let Some(first_step) = agent_batch.steps.first() else {
        return Err(format!("{}: agent produced no prompt steps", agent_name));
    };

    // Guard: reject oversized first prompt
    let max_prompt_bytes = 800_000;
    let first_len = first_step.prompt.len();
    if first_len > max_prompt_bytes {
        let prompt_kb = first_len / 1024;
        let oversize_dir = store::memory_dir().join("llm-logs").join("oversized");
        fs::create_dir_all(&oversize_dir).ok();
        let oversize_path = oversize_dir.join(format!("{}-{}.txt",
            agent_name, store::compact_timestamp()));
        let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n",
            agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys);
        fs::write(&oversize_path, format!("{}{}", header, first_step.prompt)).ok();
        return Err(format!(
            "prompt too large: {}KB (max {}KB) — seed nodes may be oversized",
            prompt_kb, max_prompt_bytes / 1024,
        ));
    }

    let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
    dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes",
        agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len());

    let prompts: Vec<String> = agent_batch.steps.iter()
        .map(|s| s.prompt.clone()).collect();
    let step_phases: Vec<String> = agent_batch.steps.iter()
        .map(|s| s.phase.clone()).collect();

    // Bail check: if the agent defines a bail script, run it between steps.
    let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name));
    let state_dir_for_bail = state_dir.clone();
    let bail_fn = move |step_idx: usize| -> Result<(), String> {
        if let Some(ref script) = bail_script {
            let status = std::process::Command::new(script)
                .current_dir(&state_dir_for_bail)
                .status()
                .map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
            if !status.success() {
                return Err(format!("bailed at step {}: {:?} exited {}",
                    step_idx + 1, script.file_name().unwrap_or_default(),
                    status.code().unwrap_or(-1)));
            }
        }
        Ok(())
    };

    let output = call_api_with_tools_sync(
        agent_name, &prompts, &step_phases, def.temperature, def.priority,
        &effective_tools, Some(&bail_fn))?;

    Ok(AgentResult {
        output,
        node_keys: agent_batch.node_keys,
        state_dir,
    })
}
// ---------------------------------------------------------------------------
// Compatibility wrappers — delegate to AutoAgent
// ---------------------------------------------------------------------------
/// Run agent prompts through the API with tool support.
/// Convenience wrapper around AutoAgent for existing callers.
pub async fn call_api_with_tools(
agent: &str,
prompts: &[String],
phases: &[String],
temperature: Option<f32>,
priority: i32,
tools: &[agent_tools::Tool],
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
) -> Result<String, String> {
let steps: Vec<AutoStep> = prompts.iter().zip(
phases.iter().map(String::as_str)
.chain(std::iter::repeat(""))
).map(|(prompt, phase)| AutoStep {
prompt: prompt.clone(),
phase: phase.to_string(),
}).collect();
let mut auto = AutoAgent::new(
agent.to_string(),
tools.to_vec(),
steps,
temperature.unwrap_or(0.6),
priority,
);
auto.run(bail_fn).await
}
/// Synchronous wrapper — runs on a dedicated thread with its own
/// tokio runtime. Safe to call from any context.
///
/// Spawns a scoped thread, builds a current-thread tokio runtime there and
/// blocks on [`call_api_with_tools`].
///
/// # Errors
/// Returns `Err` if:
/// - the runtime cannot be built;
/// - the agent run fails;
/// - the worker thread panics. Previously a worker panic was re-raised here
///   as an opaque `Any` panic; now its message is reported in the `Err`.
pub fn call_api_with_tools_sync(
    agent: &str,
    prompts: &[String],
    phases: &[String],
    temperature: Option<f32>,
    priority: i32,
    tools: &[agent_tools::Tool],
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
) -> Result<String, String> {
    std::thread::scope(|s| {
        let worker = s.spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(|e| format!("tokio runtime: {}", e))?;
            rt.block_on(
                call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn)
            )
        });
        // Joining explicitly means `scope` itself will not re-panic; turn a
        // worker panic into an error carrying its message.
        worker.join().unwrap_or_else(|payload| {
            let detail = payload
                .downcast_ref::<&str>()
                .map(|m| m.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_string());
            Err(format!("{}: agent thread panicked: {}", agent, detail))
        })
    })
}
// ---------------------------------------------------------------------------
// Process management — PID tracking and subprocess spawning
// ---------------------------------------------------------------------------
/// Handle to an agent subprocess started by `spawn_agent`.
pub struct SpawnResult {
    /// The running `poc-memory agent run …` child process.
    pub child: std::process::Child,
    /// File that receives the child's stdout and stderr (when it could be
    /// created).
    pub log_path: PathBuf,
}
/// Start `poc-memory agent run <agent_name> --count 1 --local --state-dir
/// <state_dir>` as a background subprocess.
///
/// - stdout and stderr go to `~/.consciousness/logs/<agent_name>/<timestamp>.log`.
///   If that file cannot be created (or cloned for stdout), the affected
///   stream is discarded instead of panicking.
/// - `POC_SESSION_ID` is set to `session_id` in the child's environment.
/// - A `pid-<pid>` file holding the first step's phase (or `"step-0"`) is
///   written into `state_dir`, best effort.
///
/// Returns `None` when the agent definition is missing or the process
/// fails to spawn.
pub fn spawn_agent(
    agent_name: &str,
    state_dir: &std::path::Path,
    session_id: &str,
) -> Option<SpawnResult> {
    let def = defs::get_def(agent_name)?;
    let first_phase = def.steps.first()
        .map(|s| s.phase.as_str())
        .unwrap_or("step-0");

    let log_dir = dirs::home_dir().unwrap_or_default()
        .join(format!(".consciousness/logs/{}", agent_name));
    fs::create_dir_all(&log_dir).ok();
    let log_path = log_dir.join(format!("{}.log", store::compact_timestamp()));

    // Route both streams to the log file when possible. Fall back to a null
    // sink rather than unwrapping a hand-opened "/dev/null", which can panic
    // and only exists on Unix.
    let (stdout, stderr) = match fs::File::create(&log_path) {
        Ok(log) => {
            let out = log.try_clone()
                .map(std::process::Stdio::from)
                .unwrap_or_else(|_| std::process::Stdio::null());
            (out, std::process::Stdio::from(log))
        }
        Err(_) => (std::process::Stdio::null(), std::process::Stdio::null()),
    };

    let child = std::process::Command::new("poc-memory")
        .args(["agent", "run", agent_name, "--count", "1", "--local",
            "--state-dir", &state_dir.to_string_lossy()])
        .env("POC_SESSION_ID", session_id)
        .stdout(stdout)
        .stderr(stderr)
        .spawn()
        .ok()?;

    let pid = child.id();
    let pid_path = state_dir.join(format!("pid-{}", pid));
    fs::write(&pid_path, first_phase).ok();

    Some(SpawnResult { child, log_path })
}