From 4696bb8b7dbdc856ac2519b4ccbaed9af7f59057 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 13 Apr 2026 21:44:20 -0400 Subject: [PATCH] store: index ops take WriteTransaction, mutations batch properly Index functions now take &WriteTransaction instead of &Database, allowing callers to batch multiple index operations in a single transaction. Store mutations (upsert, delete, rename, etc.) now begin_write/commit their own transactions, ensuring atomicity. - replay_relations uses single txn for all relation indexing - Store::db() exposes Database for callers needing txn control - Convenience wrappers open their own txn for simple cases Co-Authored-By: Proof of Concept --- src/cli/admin.rs | 2 +- src/hippocampus/store/capnp.rs | 6 +- src/hippocampus/store/index.rs | 72 ++++++-------- src/hippocampus/store/mod.rs | 39 +++++--- src/hippocampus/store/ops.rs | 176 +++++++++++++++++---------------- 5 files changed, 151 insertions(+), 144 deletions(-) diff --git a/src/cli/admin.rs b/src/cli/admin.rs index 765c583..8100e6a 100644 --- a/src/cli/admin.rs +++ b/src/cli/admin.rs @@ -323,7 +323,7 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> { // Remove doomed nodes from index for (doomed_node, _) in &copies[1..] { - store.remove_from_index(&doomed_node.key, &doomed_node.uuid)?; + store.remove_from_index(&doomed_node.key)?; } merged += doomed_uuids.len(); diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index b194ad4..a9debff 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -354,12 +354,14 @@ impl Store { } } - // Index relations directly (no Vec intermediate) + // Index relations directly (single transaction) if let Some(db) = &self.db { + let txn = db.begin_write()?; for rel in by_uuid.into_values() { if rel.deleted { continue; } - index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; + index::index_relation(&txn, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; } + txn.commit()?; } Ok(()) } diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs index 43bbb4f..9b11f11 100644 --- a/src/hippocampus/store/index.rs +++ b/src/hippocampus/store/index.rs @@ -16,7 +16,7 @@ // To get key from uuid: UUID_OFFSETS → read_node_at_offset() → node.key use anyhow::{Context, Result}; -use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition}; +use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition, WriteTransaction}; use std::path::Path; // Node tables @@ -55,18 +55,14 @@ pub fn open_db(path: &Path) -> Result { } /// Record a node's location in the index. -pub fn index_node(db: &Database, key: &str, offset: u64, uuid: &[u8; 16]) -> Result<()> { - let txn = db.begin_write()?; - { - let mut nodes_table = txn.open_table(NODES)?; - let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; - let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?; +pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16]) -> Result<()> { + let mut nodes_table = txn.open_table(NODES)?; + let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; + let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?; - nodes_table.insert(key, offset)?; - key_uuid_table.insert(key, uuid.as_slice())?; - uuid_offsets.insert(uuid.as_slice(), offset)?; - } - txn.commit()?; + nodes_table.insert(key, offset)?; + key_uuid_table.insert(key, uuid.as_slice())?; + uuid_offsets.insert(uuid.as_slice(), offset)?; Ok(()) } @@ -113,17 +109,13 @@ pub fn get_offsets_for_uuid(db: &Database, uuid: &[u8; 16]) -> Result> } /// Remove a node from the index (key mappings only; UUID history preserved). -pub fn remove_node(db: &Database, key: &str, _uuid: &[u8; 16]) -> Result<()> { - let txn = db.begin_write()?; - { - let mut nodes_table = txn.open_table(NODES)?; - let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; - // Note: UUID_OFFSETS is not cleared - preserves version history +pub fn remove_node(txn: &WriteTransaction, key: &str) -> Result<()> { + let mut nodes_table = txn.open_table(NODES)?; + let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?; + // Note: UUID_OFFSETS is not cleared - preserves version history - nodes_table.remove(key)?; - key_uuid_table.remove(key)?; - } - txn.commit()?; + nodes_table.remove(key)?; + key_uuid_table.remove(key)?; Ok(()) } @@ -165,47 +157,39 @@ pub fn unpack_rel(data: &[u8]) -> ([u8; 16], f32, u8, bool) { /// Index a relation: store twice (once per endpoint). pub fn index_relation( - db: &Database, + txn: &WriteTransaction, source_uuid: &[u8; 16], target_uuid: &[u8; 16], strength: f32, rel_type: u8, ) -> Result<()> { - let txn = db.begin_write()?; - { - let mut rels = txn.open_multimap_table(RELS)?; + let mut rels = txn.open_multimap_table(RELS)?; - // Store outgoing: source → (target, strength, type, true) - let outgoing = pack_rel(target_uuid, strength, rel_type, true); - rels.insert(source_uuid.as_slice(), outgoing.as_slice())?; + // Store outgoing: source → (target, strength, type, true) + let outgoing = pack_rel(target_uuid, strength, rel_type, true); + rels.insert(source_uuid.as_slice(), outgoing.as_slice())?; - // Store incoming: target → (source, strength, type, false) - let incoming = pack_rel(source_uuid, strength, rel_type, false); - rels.insert(target_uuid.as_slice(), incoming.as_slice())?; - } - txn.commit()?; + // Store incoming: target → (source, strength, type, false) + let incoming = pack_rel(source_uuid, strength, rel_type, false); + rels.insert(target_uuid.as_slice(), incoming.as_slice())?; Ok(()) } /// Remove a relation from the index. pub fn remove_relation( - db: &Database, + txn: &WriteTransaction, source_uuid: &[u8; 16], target_uuid: &[u8; 16], strength: f32, rel_type: u8, ) -> Result<()> { - let txn = db.begin_write()?; - { - let mut rels = txn.open_multimap_table(RELS)?; + let mut rels = txn.open_multimap_table(RELS)?; - let outgoing = pack_rel(target_uuid, strength, rel_type, true); - rels.remove(source_uuid.as_slice(), outgoing.as_slice())?; + let outgoing = pack_rel(target_uuid, strength, rel_type, true); + rels.remove(source_uuid.as_slice(), outgoing.as_slice())?; - let incoming = pack_rel(source_uuid, strength, rel_type, false); - rels.remove(target_uuid.as_slice(), incoming.as_slice())?; - } - txn.commit()?; + let incoming = pack_rel(source_uuid, strength, rel_type, false); + rels.remove(target_uuid.as_slice(), incoming.as_slice())?; Ok(()) } diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index b5e7ddd..419b9a6 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -33,6 +33,7 @@ pub use ops::current_provenance; use crate::graph::{self, Graph}; use anyhow::{bail, Result}; +use redb::Database; /// Strip .md suffix from a key, handling both bare keys and section keys. /// "identity.md" → "identity", "foo.md#section" → "foo#section", "identity" → "identity" @@ -120,34 +121,44 @@ impl Store { Ok(neighbors) } + /// Get the database for transaction management. + pub fn db(&self) -> Result<&Database> { + self.db.as_ref().ok_or_else(|| anyhow::anyhow!("store not loaded")) + } + /// Remove a node from the index (used after appending a tombstone). - pub fn remove_from_index(&self, key: &str, uuid: &[u8; 16]) -> Result<()> { - if let Some(db) = self.db.as_ref() { - index::remove_node(db, key, uuid)?; - } + /// For batched operations, use index::remove_node with a WriteTransaction directly. + pub fn remove_from_index(&self, key: &str) -> Result<()> { + let db = self.db()?; + let txn = db.begin_write()?; + index::remove_node(&txn, key)?; + txn.commit()?; Ok(()) } /// Get all edges for a node by UUID. Returns (other_uuid, strength, rel_type, is_outgoing). pub fn edges_for_uuid(&self, uuid: &[u8; 16]) -> Result> { - let db = self.db.as_ref() - .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; + let db = self.db()?; index::edges_for_node(db, uuid) } - /// Add a relation to the index. + /// Add a relation to the index (opens its own transaction). + /// For batched operations, use index::index_relation with a WriteTransaction directly. pub fn index_relation(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> { - if let Some(db) = self.db.as_ref() { - index::index_relation(db, source, target, strength, rel_type)?; - } + let db = self.db()?; + let txn = db.begin_write()?; + index::index_relation(&txn, source, target, strength, rel_type)?; + txn.commit()?; Ok(()) } - /// Remove a relation from the index. + /// Remove a relation from the index (opens its own transaction). + /// For batched operations, use index::remove_relation with a WriteTransaction directly. pub fn remove_relation_from_index(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> { - if let Some(db) = self.db.as_ref() { - index::remove_relation(db, source, target, strength, rel_type)?; - } + let db = self.db()?; + let txn = db.begin_write()?; + index::remove_relation(&txn, source, target, strength, rel_type)?; + txn.commit()?; Ok(()) } diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index a978ab6..9aa8ade 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -21,19 +21,21 @@ impl Store { node.uuid = existing.uuid; node.version = existing.version + 1; } + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; + let txn = db.begin_write()?; let offset = self.append_nodes(&[node.clone()])?; - if let Some(ref database) = self.db { - index::index_node(database, &node.key, offset, &node.uuid)?; - } + index::index_node(&txn, &node.key, offset, &node.uuid)?; + txn.commit()?; Ok(()) } /// Add a relation (appends to log + indexes) pub fn add_relation(&mut self, rel: Relation) -> Result<()> { + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; + let txn = db.begin_write()?; self.append_relations(std::slice::from_ref(&rel))?; - if let Some(db) = &self.db { - index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; - } + index::index_relation(&txn, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; + txn.commit()?; Ok(()) } @@ -75,6 +77,8 @@ impl Store { /// Upsert with explicit provenance (for agent-created nodes). pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> { + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; + if let Some(existing) = self.get_node(key)? { if existing.content == content { return Ok("unchanged"); @@ -84,18 +88,18 @@ impl Store { node.provenance = provenance.to_string(); node.timestamp = now_epoch(); node.version += 1; + let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - if let Some(ref database) = self.db { - index::index_node(database, &node.key, offset, &node.uuid)?; - } + index::index_node(&txn, &node.key, offset, &node.uuid)?; + txn.commit()?; Ok("updated") } else { let mut node = new_node(key, content); node.provenance = provenance.to_string(); + let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - if let Some(ref database) = self.db { - index::index_node(database, &node.key, offset, &node.uuid)?; - } + index::index_node(&txn, &node.key, offset, &node.uuid)?; + txn.commit()?; Ok("created") } } @@ -103,19 +107,20 @@ impl Store { /// Soft-delete a node (appends deleted version, removes from index). pub fn delete_node(&mut self, key: &str) -> Result<()> { let prov = current_provenance(); + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let node = self.get_node(key)? .ok_or_else(|| anyhow!("No node '{}'", key))?; - let uuid = node.uuid; let mut deleted = node; deleted.deleted = true; deleted.version += 1; deleted.provenance = prov; deleted.timestamp = now_epoch(); + + let txn = db.begin_write()?; self.append_nodes(std::slice::from_ref(&deleted))?; - if let Some(ref database) = self.db { - index::remove_node(database, key, &uuid)?; - } + index::remove_node(&txn, key)?; + txn.commit()?; Ok(()) } @@ -131,6 +136,7 @@ impl Store { if self.contains_key(new_key)? { bail!("Key '{}' already exists", new_key); } + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let node = self.get_node(old_key)? .ok_or_else(|| anyhow!("No node '{}'", old_key))?; @@ -150,48 +156,44 @@ impl Store { tombstone.provenance = prov; tombstone.timestamp = now_epoch(); - // Persist node changes - let offset = self.append_nodes(&[renamed.clone(), tombstone.clone()])?; + // Find relations touching this node's UUID (read before txn) + let node_uuid = node.uuid; + let edges = index::edges_for_node(db, &node_uuid)?; - // Update node index: remove old key, add renamed - if let Some(ref database) = self.db { - index::remove_node(database, old_key, &tombstone.uuid)?; - index::index_node(database, new_key, offset, &renamed.uuid)?; - - // Find relations touching this node's UUID and update their key strings - let node_uuid = node.uuid; - let edges = index::edges_for_node(database, &node_uuid)?; - - // Build uuid → key map for the other endpoints - let keys = index::all_keys(database)?; - let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new(); - for k in &keys { - if let Ok(Some(u)) = index::get_uuid_for_key(database, k) { - uuid_to_key.insert(u, k.clone()); - } - } - // Update the renamed node's mapping - uuid_to_key.insert(node_uuid, new_key.to_string()); - - let mut updated_rels = Vec::new(); - for (other_uuid, strength, rel_type, is_outgoing) in edges { - let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default(); - let (src_uuid, tgt_uuid, src_key, tgt_key) = if is_outgoing { - (node_uuid, other_uuid, new_key.to_string(), other_key) - } else { - (other_uuid, node_uuid, other_key, new_key.to_string()) - }; - let mut rel = new_relation(src_uuid, tgt_uuid, - RelationType::from_u8(rel_type), strength, - &src_key, &tgt_key); - rel.version = 2; // indicate update - updated_rels.push(rel); - } - - if !updated_rels.is_empty() { - self.append_relations(&updated_rels)?; + // Build uuid → key map for the other endpoints + let keys = index::all_keys(db)?; + let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new(); + for k in &keys { + if let Ok(Some(u)) = index::get_uuid_for_key(db, k) { + uuid_to_key.insert(u, k.clone()); } } + uuid_to_key.insert(node_uuid, new_key.to_string()); + + let mut updated_rels = Vec::new(); + for (other_uuid, strength, rel_type, is_outgoing) in edges { + let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default(); + let (src_uuid, tgt_uuid, src_key, tgt_key) = if is_outgoing { + (node_uuid, other_uuid, new_key.to_string(), other_key) + } else { + (other_uuid, node_uuid, other_key, new_key.to_string()) + }; + let mut rel = new_relation(src_uuid, tgt_uuid, + RelationType::from_u8(rel_type), strength, + &src_key, &tgt_key); + rel.version = 2; // indicate update + updated_rels.push(rel); + } + + // Single transaction for all index updates + let txn = db.begin_write()?; + let offset = self.append_nodes(&[renamed.clone(), tombstone])?; + index::remove_node(&txn, old_key)?; + index::index_node(&txn, new_key, offset, &renamed.uuid)?; + if !updated_rels.is_empty() { + self.append_relations(&updated_rels)?; + } + txn.commit()?; Ok(()) } @@ -284,18 +286,20 @@ impl Store { } } - // Now mutate: remove from index and persist tombstones + // Now mutate: remove from index and persist tombstones (single txn) let pruned_count = to_remove.len(); - for (source_uuid, target_uuid, strength, rel_type, source_key, target_key) in to_remove { - if let Some(db) = &self.db { - index::remove_relation(db, &source_uuid, &target_uuid, strength, rel_type)?; + if !to_remove.is_empty() { + let txn = db.begin_write()?; + for (source_uuid, target_uuid, strength, rel_type, source_key, target_key) in to_remove { + index::remove_relation(&txn, &source_uuid, &target_uuid, strength, rel_type)?; + let mut rel = new_relation(source_uuid, target_uuid, + RelationType::from_u8(rel_type), strength, + &source_key, &target_key); + rel.deleted = true; + rel.version = 2; + self.append_relations(std::slice::from_ref(&rel))?; } - let mut rel = new_relation(source_uuid, target_uuid, - RelationType::from_u8(rel_type), strength, - &source_key, &target_key); - rel.deleted = true; - rel.version = 2; - self.append_relations(std::slice::from_ref(&rel))?; + txn.commit()?; } Ok((hubs_capped, pruned_count)) @@ -304,6 +308,7 @@ impl Store { /// Set a node's weight directly. Returns (old, new). pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> { let weight = weight.clamp(0.01, 1.0); + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let mut node = self.get_node(key)? .ok_or_else(|| anyhow!("node not found: {}", key))?; let old = node.weight; @@ -313,10 +318,10 @@ impl Store { node.weight = weight; node.version += 1; node.timestamp = now_epoch(); + let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - if let Some(ref database) = self.db { - index::index_node(database, key, offset, &node.uuid)?; - } + index::index_node(&txn, key, offset, &node.uuid)?; + txn.commit()?; Ok((old, weight)) } @@ -332,30 +337,35 @@ impl Store { .map(|n| n.uuid) .ok_or_else(|| anyhow!("target not found: {}", target))?; - // Find existing edge via index - let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; - let edges = index::edges_for_node(db, &source_uuid)?; - let existing = edges.iter().find(|(other, _, _, _)| *other == target_uuid); + // Find existing edge via index (scope the borrow) + let existing = { + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; + let edges = index::edges_for_node(db, &source_uuid)?; + edges.iter().find(|(other, _, _, _)| *other == target_uuid) + .map(|(_, s, t, _)| (*s, *t)) + }; - if let Some((_, old_strength, rel_type, _)) = existing { - let old = *old_strength; + if let Some((old_strength, rel_type)) = existing { + let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; + let txn = db.begin_write()?; // Remove old edge from index, add updated one - index::remove_relation(db, &source_uuid, &target_uuid, old, *rel_type)?; - index::index_relation(db, &source_uuid, &target_uuid, strength, *rel_type)?; - + index::remove_relation(&txn, &source_uuid, &target_uuid, old_strength, rel_type)?; + index::index_relation(&txn, &source_uuid, &target_uuid, strength, rel_type)?; // Append updated relation to log let mut rel = new_relation(source_uuid, target_uuid, - RelationType::from_u8(*rel_type), strength, source, target); + RelationType::from_u8(rel_type), strength, source, target); rel.version = 2; // indicate update self.append_relations(std::slice::from_ref(&rel))?; - Ok(old) + txn.commit()?; + Ok(old_strength) } else { - // Create new link + // Create new link then update its strength self.add_link(source, target, "link_set")?; - // Update its strength let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; - index::remove_relation(db, &source_uuid, &target_uuid, 0.1, RelationType::Link as u8)?; - index::index_relation(db, &source_uuid, &target_uuid, strength, RelationType::Link as u8)?; + let txn = db.begin_write()?; + index::remove_relation(&txn, &source_uuid, &target_uuid, 0.1, RelationType::Link as u8)?; + index::index_relation(&txn, &source_uuid, &target_uuid, strength, RelationType::Link as u8)?; + txn.commit()?; Ok(0.0) } }