// mind/ — Cognitive layer // // Mind state machine, DMN, identity, observation socket. // Everything about how the mind operates, separate from the // user interface (TUI, CLI) and the agent execution (tools, API). pub mod dmn; pub mod identity; pub mod log; // consciousness.rs — Mind state machine and event loop // // The core runtime for the consciousness binary. Mind manages turns, // DMN state, compaction, scoring, and slash commands. The event loop // bridges Mind (cognitive state) with App (TUI rendering). // // The event loop uses biased select! so priorities are deterministic: // keyboard events > turn results > render ticks > DMN timer > UI messages. use anyhow::Result; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc; use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; use crate::agent::oneshot::{AutoAgent, AutoStep}; use crate::config::{AppConfig, SessionConfig}; use crate::subconscious::{defs, learn}; // --------------------------------------------------------------------------- // Subconscious agents — forked from conscious agent, run on schedule // --------------------------------------------------------------------------- /// A subconscious agent managed by Mind. struct SubconsciousAgent { auto: AutoAgent, /// Conversation bytes at last trigger. last_trigger_bytes: u64, /// When the agent last ran. last_run: Option, /// Running task handle. handle: Option>>, } /// Names and byte-interval triggers for the built-in subconscious agents. const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[ ("subconscious-surface-observe", 0), // every trigger ("subconscious-journal", 20_000), // every ~20KB of conversation ("subconscious-reflect", 100_000), // every ~100KB of conversation ]; impl SubconsciousAgent { fn new(name: &str, _interval_bytes: u64) -> Option { let def = defs::get_def(name)?; let all_tools = crate::agent::tools::memory_and_journal_tools(); let tools: Vec = if def.tools.is_empty() { all_tools.to_vec() } else { all_tools.into_iter() .filter(|t| def.tools.iter().any(|w| w == t.name)) .collect() }; let steps: Vec = def.steps.iter().map(|s| AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone(), }).collect(); let auto = AutoAgent::new( name.to_string(), tools, steps, def.temperature.unwrap_or(0.6), def.priority, ); Some(Self { auto, last_trigger_bytes: 0, last_run: None, handle: None, }) } fn is_running(&self) -> bool { self.handle.as_ref().is_some_and(|h| !h.is_finished()) } fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool { if self.is_running() { return false; } if interval == 0 { return true; } conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval } } /// Lightweight snapshot of subconscious agent state for the TUI. #[derive(Clone, Default)] pub struct SubconsciousSnapshot { pub name: String, pub running: bool, pub current_phase: String, pub turn: usize, pub walked_count: usize, pub last_run_secs_ago: Option, /// Entries from the last forked run (after fork point). pub last_run_entries: Vec, } impl SubconsciousAgent { fn snapshot(&self) -> SubconsciousSnapshot { SubconsciousSnapshot { name: self.auto.name.clone(), running: self.is_running(), current_phase: self.auto.current_phase.clone(), turn: self.auto.turn, walked_count: self.auto.walked.len(), last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()), last_run_entries: self.auto.last_run_entries.clone(), } } } /// Which pane streaming text should go to. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamTarget { /// User-initiated turn — text goes to conversation pane. Conversation, /// DMN-initiated turn — text goes to autonomous pane. Autonomous, } /// Compaction threshold — context is rebuilt when prompt tokens exceed this. fn compaction_threshold(app: &AppConfig) -> u32 { (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 } /// Shared state between Mind and UI. pub struct MindState { /// Pending user input — UI pushes, Mind consumes after turn completes. pub input: Vec, /// True while a turn is in progress. pub turn_active: bool, /// DMN state pub dmn: dmn::State, pub dmn_turns: u32, pub max_dmn_turns: u32, /// Whether memory scoring is running. pub scoring_in_flight: bool, /// Whether compaction is running. pub compaction_in_flight: bool, /// Per-turn tracking pub last_user_input: Instant, pub consecutive_errors: u32, pub last_turn_had_tools: bool, /// Handle to the currently running turn task. pub turn_handle: Option>, } impl Clone for MindState { fn clone(&self) -> Self { Self { input: self.input.clone(), turn_active: self.turn_active, dmn: self.dmn.clone(), dmn_turns: self.dmn_turns, max_dmn_turns: self.max_dmn_turns, scoring_in_flight: self.scoring_in_flight, compaction_in_flight: self.compaction_in_flight, last_user_input: self.last_user_input, consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, turn_handle: None, // Not cloned — only Mind's loop uses this } } } /// What should happen after a state transition. pub enum MindCommand { /// Run compaction check Compact, /// Run memory scoring Score, /// Abort current turn, kill processes Interrupt, /// Reset session NewSession, /// Nothing to do None, } impl MindState { pub fn new(max_dmn_turns: u32) -> Self { Self { input: Vec::new(), turn_active: false, dmn: if dmn::is_off() { dmn::State::Off } else { dmn::State::Resting { since: Instant::now() } }, dmn_turns: 0, max_dmn_turns, scoring_in_flight: false, compaction_in_flight: false, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, turn_handle: None, } } /// Consume pending user input if no turn is active. /// Returns the text to send; caller is responsible for pushing it /// into the Agent's context and starting the turn. fn take_pending_input(&mut self) -> Option { if self.turn_active || self.input.is_empty() { return None; } let text = self.input.join("\n"); self.input.clear(); self.dmn_turns = 0; self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = dmn::State::Engaged; Some(text) } /// Process turn completion, return model switch name if requested. fn complete_turn(&mut self, result: &Result, target: StreamTarget) -> Option { self.turn_active = false; match result { Ok(turn_result) => { if turn_result.tool_errors > 0 { self.consecutive_errors += turn_result.tool_errors; } else { self.consecutive_errors = 0; } self.last_turn_had_tools = turn_result.had_tool_calls; self.dmn = dmn::transition( &self.dmn, turn_result.yield_requested, turn_result.had_tool_calls, target == StreamTarget::Conversation, ); if turn_result.dmn_pause { self.dmn = dmn::State::Paused; self.dmn_turns = 0; } turn_result.model_switch.clone() } Err(_) => { self.consecutive_errors += 1; self.dmn = dmn::State::Resting { since: Instant::now() }; None } } } /// DMN tick — returns a prompt and target if we should run a turn. fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> { if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) { return None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; return None; } let dmn_ctx = dmn::DmnContext { user_idle: self.last_user_input.elapsed(), consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); Some((prompt, StreamTarget::Autonomous)) } fn interrupt(&mut self) { self.input.clear(); self.dmn = dmn::State::Resting { since: Instant::now() }; } } /// Background task completion events. enum BgEvent { ScoringDone, } // --- Mind: cognitive state machine --- pub type SharedMindState = std::sync::Mutex; pub struct Mind { pub agent: Arc>, pub shared: Arc, pub config: SessionConfig, subconscious: Arc>>, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, bg_rx: std::sync::Mutex>>, _supervisor: crate::thalamus::supervisor::Supervisor, } impl Mind { pub fn new( config: SessionConfig, turn_tx: mpsc::Sender<(Result, StreamTarget)>, ) -> Self { let shared_context = crate::agent::context::shared_context_state(); let shared_active_tools = crate::agent::tools::shared_active_tools(); let client = ApiClient::new(&config.api_base, &config.api_key, &config.model); let conversation_log = log::ConversationLog::new( config.session_dir.join("conversation.jsonl"), ).ok(); let ag = Agent::new( client, config.system_prompt.clone(), config.context_parts.clone(), config.app.clone(), config.prompt_file.clone(), conversation_log, shared_context, shared_active_tools, ); let agent = Arc::new(tokio::sync::Mutex::new(ag)); let subconscious = SUBCONSCIOUS_AGENTS.iter() .filter_map(|(name, interval)| SubconsciousAgent::new(name, *interval)) .collect(); let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); let (turn_watch, _) = tokio::sync::watch::channel(false); let (bg_tx, bg_rx) = mpsc::unbounded_channel(); let mut sup = crate::thalamus::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); Self { agent, shared, config, subconscious: Arc::new(tokio::sync::Mutex::new(subconscious)), turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } /// Initialize — restore log, start daemons and background agents. pub async fn subconscious_snapshots(&self) -> Vec { self.subconscious.lock().await.iter().map(|s| s.snapshot()).collect() } pub async fn init(&self) { // Restore conversation let mut ag = self.agent.lock().await; ag.restore_from_log(); ag.changed.notify_one(); drop(ag); } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { self.turn_watch.subscribe() } /// Execute an Action from a MindState method. async fn run_commands(&self, cmds: Vec) { for cmd in cmds { match cmd { MindCommand::None => {} MindCommand::Compact => { let threshold = compaction_threshold(&self.config.app) as usize; let mut ag = self.agent.lock().await; let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default(); if crate::agent::context::sections_used(§ions) > threshold { ag.compact(); ag.notify("compacted"); } } MindCommand::Score => { let mut s = self.shared.lock().unwrap(); if !s.scoring_in_flight { s.scoring_in_flight = true; drop(s); self.start_memory_scoring(); } } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); let ag = self.agent.lock().await; let mut tools = ag.active_tools.lock().unwrap(); for entry in tools.drain(..) { entry.handle.abort(); } drop(tools); drop(ag); if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); } self.shared.lock().unwrap().turn_active = false; let _ = self.turn_watch.send(false); } MindCommand::NewSession => { { let mut s = self.shared.lock().unwrap(); s.dmn = dmn::State::Resting { since: Instant::now() }; s.dmn_turns = 0; } let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), ).ok(); let mut ag = self.agent.lock().await; let shared_ctx = ag.shared_context.clone(); let shared_tools = ag.active_tools.clone(); *ag = Agent::new( ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model), self.config.system_prompt.clone(), self.config.context_parts.clone(), self.config.app.clone(), self.config.prompt_file.clone(), new_log, shared_ctx, shared_tools, ); } } } } pub fn start_memory_scoring(&self) { let agent = self.agent.clone(); let bg_tx = self.bg_tx.clone(); let cfg = crate::config::get(); let max_age = cfg.scoring_interval_secs; let response_window = cfg.scoring_response_window; tokio::spawn(async move { let (context, client) = { let mut ag = agent.lock().await; if ag.memory_scoring_in_flight { return; } ag.memory_scoring_in_flight = true; (ag.context.clone(), ag.client_clone()) }; let result = learn::score_memories_incremental( &context, max_age as i64, response_window, &client, &agent, ).await; { let mut ag = agent.lock().await; ag.memory_scoring_in_flight = false; if let Ok(ref scores) = result { ag.memory_scores = scores.clone(); } } let _ = bg_tx.send(BgEvent::ScoringDone); }); } /// Push user/DMN message into the agent's context and spawn a turn. /// The text moves from pending_input to ContextState atomically — /// by the time this returns, the message is in context and the turn /// is running. /// Collect results from finished subconscious agents and inject /// their output into the conscious agent's context. async fn collect_subconscious_results(&self) { // Collect finished handles without holding the lock across await let finished: Vec<(usize, tokio::task::JoinHandle>)> = { let mut subs = self.subconscious.lock().await; subs.iter_mut().enumerate().filter_map(|(i, sub)| { if sub.handle.as_ref().is_some_and(|h| h.is_finished()) { sub.last_run = Some(Instant::now()); Some((i, sub.handle.take().unwrap())) } else { None } }).collect() }; for (idx, handle) in finished { match handle.await { Ok(Ok(_)) => { // The outer task already put the AutoAgent back — // read outputs from it let mut subs = self.subconscious.lock().await; let name = subs[idx].auto.name.clone(); let outputs = std::mem::take(&mut subs[idx].auto.outputs); // Walked keys — update all subconscious agents if let Some(walked_str) = outputs.get("walked") { let walked: Vec = walked_str.lines() .map(|l| l.trim().to_string()) .filter(|l| !l.is_empty()) .collect(); for sub in subs.iter_mut() { sub.auto.walked = walked.clone(); } } drop(subs); // Surfaced memories → inject into conscious agent if let Some(surface_str) = outputs.get("surface") { let mut ag = self.agent.lock().await; for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { if let Some(rendered) = crate::cli::node::render_node( &crate::store::Store::load().unwrap_or_default(), key, ) { let mut msg = crate::agent::api::types::Message::user(format!( "\n--- {} (surfaced) ---\n{}\n", key, rendered, )); msg.stamp(); ag.push_entry(crate::agent::context::ConversationEntry::Memory { key: key.to_string(), message: msg, }); } } } // Reflection → inject into conscious agent if let Some(reflection) = outputs.get("reflection") { if !reflection.trim().is_empty() { let mut ag = self.agent.lock().await; ag.push_message(crate::agent::api::types::Message::user(format!( "\n--- subconscious reflection ---\n{}\n", reflection.trim(), ))); } } dbglog!("[mind] {} completed", name); } Ok(Err(e)) => dbglog!("[mind] subconscious agent failed: {}", e), Err(e) => dbglog!("[mind] subconscious agent panicked: {}", e), } } } /// Trigger subconscious agents that are due to run. async fn trigger_subconscious(&self) { if self.config.no_agents { return; } // Get conversation size + memory keys from conscious agent let (conversation_bytes, memory_keys) = { let ag = self.agent.lock().await; let bytes = ag.context.entries.iter() .filter(|e| !e.is_log() && !e.is_memory()) .map(|e| e.message().content_text().len() as u64) .sum::(); let keys: Vec = ag.context.entries.iter().filter_map(|e| { if let crate::agent::context::ConversationEntry::Memory { key, .. } = e { Some(key.clone()) } else { None } }).collect(); (bytes, keys) }; // Find which agents to trigger, take their AutoAgents out let mut to_run: Vec<(usize, AutoAgent)> = Vec::new(); { let mut subs = self.subconscious.lock().await; for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() { if i >= subs.len() { continue; } if !subs[i].should_trigger(conversation_bytes, interval) { continue; } subs[i].last_trigger_bytes = conversation_bytes; // Take the AutoAgent out — task owns it, returns it when done let auto = std::mem::replace(&mut subs[i].auto, AutoAgent::new(String::new(), vec![], vec![], 0.0, 0)); to_run.push((i, auto)); } } if to_run.is_empty() { return; } // Fork from conscious agent and spawn tasks let conscious = self.agent.lock().await; let mut spawns = Vec::new(); for (idx, mut auto) in to_run { dbglog!("[mind] triggering {}", auto.name); let forked = conscious.fork(auto.tools.clone()); let keys = memory_keys.clone(); let handle: tokio::task::JoinHandle<(AutoAgent, Result)> = tokio::spawn(async move { let result = auto.run_forked(&forked, &keys).await; (auto, result) }); spawns.push((idx, handle)); } drop(conscious); // Store handles (type-erased — we'll extract AutoAgent on completion) // We need to store the JoinHandle that returns (AutoAgent, Result) // but SubconsciousAgent.handle expects JoinHandle>. // Wrap: spawn an outer task that extracts the result and puts back the AutoAgent. let subconscious = self.subconscious.clone(); for (idx, handle) in spawns { let subs = subconscious.clone(); let outer = tokio::spawn(async move { let (auto, result) = handle.await.unwrap_or_else( |e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0), Err(format!("task panicked: {}", e)))); // Put the AutoAgent back let mut locked = subs.lock().await; if idx < locked.len() { locked[idx].auto = auto; } result }); let mut subs = self.subconscious.lock().await; if idx < subs.len() { subs[idx].handle = Some(outer); } } } async fn start_turn(&self, text: &str, target: StreamTarget) { { let mut ag = self.agent.lock().await; match target { StreamTarget::Conversation => { ag.push_message(crate::agent::api::types::Message::user(text)); } StreamTarget::Autonomous => { let mut msg = crate::agent::api::types::Message::user(text); msg.stamp(); ag.push_entry(crate::agent::context::ConversationEntry::Dmn(msg)); } } // Compact if over budget before sending let threshold = compaction_threshold(&self.config.app) as usize; ag.publish_context_state(); let used = { let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default(); crate::agent::context::sections_used(§ions) }; if used > threshold { ag.compact(); ag.notify("compacted"); } } self.shared.lock().unwrap().turn_active = true; let _ = self.turn_watch.send(true); let agent = self.agent.clone(); let result_tx = self.turn_tx.clone(); self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move { let result = Agent::turn(agent).await; let _ = result_tx.send((result, target)).await; })); } pub async fn shutdown(&self) { if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); } } /// Mind event loop — locks MindState, calls state methods, executes actions. pub async fn run( &self, mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { let mut bg_rx = self.bg_rx.lock().unwrap().take() .expect("Mind::run() called twice"); loop { let timeout = self.shared.lock().unwrap().dmn.interval(); let turn_active = self.shared.lock().unwrap().turn_active; let mut cmds = Vec::new(); tokio::select! { biased; cmd = input_rx.recv() => { match cmd { Some(cmd) => cmds.push(cmd), None => break, // UI shut down } } Some(bg) = bg_rx.recv() => { match bg { BgEvent::ScoringDone => { self.shared.lock().unwrap().scoring_in_flight = false; } } } Some((result, target)) = turn_rx.recv() => { self.shared.lock().unwrap().turn_handle = None; let model_switch = self.shared.lock().unwrap().complete_turn(&result, target); let _ = self.turn_watch.send(false); if let Some(name) = model_switch { crate::user::chat::cmd_switch_model(&self.agent, &name).await; } // Post-turn maintenance { let mut ag = self.agent.lock().await; ag.age_out_images(); ag.publish_context_state(); } cmds.push(MindCommand::Compact); if !self.config.no_agents { cmds.push(MindCommand::Score); } // Trigger subconscious agents after conscious turn completes self.collect_subconscious_results().await; self.trigger_subconscious().await; } _ = tokio::time::sleep(timeout), if !turn_active => { let tick = self.shared.lock().unwrap().dmn_tick(); if let Some((prompt, target)) = tick { self.start_turn(&prompt, target).await; } } } // Check for pending user input → push to agent context and start turn let pending = self.shared.lock().unwrap().take_pending_input(); if let Some(text) = pending { self.start_turn(&text, StreamTarget::Conversation).await; } self.run_commands(cmds).await; } } }