From 74945e5754d9a1e89d1009f7e5f13ca429426a19 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Fri, 10 Apr 2026 03:20:12 -0400 Subject: [PATCH] Move unconscious agents to their own task with watch channel Instead of managing idle timers in the mind event loop, the unconscious agents run on a dedicated task that watches a conscious_active channel. 60s after conscious activity stops, agents start looping. Conscious activity cancels the timer. Expose mind state (DMN, scoring, unconscious timer) on the thalamus screen. Co-Authored-By: ProofOfConcept --- src/mind/mod.rs | 78 ++++++++++++++++++++++++++++++++++---------- src/user/mod.rs | 3 ++ src/user/thalamus.rs | 21 ++++++++++++ 3 files changed, 84 insertions(+), 18 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index d0885ed..010829f 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -113,6 +113,10 @@ pub struct MindState { pub last_turn_had_tools: bool, /// Handle to the currently running turn task. pub turn_handle: Option>, + /// Unconscious agent idle state — true when 60s timer has expired. + pub unc_idle: bool, + /// When the unconscious idle timer will fire (for UI display). + pub unc_idle_deadline: Instant, } impl Clone for MindState { @@ -129,6 +133,8 @@ impl Clone for MindState { consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, turn_handle: None, // Not cloned — only Mind's loop uses this + unc_idle: self.unc_idle, + unc_idle_deadline: self.unc_idle_deadline, } } } @@ -164,6 +170,8 @@ impl MindState { consecutive_errors: 0, last_turn_had_tools: false, turn_handle: None, + unc_idle: false, + unc_idle_deadline: Instant::now() + std::time::Duration::from_secs(60), } } @@ -264,6 +272,9 @@ pub struct Mind { pub unconscious: Arc>, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, + /// Signals conscious activity to the unconscious loop. + /// true = active, false = idle opportunity. + conscious_active: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, bg_rx: std::sync::Mutex>>, _supervisor: crate::thalamus::supervisor::Supervisor, @@ -291,6 +302,7 @@ impl Mind { let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); let (turn_watch, _) = tokio::sync::watch::channel(false); + let (conscious_active, _) = tokio::sync::watch::channel(false); let (bg_tx, bg_rx) = mpsc::unbounded_channel(); let mut sup = crate::thalamus::supervisor::Supervisor::new(); @@ -300,10 +312,53 @@ impl Mind { let subconscious = Arc::new(tokio::sync::Mutex::new(Subconscious::new())); subconscious.lock().await.init_output_tool(subconscious.clone()); + let unconscious = Arc::new(tokio::sync::Mutex::new(Unconscious::new())); + + // Spawn the unconscious loop on its own task + if !config.no_agents { + let unc = unconscious.clone(); + let shared_for_unc = shared.clone(); + let mut unc_rx = conscious_active.subscribe(); + tokio::spawn(async move { + const IDLE_DELAY: std::time::Duration = std::time::Duration::from_secs(60); + loop { + // Wait for conscious side to go inactive + if *unc_rx.borrow() { + if unc_rx.changed().await.is_err() { break; } + continue; + } + // Conscious is inactive — wait 60s before starting + let deadline = tokio::time::Instant::now() + IDLE_DELAY; + { + let mut s = shared_for_unc.lock().unwrap(); + s.unc_idle = false; + s.unc_idle_deadline = Instant::now() + IDLE_DELAY; + } + let went_active = tokio::select! { + _ = tokio::time::sleep_until(deadline) => false, + r = unc_rx.changed() => r.is_ok(), + }; + if went_active { continue; } + + // Idle period reached — run agents until conscious goes active + { + let mut s = shared_for_unc.lock().unwrap(); + s.unc_idle = true; + } + loop { + unc.lock().await.trigger().await; + // Check if conscious became active + if *unc_rx.borrow() { break; } + // Brief yield to not starve other tasks + tokio::task::yield_now().await; + } + } + }); + } + Self { agent, shared, config, - subconscious, - unconscious: Arc::new(tokio::sync::Mutex::new(Unconscious::new())), - turn_tx, turn_watch, bg_tx, + subconscious, unconscious, + turn_tx, turn_watch, conscious_active, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } @@ -504,6 +559,7 @@ impl Mind { } self.shared.lock().unwrap().turn_active = true; let _ = self.turn_watch.send(true); + let _ = self.conscious_active.send(true); let agent = self.agent.clone(); let result_tx = self.turn_tx.clone(); self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move { @@ -525,9 +581,6 @@ impl Mind { let mut bg_rx = self.bg_rx.lock().unwrap().take() .expect("Mind::run() called twice"); let mut sub_handle: Option> = None; - let mut unc_handle: Option> = None; - let mut unc_idle_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(60); - let mut unc_idle = false; loop { let (timeout, has_input) = { let me = self.shared.lock().unwrap(); @@ -555,13 +608,8 @@ impl Mind { } } - _ = tokio::time::sleep_until(unc_idle_deadline), if !unc_idle && !self.config.no_agents => { - unc_idle = true; - } - Some((result, target)) = turn_rx.recv() => { - unc_idle_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(60); - unc_idle = false; + let _ = self.conscious_active.send(false); let model_switch = { let mut s = self.shared.lock().unwrap(); s.turn_handle = None; @@ -592,12 +640,6 @@ impl Mind { s.trigger(&agent).await; })); } - if unc_idle && unc_handle.as_ref().map_or(true, |h| h.is_finished()) { - let unc = self.unconscious.clone(); - unc_handle = Some(tokio::spawn(async move { - unc.lock().await.trigger().await; - })); - } } // Check for pending user input → push to agent context and start turn diff --git a/src/user/mod.rs b/src/user/mod.rs index 6904234..ec35423 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -109,6 +109,7 @@ struct App { context_info: Option, agent_state: Vec, unconscious_state: Vec, + mind_state: Option, graph_health: Option, /// Agent toggle requests from UI — consumed by mind loop. pub agent_toggles: Vec, @@ -137,6 +138,7 @@ impl App { context_info: None, agent_state: Vec::new(), unconscious_state: Vec::new(), + mind_state: None, graph_health: None, agent_toggles: Vec::new(), walked_count: 0, @@ -403,6 +405,7 @@ async fn run( } app.unconscious_state = unc.snapshots(); app.graph_health = unc.graph_health.clone(); + app.mind_state = Some(mind.shared.lock().unwrap().clone()); } app.walked_count = mind.subconscious_walked().await.len(); if !startup_done { diff --git a/src/user/thalamus.rs b/src/user/thalamus.rs index 38fef38..5556596 100644 --- a/src/user/thalamus.rs +++ b/src/user/thalamus.rs @@ -100,6 +100,27 @@ impl ScreenView for ThalamusScreen { } lines.push(Line::raw("")); + // Mind state + lines.push(Line::styled("── Mind ──", section)); + lines.push(Line::raw("")); + if let Some(ref ms) = app.mind_state { + lines.push(Line::raw(format!(" DMN: {} (turn {}/{})", + ms.dmn.label(), ms.dmn_turns, ms.max_dmn_turns))); + lines.push(Line::raw(format!(" Turn active: {}", ms.turn_active))); + lines.push(Line::raw(format!(" Scoring: {}", ms.scoring_in_flight))); + let unc_status = if ms.unc_idle { + "idle (agents running)".to_string() + } else { + let remaining = ms.unc_idle_deadline + .saturating_duration_since(std::time::Instant::now()); + format!("{:.0}s until idle", remaining.as_secs_f64()) + }; + lines.push(Line::raw(format!(" Unconscious: {}", unc_status))); + } else { + lines.push(Line::styled(" not initialized", dim)); + } + lines.push(Line::raw("")); + // Channel status lines.push(Line::styled("── Channels ──", section)); lines.push(Line::raw(""));