diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index 160a551..3f0e229 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -598,8 +598,8 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { return Ok(database); } - // Track latest (offset, uuid, version, deleted, node_type, timestamp) per key - let mut latest: HashMap = HashMap::new(); + // Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key + let mut latest: HashMap = HashMap::new(); let file = fs::File::open(capnp_path) .with_context(|| format!("open {}", capnp_path.display()))?; @@ -634,6 +634,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { .map(|t| t as u8) .unwrap_or(0); let timestamp = node_reader.get_timestamp(); + let provenance = node_reader.get_provenance().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("manual") + .to_string(); let mut uuid = [0u8; 16]; if let Ok(data) = node_reader.get_uuid() { @@ -644,10 +648,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { // Keep if newer version let dominated = latest.get(&key) - .map(|(_, _, v, _, _, _)| version >= *v) + .map(|(_, _, v, _, _, _, _)| version >= *v) .unwrap_or(true); if dominated { - latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp)); + latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance)); } } } @@ -659,8 +663,9 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { let mut nodes_table = txn.open_table(index::NODES)?; let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?; + let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?; - for (key, (offset, uuid, _, deleted, node_type, timestamp)) in latest { + for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest { if !deleted { nodes_table.insert(key.as_str(), offset)?; // Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes @@ -669,6 +674,12 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { packed[16] = node_type; packed[17..25].copy_from_slice(×tamp.to_be_bytes()); key_uuid_table.insert(key.as_str(), packed.as_slice())?; + // Pack: [negated_timestamp:8][key] for descending sort + let neg_ts = (!timestamp).to_be_bytes(); + let mut prov_val = Vec::with_capacity(8 + key.len()); + prov_val.extend_from_slice(&neg_ts); + prov_val.extend_from_slice(key.as_bytes()); + by_provenance.insert(provenance.as_str(), prov_val.as_slice())?; } // Always record offset in UUID history (even for deleted) uuid_offsets.insert(uuid.as_slice(), offset)?; diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs index 012db0f..a663366 100644 --- a/src/hippocampus/store/index.rs +++ b/src/hippocampus/store/index.rs @@ -24,7 +24,8 @@ pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes"); // KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid"); pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = MultimapTableDefinition::new("uuid_offsets"); -pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("nodes_by_provenance"); +// NODES_BY_PROVENANCE: provenance → [timestamp:8 BE][key] (sorted by timestamp desc via negated ts) +pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &[u8]> = MultimapTableDefinition::new("nodes_by_provenance"); // Composite key: [node_type: u8][timestamp: i64 BE] for range queries pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type"); @@ -81,19 +82,62 @@ pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64) { } } +/// Pack provenance value: [negated_timestamp:8][key] for descending sort +fn pack_provenance_value(timestamp: i64, key: &str) -> Vec { + let neg_ts = (!timestamp).to_be_bytes(); // negate for descending order + let mut buf = Vec::with_capacity(8 + key.len()); + buf.extend_from_slice(&neg_ts); + buf.extend_from_slice(key.as_bytes()); + buf +} + +/// Unpack provenance value: returns (timestamp, key) +fn unpack_provenance_value(data: &[u8]) -> (i64, String) { + let neg_ts = i64::from_be_bytes([data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]]); + let timestamp = !neg_ts; + let key = String::from_utf8_lossy(&data[8..]).to_string(); + (timestamp, key) +} + /// Record a node's location in the index. -pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64) -> Result<()> { +pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str) -> Result<()> { let mut nodes_table = txn.open_table(NODES)?; let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?; + let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?; nodes_table.insert(key, offset)?; let packed = pack_node_meta(uuid, node_type, timestamp); key_uuid_table.insert(key, packed.as_slice())?; uuid_offsets.insert(uuid.as_slice(), offset)?; + let prov_val = pack_provenance_value(timestamp, key); + by_provenance.insert(provenance, prov_val.as_slice())?; Ok(()) } +/// Get recent keys for a given provenance, sorted by timestamp descending. +pub fn recent_by_provenance(db: &Database, provenance: &str, limit: usize) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_multimap_table(NODES_BY_PROVENANCE)?; + let mut results = Vec::new(); + for entry in table.get(provenance)? { + if results.len() >= limit { break; } + let (timestamp, key) = unpack_provenance_value(entry?.value()); + results.push((key, timestamp)); + } + Ok(results) +} + +/// Get node metadata (uuid, node_type, timestamp) from KEY_TO_UUID. +pub fn get_node_meta(db: &Database, key: &str) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_table(KEY_TO_UUID)?; + match table.get(key)? { + Some(data) => Ok(Some(unpack_node_meta(data.value()))), + None => Ok(None), + } +} + /// Get offset for a node by key. pub fn get_offset(db: &Database, key: &str) -> Result> { let txn = db.begin_read()?; diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index 8beb173..c7ff977 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -2,7 +2,7 @@ // // CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics. -use super::{capnp, index, types::*, Store}; +use super::{index, types::*, Store}; use anyhow::{anyhow, bail, Result}; use std::collections::{HashMap, HashSet}; @@ -24,7 +24,7 @@ impl Store { let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let txn = db.begin_write()?; let offset = self.append_nodes(&[node.clone()])?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok(()) } @@ -45,24 +45,8 @@ impl Store { Some(db) => db, None => return Vec::new(), }; - let keys = match index::all_keys(db) { - Ok(keys) => keys, - Err(_) => return Vec::new(), - }; - let mut nodes: Vec<_> = keys.iter() - .filter_map(|key| { - let offset = index::get_offset(db, key).ok()??; - let node = capnp::read_node_at_offset(offset).ok()?; - if !node.deleted && node.provenance == provenance { - Some((key.clone(), node.timestamp)) - } else { - None - } - }) - .collect(); - nodes.sort_by(|a, b| b.1.cmp(&a.1)); - nodes.truncate(limit); - nodes + // Index stores entries sorted by timestamp descending, so just take first N + index::recent_by_provenance(db, provenance, limit).unwrap_or_default() } /// Upsert a node: update if exists (and content changed), create if not. @@ -90,7 +74,7 @@ impl Store { node.version += 1; let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok("updated") } else { @@ -98,7 +82,7 @@ impl Store { node.provenance = provenance.to_string(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok("created") } @@ -189,7 +173,7 @@ impl Store { let txn = db.begin_write()?; let offset = self.append_nodes(&[renamed.clone(), tombstone])?; index::remove_node(&txn, old_key)?; - index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp)?; + index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance)?; if !updated_rels.is_empty() { self.append_relations(&updated_rels)?; } @@ -320,7 +304,7 @@ impl Store { node.timestamp = now_epoch(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok((old, weight)) } diff --git a/src/mind/mod.rs b/src/mind/mod.rs index ca6d740..9fcc101 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -346,32 +346,44 @@ impl Mind { let mut s = shared_for_unc.lock().unwrap(); s.unc_idle = true; } + + // Get wake notify for event-driven loop + let wake = unc.lock().await.wake.clone(); + let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600)); + health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - // Phase 0: health check outside lock (slow I/O) - let needs_health = unc.lock().await.needs_health_refresh(); + // Do work: reap finished agents, spawn new ones + let (to_spawn, needs_health) = { + let mut guard = unc.lock().await; + guard.reap_finished(); + (guard.select_to_spawn(), guard.needs_health_refresh()) + }; + + // Spawn agents outside lock + for (idx, name, auto) in to_spawn { + match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await { + Ok(result) => unc.lock().await.complete_spawn(idx, result), + Err(auto) => unc.lock().await.abort_spawn(idx, auto), + } + } + + // Health check outside lock (slow I/O) if needs_health { if let Ok(store_arc) = access_local() { let health = crate::subconscious::daemon::compute_graph_health(&store_arc); unc.lock().await.set_health(health); } } - // Phase 1: quick work under lock - let to_spawn = { - let mut guard = unc.lock().await; - guard.reap_finished(); - guard.select_to_spawn() - }; - // Phase 2: slow work outside lock - for (idx, name, auto) in to_spawn { - match crate::mind::unconscious::prepare_spawn(&name, auto).await { - Ok(result) => unc.lock().await.complete_spawn(idx, result), - Err(auto) => unc.lock().await.abort_spawn(idx, auto), + + // Wait for: conscious active, agent finished, or health timer + tokio::select! { + _ = unc_rx.changed() => { + if *unc_rx.borrow() { break; } } + _ = wake.notified() => {} + _ = health_interval.tick() => {} } - // Check if conscious became active - if *unc_rx.borrow() { break; } - // Brief yield to not starve other tasks - tokio::task::yield_now().await; } } }); @@ -637,7 +649,8 @@ impl Mind { }; let mut cmds = Vec::new(); - let mut dmn_expired = false; + #[allow(unused_assignments)] + let mut _dmn_expired = false; tokio::select! { biased; @@ -676,7 +689,7 @@ impl Mind { } } - _ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true, + _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, } if !self.config.no_agents { diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 87c44db..8989264 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -71,6 +71,8 @@ pub struct Unconscious { max_concurrent: usize, pub graph_health: Option, last_health_check: Option, + /// Notified when agent state changes (finished, toggled) + pub wake: std::sync::Arc, } impl Unconscious { @@ -117,6 +119,7 @@ impl Unconscious { agents, max_concurrent, graph_health: None, last_health_check: None, + wake: std::sync::Arc::new(tokio::sync::Notify::new()), } } @@ -130,11 +133,13 @@ impl Unconscious { if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() { let agent_name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); - match prepare_spawn(&agent_name, auto).await { + let wake = self.wake.clone(); + match prepare_spawn(&agent_name, auto, wake).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } } + self.wake.notify_one(); // wake loop to consider new state Some(new_state) } @@ -245,7 +250,7 @@ pub struct SpawnResult { /// Called outside the Unconscious lock. /// On success, auto is consumed (moved into spawned task). /// On failure, auto is returned so it can be restored. -pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result { +pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc) -> Result { dbglog!("[unconscious] spawning {}", name); let def = match defs::get_def(name) { @@ -312,6 +317,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), }