unconscious: release lock during slow spawn work

Split trigger() into phases so the Unconscious mutex is only held briefly:
- reap_finished(): check handles, restore completed autos
- select_to_spawn(): pick agents, take their autos out
- prepare_spawn(): slow work (Store::load, query, Agent::new) - NO LOCK
- complete_spawn()/abort_spawn(): store results back

Previously, trigger() held the lock for 28+ seconds during Store::load and
query execution. Now each lock hold should last only milliseconds.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-12 20:33:23 -04:00
parent f56fc3a7c7
commit f40d8cfa9d
2 changed files with 142 additions and 85 deletions

View file

@ -346,7 +346,19 @@ impl Mind {
s.unc_idle = true; s.unc_idle = true;
} }
loop { loop {
unc.lock().await.trigger().await; // Phase 1: quick work under lock
let to_spawn = {
let mut guard = unc.lock().await;
guard.reap_finished();
guard.select_to_spawn()
};
// Phase 2: slow work outside lock
for (idx, name, auto) in to_spawn {
match crate::mind::unconscious::prepare_spawn(&name, auto).await {
Ok(result) => unc.lock().await.complete_spawn(idx, result),
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
}
}
// Check if conscious became active // Check if conscious became active
if *unc_rx.borrow() { break; } if *unc_rx.borrow() { break; }
// Brief yield to not starve other tasks // Brief yield to not starve other tasks

View file

@ -127,8 +127,13 @@ impl Unconscious {
self.agents[idx].enabled = !self.agents[idx].enabled; self.agents[idx].enabled = !self.agents[idx].enabled;
let new_state = self.agents[idx].enabled; let new_state = self.agents[idx].enabled;
self.save_enabled(); self.save_enabled();
if new_state && !self.agents[idx].is_running() { if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() {
self.spawn_agent(idx).await; let agent_name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap();
match prepare_spawn(&agent_name, auto).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
}
} }
Some(new_state) Some(new_state)
} }
@ -170,8 +175,8 @@ impl Unconscious {
self.last_health_check = Some(Instant::now()); self.last_health_check = Some(Instant::now());
} }
/// Reap finished agents and spawn new ones. /// Reap finished agents (quick, hold lock briefly).
pub async fn trigger(&mut self) { pub fn reap_finished(&mut self) {
// Periodic graph health refresh (also on first call) // Periodic graph health refresh (also on first call)
if self.last_health_check if self.last_health_check
.map(|t| t.elapsed() > std::time::Duration::from_secs(600)) .map(|t| t.elapsed() > std::time::Duration::from_secs(600))
@ -198,109 +203,149 @@ impl Unconscious {
} }
} }
} }
}
/// Select agents to spawn and take their AutoAgents out (quick, hold lock briefly).
/// Returns a vec of (index, name, auto) for agents that should spawn.
pub fn select_to_spawn(&mut self) -> Vec<(usize, String, AutoAgent)> {
let running = self.agents.iter().filter(|a| a.is_running()).count(); let running = self.agents.iter().filter(|a| a.is_running()).count();
let mut to_spawn = Vec::new();
for _ in running..self.max_concurrent { for _ in running..self.max_concurrent {
let next = self.agents.iter().enumerate() let next = self.agents.iter().enumerate()
.filter(|(_, a)| a.should_run()) .filter(|(_, a)| a.should_run() && a.auto.is_some())
.min_by_key(|(_, a)| a.last_run); .min_by_key(|(_, a)| a.last_run);
match next { match next {
Some((idx, _)) => self.spawn_agent(idx).await, Some((idx, _)) => {
let name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap();
to_spawn.push((idx, name, auto));
}
None => break, None => break,
} }
} }
to_spawn
} }
async fn spawn_agent(&mut self, idx: usize) { /// Store spawn result back (quick, hold lock briefly).
let name = self.agents[idx].name.clone(); pub fn complete_spawn(&mut self, idx: usize, result: SpawnResult) {
dbglog!("[unconscious] spawning {}", name); self.agents[idx].agent = Some(result.agent);
self.agents[idx].handle = Some(result.handle);
}
let def = match defs::get_def(&name) { /// Restore auto on spawn failure (quick, hold lock briefly).
Some(d) => d, pub fn abort_spawn(&mut self, idx: usize, auto: AutoAgent) {
None => return, self.agents[idx].auto = Some(auto);
}; }
}
// Run query and resolve placeholders /// Result of preparing an agent spawn (created outside the lock).
let mut store = match crate::store::Store::load() { pub struct SpawnResult {
Ok(s) => s, pub agent: std::sync::Arc<crate::agent::Agent>,
Err(e) => { pub handle: tokio::task::JoinHandle<(AutoAgent, Result<(), String>)>,
dbglog!("[unconscious] store load failed: {}", e); }
return;
}
};
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new(); /// Prepare an agent spawn — does the slow work (Store::load, query, Agent::new).
let batch = match defs::run_agent( /// Called outside the Unconscious lock.
&store, &def, def.count.unwrap_or(5), &exclude, /// On success, auto is consumed (moved into spawned task).
) { /// On failure, auto is returned so it can be restored.
Ok(b) => b, pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResult, AutoAgent> {
Err(e) => { dbglog!("[unconscious] spawning {}", name);
dbglog!("[unconscious] {} query failed: {}", name, e);
return;
}
};
if !batch.node_keys.is_empty() { let def = match defs::get_def(name) {
store.record_agent_visits(&batch.node_keys, &name).ok(); Some(d) => d,
None => return Err(auto),
};
// Run query and resolve placeholders
let mut store = match crate::store::Store::load() {
Ok(s) => s,
Err(e) => {
dbglog!("[unconscious] store load failed: {}", e);
return Err(auto);
} }
};
// Take auto out for the spawned task let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
let Some(mut auto) = self.agents[idx].auto.take() else { let batch = match defs::run_agent(
dbglog!("[unconscious] {} already running", name); &store, &def, def.count.unwrap_or(5), &exclude,
return; ) {
}; Ok(b) => b,
let orig_steps = std::mem::replace(&mut auto.steps, Err(e) => {
batch.steps.iter().map(|s| AutoStep { dbglog!("[unconscious] {} query failed: {}", name, e);
prompt: s.prompt.clone(), return Err(auto);
phase: s.phase.clone(), }
}).collect()); };
// Create standalone Agent — stored so UI can read context if !batch.node_keys.is_empty() {
let config = crate::config::get(); store.record_agent_visits(&batch.node_keys, name).ok();
let base_url = config.api_base_url.as_deref().unwrap_or(""); }
let api_key = config.api_key.as_deref().unwrap_or("");
let model = config.api_model.as_deref().unwrap_or(""); let orig_steps = std::mem::replace(&mut auto.steps,
if base_url.is_empty() || model.is_empty() { batch.steps.iter().map(|s| AutoStep {
dbglog!("[unconscious] API not configured"); prompt: s.prompt.clone(),
phase: s.phase.clone(),
}).collect());
// Create standalone Agent — stored so UI can read context
let config = crate::config::get();
let base_url = config.api_base_url.as_deref().unwrap_or("");
let api_key = config.api_key.as_deref().unwrap_or("");
let model = config.api_model.as_deref().unwrap_or("");
if base_url.is_empty() || model.is_empty() {
dbglog!("[unconscious] API not configured");
auto.steps = orig_steps;
return Err(auto);
}
let cli = crate::user::CliArgs::default();
let (app, _) = match crate::config::load_app(&cli) {
Ok(r) => r,
Err(e) => {
dbglog!("[unconscious] config: {}", e);
auto.steps = orig_steps; auto.steps = orig_steps;
self.agents[idx].auto = Some(auto); return Err(auto);
return;
} }
};
let cli = crate::user::CliArgs::default(); // Unconscious agents have self-contained prompts — no standard context.
let (app, _) = match crate::config::load_app(&cli) { let client = crate::agent::api::ApiClient::new(base_url, api_key, model);
Ok(r) => r, let agent = crate::agent::Agent::new(
Err(e) => { client, Vec::new(),
dbglog!("[unconscious] config: {}", e); app, String::new(), None,
auto.steps = orig_steps; crate::agent::tools::ActiveTools::new(),
self.agents[idx].auto = Some(auto); auto.tools.clone(),
return; ).await;
{
let mut st = agent.state.lock().await;
st.provenance = auto.name.clone();
st.priority = Some(auto.priority);
st.temperature = auto.temperature;
}
let agent_clone = agent.clone();
let handle = tokio::spawn(async move {
let result = auto.run_shared(&agent_clone).await;
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await;
auto.update_stats(stats);
auto.steps = orig_steps;
(auto, result)
});
Ok(SpawnResult { agent, handle })
}
// Backwards compat: trigger() that does all three phases (still holds lock too long, but works)
impl Unconscious {
pub async fn trigger(&mut self) {
self.reap_finished();
let to_spawn = self.select_to_spawn();
for (idx, name, auto) in to_spawn {
match prepare_spawn(&name, auto).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
} }
};
// Unconscious agents have self-contained prompts — no standard context.
let client = crate::agent::api::ApiClient::new(base_url, api_key, model);
let agent = crate::agent::Agent::new(
client, Vec::new(),
app, String::new(), None,
crate::agent::tools::ActiveTools::new(),
auto.tools.clone(),
).await;
{
let mut st = agent.state.lock().await;
st.provenance = auto.name.clone();
st.priority = Some(auto.priority);
st.temperature = auto.temperature;
} }
self.agents[idx].agent = Some(agent.clone());
self.agents[idx].handle = Some(tokio::spawn(async move {
let result = auto.run_shared(&agent).await;
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent).await;
auto.update_stats(stats);
auto.steps = orig_steps;
(auto, result)
}));
} }
} }