Move unconscious agents to their own task with watch channel

Instead of managing idle timers in the mind event loop, the
unconscious agents run on a dedicated task that watches a
conscious_active channel. 60s after conscious activity stops,
agents start looping. Conscious activity cancels the timer.

Expose mind state (DMN, scoring, unconscious timer) on the
thalamus screen.

Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-10 03:20:12 -04:00
parent 7a6322c2bf
commit 74945e5754
3 changed files with 84 additions and 18 deletions

View file

@ -113,6 +113,10 @@ pub struct MindState {
pub last_turn_had_tools: bool,
/// Handle to the currently running turn task.
pub turn_handle: Option<tokio::task::JoinHandle<()>>,
/// Unconscious agent idle state — true when 60s timer has expired.
pub unc_idle: bool,
/// When the unconscious idle timer will fire (for UI display).
pub unc_idle_deadline: Instant,
}
impl Clone for MindState {
@ -129,6 +133,8 @@ impl Clone for MindState {
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
turn_handle: None, // Not cloned — only Mind's loop uses this
unc_idle: self.unc_idle,
unc_idle_deadline: self.unc_idle_deadline,
}
}
}
@ -164,6 +170,8 @@ impl MindState {
consecutive_errors: 0,
last_turn_had_tools: false,
turn_handle: None,
unc_idle: false,
unc_idle_deadline: Instant::now() + std::time::Duration::from_secs(60),
}
}
@ -264,6 +272,9 @@ pub struct Mind {
pub unconscious: Arc<tokio::sync::Mutex<Unconscious>>,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
turn_watch: tokio::sync::watch::Sender<bool>,
/// Signals conscious activity to the unconscious loop.
/// true = active, false = idle opportunity.
conscious_active: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
_supervisor: crate::thalamus::supervisor::Supervisor,
@ -291,6 +302,7 @@ impl Mind {
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
let (turn_watch, _) = tokio::sync::watch::channel(false);
let (conscious_active, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
@ -300,10 +312,53 @@ impl Mind {
let subconscious = Arc::new(tokio::sync::Mutex::new(Subconscious::new()));
subconscious.lock().await.init_output_tool(subconscious.clone());
let unconscious = Arc::new(tokio::sync::Mutex::new(Unconscious::new()));
// Spawn the unconscious loop on its own task
if !config.no_agents {
let unc = unconscious.clone();
let shared_for_unc = shared.clone();
let mut unc_rx = conscious_active.subscribe();
tokio::spawn(async move {
const IDLE_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
loop {
// Wait for conscious side to go inactive
if *unc_rx.borrow() {
if unc_rx.changed().await.is_err() { break; }
continue;
}
// Conscious is inactive — wait 60s before starting
let deadline = tokio::time::Instant::now() + IDLE_DELAY;
{
let mut s = shared_for_unc.lock().unwrap();
s.unc_idle = false;
s.unc_idle_deadline = Instant::now() + IDLE_DELAY;
}
let went_active = tokio::select! {
_ = tokio::time::sleep_until(deadline) => false,
r = unc_rx.changed() => r.is_ok(),
};
if went_active { continue; }
// Idle period reached — run agents until conscious goes active
{
let mut s = shared_for_unc.lock().unwrap();
s.unc_idle = true;
}
loop {
unc.lock().await.trigger().await;
// Check if conscious became active
if *unc_rx.borrow() { break; }
// Brief yield to not starve other tasks
tokio::task::yield_now().await;
}
}
});
}
Self { agent, shared, config,
subconscious,
unconscious: Arc::new(tokio::sync::Mutex::new(Unconscious::new())),
turn_tx, turn_watch, bg_tx,
subconscious, unconscious,
turn_tx, turn_watch, conscious_active, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
}
@ -504,6 +559,7 @@ impl Mind {
}
self.shared.lock().unwrap().turn_active = true;
let _ = self.turn_watch.send(true);
let _ = self.conscious_active.send(true);
let agent = self.agent.clone();
let result_tx = self.turn_tx.clone();
self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move {
@ -525,9 +581,6 @@ impl Mind {
let mut bg_rx = self.bg_rx.lock().unwrap().take()
.expect("Mind::run() called twice");
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
let mut unc_handle: Option<tokio::task::JoinHandle<()>> = None;
let mut unc_idle_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(60);
let mut unc_idle = false;
loop {
let (timeout, has_input) = {
let me = self.shared.lock().unwrap();
@ -555,13 +608,8 @@ impl Mind {
}
}
_ = tokio::time::sleep_until(unc_idle_deadline), if !unc_idle && !self.config.no_agents => {
unc_idle = true;
}
Some((result, target)) = turn_rx.recv() => {
unc_idle_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(60);
unc_idle = false;
let _ = self.conscious_active.send(false);
let model_switch = {
let mut s = self.shared.lock().unwrap();
s.turn_handle = None;
@ -592,12 +640,6 @@ impl Mind {
s.trigger(&agent).await;
}));
}
if unc_idle && unc_handle.as_ref().map_or(true, |h| h.is_finished()) {
let unc = self.unconscious.clone();
unc_handle = Some(tokio::spawn(async move {
unc.lock().await.trigger().await;
}));
}
}
// Check for pending user input → push to agent context and start turn

View file

@ -109,6 +109,7 @@ struct App {
context_info: Option<ContextInfo>,
agent_state: Vec<crate::mind::SubconsciousSnapshot>,
unconscious_state: Vec<crate::mind::UnconsciousSnapshot>,
mind_state: Option<crate::mind::MindState>,
graph_health: Option<crate::subconscious::daemon::GraphHealth>,
/// Agent toggle requests from UI — consumed by mind loop.
pub agent_toggles: Vec<String>,
@ -137,6 +138,7 @@ impl App {
context_info: None,
agent_state: Vec::new(),
unconscious_state: Vec::new(),
mind_state: None,
graph_health: None,
agent_toggles: Vec::new(),
walked_count: 0,
@ -403,6 +405,7 @@ async fn run(
}
app.unconscious_state = unc.snapshots();
app.graph_health = unc.graph_health.clone();
app.mind_state = Some(mind.shared.lock().unwrap().clone());
}
app.walked_count = mind.subconscious_walked().await.len();
if !startup_done {

View file

@ -100,6 +100,27 @@ impl ScreenView for ThalamusScreen {
}
lines.push(Line::raw(""));
// Mind state
lines.push(Line::styled("── Mind ──", section));
lines.push(Line::raw(""));
if let Some(ref ms) = app.mind_state {
lines.push(Line::raw(format!(" DMN: {} (turn {}/{})",
ms.dmn.label(), ms.dmn_turns, ms.max_dmn_turns)));
lines.push(Line::raw(format!(" Turn active: {}", ms.turn_active)));
lines.push(Line::raw(format!(" Scoring: {}", ms.scoring_in_flight)));
let unc_status = if ms.unc_idle {
"idle (agents running)".to_string()
} else {
let remaining = ms.unc_idle_deadline
.saturating_duration_since(std::time::Instant::now());
format!("{:.0}s until idle", remaining.as_secs_f64())
};
lines.push(Line::raw(format!(" Unconscious: {}", unc_status)));
} else {
lines.push(Line::styled(" not initialized", dim));
}
lines.push(Line::raw(""));
// Channel status
lines.push(Line::styled("── Channels ──", section));
lines.push(Line::raw(""));