diff --git a/src/mind/mod.rs b/src/mind/mod.rs index ca6d740..9fcc101 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -346,32 +346,44 @@ impl Mind { let mut s = shared_for_unc.lock().unwrap(); s.unc_idle = true; } + + // Get wake notify for event-driven loop + let wake = unc.lock().await.wake.clone(); + let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600)); + health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - // Phase 0: health check outside lock (slow I/O) - let needs_health = unc.lock().await.needs_health_refresh(); + // Do work: reap finished agents, spawn new ones + let (to_spawn, needs_health) = { + let mut guard = unc.lock().await; + guard.reap_finished(); + (guard.select_to_spawn(), guard.needs_health_refresh()) + }; + + // Spawn agents outside lock + for (idx, name, auto) in to_spawn { + match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await { + Ok(result) => unc.lock().await.complete_spawn(idx, result), + Err(auto) => unc.lock().await.abort_spawn(idx, auto), + } + } + + // Health check outside lock (slow I/O) if needs_health { if let Ok(store_arc) = access_local() { let health = crate::subconscious::daemon::compute_graph_health(&store_arc); unc.lock().await.set_health(health); } } - // Phase 1: quick work under lock - let to_spawn = { - let mut guard = unc.lock().await; - guard.reap_finished(); - guard.select_to_spawn() - }; - // Phase 2: slow work outside lock - for (idx, name, auto) in to_spawn { - match crate::mind::unconscious::prepare_spawn(&name, auto).await { - Ok(result) => unc.lock().await.complete_spawn(idx, result), - Err(auto) => unc.lock().await.abort_spawn(idx, auto), + + // Wait for: conscious active, agent finished, or health timer + tokio::select! { + _ = unc_rx.changed() => { + if *unc_rx.borrow() { break; } } + _ = wake.notified() => {} + _ = health_interval.tick() => {} } - // Check if conscious became active - if *unc_rx.borrow() { break; } - // Brief yield to not starve other tasks - tokio::task::yield_now().await; } } }); @@ -637,7 +649,8 @@ impl Mind { }; let mut cmds = Vec::new(); - let mut dmn_expired = false; + #[allow(unused_assignments)] + let mut _dmn_expired = false; tokio::select! { biased; @@ -676,7 +689,7 @@ impl Mind { } } - _ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true, + _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, } if !self.config.no_agents { diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 87c44db..8989264 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -71,6 +71,8 @@ pub struct Unconscious { max_concurrent: usize, pub graph_health: Option, last_health_check: Option, + /// Notified when agent state changes (finished, toggled) + pub wake: std::sync::Arc, } impl Unconscious { @@ -117,6 +119,7 @@ impl Unconscious { agents, max_concurrent, graph_health: None, last_health_check: None, + wake: std::sync::Arc::new(tokio::sync::Notify::new()), } } @@ -130,11 +133,13 @@ impl Unconscious { if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() { let agent_name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); - match prepare_spawn(&agent_name, auto).await { + let wake = self.wake.clone(); + match prepare_spawn(&agent_name, auto, wake).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } } + self.wake.notify_one(); // wake loop to consider new state Some(new_state) } @@ -245,7 +250,7 @@ pub struct SpawnResult { /// Called outside the Unconscious lock. /// On success, auto is consumed (moved into spawned task). /// On failure, auto is returned so it can be restored. -pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result { +pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc) -> Result { dbglog!("[unconscious] spawning {}", name); let def = match defs::get_def(name) { @@ -312,6 +317,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), }