// mind/ — Cognitive layer // // Mind state machine, DMN, identity, observation socket. // Everything about how the mind operates, separate from the // user interface (TUI, CLI) and the agent execution (tools, API). pub mod subconscious; pub mod unconscious; pub mod identity; pub mod log; /// A background operation wired off Mind. Each flow (memory scoring, /// finetune scoring, compare) is a struct holding its dependencies and /// a TaskHandle; `trigger()` picks the flow's own "start a fresh run" /// semantics (abort-restart vs no-op-if-running). pub trait MindTriggered { fn trigger(&self); } /// Owns a JoinHandle for a background task with two trigger semantics. /// Uses a sync Mutex for interior mutability so callers can `trigger()` /// off `&self` (Mind is shared via Arc). #[derive(Default)] pub struct TaskHandle(std::sync::Mutex>>); impl TaskHandle { pub fn new() -> Self { Self::default() } /// Abort any running task and start a fresh one. pub fn trigger(&self, fut: F) where F: std::future::Future + Send + 'static { let mut h = self.0.lock().unwrap(); if let Some(old) = h.take() { old.abort(); } *h = Some(tokio::spawn(fut)); } /// No-op if a task is still running; otherwise start a fresh one. pub fn trigger_if_idle(&self, fut: F) where F: std::future::Future + Send + 'static { let mut h = self.0.lock().unwrap(); if let Some(old) = &*h { if !old.is_finished() { return; } } *h = Some(tokio::spawn(fut)); } } // consciousness.rs — Mind state machine and event loop // // The core runtime for the consciousness binary. Mind manages turns, // DMN state, compaction, scoring, and slash commands. The event loop // bridges Mind (cognitive state) with App (TUI rendering). // // The event loop uses biased select! so priorities are deterministic: // keyboard events > turn results > render ticks > DMN timer > UI messages. use anyhow::Result; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc; use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; use crate::config::{AppConfig, SessionConfig}; use crate::subconscious::{compare, learn}; use crate::hippocampus::access_local; pub use subconscious::{SubconsciousSnapshot, Subconscious}; pub use unconscious::{UnconsciousSnapshot, Unconscious}; use crate::agent::context::{AstNode, NodeBody, Section, Ast, ContextState}; fn match_scores( nodes: &[AstNode], scores: &std::collections::BTreeMap, ) -> Vec<(usize, f64)> { nodes.iter().enumerate() .filter_map(|(i, node)| { if let AstNode::Leaf(leaf) = node { if let NodeBody::Memory { key, .. } = leaf.body() { return scores.get(key.as_str()).map(|&s| (i, s)); } } None }).collect() } pub(crate) fn find_memory_by_key(ctx: &ContextState, key: &str) -> Option<(Section, usize)> { [(Section::Identity, ctx.identity()), (Section::Conversation, ctx.conversation())] .into_iter() .find_map(|(section, nodes)| { nodes.iter().enumerate().find_map(|(i, node)| { if let AstNode::Leaf(leaf) = node { if let NodeBody::Memory { key: k, .. } = leaf.body() { if k == key { return Some((section, i)); } } } None }) }) } fn load_memory_scores(ctx: &mut ContextState, path: &std::path::Path) { let data = match std::fs::read_to_string(path) { Ok(d) => d, Err(_) => return, }; let scores: std::collections::BTreeMap = match serde_json::from_str(&data) { Ok(s) => s, Err(_) => return, }; let identity_scores = match_scores(ctx.identity(), &scores); let conv_scores = match_scores(ctx.conversation(), &scores); let applied = identity_scores.len() + conv_scores.len(); for (i, s) in identity_scores { ctx.set_score(Section::Identity, i, Some(s)); } for (i, s) in conv_scores { ctx.set_score(Section::Conversation, i, Some(s)); } if applied > 0 { dbglog!("[scoring] loaded {} scores from {}", applied, path.display()); } } /// Collect scored memory keys from identity and conversation entries. pub(crate) fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap { ctx.identity().iter() .chain(ctx.conversation().iter()) .filter_map(|node| { if let AstNode::Leaf(leaf) = node { if let NodeBody::Memory { key, score: Some(s), .. } = leaf.body() { return Some((key.clone(), *s)); } } None }) .collect() } /// Save memory scores to disk. pub(crate) fn save_memory_scores(scores: &std::collections::BTreeMap, path: &std::path::Path) { match serde_json::to_string_pretty(scores) { Ok(json) => match std::fs::write(path, &json) { Ok(()) => dbglog!("[scoring] saved {} scores to {} ({} bytes)", scores.len(), path.display(), json.len()), Err(e) => dbglog!("[scoring] save FAILED ({}): {}", path.display(), e), }, Err(e) => dbglog!("[scoring] serialize FAILED: {}", e), } } /// Which pane streaming text should go to. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamTarget { /// User-initiated turn — text goes to conversation pane. Conversation, /// DMN-initiated turn — text goes to autonomous pane. Autonomous, } /// Compaction threshold — context is rebuilt when prompt tokens exceed this. fn compaction_threshold(app: &AppConfig) -> u32 { (crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100 } /// Shared state between Mind and UI. pub struct MindState { /// Pending user input — UI pushes, Mind consumes after turn completes. pub input: Vec, /// True while a turn is in progress. pub turn_active: bool, /// DMN state pub dmn: subconscious::State, pub dmn_turns: u32, pub max_dmn_turns: u32, /// Whether memory scoring is running. pub scoring_in_flight: bool, /// Whether compaction is running. pub compaction_in_flight: bool, /// Per-turn tracking pub last_user_input: Instant, pub consecutive_errors: u32, pub last_turn_had_tools: bool, /// Handle to the currently running turn task. pub turn_handle: Option>, /// Unconscious agent idle state — true when 60s timer has expired. pub unc_idle: bool, /// When the unconscious idle timer will fire (for UI display). pub unc_idle_deadline: Instant, /// Fine-tuning candidates identified by scoring. pub finetune_candidates: Vec, /// Last scoring run stats for UI display. pub finetune_last_run: Option, /// F7 compare candidates — one per response, showing what the test /// model would say given the same context. pub compare_candidates: Vec, /// F7 compare error from the last run, if any. pub compare_error: Option, } impl Clone for MindState { fn clone(&self) -> Self { Self { input: self.input.clone(), turn_active: self.turn_active, dmn: self.dmn.clone(), dmn_turns: self.dmn_turns, max_dmn_turns: self.max_dmn_turns, scoring_in_flight: self.scoring_in_flight, compaction_in_flight: self.compaction_in_flight, last_user_input: self.last_user_input, consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, turn_handle: None, // Not cloned — only Mind's loop uses this unc_idle: self.unc_idle, unc_idle_deadline: self.unc_idle_deadline, finetune_candidates: self.finetune_candidates.clone(), finetune_last_run: self.finetune_last_run.clone(), compare_candidates: self.compare_candidates.clone(), compare_error: self.compare_error.clone(), } } } /// What should happen after a state transition. pub enum MindCommand { /// Run compaction check Compact, /// Run incremental memory scoring (auto, after turns) Score, /// Run full N×M memory scoring matrix (/score command) ScoreFull, /// Score for finetune candidates ScoreFinetune, /// Run F7 compare: generate alternates with the configured test model /// for every assistant response in the context. Compare, /// Update the finetune divergence threshold and persist to config. SetLearnThreshold(f64), /// Toggle alternate-response generation during scoring; persist to config. SetLearnGenerateAlternates(bool), /// Abort current turn, kill processes Interrupt, /// Reset session NewSession, /// Nothing to do None, } impl MindState { pub fn new(max_dmn_turns: u32) -> Self { Self { input: Vec::new(), turn_active: false, dmn: if subconscious::is_off() { subconscious::State::Off } else { subconscious::State::Resting { since: Instant::now() } }, dmn_turns: 0, max_dmn_turns, scoring_in_flight: false, compaction_in_flight: false, last_user_input: Instant::now(), consecutive_errors: 0, last_turn_had_tools: false, turn_handle: None, unc_idle: false, unc_idle_deadline: Instant::now() + std::time::Duration::from_secs(60), finetune_candidates: Vec::new(), finetune_last_run: None, compare_candidates: Vec::new(), compare_error: None, } } /// Is there pending user input waiting? fn has_pending_input(&self) -> bool { !self.turn_active && !self.input.is_empty() } /// Consume pending user input if no turn is active. /// Returns the text to send; caller is responsible for pushing it /// into the Agent's context and starting the turn. fn take_pending_input(&mut self) -> Option { if self.turn_active || self.input.is_empty() { return None; } let text = self.input.join("\n"); self.input.clear(); self.dmn_turns = 0; self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = subconscious::State::Engaged; Some(text) } /// Process turn completion, return model switch name if requested. fn complete_turn(&mut self, result: &Result, target: StreamTarget) -> Option { self.turn_active = false; match result { Ok(turn_result) => { if turn_result.tool_errors > 0 { self.consecutive_errors += turn_result.tool_errors; } else { self.consecutive_errors = 0; } self.last_turn_had_tools = turn_result.had_tool_calls; self.dmn = subconscious::transition( &self.dmn, turn_result.yield_requested, turn_result.had_tool_calls, target == StreamTarget::Conversation, ); if turn_result.dmn_pause { self.dmn = subconscious::State::Paused; self.dmn_turns = 0; } turn_result.model_switch.clone() } Err(_) => { self.consecutive_errors += 1; self.dmn = subconscious::State::Resting { since: Instant::now() }; None } } } /// DMN tick — returns a prompt and target if we should run a turn. fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> { if matches!(self.dmn, subconscious::State::Paused | subconscious::State::Off) { return None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { self.dmn = subconscious::State::Resting { since: Instant::now() }; self.dmn_turns = 0; return None; } let dmn_ctx = subconscious::DmnContext { user_idle: self.last_user_input.elapsed(), consecutive_errors: self.consecutive_errors, last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); Some((prompt, StreamTarget::Autonomous)) } fn interrupt(&mut self) { self.input.clear(); self.dmn = subconscious::State::Resting { since: Instant::now() }; } } // --- Mind: cognitive state machine --- pub type SharedMindState = std::sync::Mutex; pub struct Mind { pub agent: Arc, pub shared: Arc, pub config: SessionConfig, pub subconscious: Arc>, pub unconscious: Arc>, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, /// Signals conscious activity to the unconscious loop. /// true = active, false = idle opportunity. conscious_active: tokio::sync::watch::Sender, memory_scoring: learn::MemoryScoring, finetune_scoring: learn::FinetuneScoring, compare_scoring: compare::CompareScoring, _supervisor: crate::thalamus::supervisor::Supervisor, } impl Mind { pub async fn new( config: SessionConfig, turn_tx: mpsc::Sender<(Result, StreamTarget)>, ) -> Self { let client = ApiClient::new(&config.api_base, &config.api_key, &config.model); let conversation_log = log::ConversationLog::new( config.session_dir.join("conversation.jsonl"), ).ok(); let agent = Agent::new( client, config.context_parts.clone(), config.app.clone(), conversation_log, crate::agent::tools::ActiveTools::new(), crate::agent::tools::tools(), ).await; // Migrate legacy "file exists = enabled" sentinel for the // generate-alternates flag into the config. One-shot; after this // the sentinel is gone and the config is the source of truth. let legacy_sentinel = dirs::home_dir().unwrap_or_default() .join(".consciousness/cache/finetune-alternates"); if legacy_sentinel.exists() { if !crate::config::app().learn.generate_alternates { let _ = crate::config_writer::set_learn_generate_alternates(true); } let _ = std::fs::remove_file(&legacy_sentinel); } let shared = Arc::new(std::sync::Mutex::new(MindState::new( config.app.dmn.max_turns, ))); let (turn_watch, _) = tokio::sync::watch::channel(false); let (conscious_active, _) = tokio::sync::watch::channel(false); let mut sup = crate::thalamus::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); let subconscious = Arc::new(crate::Mutex::new(Subconscious::new())); subconscious.lock().await.init_output_tool(subconscious.clone()); let unconscious = Arc::new(crate::Mutex::new(Unconscious::new())); // Spawn the unconscious loop on its own task if !config.no_agents { let unc = unconscious.clone(); let shared_for_unc = shared.clone(); let mut unc_rx = conscious_active.subscribe(); tokio::spawn(async move { const IDLE_DELAY: std::time::Duration = std::time::Duration::from_secs(60); loop { // Wait for conscious side to go inactive if *unc_rx.borrow() { if unc_rx.changed().await.is_err() { break; } continue; } // Conscious is inactive — wait 60s before starting let deadline = tokio::time::Instant::now() + IDLE_DELAY; { let mut s = shared_for_unc.lock().unwrap(); s.unc_idle = false; s.unc_idle_deadline = Instant::now() + IDLE_DELAY; } let went_active = tokio::select! { _ = tokio::time::sleep_until(deadline) => false, r = unc_rx.changed() => r.is_ok(), }; if went_active { continue; } // Idle period reached — run agents until conscious goes active { let mut s = shared_for_unc.lock().unwrap(); s.unc_idle = true; } // Get wake notify for event-driven loop let wake = unc.lock().await.wake.clone(); let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600)); health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { // Do work: reap finished agents, spawn new ones let (to_spawn, needs_health) = { let mut guard = unc.lock().await; guard.reap_finished(); (guard.select_to_spawn(), guard.needs_health_refresh()) }; // Spawn agents outside lock for (idx, name, auto) in to_spawn { match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await { Ok(result) => unc.lock().await.complete_spawn(idx, result), Err(auto) => unc.lock().await.abort_spawn(idx, auto), } } // Health check outside lock (slow I/O) if needs_health { if let Ok(store_arc) = access_local() { let health = crate::subconscious::daemon::compute_graph_health(&store_arc); unc.lock().await.set_health(health); } } // Wait for: conscious active, agent finished, or health timer tokio::select! { _ = unc_rx.changed() => { if *unc_rx.borrow() { break; } } _ = wake.notified() => {} _ = health_interval.tick() => {} } } } }); } let scores_path = config.session_dir.join("memory-scores.json"); let memory_scoring = learn::MemoryScoring::new( agent.clone(), shared.clone(), scores_path); let finetune_scoring = learn::FinetuneScoring::new(agent.clone(), shared.clone()); let compare_scoring = compare::CompareScoring::new(agent.clone(), shared.clone()); Self { agent, shared, config, subconscious, unconscious, turn_tx, turn_watch, conscious_active, memory_scoring, finetune_scoring, compare_scoring, _supervisor: sup } } /// Initialize — restore log, start daemons and background agents. pub async fn subconscious_snapshots(&self) -> Vec { // Lock ordering: subconscious → store (store is bottom-most). let sub = self.subconscious.lock().await; let store_arc = crate::hippocampus::access_local().ok(); let store_guard = match &store_arc { Some(s) => Some(&**s), None => None, }; sub.snapshots(store_guard.as_deref()) } pub async fn subconscious_walked(&self) -> Vec { self.subconscious.lock().await.walked() } pub async fn unconscious_snapshots(&self) -> Vec { let unc = self.unconscious.lock().await; let store_arc = crate::hippocampus::access_local().ok(); let store_guard = match &store_arc { Some(s) => Some(&**s), None => None, }; unc.snapshots(store_guard.as_deref()) } pub async fn init(&self) { // Restore conversation self.agent.restore_from_log().await; // Restore persisted memory scores let scores_path = self.config.session_dir.join("memory-scores.json"); load_memory_scores(&mut *self.agent.context.lock().await, &scores_path); self.agent.state.lock().await.changed.notify_one(); // Load persistent subconscious state let state_path = self.config.session_dir.join("subconscious-state.json"); self.subconscious.lock().await.set_state_path(state_path); // Kick off an incremental scoring pass on startup so memories due // for re-scoring get evaluated without requiring a user message. self.memory_scoring.trigger(); } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { self.turn_watch.subscribe() } /// Execute an Action from a MindState method. async fn run_commands(&self, cmds: Vec) { for cmd in cmds { match cmd { MindCommand::None => {} MindCommand::Compact => { let threshold = compaction_threshold(&self.config.app) as usize; if self.agent.context.lock().await.tokens() > threshold { self.agent.compact().await; self.agent.state.lock().await.notify("compacted"); } } MindCommand::Score => { self.memory_scoring.trigger(); } MindCommand::ScoreFull => { self.memory_scoring.trigger_full(); } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); self.agent.state.lock().await.active_tools.abort_all(); if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); } self.shared.lock().unwrap().turn_active = false; let _ = self.turn_watch.send(false); } MindCommand::NewSession => { { let mut s = self.shared.lock().unwrap(); s.dmn = subconscious::State::Resting { since: Instant::now() }; s.dmn_turns = 0; } let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), ).ok(); { let mut ctx = self.agent.context.lock().await; ctx.clear(Section::Conversation); ctx.conversation_log = new_log; } { let mut st = self.agent.state.lock().await; st.generation += 1; st.last_prompt_tokens = 0; } self.agent.compact().await; } MindCommand::ScoreFinetune => { self.finetune_scoring.trigger(); } MindCommand::Compare => { self.compare_scoring.trigger(); } MindCommand::SetLearnThreshold(value) => { if let Err(e) = crate::config_writer::set_learn_threshold(value) { dbglog!("[learn] failed to persist threshold {}: {:#}", value, e); } } MindCommand::SetLearnGenerateAlternates(value) => { if let Err(e) = crate::config_writer::set_learn_generate_alternates(value) { dbglog!("[learn] failed to persist generate_alternates {}: {:#}", value, e); } } } } } async fn start_turn(&self, text: &str, target: StreamTarget) { { match target { StreamTarget::Conversation => { self.agent.push_node(AstNode::user_msg(text)).await; } StreamTarget::Autonomous => { self.agent.push_node(AstNode::dmn(text)).await; } } // Compact if over budget before sending let threshold = compaction_threshold(&self.config.app) as usize; if self.agent.context.lock().await.tokens() > threshold { self.agent.compact().await; self.agent.state.lock().await.notify("compacted"); } } self.shared.lock().unwrap().turn_active = true; let _ = self.turn_watch.send(true); let _ = self.conscious_active.send(true); let agent = self.agent.clone(); let result_tx = self.turn_tx.clone(); self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move { let result = Agent::turn(agent).await; let _ = result_tx.send((result, target)).await; })); } pub async fn shutdown(&self) { if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); } } /// Mind event loop — locks MindState, calls state methods, executes actions. pub async fn run( &self, mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { // Spawn lock stats logger tokio::spawn(async { let path = dirs::home_dir().unwrap_or_default() .join(".consciousness/lock-stats.json"); let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); loop { interval.tick().await; let stats = crate::locks::lock_stats(); if stats.is_empty() { continue; } let json: Vec = stats.iter() .map(|(loc, s)| serde_json::json!({ "location": loc, "count": s.count, "total_ms": s.total_ns as f64 / 1_000_000.0, "avg_ms": s.avg_ns as f64 / 1_000_000.0, "max_ms": s.max_ns as f64 / 1_000_000.0, })) .collect(); let _ = std::fs::write(&path, serde_json::to_string_pretty(&json).unwrap_or_default()); } }); let mut sub_handle: Option> = None; // Start finetune scoring at startup (scores existing conversation) if !self.config.no_agents { self.finetune_scoring.trigger(); } loop { let (timeout, has_input) = { let me = self.shared.lock().unwrap(); (me.dmn.interval(), me.has_pending_input()) }; let mut cmds = Vec::new(); #[allow(unused_assignments)] let mut _dmn_expired = false; tokio::select! { biased; cmd = input_rx.recv() => { match cmd { Some(cmd) => cmds.push(cmd), None => break, // UI shut down } } Some((result, target)) = turn_rx.recv() => { let _ = self.conscious_active.send(false); let model_switch = { let mut s = self.shared.lock().unwrap(); s.turn_handle = None; s.complete_turn(&result, target) }; let _ = self.turn_watch.send(false); if let Some(name) = model_switch { crate::user::chat::cmd_switch_model(&self.agent, &name).await; } cmds.push(MindCommand::Compact); if !self.config.no_agents { cmds.push(MindCommand::Score); cmds.push(MindCommand::ScoreFinetune); } } _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, } if !self.config.no_agents { if sub_handle.as_ref().map_or(true, |h| h.is_finished()) { let sub = self.subconscious.clone(); let agent = self.agent.clone(); sub_handle = Some(tokio::spawn(async move { let mut s = sub.lock().await; s.collect_results(&agent).await; s.trigger(&agent).await; })); } } // Check for pending user input → push to agent context and start turn let pending = self.shared.lock().unwrap().take_pending_input(); if let Some(text) = pending { self.start_turn(&text, StreamTarget::Conversation).await; } /* else if dmn_expired { let tick = self.shared.lock().unwrap().dmn_tick(); if let Some((prompt, target)) = tick { self.start_turn(&prompt, target).await; } } */ self.run_commands(cmds).await; } } }