From 48beb8b66326c8dcfa059f0c9f5f133f454584ce Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sun, 5 Apr 2026 21:13:48 -0400 Subject: [PATCH] Revert to tokio::sync::Mutex, fix lock-across-await bugs, move input ownership to InteractScreen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The std::sync::Mutex detour caught every place a MutexGuard lived across an await point in Agent::turn — the compiler enforced Send safety that tokio::sync::Mutex silently allows. With those fixed, switch back to tokio::sync::Mutex (std::sync blocks tokio worker threads and panics inside the runtime). Input and command dispatch now live in InteractScreen (chat.rs): - Enter pushes directly to SharedMindState.input (no app.submitted hop) - sync_from_agent displays pending input with dimmed color - Slash command table moved from event_loop.rs to chat.rs - cmd_switch_model kept as pub fn for tool-initiated switches Co-Authored-By: Proof of Concept --- src/agent/api/mod.rs | 2 +- src/agent/mod.rs | 204 ++++++++++---------- src/agent/tools/control.rs | 6 +- src/agent/tools/mod.rs | 6 +- src/agent/tools/working_stack.rs | 2 +- src/mind/mod.rs | 26 ++- src/subconscious/digest.rs | 10 +- src/user/chat.rs | 309 ++++++++++++++++++++++++++----- src/user/event_loop.rs | 209 ++------------------- 9 files changed, 404 insertions(+), 370 deletions(-) diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 66a8811..bde1704 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -621,7 +621,7 @@ pub async fn collect_stream( rx: &mut mpsc::UnboundedReceiver, ui_tx: &UiSender, target: StreamTarget, - agent: &std::sync::Arc>, + agent: &std::sync::Arc>, active_tools: &crate::user::ui_channel::SharedActiveTools, ) -> StreamResult { let mut content = String::new(); diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 7f0bdd0..0e56e70 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -232,61 +232,71 @@ impl Agent { /// Takes Arc> and manages locking internally so the /// lock is never held across I/O (API streaming, tool dispatch). pub async fn turn( - agent: Arc>, + agent: Arc>, user_input: &str, ui_tx: &UiSender, target: StreamTarget, ) -> Result { // --- Pre-loop setup (lock 1): agent cycle, memories, user input --- let active_tools = { - let mut me = agent.lock().unwrap(); + let mut finished = Vec::new(); + let tools = { + let mut me = agent.lock().await; - let cycle = me.run_agent_cycle(); - for key in &cycle.surfaced_keys { - if let Some(rendered) = crate::cli::node::render_node( - &crate::store::Store::load().unwrap_or_default(), key, - ) { - let mut msg = Message::user(format!( - "\n--- {} (surfaced) ---\n{}\n", - key, rendered, - )); - msg.stamp(); - me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); + let cycle = me.run_agent_cycle(); + for key in &cycle.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node( + &crate::store::Store::load().unwrap_or_default(), key, + ) { + let mut msg = Message::user(format!( + "\n--- {} (surfaced) ---\n{}\n", + key, rendered, + )); + msg.stamp(); + me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); + } + } + if let Some(ref reflection) = cycle.reflection { + me.push_message(Message::user(format!( + "\n--- subconscious reflection ---\n{}\n", + reflection.trim(), + ))); } - } - if let Some(ref reflection) = cycle.reflection { - me.push_message(Message::user(format!( - "\n--- subconscious reflection ---\n{}\n", - reflection.trim(), - ))); - } - // Collect completed background tool calls - let mut bg_ds = DispatchState::new(); - let finished: Vec<_> = { + // Collect completed background tool handles — remove from active list + // but don't await yet (MutexGuard isn't Send). let mut tools = me.active_tools.lock().unwrap(); - let mut done = Vec::new(); let mut i = 0; while i < tools.len() { if tools[i].handle.is_finished() { - done.push(tools.remove(i)); + finished.push(tools.remove(i)); } else { i += 1; } } - done + + me.active_tools.clone() }; + + // Await finished handles without holding the agent lock + let mut bg_results = Vec::new(); for entry in finished { if let Ok((call, output)) = entry.handle.await { - me.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + bg_results.push((call, output)); } } - me.push_message(Message::user(user_input)); - let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots())); + // Re-acquire to apply results and push user input + { + let mut me = agent.lock().await; + let mut bg_ds = DispatchState::new(); + for (call, output) in bg_results { + me.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + } + me.push_message(Message::user(user_input)); + let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots())); + } - let tools = me.active_tools.clone(); - drop(me); tools }; @@ -299,7 +309,7 @@ impl Agent { // --- Lock 2: assemble messages, start stream --- let (mut rx, _stream_guard) = { - let me = agent.lock().unwrap(); + let me = agent.lock().await; let api_messages = me.assemble_api_messages(); let sampling = api::SamplingParams { temperature: me.temperature, @@ -328,8 +338,8 @@ impl Agent { // --- Stream complete --- // --- Lock 3: process results --- - { - let mut me = agent.lock().unwrap(); + let (msg, pending) = { + let mut me = agent.lock().await; // Handle stream errors with retry logic if let Some(e) = stream_error { @@ -409,81 +419,79 @@ impl Agent { } // Collect non-background tool calls fired during streaming - let pending: Vec<_> = { - let mut tools_guard = active_tools.lock().unwrap(); - let mut non_bg = Vec::new(); - let mut i = 0; - while i < tools_guard.len() { - if !tools_guard[i].background { - non_bg.push(tools_guard.remove(i)); - } else { - i += 1; - } + let mut tools_guard = active_tools.lock().unwrap(); + let mut non_bg = Vec::new(); + let mut i = 0; + while i < tools_guard.len() { + if !tools_guard[i].background { + non_bg.push(tools_guard.remove(i)); + } else { + i += 1; } - non_bg - }; + } + (msg, non_bg) + }; - if !pending.is_empty() { - me.push_message(msg.clone()); - // Drop lock before awaiting tool handles - drop(me); - let mut results = Vec::new(); - for entry in pending { - if let Ok(r) = entry.handle.await { - results.push(r); - } + if !pending.is_empty() { + agent.lock().await.push_message(msg.clone()); + + // Drop lock before awaiting tool handles + let mut results = Vec::new(); + for entry in pending { + if let Ok(r) = entry.handle.await { + results.push(r); } - // Reacquire to apply results - let mut me = agent.lock().unwrap(); - for (call, output) in results { - me.apply_tool_result(&call, output, ui_tx, &mut ds); + } + // Reacquire to apply results + let mut me = agent.lock().await; + for (call, output) in results { + me.apply_tool_result(&call, output, ui_tx, &mut ds); + } + me.publish_context_state(); + continue; + } + + // Tool calls (structured API path) + if let Some(ref tool_calls) = msg.tool_calls { + if !tool_calls.is_empty() { + agent.lock().await.push_message(msg.clone()); + let calls: Vec = tool_calls.clone(); + // Drop lock before tool dispatch + for call in &calls { + Agent::dispatch_tool_call_unlocked( + &agent, &active_tools, call, ui_tx, &mut ds, + ).await; } - me.publish_context_state(); continue; } - - // Tool calls (structured API path) - if let Some(ref tool_calls) = msg.tool_calls { - if !tool_calls.is_empty() { - me.push_message(msg.clone()); - let calls: Vec = tool_calls.clone(); - // Drop lock before tool dispatch - drop(me); - for call in &calls { - Agent::dispatch_tool_call_unlocked( - &agent, &active_tools, call, ui_tx, &mut ds, - ).await; - } - continue; - } - } - - // Genuinely text-only response - let text = msg.content_text().to_string(); - let _ = ui_tx.send(UiMessage::Activity(String::new())); - me.push_message(msg); - - // Drain pending control flags - if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } - if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } - if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; } - - return Ok(TurnResult { - text, - yield_requested: ds.yield_requested, - had_tool_calls: ds.had_tool_calls, - tool_errors: ds.tool_errors, - model_switch: ds.model_switch, - dmn_pause: ds.dmn_pause, - }); } + + // Genuinely text-only response + let text = msg.content_text().to_string(); + let _ = ui_tx.send(UiMessage::Activity(String::new())); + let mut me = agent.lock().await; + me.push_message(msg); + + // Drain pending control flags + if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } + if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } + if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; } + + return Ok(TurnResult { + text, + yield_requested: ds.yield_requested, + had_tool_calls: ds.had_tool_calls, + tool_errors: ds.tool_errors, + model_switch: ds.model_switch, + dmn_pause: ds.dmn_pause, + }); } } /// Dispatch a tool call without holding the agent lock across I/O. /// Used by `turn()` which manages its own locking. async fn dispatch_tool_call_unlocked( - agent: &Arc>, + agent: &Arc>, active_tools: &crate::user::ui_channel::SharedActiveTools, call: &ToolCall, ui_tx: &UiSender, @@ -494,7 +502,7 @@ impl Agent { Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); let _ = ui_tx.send(UiMessage::Activity(format!("rejected: {} (bad args)", call.function.name))); - let mut me = agent.lock().unwrap(); + let mut me = agent.lock().await; me.apply_tool_result(call, err, ui_tx, ds); return; } @@ -532,7 +540,7 @@ impl Agent { }; if let Ok((call, output)) = entry.handle.await { // Brief lock to apply result - let mut me = agent.lock().unwrap(); + let mut me = agent.lock().await; me.apply_tool_result(&call, output, ui_tx, ds); } } diff --git a/src/agent/tools/control.rs b/src/agent/tools/control.rs index 843e981..daa5ef5 100644 --- a/src/agent/tools/control.rs +++ b/src/agent/tools/control.rs @@ -14,7 +14,7 @@ pub(super) fn tools() -> [super::Tool; 3] { .ok_or_else(|| anyhow::anyhow!("'model' parameter is required"))?; if model.is_empty() { anyhow::bail!("'model' parameter cannot be empty"); } if let Some(agent) = agent { - let mut a = agent.lock().unwrap(); + let mut a = agent.lock().await; a.pending_model_switch = Some(model.to_string()); } Ok(format!("Switching to model '{}' after this turn.", model)) @@ -24,7 +24,7 @@ pub(super) fn tools() -> [super::Tool; 3] { parameters_json: r#"{"type":"object","properties":{}}"#, handler: |agent, _v| Box::pin(async move { if let Some(agent) = agent { - let mut a = agent.lock().unwrap(); + let mut a = agent.lock().await; a.pending_yield = true; a.pending_dmn_pause = true; } @@ -36,7 +36,7 @@ pub(super) fn tools() -> [super::Tool; 3] { handler: |agent, v| Box::pin(async move { let msg = v.get("message").and_then(|v| v.as_str()).unwrap_or("Waiting for input."); if let Some(agent) = agent { - let mut a = agent.lock().unwrap(); + let mut a = agent.lock().await; a.pending_yield = true; } Ok(format!("Yielding. {}", msg)) diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 8e42353..95ef89f 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -29,7 +29,7 @@ fn default_timeout() -> u64 { 120 } /// Async tool handler function. /// Agent is None when called from contexts without an agent (MCP server, subconscious). pub type ToolHandler = fn( - Option>>, + Option>>, serde_json::Value, ) -> Pin> + Send>>; @@ -94,11 +94,11 @@ pub async fn dispatch( pub async fn dispatch_with_agent( name: &str, args: &serde_json::Value, - agent: Option>>, + agent: Option>>, ) -> String { // Look up in agent's tools if available, otherwise global let tool = if let Some(ref a) = agent { - let guard = a.lock().unwrap(); + let guard = a.lock().await; guard.tools.iter().find(|t| t.name == name).copied() } else { None diff --git a/src/agent/tools/working_stack.rs b/src/agent/tools/working_stack.rs index 6cc3970..696e170 100644 --- a/src/agent/tools/working_stack.rs +++ b/src/agent/tools/working_stack.rs @@ -20,7 +20,7 @@ pub fn tool() -> super::Tool { parameters_json: r#"{"type":"object","properties":{"action":{"type":"string","enum":["push","pop","update","switch"],"description":"Stack operation"},"content":{"type":"string","description":"Task description (for push/update)"},"index":{"type":"integer","description":"Stack index (for switch, 0=bottom)"}},"required":["action"]}"#, handler: |agent, v| Box::pin(async move { if let Some(agent) = agent { - let mut a = agent.lock().unwrap(); + let mut a = agent.lock().await; Ok(handle(&v, &mut a.context.working_stack)) } else { anyhow::bail!("working_stack requires agent context") diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 881e989..8543b68 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -21,8 +21,6 @@ use anyhow::Result; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc; -use std::sync::Mutex; - use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; use crate::config::{AppConfig, SessionConfig}; @@ -191,8 +189,8 @@ enum BgEvent { pub type SharedMindState = std::sync::Mutex; pub struct Mind { - pub agent: Arc>, - pub shared: SharedMindState, + pub agent: Arc>, + pub shared: Arc, pub config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, @@ -216,7 +214,7 @@ impl Mind { config.session_dir.join("conversation.jsonl"), ).ok(); - let agent = Arc::new(Mutex::new(Agent::new( + let agent = Arc::new(tokio::sync::Mutex::new(Agent::new( client, config.system_prompt.clone(), config.context_parts.clone(), @@ -227,7 +225,7 @@ impl Mind { shared_active_tools, ))); - let shared = std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)); + let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); let (turn_watch, _) = tokio::sync::watch::channel(false); let (bg_tx, bg_rx) = mpsc::unbounded_channel(); @@ -242,7 +240,7 @@ impl Mind { /// Initialize — restore log, start daemons and background agents. pub async fn init(&self) { // Restore conversation - let mut ag = self.agent.lock().unwrap(); + let mut ag = self.agent.lock().await; ag.restore_from_log(); drop(ag); } @@ -258,7 +256,7 @@ impl Mind { MindCommand::None => {} MindCommand::Compact => { let threshold = compaction_threshold(&self.config.app); - let mut ag = self.agent.lock().unwrap(); + let mut ag = self.agent.lock().await; if ag.last_prompt_tokens() > threshold { ag.compact(); } @@ -273,7 +271,7 @@ impl Mind { } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); - let ag = self.agent.lock().unwrap(); + let ag = self.agent.lock().await; let mut tools = ag.active_tools.lock().unwrap(); for entry in tools.drain(..) { entry.handle.abort(); } drop(tools); drop(ag); @@ -290,7 +288,7 @@ impl Mind { let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), ).ok(); - let mut ag = self.agent.lock().unwrap(); + let mut ag = self.agent.lock().await; let shared_ctx = ag.shared_context.clone(); let shared_tools = ag.active_tools.clone(); *ag = Agent::new( @@ -324,7 +322,7 @@ impl Mind { let response_window = cfg.scoring_response_window; tokio::spawn(async move { let (context, client) = { - let mut ag = agent.lock().unwrap(); + let mut ag = agent.lock().await; if ag.agent_cycles.memory_scoring_in_flight { return; } ag.agent_cycles.memory_scoring_in_flight = true; (ag.context.clone(), ag.client_clone()) @@ -333,7 +331,7 @@ impl Mind { &context, max_age as i64, response_window, &client, &ui_tx, ).await; { - let mut ag = agent.lock().unwrap(); + let mut ag = agent.lock().await; ag.agent_cycles.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } } @@ -383,12 +381,12 @@ impl Mind { let _ = self.turn_watch.send(false); if let Some(name) = model_switch { - crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; + crate::user::chat::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; } // Post-turn maintenance { - let mut ag = self.agent.lock().unwrap(); + let mut ag = self.agent.lock().await; ag.age_out_images(); ag.publish_context_state(); } diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index 0f9c43a..db1553b 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -602,7 +602,7 @@ fn str_err(r: Result) -> anyhow::Result { /// digest_daily tool handler: generate a daily digest async fn handle_digest_daily( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let date = str_err(get_str_required(&args, "date"))?; @@ -613,7 +613,7 @@ async fn handle_digest_daily( /// digest_weekly tool handler: generate a weekly digest async fn handle_digest_weekly( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let week_label = str_err(get_str_required(&args, "week"))?; @@ -624,7 +624,7 @@ async fn handle_digest_weekly( /// digest_monthly tool handler: generate a monthly digest async fn handle_digest_monthly( - _agent: Option>>, + _agent: Option>>, args: serde_json::Value, ) -> anyhow::Result { let month = str_err(get_str_required(&args, "month"))?; @@ -635,7 +635,7 @@ async fn handle_digest_monthly( /// digest_auto tool handler: auto-generate all missing digests async fn handle_digest_auto( - _agent: Option>>, + _agent: Option>>, _args: serde_json::Value, ) -> anyhow::Result { let mut store = str_err(Store::load())?; @@ -645,7 +645,7 @@ async fn handle_digest_auto( /// digest_links tool handler: parse and apply digest links async fn handle_digest_links( - _agent: Option>>, + _agent: Option>>, _args: serde_json::Value, ) -> anyhow::Result { let mut store = str_err(Store::load())?; diff --git a/src/user/chat.rs b/src/user/chat.rs index 5eabb7c..83424c2 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -17,6 +17,166 @@ use super::{ screen_legend, }; use crate::user::ui_channel::{UiMessage, StreamTarget}; +use crate::mind::MindCommand; + +// --- Slash command table --- + +type CmdHandler = fn(&InteractScreen, &str); + +struct SlashCommand { + name: &'static str, + help: &'static str, + handler: CmdHandler, +} + +fn commands() -> Vec { vec![ + SlashCommand { name: "/quit", help: "Exit consciousness", + handler: |_, _| {} }, + SlashCommand { name: "/new", help: "Start fresh session (saves current)", + handler: |s, _| { let _ = s.mind_tx.send(MindCommand::NewSession); } }, + SlashCommand { name: "/save", help: "Save session to disk", + handler: |s, _| { let _ = s.ui_tx.send(UiMessage::Info("Conversation is saved automatically.".into())); } }, + SlashCommand { name: "/retry", help: "Re-run last turn", + handler: |s, _| { + let agent = s.agent.clone(); + let mind_tx = s.mind_tx.clone(); + let ui_tx = s.ui_tx.clone(); + tokio::spawn(async move { + let mut ag = agent.lock().await; + let entries = ag.entries_mut(); + let mut last_user_text = None; + while let Some(entry) = entries.last() { + if entry.message().role == crate::agent::api::types::Role::User { + last_user_text = Some(entries.pop().unwrap().message().content_text().to_string()); + break; + } + entries.pop(); + } + drop(ag); + match last_user_text { + Some(text) => { + let preview = &text[..text.len().min(60)]; + let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", preview))); + let _ = mind_tx.send(MindCommand::Turn(text, StreamTarget::Conversation)); + } + None => { + let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); + } + } + }); + } }, + SlashCommand { name: "/model", help: "Show/switch model (/model )", + handler: |s, arg| { + if arg.is_empty() { + if let Ok(ag) = s.agent.try_lock() { + let _ = s.ui_tx.send(UiMessage::Info(format!("Current model: {}", ag.model()))); + let names = ag.app_config.model_names(); + if !names.is_empty() { + let _ = s.ui_tx.send(UiMessage::Info(format!("Available: {}", names.join(", ")))); + } + } else { + let _ = s.ui_tx.send(UiMessage::Info("(busy)".into())); + } + } else { + let agent = s.agent.clone(); + let ui_tx = s.ui_tx.clone(); + let name = arg.to_string(); + tokio::spawn(async move { + cmd_switch_model(&agent, &name, &ui_tx).await; + }); + } + } }, + SlashCommand { name: "/score", help: "Score memory importance", + handler: |s, _| { let _ = s.mind_tx.send(MindCommand::Score); } }, + SlashCommand { name: "/dmn", help: "Show DMN state", + handler: |s, _| { + let st = s.shared_mind.lock().unwrap(); + let _ = s.ui_tx.send(UiMessage::Info(format!( + "DMN: {:?} ({}/{})", st.dmn, st.dmn_turns, st.max_dmn_turns, + ))); + } }, + SlashCommand { name: "/sleep", help: "Put DMN to sleep", + handler: |s, _| { + let mut st = s.shared_mind.lock().unwrap(); + st.dmn = crate::mind::dmn::State::Resting { since: std::time::Instant::now() }; + st.dmn_turns = 0; + let _ = s.ui_tx.send(UiMessage::Info("DMN sleeping.".into())); + } }, + SlashCommand { name: "/wake", help: "Wake DMN to foraging", + handler: |s, _| { + let mut st = s.shared_mind.lock().unwrap(); + if matches!(st.dmn, crate::mind::dmn::State::Off) { crate::mind::dmn::set_off(false); } + st.dmn = crate::mind::dmn::State::Foraging; + st.dmn_turns = 0; + let _ = s.ui_tx.send(UiMessage::Info("DMN foraging.".into())); + } }, + SlashCommand { name: "/pause", help: "Full stop — no autonomous ticks (Ctrl+P)", + handler: |s, _| { + let mut st = s.shared_mind.lock().unwrap(); + st.dmn = crate::mind::dmn::State::Paused; + st.dmn_turns = 0; + let _ = s.ui_tx.send(UiMessage::Info("DMN paused.".into())); + } }, + SlashCommand { name: "/help", help: "Show this help", + handler: |s, _| { send_help(&s.ui_tx); } }, +]} + +fn dispatch_command(input: &str) -> Option { + let cmd_name = input.split_whitespace().next()?; + commands().into_iter().find(|c| c.name == cmd_name) +} + +/// Switch model — used by both /model command and tool-initiated switches. +pub async fn cmd_switch_model( + agent: &std::sync::Arc>, + name: &str, + ui_tx: &crate::user::ui_channel::UiSender, +) { + let resolved = { + let ag = agent.lock().await; + match ag.app_config.resolve_model(name) { + Ok(r) => r, + Err(e) => { + let _ = ui_tx.send(UiMessage::Info(format!("{}", e))); + return; + } + } + }; + let new_client = crate::agent::api::ApiClient::new( + &resolved.api_base, &resolved.api_key, &resolved.model_id, + ); + let prompt_changed = { + let ag = agent.lock().await; + resolved.prompt_file != ag.prompt_file + }; + let mut ag = agent.lock().await; + ag.swap_client(new_client); + if prompt_changed { + ag.prompt_file = resolved.prompt_file.clone(); + ag.compact(); + let _ = ui_tx.send(UiMessage::Info(format!( + "Switched to {} ({}) — prompt: {}, recompacted", + name, resolved.model_id, resolved.prompt_file, + ))); + } else { + let _ = ui_tx.send(UiMessage::Info(format!( + "Switched to {} ({})", name, resolved.model_id, + ))); + } +} + +pub(crate) fn send_help(ui_tx: &crate::user::ui_channel::UiSender) { + for cmd in &commands() { + let _ = ui_tx.send(UiMessage::Info(format!(" {:12} {}", cmd.name, cmd.help))); + } + let _ = ui_tx.send(UiMessage::Info(String::new())); + let _ = ui_tx.send(UiMessage::Info( + "Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(), + )); + let _ = ui_tx.send(UiMessage::Info( + " Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill".into(), + )); +} /// Turn marker for the conversation pane gutter. #[derive(Clone, Copy, PartialEq, Default)] @@ -245,12 +405,21 @@ pub(crate) struct InteractScreen { // State sync with agent — double buffer last_generation: u64, last_entries: Vec, + pending_display_count: usize, /// Reference to agent for state sync - agent: std::sync::Arc>, + agent: std::sync::Arc>, + shared_mind: std::sync::Arc, + mind_tx: tokio::sync::mpsc::UnboundedSender, + ui_tx: crate::user::ui_channel::UiSender, } impl InteractScreen { - pub fn new(agent: std::sync::Arc>) -> Self { + pub fn new( + agent: std::sync::Arc>, + shared_mind: std::sync::Arc, + mind_tx: tokio::sync::mpsc::UnboundedSender, + ui_tx: crate::user::ui_channel::UiSender, + ) -> Self { Self { autonomous: PaneState::new(true), conversation: PaneState::new(true), @@ -266,7 +435,11 @@ impl InteractScreen { call_timeout_secs: 60, last_generation: 0, last_entries: Vec::new(), + pending_display_count: 0, agent, + shared_mind, + mind_tx, + ui_tx, } } @@ -317,59 +490,99 @@ impl InteractScreen { } } - /// Sync conversation display from agent entries. + /// Sync conversation display from agent entries + pending input. fn sync_from_agent(&mut self) { - let agent = self.agent.lock().unwrap(); - let generation = agent.generation; - let entries = agent.entries(); - - // Phase 1: detect desync and pop - if generation != self.last_generation { - self.conversation = PaneState::new(true); - self.autonomous = PaneState::new(true); - self.tools = PaneState::new(false); - self.last_entries.clear(); - } else { - // Pop entries from the tail that don't match - while !self.last_entries.is_empty() { - let i = self.last_entries.len() - 1; - if entries.get(i) == Some(&self.last_entries[i]) { - break; - } - let popped = self.last_entries.pop().unwrap(); - for (target, _, _) in Self::route_entry(&popped) { - match target { - PaneTarget::Conversation | PaneTarget::ConversationAssistant - => self.conversation.pop_line(), - PaneTarget::Tools | PaneTarget::ToolResult - => self.tools.pop_line(), - } - } - } + // Pop previously-displayed pending input + for _ in 0..self.pending_display_count { + self.conversation.pop_line(); } + self.pending_display_count = 0; - // Phase 2: push new entries - let start = self.last_entries.len(); - for entry in entries.iter().skip(start) { - for (target, text, marker) in Self::route_entry(entry) { - match target { - PaneTarget::Conversation => - self.conversation.push_line_with_marker(text, Color::Cyan, marker), - PaneTarget::ConversationAssistant => - self.conversation.push_line_with_marker(text, Color::Reset, marker), - PaneTarget::Tools => - self.tools.push_line(text, Color::Yellow), - PaneTarget::ToolResult => { - for line in text.lines().take(20) { - self.tools.push_line(format!(" {}", line), Color::DarkGray); + // Sync agent entries + if let Ok(agent) = self.agent.try_lock() { + let generation = agent.generation; + let entries = agent.entries(); + + // Phase 1: detect desync and pop + if generation != self.last_generation { + self.conversation = PaneState::new(true); + self.autonomous = PaneState::new(true); + self.tools = PaneState::new(false); + self.last_entries.clear(); + } else { + // Pop entries from the tail that don't match + while !self.last_entries.is_empty() { + let i = self.last_entries.len() - 1; + if entries.get(i) == Some(&self.last_entries[i]) { + break; + } + let popped = self.last_entries.pop().unwrap(); + for (target, _, _) in Self::route_entry(&popped) { + match target { + PaneTarget::Conversation | PaneTarget::ConversationAssistant + => self.conversation.pop_line(), + PaneTarget::Tools | PaneTarget::ToolResult + => self.tools.pop_line(), } } } } - self.last_entries.push(entry.clone()); + + // Phase 2: push new entries + let start = self.last_entries.len(); + for entry in entries.iter().skip(start) { + for (target, text, marker) in Self::route_entry(entry) { + match target { + PaneTarget::Conversation => + self.conversation.push_line_with_marker(text, Color::Cyan, marker), + PaneTarget::ConversationAssistant => + self.conversation.push_line_with_marker(text, Color::Reset, marker), + PaneTarget::Tools => + self.tools.push_line(text, Color::Yellow), + PaneTarget::ToolResult => { + for line in text.lines().take(20) { + self.tools.push_line(format!(" {}", line), Color::DarkGray); + } + } + } + } + self.last_entries.push(entry.clone()); + } + + self.last_generation = generation; } - self.last_generation = generation; + // Display pending input (queued in Mind, not yet accepted) + let mind = self.shared_mind.lock().unwrap(); + for input in &mind.input { + self.conversation.push_line_with_marker( + input.clone(), Color::DarkGray, Marker::User, + ); + self.pending_display_count += 1; + } + } + + /// Dispatch user input — slash commands or conversation. + fn dispatch_input(&self, input: &str, app: &mut App) { + let input = input.trim(); + if input.is_empty() { return; } + + if input == "/quit" || input == "/exit" { + app.should_quit = true; + return; + } + + if input.starts_with('/') { + if let Some(cmd) = dispatch_command(input) { + (cmd.handler)(self, &input[cmd.name.len()..].trim_start()); + } else { + let _ = self.ui_tx.send(UiMessage::Info(format!("Unknown command: {}", input.split_whitespace().next().unwrap_or(input)))); + } + return; + } + + // Regular input → queue to Mind + self.shared_mind.lock().unwrap().input.push(input.to_string()); } /// Process a UiMessage — update pane state. @@ -680,7 +893,8 @@ impl ScreenView for InteractScreen { self.input_history.push(input.clone()); } self.history_index = None; - // TODO: push to submitted via app or return action + self.dispatch_input(&input, app); + self.textarea = new_textarea(vec![String::new()]); } } KeyCode::Up if key.modifiers.contains(KeyModifiers::CONTROL) => self.scroll_active_up(3), @@ -727,6 +941,7 @@ impl ScreenView for InteractScreen { self.draw_main(frame, area, app); None } + } /// Draw the conversation pane with a two-column layout: marker gutter + text. diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index afba332..21ffe0c 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -9,7 +9,7 @@ use ratatui::crossterm::event::{Event, EventStream, KeyEventKind}; use futures::StreamExt; use std::sync::Arc; use std::time::Duration; -use std::sync::Mutex; +use tokio::sync::Mutex; use crate::agent::Agent; use crate::agent::api::ApiClient; @@ -19,89 +19,6 @@ use crate::user::ui_channel::{self, UiMessage}; // ── Slash commands ───────────────────────────────────────────── -type CmdHandler = for<'a> fn( - &'a crate::mind::Mind, - &'a tokio::sync::mpsc::UnboundedSender, - &'a ui_channel::UiSender, - &'a str, -); - -struct SlashCommand { - name: &'static str, - help: &'static str, - handler: CmdHandler, -} - -fn commands() -> Vec { vec![ - SlashCommand { name: "/quit", help: "Exit consciousness", - handler: |_, _, _, _| {} }, - SlashCommand { name: "/new", help: "Start fresh session (saves current)", - handler: |_, tx, _, _| { let _ = tx.send(MindCommand::NewSession); } }, - SlashCommand { name: "/save", help: "Save session to disk", - handler: |_, _, ui, _| { let _ = ui.send(UiMessage::Info("Conversation is saved automatically.".into())); } }, - SlashCommand { name: "/retry", help: "Re-run last turn", - handler: |m, tx, ui, _| { - let agent = m.agent.clone(); - let mind_tx = tx.clone(); - let ui_tx = ui.clone(); - let mut tw = m.turn_watch(); - tokio::spawn(async move { - let _ = tw.wait_for(|&active| !active).await; - cmd_retry_inner(&agent, &mind_tx, &ui_tx).await; - }); - } }, - SlashCommand { name: "/model", help: "Show/switch model (/model )", - handler: |m, _, ui, arg| { - if arg.is_empty() { - if let Ok(ag) = m.agent.try_lock() { - let _ = ui.send(UiMessage::Info(format!("Current model: {}", ag.model()))); - let names = ag.app_config.model_names(); - if !names.is_empty() { - let _ = ui.send(UiMessage::Info(format!("Available: {}", names.join(", ")))); - } - } else { - let _ = ui.send(UiMessage::Info("(busy)".into())); - } - } else { - let agent = m.agent.clone(); - let ui_tx = ui.clone(); - let name = arg.to_string(); - tokio::spawn(async move { cmd_switch_model(&agent, &name, &ui_tx).await; }); - } - } }, - SlashCommand { name: "/score", help: "Score memory importance", - handler: |_, tx, _, _| { let _ = tx.send(MindCommand::Score); } }, - SlashCommand { name: "/dmn", help: "Show DMN state", - handler: |m, _, ui, _| { - let s = m.shared.lock().unwrap(); - let _ = ui.send(UiMessage::Info(format!("DMN: {:?} ({}/{})", s.dmn, s.dmn_turns, s.max_dmn_turns))); - } }, - SlashCommand { name: "/sleep", help: "Put DMN to sleep", - handler: |m, _, ui, _| { - let mut s = m.shared.lock().unwrap(); - s.dmn = crate::mind::dmn::State::Resting { since: std::time::Instant::now() }; - s.dmn_turns = 0; - let _ = ui.send(UiMessage::Info("DMN sleeping.".into())); - } }, - SlashCommand { name: "/wake", help: "Wake DMN to foraging", - handler: |m, _, ui, _| { - let mut s = m.shared.lock().unwrap(); - if matches!(s.dmn, crate::mind::dmn::State::Off) { crate::mind::dmn::set_off(false); } - s.dmn = crate::mind::dmn::State::Foraging; - s.dmn_turns = 0; - let _ = ui.send(UiMessage::Info("DMN foraging.".into())); - } }, - SlashCommand { name: "/pause", help: "Full stop — no autonomous ticks (Ctrl+P)", - handler: |m, _, ui, _| { - let mut s = m.shared.lock().unwrap(); - s.dmn = crate::mind::dmn::State::Paused; - s.dmn_turns = 0; - let _ = ui.send(UiMessage::Info("DMN paused.".into())); - } }, - SlashCommand { name: "/help", help: "Show this help", - handler: |_, _, ui, _| { send_help(ui); } }, -]} - /// Top-level entry point — creates Mind and UI, wires them together. pub async fn start(cli: crate::user::CliArgs) -> Result<()> { let (config, _figment) = crate::config::load_session(&cli)?; @@ -116,8 +33,8 @@ pub async fn start(cli: crate::user::CliArgs) -> Result<()> { let mind = crate::mind::Mind::new(config, ui_tx.clone(), turn_tx); - let shared_context = mind.agent.lock().unwrap().shared_context.clone(); - let shared_active_tools = mind.agent.lock().unwrap().active_tools.clone(); + let shared_context = mind.agent.lock().await.shared_context.clone(); + let shared_active_tools = mind.agent.lock().await.active_tools.clone(); let mut result = Ok(()); tokio_scoped::scope(|s| { @@ -138,55 +55,7 @@ pub async fn start(cli: crate::user::CliArgs) -> Result<()> { result } -fn dispatch_command(input: &str) -> Option { - let cmd_name = input.split_whitespace().next()?; - commands().into_iter().find(|c| c.name == cmd_name) -} -fn send_help(ui_tx: &ui_channel::UiSender) { - for cmd in &commands() { - let _ = ui_tx.send(UiMessage::Info(format!(" {:12} {}", cmd.name, cmd.help))); - } - let _ = ui_tx.send(UiMessage::Info(String::new())); - let _ = ui_tx.send(UiMessage::Info( - "Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(), - )); - let _ = ui_tx.send(UiMessage::Info( - " Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill F10=context F2=agents".into(), - )); - let _ = ui_tx.send(UiMessage::Info( - " Shift+click for native text selection (copy/paste)".into(), - )); -} - -async fn cmd_retry_inner( - agent: &Arc>, - mind_tx: &tokio::sync::mpsc::UnboundedSender, - ui_tx: &ui_channel::UiSender, -) { - let mut agent_guard = agent.lock().unwrap(); - let entries = agent_guard.entries_mut(); - let mut last_user_text = None; - while let Some(entry) = entries.last() { - if entry.message().role == crate::agent::api::types::Role::User { - last_user_text = Some(entries.pop().unwrap().message().content_text().to_string()); - break; - } - entries.pop(); - } - drop(agent_guard); - match last_user_text { - Some(text) => { - let preview_len = text.len().min(60); - let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len]))); - // Send as a Turn command — Mind will process it - let _ = mind_tx.send(MindCommand::Turn(text, crate::user::ui_channel::StreamTarget::Conversation)); - } - None => { - let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); - } - } -} fn hotkey_cycle_reasoning(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { if let Ok(mut ag) = mind.agent.try_lock() { @@ -211,7 +80,7 @@ fn hotkey_cycle_reasoning(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender } async fn hotkey_kill_processes(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { - let active_tools = mind.agent.lock().unwrap().active_tools.clone(); + let active_tools = mind.agent.lock().await.active_tools.clone(); let mut tools = active_tools.lock().unwrap(); if tools.is_empty() { let _ = ui_tx.send(UiMessage::Info("(no running tool calls)".into())); @@ -278,44 +147,6 @@ pub fn send_context_info(config: &crate::config::SessionConfig, ui_tx: &ui_chann ))); } -pub async fn cmd_switch_model( - agent: &Arc>, - name: &str, - ui_tx: &ui_channel::UiSender, -) { - let resolved = { - let ag = agent.lock().unwrap(); - match ag.app_config.resolve_model(name) { - Ok(r) => r, - Err(e) => { - let _ = ui_tx.send(UiMessage::Info(format!("{}", e))); - return; - } - } - }; - - let new_client = ApiClient::new(&resolved.api_base, &resolved.api_key, &resolved.model_id); - let prompt_changed = { - let ag = agent.lock().unwrap(); - resolved.prompt_file != ag.prompt_file - }; - - let mut ag = agent.lock().unwrap(); - ag.swap_client(new_client); - - if prompt_changed { - ag.prompt_file = resolved.prompt_file.clone(); - ag.compact(); - let _ = ui_tx.send(UiMessage::Info(format!( - "Switched to {} ({}) — prompt: {}, recompacted", - name, resolved.model_id, resolved.prompt_file, - ))); - } else { - let _ = ui_tx.send(UiMessage::Info(format!( - "Switched to {} ({})", name, resolved.model_id, - ))); - } -} fn diff_mind_state( cur: &crate::mind::MindState, @@ -336,8 +167,6 @@ fn diff_mind_state( } // Input consumed — Mind started a turn with it if !prev.input.is_empty() && cur.input.is_empty() { - let text = prev.input.join("\n"); - let _ = ui_tx.send(UiMessage::UserInput(text)); *dirty = true; } if cur.turn_active != prev.turn_active { @@ -382,7 +211,9 @@ pub async fn run( let notify_rx = crate::thalamus::channels::subscribe_all(); // InteractScreen held separately for UiMessage routing - let mut interact = crate::user::chat::InteractScreen::new(mind.agent.clone()); + let mut interact = crate::user::chat::InteractScreen::new( + mind.agent.clone(), mind.shared.clone(), mind_tx.clone(), ui_tx.clone(), + ); // Overlay screens: F2=conscious, F3=subconscious, F4=unconscious, F5=thalamus let mut screens: Vec> = vec![ Box::new(crate::user::context::ConsciousScreen::new()), @@ -443,11 +274,10 @@ pub async fn run( continue; } - // Global keys (Ctrl combos) - app.handle_global_key(key); - - // Store pending key for active overlay screen - pending_key = Some(key); + // Global keys (Ctrl combos) — only pass to screen if not consumed + if !app.handle_global_key(key) { + pending_key = Some(key); + } dirty = true; } Some(Ok(Event::Mouse(_mouse))) => { @@ -509,23 +339,6 @@ pub async fn run( } } - // Process submitted input - let submitted: Vec = app.submitted.drain(..).collect(); - for input in submitted { - let input = input.trim().to_string(); - if input.is_empty() { continue; } - if input == "/quit" || input == "/exit" { - app.should_quit = true; - } else if let Some(cmd) = dispatch_command(&input) { - (cmd.handler)(mind, &mind_tx, &ui_tx, &input[cmd.name.len()..].trim_start()); - } else { - let mut s = shared_mind.lock().unwrap(); - diff_mind_state(&s, &prev_mind, &ui_tx, &mut dirty); - s.input.push(input); - prev_mind = s.clone(); - } - } - // Handle hotkey actions let actions: Vec = app.hotkey_actions.drain(..).collect(); for action in actions {