diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index 3f0e229..160a551 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -598,8 +598,8 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { return Ok(database); } - // Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key - let mut latest: HashMap = HashMap::new(); + // Track latest (offset, uuid, version, deleted, node_type, timestamp) per key + let mut latest: HashMap = HashMap::new(); let file = fs::File::open(capnp_path) .with_context(|| format!("open {}", capnp_path.display()))?; @@ -634,10 +634,6 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { .map(|t| t as u8) .unwrap_or(0); let timestamp = node_reader.get_timestamp(); - let provenance = node_reader.get_provenance().ok() - .and_then(|t| t.to_str().ok()) - .unwrap_or("manual") - .to_string(); let mut uuid = [0u8; 16]; if let Ok(data) = node_reader.get_uuid() { @@ -648,10 +644,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { // Keep if newer version let dominated = latest.get(&key) - .map(|(_, _, v, _, _, _, _)| version >= *v) + .map(|(_, _, v, _, _, _)| version >= *v) .unwrap_or(true); if dominated { - latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance)); + latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp)); } } } @@ -663,9 +659,8 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { let mut nodes_table = txn.open_table(index::NODES)?; let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?; - let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?; - for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest { + for (key, (offset, uuid, _, deleted, node_type, timestamp)) in latest { if !deleted { nodes_table.insert(key.as_str(), offset)?; // Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes @@ -674,12 +669,6 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { packed[16] = node_type; packed[17..25].copy_from_slice(×tamp.to_be_bytes()); key_uuid_table.insert(key.as_str(), packed.as_slice())?; - // Pack: [negated_timestamp:8][key] for descending sort - let neg_ts = (!timestamp).to_be_bytes(); - let mut prov_val = Vec::with_capacity(8 + key.len()); - prov_val.extend_from_slice(&neg_ts); - prov_val.extend_from_slice(key.as_bytes()); - by_provenance.insert(provenance.as_str(), prov_val.as_slice())?; } // Always record offset in UUID history (even for deleted) uuid_offsets.insert(uuid.as_slice(), offset)?; diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs index a663366..012db0f 100644 --- a/src/hippocampus/store/index.rs +++ b/src/hippocampus/store/index.rs @@ -24,8 +24,7 @@ pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes"); // KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid"); pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = MultimapTableDefinition::new("uuid_offsets"); -// NODES_BY_PROVENANCE: provenance → [timestamp:8 BE][key] (sorted by timestamp desc via negated ts) -pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &[u8]> = MultimapTableDefinition::new("nodes_by_provenance"); +pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("nodes_by_provenance"); // Composite key: [node_type: u8][timestamp: i64 BE] for range queries pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type"); @@ -82,62 +81,19 @@ pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64) { } } -/// Pack provenance value: [negated_timestamp:8][key] for descending sort -fn pack_provenance_value(timestamp: i64, key: &str) -> Vec { - let neg_ts = (!timestamp).to_be_bytes(); // negate for descending order - let mut buf = Vec::with_capacity(8 + key.len()); - buf.extend_from_slice(&neg_ts); - buf.extend_from_slice(key.as_bytes()); - buf -} - -/// Unpack provenance value: returns (timestamp, key) -fn unpack_provenance_value(data: &[u8]) -> (i64, String) { - let neg_ts = i64::from_be_bytes([data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]]); - let timestamp = !neg_ts; - let key = String::from_utf8_lossy(&data[8..]).to_string(); - (timestamp, key) -} - /// Record a node's location in the index. -pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str) -> Result<()> { +pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64) -> Result<()> { let mut nodes_table = txn.open_table(NODES)?; let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?; - let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?; nodes_table.insert(key, offset)?; let packed = pack_node_meta(uuid, node_type, timestamp); key_uuid_table.insert(key, packed.as_slice())?; uuid_offsets.insert(uuid.as_slice(), offset)?; - let prov_val = pack_provenance_value(timestamp, key); - by_provenance.insert(provenance, prov_val.as_slice())?; Ok(()) } -/// Get recent keys for a given provenance, sorted by timestamp descending. -pub fn recent_by_provenance(db: &Database, provenance: &str, limit: usize) -> Result> { - let txn = db.begin_read()?; - let table = txn.open_multimap_table(NODES_BY_PROVENANCE)?; - let mut results = Vec::new(); - for entry in table.get(provenance)? { - if results.len() >= limit { break; } - let (timestamp, key) = unpack_provenance_value(entry?.value()); - results.push((key, timestamp)); - } - Ok(results) -} - -/// Get node metadata (uuid, node_type, timestamp) from KEY_TO_UUID. -pub fn get_node_meta(db: &Database, key: &str) -> Result> { - let txn = db.begin_read()?; - let table = txn.open_table(KEY_TO_UUID)?; - match table.get(key)? { - Some(data) => Ok(Some(unpack_node_meta(data.value()))), - None => Ok(None), - } -} - /// Get offset for a node by key. pub fn get_offset(db: &Database, key: &str) -> Result> { let txn = db.begin_read()?; diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index c7ff977..8beb173 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -2,7 +2,7 @@ // // CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics. -use super::{index, types::*, Store}; +use super::{capnp, index, types::*, Store}; use anyhow::{anyhow, bail, Result}; use std::collections::{HashMap, HashSet}; @@ -24,7 +24,7 @@ impl Store { let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let txn = db.begin_write()?; let offset = self.append_nodes(&[node.clone()])?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; txn.commit()?; Ok(()) } @@ -45,8 +45,24 @@ impl Store { Some(db) => db, None => return Vec::new(), }; - // Index stores entries sorted by timestamp descending, so just take first N - index::recent_by_provenance(db, provenance, limit).unwrap_or_default() + let keys = match index::all_keys(db) { + Ok(keys) => keys, + Err(_) => return Vec::new(), + }; + let mut nodes: Vec<_> = keys.iter() + .filter_map(|key| { + let offset = index::get_offset(db, key).ok()??; + let node = capnp::read_node_at_offset(offset).ok()?; + if !node.deleted && node.provenance == provenance { + Some((key.clone(), node.timestamp)) + } else { + None + } + }) + .collect(); + nodes.sort_by(|a, b| b.1.cmp(&a.1)); + nodes.truncate(limit); + nodes } /// Upsert a node: update if exists (and content changed), create if not. @@ -74,7 +90,7 @@ impl Store { node.version += 1; let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; txn.commit()?; Ok("updated") } else { @@ -82,7 +98,7 @@ impl Store { node.provenance = provenance.to_string(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; txn.commit()?; Ok("created") } @@ -173,7 +189,7 @@ impl Store { let txn = db.begin_write()?; let offset = self.append_nodes(&[renamed.clone(), tombstone])?; index::remove_node(&txn, old_key)?; - index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance)?; + index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp)?; if !updated_rels.is_empty() { self.append_relations(&updated_rels)?; } @@ -304,7 +320,7 @@ impl Store { node.timestamp = now_epoch(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; txn.commit()?; Ok((old, weight)) } diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 9fcc101..ca6d740 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -346,44 +346,32 @@ impl Mind { let mut s = shared_for_unc.lock().unwrap(); s.unc_idle = true; } - - // Get wake notify for event-driven loop - let wake = unc.lock().await.wake.clone(); - let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600)); - health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - // Do work: reap finished agents, spawn new ones - let (to_spawn, needs_health) = { - let mut guard = unc.lock().await; - guard.reap_finished(); - (guard.select_to_spawn(), guard.needs_health_refresh()) - }; - - // Spawn agents outside lock - for (idx, name, auto) in to_spawn { - match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await { - Ok(result) => unc.lock().await.complete_spawn(idx, result), - Err(auto) => unc.lock().await.abort_spawn(idx, auto), - } - } - - // Health check outside lock (slow I/O) + // Phase 0: health check outside lock (slow I/O) + let needs_health = unc.lock().await.needs_health_refresh(); if needs_health { if let Ok(store_arc) = access_local() { let health = crate::subconscious::daemon::compute_graph_health(&store_arc); unc.lock().await.set_health(health); } } - - // Wait for: conscious active, agent finished, or health timer - tokio::select! { - _ = unc_rx.changed() => { - if *unc_rx.borrow() { break; } + // Phase 1: quick work under lock + let to_spawn = { + let mut guard = unc.lock().await; + guard.reap_finished(); + guard.select_to_spawn() + }; + // Phase 2: slow work outside lock + for (idx, name, auto) in to_spawn { + match crate::mind::unconscious::prepare_spawn(&name, auto).await { + Ok(result) => unc.lock().await.complete_spawn(idx, result), + Err(auto) => unc.lock().await.abort_spawn(idx, auto), } - _ = wake.notified() => {} - _ = health_interval.tick() => {} } + // Check if conscious became active + if *unc_rx.borrow() { break; } + // Brief yield to not starve other tasks + tokio::task::yield_now().await; } } }); @@ -649,8 +637,7 @@ impl Mind { }; let mut cmds = Vec::new(); - #[allow(unused_assignments)] - let mut _dmn_expired = false; + let mut dmn_expired = false; tokio::select! { biased; @@ -689,7 +676,7 @@ impl Mind { } } - _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, + _ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true, } if !self.config.no_agents { diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 8989264..87c44db 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -71,8 +71,6 @@ pub struct Unconscious { max_concurrent: usize, pub graph_health: Option, last_health_check: Option, - /// Notified when agent state changes (finished, toggled) - pub wake: std::sync::Arc, } impl Unconscious { @@ -119,7 +117,6 @@ impl Unconscious { agents, max_concurrent, graph_health: None, last_health_check: None, - wake: std::sync::Arc::new(tokio::sync::Notify::new()), } } @@ -133,13 +130,11 @@ impl Unconscious { if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() { let agent_name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); - let wake = self.wake.clone(); - match prepare_spawn(&agent_name, auto, wake).await { + match prepare_spawn(&agent_name, auto).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } } - self.wake.notify_one(); // wake loop to consider new state Some(new_state) } @@ -250,7 +245,7 @@ pub struct SpawnResult { /// Called outside the Unconscious lock. /// On success, auto is consumed (moved into spawned task). /// On failure, auto is returned so it can be restored. -pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc) -> Result { +pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result { dbglog!("[unconscious] spawning {}", name); let def = match defs::get_def(name) { @@ -317,7 +312,6 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await; auto.update_stats(stats); auto.steps = orig_steps; - wake.notify_one(); // wake the loop to reap and maybe spawn more (auto, result) }); @@ -329,9 +323,8 @@ impl Unconscious { pub async fn trigger(&mut self) { self.reap_finished(); let to_spawn = self.select_to_spawn(); - let wake = self.wake.clone(); for (idx, name, auto) in to_spawn { - match prepare_spawn(&name, auto, wake.clone()).await { + match prepare_spawn(&name, auto).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), }