diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 2e64c7f..0874701 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -85,10 +85,6 @@ pub struct Agent { pub scoring_in_flight: bool, /// Shared active tools — Agent writes, TUI reads. pub active_tools: crate::user::ui_channel::SharedActiveTools, - /// Background tool calls that outlive the current turn. - background_tasks: futures::stream::FuturesUnordered< - std::pin::Pin + Send>> - >, } fn render_journal(entries: &[journal::JournalEntry]) -> String { @@ -142,7 +138,6 @@ impl Agent { memory_scores: None, scoring_in_flight: false, active_tools, - background_tasks: futures::stream::FuturesUnordered::new(), }; agent.load_startup_journal(); @@ -245,17 +240,29 @@ impl Agent { } // Inject completed background task results + // Collect completed background tool calls { - use futures::{StreamExt, FutureExt}; let mut bg_ds = DispatchState { yield_requested: false, had_tool_calls: false, tool_errors: 0, model_switch: None, dmn_pause: false, }; - while let Some(Some((call, output))) = - std::pin::Pin::new(&mut self.background_tasks).next().now_or_never() - { - // Show result in TUI and inject into conversation - self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + let finished: Vec<_> = { + let mut tools = self.active_tools.lock().unwrap(); + let mut done = Vec::new(); + let mut i = 0; + while i < tools.len() { + if tools[i].handle.is_finished() { + done.push(tools.remove(i)); + } else { + i += 1; + } + } + done + }; + for mut entry in finished { + if let Ok((call, output)) = entry.handle.await { + self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + } } } @@ -296,10 +303,6 @@ impl Agent { let mut tool_call_buf = String::new(); let mut stream_error = None; let mut first_content = true; - // Tool calls fired during streaming (XML path) - let mut inflight: futures::stream::FuturesOrdered< - std::pin::Pin + Send>> - > = futures::stream::FuturesOrdered::new(); // Buffer for content not yet sent to UI — holds a tail // that might be a partial tag. let mut display_buf = String::new(); @@ -326,27 +329,26 @@ impl Agent { name: call.function.name.clone(), args_summary: args_summary.clone(), }); - self.active_tools.write().unwrap().push( - crate::user::ui_channel::ActiveTool { - id: call.id.clone(), - name: call.function.name.clone(), - detail: args_summary, - started: std::time::Instant::now(), - } - ); - let tracker = self.process_tracker.clone(); let is_background = args.get("run_in_background") .and_then(|v| v.as_bool()) .unwrap_or(false); - let future = Box::pin(async move { + let call_id = call.id.clone(); + let call_name = call.function.name.clone(); + let tracker = self.process_tracker.clone(); + let handle = tokio::spawn(async move { let output = tools::dispatch(&call.function.name, &args, &tracker).await; (call, output) }); - if is_background { - self.background_tasks.push(future); - } else { - inflight.push_back(future); - } + self.active_tools.lock().unwrap().push( + crate::user::ui_channel::ActiveToolCall { + id: call_id, + name: call_name, + detail: args_summary, + started: std::time::Instant::now(), + background: is_background, + handle, + } + ); } // Reset for potential next tool call let remaining = tool_call_buf[end + "".len()..].to_string(); @@ -491,15 +493,31 @@ impl Agent { empty_retries = 0; } - // Collect tool calls that were fired during streaming - if !inflight.is_empty() { - use futures::StreamExt; - self.push_message(msg.clone()); - while let Some((call, output)) = inflight.next().await { - self.apply_tool_result(&call, output, ui_tx, &mut ds); + // Collect non-background tool calls fired during streaming + { + let pending: Vec<_> = { + let mut tools = self.active_tools.lock().unwrap(); + let mut non_bg = Vec::new(); + let mut i = 0; + while i < tools.len() { + if !tools[i].background { + non_bg.push(tools.remove(i)); + } else { + i += 1; + } + } + non_bg + }; + if !pending.is_empty() { + self.push_message(msg.clone()); + for mut entry in pending { + if let Ok((call, output)) = entry.handle.await { + self.apply_tool_result(&call, output, ui_tx, &mut ds); + } + } + self.publish_context_state(); + continue; } - self.publish_context_state(); - continue; } // Tool calls (structured API path — not fired during stream). @@ -553,45 +571,46 @@ impl Agent { name: call.function.name.clone(), args_summary: args_summary.clone(), }); - self.active_tools.write().unwrap().push( - crate::user::ui_channel::ActiveTool { - id: call.id.clone(), - name: call.function.name.clone(), - detail: args_summary, - started: std::time::Instant::now(), - } - ); - - // Handle working_stack tool — needs &mut self for context state + // Handle working_stack — needs &mut self, can't be spawned if call.function.name == "working_stack" { let result = tools::working_stack::handle(&args, &mut self.context.working_stack); - let output = tools::ToolOutput { - text: result.clone(), - is_yield: false, - images: Vec::new(), - model_switch: None, - dmn_pause: false, - }; - let _ = ui_tx.send(UiMessage::ToolResult { - name: call.function.name.clone(), - result: output.text.clone(), - }); - self.active_tools.write().unwrap().retain(|t| t.id != call.id); - self.push_message(Message::tool_result(&call.id, &output.text)); - ds.had_tool_calls = true; - - // Re-render the context message so the model sees the updated stack + let output = tools::ToolOutput::text(result.clone()); + self.apply_tool_result(call, output, ui_tx, ds); if !result.starts_with("Error:") { self.refresh_context_state(); } return; } - // Dispatch through unified path - let output = - tools::dispatch(&call.function.name, &args, &self.process_tracker).await; + // Spawn, push to active_tools, await handle + let call_id = call.id.clone(); + let call_name = call.function.name.clone(); + let call = call.clone(); + let tracker = self.process_tracker.clone(); + let handle = tokio::spawn(async move { + let output = tools::dispatch(&call.function.name, &args, &tracker).await; + (call, output) + }); + self.active_tools.lock().unwrap().push( + tools::ActiveToolCall { + id: call_id, + name: call_name, + detail: args_summary, + started: std::time::Instant::now(), + background: false, + handle, + } + ); - self.apply_tool_result(call, output, ui_tx, ds); + // Wait for this non-background tool to complete + let entry = { + let mut tools = self.active_tools.lock().unwrap(); + // It's the last one we pushed + tools.pop().unwrap() + }; + if let Ok((call, output)) = entry.handle.await { + self.apply_tool_result(&call, output, ui_tx, ds); + } } /// Apply a completed tool result to conversation state. @@ -624,7 +643,7 @@ impl Agent { name: call.function.name.clone(), result: output.text.clone(), }); - self.active_tools.write().unwrap().retain(|t| t.id != call.id); + self.active_tools.lock().unwrap().retain(|t| t.id != call.id); // Tag memory_render results for context deduplication if call.function.name == "memory_render" && !output.text.starts_with("Error:") { diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 0392113..70a5402 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -118,6 +118,17 @@ pub struct ProcessInfo { pub started: Instant, } +/// A tool call in flight — metadata for TUI + JoinHandle for +/// result collection and cancellation. +pub struct ActiveToolCall { + pub id: String, + pub name: String, + pub detail: String, + pub started: Instant, + pub background: bool, + pub handle: tokio::task::JoinHandle<(ToolCall, ToolOutput)>, +} + /// Shared tracker for running child processes. Allows the TUI to /// display what's running and kill processes by PID. #[derive(Debug, Clone, Default)] diff --git a/src/user/tui/context_screen.rs b/src/user/tui/context_screen.rs index c6967d2..d4ac5c0 100644 --- a/src/user/tui/context_screen.rs +++ b/src/user/tui/context_screen.rs @@ -168,7 +168,7 @@ impl App { ))); lines.push(Line::raw(format!(" Reasoning: {}", self.reasoning_effort))); lines.push(Line::raw(format!(" Running processes: {}", self.running_processes))); - lines.push(Line::raw(format!(" Active tools: {}", self.active_tools.read().unwrap().len()))); + lines.push(Line::raw(format!(" Active tools: {}", self.active_tools.lock().unwrap().len()))); let block = Block::default() .title_top(Line::from(SCREEN_LEGEND).left_aligned()) diff --git a/src/user/tui/main_screen.rs b/src/user/tui/main_screen.rs index fa8ce75..bc95d69 100644 --- a/src/user/tui/main_screen.rs +++ b/src/user/tui/main_screen.rs @@ -17,7 +17,7 @@ impl App { /// Draw the main (F1) screen — four-pane layout with status bar. pub(crate) fn draw_main(&mut self, frame: &mut Frame, size: Rect) { // Main layout: content area + active tools overlay + status bar - let active_tools = self.active_tools.read().unwrap(); + let active_tools = self.active_tools.lock().unwrap(); let tool_lines = active_tools.len() as u16; let main_chunks = Layout::default() .direction(Direction::Vertical) diff --git a/src/user/ui_channel.rs b/src/user/ui_channel.rs index 29512ef..d4d8bc0 100644 --- a/src/user/ui_channel.rs +++ b/src/user/ui_channel.rs @@ -22,20 +22,14 @@ pub fn shared_context_state() -> SharedContextState { Arc::new(RwLock::new(Vec::new())) } -/// Active tool info for TUI display. -#[derive(Debug, Clone)] -pub struct ActiveTool { - pub id: String, - pub name: String, - pub detail: String, - pub started: std::time::Instant, -} +// ActiveToolCall lives in agent::tools — re-export for TUI access +pub use crate::agent::tools::ActiveToolCall; -/// Shared active tools — agent writes, TUI reads. -pub type SharedActiveTools = Arc>>; +/// Shared active tool calls — agent spawns, TUI reads metadata / aborts. +pub type SharedActiveTools = Arc>>; pub fn shared_active_tools() -> SharedActiveTools { - Arc::new(RwLock::new(Vec::new())) + Arc::new(std::sync::Mutex::new(Vec::new())) } /// Which pane streaming text should go to.