AutoAgent: persistent across runs, run() vs run_forked()
AutoAgent holds config + walked state. Backend is ephemeral per run:
- run(): standalone, global API client (oneshot CLI)
- run_forked(): forks conscious agent, resolves prompt templates
with current memory_keys and walked state
Mind creates AutoAgents once at startup, takes them out for spawned
tasks, puts them back on completion (preserving walked state).
Removes {{seen_previous}}, {{input:walked}}, {{memory_ratio}} from
subconscious agent prompts. Walked keys are now a Vec on AutoAgent,
resolved via {{walked}} from in-memory state.
Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
ba62e0a767
commit
94ddf7b189
5 changed files with 238 additions and 247 deletions
220
src/mind/mod.rs
220
src/mind/mod.rs
|
|
@ -33,13 +33,12 @@ use crate::subconscious::{defs, learn};
|
|||
|
||||
/// A subconscious agent managed by Mind.
|
||||
struct SubconsciousAgent {
|
||||
name: String,
|
||||
def: defs::AgentDef,
|
||||
auto: AutoAgent,
|
||||
/// Conversation bytes at last trigger.
|
||||
last_trigger_bytes: u64,
|
||||
/// When the agent last ran.
|
||||
last_run: Option<Instant>,
|
||||
/// Running task handle + AutoAgent for status.
|
||||
/// Running task handle.
|
||||
handle: Option<tokio::task::JoinHandle<Result<String, String>>>,
|
||||
}
|
||||
|
||||
|
|
@ -51,11 +50,30 @@ const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[
|
|||
];
|
||||
|
||||
impl SubconsciousAgent {
|
||||
fn new(name: &str, interval_bytes: u64) -> Option<Self> {
|
||||
fn new(name: &str, _interval_bytes: u64) -> Option<Self> {
|
||||
let def = defs::get_def(name)?;
|
||||
|
||||
let all_tools = crate::agent::tools::memory_and_journal_tools();
|
||||
let tools: Vec<crate::agent::tools::Tool> = if def.tools.is_empty() {
|
||||
all_tools.to_vec()
|
||||
} else {
|
||||
all_tools.into_iter()
|
||||
.filter(|t| def.tools.iter().any(|w| w == t.name))
|
||||
.collect()
|
||||
};
|
||||
|
||||
let steps: Vec<AutoStep> = def.steps.iter().map(|s| AutoStep {
|
||||
prompt: s.prompt.clone(),
|
||||
phase: s.phase.clone(),
|
||||
}).collect();
|
||||
|
||||
let auto = AutoAgent::new(
|
||||
name.to_string(), tools, steps,
|
||||
def.temperature.unwrap_or(0.6), def.priority,
|
||||
);
|
||||
|
||||
Some(Self {
|
||||
name: name.to_string(),
|
||||
def,
|
||||
auto,
|
||||
last_trigger_bytes: 0,
|
||||
last_run: None,
|
||||
handle: None,
|
||||
|
|
@ -68,60 +86,11 @@ impl SubconsciousAgent {
|
|||
|
||||
fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool {
|
||||
if self.is_running() { return false; }
|
||||
if interval == 0 { return true; } // trigger every time
|
||||
if interval == 0 { return true; }
|
||||
conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve {{placeholder}} templates in subconscious agent prompts.
|
||||
/// Handles: seen_current, seen_previous, input:KEY.
|
||||
/// Resolve {{placeholder}} templates in subconscious agent prompts.
|
||||
fn resolve_prompt(
|
||||
template: &str,
|
||||
memory_keys: &[String],
|
||||
output_dir: &std::path::Path,
|
||||
) -> String {
|
||||
let mut result = String::with_capacity(template.len());
|
||||
let mut rest = template;
|
||||
while let Some(start) = rest.find("{{") {
|
||||
result.push_str(&rest[..start]);
|
||||
let after = &rest[start + 2..];
|
||||
if let Some(end) = after.find("}}") {
|
||||
let name = after[..end].trim();
|
||||
let replacement = match name {
|
||||
"seen_current" | "seen_previous" => {
|
||||
if memory_keys.is_empty() {
|
||||
"(none)".to_string()
|
||||
} else {
|
||||
memory_keys.iter()
|
||||
.map(|k| format!("- {}", k))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
}
|
||||
}
|
||||
_ if name.starts_with("input:") => {
|
||||
let key = &name[6..];
|
||||
std::fs::read_to_string(output_dir.join(key))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
_ => {
|
||||
// Unknown placeholder — leave as-is
|
||||
result.push_str("{{");
|
||||
result.push_str(&after[..end + 2]);
|
||||
rest = &after[end + 2..];
|
||||
continue;
|
||||
}
|
||||
};
|
||||
result.push_str(&replacement);
|
||||
rest = &after[end + 2..];
|
||||
} else {
|
||||
result.push_str("{{");
|
||||
rest = after;
|
||||
}
|
||||
}
|
||||
result.push_str(rest);
|
||||
result
|
||||
}
|
||||
/// Which pane streaming text should go to.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum StreamTarget {
|
||||
|
|
@ -296,7 +265,7 @@ pub struct Mind {
|
|||
pub agent: Arc<tokio::sync::Mutex<Agent>>,
|
||||
pub shared: Arc<SharedMindState>,
|
||||
pub config: SessionConfig,
|
||||
subconscious: tokio::sync::Mutex<Vec<SubconsciousAgent>>,
|
||||
subconscious: Arc<tokio::sync::Mutex<Vec<SubconsciousAgent>>>,
|
||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||
turn_watch: tokio::sync::watch::Sender<bool>,
|
||||
bg_tx: mpsc::UnboundedSender<BgEvent>,
|
||||
|
|
@ -341,7 +310,7 @@ impl Mind {
|
|||
sup.load_config();
|
||||
sup.ensure_running();
|
||||
|
||||
Self { agent, shared, config, subconscious: tokio::sync::Mutex::new(subconscious),
|
||||
Self { agent, shared, config, subconscious: Arc::new(tokio::sync::Mutex::new(subconscious)),
|
||||
turn_tx, turn_watch, bg_tx,
|
||||
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
|
||||
}
|
||||
|
|
@ -447,24 +416,25 @@ impl Mind {
|
|||
/// their output into the conscious agent's context.
|
||||
async fn collect_subconscious_results(&self) {
|
||||
// Collect finished handles without holding the lock across await
|
||||
let finished: Vec<(String, tokio::task::JoinHandle<Result<String, String>>)> = {
|
||||
let finished: Vec<(usize, tokio::task::JoinHandle<Result<String, String>>)> = {
|
||||
let mut subs = self.subconscious.lock().await;
|
||||
subs.iter_mut().filter_map(|sub| {
|
||||
subs.iter_mut().enumerate().filter_map(|(i, sub)| {
|
||||
if sub.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
||||
sub.last_run = Some(Instant::now());
|
||||
Some((sub.name.clone(), sub.handle.take().unwrap()))
|
||||
Some((i, sub.handle.take().unwrap()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect()
|
||||
};
|
||||
|
||||
for (name, handle) in finished {
|
||||
for (idx, handle) in finished {
|
||||
let name = self.subconscious.lock().await[idx].auto.name.clone();
|
||||
let output_dir = crate::store::memory_dir()
|
||||
.join("agent-output").join(&name);
|
||||
|
||||
match handle.await {
|
||||
Ok(Ok(_output)) => {
|
||||
let output_dir = crate::store::memory_dir()
|
||||
.join("agent-output").join(&name);
|
||||
|
||||
// Surfaced memories
|
||||
let surface_path = output_dir.join("surface");
|
||||
if let Ok(content) = std::fs::read_to_string(&surface_path) {
|
||||
|
|
@ -486,6 +456,21 @@ impl Mind {
|
|||
std::fs::remove_file(&surface_path).ok();
|
||||
}
|
||||
|
||||
// Walked keys — store for next run
|
||||
let walked_path = output_dir.join("walked");
|
||||
if let Ok(content) = std::fs::read_to_string(&walked_path) {
|
||||
let walked: Vec<String> = content.lines()
|
||||
.map(|l| l.trim().to_string())
|
||||
.filter(|l| !l.is_empty())
|
||||
.collect();
|
||||
// Store on all subconscious agents (shared state)
|
||||
let mut subs = self.subconscious.lock().await;
|
||||
for sub in subs.iter_mut() {
|
||||
sub.auto.walked = walked.clone();
|
||||
}
|
||||
std::fs::remove_file(&walked_path).ok();
|
||||
}
|
||||
|
||||
// Reflection
|
||||
let reflect_path = output_dir.join("reflection");
|
||||
if let Ok(content) = std::fs::read_to_string(&reflect_path) {
|
||||
|
|
@ -511,89 +496,82 @@ impl Mind {
|
|||
async fn trigger_subconscious(&self) {
|
||||
if self.config.no_agents { return; }
|
||||
|
||||
// Estimate conversation size from the conscious agent's entries
|
||||
let conversation_bytes = {
|
||||
// Get conversation size + memory keys from conscious agent
|
||||
let (conversation_bytes, memory_keys) = {
|
||||
let ag = self.agent.lock().await;
|
||||
ag.context.entries.iter()
|
||||
let bytes = ag.context.entries.iter()
|
||||
.filter(|e| !e.is_log() && !e.is_memory())
|
||||
.map(|e| e.message().content_text().len() as u64)
|
||||
.sum::<u64>()
|
||||
};
|
||||
|
||||
// Get memory keys from conscious agent for placeholder resolution
|
||||
let memory_keys: Vec<String> = {
|
||||
let ag = self.agent.lock().await;
|
||||
ag.context.entries.iter().filter_map(|e| {
|
||||
.sum::<u64>();
|
||||
let keys: Vec<String> = ag.context.entries.iter().filter_map(|e| {
|
||||
if let crate::agent::context::ConversationEntry::Memory { key, .. } = e {
|
||||
Some(key.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect()
|
||||
} else { None }
|
||||
}).collect();
|
||||
(bytes, keys)
|
||||
};
|
||||
|
||||
// Collect which agents to trigger (can't hold lock across await)
|
||||
let to_trigger: Vec<(usize, Vec<AutoStep>, Vec<crate::agent::tools::Tool>, String, i32)> = {
|
||||
// Find which agents to trigger, take their AutoAgents out
|
||||
let mut to_run: Vec<(usize, AutoAgent)> = Vec::new();
|
||||
{
|
||||
let mut subs = self.subconscious.lock().await;
|
||||
let mut result = Vec::new();
|
||||
for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() {
|
||||
if i >= subs.len() { continue; }
|
||||
if !subs[i].should_trigger(conversation_bytes, interval) { continue; }
|
||||
subs[i].last_trigger_bytes = conversation_bytes;
|
||||
|
||||
let sub = &mut subs[i];
|
||||
sub.last_trigger_bytes = conversation_bytes;
|
||||
|
||||
// The output dir for this agent — used for input: placeholders
|
||||
// and the output() tool at runtime
|
||||
let output_dir = crate::store::memory_dir()
|
||||
.join("agent-output").join(&sub.name);
|
||||
|
||||
let steps: Vec<AutoStep> = sub.def.steps.iter().map(|s| {
|
||||
let prompt = resolve_prompt(&s.prompt, &memory_keys, &output_dir);
|
||||
AutoStep { prompt, phase: s.phase.clone() }
|
||||
}).collect();
|
||||
|
||||
let all_tools = crate::agent::tools::memory_and_journal_tools();
|
||||
let tools: Vec<crate::agent::tools::Tool> = if sub.def.tools.is_empty() {
|
||||
all_tools.to_vec()
|
||||
} else {
|
||||
all_tools.into_iter()
|
||||
.filter(|t| sub.def.tools.iter().any(|w| w == t.name))
|
||||
.collect()
|
||||
};
|
||||
|
||||
result.push((i, steps, tools, sub.name.clone(), sub.def.priority));
|
||||
// Take the AutoAgent out — task owns it, returns it when done
|
||||
let auto = std::mem::replace(&mut subs[i].auto,
|
||||
AutoAgent::new(String::new(), vec![], vec![], 0.0, 0));
|
||||
to_run.push((i, auto));
|
||||
}
|
||||
result
|
||||
};
|
||||
}
|
||||
|
||||
if to_trigger.is_empty() { return; }
|
||||
if to_run.is_empty() { return; }
|
||||
|
||||
// Fork from conscious agent (one lock acquisition for all)
|
||||
// Fork from conscious agent and spawn tasks
|
||||
let conscious = self.agent.lock().await;
|
||||
let mut spawns = Vec::new();
|
||||
for (idx, steps, tools, name, priority) in to_trigger {
|
||||
for (idx, mut auto) in to_run {
|
||||
let output_dir = crate::store::memory_dir()
|
||||
.join("agent-output").join(&name);
|
||||
.join("agent-output").join(&auto.name);
|
||||
std::fs::create_dir_all(&output_dir).ok();
|
||||
|
||||
let mut auto = AutoAgent::from_agent(
|
||||
name.clone(), &conscious, tools, steps, priority);
|
||||
dbglog!("[mind] triggering {}", name);
|
||||
dbglog!("[mind] triggering {}", auto.name);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
|
||||
auto.run(None).await
|
||||
});
|
||||
let forked = conscious.fork(auto.tools.clone());
|
||||
let keys = memory_keys.clone();
|
||||
let handle: tokio::task::JoinHandle<(AutoAgent, Result<String, String>)> =
|
||||
tokio::spawn(async move {
|
||||
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
|
||||
let result = auto.run_forked(&forked, &keys).await;
|
||||
(auto, result)
|
||||
});
|
||||
spawns.push((idx, handle));
|
||||
}
|
||||
drop(conscious);
|
||||
|
||||
// Store handles
|
||||
let mut subs = self.subconscious.lock().await;
|
||||
// Store handles (type-erased — we'll extract AutoAgent on completion)
|
||||
// We need to store the JoinHandle that returns (AutoAgent, Result)
|
||||
// but SubconsciousAgent.handle expects JoinHandle<Result<String, String>>.
|
||||
// Wrap: spawn an outer task that extracts the result and puts back the AutoAgent.
|
||||
let subconscious = self.subconscious.clone();
|
||||
for (idx, handle) in spawns {
|
||||
let subs = subconscious.clone();
|
||||
let outer = tokio::spawn(async move {
|
||||
let (auto, result) = handle.await.unwrap_or_else(
|
||||
|e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0),
|
||||
Err(format!("task panicked: {}", e))));
|
||||
// Put the AutoAgent back
|
||||
let mut locked = subs.lock().await;
|
||||
if idx < locked.len() {
|
||||
locked[idx].auto = auto;
|
||||
}
|
||||
result
|
||||
});
|
||||
let mut subs = self.subconscious.lock().await;
|
||||
if idx < subs.len() {
|
||||
subs[idx].handle = Some(handle);
|
||||
subs[idx].handle = Some(outer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue