From b05c956ab8743abc144f443b4fa02aa26657b5e3 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 02:37:51 -0400 Subject: [PATCH] mind: add turn_watch, move /retry to event_loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add tokio::sync::watch for turn_in_progress state. Commands in the UI event loop can wait for turns to complete via wait_for() instead of checking-and-bailing. Move /retry to event_loop: waits for turn completion, pops agent history, sends retried text as MindMessage::UserInput. Mind doesn't need to know about retry — it just sees a new input message. Make agent field pub on Mind for UI access. Co-Authored-By: Kent Overstreet --- src/mind/mod.rs | 59 ++++++++++++++++-------------------------- src/user/event_loop.rs | 40 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index d78e85b..5890d07 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -48,7 +48,7 @@ pub enum Command { /// to Mind methods; Mind manages turns, compaction, DMN state, /// and slash commands. pub struct Mind { - agent: Arc>, + pub agent: Arc>, config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, @@ -60,6 +60,9 @@ pub struct Mind { // Turn tracking turn_in_progress: bool, turn_handle: Option>, + /// Broadcast when turn_in_progress changes. Commands can wait + /// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`. + turn_watch: tokio::sync::watch::Sender, /// User messages received while a turn is in progress. /// Consolidated into one message (newline-separated) so the /// model sees everything the user typed, not just the first line. @@ -79,13 +82,14 @@ pub struct Mind { } impl Mind { - fn new( + pub fn new( agent: Arc>, config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, ) -> Self { let max_dmn_turns = config.app.dmn.max_turns; + let (turn_watch, _) = tokio::sync::watch::channel(false); Self { agent, @@ -101,6 +105,7 @@ impl Mind { max_dmn_turns, turn_in_progress: false, turn_handle: None, + turn_watch, pending_input: None, last_user_input: Instant::now(), consecutive_errors: 0, @@ -111,6 +116,17 @@ impl Mind { } } + /// Subscribe to turn state changes. Use `rx.wait_for(|&v| !v).await` + /// to wait until no turn is in progress. + pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { + self.turn_watch.subscribe() + } + + fn set_turn_active(&mut self, active: bool) { + self.turn_in_progress = active; + let _ = self.turn_watch.send(active); + } + /// How long before the next DMN tick. fn dmn_interval(&self) -> Duration { self.dmn.interval() @@ -121,7 +137,7 @@ impl Mind { let agent = self.agent.clone(); let ui_tx = self.ui_tx.clone(); let result_tx = self.turn_tx.clone(); - self.turn_in_progress = true; + self.set_turn_active(true); self.turn_handle = Some(tokio::spawn(async move { let result = Agent::turn(agent, &input, &ui_tx, target).await; let _ = result_tx.send((result, target)).await; @@ -158,7 +174,7 @@ impl Mind { result: Result, target: StreamTarget, ) { - self.turn_in_progress = false; + self.set_turn_active(false); self.turn_handle = None; match result { @@ -425,36 +441,6 @@ impl Mind { self.update_status(); Command::Handled } - "/retry" => { - if self.turn_in_progress { - let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into())); - return Command::Handled; - } - let mut agent_guard = self.agent.lock().await; - let entries = agent_guard.entries_mut(); - let mut last_user_text = None; - while let Some(entry) = entries.last() { - if entry.message().role == api_types::Role::User { - last_user_text = Some(entries.pop().unwrap().message().content_text().to_string()); - break; - } - entries.pop(); - } - drop(agent_guard); - match last_user_text { - Some(text) => { - let preview_len = text.len().min(60); - let _ = self.ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); - self.dmn_turns = 0; - self.dmn = dmn::State::Engaged; - self.spawn_turn(text, StreamTarget::Conversation); - } - None => { - let _ = self.ui_tx.send(UiMessage::Info("(nothing to retry)".into())); - } - } - Command::Handled - } _ => Command::None, } } @@ -473,7 +459,7 @@ impl Mind { if count == 0 { if let Some(handle) = self.turn_handle.take() { handle.abort(); - self.turn_in_progress = false; + self.set_turn_active(false); self.dmn = dmn::State::Resting { since: Instant::now() }; self.update_status(); let _ = self.ui_tx.send(UiMessage::Activity(String::new())); @@ -761,13 +747,14 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { // App for TUI let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools); let ui_agent = mind.agent.clone(); + let turn_watch = mind.turn_watch(); // Spawn Mind event loop tokio::spawn(async move { mind.run(mind_rx, turn_rx).await; }); crate::user::event_loop::run( - app, ui_agent, mind_tx, ui_tx, ui_rx, observe_input_rx, + app, ui_agent, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, channel_tx, channel_rx, notify_rx, idle_state, ).await } diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index eaf1374..50c9c6f 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -52,6 +52,34 @@ fn send_help(ui_tx: &ui_channel::UiSender) { )); } +async fn cmd_retry( + agent: &Arc>, + ui_tx: &ui_channel::UiSender, + mind_tx: &tokio::sync::mpsc::UnboundedSender, +) { + let mut agent_guard = agent.lock().await; + let entries = agent_guard.entries_mut(); + let mut last_user_text = None; + while let Some(entry) = entries.last() { + if entry.message().role == crate::agent::api::types::Role::User { + last_user_text = Some(entries.pop().unwrap().message().content_text().to_string()); + break; + } + entries.pop(); + } + drop(agent_guard); + match last_user_text { + Some(text) => { + let preview_len = text.len().min(60); + let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); + let _ = mind_tx.send(MindMessage::UserInput(text)); + } + None => { + let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); + } + } +} + pub async fn cmd_switch_model( agent: &Arc>, name: &str, @@ -94,6 +122,7 @@ pub async fn cmd_switch_model( pub async fn run( mut app: tui::App, agent: Arc>, + turn_watch: tokio::sync::watch::Receiver, mind_tx: tokio::sync::mpsc::UnboundedSender, ui_tx: ui_channel::UiSender, mut ui_rx: ui_channel::UiReceiver, @@ -203,6 +232,17 @@ pub async fn run( let _ = ui_tx.send(UiMessage::Info("(busy)".into())); } } + "/retry" => { + let agent = agent.clone(); + let ui_tx = ui_tx.clone(); + let mind_tx = mind_tx.clone(); + let mut tw = turn_watch.clone(); + tokio::spawn(async move { + // Wait for any in-progress turn to complete + let _ = tw.wait_for(|&active| !active).await; + cmd_retry(&agent, &ui_tx, &mind_tx).await; + }); + } cmd if cmd.starts_with("/model ") => { let name = cmd[7..].trim().to_string(); if name.is_empty() {