From 7dc515b985d541a888a8c50a31a31b697eec3f06 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 05:01:45 -0400 Subject: [PATCH] mind: remove Arc from MindState MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MindState is now std::sync::Mutex owned by Mind, not Arc-wrapped. Background scoring completion signals through a BgEvent channel instead of locking shared directly. Retry sends a Turn command instead of pushing to shared input. No Arc on Mind (scoped tasks), no Arc on MindState (owned by Mind). Only Arc> remains — needed for background turn spawns. Co-Authored-By: Kent Overstreet Signed-off-by: Kent Overstreet --- src/mind/mod.rs | 35 +++++++++++++++++++++++------------ src/user/event_loop.rs | 9 +++++---- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 7b4cb1c..8e11856 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -33,7 +33,6 @@ fn compaction_threshold(app: &AppConfig) -> u32 { (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 } - /// Shared state between Mind and UI. pub struct MindState { /// Pending user input — UI pushes, Mind consumes after turn completes. @@ -74,8 +73,6 @@ impl Clone for MindState { } } -pub type SharedMindState = Arc>; - /// What should happen after a state transition. pub enum MindCommand { /// Start a turn with this input @@ -183,12 +180,15 @@ impl MindState { } } -pub fn shared_mind_state(max_dmn_turns: u32) -> SharedMindState { - Arc::new(std::sync::Mutex::new(MindState::new(max_dmn_turns))) +/// Background task completion events. +enum BgEvent { + ScoringDone, } // --- Mind: cognitive state machine --- +pub type SharedMindState = std::sync::Mutex; + pub struct Mind { pub agent: Arc>, pub shared: SharedMindState, @@ -196,12 +196,11 @@ pub struct Mind { ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, + bg_tx: mpsc::UnboundedSender, + bg_rx: std::sync::Mutex>>, _supervisor: crate::thalamus::supervisor::Supervisor, } -fn _assert_send_sync() {} -const _: fn() = _assert_send_sync::; - impl Mind { pub fn new( config: SessionConfig, @@ -227,14 +226,16 @@ impl Mind { shared_active_tools, ))); - let shared = shared_mind_state(config.app.dmn.max_turns); + let shared = std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)); let (turn_watch, _) = tokio::sync::watch::channel(false); + let (bg_tx, bg_rx) = mpsc::unbounded_channel(); let mut sup = crate::thalamus::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); - Self { agent, shared, config, ui_tx, turn_tx, turn_watch, _supervisor: sup } + Self { agent, shared, config, ui_tx, turn_tx, turn_watch, bg_tx, + bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } /// Initialize — restore log, start daemons and background agents. @@ -323,7 +324,7 @@ impl Mind { pub fn start_memory_scoring(&self) { let agent = self.agent.clone(); - let shared = self.shared.clone(); + let bg_tx = self.bg_tx.clone(); let ui_tx = self.ui_tx.clone(); let cfg = crate::config::get(); let max_age = cfg.scoring_interval_secs; @@ -343,7 +344,7 @@ impl Mind { ag.agent_cycles.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } } - shared.lock().unwrap().scoring_in_flight = false; + let _ = bg_tx.send(BgEvent::ScoringDone); }); } @@ -357,6 +358,8 @@ impl Mind { mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { + let mut bg_rx = self.bg_rx.lock().unwrap().take() + .expect("Mind::run() called twice"); loop { let timeout = self.shared.lock().unwrap().dmn.interval(); let turn_active = self.shared.lock().unwrap().turn_active; @@ -370,6 +373,14 @@ impl Mind { cmds.push(cmd); } + Some(bg) = bg_rx.recv() => { + match bg { + BgEvent::ScoringDone => { + self.shared.lock().unwrap().scoring_in_flight = false; + } + } + } + Some((result, target)) = turn_rx.recv() => { self.shared.lock().unwrap().turn_handle = None; let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index 476d75b..fc1045f 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -86,7 +86,7 @@ fn send_help(ui_tx: &ui_channel::UiSender) { async fn cmd_retry( agent: &Arc>, - shared_mind: &crate::mind::SharedMindState, + mind_tx: &tokio::sync::mpsc::UnboundedSender, ui_tx: &ui_channel::UiSender, ) { let mut agent_guard = agent.lock().await; @@ -104,7 +104,8 @@ async fn cmd_retry( Some(text) => { let preview_len = text.len().min(60); let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); - shared_mind.lock().unwrap().input.push(text); + // Send as a Turn command — Mind will process it + let _ = mind_tx.send(MindCommand::Turn(text, crate::user::ui_channel::StreamTarget::Conversation)); } None => { let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); @@ -411,12 +412,12 @@ pub async fn run( "/score" => { let _ = mind_tx.send(MindCommand::Score); } "/retry" => { let agent = agent.clone(); - let sm = shared_mind.clone(); + let mind_tx = mind_tx.clone(); let ui_tx = ui_tx.clone(); let mut tw = turn_watch.clone(); tokio::spawn(async move { let _ = tw.wait_for(|&active| !active).await; - cmd_retry(&agent, &sm, &ui_tx).await; + cmd_retry(&agent, &mind_tx, &ui_tx).await; }); } cmd if cmd.starts_with("/model ") => {