forked from kent/consciousness
Lightweight resolver handles {{seen_current}}, {{seen_previous}}, and
{{input:KEY}} using the session_id and output_dir directly instead of
env vars. Runs in trigger_subconscious before creating AutoAgent.
Removes {{memory_ratio}} from surface-observe prompt — redundant with
existing budget mechanisms.
Co-Authored-By: Proof of Concept <poc@bcachefs.org>
730 lines
28 KiB
Rust
730 lines
28 KiB
Rust
// mind/ — Cognitive layer
|
|
//
|
|
// Mind state machine, DMN, identity, observation socket.
|
|
// Everything about how the mind operates, separate from the
|
|
// user interface (TUI, CLI) and the agent execution (tools, API).
|
|
|
|
pub mod dmn;
|
|
pub mod identity;
|
|
pub mod log;
|
|
|
|
// consciousness.rs — Mind state machine and event loop
|
|
//
|
|
// The core runtime for the consciousness binary. Mind manages turns,
|
|
// DMN state, compaction, scoring, and slash commands. The event loop
|
|
// bridges Mind (cognitive state) with App (TUI rendering).
|
|
//
|
|
// The event loop uses biased select! so priorities are deterministic:
// UI commands > background events > turn results > DMN timer.
|
|
|
|
use anyhow::Result;
|
|
use std::sync::Arc;
|
|
use std::time::Instant;
|
|
use tokio::sync::mpsc;
|
|
use crate::agent::{Agent, TurnResult};
|
|
use crate::agent::api::ApiClient;
|
|
use crate::agent::oneshot::{AutoAgent, AutoStep};
|
|
use crate::config::{AppConfig, SessionConfig};
|
|
use crate::subconscious::{defs, learn};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Subconscious agents — forked from conscious agent, run on schedule
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// A subconscious agent managed by Mind.
|
|
struct SubconsciousAgent {
|
|
name: String,
|
|
def: defs::AgentDef,
|
|
/// Conversation bytes at last trigger.
|
|
last_trigger_bytes: u64,
|
|
/// When the agent last ran.
|
|
last_run: Option<Instant>,
|
|
/// Running task handle + AutoAgent for status.
|
|
handle: Option<tokio::task::JoinHandle<Result<String, String>>>,
|
|
}
|
|
|
|
/// Built-in subconscious agents as `(name, interval)` pairs.
///
/// `interval` is the amount of new conversation (in bytes) that must
/// accumulate between runs; `0` makes the agent eligible on every trigger.
const SUBCONSCIOUS_AGENTS: &[(&str, u64)] = &[
    // Eligible on every trigger.
    ("subconscious-surface-observe", 0),
    // Roughly every 20 KB of conversation.
    ("subconscious-journal", 20_000),
    // Roughly every 100 KB of conversation.
    ("subconscious-reflect", 100_000),
];
|
|
|
|
impl SubconsciousAgent {
|
|
fn new(name: &str, interval_bytes: u64) -> Option<Self> {
|
|
let def = defs::get_def(name)?;
|
|
Some(Self {
|
|
name: name.to_string(),
|
|
def,
|
|
last_trigger_bytes: 0,
|
|
last_run: None,
|
|
handle: None,
|
|
})
|
|
}
|
|
|
|
fn is_running(&self) -> bool {
|
|
self.handle.as_ref().is_some_and(|h| !h.is_finished())
|
|
}
|
|
|
|
fn should_trigger(&self, conversation_bytes: u64, interval: u64) -> bool {
|
|
if self.is_running() { return false; }
|
|
if interval == 0 { return true; } // trigger every time
|
|
conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval
|
|
}
|
|
}
|
|
|
|
/// Resolve {{placeholder}} templates in subconscious agent prompts.
|
|
/// Handles: seen_current, seen_previous, input:KEY.
|
|
fn resolve_prompt(template: &str, session_id: &str, output_dir: &std::path::Path) -> String {
|
|
let mut result = String::with_capacity(template.len());
|
|
let mut rest = template;
|
|
while let Some(start) = rest.find("{{") {
|
|
result.push_str(&rest[..start]);
|
|
let after = &rest[start + 2..];
|
|
if let Some(end) = after.find("}}") {
|
|
let name = after[..end].trim();
|
|
let replacement = match name {
|
|
"seen_current" => resolve_seen_list(session_id, ""),
|
|
"seen_previous" => resolve_seen_list(session_id, "-prev"),
|
|
_ if name.starts_with("input:") => {
|
|
let key = &name[6..];
|
|
std::fs::read_to_string(output_dir.join(key))
|
|
.unwrap_or_default()
|
|
}
|
|
_ => {
|
|
// Unknown placeholder — leave as-is
|
|
result.push_str("{{");
|
|
result.push_str(&after[..end + 2]);
|
|
rest = &after[end + 2..];
|
|
continue;
|
|
}
|
|
};
|
|
result.push_str(&replacement);
|
|
rest = &after[end + 2..];
|
|
} else {
|
|
// Unclosed {{ — pass through
|
|
result.push_str("{{");
|
|
rest = after;
|
|
}
|
|
}
|
|
result.push_str(rest);
|
|
result
|
|
}
|
|
|
|
fn resolve_seen_list(session_id: &str, suffix: &str) -> String {
|
|
if session_id.is_empty() { return "(no session)".to_string(); }
|
|
|
|
let path = crate::store::memory_dir()
|
|
.join("sessions")
|
|
.join(format!("seen{}-{}", suffix, session_id));
|
|
|
|
let entries: Vec<(String, String)> = std::fs::read_to_string(&path).ok()
|
|
.map(|content| {
|
|
content.lines()
|
|
.filter(|s| !s.is_empty())
|
|
.filter_map(|line| {
|
|
let (ts, key) = line.split_once('\t')?;
|
|
Some((ts.to_string(), key.to_string()))
|
|
})
|
|
.collect()
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
if entries.is_empty() { return "(none)".to_string(); }
|
|
|
|
let mut sorted = entries;
|
|
sorted.sort_by(|a, b| b.0.cmp(&a.0));
|
|
let mut seen = std::collections::HashSet::new();
|
|
sorted.into_iter()
|
|
.filter(|(_, key)| seen.insert(key.clone()))
|
|
.take(20)
|
|
.map(|(ts, key)| format!("- {} ({})", key, ts))
|
|
.collect::<Vec<_>>()
|
|
.join("\n")
|
|
}
|
|
/// Destination pane for streamed turn output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
    /// The turn was started by the user; output belongs in the conversation pane.
    Conversation,
    /// The turn was started by the DMN; output belongs in the autonomous pane.
    Autonomous,
}
|
|
|
|
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
|
|
fn compaction_threshold(app: &AppConfig) -> u32 {
|
|
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
|
|
}
|
|
|
|
/// Shared state between Mind and UI.
|
|
pub struct MindState {
|
|
/// Pending user input — UI pushes, Mind consumes after turn completes.
|
|
pub input: Vec<String>,
|
|
/// True while a turn is in progress.
|
|
pub turn_active: bool,
|
|
/// DMN state
|
|
pub dmn: dmn::State,
|
|
pub dmn_turns: u32,
|
|
pub max_dmn_turns: u32,
|
|
/// Whether memory scoring is running.
|
|
pub scoring_in_flight: bool,
|
|
/// Whether compaction is running.
|
|
pub compaction_in_flight: bool,
|
|
/// Per-turn tracking
|
|
pub last_user_input: Instant,
|
|
pub consecutive_errors: u32,
|
|
pub last_turn_had_tools: bool,
|
|
/// Handle to the currently running turn task.
|
|
pub turn_handle: Option<tokio::task::JoinHandle<()>>,
|
|
}
|
|
|
|
impl Clone for MindState {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
input: self.input.clone(),
|
|
turn_active: self.turn_active,
|
|
dmn: self.dmn.clone(),
|
|
dmn_turns: self.dmn_turns,
|
|
max_dmn_turns: self.max_dmn_turns,
|
|
scoring_in_flight: self.scoring_in_flight,
|
|
compaction_in_flight: self.compaction_in_flight,
|
|
last_user_input: self.last_user_input,
|
|
consecutive_errors: self.consecutive_errors,
|
|
last_turn_had_tools: self.last_turn_had_tools,
|
|
turn_handle: None, // Not cloned — only Mind's loop uses this
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Follow-up action requested from `Mind` after a state transition.
pub enum MindCommand {
    /// Check the context size and compact if it is over the threshold.
    Compact,
    /// Start memory scoring unless it is already in flight.
    Score,
    /// Abort the current turn and any active tool tasks.
    Interrupt,
    /// Start over with a fresh agent and conversation log.
    NewSession,
    /// No action.
    None,
}
|
|
|
|
impl MindState {
|
|
pub fn new(max_dmn_turns: u32) -> Self {
|
|
Self {
|
|
input: Vec::new(),
|
|
turn_active: false,
|
|
dmn: if dmn::is_off() { dmn::State::Off }
|
|
else { dmn::State::Resting { since: Instant::now() } },
|
|
dmn_turns: 0,
|
|
max_dmn_turns,
|
|
scoring_in_flight: false,
|
|
compaction_in_flight: false,
|
|
last_user_input: Instant::now(),
|
|
consecutive_errors: 0,
|
|
last_turn_had_tools: false,
|
|
turn_handle: None,
|
|
}
|
|
}
|
|
|
|
/// Consume pending user input if no turn is active.
|
|
/// Returns the text to send; caller is responsible for pushing it
|
|
/// into the Agent's context and starting the turn.
|
|
fn take_pending_input(&mut self) -> Option<String> {
|
|
if self.turn_active || self.input.is_empty() {
|
|
return None;
|
|
}
|
|
let text = self.input.join("\n");
|
|
self.input.clear();
|
|
self.dmn_turns = 0;
|
|
self.consecutive_errors = 0;
|
|
self.last_user_input = Instant::now();
|
|
self.dmn = dmn::State::Engaged;
|
|
Some(text)
|
|
}
|
|
|
|
/// Process turn completion, return model switch name if requested.
|
|
fn complete_turn(&mut self, result: &Result<TurnResult>, target: StreamTarget) -> Option<String> {
|
|
self.turn_active = false;
|
|
match result {
|
|
Ok(turn_result) => {
|
|
if turn_result.tool_errors > 0 {
|
|
self.consecutive_errors += turn_result.tool_errors;
|
|
} else {
|
|
self.consecutive_errors = 0;
|
|
}
|
|
self.last_turn_had_tools = turn_result.had_tool_calls;
|
|
self.dmn = dmn::transition(
|
|
&self.dmn,
|
|
turn_result.yield_requested,
|
|
turn_result.had_tool_calls,
|
|
target == StreamTarget::Conversation,
|
|
);
|
|
if turn_result.dmn_pause {
|
|
self.dmn = dmn::State::Paused;
|
|
self.dmn_turns = 0;
|
|
}
|
|
turn_result.model_switch.clone()
|
|
}
|
|
Err(_) => {
|
|
self.consecutive_errors += 1;
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// DMN tick — returns a prompt and target if we should run a turn.
|
|
fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> {
|
|
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
|
|
return None;
|
|
}
|
|
|
|
self.dmn_turns += 1;
|
|
if self.dmn_turns > self.max_dmn_turns {
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
self.dmn_turns = 0;
|
|
return None;
|
|
}
|
|
|
|
let dmn_ctx = dmn::DmnContext {
|
|
user_idle: self.last_user_input.elapsed(),
|
|
consecutive_errors: self.consecutive_errors,
|
|
last_turn_had_tools: self.last_turn_had_tools,
|
|
};
|
|
let prompt = self.dmn.prompt(&dmn_ctx);
|
|
Some((prompt, StreamTarget::Autonomous))
|
|
}
|
|
|
|
fn interrupt(&mut self) {
|
|
self.input.clear();
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
}
|
|
}
|
|
|
|
/// Notifications sent by background tasks to the Mind loop when they finish.
enum BgEvent {
    /// The memory-scoring task has ended.
    ScoringDone,
}
|
|
|
|
// --- Mind: cognitive state machine ---
|
|
|
|
/// `MindState` behind a standard-library mutex, as shared by Mind and the UI.
pub type SharedMindState = std::sync::Mutex<MindState>;
|
|
|
|
pub struct Mind {
|
|
pub agent: Arc<tokio::sync::Mutex<Agent>>,
|
|
pub shared: Arc<SharedMindState>,
|
|
pub config: SessionConfig,
|
|
subconscious: tokio::sync::Mutex<Vec<SubconsciousAgent>>,
|
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
turn_watch: tokio::sync::watch::Sender<bool>,
|
|
bg_tx: mpsc::UnboundedSender<BgEvent>,
|
|
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
|
|
_supervisor: crate::thalamus::supervisor::Supervisor,
|
|
}
|
|
|
|
impl Mind {
|
|
pub fn new(
|
|
config: SessionConfig,
|
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
) -> Self {
|
|
let shared_context = crate::agent::context::shared_context_state();
|
|
let shared_active_tools = crate::agent::tools::shared_active_tools();
|
|
|
|
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
|
|
let conversation_log = log::ConversationLog::new(
|
|
config.session_dir.join("conversation.jsonl"),
|
|
).ok();
|
|
|
|
let ag = Agent::new(
|
|
client,
|
|
config.system_prompt.clone(),
|
|
config.context_parts.clone(),
|
|
config.app.clone(),
|
|
config.prompt_file.clone(),
|
|
conversation_log,
|
|
shared_context,
|
|
shared_active_tools,
|
|
);
|
|
let agent = Arc::new(tokio::sync::Mutex::new(ag));
|
|
|
|
let subconscious = SUBCONSCIOUS_AGENTS.iter()
|
|
.filter_map(|(name, interval)| SubconsciousAgent::new(name, *interval))
|
|
.collect();
|
|
|
|
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
|
|
let (turn_watch, _) = tokio::sync::watch::channel(false);
|
|
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
|
|
|
|
let mut sup = crate::thalamus::supervisor::Supervisor::new();
|
|
sup.load_config();
|
|
sup.ensure_running();
|
|
|
|
Self { agent, shared, config, subconscious: tokio::sync::Mutex::new(subconscious),
|
|
turn_tx, turn_watch, bg_tx,
|
|
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
|
|
}
|
|
|
|
/// Initialize — restore log, start daemons and background agents.
|
|
pub async fn init(&self) {
|
|
// Restore conversation
|
|
let mut ag = self.agent.lock().await;
|
|
ag.restore_from_log();
|
|
ag.changed.notify_one();
|
|
drop(ag);
|
|
}
|
|
|
|
/// Subscribe to the turn-activity flag (`true` while a turn is running).
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
    let sender = &self.turn_watch;
    sender.subscribe()
}
|
|
|
|
/// Execute an Action from a MindState method.
|
|
async fn run_commands(&self, cmds: Vec<MindCommand>) {
|
|
for cmd in cmds {
|
|
match cmd {
|
|
MindCommand::None => {}
|
|
MindCommand::Compact => {
|
|
let threshold = compaction_threshold(&self.config.app) as usize;
|
|
let mut ag = self.agent.lock().await;
|
|
let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default();
|
|
if crate::agent::context::sections_used(§ions) > threshold {
|
|
ag.compact();
|
|
ag.notify("compacted");
|
|
}
|
|
}
|
|
MindCommand::Score => {
|
|
let mut s = self.shared.lock().unwrap();
|
|
if !s.scoring_in_flight {
|
|
s.scoring_in_flight = true;
|
|
drop(s);
|
|
self.start_memory_scoring();
|
|
}
|
|
}
|
|
MindCommand::Interrupt => {
|
|
self.shared.lock().unwrap().interrupt();
|
|
let ag = self.agent.lock().await;
|
|
let mut tools = ag.active_tools.lock().unwrap();
|
|
for entry in tools.drain(..) { entry.handle.abort(); }
|
|
drop(tools); drop(ag);
|
|
if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); }
|
|
self.shared.lock().unwrap().turn_active = false;
|
|
let _ = self.turn_watch.send(false);
|
|
}
|
|
MindCommand::NewSession => {
|
|
{
|
|
let mut s = self.shared.lock().unwrap();
|
|
s.dmn = dmn::State::Resting { since: Instant::now() };
|
|
s.dmn_turns = 0;
|
|
}
|
|
let new_log = log::ConversationLog::new(
|
|
self.config.session_dir.join("conversation.jsonl"),
|
|
).ok();
|
|
let mut ag = self.agent.lock().await;
|
|
let shared_ctx = ag.shared_context.clone();
|
|
let shared_tools = ag.active_tools.clone();
|
|
*ag = Agent::new(
|
|
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
|
|
self.config.system_prompt.clone(), self.config.context_parts.clone(),
|
|
self.config.app.clone(), self.config.prompt_file.clone(),
|
|
new_log, shared_ctx, shared_tools,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn start_memory_scoring(&self) {
|
|
let agent = self.agent.clone();
|
|
let bg_tx = self.bg_tx.clone();
|
|
let cfg = crate::config::get();
|
|
let max_age = cfg.scoring_interval_secs;
|
|
let response_window = cfg.scoring_response_window;
|
|
tokio::spawn(async move {
|
|
let (context, client) = {
|
|
let mut ag = agent.lock().await;
|
|
if ag.memory_scoring_in_flight { return; }
|
|
ag.memory_scoring_in_flight = true;
|
|
(ag.context.clone(), ag.client_clone())
|
|
};
|
|
let result = learn::score_memories_incremental(
|
|
&context, max_age as i64, response_window, &client, &agent,
|
|
).await;
|
|
{
|
|
let mut ag = agent.lock().await;
|
|
ag.memory_scoring_in_flight = false;
|
|
if let Ok(ref scores) = result { ag.memory_scores = scores.clone(); }
|
|
}
|
|
let _ = bg_tx.send(BgEvent::ScoringDone);
|
|
});
|
|
}
|
|
|
|
/// Push user/DMN message into the agent's context and spawn a turn.
|
|
/// The text moves from pending_input to ContextState atomically —
|
|
/// by the time this returns, the message is in context and the turn
|
|
/// is running.
|
|
/// Collect results from finished subconscious agents and inject
|
|
/// their output into the conscious agent's context.
|
|
async fn collect_subconscious_results(&self) {
|
|
// Collect finished handles without holding the lock across await
|
|
let finished: Vec<(String, tokio::task::JoinHandle<Result<String, String>>)> = {
|
|
let mut subs = self.subconscious.lock().await;
|
|
subs.iter_mut().filter_map(|sub| {
|
|
if sub.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
|
sub.last_run = Some(Instant::now());
|
|
Some((sub.name.clone(), sub.handle.take().unwrap()))
|
|
} else {
|
|
None
|
|
}
|
|
}).collect()
|
|
};
|
|
|
|
for (name, handle) in finished {
|
|
match handle.await {
|
|
Ok(Ok(_output)) => {
|
|
let output_dir = crate::store::memory_dir()
|
|
.join("agent-output").join(&name);
|
|
|
|
// Surfaced memories
|
|
let surface_path = output_dir.join("surface");
|
|
if let Ok(content) = std::fs::read_to_string(&surface_path) {
|
|
let mut ag = self.agent.lock().await;
|
|
for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) {
|
|
if let Some(rendered) = crate::cli::node::render_node(
|
|
&crate::store::Store::load().unwrap_or_default(), key,
|
|
) {
|
|
let mut msg = crate::agent::api::types::Message::user(format!(
|
|
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
|
|
key, rendered,
|
|
));
|
|
msg.stamp();
|
|
ag.push_entry(crate::agent::context::ConversationEntry::Memory {
|
|
key: key.to_string(), message: msg,
|
|
});
|
|
}
|
|
}
|
|
std::fs::remove_file(&surface_path).ok();
|
|
}
|
|
|
|
// Reflection
|
|
let reflect_path = output_dir.join("reflection");
|
|
if let Ok(content) = std::fs::read_to_string(&reflect_path) {
|
|
if !content.trim().is_empty() {
|
|
let mut ag = self.agent.lock().await;
|
|
ag.push_message(crate::agent::api::types::Message::user(format!(
|
|
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
|
|
content.trim(),
|
|
)));
|
|
}
|
|
std::fs::remove_file(&reflect_path).ok();
|
|
}
|
|
|
|
dbglog!("[mind] {} completed", name);
|
|
}
|
|
Ok(Err(e)) => dbglog!("[mind] {} failed: {}", name, e),
|
|
Err(e) => dbglog!("[mind] {} panicked: {}", name, e),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Trigger subconscious agents that are due to run.
|
|
async fn trigger_subconscious(&self) {
|
|
if self.config.no_agents { return; }
|
|
|
|
// Estimate conversation size from the conscious agent's entries
|
|
let conversation_bytes = {
|
|
let ag = self.agent.lock().await;
|
|
ag.context.entries.iter()
|
|
.filter(|e| !e.is_log() && !e.is_memory())
|
|
.map(|e| e.message().content_text().len() as u64)
|
|
.sum::<u64>()
|
|
};
|
|
|
|
// Get session_id for placeholder resolution
|
|
let session_id = {
|
|
let ag = self.agent.lock().await;
|
|
ag.session_id.clone()
|
|
};
|
|
|
|
// Collect which agents to trigger (can't hold lock across await)
|
|
let to_trigger: Vec<(usize, Vec<AutoStep>, Vec<crate::agent::tools::Tool>, String, i32)> = {
|
|
let mut subs = self.subconscious.lock().await;
|
|
let mut result = Vec::new();
|
|
for (i, &(_name, interval)) in SUBCONSCIOUS_AGENTS.iter().enumerate() {
|
|
if i >= subs.len() { continue; }
|
|
if !subs[i].should_trigger(conversation_bytes, interval) { continue; }
|
|
|
|
let sub = &mut subs[i];
|
|
sub.last_trigger_bytes = conversation_bytes;
|
|
|
|
// The output dir for this agent — used for input: placeholders
|
|
// and the output() tool at runtime
|
|
let output_dir = crate::store::memory_dir()
|
|
.join("agent-output").join(&sub.name);
|
|
|
|
let steps: Vec<AutoStep> = sub.def.steps.iter().map(|s| {
|
|
let prompt = resolve_prompt(&s.prompt, &session_id, &output_dir);
|
|
AutoStep { prompt, phase: s.phase.clone() }
|
|
}).collect();
|
|
|
|
let all_tools = crate::agent::tools::memory_and_journal_tools();
|
|
let tools: Vec<crate::agent::tools::Tool> = if sub.def.tools.is_empty() {
|
|
all_tools.to_vec()
|
|
} else {
|
|
all_tools.into_iter()
|
|
.filter(|t| sub.def.tools.iter().any(|w| w == t.name))
|
|
.collect()
|
|
};
|
|
|
|
result.push((i, steps, tools, sub.name.clone(), sub.def.priority));
|
|
}
|
|
result
|
|
};
|
|
|
|
if to_trigger.is_empty() { return; }
|
|
|
|
// Fork from conscious agent (one lock acquisition for all)
|
|
let conscious = self.agent.lock().await;
|
|
let mut spawns = Vec::new();
|
|
for (idx, steps, tools, name, priority) in to_trigger {
|
|
let output_dir = crate::store::memory_dir()
|
|
.join("agent-output").join(&name);
|
|
std::fs::create_dir_all(&output_dir).ok();
|
|
|
|
let mut auto = AutoAgent::from_agent(
|
|
name.clone(), &conscious, tools, steps, priority);
|
|
dbglog!("[mind] triggering {}", name);
|
|
|
|
let handle = tokio::spawn(async move {
|
|
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &output_dir); }
|
|
auto.run(None).await
|
|
});
|
|
spawns.push((idx, handle));
|
|
}
|
|
drop(conscious);
|
|
|
|
// Store handles
|
|
let mut subs = self.subconscious.lock().await;
|
|
for (idx, handle) in spawns {
|
|
if idx < subs.len() {
|
|
subs[idx].handle = Some(handle);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn start_turn(&self, text: &str, target: StreamTarget) {
|
|
{
|
|
let mut ag = self.agent.lock().await;
|
|
match target {
|
|
StreamTarget::Conversation => {
|
|
ag.push_message(crate::agent::api::types::Message::user(text));
|
|
}
|
|
StreamTarget::Autonomous => {
|
|
let mut msg = crate::agent::api::types::Message::user(text);
|
|
msg.stamp();
|
|
ag.push_entry(crate::agent::context::ConversationEntry::Dmn(msg));
|
|
}
|
|
}
|
|
|
|
// Compact if over budget before sending
|
|
let threshold = compaction_threshold(&self.config.app) as usize;
|
|
ag.publish_context_state();
|
|
let used = {
|
|
let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default();
|
|
crate::agent::context::sections_used(§ions)
|
|
};
|
|
if used > threshold {
|
|
ag.compact();
|
|
ag.notify("compacted");
|
|
}
|
|
}
|
|
self.shared.lock().unwrap().turn_active = true;
|
|
let _ = self.turn_watch.send(true);
|
|
let agent = self.agent.clone();
|
|
let result_tx = self.turn_tx.clone();
|
|
self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move {
|
|
let result = Agent::turn(agent).await;
|
|
let _ = result_tx.send((result, target)).await;
|
|
}));
|
|
}
|
|
|
|
pub async fn shutdown(&self) {
|
|
if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); }
|
|
}
|
|
|
|
/// Mind event loop — locks MindState, calls state methods, executes actions.
|
|
pub async fn run(
|
|
&self,
|
|
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<MindCommand>,
|
|
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
|
|
) {
|
|
let mut bg_rx = self.bg_rx.lock().unwrap().take()
|
|
.expect("Mind::run() called twice");
|
|
loop {
|
|
let timeout = self.shared.lock().unwrap().dmn.interval();
|
|
let turn_active = self.shared.lock().unwrap().turn_active;
|
|
|
|
let mut cmds = Vec::new();
|
|
|
|
tokio::select! {
|
|
biased;
|
|
|
|
cmd = input_rx.recv() => {
|
|
match cmd {
|
|
Some(cmd) => cmds.push(cmd),
|
|
None => break, // UI shut down
|
|
}
|
|
}
|
|
|
|
Some(bg) = bg_rx.recv() => {
|
|
match bg {
|
|
BgEvent::ScoringDone => {
|
|
self.shared.lock().unwrap().scoring_in_flight = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
Some((result, target)) = turn_rx.recv() => {
|
|
self.shared.lock().unwrap().turn_handle = None;
|
|
let model_switch = self.shared.lock().unwrap().complete_turn(&result, target);
|
|
let _ = self.turn_watch.send(false);
|
|
|
|
if let Some(name) = model_switch {
|
|
crate::user::chat::cmd_switch_model(&self.agent, &name).await;
|
|
}
|
|
|
|
// Post-turn maintenance
|
|
{
|
|
let mut ag = self.agent.lock().await;
|
|
ag.age_out_images();
|
|
ag.publish_context_state();
|
|
}
|
|
|
|
cmds.push(MindCommand::Compact);
|
|
if !self.config.no_agents {
|
|
cmds.push(MindCommand::Score);
|
|
}
|
|
|
|
// Trigger subconscious agents after conscious turn completes
|
|
self.collect_subconscious_results().await;
|
|
self.trigger_subconscious().await;
|
|
}
|
|
|
|
_ = tokio::time::sleep(timeout), if !turn_active => {
|
|
let tick = self.shared.lock().unwrap().dmn_tick();
|
|
if let Some((prompt, target)) = tick {
|
|
self.start_turn(&prompt, target).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check for pending user input → push to agent context and start turn
|
|
let pending = self.shared.lock().unwrap().take_pending_input();
|
|
if let Some(text) = pending {
|
|
self.start_turn(&text, StreamTarget::Conversation).await;
|
|
}
|
|
|
|
self.run_commands(cmds).await;
|
|
}
|
|
}
|
|
}
|