From f40d8cfa9d65d08ad3b4f0cdae125527ca439713 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 12 Apr 2026 20:33:23 -0400 Subject: [PATCH] unconscious: release lock during slow spawn work Split trigger() into phases so the Unconscious mutex is only held briefly: - reap_finished(): check handles, restore completed autos - select_to_spawn(): pick agents, take their autos out - prepare_spawn(): slow work (Store::load, query, Agent::new) - NO LOCK - complete_spawn()/abort_spawn(): store results back Previously held the lock for 28+ seconds during Store::load and query execution. Now lock hold time should be milliseconds. Co-Authored-By: Proof of Concept --- src/mind/mod.rs | 14 ++- src/mind/unconscious.rs | 213 ++++++++++++++++++++++++---------------- 2 files changed, 142 insertions(+), 85 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index aa4162a..440be05 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -346,7 +346,19 @@ impl Mind { s.unc_idle = true; } loop { - unc.lock().await.trigger().await; + // Phase 1: quick work under lock + let to_spawn = { + let mut guard = unc.lock().await; + guard.reap_finished(); + guard.select_to_spawn() + }; + // Phase 2: slow work outside lock + for (idx, name, auto) in to_spawn { + match crate::mind::unconscious::prepare_spawn(&name, auto).await { + Ok(result) => unc.lock().await.complete_spawn(idx, result), + Err(auto) => unc.lock().await.abort_spawn(idx, auto), + } + } // Check if conscious became active if *unc_rx.borrow() { break; } // Brief yield to not starve other tasks diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index e41d4b7..aa0932d 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -127,8 +127,13 @@ impl Unconscious { self.agents[idx].enabled = !self.agents[idx].enabled; let new_state = self.agents[idx].enabled; self.save_enabled(); - if new_state && !self.agents[idx].is_running() { - self.spawn_agent(idx).await; + if new_state && 
!self.agents[idx].is_running() && self.agents[idx].auto.is_some() { + let agent_name = self.agents[idx].name.clone(); + let auto = self.agents[idx].auto.take().unwrap(); + match prepare_spawn(&agent_name, auto).await { + Ok(result) => self.complete_spawn(idx, result), + Err(auto) => self.abort_spawn(idx, auto), + } } Some(new_state) } @@ -170,8 +175,8 @@ impl Unconscious { self.last_health_check = Some(Instant::now()); } - /// Reap finished agents and spawn new ones. - pub async fn trigger(&mut self) { + /// Reap finished agents (quick, hold lock briefly). + pub fn reap_finished(&mut self) { // Periodic graph health refresh (also on first call) if self.last_health_check .map(|t| t.elapsed() > std::time::Duration::from_secs(600)) @@ -198,109 +203,149 @@ impl Unconscious { } } } + } + /// Select agents to spawn and take their AutoAgents out (quick, hold lock briefly). + /// Returns vec of (index, name, auto) for agents that should spawn. + pub fn select_to_spawn(&mut self) -> Vec<(usize, String, AutoAgent)> { let running = self.agents.iter().filter(|a| a.is_running()).count(); + let mut to_spawn = Vec::new(); + for _ in running..self.max_concurrent { let next = self.agents.iter().enumerate() - .filter(|(_, a)| a.should_run()) + .filter(|(_, a)| a.should_run() && a.auto.is_some()) .min_by_key(|(_, a)| a.last_run); match next { - Some((idx, _)) => self.spawn_agent(idx).await, + Some((idx, _)) => { + let name = self.agents[idx].name.clone(); + let auto = self.agents[idx].auto.take().unwrap(); + to_spawn.push((idx, name, auto)); + } None => break, } } + to_spawn } - async fn spawn_agent(&mut self, idx: usize) { - let name = self.agents[idx].name.clone(); - dbglog!("[unconscious] spawning {}", name); + /// Store spawn result back (quick, hold lock briefly). 
+ pub fn complete_spawn(&mut self, idx: usize, result: SpawnResult) { + self.agents[idx].agent = Some(result.agent); + self.agents[idx].handle = Some(result.handle); + } - let def = match defs::get_def(&name) { - Some(d) => d, - None => return, - }; + /// Restore auto on spawn failure (quick, hold lock briefly). + pub fn abort_spawn(&mut self, idx: usize, auto: AutoAgent) { + self.agents[idx].auto = Some(auto); + } +} - // Run query and resolve placeholders - let mut store = match crate::store::Store::load() { - Ok(s) => s, - Err(e) => { - dbglog!("[unconscious] store load failed: {}", e); - return; - } - }; +/// Result of preparing an agent spawn (created outside the lock). +pub struct SpawnResult { + pub agent: std::sync::Arc<crate::agent::Agent>, + pub handle: tokio::task::JoinHandle<(AutoAgent, Result<(), String>)>, +} - let exclude: std::collections::HashSet = std::collections::HashSet::new(); - let batch = match defs::run_agent( - &store, &def, def.count.unwrap_or(5), &exclude, - ) { - Ok(b) => b, - Err(e) => { - dbglog!("[unconscious] {} query failed: {}", name, e); - return; - } - }; +/// Prepare an agent spawn — does the slow work (Store::load, query, Agent::new). +/// Called outside the Unconscious lock. +/// On success, auto is consumed (moved into spawned task). +/// On failure, auto is returned so it can be restored. 
+pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResult, AutoAgent> { + dbglog!("[unconscious] spawning {}", name); - if !batch.node_keys.is_empty() { - store.record_agent_visits(&batch.node_keys, &name).ok(); + let def = match defs::get_def(name) { + Some(d) => d, + None => return Err(auto), + }; + + // Run query and resolve placeholders + let mut store = match crate::store::Store::load() { + Ok(s) => s, + Err(e) => { + dbglog!("[unconscious] store load failed: {}", e); + return Err(auto); } + }; - // Take auto out for the spawned task - let Some(mut auto) = self.agents[idx].auto.take() else { - dbglog!("[unconscious] {} already running", name); - return; - }; - let orig_steps = std::mem::replace(&mut auto.steps, - batch.steps.iter().map(|s| AutoStep { - prompt: s.prompt.clone(), - phase: s.phase.clone(), - }).collect()); + let exclude: std::collections::HashSet = std::collections::HashSet::new(); + let batch = match defs::run_agent( + &store, &def, def.count.unwrap_or(5), &exclude, + ) { + Ok(b) => b, + Err(e) => { + dbglog!("[unconscious] {} query failed: {}", name, e); + return Err(auto); + } + }; - // Create standalone Agent — stored so UI can read context - let config = crate::config::get(); - let base_url = config.api_base_url.as_deref().unwrap_or(""); - let api_key = config.api_key.as_deref().unwrap_or(""); - let model = config.api_model.as_deref().unwrap_or(""); - if base_url.is_empty() || model.is_empty() { - dbglog!("[unconscious] API not configured"); + if !batch.node_keys.is_empty() { + store.record_agent_visits(&batch.node_keys, name).ok(); + } + + let orig_steps = std::mem::replace(&mut auto.steps, + batch.steps.iter().map(|s| AutoStep { + prompt: s.prompt.clone(), + phase: s.phase.clone(), + }).collect()); + + // Create standalone Agent — stored so UI can read context + let config = crate::config::get(); + let base_url = config.api_base_url.as_deref().unwrap_or(""); + let api_key = config.api_key.as_deref().unwrap_or(""); + let model = 
config.api_model.as_deref().unwrap_or(""); + if base_url.is_empty() || model.is_empty() { + dbglog!("[unconscious] API not configured"); + auto.steps = orig_steps; + return Err(auto); + } + + let cli = crate::user::CliArgs::default(); + let (app, _) = match crate::config::load_app(&cli) { + Ok(r) => r, + Err(e) => { + dbglog!("[unconscious] config: {}", e); auto.steps = orig_steps; - self.agents[idx].auto = Some(auto); - return; + return Err(auto); } + }; - let cli = crate::user::CliArgs::default(); - let (app, _) = match crate::config::load_app(&cli) { - Ok(r) => r, - Err(e) => { - dbglog!("[unconscious] config: {}", e); - auto.steps = orig_steps; - self.agents[idx].auto = Some(auto); - return; + // Unconscious agents have self-contained prompts — no standard context. + let client = crate::agent::api::ApiClient::new(base_url, api_key, model); + let agent = crate::agent::Agent::new( + client, Vec::new(), + app, String::new(), None, + crate::agent::tools::ActiveTools::new(), + auto.tools.clone(), + ).await; + { + let mut st = agent.state.lock().await; + st.provenance = auto.name.clone(); + st.priority = Some(auto.priority); + st.temperature = auto.temperature; + } + + let agent_clone = agent.clone(); + let handle = tokio::spawn(async move { + let result = auto.run_shared(&agent_clone).await; + let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await; + auto.update_stats(stats); + auto.steps = orig_steps; + (auto, result) + }); + + Ok(SpawnResult { agent, handle }) +} + +// Backwards compat: trigger() that does all three phases (still holds lock too long, but works) +impl Unconscious { + pub async fn trigger(&mut self) { + self.reap_finished(); + let to_spawn = self.select_to_spawn(); + for (idx, name, auto) in to_spawn { + match prepare_spawn(&name, auto).await { + Ok(result) => self.complete_spawn(idx, result), + Err(auto) => self.abort_spawn(idx, auto), } - }; - // Unconscious agents have self-contained prompts — no standard context. 
- let client = crate::agent::api::ApiClient::new(base_url, api_key, model); - let agent = crate::agent::Agent::new( - client, Vec::new(), - app, String::new(), None, - crate::agent::tools::ActiveTools::new(), - auto.tools.clone(), - ).await; - { - let mut st = agent.state.lock().await; - st.provenance = auto.name.clone(); - st.priority = Some(auto.priority); - st.temperature = auto.temperature; } - - self.agents[idx].agent = Some(agent.clone()); - - self.agents[idx].handle = Some(tokio::spawn(async move { - let result = auto.run_shared(&agent).await; - let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent).await; - auto.update_stats(stats); - auto.steps = orig_steps; - (auto, result) - })); } }