From 19789b7e745644817d6369cbec0bc4c7adfd1991 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 13 Apr 2026 22:25:12 -0400 Subject: [PATCH] index: add NODES_BY_PROVENANCE with timestamp-sorted values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Store [negated_timestamp:8][key] as value for descending sort - recent_by_provenance uses index directly, no capnp reads - Eliminates 24k×5 capnp reads from subconscious snapshots Co-Authored-By: Proof of Concept --- src/hippocampus/store/capnp.rs | 21 +++++++++++---- src/hippocampus/store/index.rs | 48 ++++++++++++++++++++++++++++++++-- src/hippocampus/store/ops.rs | 32 ++++++----------------- 3 files changed, 70 insertions(+), 31 deletions(-) diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index 160a551..3f0e229 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -598,8 +598,8 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { return Ok(database); } - // Track latest (offset, uuid, version, deleted, node_type, timestamp) per key - let mut latest: HashMap = HashMap::new(); + // Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key + let mut latest: HashMap = HashMap::new(); let file = fs::File::open(capnp_path) .with_context(|| format!("open {}", capnp_path.display()))?; @@ -634,6 +634,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { .map(|t| t as u8) .unwrap_or(0); let timestamp = node_reader.get_timestamp(); + let provenance = node_reader.get_provenance().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("manual") + .to_string(); let mut uuid = [0u8; 16]; if let Ok(data) = node_reader.get_uuid() { @@ -644,10 +648,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { // Keep if newer version let dominated = latest.get(&key) - .map(|(_, _, v, _, _, _)| version >= *v) + .map(|(_, _, v, _, _, _, _)| version >= *v) 
.unwrap_or(true); if dominated { - latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp)); + latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance)); } } } @@ -659,8 +663,9 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { let mut nodes_table = txn.open_table(index::NODES)?; let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?; + let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?; - for (key, (offset, uuid, _, deleted, node_type, timestamp)) in latest { + for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest { if !deleted { nodes_table.insert(key.as_str(), offset)?; // Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes @@ -669,6 +674,12 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { packed[16] = node_type; packed[17..25].copy_from_slice(&timestamp.to_be_bytes()); key_uuid_table.insert(key.as_str(), packed.as_slice())?; + // Pack: [negated_timestamp:8][key] for descending sort + let neg_ts = (!timestamp).to_be_bytes(); + let mut prov_val = Vec::with_capacity(8 + key.len()); + prov_val.extend_from_slice(&neg_ts); + prov_val.extend_from_slice(key.as_bytes()); + by_provenance.insert(provenance.as_str(), prov_val.as_slice())?; } // Always record offset in UUID history (even for deleted) uuid_offsets.insert(uuid.as_slice(), offset)?; diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs index 012db0f..a663366 100644 --- a/src/hippocampus/store/index.rs +++ b/src/hippocampus/store/index.rs @@ -24,7 +24,8 @@ pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes"); // KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid"); pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = 
MultimapTableDefinition::new("uuid_offsets"); -pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("nodes_by_provenance"); +// NODES_BY_PROVENANCE: provenance → [timestamp:8 BE][key] (sorted by timestamp desc via negated ts) +pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &[u8]> = MultimapTableDefinition::new("nodes_by_provenance"); // Composite key: [node_type: u8][timestamp: i64 BE] for range queries pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type"); @@ -81,19 +82,62 @@ pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64) { } } +/// Pack provenance value: [negated_timestamp:8][key] for descending sort +fn pack_provenance_value(timestamp: i64, key: &str) -> Vec { + let neg_ts = (!timestamp).to_be_bytes(); // negate for descending order + let mut buf = Vec::with_capacity(8 + key.len()); + buf.extend_from_slice(&neg_ts); + buf.extend_from_slice(key.as_bytes()); + buf +} + +/// Unpack provenance value: returns (timestamp, key) +fn unpack_provenance_value(data: &[u8]) -> (i64, String) { + let neg_ts = i64::from_be_bytes([data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]]); + let timestamp = !neg_ts; + let key = String::from_utf8_lossy(&data[8..]).to_string(); + (timestamp, key) +} + /// Record a node's location in the index. 
-pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64) -> Result<()> { +pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str) -> Result<()> { let mut nodes_table = txn.open_table(NODES)?; let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?; + let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?; nodes_table.insert(key, offset)?; let packed = pack_node_meta(uuid, node_type, timestamp); key_uuid_table.insert(key, packed.as_slice())?; uuid_offsets.insert(uuid.as_slice(), offset)?; + let prov_val = pack_provenance_value(timestamp, key); + by_provenance.insert(provenance, prov_val.as_slice())?; Ok(()) } +/// Get recent keys for a given provenance, sorted by timestamp descending. +pub fn recent_by_provenance(db: &Database, provenance: &str, limit: usize) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_multimap_table(NODES_BY_PROVENANCE)?; + let mut results = Vec::new(); + for entry in table.get(provenance)? { + if results.len() >= limit { break; } + let (timestamp, key) = unpack_provenance_value(entry?.value()); + results.push((key, timestamp)); + } + Ok(results) +} + +/// Get node metadata (uuid, node_type, timestamp) from KEY_TO_UUID. +pub fn get_node_meta(db: &Database, key: &str) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_table(KEY_TO_UUID)?; + match table.get(key)? { + Some(data) => Ok(Some(unpack_node_meta(data.value()))), + None => Ok(None), + } +} + /// Get offset for a node by key. 
pub fn get_offset(db: &Database, key: &str) -> Result> { let txn = db.begin_read()?; diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index 8beb173..c7ff977 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -2,7 +2,7 @@ // // CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics. -use super::{capnp, index, types::*, Store}; +use super::{index, types::*, Store}; use anyhow::{anyhow, bail, Result}; use std::collections::{HashMap, HashSet}; @@ -24,7 +24,7 @@ impl Store { let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let txn = db.begin_write()?; let offset = self.append_nodes(&[node.clone()])?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok(()) } @@ -45,24 +45,8 @@ impl Store { Some(db) => db, None => return Vec::new(), }; - let keys = match index::all_keys(db) { - Ok(keys) => keys, - Err(_) => return Vec::new(), - }; - let mut nodes: Vec<_> = keys.iter() - .filter_map(|key| { - let offset = index::get_offset(db, key).ok()??; - let node = capnp::read_node_at_offset(offset).ok()?; - if !node.deleted && node.provenance == provenance { - Some((key.clone(), node.timestamp)) - } else { - None - } - }) - .collect(); - nodes.sort_by(|a, b| b.1.cmp(&a.1)); - nodes.truncate(limit); - nodes + // Index stores entries sorted by timestamp descending, so just take first N + index::recent_by_provenance(db, provenance, limit).unwrap_or_default() } /// Upsert a node: update if exists (and content changed), create if not. 
@@ -90,7 +74,7 @@ impl Store { node.version += 1; let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok("updated") } else { @@ -98,7 +82,7 @@ impl Store { node.provenance = provenance.to_string(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok("created") } @@ -189,7 +173,7 @@ impl Store { let txn = db.begin_write()?; let offset = self.append_nodes(&[renamed.clone(), tombstone])?; index::remove_node(&txn, old_key)?; - index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp)?; + index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance)?; if !updated_rels.is_empty() { self.append_relations(&updated_rels)?; } @@ -320,7 +304,7 @@ impl Store { node.timestamp = now_epoch(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp)?; + index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; txn.commit()?; Ok((old, weight)) }