forked from kent/consciousness
Gate unconscious agents on 60s of no conscious activity using sleep_until() instead of polling. Remove COOLDOWN constant — once idle, agents run back-to-back to keep the GPU busy. Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
316 lines
11 KiB
Rust
316 lines
11 KiB
Rust
// unconscious.rs — Graph maintenance agents
|
|
//
|
|
// Standalone agents that operate on the memory graph without needing
|
|
// conversation context. Each agent runs in a loop: finish one run,
|
|
// start the next. Agents can be toggled on/off, persisted to
|
|
// ~/.consciousness/agent-enabled.json.
|
|
|
|
use std::time::Instant;
|
|
use std::collections::HashMap;
|
|
use futures::FutureExt;
|
|
|
|
use crate::agent::oneshot::{AutoAgent, AutoStep};
|
|
use crate::agent::tools;
|
|
use crate::subconscious::defs;
|
|
|
|
fn config_path() -> std::path::PathBuf {
|
|
dirs::home_dir().unwrap_or_default()
|
|
.join(".consciousness/agent-enabled.json")
|
|
}
|
|
|
|
fn load_enabled_config() -> HashMap<String, bool> {
|
|
std::fs::read_to_string(config_path()).ok()
|
|
.and_then(|s| serde_json::from_str(&s).ok())
|
|
.unwrap_or_default()
|
|
}
|
|
|
|
fn save_enabled_config(map: &HashMap<String, bool>) {
|
|
if let Ok(json) = serde_json::to_string_pretty(map) {
|
|
let _ = std::fs::write(config_path(), json);
|
|
}
|
|
}
|
|
|
|
struct UnconsciousAgent {
|
|
name: String,
|
|
enabled: bool,
|
|
auto: AutoAgent,
|
|
handle: Option<tokio::task::JoinHandle<(AutoAgent, Result<String, String>)>>,
|
|
/// Shared agent handle — UI locks to read context live.
|
|
pub agent: Option<std::sync::Arc<crate::agent::Agent>>,
|
|
last_run: Option<Instant>,
|
|
runs: usize,
|
|
}
|
|
|
|
impl UnconsciousAgent {
|
|
fn is_running(&self) -> bool {
|
|
self.handle.as_ref().is_some_and(|h| !h.is_finished())
|
|
}
|
|
|
|
fn should_run(&self) -> bool {
|
|
self.enabled && !self.is_running()
|
|
}
|
|
}
|
|
|
|
/// Snapshot for the TUI.
|
|
#[derive(Clone)]
|
|
pub struct UnconsciousSnapshot {
|
|
pub name: String,
|
|
pub running: bool,
|
|
pub enabled: bool,
|
|
pub runs: usize,
|
|
pub last_run_secs_ago: Option<f64>,
|
|
pub agent: Option<std::sync::Arc<crate::agent::Agent>>,
|
|
}
|
|
|
|
pub struct Unconscious {
|
|
agents: Vec<UnconsciousAgent>,
|
|
max_concurrent: usize,
|
|
pub graph_health: Option<crate::subconscious::daemon::GraphHealth>,
|
|
last_health_check: Option<Instant>,
|
|
}
|
|
|
|
impl Unconscious {
|
|
pub fn new() -> Self {
|
|
let enabled_map = load_enabled_config();
|
|
|
|
// Scan all .agent files, exclude subconscious-* and surface-observe
|
|
let mut agents: Vec<UnconsciousAgent> = Vec::new();
|
|
let all_tools = tools::memory::memory_tools().to_vec();
|
|
for def in defs::load_defs() {
|
|
if def.agent.starts_with("subconscious-") { continue; }
|
|
if def.agent == "surface-observe" { continue; }
|
|
let enabled = enabled_map.get(&def.agent).copied()
|
|
.unwrap_or(false);
|
|
let effective_tools: Vec<tools::Tool> = if def.tools.is_empty() {
|
|
all_tools.clone()
|
|
} else {
|
|
all_tools.iter()
|
|
.filter(|t| def.tools.iter().any(|w| w == t.name))
|
|
.cloned()
|
|
.collect()
|
|
};
|
|
let steps: Vec<AutoStep> = def.steps.iter().map(|s| AutoStep {
|
|
prompt: s.prompt.clone(),
|
|
phase: s.phase.clone(),
|
|
}).collect();
|
|
let auto = AutoAgent::new(
|
|
def.agent.clone(), effective_tools, steps,
|
|
def.temperature.unwrap_or(0.6), def.priority,
|
|
);
|
|
agents.push(UnconsciousAgent {
|
|
name: def.agent.clone(),
|
|
enabled,
|
|
auto,
|
|
handle: None,
|
|
agent: None,
|
|
last_run: None,
|
|
runs: 0,
|
|
});
|
|
}
|
|
agents.sort_by(|a, b| a.name.cmp(&b.name));
|
|
|
|
Self {
|
|
agents, max_concurrent: 2,
|
|
graph_health: None,
|
|
last_health_check: None,
|
|
}
|
|
}
|
|
|
|
/// Toggle an agent on/off by name. Returns new enabled state.
|
|
/// If enabling, immediately spawns the agent if it's not running.
|
|
pub async fn toggle(&mut self, name: &str) -> Option<bool> {
|
|
let idx = self.agents.iter().position(|a| a.name == name)?;
|
|
self.agents[idx].enabled = !self.agents[idx].enabled;
|
|
let new_state = self.agents[idx].enabled;
|
|
self.save_enabled();
|
|
if new_state && !self.agents[idx].is_running() {
|
|
self.spawn_agent(idx).await;
|
|
}
|
|
Some(new_state)
|
|
}
|
|
|
|
fn save_enabled(&self) {
|
|
let map: HashMap<String, bool> = self.agents.iter()
|
|
.map(|a| (a.name.clone(), a.enabled))
|
|
.collect();
|
|
save_enabled_config(&map);
|
|
}
|
|
|
|
pub fn snapshots(&self) -> Vec<UnconsciousSnapshot> {
|
|
self.agents.iter().map(|a| UnconsciousSnapshot {
|
|
name: a.name.clone(),
|
|
running: a.is_running(),
|
|
enabled: a.enabled,
|
|
runs: a.runs,
|
|
last_run_secs_ago: a.last_run.map(|t| t.elapsed().as_secs_f64()),
|
|
agent: a.agent.clone(),
|
|
}).collect()
|
|
}
|
|
|
|
fn refresh_health(&mut self) {
|
|
let store = match crate::store::Store::load() {
|
|
Ok(s) => s,
|
|
Err(_) => return,
|
|
};
|
|
self.graph_health = Some(crate::subconscious::daemon::compute_graph_health(&store));
|
|
self.last_health_check = Some(Instant::now());
|
|
}
|
|
|
|
/// Reap finished agents and spawn new ones.
|
|
pub async fn trigger(&mut self) {
|
|
// Periodic graph health refresh (also on first call)
|
|
if self.last_health_check
|
|
.map(|t| t.elapsed() > std::time::Duration::from_secs(600))
|
|
.unwrap_or(true)
|
|
{
|
|
self.refresh_health();
|
|
}
|
|
|
|
for agent in &mut self.agents {
|
|
if agent.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
|
let handle = agent.handle.take().unwrap();
|
|
agent.last_run = Some(Instant::now());
|
|
agent.runs += 1;
|
|
// Get the AutoAgent back from the finished task
|
|
match handle.now_or_never() {
|
|
Some(Ok((auto_back, result))) => {
|
|
agent.auto = auto_back;
|
|
match result {
|
|
Ok(_) => dbglog!("[unconscious] {} completed (run {})",
|
|
agent.name, agent.runs),
|
|
Err(e) => dbglog!("[unconscious] {} failed: {}", agent.name, e),
|
|
}
|
|
}
|
|
_ => dbglog!("[unconscious] {} task lost", agent.name),
|
|
}
|
|
}
|
|
}
|
|
|
|
let running = self.agents.iter().filter(|a| a.is_running()).count();
|
|
if running >= self.max_concurrent { return; }
|
|
let slots = self.max_concurrent - running;
|
|
|
|
let ready: Vec<usize> = self.agents.iter().enumerate()
|
|
.filter(|(_, a)| a.should_run())
|
|
.map(|(i, _)| i)
|
|
.take(slots)
|
|
.collect();
|
|
|
|
for idx in ready {
|
|
self.spawn_agent(idx).await;
|
|
}
|
|
}
|
|
|
|
async fn spawn_agent(&mut self, idx: usize) {
|
|
let name = self.agents[idx].name.clone();
|
|
dbglog!("[unconscious] spawning {}", name);
|
|
|
|
let def = match defs::get_def(&name) {
|
|
Some(d) => d,
|
|
None => return,
|
|
};
|
|
|
|
// Run query and resolve placeholders
|
|
let mut store = match crate::store::Store::load() {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
dbglog!("[unconscious] store load failed: {}", e);
|
|
return;
|
|
}
|
|
};
|
|
|
|
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
|
|
let batch = match defs::run_agent(
|
|
&store, &def, def.count.unwrap_or(5), &exclude,
|
|
) {
|
|
Ok(b) => b,
|
|
Err(e) => {
|
|
dbglog!("[unconscious] {} query failed: {}", name, e);
|
|
return;
|
|
}
|
|
};
|
|
|
|
if !batch.node_keys.is_empty() {
|
|
store.record_agent_visits(&batch.node_keys, &name).ok();
|
|
}
|
|
|
|
// Swap auto out, replace steps with resolved prompts
|
|
let mut auto = std::mem::replace(&mut self.agents[idx].auto,
|
|
AutoAgent::new(String::new(), vec![], vec![], 0.0, 0));
|
|
let orig_steps = std::mem::replace(&mut auto.steps,
|
|
batch.steps.iter().map(|s| AutoStep {
|
|
prompt: s.prompt.clone(),
|
|
phase: s.phase.clone(),
|
|
}).collect());
|
|
|
|
// Create standalone Agent — stored so UI can read context
|
|
let config = crate::config::get();
|
|
let base_url = config.api_base_url.as_deref().unwrap_or("");
|
|
let api_key = config.api_key.as_deref().unwrap_or("");
|
|
let model = config.api_model.as_deref().unwrap_or("");
|
|
if base_url.is_empty() || model.is_empty() {
|
|
dbglog!("[unconscious] API not configured");
|
|
auto.steps = orig_steps;
|
|
self.agents[idx].auto = auto;
|
|
return;
|
|
}
|
|
|
|
let cli = crate::user::CliArgs::default();
|
|
let (app, _) = match crate::config::load_app(&cli) {
|
|
Ok(r) => r,
|
|
Err(e) => {
|
|
dbglog!("[unconscious] config: {}", e);
|
|
auto.steps = orig_steps;
|
|
self.agents[idx].auto = auto;
|
|
return;
|
|
}
|
|
};
|
|
let (system_prompt, personality) = match crate::config::reload_for_model(&app, &app.prompts.other) {
|
|
Ok(r) => r,
|
|
Err(e) => {
|
|
dbglog!("[unconscious] config: {}", e);
|
|
auto.steps = orig_steps;
|
|
self.agents[idx].auto = auto;
|
|
return;
|
|
}
|
|
};
|
|
|
|
let client = crate::agent::api::ApiClient::new(base_url, api_key, model);
|
|
let agent = crate::agent::Agent::new(
|
|
client, system_prompt, personality,
|
|
app, String::new(), None,
|
|
crate::agent::tools::ActiveTools::new(),
|
|
).await;
|
|
{
|
|
let mut st = agent.state.lock().await;
|
|
st.provenance = format!("unconscious:{}", auto.name);
|
|
st.tools = auto.tools.clone();
|
|
st.priority = Some(10);
|
|
}
|
|
|
|
self.agents[idx].agent = Some(agent.clone());
|
|
|
|
self.agents[idx].handle = Some(tokio::spawn(async move {
|
|
let result = auto.run_shared(&agent).await;
|
|
save_agent_log(&auto.name, &agent).await;
|
|
auto.steps = orig_steps;
|
|
(auto, result)
|
|
}));
|
|
}
|
|
}
|
|
|
|
async fn save_agent_log(name: &str, agent: &std::sync::Arc<crate::agent::Agent>) {
|
|
let dir = dirs::home_dir().unwrap_or_default()
|
|
.join(format!(".consciousness/logs/{}", name));
|
|
if std::fs::create_dir_all(&dir).is_err() { return; }
|
|
let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
|
|
let path = dir.join(format!("{}.json", ts));
|
|
let nodes: Vec<crate::agent::context::AstNode> = {
|
|
let ctx = agent.context.lock().await;
|
|
ctx.conversation().to_vec()
|
|
};
|
|
if let Ok(json) = serde_json::to_string_pretty(&nodes) {
|
|
let _ = std::fs::write(&path, json);
|
|
dbglog!("[unconscious] saved log to {}", path.display());
|
|
}
|
|
}
|