diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index bde1704..66a8811 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -621,7 +621,7 @@ pub async fn collect_stream( rx: &mut mpsc::UnboundedReceiver, ui_tx: &UiSender, target: StreamTarget, - agent: &std::sync::Arc>, + agent: &std::sync::Arc>, active_tools: &crate::user::ui_channel::SharedActiveTools, ) -> StreamResult { let mut content = String::new(); diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 533190c..7f0bdd0 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -232,14 +232,14 @@ impl Agent { /// Takes Arc> and manages locking internally so the /// lock is never held across I/O (API streaming, tool dispatch). pub async fn turn( - agent: Arc>, + agent: Arc>, user_input: &str, ui_tx: &UiSender, target: StreamTarget, ) -> Result { // --- Pre-loop setup (lock 1): agent cycle, memories, user input --- let active_tools = { - let mut me = agent.lock().await; + let mut me = agent.lock().unwrap(); let cycle = me.run_agent_cycle(); for key in &cycle.surfaced_keys { @@ -285,9 +285,10 @@ impl Agent { me.push_message(Message::user(user_input)); let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots())); - me.active_tools.clone() + let tools = me.active_tools.clone(); + drop(me); + tools }; - // --- Lock released --- let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; @@ -298,7 +299,7 @@ impl Agent { // --- Lock 2: assemble messages, start stream --- let (mut rx, _stream_guard) = { - let me = agent.lock().await; + let me = agent.lock().unwrap(); let api_messages = me.assemble_api_messages(); let sampling = api::SamplingParams { temperature: me.temperature, @@ -328,7 +329,7 @@ impl Agent { // --- Lock 3: process results --- { - let mut me = agent.lock().await; + let mut me = agent.lock().unwrap(); // Handle stream errors with retry logic if let Some(e) = stream_error { @@ -433,7 +434,7 @@ impl Agent { } } // Reacquire to apply results - let mut me = agent.lock().await; + let mut me = agent.lock().unwrap(); for (call, output) in results { me.apply_tool_result(&call, output, ui_tx, &mut ds); } @@ -482,7 +483,7 @@ impl Agent { /// Dispatch a tool call without holding the agent lock across I/O. /// Used by `turn()` which manages its own locking. async fn dispatch_tool_call_unlocked( - agent: &Arc>, + agent: &Arc>, active_tools: &crate::user::ui_channel::SharedActiveTools, call: &ToolCall, ui_tx: &UiSender, @@ -493,7 +494,7 @@ impl Agent { Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); let _ = ui_tx.send(UiMessage::Activity(format!("rejected: {} (bad args)", call.function.name))); - let mut me = agent.lock().await; + let mut me = agent.lock().unwrap(); me.apply_tool_result(call, err, ui_tx, ds); return; } @@ -531,7 +532,7 @@ impl Agent { }; if let Ok((call, output)) = entry.handle.await { // Brief lock to apply result - let mut me = agent.lock().await; + let mut me = agent.lock().unwrap(); me.apply_tool_result(&call, output, ui_tx, ds); } } diff --git a/src/agent/tools/control.rs b/src/agent/tools/control.rs index daa5ef5..843e981 100644 --- a/src/agent/tools/control.rs +++ b/src/agent/tools/control.rs @@ -14,7 +14,7 @@ pub(super) fn tools() -> [super::Tool; 3] { .ok_or_else(|| anyhow::anyhow!("'model' parameter is required"))?; if model.is_empty() { anyhow::bail!("'model' parameter cannot be empty"); } if let Some(agent) = agent { - let mut a = agent.lock().await; + let mut a = agent.lock().unwrap(); a.pending_model_switch = Some(model.to_string()); } Ok(format!("Switching to model '{}' after this turn.", model)) @@ -24,7 +24,7 @@ pub(super) fn tools() -> [super::Tool; 3] { parameters_json: r#"{"type":"object","properties":{}}"#, handler: |agent, _v| Box::pin(async move { if let Some(agent) = agent { - let mut a = agent.lock().await; + let mut a = agent.lock().unwrap(); a.pending_yield = true; a.pending_dmn_pause = true; } @@ -36,7 +36,7 @@ pub(super) fn tools() -> [super::Tool; 3] { handler: |agent, v| Box::pin(async move { let msg = v.get("message").and_then(|v| v.as_str()).unwrap_or("Waiting for input."); if let Some(agent) = agent { - let mut a = agent.lock().await; + let mut a = agent.lock().unwrap(); a.pending_yield = true; } Ok(format!("Yielding. {}", msg)) diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 95ef89f..8e42353 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -29,7 +29,7 @@ fn default_timeout() -> u64 { 120 } /// Async tool handler function. /// Agent is None when called from contexts without an agent (MCP server, subconscious). pub type ToolHandler = fn( - Option>>, + Option>>, serde_json::Value, ) -> Pin> + Send>>; @@ -94,11 +94,11 @@ pub async fn dispatch( pub async fn dispatch_with_agent( name: &str, args: &serde_json::Value, - agent: Option>>, + agent: Option>>, ) -> String { // Look up in agent's tools if available, otherwise global let tool = if let Some(ref a) = agent { - let guard = a.lock().await; + let guard = a.lock().unwrap(); guard.tools.iter().find(|t| t.name == name).copied() } else { None diff --git a/src/agent/tools/working_stack.rs b/src/agent/tools/working_stack.rs index 696e170..6cc3970 100644 --- a/src/agent/tools/working_stack.rs +++ b/src/agent/tools/working_stack.rs @@ -20,7 +20,7 @@ pub fn tool() -> super::Tool { parameters_json: r#"{"type":"object","properties":{"action":{"type":"string","enum":["push","pop","update","switch"],"description":"Stack operation"},"content":{"type":"string","description":"Task description (for push/update)"},"index":{"type":"integer","description":"Stack index (for switch, 0=bottom)"}},"required":["action"]}"#, handler: |agent, v| Box::pin(async move { if let Some(agent) = agent { - let mut a = agent.lock().await; + let mut a = agent.lock().unwrap(); Ok(handle(&v, &mut a.context.working_stack)) } else { anyhow::bail!("working_stack requires agent context") diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 902bdd3..881e989 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -20,7 +20,8 @@ pub mod log; use anyhow::Result; use std::sync::Arc; use std::time::Instant; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::mpsc; +use std::sync::Mutex; use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; @@ -241,7 +242,7 @@ impl Mind { /// Initialize — restore log, start daemons and background agents. pub async fn init(&self) { // Restore conversation - let mut ag = self.agent.lock().await; + let mut ag = self.agent.lock().unwrap(); ag.restore_from_log(); drop(ag); } @@ -257,7 +258,7 @@ impl Mind { MindCommand::None => {} MindCommand::Compact => { let threshold = compaction_threshold(&self.config.app); - let mut ag = self.agent.lock().await; + let mut ag = self.agent.lock().unwrap(); if ag.last_prompt_tokens() > threshold { ag.compact(); } @@ -272,7 +273,7 @@ impl Mind { } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); - let ag = self.agent.lock().await; + let ag = self.agent.lock().unwrap(); let mut tools = ag.active_tools.lock().unwrap(); for entry in tools.drain(..) { entry.handle.abort(); } drop(tools); drop(ag); @@ -289,7 +290,7 @@ impl Mind { let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), ).ok(); - let mut ag = self.agent.lock().await; + let mut ag = self.agent.lock().unwrap(); let shared_ctx = ag.shared_context.clone(); let shared_tools = ag.active_tools.clone(); *ag = Agent::new( @@ -323,7 +324,7 @@ impl Mind { let response_window = cfg.scoring_response_window; tokio::spawn(async move { let (context, client) = { - let mut ag = agent.lock().await; + let mut ag = agent.lock().unwrap(); if ag.agent_cycles.memory_scoring_in_flight { return; } ag.agent_cycles.memory_scoring_in_flight = true; (ag.context.clone(), ag.client_clone()) @@ -332,7 +333,7 @@ impl Mind { &context, max_age as i64, response_window, &client, &ui_tx, ).await; { - let mut ag = agent.lock().await; + let mut ag = agent.lock().unwrap(); ag.agent_cycles.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } } @@ -387,7 +388,7 @@ impl Mind { // Post-turn maintenance { - let mut ag = self.agent.lock().await; + let mut ag = self.agent.lock().unwrap(); ag.age_out_images(); ag.publish_context_state(); } diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index db1553b..0f9c43a 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -602,7 +602,7 @@ fn str_err(r: Result) -> anyhow::Result { /// digest_daily tool handler: generate a daily digest async fn handle_digest_daily( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let date = str_err(get_str_required(&args, "date"))?; @@ -613,7 +613,7 @@ async fn handle_digest_daily( /// digest_weekly tool handler: generate a weekly digest async fn handle_digest_weekly( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let week_label = str_err(get_str_required(&args, "week"))?; @@ -624,7 +624,7 @@ async fn handle_digest_weekly( /// digest_monthly tool handler: generate a monthly digest async fn handle_digest_monthly( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let month = str_err(get_str_required(&args, "month"))?; @@ -635,7 +635,7 @@ async fn handle_digest_monthly( /// digest_auto tool handler: auto-generate all missing digests async fn handle_digest_auto( - _agent: Option>>, + _agent: Option>>, _args: serde_json::Value, ) -> anyhow::Result { let mut store = str_err(Store::load())?; @@ -645,7 +645,7 @@ async fn handle_digest_auto( /// digest_links tool handler: parse and apply digest links async fn handle_digest_links( - _agent: Option>>, + _agent: Option>>, _args: serde_json::Value, ) -> anyhow::Result { let mut store = str_err(Store::load())?; diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index c0dc01b..afba332 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -9,7 +9,7 @@ use ratatui::crossterm::event::{Event, EventStream, KeyEventKind}; use futures::StreamExt; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use std::sync::Mutex; use crate::agent::Agent; use crate::agent::api::ApiClient; @@ -116,8 +116,8 @@ pub async fn start(cli: crate::user::CliArgs) -> Result<()> { let mind = crate::mind::Mind::new(config, ui_tx.clone(), turn_tx); - let shared_context = mind.agent.lock().await.shared_context.clone(); - let shared_active_tools = mind.agent.lock().await.active_tools.clone(); + let shared_context = mind.agent.lock().unwrap().shared_context.clone(); + let shared_active_tools = mind.agent.lock().unwrap().active_tools.clone(); let mut result = Ok(()); tokio_scoped::scope(|s| { @@ -164,7 +164,7 @@ async fn cmd_retry_inner( mind_tx: &tokio::sync::mpsc::UnboundedSender, ui_tx: &ui_channel::UiSender, ) { - let mut agent_guard = agent.lock().await; + let mut agent_guard = agent.lock().unwrap(); let entries = agent_guard.entries_mut(); let mut last_user_text = None; while let Some(entry) = entries.last() { @@ -211,7 +211,7 @@ fn hotkey_cycle_reasoning(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender } async fn hotkey_kill_processes(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { - let active_tools = mind.agent.lock().await.active_tools.clone(); + let active_tools = mind.agent.lock().unwrap().active_tools.clone(); let mut tools = active_tools.lock().unwrap(); if tools.is_empty() { let _ = ui_tx.send(UiMessage::Info("(no running tool calls)".into())); @@ -284,7 +284,7 @@ pub async fn cmd_switch_model( ui_tx: &ui_channel::UiSender, ) { let resolved = { - let ag = agent.lock().await; + let ag = agent.lock().unwrap(); match ag.app_config.resolve_model(name) { Ok(r) => r, Err(e) => { @@ -296,11 +296,11 @@ pub async fn cmd_switch_model( let new_client = ApiClient::new(&resolved.api_base, &resolved.api_key, &resolved.model_id); let prompt_changed = { - let ag = agent.lock().await; + let ag = agent.lock().unwrap(); resolved.prompt_file != ag.prompt_file }; - let mut ag = agent.lock().await; + let mut ag = agent.lock().unwrap(); ag.swap_client(new_client); if prompt_changed {