diff --git a/src/mind/dmn.rs b/src/mind/dmn.rs index 1e47fb5..86cbfff 100644 --- a/src/mind/dmn.rs +++ b/src/mind/dmn.rs @@ -266,3 +266,230 @@ impl State { } } } + +// --------------------------------------------------------------------------- +// Subconscious — background agents forked from the conscious agent +// --------------------------------------------------------------------------- + +use std::sync::Arc; +use crate::agent::{Agent, oneshot::{AutoAgent, AutoStep}}; +use crate::agent::context::ConversationEntry; +use crate::subconscious::defs; + +/// Names and byte-interval triggers for the built-in subconscious agents. +const AGENTS: &[(&str, u64)] = &[ + ("subconscious-surface", 0), // every trigger + ("subconscious-observe", 0), // every trigger + ("subconscious-thalamus", 0), // every trigger + ("subconscious-journal", 20_000), // every ~20KB of conversation + ("subconscious-reflect", 100_000), // every ~100KB of conversation +]; + +/// Lightweight snapshot for the TUI. +#[derive(Clone, Default)] +pub struct SubconsciousSnapshot { + pub name: String, + pub running: bool, + pub current_phase: String, + pub turn: usize, + pub last_run_secs_ago: Option, + pub last_run_entries: Vec, +} + +struct SubconsciousAgent { + auto: AutoAgent, + last_trigger_bytes: u64, + last_run: Option, + handle: Option)>>, +} + +impl SubconsciousAgent { + fn new(name: &str) -> Option { + let def = defs::get_def(name)?; + + let all_tools = crate::agent::tools::memory_and_journal_tools(); + let tools: Vec = if def.tools.is_empty() { + all_tools.to_vec() + } else { + all_tools.into_iter() + .filter(|t| def.tools.iter().any(|w| w == t.name)) + .collect() + }; + + let steps: Vec = def.steps.iter().map(|s| AutoStep { + prompt: s.prompt.clone(), + phase: s.phase.clone(), + }).collect(); + + let auto = AutoAgent::new( + name.to_string(), tools, steps, + def.temperature.unwrap_or(0.6), def.priority, + ); + + Some(Self { auto, last_trigger_bytes: 0, last_run: None, handle: None }) + } + + fn is_running(&self) -> bool { + self.handle.as_ref().is_some_and(|h| !h.is_finished()) + } + + fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool { + if self.is_running() { return false; } + if interval == 0 { return true; } + conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval + } + + fn snapshot(&self) -> SubconsciousSnapshot { + SubconsciousSnapshot { + name: self.auto.name.clone(), + running: self.is_running(), + current_phase: self.auto.current_phase.clone(), + turn: self.auto.turn, + last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()), + last_run_entries: self.auto.last_run_entries.clone(), + } + } +} + +/// Background agent orchestration — owns the subconscious agents +/// and their shared state (walked keys, etc.). +pub struct Subconscious { + agents: Vec, + pub walked: Vec, +} + +impl Subconscious { + pub fn new() -> Self { + let agents = AGENTS.iter() + .filter_map(|(name, _)| SubconsciousAgent::new(name)) + .collect(); + Self { agents, walked: Vec::new() } + } + + pub fn snapshots(&self) -> Vec { + self.agents.iter().map(|s| s.snapshot()).collect() + } + + /// Collect results from finished agents, inject outputs into the + /// conscious agent's context. + pub async fn collect_results(&mut self, agent: &Arc>) { + let finished: Vec<(usize, tokio::task::JoinHandle<(AutoAgent, Result)>)> = + self.agents.iter_mut().enumerate().filter_map(|(i, sub)| { + if sub.handle.as_ref().is_some_and(|h| h.is_finished()) { + sub.last_run = Some(Instant::now()); + Some((i, sub.handle.take().unwrap())) + } else { + None + } + }).collect(); + + for (idx, handle) in finished { + let (auto_back, result) = handle.await.unwrap_or_else( + |e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0), + Err(format!("task panicked: {}", e)))); + self.agents[idx].auto = auto_back; + + match result { + Ok(_) => { + let name = self.agents[idx].auto.name.clone(); + let outputs = std::mem::take(&mut self.agents[idx].auto.outputs); + + if let Some(walked_str) = outputs.get("walked") { + self.walked = walked_str.lines() + .map(|l| l.trim().to_string()) + .filter(|l| !l.is_empty()) + .collect(); + } + + if let Some(surface_str) = outputs.get("surface") { + let mut ag = agent.lock().await; + for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if let Some(rendered) = crate::cli::node::render_node( + &crate::store::Store::load().unwrap_or_default(), key, + ) { + let mut msg = crate::agent::api::types::Message::user(format!( + "\n--- {} (surfaced) ---\n{}\n", + key, rendered, + )); + msg.stamp(); + ag.push_entry(ConversationEntry::Memory { + key: key.to_string(), message: msg, + }); + } + } + } + + if let Some(reflection) = outputs.get("reflection") { + if !reflection.trim().is_empty() { + let mut ag = agent.lock().await; + ag.push_message(crate::agent::api::types::Message::user(format!( + "\n--- subconscious reflection ---\n{}\n", + reflection.trim(), + ))); + } + } + + if let Some(nudge) = outputs.get("thalamus") { + let nudge = nudge.trim(); + if !nudge.is_empty() && nudge != "ok" { + let mut ag = agent.lock().await; + ag.push_message(crate::agent::api::types::Message::user(format!( + "\n--- thalamus ---\n{}\n", + nudge, + ))); + } + } + + dbglog!("[subconscious] {} completed", name); + } + Err(e) => dbglog!("[subconscious] agent failed: {}", e), + } + } + } + + /// Trigger subconscious agents that are due to run. + pub async fn trigger(&mut self, agent: &Arc>) { + let (conversation_bytes, memory_keys) = { + let ag = agent.lock().await; + let bytes = ag.context.entries.iter() + .filter(|e| !e.is_log() && !e.is_memory()) + .map(|e| e.message().content_text().len() as u64) + .sum::(); + let keys: Vec = ag.context.entries.iter().filter_map(|e| { + if let ConversationEntry::Memory { key, .. } = e { + Some(key.clone()) + } else { None } + }).collect(); + (bytes, keys) + }; + + // Find which agents to trigger, take their AutoAgents out + let mut to_run: Vec<(usize, AutoAgent)> = Vec::new(); + for (i, &(_name, interval)) in AGENTS.iter().enumerate() { + if i >= self.agents.len() { continue; } + if !self.agents[i].should_trigger(conversation_bytes, interval) { continue; } + self.agents[i].last_trigger_bytes = conversation_bytes; + + let auto = std::mem::replace(&mut self.agents[i].auto, + AutoAgent::new(String::new(), vec![], vec![], 0.0, 0)); + to_run.push((i, auto)); + } + + if to_run.is_empty() { return; } + + let conscious = agent.lock().await; + let walked = self.walked.clone(); + for (idx, mut auto) in to_run { + dbglog!("[subconscious] triggering {}", auto.name); + + let forked = conscious.fork(auto.tools.clone()); + let keys = memory_keys.clone(); + let w = walked.clone(); + + self.agents[idx].handle = Some(tokio::spawn(async move { + let result = auto.run_forked(&forked, &keys, &w).await; + (auto, result) + })); + } + } +} diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 09ab23e..5368d04 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -23,108 +23,10 @@ use std::time::Instant; use tokio::sync::mpsc; use crate::agent::{Agent, TurnResult}; use crate::agent::api::ApiClient; -use crate::agent::oneshot::{AutoAgent, AutoStep}; use crate::config::{AppConfig, SessionConfig}; -use crate::subconscious::{defs, learn}; +use crate::subconscious::learn; -// --------------------------------------------------------------------------- -// Subconscious agents — forked from conscious agent, run on schedule -// --------------------------------------------------------------------------- - -/// A subconscious agent managed by Mind. -struct SubconsciousAgent { - auto: AutoAgent, - /// Conversation bytes at last trigger. - last_trigger_bytes: u64, - /// When the agent last ran. - last_run: Option, - /// Running task handle. - handle: Option>>, -} - -/// Names and byte-interval triggers for the built-in subconscious agents. -const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[ - ("subconscious-surface", 0), // every trigger - ("subconscious-observe", 0), // every trigger (after surface) - ("subconscious-thalamus", 0), // every trigger - ("subconscious-journal", 20_000), // every ~20KB of conversation - ("subconscious-reflect", 100_000), // every ~100KB of conversation -]; - -impl SubconsciousAgent { - fn new(name: &str, _interval_bytes: u64) -> Option { - let def = defs::get_def(name)?; - - let all_tools = crate::agent::tools::memory_and_journal_tools(); - let tools: Vec = if def.tools.is_empty() { - all_tools.to_vec() - } else { - all_tools.into_iter() - .filter(|t| def.tools.iter().any(|w| w == t.name)) - .collect() - }; - - let steps: Vec = def.steps.iter().map(|s| AutoStep { - prompt: s.prompt.clone(), - phase: s.phase.clone(), - }).collect(); - - let auto = AutoAgent::new( - name.to_string(), tools, steps, - def.temperature.unwrap_or(0.6), def.priority, - ); - - Some(Self { - auto, - last_trigger_bytes: 0, - last_run: None, - handle: None, - }) - } - - fn is_running(&self) -> bool { - self.handle.as_ref().is_some_and(|h| !h.is_finished()) - } - - fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool { - if self.is_running() { return false; } - if interval == 0 { return true; } - conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval - } -} - -/// State shared between all subconscious agents. Lives on Mind, -/// passed to agents at run time. Enables splitting surface/observe -/// into separate agents that share walked keys. -#[derive(Clone, Default)] -pub struct SubconsciousSharedState { - pub walked: Vec, -} - -/// Lightweight snapshot of subconscious agent state for the TUI. -#[derive(Clone, Default)] -pub struct SubconsciousSnapshot { - pub name: String, - pub running: bool, - pub current_phase: String, - pub turn: usize, - pub last_run_secs_ago: Option, - /// Entries from the last forked run (after fork point). - pub last_run_entries: Vec, -} - -impl SubconsciousAgent { - fn snapshot(&self) -> SubconsciousSnapshot { - SubconsciousSnapshot { - name: self.auto.name.clone(), - running: self.is_running(), - current_phase: self.auto.current_phase.clone(), - turn: self.auto.turn, - last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()), - last_run_entries: self.auto.last_run_entries.clone(), - } - } -} +pub use dmn::{SubconsciousSnapshot, Subconscious}; /// Which pane streaming text should go to. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -300,8 +202,7 @@ pub struct Mind { pub agent: Arc>, pub shared: Arc, pub config: SessionConfig, - subconscious: Arc>>, - subconscious_state: Arc>, + subconscious: tokio::sync::Mutex, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, @@ -334,10 +235,6 @@ impl Mind { ); let agent = Arc::new(tokio::sync::Mutex::new(ag)); - let subconscious = SUBCONSCIOUS_AGENTS.iter() - .filter_map(|(name, interval)| SubconsciousAgent::new(name, *interval)) - .collect(); - let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); let (turn_watch, _) = tokio::sync::watch::channel(false); let (bg_tx, bg_rx) = mpsc::unbounded_channel(); @@ -346,19 +243,19 @@ impl Mind { sup.load_config(); sup.ensure_running(); - let subconscious_state = Arc::new(tokio::sync::Mutex::new(SubconsciousSharedState::default())); Self { agent, shared, config, - subconscious: Arc::new(tokio::sync::Mutex::new(subconscious)), - subconscious_state, + subconscious: tokio::sync::Mutex::new(Subconscious::new()), turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } /// Initialize — restore log, start daemons and background agents. - pub async fn subconscious_snapshots(&self) -> (Vec, SubconsciousSharedState) { - let snaps = self.subconscious.lock().await.iter().map(|s| s.snapshot()).collect(); - let shared = self.subconscious_state.lock().await.clone(); - (snaps, shared) + pub async fn subconscious_snapshots(&self) -> Vec { + self.subconscious.lock().await.snapshots() + } + + pub async fn subconscious_walked(&self) -> Vec { + self.subconscious.lock().await.walked.clone() } pub async fn init(&self) { @@ -453,176 +350,6 @@ impl Mind { }); } - /// Push user/DMN message into the agent's context and spawn a turn. - /// The text moves from pending_input to ContextState atomically — - /// by the time this returns, the message is in context and the turn - /// is running. - /// Collect results from finished subconscious agents and inject - /// their output into the conscious agent's context. - async fn collect_subconscious_results(&self) { - // Collect finished handles without holding the lock across await - let finished: Vec<(usize, tokio::task::JoinHandle>)> = { - let mut subs = self.subconscious.lock().await; - subs.iter_mut().enumerate().filter_map(|(i, sub)| { - if sub.handle.as_ref().is_some_and(|h| h.is_finished()) { - sub.last_run = Some(Instant::now()); - Some((i, sub.handle.take().unwrap())) - } else { - None - } - }).collect() - }; - - for (idx, handle) in finished { - match handle.await { - Ok(Ok(_)) => { - // The outer task already put the AutoAgent back — - // read outputs from it - let mut subs = self.subconscious.lock().await; - let name = subs[idx].auto.name.clone(); - let outputs = std::mem::take(&mut subs[idx].auto.outputs); - - // Walked keys — update shared state - if let Some(walked_str) = outputs.get("walked") { - let walked: Vec = walked_str.lines() - .map(|l| l.trim().to_string()) - .filter(|l| !l.is_empty()) - .collect(); - self.subconscious_state.lock().await.walked = walked; - } - drop(subs); - - // Surfaced memories → inject into conscious agent - if let Some(surface_str) = outputs.get("surface") { - let mut ag = self.agent.lock().await; - for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { - if let Some(rendered) = crate::cli::node::render_node( - &crate::store::Store::load().unwrap_or_default(), key, - ) { - let mut msg = crate::agent::api::types::Message::user(format!( - "\n--- {} (surfaced) ---\n{}\n", - key, rendered, - )); - msg.stamp(); - ag.push_entry(crate::agent::context::ConversationEntry::Memory { - key: key.to_string(), message: msg, - }); - } - } - } - - // Reflection → inject into conscious agent - if let Some(reflection) = outputs.get("reflection") { - if !reflection.trim().is_empty() { - let mut ag = self.agent.lock().await; - ag.push_message(crate::agent::api::types::Message::user(format!( - "\n--- subconscious reflection ---\n{}\n", - reflection.trim(), - ))); - } - } - - // Thalamus nudge → inject into conscious agent - if let Some(nudge) = outputs.get("thalamus") { - let nudge = nudge.trim(); - if !nudge.is_empty() && nudge != "ok" { - let mut ag = self.agent.lock().await; - ag.push_message(crate::agent::api::types::Message::user(format!( - "\n--- thalamus ---\n{}\n", - nudge, - ))); - } - } - - dbglog!("[mind] {} completed", name); - } - Ok(Err(e)) => dbglog!("[mind] subconscious agent failed: {}", e), - Err(e) => dbglog!("[mind] subconscious agent panicked: {}", e), - } - } - } - - /// Trigger subconscious agents that are due to run. - async fn trigger_subconscious(&self) { - if self.config.no_agents { return; } - - // Get conversation size + memory keys from conscious agent - let (conversation_bytes, memory_keys) = { - let ag = self.agent.lock().await; - let bytes = ag.context.entries.iter() - .filter(|e| !e.is_log() && !e.is_memory()) - .map(|e| e.message().content_text().len() as u64) - .sum::(); - let keys: Vec = ag.context.entries.iter().filter_map(|e| { - if let crate::agent::context::ConversationEntry::Memory { key, .. } = e { - Some(key.clone()) - } else { None } - }).collect(); - (bytes, keys) - }; - - // Find which agents to trigger, take their AutoAgents out - let mut to_run: Vec<(usize, AutoAgent)> = Vec::new(); - { - let mut subs = self.subconscious.lock().await; - for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() { - if i >= subs.len() { continue; } - if !subs[i].should_trigger(conversation_bytes, interval) { continue; } - subs[i].last_trigger_bytes = conversation_bytes; - - // Take the AutoAgent out — task owns it, returns it when done - let auto = std::mem::replace(&mut subs[i].auto, - AutoAgent::new(String::new(), vec![], vec![], 0.0, 0)); - to_run.push((i, auto)); - } - } - - if to_run.is_empty() { return; } - - // Fork from conscious agent and spawn tasks - let conscious = self.agent.lock().await; - let walked = self.subconscious_state.lock().await.walked.clone(); - let mut spawns = Vec::new(); - for (idx, mut auto) in to_run { - dbglog!("[mind] triggering {}", auto.name); - - let forked = conscious.fork(auto.tools.clone()); - let keys = memory_keys.clone(); - let w = walked.clone(); - let handle: tokio::task::JoinHandle<(AutoAgent, Result)> = - tokio::spawn(async move { - let result = auto.run_forked(&forked, &keys, &w).await; - (auto, result) - }); - spawns.push((idx, handle)); - } - drop(conscious); - - // Store handles (type-erased — we'll extract AutoAgent on completion) - // We need to store the JoinHandle that returns (AutoAgent, Result) - // but SubconsciousAgent.handle expects JoinHandle>. - // Wrap: spawn an outer task that extracts the result and puts back the AutoAgent. - let subconscious = self.subconscious.clone(); - for (idx, handle) in spawns { - let subs = subconscious.clone(); - let outer = tokio::spawn(async move { - let (auto, result) = handle.await.unwrap_or_else( - |e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0), - Err(format!("task panicked: {}", e)))); - // Put the AutoAgent back - let mut locked = subs.lock().await; - if idx < locked.len() { - locked[idx].auto = auto; - } - result - }); - let mut subs = self.subconscious.lock().await; - if idx < subs.len() { - subs[idx].handle = Some(outer); - } - } - } - async fn start_turn(&self, text: &str, target: StreamTarget) { { let mut ag = self.agent.lock().await; @@ -717,8 +444,11 @@ impl Mind { } // Trigger subconscious agents after conscious turn completes - self.collect_subconscious_results().await; - self.trigger_subconscious().await; + if !self.config.no_agents { + let mut sub = self.subconscious.lock().await; + sub.collect_results(&self.agent).await; + sub.trigger(&self.agent).await; + } } _ = tokio::time::sleep(timeout), if !turn_active => { diff --git a/src/user/mod.rs b/src/user/mod.rs index 5cfb7f4..c06ff76 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -128,7 +128,7 @@ pub struct App { pub(crate) context_info: Option, pub(crate) shared_context: SharedContextState, pub(crate) agent_state: Vec, - pub(crate) subconscious_shared: crate::mind::SubconsciousSharedState, + pub(crate) walked_count: usize, pub(crate) channel_status: Vec, pub(crate) idle_info: Option, } @@ -151,7 +151,7 @@ impl App { should_quit: false, submitted: Vec::new(), context_info: None, shared_context, agent_state: Vec::new(), - subconscious_shared: Default::default(), + walked_count: 0, channel_status: Vec::new(), idle_info: None, } } @@ -408,9 +408,8 @@ pub async fn run( // State sync on every wake idle_state.decay_ewma(); app.update_idle(&idle_state); - let (snaps, shared) = mind.subconscious_snapshots().await; - app.agent_state = snaps; - app.subconscious_shared = shared; + app.agent_state = mind.subconscious_snapshots().await; + app.walked_count = mind.subconscious_walked().await.len(); if !startup_done { if let Ok(mut ag) = agent.try_lock() { let model = ag.model().to_string(); diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index ff49767..9d7bb57 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -76,7 +76,7 @@ impl SubconsciousScreen { let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); lines.push(Line::raw("")); - let walked = app.subconscious_shared.walked.len(); + let walked = app.walked_count; lines.push(Line::styled( format!("── Subconscious Agents ── walked: {}", walked), section)); lines.push(Line::styled(" (↑/↓ select, Enter view log)", hint));