consciousness/src/mind/unconscious.rs

343 lines
12 KiB
Rust
Raw Normal View History

// unconscious.rs — Graph maintenance agents
//
// Standalone agents that operate on the memory graph without needing
// conversation context. Each agent runs in a loop: finish one run,
// start the next. Agents can be toggled on/off, persisted to
// ~/.consciousness/agent-enabled.json.
use std::time::Instant;
use std::collections::HashMap;
use futures::FutureExt;
use crate::agent::oneshot::{AutoAgent, AutoStep, RunStats};
use crate::agent::tools;
use crate::subconscious::defs;
fn config_path() -> std::path::PathBuf {
dirs::home_dir().unwrap_or_default()
.join(".consciousness/agent-enabled.json")
}
fn load_enabled_config() -> HashMap<String, bool> {
std::fs::read_to_string(config_path()).ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default()
}
fn save_enabled_config(map: &HashMap<String, bool>) {
if let Ok(json) = serde_json::to_string_pretty(map) {
let _ = std::fs::write(config_path(), json);
}
}
struct UnconsciousAgent {
name: String,
enabled: bool,
auto: Option<AutoAgent>,
handle: Option<tokio::task::JoinHandle<(AutoAgent, Result<(), String>)>>,
/// Shared agent handle — UI locks to read context live.
pub agent: Option<std::sync::Arc<crate::agent::Agent>>,
last_run: Option<Instant>,
}
impl UnconsciousAgent {
fn is_running(&self) -> bool {
self.handle.as_ref().is_some_and(|h| !h.is_finished())
}
fn should_run(&self) -> bool {
self.enabled && !self.is_running()
}
}
/// Snapshot for the TUI.
#[derive(Clone)]
pub struct UnconsciousSnapshot {
pub name: String,
pub running: bool,
pub enabled: bool,
pub runs: usize,
pub last_run_secs_ago: Option<f64>,
pub agent: Option<std::sync::Arc<crate::agent::Agent>>,
pub last_stats: Option<RunStats>,
/// Recent store activity for this agent: (key, timestamp), newest first.
pub history: Vec<(String, i64)>,
pub tool_calls_ewma: f64,
pub tool_failures_ewma: f64,
}
pub struct Unconscious {
agents: Vec<UnconsciousAgent>,
max_concurrent: usize,
pub graph_health: Option<crate::subconscious::daemon::GraphHealth>,
last_health_check: Option<Instant>,
/// Notified when agent state changes (finished, toggled)
pub wake: std::sync::Arc<tokio::sync::Notify>,
}
impl Unconscious {
pub fn new() -> Self {
let enabled_map = load_enabled_config();
// Scan all .agent files, exclude subconscious-* and surface-observe
let mut agents: Vec<UnconsciousAgent> = Vec::new();
let base_tools = tools::memory::memory_tools().to_vec();
let extra_tools = tools::memory::journal_tools().to_vec();
for def in defs::load_defs() {
if def.agent.starts_with("subconscious-") { continue; }
if def.agent == "surface-observe" { continue; }
let enabled = enabled_map.get(&def.agent).copied()
.unwrap_or(false);
let mut effective_tools = base_tools.clone();
for name in &def.tools {
if let Some(t) = extra_tools.iter().find(|t| t.name == name) {
effective_tools.push(t.clone());
}
}
let steps: Vec<AutoStep> = def.steps.iter().map(|s| AutoStep {
prompt: s.prompt.clone(),
phase: s.phase.clone(),
}).collect();
let auto = AutoAgent::new(
def.agent.clone(), effective_tools, steps,
def.temperature.unwrap_or(0.6), def.priority,
);
agents.push(UnconsciousAgent {
name: def.agent.clone(),
enabled,
auto: Some(auto),
handle: None,
agent: None,
last_run: None,
});
}
agents.sort_by(|a, b| a.name.cmp(&b.name));
let max_concurrent = crate::config::get().llm_concurrency;
Self {
agents, max_concurrent,
graph_health: None,
last_health_check: None,
wake: std::sync::Arc::new(tokio::sync::Notify::new()),
}
}
/// Toggle an agent on/off by name. Returns new enabled state.
/// If enabling, immediately spawns the agent if it's not running.
pub async fn toggle(&mut self, name: &str) -> Option<bool> {
let idx = self.agents.iter().position(|a| a.name == name)?;
self.agents[idx].enabled = !self.agents[idx].enabled;
let new_state = self.agents[idx].enabled;
self.save_enabled();
if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() {
let agent_name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap();
let wake = self.wake.clone();
match prepare_spawn(&agent_name, auto, wake).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
}
}
self.wake.notify_one(); // wake loop to consider new state
Some(new_state)
}
fn save_enabled(&self) {
let map: HashMap<String, bool> = self.agents.iter()
.map(|a| (a.name.clone(), a.enabled))
.collect();
save_enabled_config(&map);
}
pub fn snapshots(&self, store: Option<&crate::store::Store>) -> Vec<UnconsciousSnapshot> {
self.agents.iter().map(|a| {
let history = store.map(|st| st.recent_by_provenance(&a.name, 30))
.unwrap_or_default();
let stats = crate::agent::oneshot::get_stats(&a.name);
let tool_calls_ewma: f64 = stats.by_tool.values().map(|t| t.ewma).sum();
UnconsciousSnapshot {
name: a.name.clone(),
running: a.is_running(),
enabled: a.enabled,
runs: stats.runs,
last_run_secs_ago: a.last_run.map(|t| t.elapsed().as_secs_f64()),
agent: a.agent.clone(),
last_stats: stats.last_stats.clone(),
history,
tool_calls_ewma,
tool_failures_ewma: stats.failures.ewma,
}
}).collect()
}
/// Check if health refresh is due (quick check, no I/O).
pub fn needs_health_refresh(&self) -> bool {
self.last_health_check
.map(|t| t.elapsed() > std::time::Duration::from_secs(600))
.unwrap_or(true)
}
/// Store computed health (quick, just assignment).
pub fn set_health(&mut self, health: crate::subconscious::daemon::GraphHealth) {
self.graph_health = Some(health);
self.last_health_check = Some(Instant::now());
}
/// Reap finished agents (quick, hold lock briefly).
pub fn reap_finished(&mut self) {
for agent in &mut self.agents {
if agent.handle.as_ref().is_some_and(|h| h.is_finished()) {
let handle = agent.handle.take().unwrap();
agent.last_run = Some(Instant::now());
// Get the AutoAgent back from the finished task (stats already updated)
match handle.now_or_never() {
Some(Ok((auto_back, result))) => {
agent.auto = Some(auto_back);
match result {
Ok(_) => dbglog!("[unconscious] {} completed (run {})",
agent.name, crate::agent::oneshot::get_stats(&agent.name).runs),
Err(e) => dbglog!("[unconscious] {} failed: {}", agent.name, e),
}
}
_ => dbglog!("[unconscious] {} task lost", agent.name),
}
}
}
}
/// Select agents to spawn and take their AutoAgents out (quick, hold lock briefly).
/// Returns vec of (index, name, auto, tools) for agents that should spawn.
pub fn select_to_spawn(&mut self) -> Vec<(usize, String, AutoAgent)> {
let running = self.agents.iter().filter(|a| a.is_running()).count();
let mut to_spawn = Vec::new();
for _ in running..self.max_concurrent {
let next = self.agents.iter().enumerate()
.filter(|(_, a)| a.should_run() && a.auto.is_some())
.min_by_key(|(_, a)| a.last_run);
match next {
Some((idx, _)) => {
let name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap();
to_spawn.push((idx, name, auto));
}
None => break,
}
}
to_spawn
}
/// Store spawn result back (quick, hold lock briefly).
pub fn complete_spawn(&mut self, idx: usize, result: SpawnResult) {
self.agents[idx].agent = Some(result.agent);
self.agents[idx].handle = Some(result.handle);
}
/// Restore auto on spawn failure (quick, hold lock briefly).
pub fn abort_spawn(&mut self, idx: usize, auto: AutoAgent) {
self.agents[idx].auto = Some(auto);
}
}
/// Result of preparing an agent spawn (created outside the lock).
pub struct SpawnResult {
pub agent: std::sync::Arc<crate::agent::Agent>,
pub handle: tokio::task::JoinHandle<(AutoAgent, Result<(), String>)>,
}
/// Prepare an agent spawn — does the slow work (Store::load, query, Agent::new).
/// Called outside the Unconscious lock.
/// On success, auto is consumed (moved into spawned task).
/// On failure, auto is returned so it can be restored.
pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc<tokio::sync::Notify>) -> Result<SpawnResult, AutoAgent> {
dbglog!("[unconscious] spawning {}", name);
let def = match defs::get_def(name) {
Some(d) => d,
None => return Err(auto),
};
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
let batch = match defs::run_agent(
&def, def.count.unwrap_or(5), &exclude,
).await {
Ok(b) => b,
Err(e) => {
dbglog!("[unconscious] {} query failed: {}", name, e);
return Err(auto);
}
};
let orig_steps = std::mem::replace(&mut auto.steps,
batch.steps.iter().map(|s| AutoStep {
prompt: s.prompt.clone(),
phase: s.phase.clone(),
}).collect());
// Create standalone Agent — stored so UI can read context
let config = crate::config::get();
let base_url = config.api_base_url.as_deref().unwrap_or("");
let api_key = config.api_key.as_deref().unwrap_or("");
let model = config.api_model.as_deref().unwrap_or("");
if base_url.is_empty() || model.is_empty() {
dbglog!("[unconscious] API not configured");
auto.steps = orig_steps;
return Err(auto);
}
let cli = crate::user::CliArgs::default();
let (app, _) = match crate::config::load_app(&cli) {
Ok(r) => r,
Err(e) => {
dbglog!("[unconscious] config: {}", e);
auto.steps = orig_steps;
return Err(auto);
}
};
// Unconscious agents have self-contained prompts — no standard context.
let client = crate::agent::api::ApiClient::new(base_url, api_key, model);
let agent = crate::agent::Agent::new(
client, Vec::new(),
config: drop dead code and collapse to a single backend Config had accumulated several obsolete fields, a legacy load path that was just returning defaults, and multi-backend infrastructure that's no longer used. Removed from Config (memory section): - load_legacy_jsonl() — just returned Config::default(), no callers - The legacy-fallback branch in load_from_file - surface_hooks, surface_timeout_secs — zero external readers - scoring_chunk_tokens + default fn — zero external readers - The POC_MEMORY_CONFIG env override note in the header comment (not actually wired up anywhere) Collapsed multi-backend to single-backend: - AppConfig used to carry `anthropic: BackendConfig` and `openrouter: BackendConfig` as required fields plus an optional `deepinfra`, picked between at runtime by name. Only one is ever actually used in any deployment. Collapse to a single `backend: BackendConfig` on AppConfig, drop the multi-backend match logic in resolve_model, drop the top-level `backend: String` selector field, drop the `BackendConfig::resolve` fallback path. - Also drop BackendConfig.model (redundant with ModelConfig.model_id once multi-backend is gone). - ModelConfig.backend field goes — there's only one backend now, no choice to make. Dead prompt_file machinery: - ModelConfig.prompt_file, ResolvedModel.prompt_file, SessionConfig .prompt_file, Agent.prompt_file — nothing in the codebase actually reads the file these strings name. Just passed around and compared. Delete the whole string through every struct. - The "if prompt_file changed on model switch, recompact" branch in user/chat.rs goes too (never fired usefully). Dead memory_project plumbing: - AppConfig.memory_project field, CliArgs.memory_project, the --memory-project CLI flag, the figment merge target, the show_config display line. Nothing reads it anywhere. Dead ContextInfo struct: - `struct ContextInfo` was never constructed — context_info: None was the only initializer. The conditional display blocks in user/context.rs that dereferenced it were dead. Behavior change: AppConfig::resolve() now requires a non-empty `models` map and bails with a helpful message if it's missing. The old fallback ("no models? use top-level backend + PromptConfig to build a default") path is gone — it was only kept for symmetry with a mode nobody used. Config file shape: `deepinfra: {...}` → `backend: {...}`, and model entries no longer need `backend:` or `prompt_file:`. Updated ~/.consciousness/config.json5 to match. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-16 15:41:55 -04:00
app, None,
crate::agent::tools::ActiveTools::new(),
auto.tools.clone(),
).await;
{
let mut st = agent.state.lock().await;
st.provenance = auto.name.clone();
st.priority = Some(auto.priority);
st.temperature = auto.temperature;
}
let agent_clone = agent.clone();
let handle = tokio::spawn(async move {
let result = auto.run_shared(&agent_clone).await;
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await;
auto.update_stats(stats);
auto.steps = orig_steps;
wake.notify_one(); // wake the loop to reap and maybe spawn more
(auto, result)
});
Ok(SpawnResult { agent, handle })
}
// Backwards compat: trigger() that does all three phases (still holds lock too long, but works)
impl Unconscious {
pub async fn trigger(&mut self) {
self.reap_finished();
let to_spawn = self.select_to_spawn();
let wake = self.wake.clone();
for (idx, name, auto) in to_spawn {
match prepare_spawn(&name, auto, wake.clone()).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
}
}
}
}
// save_agent_log and RunStats moved to crate::agent::oneshot