diff --git a/src/mind/mod.rs b/src/mind/mod.rs index cf043f1..fea5187 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -5,6 +5,7 @@ // user interface (TUI, CLI) and the agent execution (tools, API). pub mod subconscious; +pub mod unconscious; pub mod identity; pub mod log; @@ -27,6 +28,7 @@ use crate::config::{AppConfig, SessionConfig}; use crate::subconscious::learn; pub use subconscious::{SubconsciousSnapshot, Subconscious}; +pub use unconscious::{UnconsciousSnapshot, Unconscious}; use crate::agent::context::{AstNode, NodeBody, Section, Ast, ContextState}; @@ -252,6 +254,7 @@ pub struct Mind { pub shared: Arc, pub config: SessionConfig, subconscious: Arc>, + unconscious: tokio::sync::Mutex, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, @@ -291,7 +294,9 @@ impl Mind { subconscious.lock().await.init_output_tool(subconscious.clone()); Self { agent, shared, config, - subconscious, turn_tx, turn_watch, bg_tx, + subconscious, + unconscious: tokio::sync::Mutex::new(Unconscious::new()), + turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } @@ -311,6 +316,10 @@ impl Mind { self.subconscious.lock().await.walked() } + pub async fn unconscious_snapshots(&self) -> Vec { + self.unconscious.lock().await.snapshots() + } + pub async fn init(&self) { // Restore conversation self.agent.restore_from_log().await; @@ -523,6 +532,11 @@ impl Mind { let mut sub = self.subconscious.lock().await; sub.collect_results(&self.agent).await; sub.trigger(&self.agent).await; + drop(sub); + + let mut unc = self.unconscious.lock().await; + unc.collect_results().await; + unc.trigger(); } // Check for pending user input → push to agent context and start turn diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs new file mode 100644 index 0000000..5004c8f --- /dev/null +++ b/src/mind/unconscious.rs @@ -0,0 +1,240 @@ +// unconscious.rs — Graph maintenance agents +// +// Standalone agents that operate on the memory graph without needing +// conversation context. Unlike subconscious agents (which fork the +// conscious agent to share KV cache), unconscious agents create fresh +// Agent instances and select their own target nodes via queries +// defined in their .agent files. +// +// Scheduling is driven by the consolidation plan (neuro/scoring.rs), +// which analyzes graph health metrics and allocates agent runs. + +use std::time::{Duration, Instant}; + +use crate::agent::oneshot::{AutoAgent, AutoStep}; +use crate::agent::tools; +use crate::subconscious::defs; + +/// A single unconscious agent type and its runtime state. +struct UnconsciousAgent { + name: String, + /// How many runs are budgeted (from consolidation plan). + budget: usize, + /// How many runs completed this session. + completed: usize, + /// Currently running task. + handle: Option)>>, + last_run: Option, +} + +impl UnconsciousAgent { + fn is_running(&self) -> bool { + self.handle.as_ref().is_some_and(|h| !h.is_finished()) + } + + fn should_run(&self) -> bool { + if self.is_running() { return false; } + if self.completed >= self.budget { return false; } + // Min interval between runs of the same agent type + if let Some(last) = self.last_run { + if last.elapsed() < Duration::from_secs(60) { return false; } + } + true + } +} + +/// Snapshot for the TUI. +#[derive(Clone)] +pub struct UnconsciousSnapshot { + pub name: String, + pub running: bool, + pub completed: usize, + pub budget: usize, + pub last_run_secs_ago: Option, +} + +/// Orchestrates standalone graph maintenance agents. +pub struct Unconscious { + agents: Vec, + /// Max concurrent agent runs. + max_concurrent: usize, + /// When we last refreshed the consolidation plan. + last_plan_refresh: Option, +} + +impl Unconscious { + pub fn new() -> Self { + Self { + agents: Vec::new(), + max_concurrent: 2, + last_plan_refresh: None, + } + } + + /// Refresh the consolidation plan and update agent budgets. + fn refresh_plan(&mut self) { + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + let plan = crate::neuro::consolidation_plan_quick(&store); + + // Update existing agents or create new ones + for (agent_name, &count) in &plan.counts { + if count == 0 { continue; } + // Only include agents that have .agent definitions + if defs::get_def(agent_name).is_none() { continue; } + + if let Some(existing) = self.agents.iter_mut().find(|a| a.name == *agent_name) { + existing.budget = count; + } else { + self.agents.push(UnconsciousAgent { + name: agent_name.clone(), + budget: count, + completed: 0, + handle: None, + last_run: None, + }); + } + } + + self.last_plan_refresh = Some(Instant::now()); + dbglog!("[unconscious] plan refreshed: {} agent types, {} total runs", + self.agents.len(), + self.agents.iter().map(|a| a.budget).sum::()); + } + + pub fn snapshots(&self) -> Vec { + self.agents.iter().map(|a| UnconsciousSnapshot { + name: a.name.clone(), + running: a.is_running(), + completed: a.completed, + budget: a.budget, + last_run_secs_ago: a.last_run.map(|t| t.elapsed().as_secs_f64()), + }).collect() + } + + /// Collect results from finished agents. + pub async fn collect_results(&mut self) { + for agent in &mut self.agents { + if agent.handle.as_ref().is_some_and(|h| h.is_finished()) { + let handle = agent.handle.take().unwrap(); + agent.last_run = Some(Instant::now()); + agent.completed += 1; + + match handle.await { + Ok((_auto, Ok(text))) => { + let preview = &text[..text.floor_char_boundary(text.len().min(100))]; + dbglog!("[unconscious] {} completed: {}", agent.name, preview); + } + Ok((_auto, Err(e))) => { + dbglog!("[unconscious] {} failed: {}", agent.name, e); + } + Err(e) => { + dbglog!("[unconscious] {} panicked: {}", agent.name, e); + } + } + } + } + } + + /// Trigger agents that are due to run. + pub fn trigger(&mut self) { + // Refresh plan every 30 minutes (or on first call) + let should_refresh = self.last_plan_refresh + .map(|t| t.elapsed() > Duration::from_secs(1800)) + .unwrap_or(true); + if should_refresh { + self.refresh_plan(); + } + + // Count currently running + let running = self.agents.iter().filter(|a| a.is_running()).count(); + if running >= self.max_concurrent { return; } + let slots = self.max_concurrent - running; + + // Find agents that should run, sorted by most work remaining + let mut candidates: Vec = self.agents.iter().enumerate() + .filter(|(_, a)| a.should_run()) + .map(|(i, _)| i) + .collect(); + candidates.sort_by_key(|&i| std::cmp::Reverse( + self.agents[i].budget - self.agents[i].completed + )); + + for idx in candidates.into_iter().take(slots) { + self.spawn_agent(idx); + } + } + + fn spawn_agent(&mut self, idx: usize) { + let name = self.agents[idx].name.clone(); + dbglog!("[unconscious] spawning {} ({}/{})", + name, self.agents[idx].completed + 1, self.agents[idx].budget); + + let def = match defs::get_def(&name) { + Some(d) => d, + None => return, + }; + + // Build tools + let all_tools = tools::memory_and_journal_tools(); + let effective_tools: Vec = if def.tools.is_empty() { + all_tools + } else { + all_tools.into_iter() + .filter(|t| def.tools.iter().any(|w| w == t.name)) + .collect() + }; + + // Run query and resolve placeholders synchronously + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(e) => { + dbglog!("[unconscious] store load failed: {}", e); + return; + } + }; + + // Track which nodes other running agents are working on + // to avoid concurrent collisions + let exclude: std::collections::HashSet = std::collections::HashSet::new(); + + let batch = match defs::run_agent( + &store, &def, def.count.unwrap_or(5), &exclude, + ) { + Ok(b) => b, + Err(e) => { + dbglog!("[unconscious] {} query failed: {}", name, e); + return; + } + }; + + // Record visits + if !batch.node_keys.is_empty() { + let mut store_mut = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + store_mut.record_agent_visits(&batch.node_keys, &name).ok(); + } + + let steps: Vec = batch.steps.iter().map(|s| AutoStep { + prompt: s.prompt.clone(), + phase: s.phase.clone(), + }).collect(); + + let mut auto = AutoAgent::new( + name.clone(), + effective_tools, + steps, + def.temperature.unwrap_or(0.6), + def.priority, + ); + + self.agents[idx].handle = Some(tokio::spawn(async move { + let result = auto.run(None).await; + (auto, result) + })); + } +}