Wire subconscious agents through Mind via AutoAgent

Mind now holds SubconsciousAgent state (surface-observe, journal,
reflect) and triggers them after conscious turns complete. Each
agent forks from the conscious agent's context via AutoAgent,
runs as an async task, and routes output (surfaced memories,
reflections) back into the conscious agent.

Replaces the synchronous AgentCycleState that spawned child
processes and blocked start_turn.

Also adds .agent2 files — simplified prompts for the forked model
that strip {{conversation}} and {{agent-context}} (already in the
forked context).

TODO: resolve remaining placeholders (seen_current, input:walked,
memory_ratio) in the .agent2 prompts.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-07 01:33:07 -04:00
parent b37b6d7495
commit 58ff9a4d50
4 changed files with 406 additions and 39 deletions

View file

@ -23,8 +23,55 @@ use std::time::Instant;
use tokio::sync::mpsc;
use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::agent::oneshot::{AutoAgent, AutoStep};
use crate::config::{AppConfig, SessionConfig};
use crate::subconscious::learn;
use crate::subconscious::{defs, learn};
// ---------------------------------------------------------------------------
// Subconscious agents — forked from conscious agent, run on schedule
// ---------------------------------------------------------------------------
/// A subconscious agent managed by Mind.
struct SubconsciousAgent {
name: String,
def: defs::AgentDef,
/// Conversation bytes at last trigger.
last_trigger_bytes: u64,
/// When the agent last ran.
last_run: Option<Instant>,
/// Running task handle + AutoAgent for status.
handle: Option<tokio::task::JoinHandle<Result<String, String>>>,
}
/// Names and byte-interval triggers for the built-in subconscious agents.
const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[
("surface-observe", 0), // every trigger
("journal", 20_000), // every ~20KB of conversation
("reflect", 100_000), // every ~100KB of conversation
];
impl SubconsciousAgent {
fn new(name: &str, interval_bytes: u64) -> Option<Self> {
let def = defs::get_def(name)?;
Some(Self {
name: name.to_string(),
def,
last_trigger_bytes: 0,
last_run: None,
handle: None,
})
}
fn is_running(&self) -> bool {
self.handle.as_ref().is_some_and(|h| !h.is_finished())
}
fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool {
if self.is_running() { return false; }
if interval == 0 { return true; } // trigger every time
conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval
}
}
/// Which pane streaming text should go to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
@ -199,7 +246,7 @@ pub struct Mind {
pub agent: Arc<tokio::sync::Mutex<Agent>>,
pub shared: Arc<SharedMindState>,
pub config: SessionConfig,
agent_cycles: tokio::sync::Mutex<crate::subconscious::subconscious::AgentCycleState>,
subconscious: tokio::sync::Mutex<Vec<SubconsciousAgent>>,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
turn_watch: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
@ -230,9 +277,12 @@ impl Mind {
shared_context,
shared_active_tools,
);
let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&ag.session_id);
let agent = Arc::new(tokio::sync::Mutex::new(ag));
let subconscious = SUBCONSCIOUS_AGENTS.iter()
.filter_map(|(name, interval)| SubconsciousAgent::new(name, *interval))
.collect();
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
let (turn_watch, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
@ -241,7 +291,7 @@ impl Mind {
sup.load_config();
sup.ensure_running();
Self { agent, shared, config, agent_cycles: tokio::sync::Mutex::new(agent_cycles),
Self { agent, shared, config, subconscious: tokio::sync::Mutex::new(subconscious),
turn_tx, turn_watch, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
}
@ -343,44 +393,147 @@ impl Mind {
/// The text moves from pending_input to ContextState atomically —
/// by the time this returns, the message is in context and the turn
/// is running.
/// Collect results from finished subconscious agents and inject
/// their output into the conscious agent's context.
async fn collect_subconscious_results(&self) {
// Collect finished handles without holding the lock across await
let finished: Vec<(String, tokio::task::JoinHandle<Result<String, String>>)> = {
let mut subs = self.subconscious.lock().await;
subs.iter_mut().filter_map(|sub| {
if sub.handle.as_ref().is_some_and(|h| h.is_finished()) {
sub.last_run = Some(Instant::now());
Some((sub.name.clone(), sub.handle.take().unwrap()))
} else {
None
}
}).collect()
};
for (name, handle) in finished {
match handle.await {
Ok(Ok(_output)) => {
let output_dir = crate::store::memory_dir()
.join("agent-output").join(&name);
// Surfaced memories
let surface_path = output_dir.join("surface");
if let Ok(content) = std::fs::read_to_string(&surface_path) {
let mut ag = self.agent.lock().await;
for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) {
if let Some(rendered) = crate::cli::node::render_node(
&crate::store::Store::load().unwrap_or_default(), key,
) {
let mut msg = crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
key, rendered,
));
msg.stamp();
ag.push_entry(crate::agent::context::ConversationEntry::Memory {
key: key.to_string(), message: msg,
});
}
}
std::fs::remove_file(&surface_path).ok();
}
// Reflection
let reflect_path = output_dir.join("reflection");
if let Ok(content) = std::fs::read_to_string(&reflect_path) {
if !content.trim().is_empty() {
let mut ag = self.agent.lock().await;
ag.push_message(crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
content.trim(),
)));
}
std::fs::remove_file(&reflect_path).ok();
}
dbglog!("[mind] {} completed", name);
}
Ok(Err(e)) => dbglog!("[mind] {} failed: {}", name, e),
Err(e) => dbglog!("[mind] {} panicked: {}", name, e),
}
}
}
/// Trigger subconscious agents that are due to run.
async fn trigger_subconscious(&self) {
if self.config.no_agents { return; }
// Estimate conversation size from the conscious agent's entries
let conversation_bytes = {
let ag = self.agent.lock().await;
ag.context.entries.iter()
.filter(|e| !e.is_log() && !e.is_memory())
.map(|e| e.message().content_text().len() as u64)
.sum::<u64>()
};
// Collect which agents to trigger (can't hold lock across await)
let to_trigger: Vec<(usize, Vec<AutoStep>, Vec<crate::agent::tools::Tool>, String, i32)> = {
let mut subs = self.subconscious.lock().await;
let mut result = Vec::new();
for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() {
if i >= subs.len() { continue; }
if !subs[i].should_trigger(conversation_bytes, interval) { continue; }
let sub = &mut subs[i];
sub.last_trigger_bytes = conversation_bytes;
let steps: Vec<AutoStep> = sub.def.steps.iter().map(|s| {
// TODO: resolve remaining placeholders (seen_current, input:walked, etc.)
AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone() }
}).collect();
let all_tools = crate::agent::tools::memory_and_journal_tools();
let tools: Vec<crate::agent::tools::Tool> = if sub.def.tools.is_empty() {
all_tools.to_vec()
} else {
all_tools.into_iter()
.filter(|t| sub.def.tools.iter().any(|w| w == t.name))
.collect()
};
result.push((i, steps, tools, sub.name.clone(), sub.def.priority));
}
result
};
if to_trigger.is_empty() { return; }
// Fork from conscious agent (one lock acquisition for all)
let conscious = self.agent.lock().await;
let mut spawns = Vec::new();
for (idx, steps, tools, name, priority) in to_trigger {
let output_dir = crate::store::memory_dir()
.join("agent-output").join(&name);
std::fs::create_dir_all(&output_dir).ok();
let mut auto = AutoAgent::from_agent(
name.clone(), &conscious, tools, steps, priority);
dbglog!("[mind] triggering {}", name);
let handle = tokio::spawn(async move {
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
auto.run(None).await
});
spawns.push((idx, handle));
}
drop(conscious);
// Store handles
let mut subs = self.subconscious.lock().await;
for (idx, handle) in spawns {
if idx < subs.len() {
subs[idx].handle = Some(handle);
}
}
}
async fn start_turn(&self, text: &str, target: StreamTarget) {
{
let mut ag = self.agent.lock().await;
// Run agent cycle — surface memories and reflection before the user message
let transcript_path = ag.conversation_log.as_ref()
.map(|l| l.path().to_string_lossy().to_string())
.unwrap_or_default();
let session = crate::session::HookSession::from_fields(
ag.session_id.clone(),
transcript_path,
"UserPromptSubmit".into(),
);
let mut cycles = self.agent_cycles.lock().await;
cycles.trigger(&session);
let cycle = std::mem::take(&mut cycles.last_output);
drop(cycles);
for key in &cycle.surfaced_keys {
if let Some(rendered) = crate::cli::node::render_node(
&crate::store::Store::load().unwrap_or_default(), &key,
) {
let mut msg = crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
key, rendered,
));
msg.stamp();
ag.push_entry(crate::agent::context::ConversationEntry::Memory {
key: key.clone(), message: msg,
});
}
}
if let Some(ref reflection) = cycle.reflection {
ag.push_message(crate::agent::api::types::Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
reflection.trim(),
)));
}
match target {
StreamTarget::Conversation => {
ag.push_message(crate::agent::api::types::Message::user(text));
@ -470,6 +623,10 @@ impl Mind {
if !self.config.no_agents {
cmds.push(MindCommand::Score);
}
// Trigger subconscious agents after conscious turn completes
self.collect_subconscious_results().await;
self.trigger_subconscious().await;
}
_ = tokio::time::sleep(timeout), if !turn_active => {