// unconscious.rs — Graph maintenance agents // // Standalone agents that operate on the memory graph without needing // conversation context. Each agent runs in a loop: finish one run, // start the next. Agents can be toggled on/off, persisted to // ~/.consciousness/agent-enabled.json. use std::time::Instant; use std::collections::HashMap; use futures::FutureExt; use crate::agent::oneshot::{AutoAgent, AutoStep, RunStats}; use crate::agent::tools; use crate::subconscious::defs; fn config_path() -> std::path::PathBuf { dirs::home_dir().unwrap_or_default() .join(".consciousness/agent-enabled.json") } fn load_enabled_config() -> HashMap { std::fs::read_to_string(config_path()).ok() .and_then(|s| serde_json::from_str(&s).ok()) .unwrap_or_default() } fn save_enabled_config(map: &HashMap) { if let Ok(json) = serde_json::to_string_pretty(map) { let _ = std::fs::write(config_path(), json); } } struct UnconsciousAgent { name: String, enabled: bool, auto: Option, handle: Option)>>, /// Shared agent handle — UI locks to read context live. pub agent: Option>, last_run: Option, } impl UnconsciousAgent { fn is_running(&self) -> bool { self.handle.as_ref().is_some_and(|h| !h.is_finished()) } fn should_run(&self) -> bool { self.enabled && !self.is_running() } } /// Snapshot for the TUI. #[derive(Clone)] pub struct UnconsciousSnapshot { pub name: String, pub running: bool, pub enabled: bool, pub runs: usize, pub last_run_secs_ago: Option, pub agent: Option>, pub last_stats: Option, /// Recent store activity for this agent: (key, timestamp), newest first. pub history: Vec<(String, i64)>, pub tool_calls_ewma: f64, pub tool_failures_ewma: f64, } pub struct Unconscious { agents: Vec, max_concurrent: usize, pub graph_health: Option, last_health_check: Option, /// Notified when agent state changes (finished, toggled) pub wake: std::sync::Arc, } impl Unconscious { pub fn new() -> Self { let enabled_map = load_enabled_config(); // Scan all .agent files, exclude subconscious-* and surface-observe let mut agents: Vec = Vec::new(); let base_tools = tools::memory::memory_tools().to_vec(); let extra_tools = tools::memory::journal_tools().to_vec(); for def in defs::load_defs() { if def.agent.starts_with("subconscious-") { continue; } if def.agent == "surface-observe" { continue; } let enabled = enabled_map.get(&def.agent).copied() .unwrap_or(false); let mut effective_tools = base_tools.clone(); for name in &def.tools { if let Some(t) = extra_tools.iter().find(|t| t.name == name) { effective_tools.push(t.clone()); } } let steps: Vec = def.steps.iter().map(|s| AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone(), }).collect(); let auto = AutoAgent::new( def.agent.clone(), effective_tools, steps, def.temperature.unwrap_or(0.6), def.priority, ); agents.push(UnconsciousAgent { name: def.agent.clone(), enabled, auto: Some(auto), handle: None, agent: None, last_run: None, }); } agents.sort_by(|a, b| a.name.cmp(&b.name)); let max_concurrent = crate::config::get().llm_concurrency; Self { agents, max_concurrent, graph_health: None, last_health_check: None, wake: std::sync::Arc::new(tokio::sync::Notify::new()), } } /// Toggle an agent on/off by name. Returns new enabled state. /// If enabling, immediately spawns the agent if it's not running. pub async fn toggle(&mut self, name: &str) -> Option { let idx = self.agents.iter().position(|a| a.name == name)?; self.agents[idx].enabled = !self.agents[idx].enabled; let new_state = self.agents[idx].enabled; self.save_enabled(); if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() { let agent_name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); let wake = self.wake.clone(); match prepare_spawn(&agent_name, auto, wake).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } } self.wake.notify_one(); // wake loop to consider new state Some(new_state) } fn save_enabled(&self) { let map: HashMap = self.agents.iter() .map(|a| (a.name.clone(), a.enabled)) .collect(); save_enabled_config(&map); } pub fn snapshots(&self, store: Option<&crate::store::Store>) -> Vec { self.agents.iter().map(|a| { let history = store.map(|st| st.recent_by_provenance(&a.name, 30)) .unwrap_or_default(); let stats = crate::agent::oneshot::get_stats(&a.name); let tool_calls_ewma: f64 = stats.by_tool.values().map(|t| t.ewma).sum(); UnconsciousSnapshot { name: a.name.clone(), running: a.is_running(), enabled: a.enabled, runs: stats.runs, last_run_secs_ago: a.last_run.map(|t| t.elapsed().as_secs_f64()), agent: a.agent.clone(), last_stats: stats.last_stats.clone(), history, tool_calls_ewma, tool_failures_ewma: stats.failures.ewma, } }).collect() } /// Check if health refresh is due (quick check, no I/O). pub fn needs_health_refresh(&self) -> bool { self.last_health_check .map(|t| t.elapsed() > std::time::Duration::from_secs(600)) .unwrap_or(true) } /// Store computed health (quick, just assignment). pub fn set_health(&mut self, health: crate::subconscious::daemon::GraphHealth) { self.graph_health = Some(health); self.last_health_check = Some(Instant::now()); } /// Reap finished agents (quick, hold lock briefly). pub fn reap_finished(&mut self) { for agent in &mut self.agents { if agent.handle.as_ref().is_some_and(|h| h.is_finished()) { let handle = agent.handle.take().unwrap(); agent.last_run = Some(Instant::now()); // Get the AutoAgent back from the finished task (stats already updated) match handle.now_or_never() { Some(Ok((auto_back, result))) => { agent.auto = Some(auto_back); match result { Ok(_) => dbglog!("[unconscious] {} completed (run {})", agent.name, crate::agent::oneshot::get_stats(&agent.name).runs), Err(e) => dbglog!("[unconscious] {} failed: {}", agent.name, e), } } _ => dbglog!("[unconscious] {} task lost", agent.name), } } } } /// Select agents to spawn and take their AutoAgents out (quick, hold lock briefly). /// Returns vec of (index, name, auto, tools) for agents that should spawn. pub fn select_to_spawn(&mut self) -> Vec<(usize, String, AutoAgent)> { let running = self.agents.iter().filter(|a| a.is_running()).count(); let mut to_spawn = Vec::new(); for _ in running..self.max_concurrent { let next = self.agents.iter().enumerate() .filter(|(_, a)| a.should_run() && a.auto.is_some()) .min_by_key(|(_, a)| a.last_run); match next { Some((idx, _)) => { let name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); to_spawn.push((idx, name, auto)); } None => break, } } to_spawn } /// Store spawn result back (quick, hold lock briefly). pub fn complete_spawn(&mut self, idx: usize, result: SpawnResult) { self.agents[idx].agent = Some(result.agent); self.agents[idx].handle = Some(result.handle); } /// Restore auto on spawn failure (quick, hold lock briefly). pub fn abort_spawn(&mut self, idx: usize, auto: AutoAgent) { self.agents[idx].auto = Some(auto); } } /// Result of preparing an agent spawn (created outside the lock). pub struct SpawnResult { pub agent: std::sync::Arc, pub handle: tokio::task::JoinHandle<(AutoAgent, Result<(), String>)>, } /// Prepare an agent spawn — does the slow work (Store::load, query, Agent::new). /// Called outside the Unconscious lock. /// On success, auto is consumed (moved into spawned task). /// On failure, auto is returned so it can be restored. pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc) -> Result { dbglog!("[unconscious] spawning {}", name); let def = match defs::get_def(name) { Some(d) => d, None => return Err(auto), }; let exclude: std::collections::HashSet = std::collections::HashSet::new(); let batch = match defs::run_agent( &def, def.count.unwrap_or(5), &exclude, ).await { Ok(b) => b, Err(e) => { dbglog!("[unconscious] {} query failed: {}", name, e); return Err(auto); } }; let orig_steps = std::mem::replace(&mut auto.steps, batch.steps.iter().map(|s| AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone(), }).collect()); // Create standalone Agent — stored so UI can read context let config = crate::config::get(); let base_url = config.api_base_url.as_deref().unwrap_or(""); let api_key = config.api_key.as_deref().unwrap_or(""); let model = config.api_model.as_deref().unwrap_or(""); if base_url.is_empty() || model.is_empty() { dbglog!("[unconscious] API not configured"); auto.steps = orig_steps; return Err(auto); } let cli = crate::user::CliArgs::default(); let (app, _) = match crate::config::load_app(&cli) { Ok(r) => r, Err(e) => { dbglog!("[unconscious] config: {}", e); auto.steps = orig_steps; return Err(auto); } }; // Unconscious agents have self-contained prompts — no standard context. let client = crate::agent::api::ApiClient::new(base_url, api_key, model); let agent = crate::agent::Agent::new( client, Vec::new(), app, String::new(), None, crate::agent::tools::ActiveTools::new(), auto.tools.clone(), ).await; { let mut st = agent.state.lock().await; st.provenance = auto.name.clone(); st.priority = Some(auto.priority); st.temperature = auto.temperature; } let agent_clone = agent.clone(); let handle = tokio::spawn(async move { let result = auto.run_shared(&agent_clone).await; let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await; auto.update_stats(stats); auto.steps = orig_steps; wake.notify_one(); // wake the loop to reap and maybe spawn more (auto, result) }); Ok(SpawnResult { agent, handle }) } // Backwards compat: trigger() that does all three phases (still holds lock too long, but works) impl Unconscious { pub async fn trigger(&mut self) { self.reap_finished(); let to_spawn = self.select_to_spawn(); let wake = self.wake.clone(); for (idx, name, auto) in to_spawn { match prepare_spawn(&name, auto, wake.clone()).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } } } } // save_agent_log and RunStats moved to crate::agent::oneshot