fire XML tool calls as they arrive during streaming

When </tool_call> is detected in the content stream, the call is parsed
and dispatched immediately via FuturesOrdered. Tool calls execute
concurrently while the stream continues; their results are collected
in order after the stream ends.

The structured API path (ToolCallDelta) is unchanged — it still uses
post-stream parallel dispatch.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-04-03 22:14:35 -04:00
parent 2f0c7ce5c2
commit d25033b9f4
2 changed files with 105 additions and 3 deletions

View file

@ -80,6 +80,10 @@ pub struct Agent {
pub memory_scores: Option<crate::agent::training::MemoryScore>, pub memory_scores: Option<crate::agent::training::MemoryScore>,
/// Whether a /score task is currently running. /// Whether a /score task is currently running.
pub scoring_in_flight: bool, pub scoring_in_flight: bool,
/// Background tool calls that outlive the current turn.
background_tasks: futures::stream::FuturesUnordered<
std::pin::Pin<Box<dyn std::future::Future<Output = (ToolCall, tools::ToolOutput)> + Send>>
>,
} }
fn render_journal(entries: &[journal::JournalEntry]) -> String { fn render_journal(entries: &[journal::JournalEntry]) -> String {
@ -131,6 +135,7 @@ impl Agent {
agent_cycles, agent_cycles,
memory_scores: None, memory_scores: None,
scoring_in_flight: false, scoring_in_flight: false,
background_tasks: futures::stream::FuturesUnordered::new(),
}; };
agent.load_startup_journal(); agent.load_startup_journal();
@ -232,6 +237,26 @@ impl Agent {
))); )));
} }
// Inject completed background task results
//
// Drains every background tool call that has already finished, without
// waiting on the ones still pending, and records each result as a
// `<task-notification>` user message.
{
    use futures::{FutureExt, StreamExt};

    // Byte budget for the result text injected into the conversation.
    const NOTIFICATION_PREVIEW_BYTES: usize = 500;
    // Byte budget for the result text shown on the UI info line.
    const UI_PREVIEW_BYTES: usize = 80;

    // Longest prefix of `text` that fits in `max_bytes` and ends on a char
    // boundary. Slicing at a raw byte offset panics when the offset lands
    // inside a multi-byte UTF-8 sequence, which tool output can easily contain.
    fn clip(text: &str, max_bytes: usize) -> &str {
        let mut end = text.len().min(max_bytes);
        // Offset 0 is always a char boundary, so this loop terminates.
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    // `now_or_never` polls once and gives up if nothing is ready, so this
    // loop never blocks the turn: it stops on the first pending poll or
    // when the set is empty.
    // NOTE(review): no other poll site for `background_tasks` is visible
    // here. Futures only make progress while polled, so unless something
    // else drives this set between turns, background calls advance only at
    // this point — confirm.
    while let Some(Some((call, output))) =
        std::pin::Pin::new(&mut self.background_tasks).next().now_or_never()
    {
        let preview = clip(&output.text[..], NOTIFICATION_PREVIEW_BYTES);
        // Send failures are deliberately ignored.
        let _ = ui_tx.send(UiMessage::Info(format!(
            "[background] {} completed: {}",
            call.function.name,
            clip(preview, UI_PREVIEW_BYTES),
        )));
        let notification = format!(
            "<task-notification>\nTool: {}\nResult: {}\n</task-notification>",
            call.function.name, preview,
        );
        self.push_message(Message::user(notification));
    }
}
// User input — clean, just what was typed // User input — clean, just what was typed
self.push_message(Message::user(user_input)); self.push_message(Message::user(user_input));
let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots()));
@ -266,8 +291,13 @@ impl Agent {
let mut usage = None; let mut usage = None;
let mut finish_reason = None; let mut finish_reason = None;
let mut in_tool_call = false; let mut in_tool_call = false;
let mut tool_call_buf = String::new();
let mut stream_error = None; let mut stream_error = None;
let mut first_content = true; let mut first_content = true;
// Tool calls fired during streaming (XML path)
let mut inflight: futures::stream::FuturesOrdered<
std::pin::Pin<Box<dyn std::future::Future<Output = (ToolCall, tools::ToolOutput)> + Send>>
> = futures::stream::FuturesOrdered::new();
// Buffer for content not yet sent to UI — holds a tail // Buffer for content not yet sent to UI — holds a tail
// that might be a partial <tool_call> tag. // that might be a partial <tool_call> tag.
let mut display_buf = String::new(); let mut display_buf = String::new();
@ -282,7 +312,46 @@ impl Agent {
content.push_str(&text); content.push_str(&text);
if in_tool_call { if in_tool_call {
// Already inside a tool call — suppress display. tool_call_buf.push_str(&text);
// Check for closing tag — parse and fire immediately
// Only the first closing tag in the buffer is handled per pass; whatever
// follows it is split off as `remaining` below.
if let Some(end) = tool_call_buf.find("</tool_call>") {
// Everything buffered before the closing tag is the call body.
let body = &tool_call_buf[..end];
// A body that fails to parse is dropped: nothing is dispatched and no UI
// message is sent. The buffer reset further down still runs.
if let Some(call) = crate::user::parsing::parse_tool_call_body(body) {
// Arguments that are not valid JSON fall back to the default `Value`
// instead of aborting the call.
let args: serde_json::Value =
serde_json::from_str(&call.function.arguments).unwrap_or_default();
let args_summary = summarize_args(&call.function.name, &args);
// UI send failures are deliberately ignored (`let _ =`).
let _ = ui_tx.send(UiMessage::ToolCall {
name: call.function.name.clone(),
args_summary: args_summary.clone(),
});
let _ = ui_tx.send(UiMessage::ToolStarted {
id: call.id.clone(),
name: call.function.name.clone(),
detail: args_summary,
});
// Cloned so the `async move` block owns the tracker it hands to
// `tools::dispatch` rather than borrowing `self`.
let tracker = self.process_tracker.clone();
// `run_in_background` missing or not a bool counts as `false`.
let is_background = args.get("run_in_background")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// NOTE(review): this only builds the future. It runs when the collection
// holding it is polled, and no poll of `inflight` during streaming is
// visible in this hunk — confirm the stream loop drives it, otherwise the
// call starts executing only when results are collected.
let future = Box::pin(async move {
let output = tools::dispatch(&call.function.name, &args, &tracker).await;
(call, output)
});
// Background calls go to `self.background_tasks`; all others are queued
// on `inflight` in the order they were pushed.
if is_background {
self.background_tasks.push(future);
} else {
inflight.push_back(future);
}
}
// Reset for potential next tool call
let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
tool_call_buf.clear();
in_tool_call = false;
// Any content after </tool_call> goes back to display
// Whitespace-only trailing text is discarded rather than displayed.
// NOTE(review): if `remaining` contains the start of another tool call it
// is handed to `display_buf`; presumably the display-side tag detection
// picks it up on the next chunk — confirm.
if !remaining.trim().is_empty() {
display_buf.push_str(&remaining);
}
}
} else { } else {
display_buf.push_str(&text); display_buf.push_str(&text);
@ -417,8 +486,18 @@ impl Agent {
empty_retries = 0; empty_retries = 0;
} }
// Tool calls (structured from API, or recovered from content // Collect tool calls that were fired during streaming
// by build_response_message if the model leaked them as XML). if !inflight.is_empty() {
use futures::StreamExt;
self.push_message(msg.clone());
while let Some((call, output)) = inflight.next().await {
self.apply_tool_result(&call, output, ui_tx, &mut ds);
}
self.publish_context_state();
continue;
}
// Tool calls (structured API path — not fired during stream).
if let Some(ref tool_calls) = msg.tool_calls { if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() { if !tool_calls.is_empty() {
self.push_message(msg.clone()); self.push_message(msg.clone());
@ -504,6 +583,20 @@ impl Agent {
let output = let output =
tools::dispatch(&call.function.name, &args, &self.process_tracker).await; tools::dispatch(&call.function.name, &args, &self.process_tracker).await;
self.apply_tool_result(call, output, ui_tx, ds);
}
/// Apply a completed tool result to conversation state.
fn apply_tool_result(
&mut self,
call: &ToolCall,
output: tools::ToolOutput,
ui_tx: &UiSender,
ds: &mut DispatchState,
) {
let args: serde_json::Value =
serde_json::from_str(&call.function.arguments).unwrap_or_default();
if output.is_yield { if output.is_yield {
ds.yield_requested = true; ds.yield_requested = true;
} else { } else {

View file

@ -16,6 +16,15 @@ use crate::user::types::*;
/// Parse a single tool call body (content between `<tool_call>` and `</tool_call>`).
///
/// The body is passed through `normalize_xml_tags` and trimmed, then tried as
/// an XML tool call first and as a JSON tool call if that yields nothing.
/// Returns `None` when neither parser accepts it.
pub fn parse_tool_call_body(body: &str) -> Option<ToolCall> {
let normalized = normalize_xml_tags(body);
let body = normalized.trim();
// NOTE(review): the counter restarts at 0 on every call. If the parsers
// derive call ids from it, separate tool calls in one response may end up
// with the same id — confirm.
let mut counter = 0u32;
parse_xml_tool_call(body, &mut counter)
.or_else(|| parse_json_tool_call(body, &mut counter))
}

/// Parse leaked tool calls from response text.
/// Looks for `<tool_call>...</tool_call>` blocks and tries both
/// XML and JSON formats for the body.
pub fn parse_leaked_tool_calls(text: &str) -> Vec<ToolCall> { pub fn parse_leaked_tool_calls(text: &str) -> Vec<ToolCall> {
// Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "<function=bash>" // Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "<function=bash>"
// This handles streaming tokenizers that split tags across tokens. // This handles streaming tokenizers that split tags across tokens.