mind: remove Arc from MindState

MindState is now std::sync::Mutex<MindState> owned by Mind, not
Arc-wrapped. Background scoring completion signals through a
BgEvent channel instead of locking shared directly. Retry sends
a Turn command instead of pushing to shared input.

No Arc on Mind (scoped tasks), no Arc on MindState (owned by Mind).
Only Arc<Mutex<Agent>> remains — needed for background turn spawns.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 05:01:45 -04:00
parent 5eaba3c951
commit 7dc515b985
2 changed files with 28 additions and 16 deletions

View file

@ -33,7 +33,6 @@ fn compaction_threshold(app: &AppConfig) -> u32 {
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
} }
/// Shared state between Mind and UI. /// Shared state between Mind and UI.
pub struct MindState { pub struct MindState {
/// Pending user input — UI pushes, Mind consumes after turn completes. /// Pending user input — UI pushes, Mind consumes after turn completes.
@ -74,8 +73,6 @@ impl Clone for MindState {
} }
} }
pub type SharedMindState = Arc<std::sync::Mutex<MindState>>;
/// What should happen after a state transition. /// What should happen after a state transition.
pub enum MindCommand { pub enum MindCommand {
/// Start a turn with this input /// Start a turn with this input
@ -183,12 +180,15 @@ impl MindState {
} }
} }
pub fn shared_mind_state(max_dmn_turns: u32) -> SharedMindState { /// Background task completion events.
Arc::new(std::sync::Mutex::new(MindState::new(max_dmn_turns))) enum BgEvent {
ScoringDone,
} }
// --- Mind: cognitive state machine --- // --- Mind: cognitive state machine ---
pub type SharedMindState = std::sync::Mutex<MindState>;
pub struct Mind { pub struct Mind {
pub agent: Arc<Mutex<Agent>>, pub agent: Arc<Mutex<Agent>>,
pub shared: SharedMindState, pub shared: SharedMindState,
@ -196,12 +196,11 @@ pub struct Mind {
ui_tx: ui_channel::UiSender, ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>, turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
turn_watch: tokio::sync::watch::Sender<bool>, turn_watch: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
_supervisor: crate::thalamus::supervisor::Supervisor, _supervisor: crate::thalamus::supervisor::Supervisor,
} }
fn _assert_send_sync<T: Send + Sync>() {}
const _: fn() = _assert_send_sync::<Mind>;
impl Mind { impl Mind {
pub fn new( pub fn new(
config: SessionConfig, config: SessionConfig,
@ -227,14 +226,16 @@ impl Mind {
shared_active_tools, shared_active_tools,
))); )));
let shared = shared_mind_state(config.app.dmn.max_turns); let shared = std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns));
let (turn_watch, _) = tokio::sync::watch::channel(false); let (turn_watch, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let mut sup = crate::thalamus::supervisor::Supervisor::new(); let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config(); sup.load_config();
sup.ensure_running(); sup.ensure_running();
Self { agent, shared, config, ui_tx, turn_tx, turn_watch, _supervisor: sup } Self { agent, shared, config, ui_tx, turn_tx, turn_watch, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
} }
/// Initialize — restore log, start daemons and background agents. /// Initialize — restore log, start daemons and background agents.
@ -323,7 +324,7 @@ impl Mind {
pub fn start_memory_scoring(&self) { pub fn start_memory_scoring(&self) {
let agent = self.agent.clone(); let agent = self.agent.clone();
let shared = self.shared.clone(); let bg_tx = self.bg_tx.clone();
let ui_tx = self.ui_tx.clone(); let ui_tx = self.ui_tx.clone();
let cfg = crate::config::get(); let cfg = crate::config::get();
let max_age = cfg.scoring_interval_secs; let max_age = cfg.scoring_interval_secs;
@ -343,7 +344,7 @@ impl Mind {
ag.agent_cycles.memory_scoring_in_flight = false; ag.agent_cycles.memory_scoring_in_flight = false;
if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); }
} }
shared.lock().unwrap().scoring_in_flight = false; let _ = bg_tx.send(BgEvent::ScoringDone);
}); });
} }
@ -357,6 +358,8 @@ impl Mind {
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>, mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>,
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>, mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
) { ) {
let mut bg_rx = self.bg_rx.lock().unwrap().take()
.expect("Mind::run() called twice");
loop { loop {
let timeout = self.shared.lock().unwrap().dmn.interval(); let timeout = self.shared.lock().unwrap().dmn.interval();
let turn_active = self.shared.lock().unwrap().turn_active; let turn_active = self.shared.lock().unwrap().turn_active;
@ -370,6 +373,14 @@ impl Mind {
cmds.push(cmd); cmds.push(cmd);
} }
Some(bg) = bg_rx.recv() => {
match bg {
BgEvent::ScoringDone => {
self.shared.lock().unwrap().scoring_in_flight = false;
}
}
}
Some((result, target)) = turn_rx.recv() => { Some((result, target)) = turn_rx.recv() => {
self.shared.lock().unwrap().turn_handle = None; self.shared.lock().unwrap().turn_handle = None;
let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); let model_switch = self.shared.lock().unwrap().complete_turn(&result, target);

View file

@ -86,7 +86,7 @@ fn send_help(ui_tx: &ui_channel::UiSender) {
async fn cmd_retry( async fn cmd_retry(
agent: &Arc<Mutex<Agent>>, agent: &Arc<Mutex<Agent>>,
shared_mind: &crate::mind::SharedMindState, mind_tx: &tokio::sync::mpsc::UnboundedSender<MindCommand>,
ui_tx: &ui_channel::UiSender, ui_tx: &ui_channel::UiSender,
) { ) {
let mut agent_guard = agent.lock().await; let mut agent_guard = agent.lock().await;
@ -104,7 +104,8 @@ async fn cmd_retry(
Some(text) => { Some(text) => {
let preview_len = text.len().min(60); let preview_len = text.len().min(60);
let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
shared_mind.lock().unwrap().input.push(text); // Send as a Turn command — Mind will process it
let _ = mind_tx.send(MindCommand::Turn(text, crate::user::ui_channel::StreamTarget::Conversation));
} }
None => { None => {
let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
@ -411,12 +412,12 @@ pub async fn run(
"/score" => { let _ = mind_tx.send(MindCommand::Score); } "/score" => { let _ = mind_tx.send(MindCommand::Score); }
"/retry" => { "/retry" => {
let agent = agent.clone(); let agent = agent.clone();
let sm = shared_mind.clone(); let mind_tx = mind_tx.clone();
let ui_tx = ui_tx.clone(); let ui_tx = ui_tx.clone();
let mut tw = turn_watch.clone(); let mut tw = turn_watch.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = tw.wait_for(|&active| !active).await; let _ = tw.wait_for(|&active| !active).await;
cmd_retry(&agent, &sm, &ui_tx).await; cmd_retry(&agent, &mind_tx, &ui_tx).await;
}); });
} }
cmd if cmd.starts_with("/model ") => { cmd if cmd.starts_with("/model ") => {