consciousness/src/agent/oneshot.rs
ProofOfConcept bf503b571e Wire vLLM priority scheduling through all agent paths
The priority field existed in agent definitions and was serialized
into vLLM requests, but was never actually set — every request went
out with no priority, so vLLM treated them equally. This meant
background graph maintenance agents could preempt the main
conversation.

Add priority to AgentState and set it at each call site:
  0 = interactive (main conversation)
  1 = surface agent (needs to feed memories promptly)
  2 = other subconscious agents
  10 = unconscious/standalone agents (batch)

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 20:38:33 -04:00

496 lines
18 KiB
Rust

// oneshot.rs — Autonomous agent execution
//
// AutoAgent: wraps an Agent with a multi-step prompt sequence and an
// async run() method. Used for both oneshot CLI agents (from .agent
// files) and subconscious agents forked from the conscious agent.
//
// Also contains the legacy run_one_agent() pipeline and process
// management for spawned agent subprocesses.
use crate::store::{self, Store};
use crate::subconscious::{defs, prompts};
use std::fs;
use std::path::PathBuf;
use super::context::AstNode;
use super::tools::{self as agent_tools};
use super::Agent;
// ---------------------------------------------------------------------------
// AutoAgent — multi-step autonomous agent
// ---------------------------------------------------------------------------
/// One prompt in an [`AutoAgent`]'s step sequence.
pub struct AutoStep {
/// Text pushed to the agent as a user message when this step starts.
/// For forked runs it may contain `{{...}}` placeholders that are
/// expanded just before the run.
pub prompt: String,
/// Phase label; copied into `AutoAgent::current_phase` when this step's
/// prompt is pushed.
pub phase: String,
}
/// An autonomous agent that runs a sequence of prompts with tool dispatch.
///
/// Persistent across runs — holds the agent's name, tools, prompt steps and
/// per-run progress counters. The conversation backend is created per run
/// (by `run`, `run_shared` or `run_forked_shared`).
pub struct AutoAgent {
/// Name used in log lines, error messages and, for `run`, the
/// `standalone:<name>` provenance tag.
pub name: String,
/// Tools installed on the agent that `run` creates; the shared-agent entry
/// points do not install them.
pub tools: Vec<agent_tools::Tool>,
/// Prompt steps, pushed one at a time as the model finishes responding.
pub steps: Vec<AutoStep>,
/// Phase of the step most recently pushed; reset at the start of each run.
pub current_phase: String,
/// Model turns taken in the current run; reset to 0 at the start of each run.
pub turn: usize,
/// Set to `true` by `new`; not read in this file.
/// NOTE(review): presumably consulted by callers to skip disabled agents — confirm.
pub enabled: bool,
}
/// Conversation backend for a single run: a thin newtype over the shared
/// agent handle that the step loop feeds prompts into.
struct Backend(std::sync::Arc<Agent>);

impl Backend {
    /// Append `node` to the wrapped agent's conversation.
    async fn push_node(&mut self, node: AstNode) {
        let Backend(agent) = self;
        agent.push_node(node).await;
    }
}
/// Resolve {{placeholder}} templates in subconscious agent prompts.
fn resolve_prompt(
template: &str,
memory_keys: &[String],
state: &std::collections::BTreeMap<String, String>,
recently_written: &[String],
) -> String {
let cfg = crate::config::get();
let template = template.replace("{assistant_name}", &cfg.assistant_name);
let mut result = String::with_capacity(template.len());
let mut rest = template.as_str();
while let Some(start) = rest.find("{{") {
result.push_str(&rest[..start]);
let after = &rest[start + 2..];
if let Some(end) = after.find("}}") {
let name = after[..end].trim();
let replacement = if let Some(key) = name.strip_prefix("state:") {
state.get(key).cloned().unwrap_or_else(|| "(not set)".to_string())
} else {
match name {
"seen_current" => format_key_list(memory_keys),
"recently_written" => format_key_list(recently_written),
_ => {
result.push_str("{{");
result.push_str(&after[..end + 2]);
rest = &after[end + 2..];
continue;
}
}
};
result.push_str(&replacement);
rest = &after[end + 2..];
} else {
result.push_str("{{");
rest = after;
}
}
result.push_str(rest);
result
}
/// Render `keys` as a bullet list (`- key` per line, newline-separated),
/// or `(none)` when the list is empty.
fn format_key_list(keys: &[String]) -> String {
    if keys.is_empty() {
        return "(none)".to_string();
    }
    let bullets: Vec<String> = keys.iter().map(|key| format!("- {key}")).collect();
    bullets.join("\n")
}
impl AutoAgent {
    /// Create an agent with the given name, tools and prompt steps.
    ///
    /// NOTE(review): `_temperature` and `_priority` are accepted for
    /// call-site compatibility but are not stored. `run()` always applies
    /// priority 10, so a per-agent priority passed here (e.g. an agent
    /// definition's `priority`) has no effect — confirm this is intended.
    pub fn new(
        name: String,
        tools: Vec<agent_tools::Tool>,
        steps: Vec<AutoStep>,
        _temperature: f32,
        _priority: i32,
    ) -> Self {
        Self {
            name,
            tools,
            steps,
            current_phase: String::new(),
            turn: 0,
            enabled: true,
        }
    }

