unify tool tracking: ActiveToolCall with JoinHandle

One data structure for all in-flight tool calls — metadata for
TUI display + JoinHandle for result collection and cancellation.
Agent spawns tool calls via tokio::spawn, pushes to shared
Arc<Mutex<Vec<ActiveToolCall>>>. TUI reads metadata, can abort().
No separate inflight/background collections.

Non-background: awaited after stream ends.
Background: persists, drained at next turn start.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-04-03 23:42:27 -04:00
parent 17a018ff12
commit a78f310e4d
5 changed files with 106 additions and 82 deletions

View file

@ -85,10 +85,6 @@ pub struct Agent {
pub scoring_in_flight: bool,
/// Shared active tools — Agent writes, TUI reads.
pub active_tools: crate::user::ui_channel::SharedActiveTools,
/// Background tool calls that outlive the current turn.
background_tasks: futures::stream::FuturesUnordered<
std::pin::Pin<Box<dyn std::future::Future<Output = (ToolCall, tools::ToolOutput)> + Send>>
>,
}
fn render_journal(entries: &[journal::JournalEntry]) -> String {
@ -142,7 +138,6 @@ impl Agent {
memory_scores: None,
scoring_in_flight: false,
active_tools,
background_tasks: futures::stream::FuturesUnordered::new(),
};
agent.load_startup_journal();
@ -245,17 +240,29 @@ impl Agent {
}
// Inject completed background task results
// Collect completed background tool calls
{
use futures::{StreamExt, FutureExt};
let mut bg_ds = DispatchState {
yield_requested: false, had_tool_calls: false,
tool_errors: 0, model_switch: None, dmn_pause: false,
};
while let Some(Some((call, output))) =
std::pin::Pin::new(&mut self.background_tasks).next().now_or_never()
{
// Show result in TUI and inject into conversation
self.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
let finished: Vec<_> = {
let mut tools = self.active_tools.lock().unwrap();
let mut done = Vec::new();
let mut i = 0;
while i < tools.len() {
if tools[i].handle.is_finished() {
done.push(tools.remove(i));
} else {
i += 1;
}
}
done
};
for mut entry in finished {
if let Ok((call, output)) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
}
}
}
@ -296,10 +303,6 @@ impl Agent {
let mut tool_call_buf = String::new();
let mut stream_error = None;
let mut first_content = true;
// Tool calls fired during streaming (XML path)
let mut inflight: futures::stream::FuturesOrdered<
std::pin::Pin<Box<dyn std::future::Future<Output = (ToolCall, tools::ToolOutput)> + Send>>
> = futures::stream::FuturesOrdered::new();
// Buffer for content not yet sent to UI — holds a tail
// that might be a partial <tool_call> tag.
let mut display_buf = String::new();
@ -326,27 +329,26 @@ impl Agent {
name: call.function.name.clone(),
args_summary: args_summary.clone(),
});
self.active_tools.write().unwrap().push(
crate::user::ui_channel::ActiveTool {
id: call.id.clone(),
name: call.function.name.clone(),
detail: args_summary,
started: std::time::Instant::now(),
}
);
let tracker = self.process_tracker.clone();
let is_background = args.get("run_in_background")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let future = Box::pin(async move {
let call_id = call.id.clone();
let call_name = call.function.name.clone();
let tracker = self.process_tracker.clone();
let handle = tokio::spawn(async move {
let output = tools::dispatch(&call.function.name, &args, &tracker).await;
(call, output)
});
if is_background {
self.background_tasks.push(future);
} else {
inflight.push_back(future);
}
self.active_tools.lock().unwrap().push(
crate::user::ui_channel::ActiveToolCall {
id: call_id,
name: call_name,
detail: args_summary,
started: std::time::Instant::now(),
background: is_background,
handle,
}
);
}
// Reset for potential next tool call
let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
@ -491,15 +493,31 @@ impl Agent {
empty_retries = 0;
}
// Collect tool calls that were fired during streaming
if !inflight.is_empty() {
use futures::StreamExt;
self.push_message(msg.clone());
while let Some((call, output)) = inflight.next().await {
self.apply_tool_result(&call, output, ui_tx, &mut ds);
// Collect non-background tool calls fired during streaming
{
let pending: Vec<_> = {
let mut tools = self.active_tools.lock().unwrap();
let mut non_bg = Vec::new();
let mut i = 0;
while i < tools.len() {
if !tools[i].background {
non_bg.push(tools.remove(i));
} else {
i += 1;
}
}
non_bg
};
if !pending.is_empty() {
self.push_message(msg.clone());
for mut entry in pending {
if let Ok((call, output)) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, &mut ds);
}
}
self.publish_context_state();
continue;
}
self.publish_context_state();
continue;
}
// Tool calls (structured API path — not fired during stream).
@ -553,45 +571,46 @@ impl Agent {
name: call.function.name.clone(),
args_summary: args_summary.clone(),
});
self.active_tools.write().unwrap().push(
crate::user::ui_channel::ActiveTool {
id: call.id.clone(),
name: call.function.name.clone(),
detail: args_summary,
started: std::time::Instant::now(),
}
);
// Handle working_stack tool — needs &mut self for context state
// Handle working_stack — needs &mut self, can't be spawned
if call.function.name == "working_stack" {
let result = tools::working_stack::handle(&args, &mut self.context.working_stack);
let output = tools::ToolOutput {
text: result.clone(),
is_yield: false,
images: Vec::new(),
model_switch: None,
dmn_pause: false,
};
let _ = ui_tx.send(UiMessage::ToolResult {
name: call.function.name.clone(),
result: output.text.clone(),
});
self.active_tools.write().unwrap().retain(|t| t.id != call.id);
self.push_message(Message::tool_result(&call.id, &output.text));
ds.had_tool_calls = true;
// Re-render the context message so the model sees the updated stack
let output = tools::ToolOutput::text(result.clone());
self.apply_tool_result(call, output, ui_tx, ds);
if !result.starts_with("Error:") {
self.refresh_context_state();
}
return;
}
// Dispatch through unified path
let output =
tools::dispatch(&call.function.name, &args, &self.process_tracker).await;
// Spawn, push to active_tools, await handle
let call_id = call.id.clone();
let call_name = call.function.name.clone();
let call = call.clone();
let tracker = self.process_tracker.clone();
let handle = tokio::spawn(async move {
let output = tools::dispatch(&call.function.name, &args, &tracker).await;
(call, output)
});
self.active_tools.lock().unwrap().push(
tools::ActiveToolCall {
id: call_id,
name: call_name,
detail: args_summary,
started: std::time::Instant::now(),
background: false,
handle,
}
);
self.apply_tool_result(call, output, ui_tx, ds);
// Wait for this non-background tool to complete
let entry = {
let mut tools = self.active_tools.lock().unwrap();
// It's the last one we pushed
tools.pop().unwrap()
};
if let Ok((call, output)) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, ds);
}
}
/// Apply a completed tool result to conversation state.
@ -624,7 +643,7 @@ impl Agent {
name: call.function.name.clone(),
result: output.text.clone(),
});
self.active_tools.write().unwrap().retain(|t| t.id != call.id);
self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
// Tag memory_render results for context deduplication
if call.function.name == "memory_render" && !output.text.starts_with("Error:") {

View file

@ -118,6 +118,17 @@ pub struct ProcessInfo {
pub started: Instant,
}
/// A tool call in flight — metadata for TUI + JoinHandle for
/// result collection and cancellation.
pub struct ActiveToolCall {
pub id: String,
pub name: String,
pub detail: String,
pub started: Instant,
pub background: bool,
pub handle: tokio::task::JoinHandle<(ToolCall, ToolOutput)>,
}
/// Shared tracker for running child processes. Allows the TUI to
/// display what's running and kill processes by PID.
#[derive(Debug, Clone, Default)]