unconscious: event-driven loop via tokio::select!

Replace yield_now() polling with proper event-driven wakeups:
- Add wake: Arc<Notify> to Unconscious struct
- Spawned agents call wake.notify_one() on completion
- Loop uses select! on: unc_rx.changed(), wake.notified(), health timer

Eliminates spinning (was 27.9M iterations per interval).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-13 22:38:01 -04:00
parent 19789b7e74
commit 4d22a28794
2 changed files with 42 additions and 22 deletions

View file

@ -346,32 +346,44 @@ impl Mind {
let mut s = shared_for_unc.lock().unwrap(); let mut s = shared_for_unc.lock().unwrap();
s.unc_idle = true; s.unc_idle = true;
} }
// Get wake notify for event-driven loop
let wake = unc.lock().await.wake.clone();
let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600));
health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
// Phase 0: health check outside lock (slow I/O) // Do work: reap finished agents, spawn new ones
let needs_health = unc.lock().await.needs_health_refresh(); let (to_spawn, needs_health) = {
let mut guard = unc.lock().await;
guard.reap_finished();
(guard.select_to_spawn(), guard.needs_health_refresh())
};
// Spawn agents outside lock
for (idx, name, auto) in to_spawn {
match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await {
Ok(result) => unc.lock().await.complete_spawn(idx, result),
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
}
}
// Health check outside lock (slow I/O)
if needs_health { if needs_health {
if let Ok(store_arc) = access_local() { if let Ok(store_arc) = access_local() {
let health = crate::subconscious::daemon::compute_graph_health(&store_arc); let health = crate::subconscious::daemon::compute_graph_health(&store_arc);
unc.lock().await.set_health(health); unc.lock().await.set_health(health);
} }
} }
// Phase 1: quick work under lock
let to_spawn = { // Wait for: conscious active, agent finished, or health timer
let mut guard = unc.lock().await; tokio::select! {
guard.reap_finished(); _ = unc_rx.changed() => {
guard.select_to_spawn() if *unc_rx.borrow() { break; }
};
// Phase 2: slow work outside lock
for (idx, name, auto) in to_spawn {
match crate::mind::unconscious::prepare_spawn(&name, auto).await {
Ok(result) => unc.lock().await.complete_spawn(idx, result),
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
} }
_ = wake.notified() => {}
_ = health_interval.tick() => {}
} }
// Check if conscious became active
if *unc_rx.borrow() { break; }
// Brief yield to not starve other tasks
tokio::task::yield_now().await;
} }
} }
}); });
@ -637,7 +649,8 @@ impl Mind {
}; };
let mut cmds = Vec::new(); let mut cmds = Vec::new();
let mut dmn_expired = false; #[allow(unused_assignments)]
let mut _dmn_expired = false;
tokio::select! { tokio::select! {
biased; biased;
@ -676,7 +689,7 @@ impl Mind {
} }
} }
_ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true, _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true,
} }
if !self.config.no_agents { if !self.config.no_agents {

View file

@ -71,6 +71,8 @@ pub struct Unconscious {
max_concurrent: usize, max_concurrent: usize,
pub graph_health: Option<crate::subconscious::daemon::GraphHealth>, pub graph_health: Option<crate::subconscious::daemon::GraphHealth>,
last_health_check: Option<Instant>, last_health_check: Option<Instant>,
/// Notified when agent state changes (finished, toggled)
pub wake: std::sync::Arc<tokio::sync::Notify>,
} }
impl Unconscious { impl Unconscious {
@ -117,6 +119,7 @@ impl Unconscious {
agents, max_concurrent, agents, max_concurrent,
graph_health: None, graph_health: None,
last_health_check: None, last_health_check: None,
wake: std::sync::Arc::new(tokio::sync::Notify::new()),
} }
} }
@ -130,11 +133,13 @@ impl Unconscious {
if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() { if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() {
let agent_name = self.agents[idx].name.clone(); let agent_name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap(); let auto = self.agents[idx].auto.take().unwrap();
match prepare_spawn(&agent_name, auto).await { let wake = self.wake.clone();
match prepare_spawn(&agent_name, auto, wake).await {
Ok(result) => self.complete_spawn(idx, result), Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto), Err(auto) => self.abort_spawn(idx, auto),
} }
} }
self.wake.notify_one(); // wake loop to consider new state
Some(new_state) Some(new_state)
} }
@ -245,7 +250,7 @@ pub struct SpawnResult {
/// Called outside the Unconscious lock. /// Called outside the Unconscious lock.
/// On success, auto is consumed (moved into spawned task). /// On success, auto is consumed (moved into spawned task).
/// On failure, auto is returned so it can be restored. /// On failure, auto is returned so it can be restored.
pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResult, AutoAgent> { pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc<tokio::sync::Notify>) -> Result<SpawnResult, AutoAgent> {
dbglog!("[unconscious] spawning {}", name); dbglog!("[unconscious] spawning {}", name);
let def = match defs::get_def(name) { let def = match defs::get_def(name) {
@ -312,6 +317,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResul
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await; let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await;
auto.update_stats(stats); auto.update_stats(stats);
auto.steps = orig_steps; auto.steps = orig_steps;
wake.notify_one(); // wake the loop to reap and maybe spawn more
(auto, result) (auto, result)
}); });
@ -323,8 +329,9 @@ impl Unconscious {
pub async fn trigger(&mut self) { pub async fn trigger(&mut self) {
self.reap_finished(); self.reap_finished();
let to_spawn = self.select_to_spawn(); let to_spawn = self.select_to_spawn();
let wake = self.wake.clone();
for (idx, name, auto) in to_spawn { for (idx, name, auto) in to_spawn {
match prepare_spawn(&name, auto).await { match prepare_spawn(&name, auto, wake.clone()).await {
Ok(result) => self.complete_spawn(idx, result), Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto), Err(auto) => self.abort_spawn(idx, auto),
} }