diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 0db2507..cf9b301 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -36,155 +36,71 @@ fn compaction_threshold(app: &AppConfig) -> u32 { } -/// Shared state between Mind and UI. UI writes, Mind reads. +/// Shared state between Mind and UI. pub struct MindState { /// Pending user input — UI pushes, Mind consumes after turn completes. pub input: Vec, /// True while a turn is in progress. pub turn_active: bool, + /// DMN state + pub dmn: dmn::State, + pub dmn_turns: u32, + pub max_dmn_turns: u32, + /// Whether a full matrix /score task is currently running. + pub scoring_in_flight: bool, + /// Per-turn tracking + pub last_user_input: Instant, + pub consecutive_errors: u32, + pub last_turn_had_tools: bool, } pub type SharedMindState = Arc>; -pub fn shared_mind_state() -> SharedMindState { - Arc::new(std::sync::Mutex::new(MindState { - input: Vec::new(), - turn_active: false, - })) +/// What should happen after a state transition. +pub enum Action { + /// Start a turn with this input + Turn(String, StreamTarget), + /// Nothing to do + None, } -// --- Mind: cognitive state machine --- - -pub struct Mind { - pub agent: Arc>, - pub shared: SharedMindState, - config: SessionConfig, - ui_tx: ui_channel::UiSender, - turn_tx: mpsc::Sender<(Result, StreamTarget)>, - // DMN state - dmn: dmn::State, - dmn_turns: u32, - max_dmn_turns: u32, - - // Turn tracking - turn_in_progress: bool, - turn_handle: Option>, - /// Broadcast when turn_in_progress changes. Commands can wait - /// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`. - turn_watch: tokio::sync::watch::Sender, - - // Per-turn tracking for DMN context - last_user_input: Instant, - consecutive_errors: u32, - last_turn_had_tools: bool, - - // Subconscious orchestration - agent_cycles: crate::subconscious::subconscious::AgentCycleState, - /// Latest memory importance scores from full matrix scoring (manual /score). - memory_scores: Option, - /// Whether a full matrix /score task is currently running. - scoring_in_flight: bool, -} - -impl Mind { - pub fn new( - agent: Arc>, - shared: SharedMindState, - config: SessionConfig, - ui_tx: ui_channel::UiSender, - turn_tx: mpsc::Sender<(Result, StreamTarget)>, - ) -> Self { - let max_dmn_turns = config.app.dmn.max_turns; - let (turn_watch, _) = tokio::sync::watch::channel(false); - +impl MindState { + pub fn new(max_dmn_turns: u32) -> Self { Self { - agent, - shared, - config, - ui_tx, - turn_tx, - dmn: if dmn::is_off() { - dmn::State::Off - } else { - dmn::State::Resting { since: Instant::now() } - }, + input: Vec::new(), + turn_active: false, + dmn: if dmn::is_off() { dmn::State::Off } + else { dmn::State::Resting { since: Instant::now() } }, dmn_turns: 0, max_dmn_turns, - turn_in_progress: false, - turn_handle: None, - turn_watch, + scoring_in_flight: false, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, - agent_cycles: crate::subconscious::subconscious::AgentCycleState::new(""), - memory_scores: None, - scoring_in_flight: false, } } - /// Subscribe to turn state changes. Use `rx.wait_for(|&v| !v).await` - /// to wait until no turn is in progress. - pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { - self.turn_watch.subscribe() - } - - fn set_turn_active(&mut self, active: bool) { - self.turn_in_progress = active; - self.shared.lock().unwrap().turn_active = active; - let _ = self.turn_watch.send(active); - } - - /// How long before the next DMN tick. - fn dmn_interval(&self) -> Duration { + pub fn dmn_interval(&self) -> Duration { self.dmn.interval() } - /// Spawn an agent turn in a background task. - fn spawn_turn(&mut self, input: String, target: StreamTarget) { - let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); - let result_tx = self.turn_tx.clone(); - self.set_turn_active(true); - self.turn_handle = Some(tokio::spawn(async move { - let result = Agent::turn(agent, &input, &ui_tx, target).await; - let _ = result_tx.send((result, target)).await; - })); - } - - /// Submit user input — either queue it (if a turn is running) or - /// start a new turn immediately. - /// Check shared state for pending user input, start a turn if available. - fn check_pending_input(&mut self) { - if self.turn_in_progress { - return; + /// Consume pending input, return a Turn action if ready. + pub fn take_pending_input(&mut self) -> Action { + if self.turn_active || self.input.is_empty() { + return Action::None; } - let input = { - let mut shared = self.shared.lock().unwrap(); - if shared.input.is_empty() { - return; - } - shared.input.join("\n") - }; - self.shared.lock().unwrap().input.clear(); + let text = self.input.join("\n"); + self.input.clear(); self.dmn_turns = 0; self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = dmn::State::Engaged; - let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); - self.update_status(); - self.spawn_turn(input, StreamTarget::Conversation); + Action::Turn(text, StreamTarget::Conversation) } - /// Process a completed turn: update DMN state, check compaction, - /// drain any queued input. - async fn handle_turn_result( - &mut self, - result: Result, - target: StreamTarget, - ) { - self.set_turn_active(false); - self.turn_handle = None; - + /// Process turn completion, return model switch name if requested. + pub fn complete_turn(&mut self, result: &Result, target: StreamTarget) -> Option { + self.turn_active = false; match result { Ok(turn_result) => { if turn_result.tool_errors > 0 { @@ -202,131 +118,28 @@ impl Mind { if turn_result.dmn_pause { self.dmn = dmn::State::Paused; self.dmn_turns = 0; - let _ = self.ui_tx.send(UiMessage::Info( - "DMN paused (agent requested). Ctrl+P or /wake to resume.".into(), - )); - } - if let Some(model_name) = turn_result.model_switch { - crate::user::event_loop::cmd_switch_model( - &self.agent, &model_name, &self.ui_tx, - ).await; } + turn_result.model_switch.clone() } - Err(e) => { + Err(_) => { self.consecutive_errors += 1; - let msg = match target { - StreamTarget::Autonomous => { - UiMessage::DmnAnnotation(format!("[error: {:#}]", e)) - } - StreamTarget::Conversation => { - UiMessage::Info(format!("Error: {:#}", e)) - } - }; - let _ = self.ui_tx.send(msg); - self.dmn = dmn::State::Resting { - since: Instant::now(), - }; + self.dmn = dmn::State::Resting { since: Instant::now() }; + None } } - - self.update_status(); - self.check_compaction(); - if !self.config.no_agents { - self.start_memory_scoring(); - } - self.drain_pending(); } - /// Spawn incremental memory scoring if not already running. - /// Non-blocking — all async work happens in the spawned task. - fn start_memory_scoring(&self) { - let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); - let cfg = crate::config::get(); - let max_age = cfg.scoring_interval_secs; - let response_window = cfg.scoring_response_window; - tokio::spawn(async move { - let (context, client) = { - let mut agent = agent.lock().await; - if agent.agent_cycles.memory_scoring_in_flight { - return; - } - agent.agent_cycles.memory_scoring_in_flight = true; - let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); - (agent.context.clone(), agent.client_clone()) - }; - - let result = learn::score_memories_incremental( - &context, max_age as i64, response_window, &client, &ui_tx, - ).await; - - { - let mut agent = agent.lock().await; - agent.agent_cycles.memory_scoring_in_flight = false; - if let Ok(ref scores) = result { - agent.agent_cycles.memory_scores = scores.clone(); - } - } - match result { - Ok(_) => { - let agent = agent.lock().await; - let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); - } - Err(e) => { - let _ = ui_tx.send(UiMessage::Debug(format!( - "[memory-scoring] failed: {:#}", e, - ))); - } - } - }); - } - - /// Check if compaction is needed after a turn. - fn check_compaction(&self) { - let threshold = compaction_threshold(&self.config.app); - let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); - tokio::spawn(async move { - let mut agent_guard = agent.lock().await; - let tokens = agent_guard.last_prompt_tokens(); - if tokens > threshold { - let _ = ui_tx.send(UiMessage::Info(format!( - "[compaction: {}K > {}K threshold]", - tokens / 1000, - threshold / 1000, - ))); - agent_guard.compact(); - let _ = ui_tx.send(UiMessage::Info( - "[compacted — journal + recent messages]".into(), - )); - } - }); - } - - /// Send any consolidated pending input as a single turn. - fn drain_pending(&mut self) { - self.check_pending_input(); - } - - /// Fire a DMN tick: check max turns, generate prompt, spawn turn. - fn dmn_tick(&mut self) { + /// DMN tick — returns a Turn action with the DMN prompt, or None. + pub fn dmn_tick(&mut self) -> Action { if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) { - return; + return Action::None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { - let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!( - "[dmn: {} consecutive turns, resting (limit: {})]", - self.dmn_turns - 1, - self.max_dmn_turns, - ))); - self.dmn = dmn::State::Resting { - since: Instant::now(), - }; + self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; - self.update_status(); - return; + return Action::None; } let dmn_ctx = dmn::DmnContext { @@ -335,132 +148,28 @@ impl Mind { last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); - let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!( - "[dmn: {} ({}/{})]", - self.dmn.label(), - self.dmn_turns, - self.max_dmn_turns, - ))); - self.update_status(); - self.spawn_turn(prompt, StreamTarget::Autonomous); + Action::Turn(prompt, StreamTarget::Autonomous) } - async fn cmd_new(&mut self) { - let new_log = log::ConversationLog::new( - self.config.session_dir.join("conversation.jsonl"), - ).ok(); - let mut agent_guard = self.agent.lock().await; - let shared_ctx = agent_guard.shared_context.clone(); - let shared_tools = agent_guard.active_tools.clone(); - *agent_guard = Agent::new( - ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), - self.config.system_prompt.clone(), - self.config.context_parts.clone(), - self.config.app.clone(), - self.config.prompt_file.clone(), - new_log, - shared_ctx, - shared_tools, - ); - drop(agent_guard); - self.dmn = dmn::State::Resting { since: Instant::now() }; - let _ = self.ui_tx.send(UiMessage::Info("New session started.".into())); - } - - fn cmd_score(&mut self) { - if self.scoring_in_flight { - let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into())); - return; - } - let agent = self.agent.clone(); - let ui_tx = self.ui_tx.clone(); - self.scoring_in_flight = true; - tokio::spawn(async move { - let (context, client) = { - let ag = agent.lock().await; - (ag.context.clone(), ag.client_clone()) - }; - let result = learn::score_memories(&context, &client, &ui_tx).await; - let ag = agent.lock().await; - match result { - Ok(scores) => ag.publish_context_state_with_scores(Some(&scores)), - Err(e) => { let _ = ui_tx.send(UiMessage::Info(format!("[scoring failed: {:#}]", e))); } - } - }); - } - - fn cmd_dmn_query(&self) { - let _ = self.ui_tx.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn))); - let _ = self.ui_tx.send(UiMessage::Info(format!("Next tick in: {:?}", self.dmn.interval()))); - let _ = self.ui_tx.send(UiMessage::Info(format!( - "Consecutive DMN turns: {}/{}", self.dmn_turns, self.max_dmn_turns, - ))); - } - - fn cmd_dmn_sleep(&mut self) { + pub fn dmn_sleep(&mut self) { self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; - let _ = self.ui_tx.send(UiMessage::Info( - "DMN sleeping (heartbeat every 5 min). Type anything to wake.".into(), - )); } - fn cmd_dmn_wake(&mut self) { - let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off); + pub fn dmn_wake(&mut self) { if matches!(self.dmn, dmn::State::Off) { dmn::set_off(false); } self.dmn = dmn::State::Foraging; self.dmn_turns = 0; - let msg = if was_paused { "DMN unpaused — entering foraging mode." } - else { "DMN waking — entering foraging mode." }; - let _ = self.ui_tx.send(UiMessage::Info(msg.into())); - self.update_status(); } - fn cmd_dmn_pause(&mut self) { + pub fn dmn_pause(&mut self) { self.dmn = dmn::State::Paused; self.dmn_turns = 0; - let _ = self.ui_tx.send(UiMessage::Info( - "DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(), - )); - self.update_status(); } - /// Interrupt: kill processes, abort current turn, clear pending queue. - async fn interrupt(&mut self) { - let count = { - let agent = self.agent.lock().await; - let mut tools = agent.active_tools.lock().unwrap(); - let count = tools.len(); - for entry in tools.drain(..) { - entry.handle.abort(); - } - count - }; - if count == 0 { - if let Some(handle) = self.turn_handle.take() { - handle.abort(); - self.set_turn_active(false); - self.dmn = dmn::State::Resting { since: Instant::now() }; - self.update_status(); - let _ = self.ui_tx.send(UiMessage::Activity(String::new())); - } - } - self.shared.lock().unwrap().input.clear(); - let killed = count; - if killed > 0 || self.turn_in_progress { - let _ = self.ui_tx.send(UiMessage::Info(format!( - "(interrupted — killed {} process(es), turn aborted)", killed, - ))); - } else { - let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into())); - } - } - - /// Cycle reasoning effort: none → low → high → none. - /// Cycle DMN autonomy: foraging → resting → paused → off → foraging. - fn cycle_autonomy(&mut self) { + pub fn cycle_autonomy(&mut self) -> &'static str { let (new_state, label) = match &self.dmn { dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => { (dmn::State::Resting { since: Instant::now() }, "resting") @@ -477,17 +186,117 @@ impl Mind { }; self.dmn = new_state; self.dmn_turns = 0; - let _ = self.ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label))); - self.update_status(); + label } - /// Switch to a named model from the config registry. - fn load_context_groups(&self) -> Vec { - config::get().context_groups.clone() + pub fn interrupt(&mut self) { + self.input.clear(); + self.dmn = dmn::State::Resting { since: Instant::now() }; + } +} + +pub fn shared_mind_state(max_dmn_turns: u32) -> SharedMindState { + Arc::new(std::sync::Mutex::new(MindState::new(max_dmn_turns))) +} + +// --- Mind: cognitive state machine --- + +pub struct Mind { + pub agent: Arc>, + pub shared: SharedMindState, + pub config: SessionConfig, + ui_tx: ui_channel::UiSender, + turn_tx: mpsc::Sender<(Result, StreamTarget)>, + turn_handle: Option>, + turn_watch: tokio::sync::watch::Sender, +} + +impl Mind { + pub fn new( + agent: Arc>, + shared: SharedMindState, + config: SessionConfig, + ui_tx: ui_channel::UiSender, + turn_tx: mpsc::Sender<(Result, StreamTarget)>, + ) -> Self { + let (turn_watch, _) = tokio::sync::watch::channel(false); + Self { agent, shared, config, ui_tx, turn_tx, turn_handle: None, turn_watch } } - fn send_context_info(&self) { - let context_groups = self.load_context_groups(); + pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { + self.turn_watch.subscribe() + } + + /// Execute an Action from a MindState method. + fn execute(&mut self, action: Action) { + if let Action::Turn(input, target) = action { + if target == StreamTarget::Conversation { + let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); + } else { + let s = self.shared.lock().unwrap(); + let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!( + "[dmn: {} ({}/{})]", s.dmn.label(), s.dmn_turns, s.max_dmn_turns, + ))); + } + self.shared.lock().unwrap().turn_active = true; + let _ = self.turn_watch.send(true); + let agent = self.agent.clone(); + let ui_tx = self.ui_tx.clone(); + let result_tx = self.turn_tx.clone(); + self.turn_handle = Some(tokio::spawn(async move { + let result = Agent::turn(agent, &input, &ui_tx, target).await; + let _ = result_tx.send((result, target)).await; + })); + } + } + + pub fn start_memory_scoring(&self) { + let agent = self.agent.clone(); + let ui_tx = self.ui_tx.clone(); + let cfg = crate::config::get(); + let max_age = cfg.scoring_interval_secs; + let response_window = cfg.scoring_response_window; + tokio::spawn(async move { + let (context, client) = { + let mut ag = agent.lock().await; + if ag.agent_cycles.memory_scoring_in_flight { return; } + ag.agent_cycles.memory_scoring_in_flight = true; + let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); + (ag.context.clone(), ag.client_clone()) + }; + let result = learn::score_memories_incremental( + &context, max_age as i64, response_window, &client, &ui_tx, + ).await; + { + let mut ag = agent.lock().await; + ag.agent_cycles.memory_scoring_in_flight = false; + if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } + } + match result { + Ok(_) => { let ag = agent.lock().await; let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); } + Err(e) => { let _ = ui_tx.send(UiMessage::Debug(format!("[memory-scoring] failed: {:#}", e))); } + } + }); + } + + fn check_compaction(&self) { + let threshold = compaction_threshold(&self.config.app); + let agent = self.agent.clone(); + let ui_tx = self.ui_tx.clone(); + tokio::spawn(async move { + let mut ag = agent.lock().await; + if ag.last_prompt_tokens() > threshold { + let _ = ui_tx.send(UiMessage::Info(format!( + "[compaction: {}K > {}K]", ag.last_prompt_tokens() / 1000, threshold / 1000, + ))); + ag.compact(); + let _ = ui_tx.send(UiMessage::Info("[compacted]".into())); + } + }); + } + + pub fn send_context_info(&self) { + let context_groups = config::get().context_groups.clone(); let (instruction_files, memory_files) = identity::context_file_info( &self.config.prompt_file, self.config.app.memory_project.as_deref(), @@ -505,26 +314,23 @@ impl Mind { })); } - fn update_status(&self) { + pub fn update_status(&self) { + let s = self.shared.lock().unwrap(); let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo { - dmn_state: self.dmn.label().to_string(), - dmn_turns: self.dmn_turns, - dmn_max_turns: self.max_dmn_turns, - prompt_tokens: 0, - completion_tokens: 0, - model: String::new(), - turn_tools: 0, + dmn_state: s.dmn.label().to_string(), + dmn_turns: s.dmn_turns, + dmn_max_turns: s.max_dmn_turns, + prompt_tokens: 0, completion_tokens: 0, + model: String::new(), turn_tools: 0, context_budget: String::new(), })); } pub async fn shutdown(&mut self) { - if let Some(handle) = self.turn_handle.take() { - handle.abort(); - } + if let Some(handle) = self.turn_handle.take() { handle.abort(); } } - /// Mind event loop — reacts to user input, turn results, DMN ticks. + /// Mind event loop — locks MindState, calls state methods, executes actions. pub async fn run( &mut self, mut input_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -534,40 +340,122 @@ impl Mind { use crate::user::HotkeyAction; loop { - let timeout = self.dmn_interval(); + let timeout = self.shared.lock().unwrap().dmn_interval(); + let turn_active = self.shared.lock().unwrap().turn_active; tokio::select! { biased; Some(msg) = input_rx.recv() => { - match msg { - MindMessage::Hotkey(action) => { - match action { - HotkeyAction::Interrupt => self.interrupt().await, - HotkeyAction::CycleAutonomy => self.cycle_autonomy(), - _ => {} + { + let mut s = self.shared.lock().unwrap(); + match msg { + MindMessage::Hotkey(HotkeyAction::CycleAutonomy) => { + let label = s.cycle_autonomy(); + let _ = self.ui_tx.send(UiMessage::Info( + format!("DMN → {} (Ctrl+P to cycle)", label), + )); } + MindMessage::NewSession => { + s.dmn_sleep(); + let _ = self.ui_tx.send(UiMessage::Info("New session started.".into())); + } + MindMessage::Score => { + if !s.scoring_in_flight { + s.scoring_in_flight = true; + } else { + let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into())); + } + } + MindMessage::DmnQuery => { + let _ = self.ui_tx.send(UiMessage::Info(format!("DMN: {:?} ({}/{})", s.dmn, s.dmn_turns, s.max_dmn_turns))); + } + MindMessage::DmnSleep => { + s.dmn_sleep(); + let _ = self.ui_tx.send(UiMessage::Info("DMN sleeping.".into())); + } + MindMessage::DmnWake => { + s.dmn_wake(); + let _ = self.ui_tx.send(UiMessage::Info("DMN foraging.".into())); + } + MindMessage::DmnPause => { + s.dmn_pause(); + let _ = self.ui_tx.send(UiMessage::Info("DMN paused.".into())); + } + MindMessage::Hotkey(HotkeyAction::Interrupt) => { + s.interrupt(); + } + _ => {} } - MindMessage::NewSession => self.cmd_new().await, - MindMessage::Score => self.cmd_score(), - MindMessage::DmnQuery => self.cmd_dmn_query(), - MindMessage::DmnSleep => self.cmd_dmn_sleep(), - MindMessage::DmnWake => self.cmd_dmn_wake(), - MindMessage::DmnPause => self.cmd_dmn_pause(), } - self.check_pending_input(); + // Handle interrupt — kill processes and abort turn + if matches!(msg, MindMessage::Hotkey(HotkeyAction::Interrupt)) { + let ag = self.agent.lock().await; + let mut tools = ag.active_tools.lock().unwrap(); + for entry in tools.drain(..) { entry.handle.abort(); } + drop(tools); drop(ag); + if let Some(h) = self.turn_handle.take() { h.abort(); } + self.shared.lock().unwrap().turn_active = false; + let _ = self.turn_watch.send(false); + let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into())); + } + // Handle /new — reset agent + if matches!(msg, MindMessage::NewSession) { + let new_log = log::ConversationLog::new( + self.config.session_dir.join("conversation.jsonl"), + ).ok(); + let mut ag = self.agent.lock().await; + let shared_ctx = ag.shared_context.clone(); + let shared_tools = ag.active_tools.clone(); + *ag = Agent::new( + ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), + self.config.system_prompt.clone(), self.config.context_parts.clone(), + self.config.app.clone(), self.config.prompt_file.clone(), + new_log, shared_ctx, shared_tools, + ); + } + // Handle /score — spawn scoring + if matches!(msg, MindMessage::Score) && self.shared.lock().unwrap().scoring_in_flight { + self.start_memory_scoring(); + } + // Check for pending input + let action = self.shared.lock().unwrap().take_pending_input(); + self.execute(action); + self.update_status(); } Some((result, target)) = turn_rx.recv() => { - self.handle_turn_result(result, target).await; - self.check_pending_input(); + self.turn_handle = None; + let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); + let _ = self.turn_watch.send(false); + + if let Err(ref e) = result { + let msg = match target { + StreamTarget::Autonomous => UiMessage::DmnAnnotation(format!("[error: {:#}]", e)), + StreamTarget::Conversation => UiMessage::Info(format!("Error: {:#}", e)), + }; + let _ = self.ui_tx.send(msg); + } + if let Some(name) = model_switch { + crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; + } + + self.check_compaction(); + if !self.config.no_agents { self.start_memory_scoring(); } + self.update_status(); + + let action = self.shared.lock().unwrap().take_pending_input(); + self.execute(action); } - _ = tokio::time::sleep(timeout), if !self.turn_in_progress => { - self.check_pending_input(); - if !self.turn_in_progress { - self.dmn_tick(); + _ = tokio::time::sleep(timeout), if !turn_active => { + let action = self.shared.lock().unwrap().take_pending_input(); + self.execute(action); + if !self.shared.lock().unwrap().turn_active { + let action = self.shared.lock().unwrap().dmn_tick(); + self.execute(action); } + self.update_status(); } } } @@ -664,7 +552,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let (turn_tx, turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); let no_agents = config.no_agents; - let shared_mind = shared_mind_state(); + let shared_mind = shared_mind_state(config.app.dmn.max_turns); let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx); mind.update_status(); if !no_agents {