Batch tool result application: single lock for remove + log + push
apply_tool_results() collects all results, then does one state lock (remove from active_tools + write to log) and one context lock (push all nodes). Eliminates redundant per-result locking. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
31a41fa042
commit
9c0533966a
1 changed files with 43 additions and 18 deletions
|
|
@ -284,11 +284,13 @@ impl Agent {
|
|||
let finished = agent.state.lock().await.active_tools.take_finished();
|
||||
if !finished.is_empty() {
|
||||
let mut bg_ds = DispatchState::new();
|
||||
let mut results = Vec::new();
|
||||
for entry in finished {
|
||||
if let Ok((call, output)) = entry.handle.await {
|
||||
Agent::apply_tool_result(&agent, &call, output, &mut bg_ds).await;
|
||||
results.push((call, output));
|
||||
}
|
||||
}
|
||||
Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -386,11 +388,13 @@ impl Agent {
|
|||
ds.had_tool_calls = true;
|
||||
|
||||
let handles = agent.state.lock().await.active_tools.take_foreground();
|
||||
let mut results = Vec::new();
|
||||
for entry in handles {
|
||||
if let Ok((call, output)) = entry.handle.await {
|
||||
Agent::apply_tool_result(&agent, &call, output, &mut ds).await;
|
||||
results.push((call, output));
|
||||
}
|
||||
}
|
||||
Agent::apply_tool_results(&agent, results, &mut ds).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -422,29 +426,50 @@ impl Agent {
|
|||
}
|
||||
}
|
||||
|
||||
async fn apply_tool_result(
|
||||
agent: &Arc<Agent>,
|
||||
call: &PendingToolCall,
|
||||
output: String,
|
||||
ds: &mut DispatchState,
|
||||
) {
|
||||
ds.had_tool_calls = true;
|
||||
if output.starts_with("Error:") {
|
||||
ds.tool_errors += 1;
|
||||
}
|
||||
|
||||
agent.state.lock().await.active_tools.remove(&call.id);
|
||||
|
||||
fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
|
||||
if call.name == "memory_render" && !output.starts_with("Error:") {
|
||||
let args: serde_json::Value =
|
||||
serde_json::from_str(&call.arguments).unwrap_or_default();
|
||||
if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
|
||||
agent.push_node(AstNode::memory(key, &output)).await;
|
||||
return;
|
||||
return AstNode::memory(key, output);
|
||||
}
|
||||
}
|
||||
AstNode::tool_result(output)
|
||||
}
|
||||
|
||||
agent.push_node(AstNode::tool_result(&output)).await;
|
||||
async fn apply_tool_results(
|
||||
agent: &Arc<Agent>,
|
||||
results: Vec<(PendingToolCall, String)>,
|
||||
ds: &mut DispatchState,
|
||||
) {
|
||||
let mut nodes = Vec::new();
|
||||
for (call, output) in &results {
|
||||
ds.had_tool_calls = true;
|
||||
if output.starts_with("Error:") { ds.tool_errors += 1; }
|
||||
nodes.push(Self::make_tool_result_node(call, output));
|
||||
}
|
||||
|
||||
// Single lock: remove from active, log, push to context
|
||||
{
|
||||
let mut st = agent.state.lock().await;
|
||||
for (call, _) in &results {
|
||||
st.active_tools.remove(&call.id);
|
||||
}
|
||||
for node in &nodes {
|
||||
if let Some(ref log) = st.conversation_log {
|
||||
if let Err(e) = log.append_node(node) {
|
||||
eprintln!("warning: failed to log entry: {:#}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut ctx = agent.context.lock().await;
|
||||
for node in nodes {
|
||||
ctx.push(Section::Conversation, node);
|
||||
}
|
||||
}
|
||||
agent.state.lock().await.changed.notify_one();
|
||||
}
|
||||
|
||||
async fn load_startup_journal(&self) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue