From f413a853d83b0048de9a381b2a6f19ffb3263a37 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 13 Apr 2026 19:10:08 -0400 Subject: [PATCH] store: redb indexes offsets into capnp log, not full nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restructure store module with clearer file names: - persist.rs → capnp.rs (capnp log IO) - db.rs → index.rs (redb index operations) redb now stores key → offset mapping, not serialized nodes. Mutations record the offset after appending to capnp log. rebuild_index scans capnp log to reconstruct the index. The HashMap still exists for now; next step is to use the index for lookups and remove it. Co-Authored-By: Proof of Concept --- .../store/{persist.rs => capnp.rs} | 107 ++++++++++++++-- src/hippocampus/store/db.rs | 119 ------------------ src/hippocampus/store/index.rs | 104 +++++++++++++++ src/hippocampus/store/mod.rs | 18 +-- src/hippocampus/store/ops.rs | 26 ++-- 5 files changed, 225 insertions(+), 149 deletions(-) rename src/hippocampus/store/{persist.rs => capnp.rs} (83%) delete mode 100644 src/hippocampus/store/db.rs create mode 100644 src/hippocampus/store/index.rs diff --git a/src/hippocampus/store/persist.rs b/src/hippocampus/store/capnp.rs similarity index 83% rename from src/hippocampus/store/persist.rs rename to src/hippocampus/store/capnp.rs index a199e7d..a8eee3c 100644 --- a/src/hippocampus/store/persist.rs +++ b/src/hippocampus/store/capnp.rs @@ -2,7 +2,7 @@ // // capnp logs are the source of truth; redb provides indexed access. -use super::{db, types::*}; +use super::{index, types::*}; use redb::ReadableTableMetadata; use crate::memory_capnp; @@ -52,7 +52,7 @@ impl Store { fn open_or_rebuild_db(&self, path: &Path) -> Result { // Try opening existing database if path.exists() { - match db::open_db(path) { + match index::open_db(path) { Ok(database) => { if self.db_is_healthy(&database)? { return Ok(database); @@ -65,8 +65,8 @@ impl Store { } } - // Rebuild from in-memory state - db::rebuild_from_store(path, self) + // Rebuild index from capnp log + rebuild_index(path, &nodes_path()) } /// Check if redb indices match in-memory state. @@ -76,7 +76,7 @@ impl Store { let txn = database.begin_read()?; // Quick check: node count should match - let nodes_table = txn.open_table(db::NODES)?; + let nodes_table = txn.open_table(index::NODES)?; let db_count = nodes_table.len()?; if db_count != self.nodes.len() as u64 { @@ -218,13 +218,15 @@ impl Store { /// Append nodes to the log file. /// Serializes to a Vec first, then does a single write() syscall /// so the append is atomic with O_APPEND even without flock. - pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<()> { + /// Returns the offset where the message was written. + pub fn append_nodes(&mut self, nodes: &[Node]) -> Result { let _lock = StoreLock::acquire()?; self.append_nodes_unlocked(nodes) } /// Append nodes without acquiring the lock. Caller must hold StoreLock. - pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<()> { + /// Returns the offset where the message was written. + pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); @@ -241,12 +243,16 @@ impl Store { let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; + + // Get offset before writing + let offset = file.metadata().map(|m| m.len()).unwrap_or(0); + use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write nodes"))?; self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); - Ok(()) + Ok(offset) } /// Replay only new entries appended to the node log since we last loaded. @@ -418,3 +424,88 @@ pub fn fsck() -> Result<()> { Ok(()) } + +/// Rebuild redb index from capnp log. +/// Scans the log, tracking offsets, and records latest version of each node. +fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { + // Remove old database if it exists + if db_path.exists() { + fs::remove_file(db_path) + .with_context(|| format!("remove old db {}", db_path.display()))?; + } + + let database = index::open_db(db_path)?; + + if !capnp_path.exists() { + return Ok(database); + } + + // Track latest (offset, uuid, version, deleted) per key + let mut latest: HashMap = HashMap::new(); + + let file = fs::File::open(capnp_path) + .with_context(|| format!("open {}", capnp_path.display()))?; + let mut reader = BufReader::new(file); + + loop { + let offset = reader.stream_position()?; + let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { + Ok(m) => m, + Err(_) => break, + }; + + let log = match msg.get_root::() { + Ok(l) => l, + Err(_) => continue, + }; + + let nodes = match log.get_nodes() { + Ok(n) => n, + Err(_) => continue, + }; + for node_reader in nodes { + let key = node_reader.get_key().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("") + .to_string(); + if key.is_empty() { continue; } + + let version = node_reader.get_version(); + let deleted = node_reader.get_deleted(); + + let mut uuid = [0u8; 16]; + if let Ok(data) = node_reader.get_uuid() { + if data.len() >= 16 { + uuid.copy_from_slice(&data[..16]); + } + } + + // Keep if newer version + let dominated = latest.get(&key) + .map(|(_, _, v, _)| version >= *v) + .unwrap_or(true); + if dominated { + latest.insert(key, (offset, uuid, version, deleted)); + } + } + } + + // Write index entries for non-deleted nodes + { + let txn = database.begin_write()?; + { + let mut nodes_table = txn.open_table(index::NODES)?; + let mut uuid_table = txn.open_table(index::UUID_TO_KEY)?; + + for (key, (offset, uuid, _, deleted)) in latest { + if !deleted { + nodes_table.insert(key.as_str(), offset)?; + uuid_table.insert(uuid.as_slice(), key.as_str())?; + } + } + } + txn.commit()?; + } + + Ok(database) +} diff --git a/src/hippocampus/store/db.rs b/src/hippocampus/store/db.rs deleted file mode 100644 index 403436a..0000000 --- a/src/hippocampus/store/db.rs +++ /dev/null @@ -1,119 +0,0 @@ -// redb index tables -// -// capnp logs are source of truth; redb provides indexed access. -// Tables: -// nodes: key → Node (JSON serialized) -// uuid_to_key: [u8;16] → key -// -// Relations stay in-memory for now (frequently iterated in full). - -use super::types::*; -use anyhow::{Context, Result}; -use redb::{Database, ReadableDatabase, TableDefinition}; -use std::path::Path; - -// Table definitions -pub const NODES: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes"); -pub const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key"); - -/// Open or create the redb database, ensuring all tables exist. -pub fn open_db(path: &Path) -> Result { - let db = Database::create(path) - .with_context(|| format!("create redb {}", path.display()))?; - - // Ensure tables exist by opening a write transaction - let txn = db.begin_write()?; - { - let _ = txn.open_table(NODES)?; - let _ = txn.open_table(UUID_TO_KEY)?; - } - txn.commit()?; - - Ok(db) -} - -/// Rebuild redb indices from in-memory Store (loaded from capnp logs). -/// Deletes and recreates the database for a clean rebuild. -pub fn rebuild_from_store(path: &Path, store: &Store) -> Result { - // Remove old database if it exists - if path.exists() { - std::fs::remove_file(path) - .with_context(|| format!("remove old db {}", path.display()))?; - } - - let db = open_db(path)?; - let txn = db.begin_write()?; - - { - let mut nodes_table = txn.open_table(NODES)?; - let mut uuid_table = txn.open_table(UUID_TO_KEY)?; - - for (key, node) in &store.nodes { - let json = serde_json::to_vec(node) - .with_context(|| format!("serialize node {}", key))?; - nodes_table.insert(key.as_str(), json.as_slice())?; - uuid_table.insert(node.uuid.as_slice(), key.as_str())?; - } - } - - txn.commit()?; - Ok(db) -} - -/// Get a node by key from redb. -pub fn get_node(db: &Database, key: &str) -> Result> { - let txn = db.begin_read()?; - let table = txn.open_table(NODES)?; - - match table.get(key)? { - Some(data) => { - let node: Node = serde_json::from_slice(data.value()) - .with_context(|| format!("deserialize node {}", key))?; - Ok(Some(node)) - } - None => Ok(None), - } -} - -/// Get key by uuid from redb. -pub fn get_key_by_uuid(db: &Database, uuid: &[u8; 16]) -> Result> { - let txn = db.begin_read()?; - let table = txn.open_table(UUID_TO_KEY)?; - - match table.get(uuid.as_slice())? { - Some(key) => Ok(Some(key.value().to_string())), - None => Ok(None), - } -} - -/// Insert or update a node in redb. -pub fn upsert_node(db: &Database, node: &Node) -> Result<()> { - let txn = db.begin_write()?; - { - let mut nodes_table = txn.open_table(NODES)?; - let mut uuid_table = txn.open_table(UUID_TO_KEY)?; - - let json = serde_json::to_vec(node) - .with_context(|| format!("serialize node {}", node.key))?; - - nodes_table.insert(node.key.as_str(), json.as_slice())?; - uuid_table.insert(node.uuid.as_slice(), node.key.as_str())?; - } - txn.commit()?; - Ok(()) -} - -/// Delete a node from redb indices (by marking key empty, keeping uuid mapping). -pub fn delete_node(db: &Database, key: &str, uuid: &[u8; 16]) -> Result<()> { - let txn = db.begin_write()?; - { - let mut nodes_table = txn.open_table(NODES)?; - let mut uuid_table = txn.open_table(UUID_TO_KEY)?; - - nodes_table.remove(key)?; - uuid_table.remove(uuid.as_slice())?; - } - txn.commit()?; - Ok(()) -} - diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs new file mode 100644 index 0000000..7a02616 --- /dev/null +++ b/src/hippocampus/store/index.rs @@ -0,0 +1,104 @@ +// redb index tables +// +// capnp logs are source of truth; redb provides indexed access. +// Tables: +// nodes: key → offset in capnp log (u64) +// uuid_to_key: [u8;16] → key +// +// To read a node: lookup offset in redb, seek in capnp file, deserialize. + +use anyhow::{Context, Result}; +use redb::{Database, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition}; +use std::path::Path; + +// Table definitions - nodes maps key to byte offset in capnp log +pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes"); +pub const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key"); + +/// Open or create the redb database, ensuring all tables exist. +pub fn open_db(path: &Path) -> Result { + let db = Database::create(path) + .with_context(|| format!("create redb {}", path.display()))?; + + // Ensure tables exist by opening a write transaction + let txn = db.begin_write()?; + { + let _ = txn.open_table(NODES)?; + let _ = txn.open_table(UUID_TO_KEY)?; + } + txn.commit()?; + + Ok(db) +} + +/// Record a node's location in the index. +pub fn index_node(db: &Database, key: &str, offset: u64, uuid: &[u8; 16]) -> Result<()> { + let txn = db.begin_write()?; + { + let mut nodes_table = txn.open_table(NODES)?; + let mut uuid_table = txn.open_table(UUID_TO_KEY)?; + + nodes_table.insert(key, offset)?; + uuid_table.insert(uuid.as_slice(), key)?; + } + txn.commit()?; + Ok(()) +} + +/// Get offset for a node by key. +pub fn get_offset(db: &Database, key: &str) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_table(NODES)?; + Ok(table.get(key)?.map(|v| v.value())) +} + +/// Check if a key exists in the index. +pub fn contains_key(db: &Database, key: &str) -> Result { + let txn = db.begin_read()?; + let table = txn.open_table(NODES)?; + Ok(table.get(key)?.is_some()) +} + +/// Get key by uuid from redb. +pub fn get_key_by_uuid(db: &Database, uuid: &[u8; 16]) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_table(UUID_TO_KEY)?; + + match table.get(uuid.as_slice())? { + Some(key) => Ok(Some(key.value().to_string())), + None => Ok(None), + } +} + +/// Remove a node from the index. +pub fn remove_node(db: &Database, key: &str, uuid: &[u8; 16]) -> Result<()> { + let txn = db.begin_write()?; + { + let mut nodes_table = txn.open_table(NODES)?; + let mut uuid_table = txn.open_table(UUID_TO_KEY)?; + + nodes_table.remove(key)?; + uuid_table.remove(uuid.as_slice())?; + } + txn.commit()?; + Ok(()) +} + +/// Count nodes in the index. +pub fn node_count(db: &Database) -> Result { + let txn = db.begin_read()?; + let table = txn.open_table(NODES)?; + Ok(table.len()?) +} + +/// Collect all keys from the index. +pub fn all_keys(db: &Database) -> Result> { + let txn = db.begin_read()?; + let table = txn.open_table(NODES)?; + let mut keys = Vec::new(); + for entry in table.iter()? { + let (key, _) = entry?; + keys.push(key.value().to_string()); + } + Ok(keys) +} diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index 4f26915..9812c54 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -7,17 +7,17 @@ // redb provides indexed access; Store struct holds in-memory state. // // Module layout: -// types.rs — Node, Relation, enums, capnp macros, path helpers -// view.rs — StoreView trait for read-only access -// persist.rs — load, replay, append (capnp IO) -// ops.rs — mutations (upsert, delete, decay, cap_degree, etc.) -// mod.rs — re-exports, key resolution, ingestion, rendering +// types.rs — Node, Relation, enums, capnp macros, path helpers +// index.rs — redb index operations +// capnp.rs — capnp log IO (load, replay, append, fsck) +// ops.rs — mutations (upsert, delete, rename, etc.) +// view.rs — StoreView trait for read-only access mod types; -mod view; -mod persist; +mod index; +mod capnp; mod ops; -pub mod db; +mod view; // Re-export everything callers need pub use types::{ @@ -27,7 +27,7 @@ pub use types::{ new_node, new_relation, }; pub use view::StoreView; -pub use persist::fsck; +pub use capnp::fsck; pub use ops::current_provenance; use crate::graph::{self, Graph}; diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index a83d366..f45fdb0 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -2,7 +2,7 @@ // // CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics. -use super::{db, types::*}; +use super::{index, types::*}; use anyhow::{anyhow, bail, Result}; use std::collections::{HashMap, HashSet}; @@ -15,7 +15,7 @@ pub fn current_provenance() -> String { } impl Store { - /// Add or update a node (appends to log + updates cache + redb). + /// Add or update a node (appends to log + updates index). /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. pub fn upsert_node(&mut self, mut node: Node) -> Result<()> { let _lock = StoreLock::acquire()?; @@ -25,9 +25,9 @@ impl Store { node.uuid = existing.uuid; node.version = existing.version + 1; } - self.append_nodes_unlocked(&[node.clone()])?; + let offset = self.append_nodes_unlocked(&[node.clone()])?; if let Some(ref database) = self.db { - db::upsert_node(database, &node)?; + index::index_node(database, &node.key, offset, &node.uuid)?; } self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node); @@ -77,18 +77,18 @@ impl Store { node.provenance = provenance.to_string(); node.timestamp = now_epoch(); node.version += 1; - self.append_nodes_unlocked(std::slice::from_ref(&node))?; + let offset = self.append_nodes_unlocked(std::slice::from_ref(&node))?; if let Some(ref database) = self.db { - db::upsert_node(database, &node)?; + index::index_node(database, &node.key, offset, &node.uuid)?; } self.nodes.insert(key.to_string(), node); Ok("updated") } else { let mut node = new_node(key, content); node.provenance = provenance.to_string(); - self.append_nodes_unlocked(std::slice::from_ref(&node))?; + let offset = self.append_nodes_unlocked(std::slice::from_ref(&node))?; if let Some(ref database) = self.db { - db::upsert_node(database, &node)?; + index::index_node(database, &node.key, offset, &node.uuid)?; } self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(key.to_string(), node); @@ -114,7 +114,7 @@ impl Store { deleted.timestamp = now_epoch(); self.append_nodes_unlocked(std::slice::from_ref(&deleted))?; if let Some(ref database) = self.db { - db::delete_node(database, key, &uuid)?; + index::remove_node(database, key, &uuid)?; } self.nodes.remove(key); Ok(()) @@ -172,15 +172,15 @@ impl Store { .collect(); // Persist under single lock - self.append_nodes_unlocked(&[renamed.clone(), tombstone.clone()])?; + let offset = self.append_nodes_unlocked(&[renamed.clone(), tombstone.clone()])?; if !updated_rels.is_empty() { self.append_relations_unlocked(&updated_rels)?; } - // Update redb: delete old key, insert renamed + // Update index: remove old key, add renamed if let Some(ref database) = self.db { - db::delete_node(database, old_key, &tombstone.uuid)?; - db::upsert_node(database, &renamed)?; + index::remove_node(database, old_key, &tombstone.uuid)?; + index::index_node(database, new_key, offset, &renamed.uuid)?; } // Update in-memory cache