From 01b07a7f288c472fb3227cc171d07c64ca503ac8 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 03:34:43 -0400 Subject: [PATCH] mind: unify MindCommand, add command queue pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MindCommand replaces both Action and MindMessage — one type for everything: turns, compaction, scoring, hotkeys, new session. State methods return MindCommand values. The run loop collects commands into a Vec, then drains them through run_commands(). Compact and Score now flow through the same command path as everything else. Removes execute(), MindMessage from event_loop. Mind's run loop is now: select! → collect commands → run_commands(). mind/mod.rs: 957 → 516 lines. Co-Authored-By: Kent Overstreet --- src/mind/mod.rs | 191 +++++++++++++++++++++-------------------- src/user/event_loop.rs | 29 ++++--- 2 files changed, 115 insertions(+), 105 deletions(-) diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 07deb39..47c7161 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -47,8 +47,10 @@ pub struct MindState { pub dmn: dmn::State, pub dmn_turns: u32, pub max_dmn_turns: u32, - /// Whether a full matrix /score task is currently running. + /// Whether memory scoring is running. pub scoring_in_flight: bool, + /// Whether compaction is running. + pub compaction_in_flight: bool, /// Per-turn tracking pub last_user_input: Instant, pub consecutive_errors: u32, @@ -58,9 +60,17 @@ pub struct MindState { pub type SharedMindState = Arc>; /// What should happen after a state transition. -pub enum Action { +pub enum MindCommand { /// Start a turn with this input Turn(String, StreamTarget), + /// Run compaction check + Compact, + /// Run memory scoring + Score, + /// Hotkey action + Hotkey(crate::user::HotkeyAction), + /// Reset session + NewSession, /// Nothing to do None, } @@ -75,6 +85,7 @@ impl MindState { dmn_turns: 0, max_dmn_turns, scoring_in_flight: false, + compaction_in_flight: false, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, @@ -86,9 +97,9 @@ impl MindState { } /// Consume pending input, return a Turn action if ready. - pub fn take_pending_input(&mut self) -> Action { + pub fn take_pending_input(&mut self) -> MindCommand { if self.turn_active || self.input.is_empty() { - return Action::None; + return MindCommand::None; } let text = self.input.join("\n"); self.input.clear(); @@ -96,7 +107,7 @@ impl MindState { self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = dmn::State::Engaged; - Action::Turn(text, StreamTarget::Conversation) + MindCommand::Turn(text, StreamTarget::Conversation) } /// Process turn completion, return model switch name if requested. @@ -131,16 +142,16 @@ impl MindState { } /// DMN tick — returns a Turn action with the DMN prompt, or None. - pub fn dmn_tick(&mut self) -> Action { + pub fn dmn_tick(&mut self) -> MindCommand { if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) { - return Action::None; + return MindCommand::None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; - return Action::None; + return MindCommand::None; } let dmn_ctx = dmn::DmnContext { @@ -149,7 +160,7 @@ impl MindState { last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); - Action::Turn(prompt, StreamTarget::Autonomous) + MindCommand::Turn(prompt, StreamTarget::Autonomous) } pub fn dmn_sleep(&mut self) { @@ -229,30 +240,69 @@ impl Mind { } /// Execute an Action from a MindState method. - fn execute(&mut self, action: Action) { - if let Action::Turn(input, target) = action { - if target == StreamTarget::Conversation { - let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); - } else { - let s = self.shared.lock().unwrap(); - let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!( - "[dmn: {} ({}/{})]", s.dmn.label(), s.dmn_turns, s.max_dmn_turns, - ))); + async fn run_commands(&mut self, cmds: Vec) { + for cmd in cmds { + match cmd { + MindCommand::None => {} + MindCommand::Compact => self.check_compaction(), + MindCommand::Score => { + let mut s = self.shared.lock().unwrap(); + if !s.scoring_in_flight { + s.scoring_in_flight = true; + drop(s); + self.start_memory_scoring(); + } + } + MindCommand::Hotkey(crate::user::HotkeyAction::Interrupt) => { + self.shared.lock().unwrap().interrupt(); + let ag = self.agent.lock().await; + let mut tools = ag.active_tools.lock().unwrap(); + for entry in tools.drain(..) { entry.handle.abort(); } + drop(tools); drop(ag); + if let Some(h) = self.turn_handle.take() { h.abort(); } + self.shared.lock().unwrap().turn_active = false; + let _ = self.turn_watch.send(false); + } + MindCommand::Hotkey(crate::user::HotkeyAction::CycleAutonomy) => { + self.shared.lock().unwrap().cycle_autonomy(); + } + MindCommand::Hotkey(_) => {} + MindCommand::NewSession => { + self.shared.lock().unwrap().dmn_sleep(); + let new_log = log::ConversationLog::new( + self.config.session_dir.join("conversation.jsonl"), + ).ok(); + let mut ag = self.agent.lock().await; + let shared_ctx = ag.shared_context.clone(); + let shared_tools = ag.active_tools.clone(); + *ag = Agent::new( + ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), + self.config.system_prompt.clone(), self.config.context_parts.clone(), + self.config.app.clone(), self.config.prompt_file.clone(), + new_log, shared_ctx, shared_tools, + ); + } + MindCommand::Turn(input, target) => { + if target == StreamTarget::Conversation { + let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); + } + self.shared.lock().unwrap().turn_active = true; + let _ = self.turn_watch.send(true); + let agent = self.agent.clone(); + let ui_tx = self.ui_tx.clone(); + let result_tx = self.turn_tx.clone(); + self.turn_handle = Some(tokio::spawn(async move { + let result = Agent::turn(agent, &input, &ui_tx, target).await; + let _ = result_tx.send((result, target)).await; + })); + } } - self.shared.lock().unwrap().turn_active = true; - let _ = self.turn_watch.send(true); - let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); - let result_tx = self.turn_tx.clone(); - self.turn_handle = Some(tokio::spawn(async move { - let result = Agent::turn(agent, &input, &ui_tx, target).await; - let _ = result_tx.send((result, target)).await; - })); } } pub fn start_memory_scoring(&self) { let agent = self.agent.clone(); + let shared = self.shared.clone(); let ui_tx = self.ui_tx.clone(); let cfg = crate::config::get(); let max_age = cfg.scoring_interval_secs; @@ -262,7 +312,6 @@ impl Mind { let mut ag = agent.lock().await; if ag.agent_cycles.memory_scoring_in_flight { return; } ag.agent_cycles.memory_scoring_in_flight = true; - let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); (ag.context.clone(), ag.client_clone()) }; let result = learn::score_memories_incremental( @@ -273,26 +322,21 @@ impl Mind { ag.agent_cycles.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } } - match result { - Ok(_) => { let ag = agent.lock().await; let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); } - Err(e) => { let _ = ui_tx.send(UiMessage::Debug(format!("[memory-scoring] failed: {:#}", e))); } - } + shared.lock().unwrap().scoring_in_flight = false; }); } fn check_compaction(&self) { let threshold = compaction_threshold(&self.config.app); let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); + let shared = self.shared.clone(); + shared.lock().unwrap().compaction_in_flight = true; tokio::spawn(async move { let mut ag = agent.lock().await; if ag.last_prompt_tokens() > threshold { - let _ = ui_tx.send(UiMessage::Info(format!( - "[compaction: {}K > {}K]", ag.last_prompt_tokens() / 1000, threshold / 1000, - ))); ag.compact(); - let _ = ui_tx.send(UiMessage::Info("[compacted]".into())); } + shared.lock().unwrap().compaction_in_flight = false; }); } @@ -304,61 +348,20 @@ impl Mind { /// Mind event loop — locks MindState, calls state methods, executes actions. pub async fn run( &mut self, - mut input_rx: tokio::sync::mpsc::UnboundedReceiver, + mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { - use crate::user::event_loop::MindMessage; - use crate::user::HotkeyAction; - loop { let timeout = self.shared.lock().unwrap().dmn_interval(); let turn_active = self.shared.lock().unwrap().turn_active; + let mut cmds = Vec::new(); + tokio::select! { biased; - Some(msg) = input_rx.recv() => { - match msg { - MindMessage::Hotkey(HotkeyAction::CycleAutonomy) => { - self.shared.lock().unwrap().cycle_autonomy(); - } - MindMessage::Hotkey(HotkeyAction::Interrupt) => { - self.shared.lock().unwrap().interrupt(); - let ag = self.agent.lock().await; - let mut tools = ag.active_tools.lock().unwrap(); - for entry in tools.drain(..) { entry.handle.abort(); } - drop(tools); drop(ag); - if let Some(h) = self.turn_handle.take() { h.abort(); } - self.shared.lock().unwrap().turn_active = false; - let _ = self.turn_watch.send(false); - } - MindMessage::NewSession => { - self.shared.lock().unwrap().dmn_sleep(); - let new_log = log::ConversationLog::new( - self.config.session_dir.join("conversation.jsonl"), - ).ok(); - let mut ag = self.agent.lock().await; - let shared_ctx = ag.shared_context.clone(); - let shared_tools = ag.active_tools.clone(); - *ag = Agent::new( - ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), - self.config.system_prompt.clone(), self.config.context_parts.clone(), - self.config.app.clone(), self.config.prompt_file.clone(), - new_log, shared_ctx, shared_tools, - ); - } - MindMessage::Score => { - let mut s = self.shared.lock().unwrap(); - if !s.scoring_in_flight { - s.scoring_in_flight = true; - drop(s); - self.start_memory_scoring(); - } - } - _ => {} - } - let action = self.shared.lock().unwrap().take_pending_input(); - self.execute(action); + Some(cmd) = input_rx.recv() => { + cmds.push(cmd); } Some((result, target)) = turn_rx.recv() => { @@ -370,22 +373,22 @@ impl Mind { crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; } - self.check_compaction(); - if !self.config.no_agents { self.start_memory_scoring(); } - - let action = self.shared.lock().unwrap().take_pending_input(); - self.execute(action); + cmds.push(MindCommand::Compact); + if !self.config.no_agents { + cmds.push(MindCommand::Score); + } } _ = tokio::time::sleep(timeout), if !turn_active => { - let action = self.shared.lock().unwrap().take_pending_input(); - self.execute(action); - if !self.shared.lock().unwrap().turn_active { - let action = self.shared.lock().unwrap().dmn_tick(); - self.execute(action); - } + let tick = self.shared.lock().unwrap().dmn_tick(); + cmds.push(tick); } } + + // Always check for pending input + cmds.push(self.shared.lock().unwrap().take_pending_input()); + + self.run_commands(cmds).await; } } } diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index 1615601..dae2c3a 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -17,12 +17,7 @@ use crate::config::SessionConfig; use crate::user::{self as tui, HotkeyAction}; use crate::user::ui_channel::{self, UiMessage}; -/// Messages from the UI to the Mind. -pub enum MindMessage { - Hotkey(HotkeyAction), - NewSession, - Score, -} +pub use crate::mind::MindCommand; fn send_help(ui_tx: &ui_channel::UiSender) { let commands = &[ @@ -189,7 +184,7 @@ pub async fn run( shared_mind: crate::mind::SharedMindState, turn_watch: tokio::sync::watch::Receiver, - mind_tx: tokio::sync::mpsc::UnboundedSender, + mind_tx: tokio::sync::mpsc::UnboundedSender, ui_tx: ui_channel::UiSender, mut ui_rx: ui_channel::UiReceiver, mut observe_input_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -272,6 +267,18 @@ pub async fn run( if cur.turn_active != prev_mind.turn_active { dirty = true; } + if cur.scoring_in_flight != prev_mind.scoring_in_flight { + if !cur.scoring_in_flight && prev_mind.scoring_in_flight { + let _ = ui_tx.send(UiMessage::Info("[scoring complete]".into())); + } + dirty = true; + } + if cur.compaction_in_flight != prev_mind.compaction_in_flight { + if !cur.compaction_in_flight && prev_mind.compaction_in_flight { + let _ = ui_tx.send(UiMessage::Info("[compacted]".into())); + } + dirty = true; + } prev_mind = cur; } @@ -319,7 +326,7 @@ pub async fn run( let _ = ui_tx.send(UiMessage::Info("(busy)".into())); } } - "/new" | "/clear" => { let _ = mind_tx.send(MindMessage::NewSession); } + "/new" | "/clear" => { let _ = mind_tx.send(MindCommand::NewSession); } "/dmn" => { let s = shared_mind.lock().unwrap(); let _ = ui_tx.send(UiMessage::Info(format!("DMN: {:?} ({}/{})", s.dmn, s.dmn_turns, s.max_dmn_turns))); @@ -336,7 +343,7 @@ pub async fn run( shared_mind.lock().unwrap().dmn_pause(); let _ = ui_tx.send(UiMessage::Info("DMN paused.".into())); } - "/score" => { let _ = mind_tx.send(MindMessage::Score); } + "/score" => { let _ = mind_tx.send(MindCommand::Score); } "/retry" => { let agent = agent.clone(); let sm = shared_mind.clone(); @@ -371,8 +378,8 @@ pub async fn run( match action { HotkeyAction::CycleReasoning => cmd_cycle_reasoning(&agent, &ui_tx), HotkeyAction::KillProcess => cmd_kill_processes(&agent, &ui_tx).await, - HotkeyAction::Interrupt => { let _ = mind_tx.send(MindMessage::Hotkey(action)); } - HotkeyAction::CycleAutonomy => { let _ = mind_tx.send(MindMessage::Hotkey(action)); } + HotkeyAction::Interrupt => { let _ = mind_tx.send(MindCommand::Hotkey(action)); } + HotkeyAction::CycleAutonomy => { let _ = mind_tx.send(MindCommand::Hotkey(action)); } HotkeyAction::AdjustSampling(param, delta) => cmd_adjust_sampling(&agent, param, delta), } }