consciousness/src/mind/mod.rs
ProofOfConcept 7aba17e5f0 Compute graph health in consciousness, rename F4 to hippocampus
Graph health stats (alpha, gini, cc, episodic ratio, consolidation
plan) now computed directly by the unconscious module on startup and
every 10 minutes, instead of fetching from the poc-memory daemon.

F4 screen renamed to hippocampus, stripped down to just the health
gauges — daemon task list removed (agents now shown on F3).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 00:45:26 -04:00

569 lines
21 KiB
Rust

// mind/ — Cognitive layer
//
// Mind state machine, DMN, identity, observation socket.
// Everything about how the mind operates, separate from the
// user interface (TUI, CLI) and the agent execution (tools, API).
pub mod subconscious;
pub mod unconscious;
pub mod identity;
pub mod log;
// consciousness.rs — Mind state machine and event loop
//
// The core runtime for the consciousness binary. Mind manages turns,
// DMN state, compaction, scoring, and slash commands. The event loop
// bridges Mind (cognitive state) with App (TUI rendering).
//
// The event loop uses biased select! so priorities are deterministic:
// keyboard events > turn results > render ticks > DMN timer > UI messages.
use anyhow::Result;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::config::{AppConfig, SessionConfig};
use crate::subconscious::learn;
pub use subconscious::{SubconsciousSnapshot, Subconscious};
pub use unconscious::{UnconsciousSnapshot, Unconscious};
use crate::agent::context::{AstNode, NodeBody, Section, Ast, ContextState};
fn load_memory_scores(ctx: &mut ContextState, path: &std::path::Path) {
let data = match std::fs::read_to_string(path) {
Ok(d) => d,
Err(_) => return,
};
let scores: std::collections::BTreeMap<String, f64> = match serde_json::from_str(&data) {
Ok(s) => s,
Err(_) => return,
};
let mut applied = 0;
for i in 0..ctx.conversation().len() {
if let AstNode::Leaf(leaf) = &ctx.conversation()[i] {
if let NodeBody::Memory { key, .. } = leaf.body() {
if let Some(&s) = scores.get(key.as_str()) {
ctx.set_score(Section::Conversation, i, Some(s));
applied += 1;
}
}
}
}
if applied > 0 {
dbglog!("[scoring] loaded {} scores from {}", applied, path.display());
}
}
/// Collect scored memory keys from conversation entries.
fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap<String, f64> {
ctx.conversation().iter()
.filter_map(|node| {
if let AstNode::Leaf(leaf) = node {
if let NodeBody::Memory { key, score: Some(s), .. } = leaf.body() {
return Some((key.clone(), *s));
}
}
None
})
.collect()
}
/// Save memory scores to disk.
fn save_memory_scores(scores: &std::collections::BTreeMap<String, f64>, path: &std::path::Path) {
if let Ok(json) = serde_json::to_string_pretty(scores) {
let _ = std::fs::write(path, json);
dbglog!("[scoring] saved {} scores to {}", scores.len(), path.display());
}
}
/// Which pane streaming text should go to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
/// User-initiated turn — text goes to conversation pane.
Conversation,
/// DMN-initiated turn — text goes to autonomous pane.
Autonomous,
}
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
fn compaction_threshold(app: &AppConfig) -> u32 {
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
}
/// Shared state between Mind and UI.
pub struct MindState {
/// Pending user input — UI pushes, Mind consumes after turn completes.
pub input: Vec<String>,
/// True while a turn is in progress.
pub turn_active: bool,
/// DMN state
pub dmn: subconscious::State,
pub dmn_turns: u32,
pub max_dmn_turns: u32,
/// Whether memory scoring is running.
pub scoring_in_flight: bool,
/// Whether compaction is running.
pub compaction_in_flight: bool,
/// Per-turn tracking
pub last_user_input: Instant,
pub consecutive_errors: u32,
pub last_turn_had_tools: bool,
/// Handle to the currently running turn task.
pub turn_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Clone for MindState {
fn clone(&self) -> Self {
Self {
input: self.input.clone(),
turn_active: self.turn_active,
dmn: self.dmn.clone(),
dmn_turns: self.dmn_turns,
max_dmn_turns: self.max_dmn_turns,
scoring_in_flight: self.scoring_in_flight,
compaction_in_flight: self.compaction_in_flight,
last_user_input: self.last_user_input,
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
turn_handle: None, // Not cloned — only Mind's loop uses this
}
}
}
/// What should happen after a state transition.
pub enum MindCommand {
/// Run compaction check
Compact,
/// Run memory scoring
Score,
/// Abort current turn, kill processes
Interrupt,
/// Reset session
NewSession,
/// Nothing to do
None,
}
impl MindState {
pub fn new(max_dmn_turns: u32) -> Self {
Self {
input: Vec::new(),
turn_active: false,
dmn: if subconscious::is_off() { subconscious::State::Off }
else { subconscious::State::Resting { since: Instant::now() } },
dmn_turns: 0,
max_dmn_turns,
scoring_in_flight: false,
compaction_in_flight: false,
last_user_input: Instant::now(),
consecutive_errors: 0,
last_turn_had_tools: false,
turn_handle: None,
}
}
/// Is there pending user input waiting?
fn has_pending_input(&self) -> bool {
!self.turn_active && !self.input.is_empty()
}
/// Consume pending user input if no turn is active.
/// Returns the text to send; caller is responsible for pushing it
/// into the Agent's context and starting the turn.
fn take_pending_input(&mut self) -> Option<String> {
if self.turn_active || self.input.is_empty() {
return None;
}
let text = self.input.join("\n");
self.input.clear();
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = subconscious::State::Engaged;
Some(text)
}
/// Process turn completion, return model switch name if requested.
fn complete_turn(&mut self, result: &Result<TurnResult>, target: StreamTarget) -> Option<String> {
self.turn_active = false;
match result {
Ok(turn_result) => {
if turn_result.tool_errors > 0 {
self.consecutive_errors += turn_result.tool_errors;
} else {
self.consecutive_errors = 0;
}
self.last_turn_had_tools = turn_result.had_tool_calls;
self.dmn = subconscious::transition(
&self.dmn,
turn_result.yield_requested,
turn_result.had_tool_calls,
target == StreamTarget::Conversation,
);
if turn_result.dmn_pause {
self.dmn = subconscious::State::Paused;
self.dmn_turns = 0;
}
turn_result.model_switch.clone()
}
Err(_) => {
self.consecutive_errors += 1;
self.dmn = subconscious::State::Resting { since: Instant::now() };
None
}
}
}
/// DMN tick — returns a prompt and target if we should run a turn.
fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> {
if matches!(self.dmn, subconscious::State::Paused | subconscious::State::Off) {
return None;
}
self.dmn_turns += 1;
if self.dmn_turns > self.max_dmn_turns {
self.dmn = subconscious::State::Resting { since: Instant::now() };
self.dmn_turns = 0;
return None;
}
let dmn_ctx = subconscious::DmnContext {
user_idle: self.last_user_input.elapsed(),
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
};
let prompt = self.dmn.prompt(&dmn_ctx);
Some((prompt, StreamTarget::Autonomous))
}
fn interrupt(&mut self) {
self.input.clear();
self.dmn = subconscious::State::Resting { since: Instant::now() };
}
}
/// Background task completion events.
enum BgEvent {
ScoringDone,
}
// --- Mind: cognitive state machine ---
pub type SharedMindState = std::sync::Mutex<MindState>;
pub struct Mind {
pub agent: Arc<Agent>,
pub shared: Arc<SharedMindState>,
pub config: SessionConfig,
subconscious: Arc<tokio::sync::Mutex<Subconscious>>,
pub unconscious: Arc<tokio::sync::Mutex<Unconscious>>,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
turn_watch: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
_supervisor: crate::thalamus::supervisor::Supervisor,
}
impl Mind {
pub async fn new(
config: SessionConfig,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self {
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
let conversation_log = log::ConversationLog::new(
config.session_dir.join("conversation.jsonl"),
).ok();
let agent = Agent::new(
client,
config.system_prompt.clone(),
config.context_parts.clone(),
config.app.clone(),
config.prompt_file.clone(),
conversation_log,
crate::agent::tools::ActiveTools::new(),
).await;
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
let (turn_watch, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
sup.ensure_running();
let subconscious = Arc::new(tokio::sync::Mutex::new(Subconscious::new()));
subconscious.lock().await.init_output_tool(subconscious.clone());
Self { agent, shared, config,
subconscious,
unconscious: Arc::new(tokio::sync::Mutex::new(Unconscious::new())),
turn_tx, turn_watch, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
}
/// Initialize — restore log, start daemons and background agents.
pub async fn subconscious_snapshots(&self) -> Vec<SubconsciousSnapshot> {
// Lock ordering: subconscious → store (store is bottom-most).
let sub = self.subconscious.lock().await;
let store = crate::store::Store::cached().await.ok();
let store_guard = match &store {
Some(s) => Some(s.lock().await),
None => None,
};
sub.snapshots(store_guard.as_deref())
}
pub async fn subconscious_walked(&self) -> Vec<String> {
self.subconscious.lock().await.walked()
}
pub async fn unconscious_snapshots(&self) -> Vec<UnconsciousSnapshot> {
self.unconscious.lock().await.snapshots()
}
pub async fn init(&self) {
// Restore conversation
self.agent.restore_from_log().await;
// Restore persisted memory scores
let scores_path = self.config.session_dir.join("memory-scores.json");
load_memory_scores(&mut *self.agent.context.lock().await, &scores_path);
self.agent.state.lock().await.changed.notify_one();
// Load persistent subconscious state
let state_path = self.config.session_dir.join("subconscious-state.json");
self.subconscious.lock().await.set_state_path(state_path);
}
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
self.turn_watch.subscribe()
}
/// Execute an Action from a MindState method.
async fn run_commands(&self, cmds: Vec<MindCommand>) {
for cmd in cmds {
match cmd {
MindCommand::None => {}
MindCommand::Compact => {
let threshold = compaction_threshold(&self.config.app) as usize;
if self.agent.context.lock().await.tokens() > threshold {
self.agent.compact().await;
self.agent.state.lock().await.notify("compacted");
}
}
MindCommand::Score => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
}
}
MindCommand::Interrupt => {
self.shared.lock().unwrap().interrupt();
self.agent.state.lock().await.active_tools.abort_all();
if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); }
self.shared.lock().unwrap().turn_active = false;
let _ = self.turn_watch.send(false);
}
MindCommand::NewSession => {
{
let mut s = self.shared.lock().unwrap();
s.dmn = subconscious::State::Resting { since: Instant::now() };
s.dmn_turns = 0;
}
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
{
let mut ctx = self.agent.context.lock().await;
ctx.clear(Section::Conversation);
ctx.conversation_log = new_log;
}
{
let mut st = self.agent.state.lock().await;
st.generation += 1;
st.last_prompt_tokens = 0;
}
self.agent.compact().await;
}
}
}
}
pub fn start_memory_scoring(&self) {
let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone();
let scores_path = self.config.session_dir.join("memory-scores.json");
let cfg = crate::config::get();
let max_age = cfg.scoring_interval_secs;
let response_window = cfg.scoring_response_window;
tokio::spawn(async move {
let (context, client) = {
let mut st = agent.state.lock().await;
if st.memory_scoring_in_flight { return; }
st.memory_scoring_in_flight = true;
drop(st);
let ctx = agent.context.lock().await.clone();
(ctx, agent.client.clone())
};
let _result = learn::score_memories_incremental(
&context, max_age as i64, response_window, &client, &agent,
|key: String, score: f64| {
let agent = agent.clone();
let path = scores_path.clone();
async move {
let scores_snapshot = {
let mut ctx = agent.context.lock().await;
for i in 0..ctx.conversation().len() {
if let AstNode::Leaf(leaf) = &ctx.conversation()[i] {
if let NodeBody::Memory { key: k, .. } = leaf.body() {
if *k == key {
ctx.set_score(Section::Conversation, i, Some(score));
}
}
}
}
let snapshot = collect_memory_scores(&ctx);
drop(ctx);
agent.state.lock().await.changed.notify_one();
snapshot
};
save_memory_scores(&scores_snapshot, &path);
}
},
).await;
{
agent.state.lock().await.memory_scoring_in_flight = false;
}
let _ = bg_tx.send(BgEvent::ScoringDone);
});
}
async fn start_turn(&self, text: &str, target: StreamTarget) {
{
match target {
StreamTarget::Conversation => {
self.agent.push_node(AstNode::user_msg(text)).await;
}
StreamTarget::Autonomous => {
self.agent.push_node(AstNode::dmn(text)).await;
}
}
// Compact if over budget before sending
let threshold = compaction_threshold(&self.config.app) as usize;
if self.agent.context.lock().await.tokens() > threshold {
self.agent.compact().await;
self.agent.state.lock().await.notify("compacted");
}
}
self.shared.lock().unwrap().turn_active = true;
let _ = self.turn_watch.send(true);
let agent = self.agent.clone();
let result_tx = self.turn_tx.clone();
self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move {
let result = Agent::turn(agent).await;
let _ = result_tx.send((result, target)).await;
}));
}
pub async fn shutdown(&self) {
if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); }
}
/// Mind event loop — locks MindState, calls state methods, executes actions.
pub async fn run(
&self,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>,
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
) {
let mut bg_rx = self.bg_rx.lock().unwrap().take()
.expect("Mind::run() called twice");
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
let mut unc_handle: Option<tokio::task::JoinHandle<()>> = None;
loop {
let (timeout, has_input) = {
let me = self.shared.lock().unwrap();
(me.dmn.interval(), me.has_pending_input())
};
let mut cmds = Vec::new();
tokio::select! {
biased;
cmd = input_rx.recv() => {
match cmd {
Some(cmd) => cmds.push(cmd),
None => break, // UI shut down
}
}
Some(bg) = bg_rx.recv() => {
match bg {
BgEvent::ScoringDone => {
self.shared.lock().unwrap().scoring_in_flight = false;
}
}
}
Some((result, target)) = turn_rx.recv() => {
let model_switch = {
let mut s = self.shared.lock().unwrap();
s.turn_handle = None;
s.complete_turn(&result, target)
};
let _ = self.turn_watch.send(false);
if let Some(name) = model_switch {
crate::user::chat::cmd_switch_model(&self.agent, &name).await;
}
cmds.push(MindCommand::Compact);
if !self.config.no_agents {
cmds.push(MindCommand::Score);
}
}
_ = tokio::time::sleep(timeout), if !has_input => {
let tick = self.shared.lock().unwrap().dmn_tick();
if let Some((prompt, target)) = tick {
self.start_turn(&prompt, target).await;
}
}
}
if !self.config.no_agents {
if sub_handle.as_ref().map_or(true, |h| h.is_finished()) {
let sub = self.subconscious.clone();
let agent = self.agent.clone();
sub_handle = Some(tokio::spawn(async move {
let mut s = sub.lock().await;
s.collect_results(&agent).await;
s.trigger(&agent).await;
}));
}
if unc_handle.as_ref().map_or(true, |h| h.is_finished()) {
let unc = self.unconscious.clone();
unc_handle = Some(tokio::spawn(async move {
unc.lock().await.trigger();
}));
}
}
// Check for pending user input → push to agent context and start turn
let pending = self.shared.lock().unwrap().take_pending_input();
if let Some(text) = pending {
self.start_turn(&text, StreamTarget::Conversation).await;
}
self.run_commands(cmds).await;
}
}
}