From 792e9440af26f7fcbfebe8f5557bf8bd21921cbd Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 02:52:56 -0400 Subject: [PATCH] mind: shared MindState for pending input MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add MindState behind Arc> for state shared between Mind and UI. Pending user input goes through shared state instead of MindMessage::UserInput — UI pushes, Mind consumes. Mind checks for pending input after every event (message received, turn completed, DMN tick). User input is prioritized over DMN ticks. This enables the UI to display/edit/cancel queued messages, and removes the last MindMessage variant that carried data. Co-Authored-By: Kent Overstreet --- src/mind/mod.rs | 94 +++++++++++++++++++++++------------------- src/user/event_loop.rs | 16 +++---- 2 files changed, 61 insertions(+), 49 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 565f509..0db2507 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -36,14 +36,28 @@ fn compaction_threshold(app: &AppConfig) -> u32 { } -// --- Mind: all mutable state for a running agent session --- +/// Shared state between Mind and UI. UI writes, Mind reads. +pub struct MindState { + /// Pending user input — UI pushes, Mind consumes after turn completes. + pub input: Vec, + /// True while a turn is in progress. + pub turn_active: bool, +} + +pub type SharedMindState = Arc>; + +pub fn shared_mind_state() -> SharedMindState { + Arc::new(std::sync::Mutex::new(MindState { + input: Vec::new(), + turn_active: false, + })) +} + +// --- Mind: cognitive state machine --- -/// Collects the ~15 loose variables that previously lived in run() -/// into a coherent struct with methods. The event loop dispatches -/// to Mind methods; Mind manages turns, compaction, DMN state, -/// and slash commands. pub struct Mind { pub agent: Arc>, + pub shared: SharedMindState, config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, @@ -58,10 +72,6 @@ pub struct Mind { /// Broadcast when turn_in_progress changes. Commands can wait /// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`. turn_watch: tokio::sync::watch::Sender, - /// User messages received while a turn is in progress. - /// Consolidated into one message (newline-separated) so the - /// model sees everything the user typed, not just the first line. - pending_input: Option, // Per-turn tracking for DMN context last_user_input: Instant, @@ -79,6 +89,7 @@ pub struct Mind { impl Mind { pub fn new( agent: Arc>, + shared: SharedMindState, config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, @@ -88,6 +99,7 @@ impl Mind { Self { agent, + shared, config, ui_tx, turn_tx, @@ -101,7 +113,6 @@ impl Mind { turn_in_progress: false, turn_handle: None, turn_watch, - pending_input: None, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, @@ -119,6 +130,7 @@ impl Mind { fn set_turn_active(&mut self, active: bool) { self.turn_in_progress = active; + self.shared.lock().unwrap().turn_active = active; let _ = self.turn_watch.send(active); } @@ -141,25 +153,26 @@ impl Mind { /// Submit user input — either queue it (if a turn is running) or /// start a new turn immediately. - fn submit_input(&mut self, input: String) { + /// Check shared state for pending user input, start a turn if available. + fn check_pending_input(&mut self) { if self.turn_in_progress { - match &mut self.pending_input { - Some(existing) => { - existing.push('\n'); - existing.push_str(&input); - } - None => self.pending_input = Some(input.clone()), - } - let _ = self.ui_tx.send(UiMessage::Info("(queued)".into())); - } else { - self.dmn_turns = 0; - self.consecutive_errors = 0; - self.last_user_input = Instant::now(); - self.dmn = dmn::State::Engaged; - let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); - self.update_status(); - self.spawn_turn(input, StreamTarget::Conversation); + return; } + let input = { + let mut shared = self.shared.lock().unwrap(); + if shared.input.is_empty() { + return; + } + shared.input.join("\n") + }; + self.shared.lock().unwrap().input.clear(); + self.dmn_turns = 0; + self.consecutive_errors = 0; + self.last_user_input = Instant::now(); + self.dmn = dmn::State::Engaged; + let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); + self.update_status(); + self.spawn_turn(input, StreamTarget::Conversation); } /// Process a completed turn: update DMN state, check compaction, @@ -292,15 +305,7 @@ impl Mind { /// Send any consolidated pending input as a single turn. fn drain_pending(&mut self) { - if let Some(queued) = self.pending_input.take() { - self.dmn_turns = 0; - self.consecutive_errors = 0; - self.last_user_input = Instant::now(); - self.dmn = dmn::State::Engaged; - let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone())); - self.update_status(); - self.spawn_turn(queued, StreamTarget::Conversation); - } + self.check_pending_input(); } /// Fire a DMN tick: check max turns, generate prompt, spawn turn. @@ -442,7 +447,7 @@ impl Mind { let _ = self.ui_tx.send(UiMessage::Activity(String::new())); } } - self.pending_input = None; + self.shared.lock().unwrap().input.clear(); let killed = count; if killed > 0 || self.turn_in_progress { let _ = self.ui_tx.send(UiMessage::Info(format!( @@ -536,12 +541,11 @@ impl Mind { Some(msg) = input_rx.recv() => { match msg { - MindMessage::UserInput(input) => self.submit_input(input), MindMessage::Hotkey(action) => { match action { HotkeyAction::Interrupt => self.interrupt().await, HotkeyAction::CycleAutonomy => self.cycle_autonomy(), - _ => {} // Other hotkeys handled directly by UI + _ => {} } } MindMessage::NewSession => self.cmd_new().await, @@ -551,14 +555,19 @@ impl Mind { MindMessage::DmnWake => self.cmd_dmn_wake(), MindMessage::DmnPause => self.cmd_dmn_pause(), } + self.check_pending_input(); } Some((result, target)) = turn_rx.recv() => { self.handle_turn_result(result, target).await; + self.check_pending_input(); } _ = tokio::time::sleep(timeout), if !self.turn_in_progress => { - self.dmn_tick(); + self.check_pending_input(); + if !self.turn_in_progress { + self.dmn_tick(); + } } } } @@ -655,7 +664,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let (turn_tx, turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); let no_agents = config.no_agents; - let mut mind = Mind::new(agent, config, ui_tx.clone(), turn_tx); + let shared_mind = shared_mind_state(); + let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx); mind.update_status(); if !no_agents { mind.start_memory_scoring(); @@ -682,7 +692,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { mind.run(mind_rx, turn_rx).await; }); crate::user::event_loop::run( - app, ui_agent, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, + app, ui_agent, shared_mind, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, channel_tx, channel_rx, notify_rx, idle_state, ).await } diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index a42399a..749d953 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -19,7 +19,6 @@ use crate::user::ui_channel::{self, UiMessage}; /// Messages from the UI to the Mind. pub enum MindMessage { - UserInput(String), Hotkey(HotkeyAction), DmnSleep, DmnWake, @@ -60,8 +59,8 @@ fn send_help(ui_tx: &ui_channel::UiSender) { async fn cmd_retry( agent: &Arc>, + shared_mind: &crate::mind::SharedMindState, ui_tx: &ui_channel::UiSender, - mind_tx: &tokio::sync::mpsc::UnboundedSender, ) { let mut agent_guard = agent.lock().await; let entries = agent_guard.entries_mut(); @@ -78,7 +77,7 @@ async fn cmd_retry( Some(text) => { let preview_len = text.len().min(60); let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); - let _ = mind_tx.send(MindMessage::UserInput(text)); + shared_mind.lock().unwrap().input.push(text); } None => { let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); @@ -177,6 +176,8 @@ pub async fn cmd_switch_model( pub async fn run( mut app: tui::App, agent: Arc>, + shared_mind: crate::mind::SharedMindState, + turn_watch: tokio::sync::watch::Receiver, mind_tx: tokio::sync::mpsc::UnboundedSender, ui_tx: ui_channel::UiSender, @@ -295,13 +296,12 @@ pub async fn run( "/score" => { let _ = mind_tx.send(MindMessage::Score); } "/retry" => { let agent = agent.clone(); + let sm = shared_mind.clone(); let ui_tx = ui_tx.clone(); - let mind_tx = mind_tx.clone(); let mut tw = turn_watch.clone(); tokio::spawn(async move { - // Wait for any in-progress turn to complete let _ = tw.wait_for(|&active| !active).await; - cmd_retry(&agent, &ui_tx, &mind_tx).await; + cmd_retry(&agent, &sm, &ui_tx).await; }); } cmd if cmd.starts_with("/model ") => { @@ -316,7 +316,9 @@ pub async fn run( }); } } - _ => { let _ = mind_tx.send(MindMessage::UserInput(input)); } + _ => { + shared_mind.lock().unwrap().input.push(input); + } } }