mind: shared MindState for pending input

Add MindState behind Arc<Mutex<>> for state shared between Mind
and UI. Pending user input goes through shared state instead of
MindMessage::UserInput — UI pushes, Mind consumes.

Mind checks for pending input after every event (message received,
turn completed, DMN tick). User input is prioritized over DMN ticks.

This enables the UI to display/edit/cancel queued messages, and
removes the last MindMessage variant that carried data.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 02:52:56 -04:00
parent 05d6bbc912
commit 792e9440af
2 changed files with 61 additions and 49 deletions

View file

@ -36,14 +36,28 @@ fn compaction_threshold(app: &AppConfig) -> u32 {
} }
// --- Mind: all mutable state for a running agent session --- /// Shared state between Mind and UI. UI writes, Mind reads.
pub struct MindState {
/// Pending user input — UI pushes, Mind consumes after turn completes.
pub input: Vec<String>,
/// True while a turn is in progress.
pub turn_active: bool,
}
pub type SharedMindState = Arc<std::sync::Mutex<MindState>>;
pub fn shared_mind_state() -> SharedMindState {
Arc::new(std::sync::Mutex::new(MindState {
input: Vec::new(),
turn_active: false,
}))
}
// --- Mind: cognitive state machine ---
/// Collects the ~15 loose variables that previously lived in run()
/// into a coherent struct with methods. The event loop dispatches
/// to Mind methods; Mind manages turns, compaction, DMN state,
/// and slash commands.
pub struct Mind { pub struct Mind {
pub agent: Arc<Mutex<Agent>>, pub agent: Arc<Mutex<Agent>>,
pub shared: SharedMindState,
config: SessionConfig, config: SessionConfig,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>, turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
@ -58,10 +72,6 @@ pub struct Mind {
/// Broadcast when turn_in_progress changes. Commands can wait /// Broadcast when turn_in_progress changes. Commands can wait
/// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`. /// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`.
turn_watch: tokio::sync::watch::Sender<bool>, turn_watch: tokio::sync::watch::Sender<bool>,
/// User messages received while a turn is in progress.
/// Consolidated into one message (newline-separated) so the
/// model sees everything the user typed, not just the first line.
pending_input: Option<String>,
// Per-turn tracking for DMN context // Per-turn tracking for DMN context
last_user_input: Instant, last_user_input: Instant,
@ -79,6 +89,7 @@ pub struct Mind {
impl Mind { impl Mind {
pub fn new( pub fn new(
agent: Arc<Mutex<Agent>>, agent: Arc<Mutex<Agent>>,
shared: SharedMindState,
config: SessionConfig, config: SessionConfig,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>, turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
@ -88,6 +99,7 @@ impl Mind {
Self { Self {
agent, agent,
shared,
config, config,
ui_tx, ui_tx,
turn_tx, turn_tx,
@ -101,7 +113,6 @@ impl Mind {
turn_in_progress: false, turn_in_progress: false,
turn_handle: None, turn_handle: None,
turn_watch, turn_watch,
pending_input: None,
last_user_input: Instant::now(), last_user_input: Instant::now(),
consecutive_errors: 0, consecutive_errors: 0,
last_turn_had_tools: false, last_turn_had_tools: false,
@ -119,6 +130,7 @@ impl Mind {
fn set_turn_active(&mut self, active: bool) { fn set_turn_active(&mut self, active: bool) {
self.turn_in_progress = active; self.turn_in_progress = active;
self.shared.lock().unwrap().turn_active = active;
let _ = self.turn_watch.send(active); let _ = self.turn_watch.send(active);
} }
@ -141,17 +153,19 @@ impl Mind {
/// Submit user input — either queue it (if a turn is running) or /// Submit user input — either queue it (if a turn is running) or
/// start a new turn immediately. /// start a new turn immediately.
fn submit_input(&mut self, input: String) { /// Check shared state for pending user input, start a turn if available.
fn check_pending_input(&mut self) {
if self.turn_in_progress { if self.turn_in_progress {
match &mut self.pending_input { return;
Some(existing) => {
existing.push('\n');
existing.push_str(&input);
} }
None => self.pending_input = Some(input.clone()), let input = {
let mut shared = self.shared.lock().unwrap();
if shared.input.is_empty() {
return;
} }
let _ = self.ui_tx.send(UiMessage::Info("(queued)".into())); shared.input.join("\n")
} else { };
self.shared.lock().unwrap().input.clear();
self.dmn_turns = 0; self.dmn_turns = 0;
self.consecutive_errors = 0; self.consecutive_errors = 0;
self.last_user_input = Instant::now(); self.last_user_input = Instant::now();
@ -160,7 +174,6 @@ impl Mind {
self.update_status(); self.update_status();
self.spawn_turn(input, StreamTarget::Conversation); self.spawn_turn(input, StreamTarget::Conversation);
} }
}
/// Process a completed turn: update DMN state, check compaction, /// Process a completed turn: update DMN state, check compaction,
/// drain any queued input. /// drain any queued input.
@ -292,15 +305,7 @@ impl Mind {
/// Send any consolidated pending input as a single turn. /// Send any consolidated pending input as a single turn.
fn drain_pending(&mut self) { fn drain_pending(&mut self) {
if let Some(queued) = self.pending_input.take() { self.check_pending_input();
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone()));
self.update_status();
self.spawn_turn(queued, StreamTarget::Conversation);
}
} }
/// Fire a DMN tick: check max turns, generate prompt, spawn turn. /// Fire a DMN tick: check max turns, generate prompt, spawn turn.
@ -442,7 +447,7 @@ impl Mind {
let _ = self.ui_tx.send(UiMessage::Activity(String::new())); let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
} }
} }
self.pending_input = None; self.shared.lock().unwrap().input.clear();
let killed = count; let killed = count;
if killed > 0 || self.turn_in_progress { if killed > 0 || self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info(format!( let _ = self.ui_tx.send(UiMessage::Info(format!(
@ -536,12 +541,11 @@ impl Mind {
Some(msg) = input_rx.recv() => { Some(msg) = input_rx.recv() => {
match msg { match msg {
MindMessage::UserInput(input) => self.submit_input(input),
MindMessage::Hotkey(action) => { MindMessage::Hotkey(action) => {
match action { match action {
HotkeyAction::Interrupt => self.interrupt().await, HotkeyAction::Interrupt => self.interrupt().await,
HotkeyAction::CycleAutonomy => self.cycle_autonomy(), HotkeyAction::CycleAutonomy => self.cycle_autonomy(),
_ => {} // Other hotkeys handled directly by UI _ => {}
} }
} }
MindMessage::NewSession => self.cmd_new().await, MindMessage::NewSession => self.cmd_new().await,
@ -551,18 +555,23 @@ impl Mind {
MindMessage::DmnWake => self.cmd_dmn_wake(), MindMessage::DmnWake => self.cmd_dmn_wake(),
MindMessage::DmnPause => self.cmd_dmn_pause(), MindMessage::DmnPause => self.cmd_dmn_pause(),
} }
self.check_pending_input();
} }
Some((result, target)) = turn_rx.recv() => { Some((result, target)) = turn_rx.recv() => {
self.handle_turn_result(result, target).await; self.handle_turn_result(result, target).await;
self.check_pending_input();
} }
_ = tokio::time::sleep(timeout), if !self.turn_in_progress => { _ = tokio::time::sleep(timeout), if !self.turn_in_progress => {
self.check_pending_input();
if !self.turn_in_progress {
self.dmn_tick(); self.dmn_tick();
} }
} }
} }
} }
}
} }
// --- Startup --- // --- Startup ---
@ -655,7 +664,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1); let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
let no_agents = config.no_agents; let no_agents = config.no_agents;
let mut mind = Mind::new(agent, config, ui_tx.clone(), turn_tx); let shared_mind = shared_mind_state();
let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx);
mind.update_status(); mind.update_status();
if !no_agents { if !no_agents {
mind.start_memory_scoring(); mind.start_memory_scoring();
@ -682,7 +692,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
mind.run(mind_rx, turn_rx).await; mind.run(mind_rx, turn_rx).await;
}); });
crate::user::event_loop::run( crate::user::event_loop::run(
app, ui_agent, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, app, ui_agent, shared_mind, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx,
channel_tx, channel_rx, notify_rx, idle_state, channel_tx, channel_rx, notify_rx, idle_state,
).await ).await
} }

View file

@ -19,7 +19,6 @@ use crate::user::ui_channel::{self, UiMessage};
/// Messages from the UI to the Mind. /// Messages from the UI to the Mind.
pub enum MindMessage { pub enum MindMessage {
UserInput(String),
Hotkey(HotkeyAction), Hotkey(HotkeyAction),
DmnSleep, DmnSleep,
DmnWake, DmnWake,
@ -60,8 +59,8 @@ fn send_help(ui_tx: &ui_channel::UiSender) {
async fn cmd_retry( async fn cmd_retry(
agent: &Arc<Mutex<Agent>>, agent: &Arc<Mutex<Agent>>,
shared_mind: &crate::mind::SharedMindState,
ui_tx: &ui_channel::UiSender, ui_tx: &ui_channel::UiSender,
mind_tx: &tokio::sync::mpsc::UnboundedSender<MindMessage>,
) { ) {
let mut agent_guard = agent.lock().await; let mut agent_guard = agent.lock().await;
let entries = agent_guard.entries_mut(); let entries = agent_guard.entries_mut();
@ -78,7 +77,7 @@ async fn cmd_retry(
Some(text) => { Some(text) => {
let preview_len = text.len().min(60); let preview_len = text.len().min(60);
let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
let _ = mind_tx.send(MindMessage::UserInput(text)); shared_mind.lock().unwrap().input.push(text);
} }
None => { None => {
let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
@ -177,6 +176,8 @@ pub async fn cmd_switch_model(
pub async fn run( pub async fn run(
mut app: tui::App, mut app: tui::App,
agent: Arc<Mutex<Agent>>, agent: Arc<Mutex<Agent>>,
shared_mind: crate::mind::SharedMindState,
turn_watch: tokio::sync::watch::Receiver<bool>, turn_watch: tokio::sync::watch::Receiver<bool>,
mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>, mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>,
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
@ -295,13 +296,12 @@ pub async fn run(
"/score" => { let _ = mind_tx.send(MindMessage::Score); } "/score" => { let _ = mind_tx.send(MindMessage::Score); }
"/retry" => { "/retry" => {
let agent = agent.clone(); let agent = agent.clone();
let sm = shared_mind.clone();
let ui_tx = ui_tx.clone(); let ui_tx = ui_tx.clone();
let mind_tx = mind_tx.clone();
let mut tw = turn_watch.clone(); let mut tw = turn_watch.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Wait for any in-progress turn to complete
let _ = tw.wait_for(|&active| !active).await; let _ = tw.wait_for(|&active| !active).await;
cmd_retry(&agent, &ui_tx, &mind_tx).await; cmd_retry(&agent, &sm, &ui_tx).await;
}); });
} }
cmd if cmd.starts_with("/model ") => { cmd if cmd.starts_with("/model ") => {
@ -316,7 +316,9 @@ pub async fn run(
}); });
} }
} }
_ => { let _ = mind_tx.send(MindMessage::UserInput(input)); } _ => {
shared_mind.lock().unwrap().input.push(input);
}
} }
} }