diff --git a/src/mind/mod.rs b/src/mind/mod.rs index d3c219b..a023d2a 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -18,8 +18,6 @@ pub mod log; // keyboard events > turn results > render ticks > DMN timer > UI messages. use anyhow::Result; -use crossterm::event::{Event, EventStream, KeyEventKind}; -use futures::StreamExt; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, Mutex}; @@ -28,7 +26,7 @@ use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; use crate::agent::api::types as api_types; use crate::config::{self, AppConfig, SessionConfig}; -use crate::user::{self as tui, HotkeyAction}; +use crate::user::{self as tui}; use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage}; use crate::subconscious::learn; @@ -37,9 +35,8 @@ fn compaction_threshold(app: &AppConfig) -> u32 { (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 } -/// Commands that are handled in the main loop, not sent to the agent. -enum Command { - Quit, +/// Result of slash command handling. +pub enum Command { Handled, None, } @@ -348,7 +345,6 @@ impl Mind { ]; match input { - "/quit" | "/exit" => Command::Quit, "/save" => { let _ = self.ui_tx.send(UiMessage::Info( "Conversation is saved automatically (append-only log).".into() @@ -551,7 +547,7 @@ impl Mind { } /// Cycle reasoning effort: none → low → high → none. - fn cycle_reasoning(&mut self, app: &mut tui::App) { + fn cycle_reasoning(&mut self) { if let Ok(mut agent_guard) = self.agent.try_lock() { let next = match agent_guard.reasoning_effort.as_str() { "none" => "low", @@ -559,7 +555,6 @@ impl Mind { _ => "none", }; agent_guard.reasoning_effort = next.to_string(); - app.reasoning_effort = next.to_string(); let label = match next { "none" => "off (monologue hidden)", "low" => "low (brief monologue)", @@ -693,14 +688,69 @@ impl Mind { })); } - async fn shutdown(&mut self) { + pub async fn shutdown(&mut self) { if let Some(handle) = self.turn_handle.take() { handle.abort(); } } + + /// Mind event loop — reacts to user input, turn results, DMN ticks. + pub async fn run( + &mut self, + mut input_rx: tokio::sync::mpsc::UnboundedReceiver, + mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, + ) { + use crate::user::event_loop::MindMessage; + use crate::user::HotkeyAction; + + loop { + let timeout = self.dmn_interval(); + + tokio::select! { + biased; + + Some(msg) = input_rx.recv() => { + match msg { + MindMessage::UserInput(input) => { + match self.handle_command(&input).await { + Command::Handled => {} + Command::None => self.submit_input(input), + } + } + MindMessage::Hotkey(action) => { + match action { + HotkeyAction::CycleReasoning => self.cycle_reasoning(), + HotkeyAction::KillProcess => self.kill_processes().await, + HotkeyAction::Interrupt => self.interrupt().await, + HotkeyAction::CycleAutonomy => self.cycle_autonomy(), + HotkeyAction::AdjustSampling(param, delta) => { + if let Ok(mut agent) = self.agent.try_lock() { + match param { + 0 => agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0), + 1 => agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0), + 2 => agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32, + _ => {} + } + } + } + } + } + } + } + + Some((result, target)) = turn_rx.recv() => { + self.handle_turn_result(result, target).await; + } + + _ = tokio::time::sleep(timeout), if !self.turn_in_progress => { + self.dmn_tick(); + } + } + } + } } -// --- Event loop --- +// --- Startup --- pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let (config, _figment) = config::load_session(&cli)?; @@ -718,8 +768,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let mut idle_state = crate::thalamus::idle::State::new(); idle_state.load(); - // Channel status fetcher - let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::>(4); + // Channel status + let (channel_tx, channel_rx) = tokio::sync::mpsc::channel::>(4); { let tx = channel_tx.clone(); tokio::spawn(async move { @@ -727,21 +777,15 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let _ = tx.send(result).await; }); } - let notify_rx = crate::thalamus::channels::subscribe_all(); - let mut pending_notifications: Vec = Vec::new(); // Create UI channel - let (ui_tx, mut ui_rx) = ui_channel::channel(); + let (ui_tx, ui_rx) = ui_channel::channel(); // Shared state let shared_context = ui_channel::shared_context_state(); let shared_active_tools = ui_channel::shared_active_tools(); - // Initialize TUI - let mut terminal = tui::init_terminal()?; - let mut app = tui::App::new(config.model.clone(), shared_context.clone(), shared_active_tools.clone()); - // Startup info let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into())); let _ = ui_tx.send(UiMessage::Info(format!( @@ -767,8 +811,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { config.app.clone(), config.prompt_file.clone(), Some(conversation_log), - shared_context, - shared_active_tools, + shared_context.clone(), + shared_active_tools.clone(), ))); // Restore conversation from log @@ -793,165 +837,37 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { })); } - let (turn_tx, mut turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); + let (turn_tx, turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); let no_agents = config.no_agents; - let mut session = Mind::new(agent, config, ui_tx.clone(), turn_tx); - session.update_status(); + let mut mind = Mind::new(agent, config, ui_tx.clone(), turn_tx); + mind.update_status(); if !no_agents { - session.start_memory_scoring(); // also sends initial agent snapshots + mind.start_memory_scoring(); } - session.send_context_info(); + mind.send_context_info(); // Start observation socket - let socket_path = session.config.session_dir.join("agent.sock"); - let (observe_input_tx, mut observe_input_rx) = log::input_channel(); + let socket_path = mind.config.session_dir.join("agent.sock"); + let (observe_input_tx, observe_input_rx) = log::input_channel(); if !no_agents { log::start(socket_path, ui_tx.subscribe(), observe_input_tx); } - let mut reader = EventStream::new(); + // Mind ↔ UI channel + let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut render_interval = tokio::time::interval(Duration::from_millis(50)); - render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut dirty = true; + // App for TUI + let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools); - terminal.hide_cursor()?; + // Spawn Mind event loop + tokio::spawn(async move { + mind.run(mind_rx, turn_rx).await; + }); - // Initial render - app.drain_messages(&mut ui_rx); - terminal.draw(|f| app.draw(f))?; - - loop { - let timeout = session.dmn_interval(); - - tokio::select! { - biased; - - maybe_event = reader.next() => { - match maybe_event { - Some(Ok(Event::Key(key))) => { - if key.kind != KeyEventKind::Press { continue; } - app.handle_key(key); - idle_state.user_activity(); - if app.screen == tui::Screen::Thalamus { - let tx = channel_tx.clone(); - tokio::spawn(async move { - let result = crate::thalamus::channels::fetch_all_channels().await; - let _ = tx.send(result).await; - }); - } - dirty = true; - } - Some(Ok(Event::Mouse(mouse))) => { - app.handle_mouse(mouse); - dirty = true; - } - Some(Ok(Event::Resize(w, h))) => { - app.handle_resize(w, h); - terminal.clear()?; - dirty = true; - } - Some(Err(_)) => break, - None => break, - _ => continue, - } - } - - Some(line) = observe_input_rx.recv() => { - app.submitted.push(line); - dirty = true; - } - - Some((result, target)) = turn_rx.recv() => { - session.handle_turn_result(result, target).await; - idle_state.response_activity(); - dirty = true; - } - - _ = render_interval.tick() => { - let new_count = session.agent.lock().await.active_tools.lock().unwrap().len() as u32; - if new_count != app.running_processes { - app.running_processes = new_count; - dirty = true; - } - idle_state.decay_ewma(); - app.update_idle(&idle_state); - - while let Ok(notif) = notify_rx.try_recv() { - pending_notifications.push(notif); - let tx = channel_tx.clone(); - tokio::spawn(async move { - let result = crate::thalamus::channels::fetch_all_channels().await; - let _ = tx.send(result).await; - }); - } - } - - _ = tokio::time::sleep(timeout), if !session.turn_in_progress => { - session.dmn_tick(); - dirty = true; - } - - Some(channels) = channel_rx.recv() => { - app.set_channel_status(channels); - dirty = true; - } - - Some(msg) = ui_rx.recv() => { - app.handle_ui_message(msg); - dirty = true; - } - } - - // Process submitted input - let submitted: Vec = app.submitted.drain(..).collect(); - for input in submitted { - let input = input.trim().to_string(); - if input.is_empty() { continue; } - match session.handle_command(&input).await { - Command::Quit => app.should_quit = true, - Command::Handled => {} - Command::None => session.submit_input(input), - } - } - - // Process hotkey actions - let actions: Vec = app.hotkey_actions.drain(..).collect(); - for action in actions { - match action { - HotkeyAction::CycleReasoning => session.cycle_reasoning(&mut app), - HotkeyAction::KillProcess => session.kill_processes().await, - HotkeyAction::Interrupt => session.interrupt().await, - HotkeyAction::CycleAutonomy => session.cycle_autonomy(), - HotkeyAction::AdjustSampling(param, delta) => { - if let Ok(mut agent) = session.agent.try_lock() { - match param { - 0 => { agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0); app.temperature = agent.temperature; } - 1 => { agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0); app.top_p = agent.top_p; } - 2 => { agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32; app.top_k = agent.top_k; } - _ => {} - } - } - } - } - } - - if app.drain_messages(&mut ui_rx) { - dirty = true; - } - - if dirty { - terminal.draw(|f| app.draw(f))?; - dirty = false; - } - - if app.should_quit { - break; - } - } - - session.shutdown().await; - tui::restore_terminal(&mut terminal)?; - Ok(()) + // Run UI event loop on main thread (needs terminal access) + crate::user::event_loop::run( + app, mind_tx, ui_rx, observe_input_rx, + channel_tx, channel_rx, notify_rx, idle_state, + ).await } diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs new file mode 100644 index 0000000..7c465e5 --- /dev/null +++ b/src/user/event_loop.rs @@ -0,0 +1,140 @@ +// event_loop.rs — TUI event loop +// +// Drives the terminal, renders the UI, dispatches user input and +// hotkey actions to the Mind via a channel. Reads shared state +// (agent, active tools) directly for rendering. + +use anyhow::Result; +use crossterm::event::{Event, EventStream, KeyEventKind}; +use futures::StreamExt; +use std::time::Duration; + +use crate::user::{self as tui, HotkeyAction}; +use crate::user::ui_channel::{self, UiMessage}; + +/// Messages from the UI to the Mind. +pub enum MindMessage { + UserInput(String), + Hotkey(HotkeyAction), +} + +pub async fn run( + mut app: tui::App, + mind_tx: tokio::sync::mpsc::UnboundedSender, + mut ui_rx: ui_channel::UiReceiver, + mut observe_input_rx: tokio::sync::mpsc::UnboundedReceiver, + channel_tx: tokio::sync::mpsc::Sender>, + mut channel_rx: tokio::sync::mpsc::Receiver>, + notify_rx: std::sync::mpsc::Receiver, + mut idle_state: crate::thalamus::idle::State, +) -> Result<()> { + let mut terminal = tui::init_terminal()?; + let mut reader = EventStream::new(); + + let mut render_interval = tokio::time::interval(Duration::from_millis(50)); + render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut dirty = true; + + terminal.hide_cursor()?; + + // Initial render + app.drain_messages(&mut ui_rx); + terminal.draw(|f| app.draw(f))?; + + loop { + tokio::select! { + biased; + + maybe_event = reader.next() => { + match maybe_event { + Some(Ok(Event::Key(key))) => { + if key.kind != KeyEventKind::Press { continue; } + app.handle_key(key); + idle_state.user_activity(); + if app.screen == tui::Screen::Thalamus { + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = crate::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } + dirty = true; + } + Some(Ok(Event::Mouse(mouse))) => { + app.handle_mouse(mouse); + dirty = true; + } + Some(Ok(Event::Resize(w, h))) => { + app.handle_resize(w, h); + terminal.clear()?; + dirty = true; + } + Some(Err(_)) => break, + None => break, + _ => continue, + } + } + + Some(line) = observe_input_rx.recv() => { + app.submitted.push(line); + dirty = true; + } + + _ = render_interval.tick() => { + idle_state.decay_ewma(); + app.update_idle(&idle_state); + + while let Ok(notif) = notify_rx.try_recv() { + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = crate::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } + } + + Some(channels) = channel_rx.recv() => { + app.set_channel_status(channels); + dirty = true; + } + + Some(msg) = ui_rx.recv() => { + app.handle_ui_message(msg); + dirty = true; + } + } + + // Process submitted input + let submitted: Vec = app.submitted.drain(..).collect(); + for input in submitted { + let input = input.trim().to_string(); + if input.is_empty() { continue; } + match input.as_str() { + "/quit" | "/exit" => app.should_quit = true, + _ => { let _ = mind_tx.send(MindMessage::UserInput(input)); } + } + } + + // Send hotkey actions to Mind + let actions: Vec = app.hotkey_actions.drain(..).collect(); + for action in actions { + let _ = mind_tx.send(MindMessage::Hotkey(action)); + } + + if app.drain_messages(&mut ui_rx) { + dirty = true; + } + + if dirty { + terminal.draw(|f| app.draw(f))?; + dirty = false; + } + + if app.should_quit { + break; + } + } + + tui::restore_terminal(&mut terminal)?; + Ok(()) +} diff --git a/src/user/mod.rs b/src/user/mod.rs index b0a8437..1c6c118 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -4,6 +4,7 @@ // machine, DMN, identity) lives in mind/. pub mod ui_channel; +pub mod event_loop; pub mod chat; pub mod context;