Shared forked agent — UI reads subconscious entries live

The forked agent is now behind Arc<tokio::sync::Mutex<Agent>>,
stored on SubconsciousAgent and passed to the spawned task. The
subconscious detail screen locks it via try_lock() to read entries
from the fork point — live during runs, persisted after completion.

Removes last_run_entries snapshot. Backend::Forked now holds the
shared Arc, all push operations go through the lock.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-07 03:09:06 -04:00
parent 77b68ecc50
commit 93f5f8b0c7
3 changed files with 78 additions and 81 deletions

View file

@ -57,9 +57,6 @@ pub struct AutoAgent {
/// Named outputs from the agent's output() tool calls.
/// Collected per-run, read by Mind after completion.
pub outputs: std::collections::HashMap<String, String>,
/// Entries added during the last forked run (after the fork point).
/// The subconscious screen shows these.
pub last_run_entries: Vec<super::context::ConversationEntry>,
// Observable status
pub current_phase: String,
pub turn: usize,
@ -68,48 +65,43 @@ pub struct AutoAgent {
/// Per-run conversation backend — created fresh by run() or run_forked().
enum Backend {
Standalone { client: ApiClient, messages: Vec<Message> },
Forked(Agent),
Forked(std::sync::Arc<tokio::sync::Mutex<Agent>>),
}
impl Backend {
fn client(&self) -> &ApiClient {
async fn client(&self) -> ApiClient {
match self {
Backend::Standalone { client, .. } => client,
Backend::Forked(agent) => &agent.client,
Backend::Standalone { client, .. } => client.clone(),
Backend::Forked(agent) => agent.lock().await.client_clone(),
}
}
fn messages(&self) -> Vec<Message> {
async fn messages(&self) -> Vec<Message> {
match self {
Backend::Standalone { messages, .. } => messages.clone(),
Backend::Forked(agent) => agent.assemble_api_messages(),
Backend::Forked(agent) => agent.lock().await.assemble_api_messages(),
}
}
fn push_message(&mut self, msg: Message) {
async fn push_message(&mut self, msg: Message) {
match self {
Backend::Standalone { messages, .. } => messages.push(msg),
Backend::Forked(agent) => agent.push_message(msg),
Backend::Forked(agent) => agent.lock().await.push_message(msg),
}
}
fn push_raw(&mut self, msg: Message) {
async fn push_raw(&mut self, msg: Message) {
match self {
Backend::Standalone { messages, .. } => messages.push(msg),
Backend::Forked(agent) => {
agent.context.entries.push(
agent.lock().await.context.entries.push(
super::context::ConversationEntry::Message(msg));
}
}
}
fn log(&self, text: String) {
if let Backend::Forked(agent) = self {
if let Some(ref log) = agent.conversation_log {
let entry = super::context::ConversationEntry::Log(text);
log.append(&entry).ok();
}
}
fn log(&self, _text: String) {
// TODO: write Log entries to the forked agent's conversation log
}
}
@ -163,7 +155,6 @@ impl AutoAgent {
},
priority,
outputs: std::collections::HashMap::new(),
last_run_entries: Vec::new(),
current_phase: String::new(),
turn: 0,
}
@ -182,14 +173,11 @@ impl AutoAgent {
self.run_with_backend(&mut backend, bail_fn).await
}
/// Run forked from a conscious agent's context. Each call gets a
/// fresh fork for KV cache sharing.
///
/// `memory_keys`: Memory entry keys from conscious context (for {{seen_current}}).
/// `walked`: shared walked keys from previous runs (for {{walked}}).
pub async fn run_forked(
/// Run forked using a shared agent Arc. The UI can lock the same
/// Arc to read entries live during the run.
pub async fn run_forked_shared(
&mut self,
agent: &Agent,
agent: &std::sync::Arc<tokio::sync::Mutex<Agent>>,
memory_keys: &[String],
walked: &[String],
) -> Result<String, String> {
@ -198,16 +186,8 @@ impl AutoAgent {
phase: s.phase.clone(),
}).collect();
let orig_steps = std::mem::replace(&mut self.steps, resolved_steps);
let forked = agent.fork(self.tools.clone());
let fork_point = forked.context.entries.len();
let mut backend = Backend::Forked(forked);
let mut backend = Backend::Forked(agent.clone());
let result = self.run_with_backend(&mut backend, None).await;
if let Backend::Forked(ref agent) = backend {
let total = agent.context.entries.len();
self.last_run_entries = agent.context.entries[fork_point..].to_vec();
dbglog!("[auto] {} fork_point={} total={} captured={}",
self.name, fork_point, total, self.last_run_entries.len());
}
self.steps = orig_steps;
result
}
@ -226,7 +206,7 @@ impl AutoAgent {
if next_step < self.steps.len() {
backend.push_message(
Message::user(&self.steps[next_step].prompt));
Message::user(&self.steps[next_step].prompt)).await;
next_step += 1;
}
@ -235,20 +215,19 @@ impl AutoAgent {
for _ in 0..max_turns {
self.turn += 1;
let messages = backend.messages();
backend.log(format!("turn {} ({} messages)",
self.turn, messages.len()));
let messages = backend.messages().await;
let client = backend.client().await;
dbglog!("[auto] {} turn {} ({} messages)",
self.name, self.turn, messages.len());
let (msg, usage_opt) = Self::api_call_with_retry(
&self.name, backend, &self.tools, &messages,
&self.name, &client, &self.tools, &messages,
&reasoning, self.sampling, self.priority).await?;
if let Some(u) = &usage_opt {
backend.log(format!("tokens: {} prompt + {} completion",
u.prompt_tokens, u.completion_tokens));
dbglog!("[auto] {} tokens: {} prompt + {} completion",
self.name, u.prompt_tokens, u.completion_tokens);
}
let has_content = msg.content.is_some();
@ -261,28 +240,28 @@ impl AutoAgent {
let text = msg.content_text().to_string();
if text.is_empty() && !has_content {
backend.log("empty response, retrying".into());
dbglog!("[auto] {} empty response, retrying", self.name);
backend.push_message(Message::user(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
)).await;
continue;
}
backend.log(format!("response: {}",
&text[..text.len().min(200)]));
dbglog!("[auto] {} response: {}",
self.name, &text[..text.len().min(200)]);
if next_step < self.steps.len() {
if let Some(ref check) = bail_fn {
check(next_step)?;
}
self.current_phase = self.steps[next_step].phase.clone();
backend.push_message(Message::assistant(&text));
backend.push_message(Message::assistant(&text)).await;
backend.push_message(
Message::user(&self.steps[next_step].prompt));
Message::user(&self.steps[next_step].prompt)).await;
next_step += 1;
backend.log(format!("step {}/{}",
next_step, self.steps.len()));
dbglog!("[auto] {} step {}/{}",
self.name, next_step, self.steps.len());
continue;
}
@ -294,14 +273,13 @@ impl AutoAgent {
async fn api_call_with_retry(
name: &str,
backend: &Backend,
client: &ApiClient,
tools: &[agent_tools::Tool],
messages: &[Message],
reasoning: &str,
sampling: super::api::SamplingParams,
priority: i32,
) -> Result<(Message, Option<Usage>), String> {
let client = backend.client();
let mut last_err = None;
for attempt in 0..5 {
match client.chat_completion_stream_temp(
@ -309,8 +287,8 @@ impl AutoAgent {
).await {
Ok((msg, usage)) => {
if let Some(ref e) = last_err {
backend.log(format!(
"succeeded after retry (previous: {})", e));
dbglog!("[auto] {} succeeded after retry (previous: {})",
name, e);
}
return Ok((msg, usage));
}
@ -322,9 +300,8 @@ impl AutoAgent {
|| err_str.contains("timed out")
|| err_str.contains("Connection refused");
if is_transient && attempt < 4 {
backend.log(format!(
"transient error (attempt {}): {}, retrying",
attempt + 1, err_str));
dbglog!("[auto] {} transient error (attempt {}): {}, retrying",
name, attempt + 1, err_str);
tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await;
last_err = Some(e);
continue;
@ -346,18 +323,17 @@ impl AutoAgent {
if let Some(ref mut calls) = sanitized.tool_calls {
for call in calls {
if serde_json::from_str::<serde_json::Value>(&call.function.arguments).is_err() {
backend.log(format!(
"sanitizing malformed args for {}: {}",
call.function.name, &call.function.arguments));
dbglog!("[auto] {} sanitizing malformed args for {}: {}",
self.name, call.function.name, &call.function.arguments);
call.function.arguments = "{}".to_string();
}
}
}
backend.push_raw(sanitized);
backend.push_raw(sanitized).await;
for call in msg.tool_calls.as_ref().unwrap() {
backend.log(format!("tool: {}({})",
call.function.name, &call.function.arguments));
dbglog!("[auto] {} tool: {}({})",
self.name, call.function.name, &call.function.arguments);
let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
Ok(v) => v,
@ -366,7 +342,7 @@ impl AutoAgent {
&call.id,
"Error: your tool call had malformed JSON arguments. \
Please retry with valid JSON.",
));
)).await;
continue;
}
};
@ -383,8 +359,8 @@ impl AutoAgent {
agent_tools::dispatch(&call.function.name, &args).await
};
backend.log(format!("result: {} chars", output.len()));
backend.push_raw(Message::tool_result(&call.id, &output));
dbglog!("[auto] {} result: {} chars", self.name, output.len());
backend.push_raw(Message::tool_result(&call.id, &output)).await;
}
}
}