From d25033b9f4083f8ba87122d3d3226a1b7c124274 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 3 Apr 2026 22:14:35 -0400 Subject: [PATCH] fire XML tool calls as they arrive during streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When </tool_call> is detected in the content stream, parse and dispatch immediately via FuturesOrdered. Tool calls execute concurrently while the stream continues. Results collected in order after the stream ends. Structured API path (ToolCallDelta) unchanged — still uses post-stream parallel dispatch. Co-Developed-By: Kent Overstreet --- src/agent/runner.rs | 99 +++++++++++++++++++++++++++++++++++++++++++-- src/user/parsing.rs | 9 +++++ 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/agent/runner.rs b/src/agent/runner.rs index e4d2835..9c90f4b 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -80,6 +80,10 @@ pub struct Agent { pub memory_scores: Option, /// Whether a /score task is currently running. pub scoring_in_flight: bool, + /// Background tool calls that outlive the current turn. 
+ background_tasks: futures::stream::FuturesUnordered< + std::pin::Pin + Send>> + >, } fn render_journal(entries: &[journal::JournalEntry]) -> String { @@ -131,6 +135,7 @@ impl Agent { agent_cycles, memory_scores: None, scoring_in_flight: false, + background_tasks: futures::stream::FuturesUnordered::new(), }; agent.load_startup_journal(); @@ -232,6 +237,26 @@ impl Agent { ))); } + // Inject completed background task results + { + use futures::{StreamExt, FutureExt}; + while let Some(Some((call, output))) = + std::pin::Pin::new(&mut self.background_tasks).next().now_or_never() + { + let preview = &output.text[..output.text.len().min(500)]; + let _ = ui_tx.send(UiMessage::Info(format!( + "[background] {} completed: {}", + call.function.name, + &preview[..preview.len().min(80)], + ))); + let notification = format!( + "\nTool: {}\nResult: {}\n", + call.function.name, preview, + ); + self.push_message(Message::user(notification)); + } + } + // User input — clean, just what was typed self.push_message(Message::user(user_input)); let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); @@ -266,8 +291,13 @@ impl Agent { let mut usage = None; let mut finish_reason = None; let mut in_tool_call = false; + let mut tool_call_buf = String::new(); let mut stream_error = None; let mut first_content = true; + // Tool calls fired during streaming (XML path) + let mut inflight: futures::stream::FuturesOrdered< + std::pin::Pin + Send>> + > = futures::stream::FuturesOrdered::new(); // Buffer for content not yet sent to UI — holds a tail // that might be a partial tag. let mut display_buf = String::new(); @@ -282,7 +312,46 @@ impl Agent { content.push_str(&text); if in_tool_call { - // Already inside a tool call — suppress display. 
+ tool_call_buf.push_str(&text); + // Check for closing tag — parse and fire immediately + if let Some(end) = tool_call_buf.find("") { + let body = &tool_call_buf[..end]; + if let Some(call) = crate::user::parsing::parse_tool_call_body(body) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); + let args_summary = summarize_args(&call.function.name, &args); + let _ = ui_tx.send(UiMessage::ToolCall { + name: call.function.name.clone(), + args_summary: args_summary.clone(), + }); + let _ = ui_tx.send(UiMessage::ToolStarted { + id: call.id.clone(), + name: call.function.name.clone(), + detail: args_summary, + }); + let tracker = self.process_tracker.clone(); + let is_background = args.get("run_in_background") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let future = Box::pin(async move { + let output = tools::dispatch(&call.function.name, &args, &tracker).await; + (call, output) + }); + if is_background { + self.background_tasks.push(future); + } else { + inflight.push_back(future); + } + } + // Reset for potential next tool call + let remaining = tool_call_buf[end + "".len()..].to_string(); + tool_call_buf.clear(); + in_tool_call = false; + // Any content after </tool_call> goes back to display + if !remaining.trim().is_empty() { + display_buf.push_str(&remaining); + } + } } else { display_buf.push_str(&text); @@ -417,8 +486,18 @@ impl Agent { empty_retries = 0; } - // Tool calls (structured from API, or recovered from content - // by build_response_message if the model leaked them as XML). + // Collect tool calls that were fired during streaming + if !inflight.is_empty() { + use futures::StreamExt; + self.push_message(msg.clone()); + while let Some((call, output)) = inflight.next().await { + self.apply_tool_result(&call, output, ui_tx, &mut ds); + } + self.publish_context_state(); + continue; + } + + // Tool calls (structured API path — not fired during stream). 
if let Some(ref tool_calls) = msg.tool_calls { if !tool_calls.is_empty() { self.push_message(msg.clone()); @@ -504,6 +583,20 @@ impl Agent { let output = tools::dispatch(&call.function.name, &args, &self.process_tracker).await; + self.apply_tool_result(call, output, ui_tx, ds); + } + + /// Apply a completed tool result to conversation state. + fn apply_tool_result( + &mut self, + call: &ToolCall, + output: tools::ToolOutput, + ui_tx: &UiSender, + ds: &mut DispatchState, + ) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); + if output.is_yield { ds.yield_requested = true; } else { diff --git a/src/user/parsing.rs b/src/user/parsing.rs index 99ff9c7..587dd3b 100644 --- a/src/user/parsing.rs +++ b/src/user/parsing.rs @@ -16,6 +16,15 @@ use crate::user::types::*; /// Parse leaked tool calls from response text. /// Looks for `<tool_call>...</tool_call>` blocks and tries both /// XML and JSON formats for the body. +/// Parse a single tool call body (content between `<tool_call>` and `</tool_call>`). +pub fn parse_tool_call_body(body: &str) -> Option { + let normalized = normalize_xml_tags(body); + let body = normalized.trim(); + let mut counter = 0u32; + parse_xml_tool_call(body, &mut counter) + .or_else(|| parse_json_tool_call(body, &mut counter)) +} + pub fn parse_leaked_tool_calls(text: &str) -> Vec { // Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "<function=bash>" // This handles streaming tokenizers that split tags across tokens.