agent: switch from tokio::sync::Mutex to std::sync::Mutex

The agent lock is never held across await points — turns lock briefly,
do work, drop, then do async API calls. std::sync::Mutex works and
can be locked from sync contexts (screen tick inside terminal.draw).

Fixes: blocking_lock() panic when called inside tokio runtime.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 19:56:56 -04:00
parent f29b4be09c
commit 3e1be4d353
8 changed files with 41 additions and 39 deletions

View file

@ -621,7 +621,7 @@ pub async fn collect_stream(
rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
ui_tx: &UiSender,
target: StreamTarget,
agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
agent: &std::sync::Arc<std::sync::Mutex<super::Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
) -> StreamResult {
let mut content = String::new();

View file

@ -232,14 +232,14 @@ impl Agent {
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
/// lock is never held across I/O (API streaming, tool dispatch).
pub async fn turn(
agent: Arc<tokio::sync::Mutex<Agent>>,
agent: Arc<std::sync::Mutex<Agent>>,
user_input: &str,
ui_tx: &UiSender,
target: StreamTarget,
) -> Result<TurnResult> {
// --- Pre-loop setup (lock 1): agent cycle, memories, user input ---
let active_tools = {
let mut me = agent.lock().await;
let mut me = agent.lock().unwrap();
let cycle = me.run_agent_cycle();
for key in &cycle.surfaced_keys {
@ -285,9 +285,10 @@ impl Agent {
me.push_message(Message::user(user_input));
let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots()));
me.active_tools.clone()
let tools = me.active_tools.clone();
drop(me);
tools
};
// --- Lock released ---
let mut overflow_retries: u32 = 0;
let mut empty_retries: u32 = 0;
@ -298,7 +299,7 @@ impl Agent {
// --- Lock 2: assemble messages, start stream ---
let (mut rx, _stream_guard) = {
let me = agent.lock().await;
let me = agent.lock().unwrap();
let api_messages = me.assemble_api_messages();
let sampling = api::SamplingParams {
temperature: me.temperature,
@ -328,7 +329,7 @@ impl Agent {
// --- Lock 3: process results ---
{
let mut me = agent.lock().await;
let mut me = agent.lock().unwrap();
// Handle stream errors with retry logic
if let Some(e) = stream_error {
@ -433,7 +434,7 @@ impl Agent {
}
}
// Reacquire to apply results
let mut me = agent.lock().await;
let mut me = agent.lock().unwrap();
for (call, output) in results {
me.apply_tool_result(&call, output, ui_tx, &mut ds);
}
@ -482,7 +483,7 @@ impl Agent {
/// Dispatch a tool call without holding the agent lock across I/O.
/// Used by `turn()` which manages its own locking.
async fn dispatch_tool_call_unlocked(
agent: &Arc<tokio::sync::Mutex<Agent>>,
agent: &Arc<std::sync::Mutex<Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
call: &ToolCall,
ui_tx: &UiSender,
@ -493,7 +494,7 @@ impl Agent {
Err(e) => {
let err = format!("Error: malformed tool call arguments: {e}");
let _ = ui_tx.send(UiMessage::Activity(format!("rejected: {} (bad args)", call.function.name)));
let mut me = agent.lock().await;
let mut me = agent.lock().unwrap();
me.apply_tool_result(call, err, ui_tx, ds);
return;
}
@ -531,7 +532,7 @@ impl Agent {
};
if let Ok((call, output)) = entry.handle.await {
// Brief lock to apply result
let mut me = agent.lock().await;
let mut me = agent.lock().unwrap();
me.apply_tool_result(&call, output, ui_tx, ds);
}
}

View file

@ -14,7 +14,7 @@ pub(super) fn tools() -> [super::Tool; 3] {
.ok_or_else(|| anyhow::anyhow!("'model' parameter is required"))?;
if model.is_empty() { anyhow::bail!("'model' parameter cannot be empty"); }
if let Some(agent) = agent {
let mut a = agent.lock().await;
let mut a = agent.lock().unwrap();
a.pending_model_switch = Some(model.to_string());
}
Ok(format!("Switching to model '{}' after this turn.", model))
@ -24,7 +24,7 @@ pub(super) fn tools() -> [super::Tool; 3] {
parameters_json: r#"{"type":"object","properties":{}}"#,
handler: |agent, _v| Box::pin(async move {
if let Some(agent) = agent {
let mut a = agent.lock().await;
let mut a = agent.lock().unwrap();
a.pending_yield = true;
a.pending_dmn_pause = true;
}
@ -36,7 +36,7 @@ pub(super) fn tools() -> [super::Tool; 3] {
handler: |agent, v| Box::pin(async move {
let msg = v.get("message").and_then(|v| v.as_str()).unwrap_or("Waiting for input.");
if let Some(agent) = agent {
let mut a = agent.lock().await;
let mut a = agent.lock().unwrap();
a.pending_yield = true;
}
Ok(format!("Yielding. {}", msg))

View file

@ -29,7 +29,7 @@ fn default_timeout() -> u64 { 120 }
/// Async tool handler function.
/// Agent is None when called from contexts without an agent (MCP server, subconscious).
pub type ToolHandler = fn(
Option<std::sync::Arc<tokio::sync::Mutex<super::Agent>>>,
Option<std::sync::Arc<std::sync::Mutex<super::Agent>>>,
serde_json::Value,
) -> Pin<Box<dyn Future<Output = anyhow::Result<String>> + Send>>;
@ -94,11 +94,11 @@ pub async fn dispatch(
pub async fn dispatch_with_agent(
name: &str,
args: &serde_json::Value,
agent: Option<std::sync::Arc<tokio::sync::Mutex<super::Agent>>>,
agent: Option<std::sync::Arc<std::sync::Mutex<super::Agent>>>,
) -> String {
// Look up in agent's tools if available, otherwise global
let tool = if let Some(ref a) = agent {
let guard = a.lock().await;
let guard = a.lock().unwrap();
guard.tools.iter().find(|t| t.name == name).copied()
} else {
None

View file

@ -20,7 +20,7 @@ pub fn tool() -> super::Tool {
parameters_json: r#"{"type":"object","properties":{"action":{"type":"string","enum":["push","pop","update","switch"],"description":"Stack operation"},"content":{"type":"string","description":"Task description (for push/update)"},"index":{"type":"integer","description":"Stack index (for switch, 0=bottom)"}},"required":["action"]}"#,
handler: |agent, v| Box::pin(async move {
if let Some(agent) = agent {
let mut a = agent.lock().await;
let mut a = agent.lock().unwrap();
Ok(handle(&v, &mut a.context.working_stack))
} else {
anyhow::bail!("working_stack requires agent context")