    /// Run standalone on a freshly built agent.
    ///
    /// Builds an API client from the configured base URL, key and model,
    /// loads the `other` prompt set, installs this agent's tools, tags the
    /// agent with provenance `standalone:<name>` and priority 10, then runs
    /// the steps. `bail_fn`, if given, is called with the index of each
    /// follow-up step before it is pushed; an `Err` aborts the run.
    ///
    /// # Errors
    /// Returns `Err` if the API base URL or model is not configured, the app
    /// config cannot be loaded, a turn fails, or `bail_fn` bails.
    pub async fn run(
        &mut self,
        bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    ) -> Result<String, String> {
        let config = crate::config::get();
        let base_url = config.api_base_url.as_deref().unwrap_or("");
        let api_key = config.api_key.as_deref().unwrap_or("");
        let model = config.api_model.as_deref().unwrap_or("");
        if base_url.is_empty() || model.is_empty() {
            return Err("API not configured (no base_url or model)".to_string());
        }
        let client = super::api::ApiClient::new(base_url, api_key, model);
        // Load system prompt + identity from config
        let cli = crate::user::CliArgs::default();
        let (app, _) = crate::config::load_app(&cli)
            .map_err(|e| format!("config: {}", e))?;
        let (system_prompt, personality) = crate::config::reload_for_model(
            &app, &app.prompts.other,
        ).map_err(|e| format!("config: {}", e))?;
        let agent = Agent::new(
            client, system_prompt, personality,
            app, String::new(),
            None,
            super::tools::ActiveTools::new(),
        ).await;
        {
            let mut st = agent.state.lock().await;
            st.provenance = format!("standalone:{}", self.name);
            st.tools = self.tools.clone();
            // Batch priority: standalone agents must not preempt the
            // interactive conversation (0) or subconscious agents (1-2).
            st.priority = Some(10);
        }
        let mut backend = Backend(agent);
        self.run_with_backend(&mut backend, None, bail_fn).await
    }

    /// Run using a pre-created agent Arc. The caller retains the Arc
    /// so the UI can lock it to read entries live.
    pub async fn run_shared(
        &mut self,
        agent: &std::sync::Arc<Agent>,
    ) -> Result<String, String> {
        let mut backend = Backend(agent.clone());
        self.run_with_backend(&mut backend, None, None).await
    }

    /// Run forked using a shared agent Arc. The UI can lock the same
    /// Arc to read entries live during the run.
    ///
    /// Step prompts are expanded with [`resolve_prompt`] against
    /// `memory_keys`, `state` and `recently_written` for this run only;
    /// `self.steps` keeps the unexpanded templates.
    pub async fn run_forked_shared(
        &mut self,
        agent: &std::sync::Arc<Agent>,
        memory_keys: &[String],
        state: &std::collections::BTreeMap<String, String>,
        recently_written: &[String],
    ) -> Result<String, String> {
        // Resolve into a separate list rather than swapping it into
        // `self.steps`. With a swap, a cancelled future or a panicking turn
        // would skip the restore and leave expanded prompts behind for every
        // later run.
        let resolved_steps: Vec<AutoStep> = self.steps.iter().map(|s| AutoStep {
            prompt: resolve_prompt(&s.prompt, memory_keys, state, recently_written),
            phase: s.phase.clone(),
        }).collect();
        let mut backend = Backend(agent.clone());
        self.run_with_backend(&mut backend, Some(&resolved_steps), None).await
    }

    /// Drive the step loop on `backend`.
    ///
    /// Pushes the first step's prompt, then takes model turns. Turns that
    /// made tool calls continue immediately. An empty text reply gets a
    /// nudge message. A non-empty text reply advances to the next step
    /// (after `bail_fn`, if any), or is returned as the result after the last
    /// step. Uses `steps_override` when given, otherwise `self.steps`.
    ///
    /// A context-overflow error ends the run with `Ok("")`. Any other turn
    /// error, a bail, or exceeding `50 * max(steps, 1)` turns returns `Err`.
    async fn run_with_backend(
        &mut self,
        backend: &mut Backend,
        steps_override: Option<&[AutoStep]>,
        bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    ) -> Result<String, String> {
        // Split the borrow of `self` so the step list can be read while the
        // progress counters are updated.
        let Self { name, steps: own_steps, current_phase, turn, .. } = self;
        let steps: &[AutoStep] = match steps_override {
            Some(steps) => steps,
            None => own_steps.as_slice(),
        };

        dbglog!("[auto] {} starting, {} steps", name, steps.len());
        *turn = 0;
        *current_phase = steps.first()
            .map(|s| s.phase.clone()).unwrap_or_default();
        let mut next_step = 0;
        if next_step < steps.len() {
            backend.push_node(
                AstNode::user_msg(&steps[next_step].prompt)).await;
            next_step += 1;
        }

        let max_turns = 50 * steps.len().max(1);
        for _ in 0..max_turns {
            *turn += 1;
            let result = match Agent::turn(backend.0.clone()).await {
                Ok(r) => r,
                Err(e) if super::context::is_context_overflow(&e) => {
                    dbglog!("[auto] {} context full, stopping gracefully", name);
                    return Ok(String::new());
                }
                Err(e) => return Err(format!("{}: {}", name, e)),
            };
            if result.had_tool_calls {
                continue;
            }
            let text = result.text;
            if text.is_empty() {
                dbglog!("[auto] {} empty response, retrying", name);
                backend.push_node(AstNode::user_msg(
                    "[system] Your previous response was empty. \
                     Please respond with text or use a tool."
                )).await;
                continue;
            }
            dbglog!("[auto] {} response: {}",
                name, &text[..text.floor_char_boundary(text.len().min(200))]);
            if next_step < steps.len() {
                if let Some(check) = bail_fn {
                    check(next_step)?;
                }
                *current_phase = steps[next_step].phase.clone();
                backend.push_node(
                    AstNode::user_msg(&steps[next_step].prompt)).await;
                next_step += 1;
                dbglog!("[auto] {} step {}/{}",
                    name, next_step, steps.len());
                continue;
            }
            return Ok(text);
        }
        Err(format!("{}: exceeded {} tool turns", name, max_turns))
    }
}
// ---------------------------------------------------------------------------
// Agent execution
// ---------------------------------------------------------------------------
/// Result of running a single agent.
pub struct AgentResult {
/// Final text response returned by the agent run.
pub output: String,
/// Keys of the nodes the run targeted: the explicit keys plus any extra
/// keys pulled in by prompt placeholders, or the agent query's selection.
pub node_keys: Vec<String>,
/// Directory containing output() files from the agent run.
pub state_dir: PathBuf,
}
/// Run an agent. If keys are provided, use them directly (bypassing the
/// agent's query). Otherwise, run the query to select target nodes.
///
/// Resolves the agent's prompt steps and builds its tool set: the
/// definition's whitelist of memory/journal tools (all of them if the list
/// is empty), plus an `output` tool. It rejects an empty or oversized first
/// prompt, then runs the steps synchronously through
/// [`call_api_with_tools_sync`]. The definition's optional bail script runs
/// between steps.
///
/// # Errors
/// Returns `Err` if:
/// - no `.agent` definition exists;
/// - the state directory cannot be created;
/// - the agent query fails;
/// - the resolved batch has no steps;
/// - the first prompt exceeds 800,000 bytes;
/// - the run fails or bails.
pub fn run_one_agent(
    store: &mut Store,
    agent_name: &str,
    count: usize,
    keys: Option<&[String]>,
) -> Result<AgentResult, String> {
    let def = defs::get_def(agent_name)
        .ok_or_else(|| format!("no .agent file for {}", agent_name))?;

    // State dir for agent output files: an inherited POC_AGENT_OUTPUT_DIR
    // wins, otherwise a per-agent directory under the memory dir.
    let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
    fs::create_dir_all(&state_dir)
        .map_err(|e| format!("create state dir: {}", e))?;
    // Publish the directory so the `output` tool handler writes into it.
    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread
    // reads or writes the environment concurrently. This assumes callers
    // invoke run_one_agent before starting other threads — confirm.
    unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }

    // Build prompt batch — either from explicit keys or the agent's query
    let agent_batch = if let Some(keys) = keys {
        dbglog!("[{}] targeting: {}", agent_name, keys.join(", "));
        let graph = store.build_graph();
        let mut resolved_steps = Vec::new();
        let mut all_keys: Vec<String> = keys.to_vec();
        for step in &def.steps {
            let (prompt, extra_keys) = defs::resolve_placeholders(
                &step.prompt, store, &graph, keys, count,
            );
            all_keys.extend(extra_keys);
            resolved_steps.push(prompts::ResolvedStep {
                prompt,
                phase: step.phase.clone(),
            });
        }
        let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys };
        if !batch.node_keys.is_empty() {
            // Best-effort bookkeeping; a failure here must not abort the run.
            store.record_agent_visits(&batch.node_keys, agent_name).ok();
        }
        batch
    } else {
        let effective_count = def.count.unwrap_or(count);
        defs::run_agent(store, &def, effective_count, &Default::default())?
    };

    // Whitelist memory/journal tools by the definition's `tools` list (empty
    // means "all of them"), then add the filesystem output tool.
    let mut effective_tools: Vec<super::tools::Tool> = super::tools::memory_and_journal_tools()
        .into_iter()
        .filter(|t| def.tools.is_empty() || def.tools.iter().any(|w| w == &t.name))
        .collect();
    effective_tools.push(output_tool());

    // Guard: an empty batch would otherwise panic when the first prompt is indexed.
    let Some(first_step) = agent_batch.steps.first() else {
        return Err(format!("agent {} produced no prompt steps", agent_name));
    };
    let n_steps = agent_batch.steps.len();

    // Guard: reject oversized first prompt
    const MAX_PROMPT_BYTES: usize = 800_000;
    let first_len = first_step.prompt.len();
    if first_len > MAX_PROMPT_BYTES {
        let prompt_kb = first_len / 1024;
        // Keep a copy of the rejected prompt for post-mortem (best-effort).
        let oversize_dir = store::memory_dir().join("llm-logs").join("oversized");
        fs::create_dir_all(&oversize_dir).ok();
        let oversize_path = oversize_dir.join(format!("{}-{}.txt",
            agent_name, store::compact_timestamp()));
        let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n",
            agent_name, prompt_kb, MAX_PROMPT_BYTES / 1024, agent_batch.node_keys);
        fs::write(&oversize_path, format!("{}{}", header, first_step.prompt)).ok();
        return Err(format!(
            "prompt too large: {}KB (max {}KB) — seed nodes may be oversized",
            prompt_kb, MAX_PROMPT_BYTES / 1024,
        ));
    }
    let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
    dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes",
        agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len());

    // Move prompts and phases out of the batch instead of cloning them;
    // prompts can be hundreds of KB each.
    let prompts::AgentBatch { steps, node_keys } = agent_batch;
    let (step_prompts, step_phases): (Vec<String>, Vec<String>) =
        steps.into_iter().map(|s| (s.prompt, s.phase)).unzip();

    // Bail check: if the agent defines a bail script, run it between steps.
    // The script gets our own pid-file name as its argument and runs from
    // the state dir. A spawn failure or non-zero exit aborts the run.
    let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name));
    let state_dir_for_bail = state_dir.clone();
    let our_pid_file = format!("pid-{}", std::process::id());
    let bail_fn = move |step_idx: usize| -> Result<(), String> {
        if let Some(ref script) = bail_script {
            let status = std::process::Command::new(script)
                .arg(&our_pid_file)
                .current_dir(&state_dir_for_bail)
                .status()
                .map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
            if !status.success() {
                return Err(format!("bailed at step {}: {:?} exited {}",
                    step_idx + 1, script.file_name().unwrap_or_default(),
                    status.code().unwrap_or(-1)));
            }
        }
        Ok(())
    };

    let output = call_api_with_tools_sync(
        agent_name, &step_prompts, &step_phases, def.temperature, def.priority,
        &effective_tools, Some(&bail_fn))?;
    Ok(AgentResult {
        output,
        node_keys,
        state_dir,
    })
}

/// Build the `output` tool. It writes `value` to a file named `key` in the
/// directory named by `POC_AGENT_OUTPUT_DIR`, so values can be passed
/// between steps.
///
/// The handler rejects keys that start with `pid-` (the prefix used for
/// process-tracking files) or that contain `/` or `..`, so writes cannot
/// escape that directory.
fn output_tool() -> super::tools::Tool {
    super::tools::Tool {
        name: "output",
        description: "Produce a named output value for passing between steps.",
        parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Output name"},"value":{"type":"string","description":"Output value"}},"required":["key","value"]}"#,
        handler: std::sync::Arc::new(|_agent, v| Box::pin(async move {
            let key = v["key"].as_str()
                .ok_or_else(|| anyhow::anyhow!("output requires 'key'"))?;
            if key.starts_with("pid-") || key.contains('/') || key.contains("..") {
                anyhow::bail!("invalid output key: {}", key);
            }
            let value = v["value"].as_str()
                .ok_or_else(|| anyhow::anyhow!("output requires 'value'"))?;
            let dir = std::env::var("POC_AGENT_OUTPUT_DIR")
                .map_err(|_| anyhow::anyhow!("no output directory set"))?;
            let path = std::path::Path::new(&dir).join(key);
            std::fs::write(&path, value)
                .map_err(|e| anyhow::anyhow!("writing output {}: {}", path.display(), e))?;
            Ok(format!("{}: {}", key, value))
        })),
    }
}
// ---------------------------------------------------------------------------
// Compatibility wrappers — delegate to AutoAgent
// ---------------------------------------------------------------------------
/// Run agent prompts through the API with tool support.
///
/// Convenience wrapper for existing callers. It pairs each prompt with the
/// phase at the same index in `phases`, using `""` when `phases` is shorter.
/// It then runs the resulting steps with [`AutoAgent::run`]. A missing
/// `temperature` defaults to 0.6.
///
/// NOTE(review): `AutoAgent::new` currently discards both `temperature`
/// and `priority`, so neither value reaches the request — confirm intent.
pub async fn call_api_with_tools(
    agent: &str,
    prompts: &[String],
    phases: &[String],
    temperature: Option<f32>,
    priority: i32,
    tools: &[agent_tools::Tool],
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
) -> Result<String, String> {
    let steps: Vec<AutoStep> = prompts
        .iter()
        .enumerate()
        .map(|(idx, prompt)| AutoStep {
            prompt: prompt.clone(),
            phase: phases.get(idx).cloned().unwrap_or_default(),
        })
        .collect();

    let mut runner = AutoAgent::new(
        agent.to_string(),
        tools.to_vec(),
        steps,
        temperature.unwrap_or(0.6),
        priority,
    );
    runner.run(bail_fn).await
}
/// Synchronous wrapper around [`call_api_with_tools`].
///
/// It runs the call on a dedicated scoped thread that has its own
/// current-thread tokio runtime, so the calling thread needs no runtime and
/// does not conflict with one it may already be inside.
///
/// # Errors
/// Returns `Err` if the runtime cannot be built or the agent run fails.
/// It also returns `Err` if the worker thread panics; the panic message is
/// reported rather than re-panicking in the caller.
pub fn call_api_with_tools_sync(
    agent: &str,
    prompts: &[String],
    phases: &[String],
    temperature: Option<f32>,
    priority: i32,
    tools: &[agent_tools::Tool],
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
) -> Result<String, String> {
    std::thread::scope(|scope| {
        let worker = scope.spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(|e| format!("tokio runtime: {}", e))?;
            rt.block_on(call_api_with_tools(
                agent, prompts, phases, temperature, priority, tools, bail_fn,
            ))
        });
        // Joining manually keeps the scope from re-panicking. Turn a worker
        // panic into an error the caller can handle, keeping its message.
        worker.join().unwrap_or_else(|payload| {
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| (*s).to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_string());
            Err(format!("{}: agent thread panicked: {}", agent, msg))
        })
    })
}
// ---------------------------------------------------------------------------
// Process management — PID tracking and subprocess spawning
// ---------------------------------------------------------------------------
/// Handle to an agent subprocess started by `spawn_agent`.
pub struct SpawnResult {
/// The running `poc-memory agent run` child process.
pub child: std::process::Child,
/// Log file chosen for the child's stdout/stderr (output is discarded if
/// the file could not be created).
pub log_path: PathBuf,
}
/// Start `poc-memory agent run <agent_name> --count 1 --local --state-dir
/// <state_dir>` as a child process with `POC_SESSION_ID` set to
/// `session_id`.
///
/// The child's stdout and stderr go to a new timestamped log under
/// `~/.consciousness/logs/<agent_name>/`. If that log (or a second handle to
/// it) cannot be opened, the affected stream is discarded instead. After the
/// spawn, a `pid-<child pid>` file containing the agent's first phase name
/// (or `step-0`) is written into `state_dir` (best-effort).
///
/// Returns `None` if no definition exists for `agent_name` or the process
/// could not be spawned.
pub fn spawn_agent(
    agent_name: &str,
    state_dir: &std::path::Path,
    session_id: &str,
) -> Option<SpawnResult> {
    let def = defs::get_def(agent_name)?;
    let first_phase = def.steps.first()
        .map(|s| s.phase.as_str())
        .unwrap_or("step-0");

    let log_dir = dirs::home_dir().unwrap_or_default()
        .join(format!(".consciousness/logs/{}", agent_name));
    fs::create_dir_all(&log_dir).ok();
    let log_path = log_dir.join(format!("{}.log", store::compact_timestamp()));

    // Route both streams to the log. Discard output rather than panicking
    // when the log can't be opened; `Stdio::null()` needs no /dev/null.
    let (stdout, stderr) = match fs::File::create(&log_path) {
        Ok(log) => {
            let stdout = log.try_clone()
                .map(std::process::Stdio::from)
                .unwrap_or_else(|_| std::process::Stdio::null());
            (stdout, std::process::Stdio::from(log))
        }
        Err(_) => (std::process::Stdio::null(), std::process::Stdio::null()),
    };

    let child = std::process::Command::new("poc-memory")
        .args(["agent", "run", agent_name, "--count", "1", "--local",
            "--state-dir", &state_dir.to_string_lossy()])
        .env("POC_SESSION_ID", session_id)
        .stdout(stdout)
        .stderr(stderr)
        .spawn()
        .ok()?;

    // NOTE(review): presumably consumed by the agent's bail script, which
    // run_one_agent invokes from the state dir with `pid-<own pid>` — confirm.
    // Written after spawn, so the child may briefly run before it exists.
    let pid = child.id();
    let pid_path = state_dir.join(format!("pid-{}", pid));
    fs::write(&pid_path, first_phase).ok();
    Some(SpawnResult { child, log_path })
}