mind: add turn_watch, move /retry to event_loop

Add tokio::sync::watch for turn_in_progress state. Commands in the
UI event loop can wait for turns to complete via wait_for() instead
of checking-and-bailing.

Move /retry to event_loop: waits for turn completion, pops agent
history, sends retried text as MindMessage::UserInput. Mind doesn't
need to know about retry — it just sees a new input message.

Make agent field pub on Mind for UI access.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 02:37:51 -04:00
parent 178824fa01
commit b05c956ab8
2 changed files with 63 additions and 36 deletions

View file

@ -48,7 +48,7 @@ pub enum Command {
/// to Mind methods; Mind manages turns, compaction, DMN state, /// to Mind methods; Mind manages turns, compaction, DMN state,
/// and slash commands. /// and slash commands.
pub struct Mind { pub struct Mind {
agent: Arc<Mutex<Agent>>, pub agent: Arc<Mutex<Agent>>,
config: SessionConfig, config: SessionConfig,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>, turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
@ -60,6 +60,9 @@ pub struct Mind {
// Turn tracking // Turn tracking
turn_in_progress: bool, turn_in_progress: bool,
turn_handle: Option<tokio::task::JoinHandle<()>>, turn_handle: Option<tokio::task::JoinHandle<()>>,
/// Broadcast when turn_in_progress changes. Commands can wait
/// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`.
turn_watch: tokio::sync::watch::Sender<bool>,
/// User messages received while a turn is in progress. /// User messages received while a turn is in progress.
/// Consolidated into one message (newline-separated) so the /// Consolidated into one message (newline-separated) so the
/// model sees everything the user typed, not just the first line. /// model sees everything the user typed, not just the first line.
@ -79,13 +82,14 @@ pub struct Mind {
} }
impl Mind { impl Mind {
fn new( pub fn new(
agent: Arc<Mutex<Agent>>, agent: Arc<Mutex<Agent>>,
config: SessionConfig, config: SessionConfig,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>, turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self { ) -> Self {
let max_dmn_turns = config.app.dmn.max_turns; let max_dmn_turns = config.app.dmn.max_turns;
let (turn_watch, _) = tokio::sync::watch::channel(false);
Self { Self {
agent, agent,
@ -101,6 +105,7 @@ impl Mind {
max_dmn_turns, max_dmn_turns,
turn_in_progress: false, turn_in_progress: false,
turn_handle: None, turn_handle: None,
turn_watch,
pending_input: None, pending_input: None,
last_user_input: Instant::now(), last_user_input: Instant::now(),
consecutive_errors: 0, consecutive_errors: 0,
@ -111,6 +116,17 @@ impl Mind {
} }
} }
/// Subscribe to turn state changes. Use `rx.wait_for(|&v| !v).await`
/// to wait until no turn is in progress.
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
self.turn_watch.subscribe()
}
fn set_turn_active(&mut self, active: bool) {
self.turn_in_progress = active;
let _ = self.turn_watch.send(active);
}
/// How long before the next DMN tick. /// How long before the next DMN tick.
fn dmn_interval(&self) -> Duration { fn dmn_interval(&self) -> Duration {
self.dmn.interval() self.dmn.interval()
@ -121,7 +137,7 @@ impl Mind {
let agent = self.agent.clone(); let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone(); let ui_tx = self.ui_tx.clone();
let result_tx = self.turn_tx.clone(); let result_tx = self.turn_tx.clone();
self.turn_in_progress = true; self.set_turn_active(true);
self.turn_handle = Some(tokio::spawn(async move { self.turn_handle = Some(tokio::spawn(async move {
let result = Agent::turn(agent, &input, &ui_tx, target).await; let result = Agent::turn(agent, &input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await; let _ = result_tx.send((result, target)).await;
@ -158,7 +174,7 @@ impl Mind {
result: Result<TurnResult>, result: Result<TurnResult>,
target: StreamTarget, target: StreamTarget,
) { ) {
self.turn_in_progress = false; self.set_turn_active(false);
self.turn_handle = None; self.turn_handle = None;
match result { match result {
@ -425,36 +441,6 @@ impl Mind {
self.update_status(); self.update_status();
Command::Handled Command::Handled
} }
"/retry" => {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
let mut agent_guard = self.agent.lock().await;
let entries = agent_guard.entries_mut();
let mut last_user_text = None;
while let Some(entry) = entries.last() {
if entry.message().role == api_types::Role::User {
last_user_text = Some(entries.pop().unwrap().message().content_text().to_string());
break;
}
entries.pop();
}
drop(agent_guard);
match last_user_text {
Some(text) => {
let preview_len = text.len().min(60);
let _ = self.ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
self.dmn_turns = 0;
self.dmn = dmn::State::Engaged;
self.spawn_turn(text, StreamTarget::Conversation);
}
None => {
let _ = self.ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
}
}
Command::Handled
}
_ => Command::None, _ => Command::None,
} }
} }
@ -473,7 +459,7 @@ impl Mind {
if count == 0 { if count == 0 {
if let Some(handle) = self.turn_handle.take() { if let Some(handle) = self.turn_handle.take() {
handle.abort(); handle.abort();
self.turn_in_progress = false; self.set_turn_active(false);
self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn = dmn::State::Resting { since: Instant::now() };
self.update_status(); self.update_status();
let _ = self.ui_tx.send(UiMessage::Activity(String::new())); let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
@ -761,13 +747,14 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
// App for TUI // App for TUI
let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools); let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools);
let ui_agent = mind.agent.clone(); let ui_agent = mind.agent.clone();
let turn_watch = mind.turn_watch();
// Spawn Mind event loop // Spawn Mind event loop
tokio::spawn(async move { tokio::spawn(async move {
mind.run(mind_rx, turn_rx).await; mind.run(mind_rx, turn_rx).await;
}); });
crate::user::event_loop::run( crate::user::event_loop::run(
app, ui_agent, mind_tx, ui_tx, ui_rx, observe_input_rx, app, ui_agent, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx,
channel_tx, channel_rx, notify_rx, idle_state, channel_tx, channel_rx, notify_rx, idle_state,
).await ).await
} }

View file

@ -52,6 +52,34 @@ fn send_help(ui_tx: &ui_channel::UiSender) {
)); ));
} }
async fn cmd_retry(
agent: &Arc<Mutex<Agent>>,
ui_tx: &ui_channel::UiSender,
mind_tx: &tokio::sync::mpsc::UnboundedSender<MindMessage>,
) {
let mut agent_guard = agent.lock().await;
let entries = agent_guard.entries_mut();
let mut last_user_text = None;
while let Some(entry) = entries.last() {
if entry.message().role == crate::agent::api::types::Role::User {
last_user_text = Some(entries.pop().unwrap().message().content_text().to_string());
break;
}
entries.pop();
}
drop(agent_guard);
match last_user_text {
Some(text) => {
let preview_len = text.len().min(60);
let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
let _ = mind_tx.send(MindMessage::UserInput(text));
}
None => {
let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
}
}
}
pub async fn cmd_switch_model( pub async fn cmd_switch_model(
agent: &Arc<Mutex<Agent>>, agent: &Arc<Mutex<Agent>>,
name: &str, name: &str,
@ -94,6 +122,7 @@ pub async fn cmd_switch_model(
pub async fn run( pub async fn run(
mut app: tui::App, mut app: tui::App,
agent: Arc<Mutex<Agent>>, agent: Arc<Mutex<Agent>>,
turn_watch: tokio::sync::watch::Receiver<bool>,
mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>, mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
mut ui_rx: ui_channel::UiReceiver, mut ui_rx: ui_channel::UiReceiver,
@ -203,6 +232,17 @@ pub async fn run(
let _ = ui_tx.send(UiMessage::Info("(busy)".into())); let _ = ui_tx.send(UiMessage::Info("(busy)".into()));
} }
} }
"/retry" => {
let agent = agent.clone();
let ui_tx = ui_tx.clone();
let mind_tx = mind_tx.clone();
let mut tw = turn_watch.clone();
tokio::spawn(async move {
// Wait for any in-progress turn to complete
let _ = tw.wait_for(|&active| !active).await;
cmd_retry(&agent, &ui_tx, &mind_tx).await;
});
}
cmd if cmd.starts_with("/model ") => { cmd if cmd.starts_with("/model ") => {
let name = cmd[7..].trim().to_string(); let name = cmd[7..].trim().to_string();
if name.is_empty() { if name.is_empty() {