From 93f5f8b0c7aa6af56310ae873026a2d5064a7842 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Tue, 7 Apr 2026 03:09:06 -0400 Subject: [PATCH] =?UTF-8?q?Shared=20forked=20agent=20=E2=80=94=20UI=20read?= =?UTF-8?q?s=20subconscious=20entries=20live?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The forked agent is now behind Arc>, stored on SubconsciousAgent and passed to the spawned task. The subconscious detail screen locks it via try_lock() to read entries from the fork point — live during runs, persisted after completion. Removes last_run_entries snapshot. Backend::Forked now holds the shared Arc, all push operations go through the lock. Co-Authored-By: Proof of Concept --- src/agent/oneshot.rs | 112 +++++++++++++++------------------------ src/mind/dmn.rs | 30 ++++++++--- src/user/subconscious.rs | 17 ++++-- 3 files changed, 78 insertions(+), 81 deletions(-) diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index beabaee..5da1dde 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -57,9 +57,6 @@ pub struct AutoAgent { /// Named outputs from the agent's output() tool calls. /// Collected per-run, read by Mind after completion. pub outputs: std::collections::HashMap, - /// Entries added during the last forked run (after the fork point). - /// The subconscious screen shows these. - pub last_run_entries: Vec, // Observable status pub current_phase: String, pub turn: usize, @@ -68,48 +65,43 @@ pub struct AutoAgent { /// Per-run conversation backend — created fresh by run() or run_forked(). enum Backend { Standalone { client: ApiClient, messages: Vec }, - Forked(Agent), + Forked(std::sync::Arc>), } impl Backend { - fn client(&self) -> &ApiClient { + async fn client(&self) -> ApiClient { match self { - Backend::Standalone { client, .. } => client, - Backend::Forked(agent) => &agent.client, + Backend::Standalone { client, .. } => client.clone(), + Backend::Forked(agent) => agent.lock().await.client_clone(), } } - fn messages(&self) -> Vec { + async fn messages(&self) -> Vec { match self { Backend::Standalone { messages, .. } => messages.clone(), - Backend::Forked(agent) => agent.assemble_api_messages(), + Backend::Forked(agent) => agent.lock().await.assemble_api_messages(), } } - fn push_message(&mut self, msg: Message) { + async fn push_message(&mut self, msg: Message) { match self { Backend::Standalone { messages, .. } => messages.push(msg), - Backend::Forked(agent) => agent.push_message(msg), + Backend::Forked(agent) => agent.lock().await.push_message(msg), } } - fn push_raw(&mut self, msg: Message) { + async fn push_raw(&mut self, msg: Message) { match self { Backend::Standalone { messages, .. } => messages.push(msg), Backend::Forked(agent) => { - agent.context.entries.push( + agent.lock().await.context.entries.push( super::context::ConversationEntry::Message(msg)); } } } - fn log(&self, text: String) { - if let Backend::Forked(agent) = self { - if let Some(ref log) = agent.conversation_log { - let entry = super::context::ConversationEntry::Log(text); - log.append(&entry).ok(); - } - } + fn log(&self, _text: String) { + // TODO: write Log entries to the forked agent's conversation log } } @@ -163,7 +155,6 @@ impl AutoAgent { }, priority, outputs: std::collections::HashMap::new(), - last_run_entries: Vec::new(), current_phase: String::new(), turn: 0, } @@ -182,14 +173,11 @@ impl AutoAgent { self.run_with_backend(&mut backend, bail_fn).await } - /// Run forked from a conscious agent's context. Each call gets a - /// fresh fork for KV cache sharing. - /// - /// `memory_keys`: Memory entry keys from conscious context (for {{seen_current}}). - /// `walked`: shared walked keys from previous runs (for {{walked}}). - pub async fn run_forked( + /// Run forked using a shared agent Arc. The UI can lock the same + /// Arc to read entries live during the run. + pub async fn run_forked_shared( &mut self, - agent: &Agent, + agent: &std::sync::Arc>, memory_keys: &[String], walked: &[String], ) -> Result { @@ -198,16 +186,8 @@ impl AutoAgent { phase: s.phase.clone(), }).collect(); let orig_steps = std::mem::replace(&mut self.steps, resolved_steps); - let forked = agent.fork(self.tools.clone()); - let fork_point = forked.context.entries.len(); - let mut backend = Backend::Forked(forked); + let mut backend = Backend::Forked(agent.clone()); let result = self.run_with_backend(&mut backend, None).await; - if let Backend::Forked(ref agent) = backend { - let total = agent.context.entries.len(); - self.last_run_entries = agent.context.entries[fork_point..].to_vec(); - dbglog!("[auto] {} fork_point={} total={} captured={}", - self.name, fork_point, total, self.last_run_entries.len()); - } self.steps = orig_steps; result } @@ -226,7 +206,7 @@ impl AutoAgent { if next_step < self.steps.len() { backend.push_message( - Message::user(&self.steps[next_step].prompt)); + Message::user(&self.steps[next_step].prompt)).await; next_step += 1; } @@ -235,20 +215,19 @@ impl AutoAgent { for _ in 0..max_turns { self.turn += 1; - let messages = backend.messages(); - backend.log(format!("turn {} ({} messages)", - self.turn, messages.len())); + let messages = backend.messages().await; + let client = backend.client().await; dbglog!("[auto] {} turn {} ({} messages)", self.name, self.turn, messages.len()); let (msg, usage_opt) = Self::api_call_with_retry( - &self.name, backend, &self.tools, &messages, + &self.name, &client, &self.tools, &messages, &reasoning, self.sampling, self.priority).await?; if let Some(u) = &usage_opt { - backend.log(format!("tokens: {} prompt + {} completion", - u.prompt_tokens, u.completion_tokens)); + dbglog!("[auto] {} tokens: {} prompt + {} completion", + self.name, u.prompt_tokens, u.completion_tokens); } let has_content = msg.content.is_some(); @@ -261,28 +240,28 @@ impl AutoAgent { let text = msg.content_text().to_string(); if text.is_empty() && !has_content { - backend.log("empty response, retrying".into()); + dbglog!("[auto] {} empty response, retrying", self.name); backend.push_message(Message::user( "[system] Your previous response was empty. \ Please respond with text or use a tool." - )); + )).await; continue; } - backend.log(format!("response: {}", - &text[..text.len().min(200)])); + dbglog!("[auto] {} response: {}", + self.name, &text[..text.len().min(200)]); if next_step < self.steps.len() { if let Some(ref check) = bail_fn { check(next_step)?; } self.current_phase = self.steps[next_step].phase.clone(); - backend.push_message(Message::assistant(&text)); + backend.push_message(Message::assistant(&text)).await; backend.push_message( - Message::user(&self.steps[next_step].prompt)); + Message::user(&self.steps[next_step].prompt)).await; next_step += 1; - backend.log(format!("step {}/{}", - next_step, self.steps.len())); + dbglog!("[auto] {} step {}/{}", + self.name, next_step, self.steps.len()); continue; } @@ -294,14 +273,13 @@ impl AutoAgent { async fn api_call_with_retry( name: &str, - backend: &Backend, + client: &ApiClient, tools: &[agent_tools::Tool], messages: &[Message], reasoning: &str, sampling: super::api::SamplingParams, priority: i32, ) -> Result<(Message, Option), String> { - let client = backend.client(); let mut last_err = None; for attempt in 0..5 { match client.chat_completion_stream_temp( @@ -309,8 +287,8 @@ impl AutoAgent { ).await { Ok((msg, usage)) => { if let Some(ref e) = last_err { - backend.log(format!( - "succeeded after retry (previous: {})", e)); + dbglog!("[auto] {} succeeded after retry (previous: {})", + name, e); } return Ok((msg, usage)); } @@ -322,9 +300,8 @@ impl AutoAgent { || err_str.contains("timed out") || err_str.contains("Connection refused"); if is_transient && attempt < 4 { - backend.log(format!( - "transient error (attempt {}): {}, retrying", - attempt + 1, err_str)); + dbglog!("[auto] {} transient error (attempt {}): {}, retrying", + name, attempt + 1, err_str); tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; last_err = Some(e); continue; @@ -346,18 +323,17 @@ impl AutoAgent { if let Some(ref mut calls) = sanitized.tool_calls { for call in calls { if serde_json::from_str::(&call.function.arguments).is_err() { - backend.log(format!( - "sanitizing malformed args for {}: {}", - call.function.name, &call.function.arguments)); + dbglog!("[auto] {} sanitizing malformed args for {}: {}", + self.name, call.function.name, &call.function.arguments); call.function.arguments = "{}".to_string(); } } } - backend.push_raw(sanitized); + backend.push_raw(sanitized).await; for call in msg.tool_calls.as_ref().unwrap() { - backend.log(format!("tool: {}({})", - call.function.name, &call.function.arguments)); + dbglog!("[auto] {} tool: {}({})", + self.name, call.function.name, &call.function.arguments); let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { Ok(v) => v, @@ -366,7 +342,7 @@ impl AutoAgent { &call.id, "Error: your tool call had malformed JSON arguments. \ Please retry with valid JSON.", - )); + )).await; continue; } }; @@ -383,8 +359,8 @@ impl AutoAgent { agent_tools::dispatch(&call.function.name, &args).await }; - backend.log(format!("result: {} chars", output.len())); - backend.push_raw(Message::tool_result(&call.id, &output)); + dbglog!("[auto] {} result: {} chars", self.name, output.len()); + backend.push_raw(Message::tool_result(&call.id, &output)).await; } } } diff --git a/src/mind/dmn.rs b/src/mind/dmn.rs index 28d6007..1e9c4a0 100644 --- a/src/mind/dmn.rs +++ b/src/mind/dmn.rs @@ -285,15 +285,19 @@ const AGENTS: &[(&str, u64)] = &[ ("subconscious-reflect", 100_000), // every ~100KB of conversation ]; -/// Lightweight snapshot for the TUI. -#[derive(Clone, Default)] +/// Snapshot for the TUI — includes a handle to the forked agent +/// so the detail view can read entries live. +#[derive(Clone)] pub struct SubconsciousSnapshot { pub name: String, pub running: bool, pub current_phase: String, pub turn: usize, pub last_run_secs_ago: Option, - pub last_run_entries: Vec, + /// Shared handle to the forked agent — UI locks to read entries. + pub forked_agent: Option>>, + /// Entry index where the fork diverged. + pub fork_point: usize, } struct SubconsciousAgent { @@ -301,7 +305,11 @@ struct SubconsciousAgent { auto: AutoAgent, last_trigger_bytes: u64, last_run: Option, - last_run_entries: Vec, + /// The forked agent for the current/last run. Shared with the + /// spawned task so the UI can read entries live. + forked_agent: Option>>, + /// Entry index where the fork diverged from the conscious agent. + fork_point: usize, handle: Option)>>, } @@ -331,7 +339,7 @@ impl SubconsciousAgent { Some(Self { name: name.to_string(), auto, last_trigger_bytes: 0, last_run: None, - last_run_entries: Vec::new(), handle: None, + forked_agent: None, fork_point: 0, handle: None, }) } @@ -352,7 +360,8 @@ impl SubconsciousAgent { current_phase: self.auto.current_phase.clone(), turn: self.auto.turn, last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()), - last_run_entries: self.last_run_entries.clone(), + forked_agent: self.forked_agent.clone(), + fork_point: self.fork_point, } } } @@ -393,7 +402,6 @@ impl Subconscious { let (auto_back, result) = handle.await.unwrap_or_else( |e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0), Err(format!("task panicked: {}", e)))); - self.agents[idx].last_run_entries = auto_back.last_run_entries.clone(); self.agents[idx].auto = auto_back; match result { @@ -490,11 +498,17 @@ impl Subconscious { dbglog!("[subconscious] triggering {}", auto.name); let forked = conscious.fork(auto.tools.clone()); + let fork_point = forked.context.entries.len(); + let shared_forked = Arc::new(tokio::sync::Mutex::new(forked)); + + self.agents[idx].forked_agent = Some(shared_forked.clone()); + self.agents[idx].fork_point = fork_point; + let keys = memory_keys.clone(); let w = walked.clone(); self.agents[idx].handle = Some(tokio::spawn(async move { - let result = auto.run_forked(&forked, &keys, &w).await; + let result = auto.run_forked_shared(&shared_forked, &keys, &w).await; (auto, result) })); } diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index 9d7bb57..e688ad4 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -115,7 +115,10 @@ impl SubconsciousScreen { else { format!("{:.1}h ago", s / 3600.0) } }) .unwrap_or_else(|| "never".to_string()); - let entries = snap.last_run_entries.len(); + let entries = snap.forked_agent.as_ref() + .and_then(|a| a.try_lock().ok()) + .map(|ag| ag.context.entries.len().saturating_sub(snap.fork_point)) + .unwrap_or(0); vec![ Span::styled( format!("{}{:<30}", prefix, snap.name), @@ -160,11 +163,17 @@ impl SubconsciousScreen { lines.push(Line::styled(" (Esc/← back, ↑/↓/PgUp/PgDn scroll)", hint)); lines.push(Line::raw("")); - if snap.last_run_entries.is_empty() { + // Read entries from the forked agent (from fork point onward) + let entries: Vec = snap.forked_agent.as_ref() + .and_then(|agent| agent.try_lock().ok()) + .map(|ag| ag.context.entries[snap.fork_point..].to_vec()) + .unwrap_or_default(); + + if entries.is_empty() { lines.push(Line::styled(" (no run data)", hint)); } - for entry in &snap.last_run_entries { + for entry in &entries { if entry.is_log() { if let ConversationEntry::Log(text) = entry { lines.push(Line::styled( @@ -189,14 +198,12 @@ impl SubconsciousScreen { .collect::>().join(", ") }); - // Role header let header = match &tool_info { Some(tools) => format!(" [{} → {}]", role_str, tools), None => format!(" [{}]", role_str), }; lines.push(Line::styled(header, Style::default().fg(role_color))); - // Content (truncated per line) if !text.is_empty() { for line in text.lines().take(20) { lines.push(Line::styled(