From d82a2ae90d5edf863221faaa49d56582ecf050ee Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Thu, 9 Apr 2026 00:21:46 -0400 Subject: [PATCH] Clean up mind loop: fix double locks, async agent triggers, input peek MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - push_node: notify before dropping state lock instead of relocking - Mind::run: single lock for timeout + turn_active + has_input; single lock for turn_handle + complete_turn - Agent triggers (subconscious/unconscious) spawned as async tasks so they don't block the select loop - has_pending_input() peek for DMN sleep guard — don't sleep when there's user input waiting - unconscious: merge collect_results into trigger, single store load Co-Authored-By: Proof of Concept --- src/agent/mod.rs | 2 +- src/mind/mod.rs | 50 ++++++++++++++++++++++++++++------------- src/mind/unconscious.rs | 49 ++++++++++++---------------------------- 3 files changed, 49 insertions(+), 52 deletions(-) diff --git a/src/agent/mod.rs b/src/agent/mod.rs index d517534..4b00bc2 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -275,9 +275,9 @@ impl Agent { eprintln!("warning: failed to log entry: {:#}", e); } } + st.changed.notify_one(); drop(st); self.context.lock().await.push(Section::Conversation, node); - self.state.lock().await.changed.notify_one(); } /// Run the agent turn loop: assemble prompt, stream response, diff --git a/src/mind/mod.rs b/src/mind/mod.rs index fea5187..47cb052 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -165,6 +165,11 @@ impl MindState { } } + /// Is there pending user input waiting? + fn has_pending_input(&self) -> bool { + !self.turn_active && !self.input.is_empty() + } + /// Consume pending user input if no turn is active. /// Returns the text to send; caller is responsible for pushing it /// into the Agent's context and starting the turn. @@ -254,7 +259,7 @@ pub struct Mind { pub shared: Arc, pub config: SessionConfig, subconscious: Arc>, - unconscious: tokio::sync::Mutex, + unconscious: Arc>, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, @@ -295,7 +300,7 @@ impl Mind { Self { agent, shared, config, subconscious, - unconscious: tokio::sync::Mutex::new(Unconscious::new()), + unconscious: Arc::new(tokio::sync::Mutex::new(Unconscious::new())), turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } @@ -480,9 +485,13 @@ impl Mind { ) { let mut bg_rx = self.bg_rx.lock().unwrap().take() .expect("Mind::run() called twice"); + let mut sub_handle: Option> = None; + let mut unc_handle: Option> = None; loop { - let timeout = self.shared.lock().unwrap().dmn.interval(); - let turn_active = self.shared.lock().unwrap().turn_active; + let (timeout, turn_active, has_input) = { + let me = self.shared.lock().unwrap(); + (me.dmn.interval(), me.turn_active, me.has_pending_input()) + }; let mut cmds = Vec::new(); @@ -505,8 +514,11 @@ impl Mind { } Some((result, target)) = turn_rx.recv() => { - self.shared.lock().unwrap().turn_handle = None; - let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); + let model_switch = { + let mut s = self.shared.lock().unwrap(); + s.turn_handle = None; + s.complete_turn(&result, target) + }; let _ = self.turn_watch.send(false); if let Some(name) = model_switch { @@ -519,7 +531,7 @@ impl Mind { } } - _ = tokio::time::sleep(timeout), if !turn_active => { + _ = tokio::time::sleep(timeout), if !has_input => { let tick = self.shared.lock().unwrap().dmn_tick(); if let Some((prompt, target)) = tick { self.start_turn(&prompt, target).await; @@ -527,16 +539,22 @@ impl Mind { } } - // Subconscious: collect finished results, trigger due agents if !self.config.no_agents { - let mut sub = self.subconscious.lock().await; - sub.collect_results(&self.agent).await; - sub.trigger(&self.agent).await; - drop(sub); - - let mut unc = self.unconscious.lock().await; - unc.collect_results().await; - unc.trigger(); + if sub_handle.as_ref().map_or(true, |h| h.is_finished()) { + let sub = self.subconscious.clone(); + let agent = self.agent.clone(); + sub_handle = Some(tokio::spawn(async move { + let mut s = sub.lock().await; + s.collect_results(&agent).await; + s.trigger(&agent).await; + })); + } + if unc_handle.as_ref().map_or(true, |h| h.is_finished()) { + let unc = self.unconscious.clone(); + unc_handle = Some(tokio::spawn(async move { + unc.lock().await.trigger(); + })); + } } // Check for pending user input → push to agent context and start turn diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 5004c8f..d33c8fc 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -114,32 +114,19 @@ impl Unconscious { }).collect() } - /// Collect results from finished agents. - pub async fn collect_results(&mut self) { - for agent in &mut self.agents { - if agent.handle.as_ref().is_some_and(|h| h.is_finished()) { - let handle = agent.handle.take().unwrap(); - agent.last_run = Some(Instant::now()); - agent.completed += 1; - - match handle.await { - Ok((_auto, Ok(text))) => { - let preview = &text[..text.floor_char_boundary(text.len().min(100))]; - dbglog!("[unconscious] {} completed: {}", agent.name, preview); - } - Ok((_auto, Err(e))) => { - dbglog!("[unconscious] {} failed: {}", agent.name, e); - } - Err(e) => { - dbglog!("[unconscious] {} panicked: {}", agent.name, e); - } - } - } - } - } - /// Trigger agents that are due to run. pub fn trigger(&mut self) { + // Reap finished agents + for agent in &mut self.agents { + if agent.handle.as_ref().is_some_and(|h| h.is_finished()) { + agent.last_run = Some(Instant::now()); + agent.completed += 1; + dbglog!("[unconscious] {} completed ({}/{})", + agent.name, agent.completed, agent.budget); + agent.handle = None; + } + } + // Refresh plan every 30 minutes (or on first call) let should_refresh = self.last_plan_refresh .map(|t| t.elapsed() > Duration::from_secs(1800)) @@ -187,8 +174,8 @@ impl Unconscious { .collect() }; - // Run query and resolve placeholders synchronously - let store = match crate::store::Store::load() { + // Run query, resolve placeholders, record visits + let mut store = match crate::store::Store::load() { Ok(s) => s, Err(e) => { dbglog!("[unconscious] store load failed: {}", e); @@ -196,10 +183,7 @@ impl Unconscious { } }; - // Track which nodes other running agents are working on - // to avoid concurrent collisions let exclude: std::collections::HashSet = std::collections::HashSet::new(); - let batch = match defs::run_agent( &store, &def, def.count.unwrap_or(5), &exclude, ) { @@ -210,13 +194,8 @@ impl Unconscious { } }; - // Record visits if !batch.node_keys.is_empty() { - let mut store_mut = match crate::store::Store::load() { - Ok(s) => s, - Err(_) => return, - }; - store_mut.record_agent_visits(&batch.node_keys, &name).ok(); + store.record_agent_visits(&batch.node_keys, &name).ok(); } let steps: Vec = batch.steps.iter().map(|s| AutoStep {