mind: unify MindCommand, add command queue pattern

MindCommand replaces both Action and MindMessage — one type for
everything: turns, compaction, scoring, hotkeys, new session.

State methods return MindCommand values. The run loop collects
commands into a Vec, then drains them through run_commands().
Compact and Score now flow through the same command path as
everything else.

Removes execute(), MindMessage from event_loop. Mind's run loop
is now: select! → collect commands → run_commands().

mind/mod.rs: 957 → 516 lines.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 03:34:43 -04:00
parent 07ca136c14
commit 01b07a7f28
2 changed files with 115 additions and 105 deletions

View file

@ -47,8 +47,10 @@ pub struct MindState {
pub dmn: dmn::State,
pub dmn_turns: u32,
pub max_dmn_turns: u32,
/// Whether a full matrix /score task is currently running.
/// Whether memory scoring is running.
pub scoring_in_flight: bool,
/// Whether compaction is running.
pub compaction_in_flight: bool,
/// Per-turn tracking
pub last_user_input: Instant,
pub consecutive_errors: u32,
@ -58,9 +60,17 @@ pub struct MindState {
pub type SharedMindState = Arc<std::sync::Mutex<MindState>>;
/// What should happen after a state transition.
pub enum Action {
pub enum MindCommand {
/// Start a turn with this input
Turn(String, StreamTarget),
/// Run compaction check
Compact,
/// Run memory scoring
Score,
/// Hotkey action
Hotkey(crate::user::HotkeyAction),
/// Reset session
NewSession,
/// Nothing to do
None,
}
@ -75,6 +85,7 @@ impl MindState {
dmn_turns: 0,
max_dmn_turns,
scoring_in_flight: false,
compaction_in_flight: false,
last_user_input: Instant::now(),
consecutive_errors: 0,
last_turn_had_tools: false,
@ -86,9 +97,9 @@ impl MindState {
}
/// Consume pending input, return a Turn action if ready.
pub fn take_pending_input(&mut self) -> Action {
pub fn take_pending_input(&mut self) -> MindCommand {
if self.turn_active || self.input.is_empty() {
return Action::None;
return MindCommand::None;
}
let text = self.input.join("\n");
self.input.clear();
@ -96,7 +107,7 @@ impl MindState {
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
Action::Turn(text, StreamTarget::Conversation)
MindCommand::Turn(text, StreamTarget::Conversation)
}
/// Process turn completion, return model switch name if requested.
@ -131,16 +142,16 @@ impl MindState {
}
/// DMN tick — returns a Turn action with the DMN prompt, or None.
pub fn dmn_tick(&mut self) -> Action {
pub fn dmn_tick(&mut self) -> MindCommand {
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
return Action::None;
return MindCommand::None;
}
self.dmn_turns += 1;
if self.dmn_turns > self.max_dmn_turns {
self.dmn = dmn::State::Resting { since: Instant::now() };
self.dmn_turns = 0;
return Action::None;
return MindCommand::None;
}
let dmn_ctx = dmn::DmnContext {
@ -149,7 +160,7 @@ impl MindState {
last_turn_had_tools: self.last_turn_had_tools,
};
let prompt = self.dmn.prompt(&dmn_ctx);
Action::Turn(prompt, StreamTarget::Autonomous)
MindCommand::Turn(prompt, StreamTarget::Autonomous)
}
pub fn dmn_sleep(&mut self) {
@ -229,30 +240,69 @@ impl Mind {
}
/// Execute an Action from a MindState method.
fn execute(&mut self, action: Action) {
if let Action::Turn(input, target) = action {
if target == StreamTarget::Conversation {
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
} else {
let s = self.shared.lock().unwrap();
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
"[dmn: {} ({}/{})]", s.dmn.label(), s.dmn_turns, s.max_dmn_turns,
)));
async fn run_commands(&mut self, cmds: Vec<MindCommand>) {
for cmd in cmds {
match cmd {
MindCommand::None => {}
MindCommand::Compact => self.check_compaction(),
MindCommand::Score => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
}
}
MindCommand::Hotkey(crate::user::HotkeyAction::Interrupt) => {
self.shared.lock().unwrap().interrupt();
let ag = self.agent.lock().await;
let mut tools = ag.active_tools.lock().unwrap();
for entry in tools.drain(..) { entry.handle.abort(); }
drop(tools); drop(ag);
if let Some(h) = self.turn_handle.take() { h.abort(); }
self.shared.lock().unwrap().turn_active = false;
let _ = self.turn_watch.send(false);
}
MindCommand::Hotkey(crate::user::HotkeyAction::CycleAutonomy) => {
self.shared.lock().unwrap().cycle_autonomy();
}
MindCommand::Hotkey(_) => {}
MindCommand::NewSession => {
self.shared.lock().unwrap().dmn_sleep();
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
let mut ag = self.agent.lock().await;
let shared_ctx = ag.shared_context.clone();
let shared_tools = ag.active_tools.clone();
*ag = Agent::new(
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
self.config.system_prompt.clone(), self.config.context_parts.clone(),
self.config.app.clone(), self.config.prompt_file.clone(),
new_log, shared_ctx, shared_tools,
);
}
MindCommand::Turn(input, target) => {
if target == StreamTarget::Conversation {
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
}
self.shared.lock().unwrap().turn_active = true;
let _ = self.turn_watch.send(true);
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let result_tx = self.turn_tx.clone();
self.turn_handle = Some(tokio::spawn(async move {
let result = Agent::turn(agent, &input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await;
}));
}
}
self.shared.lock().unwrap().turn_active = true;
let _ = self.turn_watch.send(true);
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let result_tx = self.turn_tx.clone();
self.turn_handle = Some(tokio::spawn(async move {
let result = Agent::turn(agent, &input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await;
}));
}
}
pub fn start_memory_scoring(&self) {
let agent = self.agent.clone();
let shared = self.shared.clone();
let ui_tx = self.ui_tx.clone();
let cfg = crate::config::get();
let max_age = cfg.scoring_interval_secs;
@ -262,7 +312,6 @@ impl Mind {
let mut ag = agent.lock().await;
if ag.agent_cycles.memory_scoring_in_flight { return; }
ag.agent_cycles.memory_scoring_in_flight = true;
let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots()));
(ag.context.clone(), ag.client_clone())
};
let result = learn::score_memories_incremental(
@ -273,26 +322,21 @@ impl Mind {
ag.agent_cycles.memory_scoring_in_flight = false;
if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); }
}
match result {
Ok(_) => { let ag = agent.lock().await; let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); }
Err(e) => { let _ = ui_tx.send(UiMessage::Debug(format!("[memory-scoring] failed: {:#}", e))); }
}
shared.lock().unwrap().scoring_in_flight = false;
});
}
fn check_compaction(&self) {
let threshold = compaction_threshold(&self.config.app);
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let shared = self.shared.clone();
shared.lock().unwrap().compaction_in_flight = true;
tokio::spawn(async move {
let mut ag = agent.lock().await;
if ag.last_prompt_tokens() > threshold {
let _ = ui_tx.send(UiMessage::Info(format!(
"[compaction: {}K > {}K]", ag.last_prompt_tokens() / 1000, threshold / 1000,
)));
ag.compact();
let _ = ui_tx.send(UiMessage::Info("[compacted]".into()));
}
shared.lock().unwrap().compaction_in_flight = false;
});
}
@ -304,61 +348,20 @@ impl Mind {
/// Mind event loop — locks MindState, calls state methods, executes actions.
pub async fn run(
&mut self,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<crate::user::event_loop::MindMessage>,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>,
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
) {
use crate::user::event_loop::MindMessage;
use crate::user::HotkeyAction;
loop {
let timeout = self.shared.lock().unwrap().dmn_interval();
let turn_active = self.shared.lock().unwrap().turn_active;
let mut cmds = Vec::new();
tokio::select! {
biased;
Some(msg) = input_rx.recv() => {
match msg {
MindMessage::Hotkey(HotkeyAction::CycleAutonomy) => {
self.shared.lock().unwrap().cycle_autonomy();
}
MindMessage::Hotkey(HotkeyAction::Interrupt) => {
self.shared.lock().unwrap().interrupt();
let ag = self.agent.lock().await;
let mut tools = ag.active_tools.lock().unwrap();
for entry in tools.drain(..) { entry.handle.abort(); }
drop(tools); drop(ag);
if let Some(h) = self.turn_handle.take() { h.abort(); }
self.shared.lock().unwrap().turn_active = false;
let _ = self.turn_watch.send(false);
}
MindMessage::NewSession => {
self.shared.lock().unwrap().dmn_sleep();
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
let mut ag = self.agent.lock().await;
let shared_ctx = ag.shared_context.clone();
let shared_tools = ag.active_tools.clone();
*ag = Agent::new(
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
self.config.system_prompt.clone(), self.config.context_parts.clone(),
self.config.app.clone(), self.config.prompt_file.clone(),
new_log, shared_ctx, shared_tools,
);
}
MindMessage::Score => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
}
}
_ => {}
}
let action = self.shared.lock().unwrap().take_pending_input();
self.execute(action);
Some(cmd) = input_rx.recv() => {
cmds.push(cmd);
}
Some((result, target)) = turn_rx.recv() => {
@ -370,22 +373,22 @@ impl Mind {
crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await;
}
self.check_compaction();
if !self.config.no_agents { self.start_memory_scoring(); }
let action = self.shared.lock().unwrap().take_pending_input();
self.execute(action);
cmds.push(MindCommand::Compact);
if !self.config.no_agents {
cmds.push(MindCommand::Score);
}
}
_ = tokio::time::sleep(timeout), if !turn_active => {
let action = self.shared.lock().unwrap().take_pending_input();
self.execute(action);
if !self.shared.lock().unwrap().turn_active {
let action = self.shared.lock().unwrap().dmn_tick();
self.execute(action);
}
let tick = self.shared.lock().unwrap().dmn_tick();
cmds.push(tick);
}
}
// Always check for pending input
cmds.push(self.shared.lock().unwrap().take_pending_input());
self.run_commands(cmds).await;
}
}
}

View file

@ -17,12 +17,7 @@ use crate::config::SessionConfig;
use crate::user::{self as tui, HotkeyAction};
use crate::user::ui_channel::{self, UiMessage};
/// Messages from the UI to the Mind.
pub enum MindMessage {
Hotkey(HotkeyAction),
NewSession,
Score,
}
pub use crate::mind::MindCommand;
fn send_help(ui_tx: &ui_channel::UiSender) {
let commands = &[
@ -189,7 +184,7 @@ pub async fn run(
shared_mind: crate::mind::SharedMindState,
turn_watch: tokio::sync::watch::Receiver<bool>,
mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>,
mind_tx: tokio::sync::mpsc::UnboundedSender<MindCommand>,
ui_tx: ui_channel::UiSender,
mut ui_rx: ui_channel::UiReceiver,
mut observe_input_rx: tokio::sync::mpsc::UnboundedReceiver<String>,
@ -272,6 +267,18 @@ pub async fn run(
if cur.turn_active != prev_mind.turn_active {
dirty = true;
}
if cur.scoring_in_flight != prev_mind.scoring_in_flight {
if !cur.scoring_in_flight && prev_mind.scoring_in_flight {
let _ = ui_tx.send(UiMessage::Info("[scoring complete]".into()));
}
dirty = true;
}
if cur.compaction_in_flight != prev_mind.compaction_in_flight {
if !cur.compaction_in_flight && prev_mind.compaction_in_flight {
let _ = ui_tx.send(UiMessage::Info("[compacted]".into()));
}
dirty = true;
}
prev_mind = cur;
}
@ -319,7 +326,7 @@ pub async fn run(
let _ = ui_tx.send(UiMessage::Info("(busy)".into()));
}
}
"/new" | "/clear" => { let _ = mind_tx.send(MindMessage::NewSession); }
"/new" | "/clear" => { let _ = mind_tx.send(MindCommand::NewSession); }
"/dmn" => {
let s = shared_mind.lock().unwrap();
let _ = ui_tx.send(UiMessage::Info(format!("DMN: {:?} ({}/{})", s.dmn, s.dmn_turns, s.max_dmn_turns)));
@ -336,7 +343,7 @@ pub async fn run(
shared_mind.lock().unwrap().dmn_pause();
let _ = ui_tx.send(UiMessage::Info("DMN paused.".into()));
}
"/score" => { let _ = mind_tx.send(MindMessage::Score); }
"/score" => { let _ = mind_tx.send(MindCommand::Score); }
"/retry" => {
let agent = agent.clone();
let sm = shared_mind.clone();
@ -371,8 +378,8 @@ pub async fn run(
match action {
HotkeyAction::CycleReasoning => cmd_cycle_reasoning(&agent, &ui_tx),
HotkeyAction::KillProcess => cmd_kill_processes(&agent, &ui_tx).await,
HotkeyAction::Interrupt => { let _ = mind_tx.send(MindMessage::Hotkey(action)); }
HotkeyAction::CycleAutonomy => { let _ = mind_tx.send(MindMessage::Hotkey(action)); }
HotkeyAction::Interrupt => { let _ = mind_tx.send(MindCommand::Hotkey(action)); }
HotkeyAction::CycleAutonomy => { let _ = mind_tx.send(MindCommand::Hotkey(action)); }
HotkeyAction::AdjustSampling(param, delta) => cmd_adjust_sampling(&agent, param, delta),
}
}