diff --git a/src/agent/runner.rs b/src/agent/runner.rs index e4d2835..9c90f4b 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -80,6 +80,10 @@ pub struct Agent { pub memory_scores: Option, /// Whether a /score task is currently running. pub scoring_in_flight: bool, + /// Background tool calls that outlive the current turn. + background_tasks: futures::stream::FuturesUnordered< + std::pin::Pin + Send>> + >, } fn render_journal(entries: &[journal::JournalEntry]) -> String { @@ -131,6 +135,7 @@ impl Agent { agent_cycles, memory_scores: None, scoring_in_flight: false, + background_tasks: futures::stream::FuturesUnordered::new(), }; agent.load_startup_journal(); @@ -232,6 +237,26 @@ impl Agent { ))); } + // Inject completed background task results + { + use futures::{StreamExt, FutureExt}; + while let Some(Some((call, output))) = + std::pin::Pin::new(&mut self.background_tasks).next().now_or_never() + { + let preview = &output.text[..output.text.len().min(500)]; + let _ = ui_tx.send(UiMessage::Info(format!( + "[background] {} completed: {}", + call.function.name, + &preview[..preview.len().min(80)], + ))); + let notification = format!( + "\nTool: {}\nResult: {}\n", + call.function.name, preview, + ); + self.push_message(Message::user(notification)); + } + } + // User input — clean, just what was typed self.push_message(Message::user(user_input)); let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); @@ -266,8 +291,13 @@ impl Agent { let mut usage = None; let mut finish_reason = None; let mut in_tool_call = false; + let mut tool_call_buf = String::new(); let mut stream_error = None; let mut first_content = true; + // Tool calls fired during streaming (XML path) + let mut inflight: futures::stream::FuturesOrdered< + std::pin::Pin + Send>> + > = futures::stream::FuturesOrdered::new(); // Buffer for content not yet sent to UI — holds a tail // that might be a partial tag. let mut display_buf = String::new(); @@ -282,7 +312,46 @@ impl Agent { content.push_str(&text); if in_tool_call { - // Already inside a tool call — suppress display. + tool_call_buf.push_str(&text); + // Check for closing tag — parse and fire immediately + if let Some(end) = tool_call_buf.find("") { + let body = &tool_call_buf[..end]; + if let Some(call) = crate::user::parsing::parse_tool_call_body(body) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); + let args_summary = summarize_args(&call.function.name, &args); + let _ = ui_tx.send(UiMessage::ToolCall { + name: call.function.name.clone(), + args_summary: args_summary.clone(), + }); + let _ = ui_tx.send(UiMessage::ToolStarted { + id: call.id.clone(), + name: call.function.name.clone(), + detail: args_summary, + }); + let tracker = self.process_tracker.clone(); + let is_background = args.get("run_in_background") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let future = Box::pin(async move { + let output = tools::dispatch(&call.function.name, &args, &tracker).await; + (call, output) + }); + if is_background { + self.background_tasks.push(future); + } else { + inflight.push_back(future); + } + } + // Reset for potential next tool call + let remaining = tool_call_buf[end + "".len()..].to_string(); + tool_call_buf.clear(); + in_tool_call = false; + // Any content after goes back to display + if !remaining.trim().is_empty() { + display_buf.push_str(&remaining); + } + } } else { display_buf.push_str(&text); @@ -417,8 +486,18 @@ impl Agent { empty_retries = 0; } - // Tool calls (structured from API, or recovered from content - // by build_response_message if the model leaked them as XML). + // Collect tool calls that were fired during streaming + if !inflight.is_empty() { + use futures::StreamExt; + self.push_message(msg.clone()); + while let Some((call, output)) = inflight.next().await { + self.apply_tool_result(&call, output, ui_tx, &mut ds); + } + self.publish_context_state(); + continue; + } + + // Tool calls (structured API path — not fired during stream). if let Some(ref tool_calls) = msg.tool_calls { if !tool_calls.is_empty() { self.push_message(msg.clone()); @@ -504,6 +583,20 @@ impl Agent { let output = tools::dispatch(&call.function.name, &args, &self.process_tracker).await; + self.apply_tool_result(call, output, ui_tx, ds); + } + + /// Apply a completed tool result to conversation state. + fn apply_tool_result( + &mut self, + call: &ToolCall, + output: tools::ToolOutput, + ui_tx: &UiSender, + ds: &mut DispatchState, + ) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); + if output.is_yield { ds.yield_requested = true; } else { diff --git a/src/user/parsing.rs b/src/user/parsing.rs index 99ff9c7..587dd3b 100644 --- a/src/user/parsing.rs +++ b/src/user/parsing.rs @@ -16,6 +16,15 @@ use crate::user::types::*; /// Parse leaked tool calls from response text. /// Looks for `...` blocks and tries both /// XML and JSON formats for the body. +/// Parse a single tool call body (content between `` and ``). +pub fn parse_tool_call_body(body: &str) -> Option { + let normalized = normalize_xml_tags(body); + let body = normalized.trim(); + let mut counter = 0u32; + parse_xml_tool_call(body, &mut counter) + .or_else(|| parse_json_tool_call(body, &mut counter)) +} + pub fn parse_leaked_tool_calls(text: &str) -> Vec { // Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "" // This handles streaming tokenizers that split tags across tokens.