From 7eb86656d4a9573e13f3701e6585d12daf03fdd8 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 13 Apr 2026 19:31:28 -0400 Subject: [PATCH] store: read nodes via index instead of HashMap - Add get_node() and contains_key() methods that read via redb index - Migrate all store/ reads to use index lookup - Remove HashMap cache updates from mutations (write-through to capnp+index only) - Remove replay_nodes() - load no longer builds HashMap - Update db_is_healthy to validate by spot-checking offsets - Fix set_weight bug: now persists weight changes to capnp Store.nodes HashMap still exists for code outside store/ module, but store/ itself no longer uses it. Co-Authored-By: Proof of Concept --- src/hippocampus/store/capnp.rs | 140 ++++++++++++++++----------------- src/hippocampus/store/mod.rs | 26 +++++- src/hippocampus/store/ops.rs | 66 ++++++++++------ src/hippocampus/store/view.rs | 47 ++++++++--- 4 files changed, 167 insertions(+), 112 deletions(-) diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index ddd6531..fee5762 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -206,39 +206,79 @@ impl Relation { } } +// --------------------------------------------------------------------------- +// Direct node access +// --------------------------------------------------------------------------- + +/// Read a single node at the given offset in the capnp log. +/// The offset must point to a valid message containing the node. +pub fn read_node_at_offset(offset: u64) -> Result { + let path = nodes_path(); + let mut file = fs::File::open(&path) + .with_context(|| format!("open {}", path.display()))?; + + use std::io::{Seek, SeekFrom}; + file.seek(SeekFrom::Start(offset))?; + + let mut reader = BufReader::new(file); + let msg = serialize::read_message(&mut reader, message::ReaderOptions::new()) + .with_context(|| format!("read message at offset {}", offset))?; + + let log = msg.get_root::() + .with_context(|| "read node log")?; + let nodes = log.get_nodes() + .with_context(|| "get nodes")?; + + // A message at this offset should have exactly one node (from upsert), + // or we take the last one if there are multiple (from batch operations like rename) + if nodes.is_empty() { + anyhow::bail!("no nodes in message at offset {}", offset); + } + + // Return the first non-deleted node, or the first one if all are deleted + for node_reader in nodes.iter() { + let node = Node::from_capnp_migrate(node_reader)?; + if !node.deleted { + return Ok(node); + } + } + + // All nodes in this message are deleted - shouldn't happen if index is correct + Node::from_capnp_migrate(nodes.get(0)) +} + // --------------------------------------------------------------------------- // Store persistence methods // --------------------------------------------------------------------------- impl Store { - /// Load store by replaying capnp logs, then open/verify redb indices. + /// Load store by opening redb index and replaying relations. pub fn load() -> Result { let nodes_p = nodes_path(); let rels_p = relations_path(); let mut store = Store::default(); - if nodes_p.exists() { - store.replay_nodes(&nodes_p)?; - } + // Open redb index first (rebuilds from capnp if needed) + let db_p = db_path(); + store.db = Some(store.open_or_rebuild_db(&db_p)?); + + // Replay relations if rels_p.exists() { store.replay_relations(&rels_p)?; } - // Record log sizes after replay + // Record log sizes store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); // Drop edges referencing deleted/missing nodes + let db = store.db.as_ref().unwrap(); store.relations.retain(|r| - store.nodes.contains_key(&r.source_key) && - store.nodes.contains_key(&r.target_key) + index::contains_key(db, &r.source_key).unwrap_or(false) && + index::contains_key(db, &r.target_key).unwrap_or(false) ); - // Open redb and verify/rebuild indices - let db_p = db_path(); - store.db = Some(store.open_or_rebuild_db(&db_p)?); - Ok(store) } @@ -263,80 +303,38 @@ impl Store { rebuild_index(path, &nodes_path()) } - /// Check if redb indices match in-memory state. + /// Check if redb index is healthy by verifying some offsets are valid. fn db_is_healthy(&self, database: &redb::Database) -> Result { - use redb::ReadableDatabase; + use redb::{ReadableDatabase, ReadableTable}; let txn = database.begin_read()?; - - // Quick check: node count should match let nodes_table = txn.open_table(index::NODES)?; - let db_count = nodes_table.len()?; - if db_count != self.nodes.len() as u64 { - return Ok(false); + // Check that we can read the table and it has entries + if nodes_table.len()? == 0 { + // Empty database - might be stale or new + let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); + return Ok(capnp_size == 0); // healthy only if capnp is also empty } - // Spot check: verify a few random nodes exist with matching keys - // (full verification would be too slow) - for (i, key) in self.nodes.keys().enumerate() { - if i >= 10 { break; } // check first 10 - if nodes_table.get(key.as_str())?.is_none() { + // Spot check: verify a few offsets point to valid messages + let mut checked = 0; + for entry in nodes_table.iter()? { + if checked >= 5 { break; } + let (key, offset) = entry?; + let offset = offset.value(); + + // Try to read the node at this offset + if read_node_at_offset(offset).is_err() { return Ok(false); } + checked += 1; + let _ = key; // silence unused warning } Ok(true) } - /// Replay node log, keeping latest version per UUID. - /// Tracks all UUIDs seen per key to detect duplicates. - fn replay_nodes(&mut self, path: &Path) -> Result<()> { - let file = fs::File::open(path) - .with_context(|| format!("open {}", path.display()))?; - let mut reader = BufReader::new(file); - - // Track all non-deleted UUIDs per key to detect duplicates - let mut key_uuids: HashMap> = HashMap::new(); - - while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { - let log = msg.get_root::() - .with_context(|| format!("read node log"))?; - for node_reader in log.get_nodes() - .with_context(|| format!("get nodes"))? { - let node = Node::from_capnp_migrate(node_reader)?; - let existing_version = self.nodes.get(&node.key) - .map(|n| n.version) - .unwrap_or(0); - if node.version >= existing_version { - if node.deleted { - self.nodes.remove(&node.key); - self.uuid_to_key.remove(&node.uuid); - if let Some(uuids) = key_uuids.get_mut(&node.key) { - uuids.retain(|u| *u != node.uuid); - } - } else { - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(node.key.clone(), node.clone()); - let uuids = key_uuids.entry(node.key).or_default(); - if !uuids.contains(&node.uuid) { - uuids.push(node.uuid); - } - } - } - } - } - - // Report duplicate keys - for (key, uuids) in &key_uuids { - if uuids.len() > 1 { - dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len()); - } - } - - Ok(()) - } - /// Replay relation log, keeping latest version per UUID fn replay_relations(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index 29af846..8009c21 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -76,15 +76,37 @@ impl Store { graph::build_graph(self) } + /// Get a node by key, reading from capnp via the index. + pub fn get_node(&self, key: &str) -> Result> { + let db = self.db.as_ref() + .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; + + match index::get_offset(db, key)? { + Some(offset) => Ok(Some(capnp::read_node_at_offset(offset)?)), + None => Ok(None), + } + } + + /// Check if a node exists by key. + pub fn contains_key(&self, key: &str) -> Result { + let db = self.db.as_ref() + .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; + index::contains_key(db, key) + } + pub fn resolve_key(&self, target: &str) -> Result { // Strip .md suffix if present — keys no longer use it let bare = strip_md_suffix(target); - if self.nodes.contains_key(&bare) { + if self.contains_key(&bare)? { return Ok(bare); } - let matches: Vec<_> = self.nodes.keys() + let db = self.db.as_ref() + .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; + let all_keys = index::all_keys(db)?; + + let matches: Vec<_> = all_keys.iter() .filter(|k| k.to_lowercase().contains(&target.to_lowercase())) .cloned().collect(); diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index d7def92..eac4e5f 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -2,7 +2,7 @@ // // CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics. -use super::{index, types::*, Store}; +use super::{capnp, index, types::*, Store}; use anyhow::{anyhow, bail, Result}; use std::collections::{HashMap, HashSet}; @@ -17,7 +17,7 @@ pub fn current_provenance() -> String { impl Store { /// Add or update a node (appends to log + updates index). pub fn upsert_node(&mut self, mut node: Node) -> Result<()> { - if let Some(existing) = self.nodes.get(&node.key) { + if let Some(existing) = self.get_node(&node.key)? { node.uuid = existing.uuid; node.version = existing.version + 1; } @@ -25,8 +25,6 @@ impl Store { if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(node.key.clone(), node); Ok(()) } @@ -39,9 +37,24 @@ impl Store { /// Recent nodes by provenance, sorted newest-first. Returns (key, timestamp). pub fn recent_by_provenance(&self, provenance: &str, limit: usize) -> Vec<(String, i64)> { - let mut nodes: Vec<_> = self.nodes.values() - .filter(|n| !n.deleted && n.provenance == provenance) - .map(|n| (n.key.clone(), n.timestamp)) + let db = match self.db.as_ref() { + Some(db) => db, + None => return Vec::new(), + }; + let keys = match index::all_keys(db) { + Ok(keys) => keys, + Err(_) => return Vec::new(), + }; + let mut nodes: Vec<_> = keys.iter() + .filter_map(|key| { + let offset = index::get_offset(db, key).ok()??; + let node = capnp::read_node_at_offset(offset).ok()?; + if !node.deleted && node.provenance == provenance { + Some((key.clone(), node.timestamp)) + } else { + None + } + }) .collect(); nodes.sort_by(|a, b| b.1.cmp(&a.1)); nodes.truncate(limit); @@ -60,11 +73,11 @@ impl Store { /// Upsert with explicit provenance (for agent-created nodes). pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> { - if let Some(existing) = self.nodes.get(key) { + if let Some(existing) = self.get_node(key)? { if existing.content == content { return Ok("unchanged"); } - let mut node = existing.clone(); + let mut node = existing; node.content = content.to_string(); node.provenance = provenance.to_string(); node.timestamp = now_epoch(); @@ -73,7 +86,6 @@ impl Store { if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } - self.nodes.insert(key.to_string(), node); Ok("updated") } else { let mut node = new_node(key, content); @@ -82,8 +94,6 @@ impl Store { if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(key.to_string(), node); Ok("created") } } @@ -92,10 +102,10 @@ impl Store { pub fn delete_node(&mut self, key: &str) -> Result<()> { let prov = current_provenance(); - let node = self.nodes.get(key) + let node = self.get_node(key)? .ok_or_else(|| anyhow!("No node '{}'", key))?; let uuid = node.uuid; - let mut deleted = node.clone(); + let mut deleted = node; deleted.deleted = true; deleted.version += 1; deleted.provenance = prov; @@ -104,7 +114,6 @@ impl Store { if let Some(ref database) = self.db { index::remove_node(database, key, &uuid)?; } - self.nodes.remove(key); Ok(()) } @@ -117,12 +126,11 @@ impl Store { if old_key == new_key { return Ok(()); } - if self.nodes.contains_key(new_key) { + if self.contains_key(new_key)? { bail!("Key '{}' already exists", new_key); } - let node = self.nodes.get(old_key) - .ok_or_else(|| anyhow!("No node '{}'", old_key))? - .clone(); + let node = self.get_node(old_key)? + .ok_or_else(|| anyhow!("No node '{}'", old_key))?; let prov = current_provenance(); @@ -164,10 +172,7 @@ impl Store { index::index_node(database, new_key, offset, &renamed.uuid)?; } - // Update in-memory cache - self.nodes.remove(old_key); - self.uuid_to_key.insert(renamed.uuid, new_key.to_string()); - self.nodes.insert(new_key.to_string(), renamed); + // Update in-memory relations cache for updated in &updated_rels { if let Some(r) = self.relations.iter_mut().find(|r| r.uuid == updated.uuid) { r.source_key = updated.source_key.clone(); @@ -261,10 +266,19 @@ impl Store { /// Set a node's weight directly. Returns (old, new). pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> { let weight = weight.clamp(0.01, 1.0); - let node = self.nodes.get_mut(key) + let mut node = self.get_node(key)? .ok_or_else(|| anyhow!("node not found: {}", key))?; let old = node.weight; + if (old - weight).abs() < 0.001 { + return Ok((old, weight)); // unchanged + } node.weight = weight; + node.version += 1; + node.timestamp = now_epoch(); + let offset = self.append_nodes(std::slice::from_ref(&node))?; + if let Some(ref database) = self.db { + index::index_node(database, key, offset, &node.uuid)?; + } Ok((old, weight)) } @@ -317,10 +331,10 @@ impl Store { bail!("link already exists: {} ↔ {}", source, target); } - let source_uuid = self.nodes.get(source) + let source_uuid = self.get_node(source)? .map(|n| n.uuid) .ok_or_else(|| anyhow!("source not found: {}", source))?; - let target_uuid = self.nodes.get(target) + let target_uuid = self.get_node(target)? .map(|n| n.uuid) .ok_or_else(|| anyhow!("target not found: {}", target))?; diff --git a/src/hippocampus/store/view.rs b/src/hippocampus/store/view.rs index dedeae9..ca0e7ad 100644 --- a/src/hippocampus/store/view.rs +++ b/src/hippocampus/store/view.rs @@ -1,6 +1,6 @@ // Read-only access abstraction for the memory store -use super::types::*; +use super::{capnp, index, types::*}; use super::Store; // --------------------------------------------------------------------------- @@ -19,21 +19,42 @@ pub trait StoreView { /// Node weight by key, or the default weight if missing. fn node_weight(&self, key: &str) -> f64; - - /// Node content by key. - fn node_content(&self, key: &str) -> Option<&str>; } impl StoreView for Store { fn for_each_node(&self, mut f: F) { - for (key, node) in &self.nodes { - f(key, &node.content, node.weight); + let db = match self.db.as_ref() { + Some(db) => db, + None => return, + }; + let keys = match index::all_keys(db) { + Ok(keys) => keys, + Err(_) => return, + }; + for key in keys { + if let Ok(Some(offset)) = index::get_offset(db, &key) { + if let Ok(node) = capnp::read_node_at_offset(offset) { + f(&key, &node.content, node.weight); + } + } } } fn for_each_node_meta(&self, mut f: F) { - for (key, node) in &self.nodes { - f(key, node.node_type, node.timestamp); + let db = match self.db.as_ref() { + Some(db) => db, + None => return, + }; + let keys = match index::all_keys(db) { + Ok(keys) => keys, + Err(_) => return, + }; + for key in keys { + if let Ok(Some(offset)) = index::get_offset(db, &key) { + if let Ok(node) = capnp::read_node_at_offset(offset) { + f(&key, node.node_type, node.timestamp); + } + } } } @@ -46,10 +67,10 @@ impl StoreView for Store { fn node_weight(&self, key: &str) -> f64 { let cfg = crate::config::get(); - self.nodes.get(key).map(|n| n.weight as f64).unwrap_or(cfg.default_node_weight) - } - - fn node_content(&self, key: &str) -> Option<&str> { - self.nodes.get(key).map(|n| n.content.as_str()) + self.get_node(key) + .ok() + .flatten() + .map(|n| n.weight as f64) + .unwrap_or(cfg.default_node_weight) } }