mind: split event loop — Mind and UI run independently

Mind::run() owns the cognitive event loop: user input, turn results,
DMN ticks, hotkey actions. The UI event loop (user/event_loop.rs) owns
the terminal: key events, render ticks, channel status display.

They communicate through channels: UI sends MindMessage (user input,
hotkey actions) to Mind. Mind sends UiMessage (status, info) to UI.
UI reads shared state (active tools, context) directly for rendering.

Removes direct coupling between Mind and App:
- cycle_reasoning no longer takes &mut App
- AdjustSampling updates agent only, UI reads from shared state
- /quit handled by UI directly, not routed through Mind

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-05 02:11:32 -04:00
parent 1f06b49503
commit 804d55a702
3 changed files with 227 additions and 170 deletions

View file

@ -18,8 +18,6 @@ pub mod log;
// keyboard events > turn results > render ticks > DMN timer > UI messages.
use anyhow::Result;
use crossterm::event::{Event, EventStream, KeyEventKind};
use futures::StreamExt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex};
@ -28,7 +26,7 @@ use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::agent::api::types as api_types;
use crate::config::{self, AppConfig, SessionConfig};
use crate::user::{self as tui, HotkeyAction};
use crate::user::{self as tui};
use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage};
use crate::subconscious::learn;
@ -37,9 +35,8 @@ fn compaction_threshold(app: &AppConfig) -> u32 {
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
}
/// Commands that are handled in the main loop, not sent to the agent.
enum Command {
Quit,
/// Result of slash command handling.
pub enum Command {
Handled,
None,
}
@ -348,7 +345,6 @@ impl Mind {
];
match input {
"/quit" | "/exit" => Command::Quit,
"/save" => {
let _ = self.ui_tx.send(UiMessage::Info(
"Conversation is saved automatically (append-only log).".into()
@ -551,7 +547,7 @@ impl Mind {
}
/// Cycle reasoning effort: none → low → high → none.
fn cycle_reasoning(&mut self, app: &mut tui::App) {
fn cycle_reasoning(&mut self) {
if let Ok(mut agent_guard) = self.agent.try_lock() {
let next = match agent_guard.reasoning_effort.as_str() {
"none" => "low",
@ -559,7 +555,6 @@ impl Mind {
_ => "none",
};
agent_guard.reasoning_effort = next.to_string();
app.reasoning_effort = next.to_string();
let label = match next {
"none" => "off (monologue hidden)",
"low" => "low (brief monologue)",
@ -693,14 +688,69 @@ impl Mind {
}));
}
async fn shutdown(&mut self) {
pub async fn shutdown(&mut self) {
if let Some(handle) = self.turn_handle.take() {
handle.abort();
}
}
/// Mind event loop — reacts to user input, turn results, DMN ticks.
pub async fn run(
&mut self,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<crate::user::event_loop::MindMessage>,
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
) {
use crate::user::event_loop::MindMessage;
use crate::user::HotkeyAction;
loop {
let timeout = self.dmn_interval();
tokio::select! {
biased;
Some(msg) = input_rx.recv() => {
match msg {
MindMessage::UserInput(input) => {
match self.handle_command(&input).await {
Command::Handled => {}
Command::None => self.submit_input(input),
}
}
MindMessage::Hotkey(action) => {
match action {
HotkeyAction::CycleReasoning => self.cycle_reasoning(),
HotkeyAction::KillProcess => self.kill_processes().await,
HotkeyAction::Interrupt => self.interrupt().await,
HotkeyAction::CycleAutonomy => self.cycle_autonomy(),
HotkeyAction::AdjustSampling(param, delta) => {
if let Ok(mut agent) = self.agent.try_lock() {
match param {
0 => agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0),
1 => agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0),
2 => agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32,
_ => {}
}
}
}
}
}
}
}
// --- Event loop ---
Some((result, target)) = turn_rx.recv() => {
self.handle_turn_result(result, target).await;
}
_ = tokio::time::sleep(timeout), if !self.turn_in_progress => {
self.dmn_tick();
}
}
}
}
}
// --- Startup ---
pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let (config, _figment) = config::load_session(&cli)?;
@ -718,8 +768,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let mut idle_state = crate::thalamus::idle::State::new();
idle_state.load();
// Channel status fetcher
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
// Channel status
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
{
let tx = channel_tx.clone();
tokio::spawn(async move {
@ -727,21 +777,15 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let _ = tx.send(result).await;
});
}
let notify_rx = crate::thalamus::channels::subscribe_all();
let mut pending_notifications: Vec<crate::thalamus::channels::ChannelNotification> = Vec::new();
// Create UI channel
let (ui_tx, mut ui_rx) = ui_channel::channel();
let (ui_tx, ui_rx) = ui_channel::channel();
// Shared state
let shared_context = ui_channel::shared_context_state();
let shared_active_tools = ui_channel::shared_active_tools();
// Initialize TUI
let mut terminal = tui::init_terminal()?;
let mut app = tui::App::new(config.model.clone(), shared_context.clone(), shared_active_tools.clone());
// Startup info
let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into()));
let _ = ui_tx.send(UiMessage::Info(format!(
@ -767,8 +811,8 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
config.app.clone(),
config.prompt_file.clone(),
Some(conversation_log),
shared_context,
shared_active_tools,
shared_context.clone(),
shared_active_tools.clone(),
)));
// Restore conversation from log
@ -793,165 +837,37 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
}));
}
let (turn_tx, mut turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
let no_agents = config.no_agents;
let mut session = Mind::new(agent, config, ui_tx.clone(), turn_tx);
session.update_status();
let mut mind = Mind::new(agent, config, ui_tx.clone(), turn_tx);
mind.update_status();
if !no_agents {
session.start_memory_scoring(); // also sends initial agent snapshots
mind.start_memory_scoring();
}
session.send_context_info();
mind.send_context_info();
// Start observation socket
let socket_path = session.config.session_dir.join("agent.sock");
let (observe_input_tx, mut observe_input_rx) = log::input_channel();
let socket_path = mind.config.session_dir.join("agent.sock");
let (observe_input_tx, observe_input_rx) = log::input_channel();
if !no_agents {
log::start(socket_path, ui_tx.subscribe(), observe_input_tx);
}
let mut reader = EventStream::new();
// Mind ↔ UI channel
let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel();
let mut render_interval = tokio::time::interval(Duration::from_millis(50));
render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut dirty = true;
// App for TUI
let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools);
terminal.hide_cursor()?;
// Initial render
app.drain_messages(&mut ui_rx);
terminal.draw(|f| app.draw(f))?;
loop {
let timeout = session.dmn_interval();
tokio::select! {
biased;
maybe_event = reader.next() => {
match maybe_event {
Some(Ok(Event::Key(key))) => {
if key.kind != KeyEventKind::Press { continue; }
app.handle_key(key);
idle_state.user_activity();
if app.screen == tui::Screen::Thalamus {
let tx = channel_tx.clone();
// Spawn Mind event loop
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
mind.run(mind_rx, turn_rx).await;
});
}
dirty = true;
}
Some(Ok(Event::Mouse(mouse))) => {
app.handle_mouse(mouse);
dirty = true;
}
Some(Ok(Event::Resize(w, h))) => {
app.handle_resize(w, h);
terminal.clear()?;
dirty = true;
}
Some(Err(_)) => break,
None => break,
_ => continue,
}
}
Some(line) = observe_input_rx.recv() => {
app.submitted.push(line);
dirty = true;
}
Some((result, target)) = turn_rx.recv() => {
session.handle_turn_result(result, target).await;
idle_state.response_activity();
dirty = true;
}
_ = render_interval.tick() => {
let new_count = session.agent.lock().await.active_tools.lock().unwrap().len() as u32;
if new_count != app.running_processes {
app.running_processes = new_count;
dirty = true;
}
idle_state.decay_ewma();
app.update_idle(&idle_state);
while let Ok(notif) = notify_rx.try_recv() {
pending_notifications.push(notif);
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
}
_ = tokio::time::sleep(timeout), if !session.turn_in_progress => {
session.dmn_tick();
dirty = true;
}
Some(channels) = channel_rx.recv() => {
app.set_channel_status(channels);
dirty = true;
}
Some(msg) = ui_rx.recv() => {
app.handle_ui_message(msg);
dirty = true;
}
}
// Process submitted input
let submitted: Vec<String> = app.submitted.drain(..).collect();
for input in submitted {
let input = input.trim().to_string();
if input.is_empty() { continue; }
match session.handle_command(&input).await {
Command::Quit => app.should_quit = true,
Command::Handled => {}
Command::None => session.submit_input(input),
}
}
// Process hotkey actions
let actions: Vec<HotkeyAction> = app.hotkey_actions.drain(..).collect();
for action in actions {
match action {
HotkeyAction::CycleReasoning => session.cycle_reasoning(&mut app),
HotkeyAction::KillProcess => session.kill_processes().await,
HotkeyAction::Interrupt => session.interrupt().await,
HotkeyAction::CycleAutonomy => session.cycle_autonomy(),
HotkeyAction::AdjustSampling(param, delta) => {
if let Ok(mut agent) = session.agent.try_lock() {
match param {
0 => { agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0); app.temperature = agent.temperature; }
1 => { agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0); app.top_p = agent.top_p; }
2 => { agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32; app.top_k = agent.top_k; }
_ => {}
}
}
}
}
}
if app.drain_messages(&mut ui_rx) {
dirty = true;
}
if dirty {
terminal.draw(|f| app.draw(f))?;
dirty = false;
}
if app.should_quit {
break;
}
}
session.shutdown().await;
tui::restore_terminal(&mut terminal)?;
Ok(())
// Run UI event loop on main thread (needs terminal access)
crate::user::event_loop::run(
app, mind_tx, ui_rx, observe_input_rx,
channel_tx, channel_rx, notify_rx, idle_state,
).await
}

140
src/user/event_loop.rs Normal file
View file

@ -0,0 +1,140 @@
// event_loop.rs — TUI event loop
//
// Drives the terminal, renders the UI, dispatches user input and
// hotkey actions to the Mind via a channel. Reads shared state
// (agent, active tools) directly for rendering.
use anyhow::Result;
use crossterm::event::{Event, EventStream, KeyEventKind};
use futures::StreamExt;
use std::time::Duration;
use crate::user::{self as tui, HotkeyAction};
use crate::user::ui_channel::{self, UiMessage};
/// Messages from the UI to the Mind.
pub enum MindMessage {
UserInput(String),
Hotkey(HotkeyAction),
}
pub async fn run(
mut app: tui::App,
mind_tx: tokio::sync::mpsc::UnboundedSender<MindMessage>,
mut ui_rx: ui_channel::UiReceiver,
mut observe_input_rx: tokio::sync::mpsc::UnboundedReceiver<String>,
channel_tx: tokio::sync::mpsc::Sender<Vec<(String, bool, u32)>>,
mut channel_rx: tokio::sync::mpsc::Receiver<Vec<(String, bool, u32)>>,
notify_rx: std::sync::mpsc::Receiver<crate::thalamus::channels::ChannelNotification>,
mut idle_state: crate::thalamus::idle::State,
) -> Result<()> {
let mut terminal = tui::init_terminal()?;
let mut reader = EventStream::new();
let mut render_interval = tokio::time::interval(Duration::from_millis(50));
render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut dirty = true;
terminal.hide_cursor()?;
// Initial render
app.drain_messages(&mut ui_rx);
terminal.draw(|f| app.draw(f))?;
loop {
tokio::select! {
biased;
maybe_event = reader.next() => {
match maybe_event {
Some(Ok(Event::Key(key))) => {
if key.kind != KeyEventKind::Press { continue; }
app.handle_key(key);
idle_state.user_activity();
if app.screen == tui::Screen::Thalamus {
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
dirty = true;
}
Some(Ok(Event::Mouse(mouse))) => {
app.handle_mouse(mouse);
dirty = true;
}
Some(Ok(Event::Resize(w, h))) => {
app.handle_resize(w, h);
terminal.clear()?;
dirty = true;
}
Some(Err(_)) => break,
None => break,
_ => continue,
}
}
Some(line) = observe_input_rx.recv() => {
app.submitted.push(line);
dirty = true;
}
_ = render_interval.tick() => {
idle_state.decay_ewma();
app.update_idle(&idle_state);
while let Ok(notif) = notify_rx.try_recv() {
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
}
Some(channels) = channel_rx.recv() => {
app.set_channel_status(channels);
dirty = true;
}
Some(msg) = ui_rx.recv() => {
app.handle_ui_message(msg);
dirty = true;
}
}
// Process submitted input
let submitted: Vec<String> = app.submitted.drain(..).collect();
for input in submitted {
let input = input.trim().to_string();
if input.is_empty() { continue; }
match input.as_str() {
"/quit" | "/exit" => app.should_quit = true,
_ => { let _ = mind_tx.send(MindMessage::UserInput(input)); }
}
}
// Send hotkey actions to Mind
let actions: Vec<HotkeyAction> = app.hotkey_actions.drain(..).collect();
for action in actions {
let _ = mind_tx.send(MindMessage::Hotkey(action));
}
if app.drain_messages(&mut ui_rx) {
dirty = true;
}
if dirty {
terminal.draw(|f| app.draw(f))?;
dirty = false;
}
if app.should_quit {
break;
}
}
tui::restore_terminal(&mut terminal)?;
Ok(())
}

View file

@ -4,6 +4,7 @@
// machine, DMN, identity) lives in mind/.
pub mod ui_channel;
pub mod event_loop;
pub mod chat;
pub mod context;