From 03146195795914c569032c7beb9041268a0243f4 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Wed, 8 Apr 2026 23:39:48 -0400 Subject: [PATCH] Add mind/unconscious.rs: standalone graph maintenance agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unconscious agents (organize, linker, distill, etc.) run independently of the conversation context. They create fresh Agent instances, select target nodes via their .agent file queries, and are scheduled by the consolidation plan which analyzes graph health metrics. Key differences from subconscious agents: - No fork — standalone agents with fresh context - Self-selecting — queries in .agent files pick target nodes - Budget-driven — consolidation plan allocates runs per type - Max 2 concurrent, 60s min interval between same-type runs Wired into Mind event loop alongside subconscious trigger/collect. TUI display not yet implemented. Co-Authored-By: Proof of Concept --- src/mind/mod.rs | 16 ++- src/mind/unconscious.rs | 240 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 src/mind/unconscious.rs diff --git a/src/mind/mod.rs b/src/mind/mod.rs index cf043f1..fea5187 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -5,6 +5,7 @@ // user interface (TUI, CLI) and the agent execution (tools, API). pub mod subconscious; +pub mod unconscious; pub mod identity; pub mod log; @@ -27,6 +28,7 @@ use crate::config::{AppConfig, SessionConfig}; use crate::subconscious::learn; pub use subconscious::{SubconsciousSnapshot, Subconscious}; +pub use unconscious::{UnconsciousSnapshot, Unconscious}; use crate::agent::context::{AstNode, NodeBody, Section, Ast, ContextState}; @@ -252,6 +254,7 @@ pub struct Mind { pub shared: Arc, pub config: SessionConfig, subconscious: Arc>, + unconscious: tokio::sync::Mutex, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, @@ -291,7 +294,9 @@ impl Mind { subconscious.lock().await.init_output_tool(subconscious.clone()); Self { agent, shared, config, - subconscious, turn_tx, turn_watch, bg_tx, + subconscious, + unconscious: tokio::sync::Mutex::new(Unconscious::new()), + turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } @@ -311,6 +316,10 @@ impl Mind { self.subconscious.lock().await.walked() } + pub async fn unconscious_snapshots(&self) -> Vec { + self.unconscious.lock().await.snapshots() + } + pub async fn init(&self) { // Restore conversation self.agent.restore_from_log().await; @@ -523,6 +532,11 @@ impl Mind { let mut sub = self.subconscious.lock().await; sub.collect_results(&self.agent).await; sub.trigger(&self.agent).await; + drop(sub); + + let mut unc = self.unconscious.lock().await; + unc.collect_results().await; + unc.trigger(); } // Check for pending user input → push to agent context and start turn diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs new file mode 100644 index 0000000..5004c8f --- /dev/null +++ b/src/mind/unconscious.rs @@ -0,0 +1,240 @@ +// unconscious.rs — Graph maintenance agents +// +// Standalone agents that operate on the memory graph without needing +// conversation context. Unlike subconscious agents (which fork the +// conscious agent to share KV cache), unconscious agents create fresh +// Agent instances and select their own target nodes via queries +// defined in their .agent files. +// +// Scheduling is driven by the consolidation plan (neuro/scoring.rs), +// which analyzes graph health metrics and allocates agent runs. + +use std::time::{Duration, Instant}; + +use crate::agent::oneshot::{AutoAgent, AutoStep}; +use crate::agent::tools; +use crate::subconscious::defs; + +/// A single unconscious agent type and its runtime state. +struct UnconsciousAgent { + name: String, + /// How many runs are budgeted (from consolidation plan). + budget: usize, + /// How many runs completed this session. + completed: usize, + /// Currently running task. + handle: Option)>>, + last_run: Option, +} + +impl UnconsciousAgent { + fn is_running(&self) -> bool { + self.handle.as_ref().is_some_and(|h| !h.is_finished()) + } + + fn should_run(&self) -> bool { + if self.is_running() { return false; } + if self.completed >= self.budget { return false; } + // Min interval between runs of the same agent type + if let Some(last) = self.last_run { + if last.elapsed() < Duration::from_secs(60) { return false; } + } + true + } +} + +/// Snapshot for the TUI. +#[derive(Clone)] +pub struct UnconsciousSnapshot { + pub name: String, + pub running: bool, + pub completed: usize, + pub budget: usize, + pub last_run_secs_ago: Option, +} + +/// Orchestrates standalone graph maintenance agents. +pub struct Unconscious { + agents: Vec, + /// Max concurrent agent runs. + max_concurrent: usize, + /// When we last refreshed the consolidation plan. + last_plan_refresh: Option, +} + +impl Unconscious { + pub fn new() -> Self { + Self { + agents: Vec::new(), + max_concurrent: 2, + last_plan_refresh: None, + } + } + + /// Refresh the consolidation plan and update agent budgets. + fn refresh_plan(&mut self) { + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + let plan = crate::neuro::consolidation_plan_quick(&store); + + // Update existing agents or create new ones + for (agent_name, &count) in &plan.counts { + if count == 0 { continue; } + // Only include agents that have .agent definitions + if defs::get_def(agent_name).is_none() { continue; } + + if let Some(existing) = self.agents.iter_mut().find(|a| a.name == *agent_name) { + existing.budget = count; + } else { + self.agents.push(UnconsciousAgent { + name: agent_name.clone(), + budget: count, + completed: 0, + handle: None, + last_run: None, + }); + } + } + + self.last_plan_refresh = Some(Instant::now()); + dbglog!("[unconscious] plan refreshed: {} agent types, {} total runs", + self.agents.len(), + self.agents.iter().map(|a| a.budget).sum::()); + } + + pub fn snapshots(&self) -> Vec { + self.agents.iter().map(|a| UnconsciousSnapshot { + name: a.name.clone(), + running: a.is_running(), + completed: a.completed, + budget: a.budget, + last_run_secs_ago: a.last_run.map(|t| t.elapsed().as_secs_f64()), + }).collect() + } + + /// Collect results from finished agents. + pub async fn collect_results(&mut self) { + for agent in &mut self.agents { + if agent.handle.as_ref().is_some_and(|h| h.is_finished()) { + let handle = agent.handle.take().unwrap(); + agent.last_run = Some(Instant::now()); + agent.completed += 1; + + match handle.await { + Ok((_auto, Ok(text))) => { + let preview = &text[..text.floor_char_boundary(text.len().min(100))]; + dbglog!("[unconscious] {} completed: {}", agent.name, preview); + } + Ok((_auto, Err(e))) => { + dbglog!("[unconscious] {} failed: {}", agent.name, e); + } + Err(e) => { + dbglog!("[unconscious] {} panicked: {}", agent.name, e); + } + } + } + } + } + + /// Trigger agents that are due to run. + pub fn trigger(&mut self) { + // Refresh plan every 30 minutes (or on first call) + let should_refresh = self.last_plan_refresh + .map(|t| t.elapsed() > Duration::from_secs(1800)) + .unwrap_or(true); + if should_refresh { + self.refresh_plan(); + } + + // Count currently running + let running = self.agents.iter().filter(|a| a.is_running()).count(); + if running >= self.max_concurrent { return; } + let slots = self.max_concurrent - running; + + // Find agents that should run, sorted by most work remaining + let mut candidates: Vec = self.agents.iter().enumerate() + .filter(|(_, a)| a.should_run()) + .map(|(i, _)| i) + .collect(); + candidates.sort_by_key(|&i| std::cmp::Reverse( + self.agents[i].budget - self.agents[i].completed + )); + + for idx in candidates.into_iter().take(slots) { + self.spawn_agent(idx); + } + } + + fn spawn_agent(&mut self, idx: usize) { + let name = self.agents[idx].name.clone(); + dbglog!("[unconscious] spawning {} ({}/{})", + name, self.agents[idx].completed + 1, self.agents[idx].budget); + + let def = match defs::get_def(&name) { + Some(d) => d, + None => return, + }; + + // Build tools + let all_tools = tools::memory_and_journal_tools(); + let effective_tools: Vec = if def.tools.is_empty() { + all_tools + } else { + all_tools.into_iter() + .filter(|t| def.tools.iter().any(|w| w == t.name)) + .collect() + }; + + // Run query and resolve placeholders synchronously + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(e) => { + dbglog!("[unconscious] store load failed: {}", e); + return; + } + }; + + // Track which nodes other running agents are working on + // to avoid concurrent collisions + let exclude: std::collections::HashSet = std::collections::HashSet::new(); + + let batch = match defs::run_agent( + &store, &def, def.count.unwrap_or(5), &exclude, + ) { + Ok(b) => b, + Err(e) => { + dbglog!("[unconscious] {} query failed: {}", name, e); + return; + } + }; + + // Record visits + if !batch.node_keys.is_empty() { + let mut store_mut = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + store_mut.record_agent_visits(&batch.node_keys, &name).ok(); + } + + let steps: Vec = batch.steps.iter().map(|s| AutoStep { + prompt: s.prompt.clone(), + phase: s.phase.clone(), + }).collect(); + + let mut auto = AutoAgent::new( + name.clone(), + effective_tools, + steps, + def.temperature.unwrap_or(0.6), + def.priority, + ); + + self.agents[idx].handle = Some(tokio::spawn(async move { + let result = auto.run(None).await; + (auto, result) + })); + } +}