Revert to tokio::sync::Mutex, fix lock-across-await bugs, move input ownership to InteractScreen
The std::sync::Mutex detour caught every place a MutexGuard lived across an await point in Agent::turn — the compiler enforced Send safety that tokio::sync::Mutex silently allows. With those fixed, switch back to tokio::sync::Mutex (std::sync blocks tokio worker threads and panics inside the runtime). Input and command dispatch now live in InteractScreen (chat.rs): - Enter pushes directly to SharedMindState.input (no app.submitted hop) - sync_from_agent displays pending input with dimmed color - Slash command table moved from event_loop.rs to chat.rs - cmd_switch_model kept as pub fn for tool-initiated switches Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
3e1be4d353
commit
48beb8b663
9 changed files with 404 additions and 370 deletions
204
src/agent/mod.rs
204
src/agent/mod.rs
|
|
@ -232,61 +232,71 @@ impl Agent {
|
|||
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
|
||||
/// lock is never held across I/O (API streaming, tool dispatch).
|
||||
pub async fn turn(
|
||||
agent: Arc<std::sync::Mutex<Agent>>,
|
||||
agent: Arc<tokio::sync::Mutex<Agent>>,
|
||||
user_input: &str,
|
||||
ui_tx: &UiSender,
|
||||
target: StreamTarget,
|
||||
) -> Result<TurnResult> {
|
||||
// --- Pre-loop setup (lock 1): agent cycle, memories, user input ---
|
||||
let active_tools = {
|
||||
let mut me = agent.lock().unwrap();
|
||||
let mut finished = Vec::new();
|
||||
let tools = {
|
||||
let mut me = agent.lock().await;
|
||||
|
||||
let cycle = me.run_agent_cycle();
|
||||
for key in &cycle.surfaced_keys {
|
||||
if let Some(rendered) = crate::cli::node::render_node(
|
||||
&crate::store::Store::load().unwrap_or_default(), key,
|
||||
) {
|
||||
let mut msg = Message::user(format!(
|
||||
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
|
||||
key, rendered,
|
||||
));
|
||||
msg.stamp();
|
||||
me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg });
|
||||
let cycle = me.run_agent_cycle();
|
||||
for key in &cycle.surfaced_keys {
|
||||
if let Some(rendered) = crate::cli::node::render_node(
|
||||
&crate::store::Store::load().unwrap_or_default(), key,
|
||||
) {
|
||||
let mut msg = Message::user(format!(
|
||||
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
|
||||
key, rendered,
|
||||
));
|
||||
msg.stamp();
|
||||
me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg });
|
||||
}
|
||||
}
|
||||
if let Some(ref reflection) = cycle.reflection {
|
||||
me.push_message(Message::user(format!(
|
||||
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
|
||||
reflection.trim(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
if let Some(ref reflection) = cycle.reflection {
|
||||
me.push_message(Message::user(format!(
|
||||
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
|
||||
reflection.trim(),
|
||||
)));
|
||||
}
|
||||
|
||||
// Collect completed background tool calls
|
||||
let mut bg_ds = DispatchState::new();
|
||||
let finished: Vec<_> = {
|
||||
// Collect completed background tool handles — remove from active list
|
||||
// but don't await yet (MutexGuard isn't Send).
|
||||
let mut tools = me.active_tools.lock().unwrap();
|
||||
let mut done = Vec::new();
|
||||
let mut i = 0;
|
||||
while i < tools.len() {
|
||||
if tools[i].handle.is_finished() {
|
||||
done.push(tools.remove(i));
|
||||
finished.push(tools.remove(i));
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
done
|
||||
|
||||
me.active_tools.clone()
|
||||
};
|
||||
|
||||
// Await finished handles without holding the agent lock
|
||||
let mut bg_results = Vec::new();
|
||||
for entry in finished {
|
||||
if let Ok((call, output)) = entry.handle.await {
|
||||
me.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
|
||||
bg_results.push((call, output));
|
||||
}
|
||||
}
|
||||
|
||||
me.push_message(Message::user(user_input));
|
||||
let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots()));
|
||||
// Re-acquire to apply results and push user input
|
||||
{
|
||||
let mut me = agent.lock().await;
|
||||
let mut bg_ds = DispatchState::new();
|
||||
for (call, output) in bg_results {
|
||||
me.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
|
||||
}
|
||||
me.push_message(Message::user(user_input));
|
||||
let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots()));
|
||||
}
|
||||
|
||||
let tools = me.active_tools.clone();
|
||||
drop(me);
|
||||
tools
|
||||
};
|
||||
|
||||
|
|
@ -299,7 +309,7 @@ impl Agent {
|
|||
|
||||
// --- Lock 2: assemble messages, start stream ---
|
||||
let (mut rx, _stream_guard) = {
|
||||
let me = agent.lock().unwrap();
|
||||
let me = agent.lock().await;
|
||||
let api_messages = me.assemble_api_messages();
|
||||
let sampling = api::SamplingParams {
|
||||
temperature: me.temperature,
|
||||
|
|
@ -328,8 +338,8 @@ impl Agent {
|
|||
// --- Stream complete ---
|
||||
|
||||
// --- Lock 3: process results ---
|
||||
{
|
||||
let mut me = agent.lock().unwrap();
|
||||
let (msg, pending) = {
|
||||
let mut me = agent.lock().await;
|
||||
|
||||
// Handle stream errors with retry logic
|
||||
if let Some(e) = stream_error {
|
||||
|
|
@ -409,81 +419,79 @@ impl Agent {
|
|||
}
|
||||
|
||||
// Collect non-background tool calls fired during streaming
|
||||
let pending: Vec<_> = {
|
||||
let mut tools_guard = active_tools.lock().unwrap();
|
||||
let mut non_bg = Vec::new();
|
||||
let mut i = 0;
|
||||
while i < tools_guard.len() {
|
||||
if !tools_guard[i].background {
|
||||
non_bg.push(tools_guard.remove(i));
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
let mut tools_guard = active_tools.lock().unwrap();
|
||||
let mut non_bg = Vec::new();
|
||||
let mut i = 0;
|
||||
while i < tools_guard.len() {
|
||||
if !tools_guard[i].background {
|
||||
non_bg.push(tools_guard.remove(i));
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
non_bg
|
||||
};
|
||||
}
|
||||
(msg, non_bg)
|
||||
};
|
||||
|
||||
if !pending.is_empty() {
|
||||
me.push_message(msg.clone());
|
||||
// Drop lock before awaiting tool handles
|
||||
drop(me);
|
||||
let mut results = Vec::new();
|
||||
for entry in pending {
|
||||
if let Ok(r) = entry.handle.await {
|
||||
results.push(r);
|
||||
}
|
||||
if !pending.is_empty() {
|
||||
agent.lock().await.push_message(msg.clone());
|
||||
|
||||
// Drop lock before awaiting tool handles
|
||||
let mut results = Vec::new();
|
||||
for entry in pending {
|
||||
if let Ok(r) = entry.handle.await {
|
||||
results.push(r);
|
||||
}
|
||||
// Reacquire to apply results
|
||||
let mut me = agent.lock().unwrap();
|
||||
for (call, output) in results {
|
||||
me.apply_tool_result(&call, output, ui_tx, &mut ds);
|
||||
}
|
||||
// Reacquire to apply results
|
||||
let mut me = agent.lock().await;
|
||||
for (call, output) in results {
|
||||
me.apply_tool_result(&call, output, ui_tx, &mut ds);
|
||||
}
|
||||
me.publish_context_state();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Tool calls (structured API path)
|
||||
if let Some(ref tool_calls) = msg.tool_calls {
|
||||
if !tool_calls.is_empty() {
|
||||
agent.lock().await.push_message(msg.clone());
|
||||
let calls: Vec<ToolCall> = tool_calls.clone();
|
||||
// Drop lock before tool dispatch
|
||||
for call in &calls {
|
||||
Agent::dispatch_tool_call_unlocked(
|
||||
&agent, &active_tools, call, ui_tx, &mut ds,
|
||||
).await;
|
||||
}
|
||||
me.publish_context_state();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Tool calls (structured API path)
|
||||
if let Some(ref tool_calls) = msg.tool_calls {
|
||||
if !tool_calls.is_empty() {
|
||||
me.push_message(msg.clone());
|
||||
let calls: Vec<ToolCall> = tool_calls.clone();
|
||||
// Drop lock before tool dispatch
|
||||
drop(me);
|
||||
for call in &calls {
|
||||
Agent::dispatch_tool_call_unlocked(
|
||||
&agent, &active_tools, call, ui_tx, &mut ds,
|
||||
).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Genuinely text-only response
|
||||
let text = msg.content_text().to_string();
|
||||
let _ = ui_tx.send(UiMessage::Activity(String::new()));
|
||||
me.push_message(msg);
|
||||
|
||||
// Drain pending control flags
|
||||
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
|
||||
if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
|
||||
if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }
|
||||
|
||||
return Ok(TurnResult {
|
||||
text,
|
||||
yield_requested: ds.yield_requested,
|
||||
had_tool_calls: ds.had_tool_calls,
|
||||
tool_errors: ds.tool_errors,
|
||||
model_switch: ds.model_switch,
|
||||
dmn_pause: ds.dmn_pause,
|
||||
});
|
||||
}
|
||||
|
||||
// Genuinely text-only response
|
||||
let text = msg.content_text().to_string();
|
||||
let _ = ui_tx.send(UiMessage::Activity(String::new()));
|
||||
let mut me = agent.lock().await;
|
||||
me.push_message(msg);
|
||||
|
||||
// Drain pending control flags
|
||||
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
|
||||
if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
|
||||
if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }
|
||||
|
||||
return Ok(TurnResult {
|
||||
text,
|
||||
yield_requested: ds.yield_requested,
|
||||
had_tool_calls: ds.had_tool_calls,
|
||||
tool_errors: ds.tool_errors,
|
||||
model_switch: ds.model_switch,
|
||||
dmn_pause: ds.dmn_pause,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Dispatch a tool call without holding the agent lock across I/O.
|
||||
/// Used by `turn()` which manages its own locking.
|
||||
async fn dispatch_tool_call_unlocked(
|
||||
agent: &Arc<std::sync::Mutex<Agent>>,
|
||||
agent: &Arc<tokio::sync::Mutex<Agent>>,
|
||||
active_tools: &crate::user::ui_channel::SharedActiveTools,
|
||||
call: &ToolCall,
|
||||
ui_tx: &UiSender,
|
||||
|
|
@ -494,7 +502,7 @@ impl Agent {
|
|||
Err(e) => {
|
||||
let err = format!("Error: malformed tool call arguments: {e}");
|
||||
let _ = ui_tx.send(UiMessage::Activity(format!("rejected: {} (bad args)", call.function.name)));
|
||||
let mut me = agent.lock().unwrap();
|
||||
let mut me = agent.lock().await;
|
||||
me.apply_tool_result(call, err, ui_tx, ds);
|
||||
return;
|
||||
}
|
||||
|
|
@ -532,7 +540,7 @@ impl Agent {
|
|||
};
|
||||
if let Ok((call, output)) = entry.handle.await {
|
||||
// Brief lock to apply result
|
||||
let mut me = agent.lock().unwrap();
|
||||
let mut me = agent.lock().await;
|
||||
me.apply_tool_result(&call, output, ui_tx, ds);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue