// mind/ — Cognitive layer // // Mind state machine, DMN, identity, observation socket. // Everything about how the mind operates, separate from the // user interface (TUI, CLI) and the agent execution (tools, API). pub mod dmn; pub mod identity; pub mod log; // consciousness.rs — Mind state machine and event loop // // The core runtime for the consciousness binary. Mind manages turns, // DMN state, compaction, scoring, and slash commands. The event loop // bridges Mind (cognitive state) with App (TUI rendering). // // The event loop uses biased select! so priorities are deterministic: // keyboard events > turn results > render ticks > DMN timer > UI messages. use anyhow::Result; use std::sync::Arc; use std::time::Instant; use tokio::sync::{mpsc, Mutex}; use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; use crate::config::{self, AppConfig, SessionConfig}; use crate::user::{self as tui}; use crate::user::ui_channel::{self, StatusInfo, StreamTarget, UiMessage}; use crate::subconscious::learn; /// Compaction threshold — context is rebuilt when prompt tokens exceed this. fn compaction_threshold(app: &AppConfig) -> u32 { (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 } /// Shared state between Mind and UI. #[derive(Clone)] pub struct MindState { /// Pending user input — UI pushes, Mind consumes after turn completes. pub input: Vec, /// True while a turn is in progress. pub turn_active: bool, /// DMN state pub dmn: dmn::State, pub dmn_turns: u32, pub max_dmn_turns: u32, /// Whether memory scoring is running. pub scoring_in_flight: bool, /// Whether compaction is running. pub compaction_in_flight: bool, /// Per-turn tracking pub last_user_input: Instant, pub consecutive_errors: u32, pub last_turn_had_tools: bool, } pub type SharedMindState = Arc>; /// What should happen after a state transition. pub enum MindCommand { /// Start a turn with this input Turn(String, StreamTarget), /// Run compaction check Compact, /// Run memory scoring Score, /// Abort current turn, kill processes Interrupt, /// Reset session NewSession, /// Nothing to do None, } impl MindState { pub fn new(max_dmn_turns: u32) -> Self { Self { input: Vec::new(), turn_active: false, dmn: if dmn::is_off() { dmn::State::Off } else { dmn::State::Resting { since: Instant::now() } }, dmn_turns: 0, max_dmn_turns, scoring_in_flight: false, compaction_in_flight: false, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, } } /// Consume pending input, return a Turn command if ready. pub fn take_pending_input(&mut self) -> MindCommand { if self.turn_active || self.input.is_empty() { return MindCommand::None; } let text = self.input.join("\n"); self.input.clear(); self.dmn_turns = 0; self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = dmn::State::Engaged; MindCommand::Turn(text, StreamTarget::Conversation) } /// Process turn completion, return model switch name if requested. pub fn complete_turn(&mut self, result: &Result, target: StreamTarget) -> Option { self.turn_active = false; match result { Ok(turn_result) => { if turn_result.tool_errors > 0 { self.consecutive_errors += turn_result.tool_errors; } else { self.consecutive_errors = 0; } self.last_turn_had_tools = turn_result.had_tool_calls; self.dmn = dmn::transition( &self.dmn, turn_result.yield_requested, turn_result.had_tool_calls, target == StreamTarget::Conversation, ); if turn_result.dmn_pause { self.dmn = dmn::State::Paused; self.dmn_turns = 0; } turn_result.model_switch.clone() } Err(_) => { self.consecutive_errors += 1; self.dmn = dmn::State::Resting { since: Instant::now() }; None } } } /// DMN tick — returns a Turn action with the DMN prompt, or None. pub fn dmn_tick(&mut self) -> MindCommand { if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) { return MindCommand::None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; return MindCommand::None; } let dmn_ctx = dmn::DmnContext { user_idle: self.last_user_input.elapsed(), consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); MindCommand::Turn(prompt, StreamTarget::Autonomous) } pub fn interrupt(&mut self) { self.input.clear(); self.dmn = dmn::State::Resting { since: Instant::now() }; } } pub fn shared_mind_state(max_dmn_turns: u32) -> SharedMindState { Arc::new(std::sync::Mutex::new(MindState::new(max_dmn_turns))) } // --- Mind: cognitive state machine --- pub struct Mind { pub agent: Arc>, pub shared: SharedMindState, pub config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_handle: Option>, turn_watch: tokio::sync::watch::Sender, } impl Mind { pub fn new( agent: Arc>, shared: SharedMindState, config: SessionConfig, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, ) -> Self { let (turn_watch, _) = tokio::sync::watch::channel(false); Self { agent, shared, config, ui_tx, turn_tx, turn_handle: None, turn_watch } } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { self.turn_watch.subscribe() } /// Execute an Action from a MindState method. async fn run_commands(&mut self, cmds: Vec) { for cmd in cmds { match cmd { MindCommand::None => {} MindCommand::Compact => { let threshold = compaction_threshold(&self.config.app); let mut ag = self.agent.lock().await; if ag.last_prompt_tokens() > threshold { ag.compact(); } } MindCommand::Score => { let mut s = self.shared.lock().unwrap(); if !s.scoring_in_flight { s.scoring_in_flight = true; drop(s); self.start_memory_scoring(); } } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); let ag = self.agent.lock().await; let mut tools = ag.active_tools.lock().unwrap(); for entry in tools.drain(..) { entry.handle.abort(); } drop(tools); drop(ag); if let Some(h) = self.turn_handle.take() { h.abort(); } self.shared.lock().unwrap().turn_active = false; let _ = self.turn_watch.send(false); } MindCommand::NewSession => { { let mut s = self.shared.lock().unwrap(); s.dmn = dmn::State::Resting { since: Instant::now() }; s.dmn_turns = 0; } let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), ).ok(); let mut ag = self.agent.lock().await; let shared_ctx = ag.shared_context.clone(); let shared_tools = ag.active_tools.clone(); *ag = Agent::new( ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), self.config.system_prompt.clone(), self.config.context_parts.clone(), self.config.app.clone(), self.config.prompt_file.clone(), new_log, shared_ctx, shared_tools, ); } MindCommand::Turn(input, target) => { if target == StreamTarget::Conversation { let _ = self.ui_tx.send(UiMessage::UserInput(input.clone())); } self.shared.lock().unwrap().turn_active = true; let _ = self.turn_watch.send(true); let agent = self.agent.clone(); let ui_tx = self.ui_tx.clone(); let result_tx = self.turn_tx.clone(); self.turn_handle = Some(tokio::spawn(async move { let result = Agent::turn(agent, &input, &ui_tx, target).await; let _ = result_tx.send((result, target)).await; })); } } } } pub fn start_memory_scoring(&self) { let agent = self.agent.clone(); let shared = self.shared.clone(); let ui_tx = self.ui_tx.clone(); let cfg = crate::config::get(); let max_age = cfg.scoring_interval_secs; let response_window = cfg.scoring_response_window; tokio::spawn(async move { let (context, client) = { let mut ag = agent.lock().await; if ag.agent_cycles.memory_scoring_in_flight { return; } ag.agent_cycles.memory_scoring_in_flight = true; (ag.context.clone(), ag.client_clone()) }; let result = learn::score_memories_incremental( &context, max_age as i64, response_window, &client, &ui_tx, ).await; { let mut ag = agent.lock().await; ag.agent_cycles.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } } shared.lock().unwrap().scoring_in_flight = false; }); } pub async fn shutdown(&mut self) { if let Some(handle) = self.turn_handle.take() { handle.abort(); } } /// Mind event loop — locks MindState, calls state methods, executes actions. pub async fn run( &mut self, mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { loop { let timeout = self.shared.lock().unwrap().dmn.interval(); let turn_active = self.shared.lock().unwrap().turn_active; let mut cmds = Vec::new(); tokio::select! { biased; Some(cmd) = input_rx.recv() => { cmds.push(cmd); } Some((result, target)) = turn_rx.recv() => { self.turn_handle = None; let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); let _ = self.turn_watch.send(false); if let Some(name) = model_switch { crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; } // Post-turn maintenance { let mut ag = self.agent.lock().await; ag.age_out_images(); ag.publish_context_state(); } cmds.push(MindCommand::Compact); if !self.config.no_agents { cmds.push(MindCommand::Score); } } _ = tokio::time::sleep(timeout), if !turn_active => { let tick = self.shared.lock().unwrap().dmn_tick(); cmds.push(tick); } } // Always check for pending input cmds.push(self.shared.lock().unwrap().take_pending_input()); self.run_commands(cmds).await; } } } // --- Startup --- pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let (config, _figment) = config::load_session(&cli)?; if config.app.debug { unsafe { std::env::set_var("POC_DEBUG", "1") }; } // Start channel daemons let mut channel_supervisor = crate::thalamus::supervisor::Supervisor::new(); channel_supervisor.load_config(); channel_supervisor.ensure_running(); // Initialize idle state machine let mut idle_state = crate::thalamus::idle::State::new(); idle_state.load(); // Channel status let (channel_tx, channel_rx) = tokio::sync::mpsc::channel::>(4); { let tx = channel_tx.clone(); tokio::spawn(async move { let result = crate::thalamus::channels::fetch_all_channels().await; let _ = tx.send(result).await; }); } let notify_rx = crate::thalamus::channels::subscribe_all(); // Create UI channel let (ui_tx, ui_rx) = ui_channel::channel(); // Shared state let shared_context = ui_channel::shared_context_state(); let shared_active_tools = ui_channel::shared_active_tools(); // Startup info let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into())); let _ = ui_tx.send(UiMessage::Info(format!( " model: {} (available: {})", config.model, config.app.model_names().join(", "), ))); let client = ApiClient::new(&config.api_base, &config.api_key, &config.model); let _ = ui_tx.send(UiMessage::Info(format!(" api: {} ({})", config.api_base, client.backend_label()))); let _ = ui_tx.send(UiMessage::Info(format!( " context: {}K chars ({} config, {} memory files)", config.context_parts.iter().map(|(_, c)| c.len()).sum::() / 1024, config.config_file_count, config.memory_file_count, ))); let conversation_log_path = config.session_dir.join("conversation.jsonl"); let conversation_log = log::ConversationLog::new(conversation_log_path.clone()) .expect("failed to create conversation log"); let _ = ui_tx.send(UiMessage::Info(format!(" log: {}", conversation_log.path().display()))); let agent = Arc::new(Mutex::new(Agent::new( client, config.system_prompt.clone(), config.context_parts.clone(), config.app.clone(), config.prompt_file.clone(), Some(conversation_log), shared_context.clone(), shared_active_tools.clone(), ))); // Restore conversation from log { let mut agent_guard = agent.lock().await; if agent_guard.restore_from_log() { ui_channel::replay_session_to_ui(agent_guard.entries(), &ui_tx); let _ = ui_tx.send(UiMessage::Info("--- restored from conversation log ---".into())); } } // Send initial budget to status bar { let agent_guard = agent.lock().await; let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo { dmn_state: "resting".to_string(), dmn_turns: 0, dmn_max_turns: 0, prompt_tokens: 0, completion_tokens: 0, model: agent_guard.model().to_string(), turn_tools: 0, context_budget: agent_guard.budget().status_string(), })); } let (turn_tx, turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); let no_agents = config.no_agents; let shared_mind = shared_mind_state(config.app.dmn.max_turns); crate::user::event_loop::send_context_info(&config, &ui_tx); let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx); if !no_agents { mind.start_memory_scoring(); } // Start observation socket let socket_path = mind.config.session_dir.join("agent.sock"); let (observe_input_tx, observe_input_rx) = log::input_channel(); if !no_agents { log::start(socket_path, ui_tx.subscribe(), observe_input_tx); } // Mind ↔ UI channel let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel(); // App for TUI let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools); let ui_agent = mind.agent.clone(); let turn_watch = mind.turn_watch(); // Spawn Mind event loop tokio::spawn(async move { mind.run(mind_rx, turn_rx).await; }); crate::user::event_loop::run( app, ui_agent, shared_mind, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, channel_tx, channel_rx, notify_rx, idle_state, ).await }