consciousness/src/mind/mod.rs

728 lines
28 KiB
Rust
Raw Normal View History

2026-04-04 02:46:32 -04:00
// mind/ — Cognitive layer
//
// Mind state machine, DMN, identity, observation socket.
2026-04-04 02:46:32 -04:00
// Everything about how the mind operates, separate from the
// user interface (TUI, CLI) and the agent execution (tools, API).
pub mod dmn;
pub mod identity;
2026-04-05 01:48:11 -04:00
pub mod log;
2026-04-04 02:46:32 -04:00
// consciousness.rs — Mind state machine and event loop
2026-04-04 02:46:32 -04:00
//
// The core runtime for the consciousness binary. Mind manages turns,
2026-04-04 02:46:32 -04:00
// DMN state, compaction, scoring, and slash commands. The event loop
// bridges Mind (cognitive state) with App (TUI rendering).
2026-04-04 02:46:32 -04:00
//
// The event loop uses biased select! so priorities are deterministic:
// keyboard events > turn results > render ticks > DMN timer > UI messages.
use anyhow::Result;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
2026-04-04 02:46:32 -04:00
use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::agent::oneshot::{AutoAgent, AutoStep};
use crate::config::{AppConfig, SessionConfig};
use crate::subconscious::{defs, learn};
// ---------------------------------------------------------------------------
// Subconscious agents — forked from conscious agent, run on schedule
// ---------------------------------------------------------------------------
/// A subconscious agent managed by Mind.
struct SubconsciousAgent {
auto: AutoAgent,
/// Conversation bytes at last trigger.
last_trigger_bytes: u64,
/// When the agent last ran.
last_run: Option<Instant>,
/// Running task handle.
handle: Option<tokio::task::JoinHandle<Result<String, String>>>,
}
/// Names and byte-interval triggers for the built-in subconscious agents.
const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[
("subconscious-surface-observe", 0), // every trigger
("subconscious-journal", 20_000), // every ~20KB of conversation
("subconscious-reflect", 100_000), // every ~100KB of conversation
];
impl SubconsciousAgent {
fn new(name: &str, _interval_bytes: u64) -> Option<Self> {
let def = defs::get_def(name)?;
let all_tools = crate::agent::tools::memory_and_journal_tools();
let tools: Vec<crate::agent::tools::Tool> = if def.tools.is_empty() {
all_tools.to_vec()
} else {
all_tools.into_iter()
.filter(|t| def.tools.iter().any(|w| w == t.name))
.collect()
};
let steps: Vec<AutoStep> = def.steps.iter().map(|s| AutoStep {
prompt: s.prompt.clone(),
phase: s.phase.clone(),
}).collect();
let auto = AutoAgent::new(
name.to_string(), tools, steps,
def.temperature.unwrap_or(0.6), def.priority,
);
Some(Self {
auto,
last_trigger_bytes: 0,
last_run: None,
handle: None,
})
}
fn is_running(&self) -> bool {
self.handle.as_ref().is_some_and(|h| !h.is_finished())
}
fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool {
if self.is_running() { return false; }
if interval == 0 { return true; }
conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval
}
}
/// State shared between all subconscious agents. Lives on Mind,
/// passed to agents at run time. Enables splitting surface/observe
/// into separate agents that share walked keys.
#[derive(Clone, Default)]
pub struct SubconsciousSharedState {
pub walked: Vec<String>,
}
/// Lightweight snapshot of subconscious agent state for the TUI.
#[derive(Clone, Default)]
pub struct SubconsciousSnapshot {
pub name: String,
pub running: bool,
pub current_phase: String,
pub turn: usize,
pub last_run_secs_ago: Option<f64>,
/// Entries from the last forked run (after fork point).
pub last_run_entries: Vec<crate::agent::context::ConversationEntry>,
}
impl SubconsciousAgent {
fn snapshot(&self) -> SubconsciousSnapshot {
SubconsciousSnapshot {
name: self.auto.name.clone(),
running: self.is_running(),
current_phase: self.auto.current_phase.clone(),
turn: self.auto.turn,
last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()),
last_run_entries: self.auto.last_run_entries.clone(),
}
}
}
/// Which pane streaming text should go to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
/// User-initiated turn — text goes to conversation pane.
Conversation,
/// DMN-initiated turn — text goes to autonomous pane.
Autonomous,
}
2026-04-04 02:46:32 -04:00
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
fn compaction_threshold(app: &AppConfig) -> u32 {
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
}
/// Shared state between Mind and UI.
pub struct MindState {
/// Pending user input — UI pushes, Mind consumes after turn completes.
pub input: Vec<String>,
/// True while a turn is in progress.
pub turn_active: bool,
/// DMN state
pub dmn: dmn::State,
pub dmn_turns: u32,
pub max_dmn_turns: u32,
/// Whether memory scoring is running.
pub scoring_in_flight: bool,
/// Whether compaction is running.
pub compaction_in_flight: bool,
/// Per-turn tracking
pub last_user_input: Instant,
pub consecutive_errors: u32,
pub last_turn_had_tools: bool,
/// Handle to the currently running turn task.
pub turn_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Clone for MindState {
fn clone(&self) -> Self {
Self {
input: self.input.clone(),
turn_active: self.turn_active,
dmn: self.dmn.clone(),
dmn_turns: self.dmn_turns,
max_dmn_turns: self.max_dmn_turns,
scoring_in_flight: self.scoring_in_flight,
compaction_in_flight: self.compaction_in_flight,
last_user_input: self.last_user_input,
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
turn_handle: None, // Not cloned — only Mind's loop uses this
}
}
}
/// What should happen after a state transition.
pub enum MindCommand {
/// Run compaction check
Compact,
/// Run memory scoring
Score,
/// Abort current turn, kill processes
Interrupt,
/// Reset session
NewSession,
/// Nothing to do
None,
}
impl MindState {
pub fn new(max_dmn_turns: u32) -> Self {
2026-04-04 02:46:32 -04:00
Self {
input: Vec::new(),
turn_active: false,
dmn: if dmn::is_off() { dmn::State::Off }
else { dmn::State::Resting { since: Instant::now() } },
2026-04-04 02:46:32 -04:00
dmn_turns: 0,
max_dmn_turns,
scoring_in_flight: false,
compaction_in_flight: false,
2026-04-04 02:46:32 -04:00
last_user_input: Instant::now(),
consecutive_errors: 0,
last_turn_had_tools: false,
turn_handle: None,
2026-04-04 02:46:32 -04:00
}
}
/// Consume pending user input if no turn is active.
/// Returns the text to send; caller is responsible for pushing it
/// into the Agent's context and starting the turn.
fn take_pending_input(&mut self) -> Option<String> {
if self.turn_active || self.input.is_empty() {
return None;
2026-04-04 02:46:32 -04:00
}
let text = self.input.join("\n");
self.input.clear();
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
Some(text)
2026-04-04 02:46:32 -04:00
}
/// Process turn completion, return model switch name if requested.
fn complete_turn(&mut self, result: &Result<TurnResult>, target: StreamTarget) -> Option<String> {
self.turn_active = false;
2026-04-04 02:46:32 -04:00
match result {
Ok(turn_result) => {
if turn_result.tool_errors > 0 {
self.consecutive_errors += turn_result.tool_errors;
} else {
self.consecutive_errors = 0;
}
self.last_turn_had_tools = turn_result.had_tool_calls;
self.dmn = dmn::transition(
&self.dmn,
turn_result.yield_requested,
turn_result.had_tool_calls,
target == StreamTarget::Conversation,
);
if turn_result.dmn_pause {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
}
turn_result.model_switch.clone()
2026-04-04 02:46:32 -04:00
}
Err(_) => {
2026-04-04 02:46:32 -04:00
self.consecutive_errors += 1;
self.dmn = dmn::State::Resting { since: Instant::now() };
None
2026-04-04 02:46:32 -04:00
}
}
}
/// DMN tick — returns a prompt and target if we should run a turn.
fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> {
2026-04-04 02:46:32 -04:00
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
return None;
2026-04-04 02:46:32 -04:00
}
self.dmn_turns += 1;
if self.dmn_turns > self.max_dmn_turns {
self.dmn = dmn::State::Resting { since: Instant::now() };
2026-04-04 02:46:32 -04:00
self.dmn_turns = 0;
return None;
2026-04-04 02:46:32 -04:00
}
let dmn_ctx = dmn::DmnContext {
user_idle: self.last_user_input.elapsed(),
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
};
let prompt = self.dmn.prompt(&dmn_ctx);
Some((prompt, StreamTarget::Autonomous))
}
fn interrupt(&mut self) {
self.input.clear();
self.dmn = dmn::State::Resting { since: Instant::now() };
}
}
/// Background task completion events.
enum BgEvent {
ScoringDone,
}
// --- Mind: cognitive state machine ---
pub type SharedMindState = std::sync::Mutex<MindState>;
pub struct Mind {
pub agent: Arc<tokio::sync::Mutex<Agent>>,
pub shared: Arc<SharedMindState>,
pub config: SessionConfig,
subconscious: Arc<tokio::sync::Mutex<Vec<SubconsciousAgent>>>,
subconscious_state: Arc<tokio::sync::Mutex<SubconsciousSharedState>>,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
turn_watch: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
_supervisor: crate::thalamus::supervisor::Supervisor,
}
impl Mind {
pub fn new(
config: SessionConfig,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self {
let shared_context = crate::agent::context::shared_context_state();
let shared_active_tools = crate::agent::tools::shared_active_tools();
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
let conversation_log = log::ConversationLog::new(
config.session_dir.join("conversation.jsonl"),
).ok();
let ag = Agent::new(
client,
config.system_prompt.clone(),
config.context_parts.clone(),
config.app.clone(),
config.prompt_file.clone(),
conversation_log,
shared_context,
shared_active_tools,
);
let agent = Arc::new(tokio::sync::Mutex::new(ag));
let subconscious = SUBCONSCIOUS_AGENTS.iter()
.filter_map(|(name, interval)| SubconsciousAgent::new(name, *interval))
.collect();
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
let (turn_watch, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
sup.ensure_running();
let subconscious_state = Arc::new(tokio::sync::Mutex::new(SubconsciousSharedState::default()));
Self { agent, shared, config,
subconscious: Arc::new(tokio::sync::Mutex::new(subconscious)),
subconscious_state,
turn_tx, turn_watch, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
2026-04-04 02:46:32 -04:00
}
/// Initialize — restore log, start daemons and background agents.
pub async fn subconscious_snapshots(&self) -> (Vec<SubconsciousSnapshot>, SubconsciousSharedState) {
let snaps = self.subconscious.lock().await.iter().map(|s| s.snapshot()).collect();
let shared = self.subconscious_state.lock().await.clone();
(snaps, shared)
}
pub async fn init(&self) {
// Restore conversation
let mut ag = self.agent.lock().await;
ag.restore_from_log();
2026-04-05 23:04:10 -04:00
ag.changed.notify_one();
drop(ag);
}
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
self.turn_watch.subscribe()
2026-04-04 02:46:32 -04:00
}
/// Execute an Action from a MindState method.
async fn run_commands(&self, cmds: Vec<MindCommand>) {
for cmd in cmds {
match cmd {
MindCommand::None => {}
MindCommand::Compact => {
let threshold = compaction_threshold(&self.config.app) as usize;
let mut ag = self.agent.lock().await;
let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default();
if crate::agent::context::sections_used(&sections) > threshold {
ag.compact();
ag.notify("compacted");
}
}
MindCommand::Score => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
}
}
MindCommand::Interrupt => {
self.shared.lock().unwrap().interrupt();
let ag = self.agent.lock().await;
let mut tools = ag.active_tools.lock().unwrap();
for entry in tools.drain(..) { entry.handle.abort(); }
drop(tools); drop(ag);
if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); }
self.shared.lock().unwrap().turn_active = false;
let _ = self.turn_watch.send(false);
}
MindCommand::NewSession => {
{
let mut s = self.shared.lock().unwrap();
s.dmn = dmn::State::Resting { since: Instant::now() };
s.dmn_turns = 0;
}
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
let mut ag = self.agent.lock().await;
let shared_ctx = ag.shared_context.clone();
let shared_tools = ag.active_tools.clone();
*ag = Agent::new(
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
self.config.system_prompt.clone(), self.config.context_parts.clone(),
self.config.app.clone(), self.config.prompt_file.clone(),
new_log, shared_ctx, shared_tools,
);
}
}
}
}
pub fn start_memory_scoring(&self) {
let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone();
let cfg = crate::config::get();
let max_age = cfg.scoring_interval_secs;
let response_window = cfg.scoring_response_window;
tokio::spawn(async move {
let (context, client) = {
let mut ag = agent.lock().await;
if ag.memory_scoring_in_flight { return; }
ag.memory_scoring_in_flight = true;
(ag.context.clone(), ag.client_clone())
};
let result = learn::score_memories_incremental(
&context, max_age as i64, response_window, &client, &agent,
).await;
{
let mut ag = agent.lock().await;
ag.memory_scoring_in_flight = false;
if let Ok(ref scores) = result { ag.memory_scores = scores.clone(); }
}
let _ = bg_tx.send(BgEvent::ScoringDone);
});
}
/// Push user/DMN message into the agent's context and spawn a turn.
/// The text moves from pending_input to ContextState atomically —
/// by the time this returns, the message is in context and the turn
/// is running.
/// Collect results from finished subconscious agents and inject
/// their output into the conscious agent's context.
async fn collect_subconscious_results(&self) {
// Collect finished handles without holding the lock across await
let finished: Vec<(usize, tokio::task::JoinHandle<Result<String, String>>)> = {
let mut subs = self.subconscious.lock().await;
subs.iter_mut().enumerate().filter_map(|(i, sub)| {
if sub.handle.as_ref().is_some_and(|h| h.is_finished()) {
sub.last_run = Some(Instant::now());
Some((i, sub.handle.take().unwrap()))
} else {
None
}
}).collect()
};
for (idx, handle) in finished {
match handle.await {
Ok(Ok(_)) => {
// The outer task already put the AutoAgent back —
// read outputs from it
let mut subs = self.subconscious.lock().await;
let name = subs[idx].auto.name.clone();
let outputs = std::mem::take(&mut subs[idx].auto.outputs);
// Walked keys — update shared state
if let Some(walked_str) = outputs.get("walked") {
let walked: Vec<String> = walked_str.lines()
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty())
.collect();
self.subconscious_state.lock().await.walked = walked;
}
drop(subs);
// Surfaced memories → inject into conscious agent
if let Some(surface_str) = outputs.get("surface") {
let mut ag = self.agent.lock().await;
for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) {
if let Some(rendered) = crate::cli::node::render_node(
&crate::store::Store::load().unwrap_or_default(), key,
) {
let mut msg = crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
key, rendered,
));
msg.stamp();
ag.push_entry(crate::agent::context::ConversationEntry::Memory {
key: key.to_string(), message: msg,
});
}
}
}
// Reflection → inject into conscious agent
if let Some(reflection) = outputs.get("reflection") {
if !reflection.trim().is_empty() {
let mut ag = self.agent.lock().await;
ag.push_message(crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
reflection.trim(),
)));
}
}
dbglog!("[mind] {} completed", name);
}
Ok(Err(e)) => dbglog!("[mind] subconscious agent failed: {}", e),
Err(e) => dbglog!("[mind] subconscious agent panicked: {}", e),
}
}
}
/// Trigger subconscious agents that are due to run.
async fn trigger_subconscious(&self) {
if self.config.no_agents { return; }
// Get conversation size + memory keys from conscious agent
let (conversation_bytes, memory_keys) = {
let ag = self.agent.lock().await;
let bytes = ag.context.entries.iter()
.filter(|e| !e.is_log() && !e.is_memory())
.map(|e| e.message().content_text().len() as u64)
.sum::<u64>();
let keys: Vec<String> = ag.context.entries.iter().filter_map(|e| {
if let crate::agent::context::ConversationEntry::Memory { key, .. } = e {
Some(key.clone())
} else { None }
}).collect();
(bytes, keys)
};
// Find which agents to trigger, take their AutoAgents out
let mut to_run: Vec<(usize, AutoAgent)> = Vec::new();
{
let mut subs = self.subconscious.lock().await;
for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() {
if i >= subs.len() { continue; }
if !subs[i].should_trigger(conversation_bytes, interval) { continue; }
subs[i].last_trigger_bytes = conversation_bytes;
// Take the AutoAgent out — task owns it, returns it when done
let auto = std::mem::replace(&mut subs[i].auto,
AutoAgent::new(String::new(), vec![], vec![], 0.0, 0));
to_run.push((i, auto));
}
}
if to_run.is_empty() { return; }
// Fork from conscious agent and spawn tasks
let conscious = self.agent.lock().await;
let walked = self.subconscious_state.lock().await.walked.clone();
let mut spawns = Vec::new();
for (idx, mut auto) in to_run {
dbglog!("[mind] triggering {}", auto.name);
let forked = conscious.fork(auto.tools.clone());
let keys = memory_keys.clone();
let w = walked.clone();
let handle: tokio::task::JoinHandle<(AutoAgent, Result<String, String>)> =
tokio::spawn(async move {
let result = auto.run_forked(&forked, &keys, &w).await;
(auto, result)
});
spawns.push((idx, handle));
}
drop(conscious);
// Store handles (type-erased — we'll extract AutoAgent on completion)
// We need to store the JoinHandle that returns (AutoAgent, Result)
// but SubconsciousAgent.handle expects JoinHandle<Result<String, String>>.
// Wrap: spawn an outer task that extracts the result and puts back the AutoAgent.
let subconscious = self.subconscious.clone();
for (idx, handle) in spawns {
let subs = subconscious.clone();
let outer = tokio::spawn(async move {
let (auto, result) = handle.await.unwrap_or_else(
|e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0),
Err(format!("task panicked: {}", e))));
// Put the AutoAgent back
let mut locked = subs.lock().await;
if idx < locked.len() {
locked[idx].auto = auto;
}
result
});
let mut subs = self.subconscious.lock().await;
if idx < subs.len() {
subs[idx].handle = Some(outer);
}
}
}
async fn start_turn(&self, text: &str, target: StreamTarget) {
{
let mut ag = self.agent.lock().await;
match target {
StreamTarget::Conversation => {
ag.push_message(crate::agent::api::types::Message::user(text));
}
StreamTarget::Autonomous => {
let mut msg = crate::agent::api::types::Message::user(text);
msg.stamp();
ag.push_entry(crate::agent::context::ConversationEntry::Dmn(msg));
}
}
// Compact if over budget before sending
let threshold = compaction_threshold(&self.config.app) as usize;
ag.publish_context_state();
let used = {
let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default();
crate::agent::context::sections_used(&sections)
};
if used > threshold {
ag.compact();
ag.notify("compacted");
}
}
self.shared.lock().unwrap().turn_active = true;
let _ = self.turn_watch.send(true);
let agent = self.agent.clone();
let result_tx = self.turn_tx.clone();
self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move {
let result = Agent::turn(agent).await;
let _ = result_tx.send((result, target)).await;
}));
}
pub async fn shutdown(&self) {
if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); }
2026-04-04 02:46:32 -04:00
}
/// Mind event loop — locks MindState, calls state methods, executes actions.
pub async fn run(
&self,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>,
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
) {
let mut bg_rx = self.bg_rx.lock().unwrap().take()
.expect("Mind::run() called twice");
loop {
let timeout = self.shared.lock().unwrap().dmn.interval();
let turn_active = self.shared.lock().unwrap().turn_active;
let mut cmds = Vec::new();
tokio::select! {
biased;
cmd = input_rx.recv() => {
match cmd {
Some(cmd) => cmds.push(cmd),
None => break, // UI shut down
}
}
Some(bg) = bg_rx.recv() => {
match bg {
BgEvent::ScoringDone => {
self.shared.lock().unwrap().scoring_in_flight = false;
}
}
}
Some((result, target)) = turn_rx.recv() => {
self.shared.lock().unwrap().turn_handle = None;
let model_switch = self.shared.lock().unwrap().complete_turn(&result, target);
let _ = self.turn_watch.send(false);
if let Some(name) = model_switch {
crate::user::chat::cmd_switch_model(&self.agent, &name).await;
}
// Post-turn maintenance
{
let mut ag = self.agent.lock().await;
ag.age_out_images();
ag.publish_context_state();
}
cmds.push(MindCommand::Compact);
if !self.config.no_agents {
cmds.push(MindCommand::Score);
}
// Trigger subconscious agents after conscious turn completes
self.collect_subconscious_results().await;
self.trigger_subconscious().await;
}
_ = tokio::time::sleep(timeout), if !turn_active => {
let tick = self.shared.lock().unwrap().dmn_tick();
if let Some((prompt, target)) = tick {
self.start_turn(&prompt, target).await;
}
}
}
// Check for pending user input → push to agent context and start turn
let pending = self.shared.lock().unwrap().take_pending_input();
if let Some(text) = pending {
self.start_turn(&text, StreamTarget::Conversation).await;
}
self.run_commands(cmds).await;
}
}
2026-04-04 02:46:32 -04:00
}