store: index ops take WriteTransaction, mutations batch properly

Index functions now take &WriteTransaction instead of &Database,
allowing callers to batch multiple index operations in a single
transaction. Store mutations (upsert, delete, rename, etc.) now
begin_write/commit their own transactions, ensuring atomicity.

- replay_relations uses single txn for all relation indexing
- Store::db() exposes Database for callers needing txn control
- Convenience wrappers open their own txn for simple cases

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-13 21:44:20 -04:00
commit 4696bb8b7d
5 changed files with 151 additions and 144 deletions

View file

@ -21,19 +21,21 @@ impl Store {
node.uuid = existing.uuid;
node.version = existing.version + 1;
}
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let txn = db.begin_write()?;
let offset = self.append_nodes(&[node.clone()])?;
if let Some(ref database) = self.db {
index::index_node(database, &node.key, offset, &node.uuid)?;
}
index::index_node(&txn, &node.key, offset, &node.uuid)?;
txn.commit()?;
Ok(())
}
/// Add a relation (appends to log + indexes)
pub fn add_relation(&mut self, rel: Relation) -> Result<()> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let txn = db.begin_write()?;
self.append_relations(std::slice::from_ref(&rel))?;
if let Some(db) = &self.db {
index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
}
index::index_relation(&txn, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
txn.commit()?;
Ok(())
}
@ -75,6 +77,8 @@ impl Store {
/// Upsert with explicit provenance (for agent-created nodes).
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
if let Some(existing) = self.get_node(key)? {
if existing.content == content {
return Ok("unchanged");
@ -84,18 +88,18 @@ impl Store {
node.provenance = provenance.to_string();
node.timestamp = now_epoch();
node.version += 1;
let txn = db.begin_write()?;
let offset = self.append_nodes(std::slice::from_ref(&node))?;
if let Some(ref database) = self.db {
index::index_node(database, &node.key, offset, &node.uuid)?;
}
index::index_node(&txn, &node.key, offset, &node.uuid)?;
txn.commit()?;
Ok("updated")
} else {
let mut node = new_node(key, content);
node.provenance = provenance.to_string();
let txn = db.begin_write()?;
let offset = self.append_nodes(std::slice::from_ref(&node))?;
if let Some(ref database) = self.db {
index::index_node(database, &node.key, offset, &node.uuid)?;
}
index::index_node(&txn, &node.key, offset, &node.uuid)?;
txn.commit()?;
Ok("created")
}
}
@ -103,19 +107,20 @@ impl Store {
/// Soft-delete a node (appends deleted version, removes from index).
pub fn delete_node(&mut self, key: &str) -> Result<()> {
let prov = current_provenance();
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let node = self.get_node(key)?
.ok_or_else(|| anyhow!("No node '{}'", key))?;
let uuid = node.uuid;
let mut deleted = node;
deleted.deleted = true;
deleted.version += 1;
deleted.provenance = prov;
deleted.timestamp = now_epoch();
let txn = db.begin_write()?;
self.append_nodes(std::slice::from_ref(&deleted))?;
if let Some(ref database) = self.db {
index::remove_node(database, key, &uuid)?;
}
index::remove_node(&txn, key)?;
txn.commit()?;
Ok(())
}
@ -131,6 +136,7 @@ impl Store {
if self.contains_key(new_key)? {
bail!("Key '{}' already exists", new_key);
}
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let node = self.get_node(old_key)?
.ok_or_else(|| anyhow!("No node '{}'", old_key))?;
@ -150,48 +156,44 @@ impl Store {
tombstone.provenance = prov;
tombstone.timestamp = now_epoch();
// Persist node changes
let offset = self.append_nodes(&[renamed.clone(), tombstone.clone()])?;
// Find relations touching this node's UUID (read before txn)
let node_uuid = node.uuid;
let edges = index::edges_for_node(db, &node_uuid)?;
// Update node index: remove old key, add renamed
if let Some(ref database) = self.db {
index::remove_node(database, old_key, &tombstone.uuid)?;
index::index_node(database, new_key, offset, &renamed.uuid)?;
// Find relations touching this node's UUID and update their key strings
let node_uuid = node.uuid;
let edges = index::edges_for_node(database, &node_uuid)?;
// Build uuid → key map for the other endpoints
let keys = index::all_keys(database)?;
let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new();
for k in &keys {
if let Ok(Some(u)) = index::get_uuid_for_key(database, k) {
uuid_to_key.insert(u, k.clone());
}
}
// Update the renamed node's mapping
uuid_to_key.insert(node_uuid, new_key.to_string());
let mut updated_rels = Vec::new();
for (other_uuid, strength, rel_type, is_outgoing) in edges {
let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default();
let (src_uuid, tgt_uuid, src_key, tgt_key) = if is_outgoing {
(node_uuid, other_uuid, new_key.to_string(), other_key)
} else {
(other_uuid, node_uuid, other_key, new_key.to_string())
};
let mut rel = new_relation(src_uuid, tgt_uuid,
RelationType::from_u8(rel_type), strength,
&src_key, &tgt_key);
rel.version = 2; // indicate update
updated_rels.push(rel);
}
if !updated_rels.is_empty() {
self.append_relations(&updated_rels)?;
// Build uuid → key map for the other endpoints
let keys = index::all_keys(db)?;
let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new();
for k in &keys {
if let Ok(Some(u)) = index::get_uuid_for_key(db, k) {
uuid_to_key.insert(u, k.clone());
}
}
uuid_to_key.insert(node_uuid, new_key.to_string());
let mut updated_rels = Vec::new();
for (other_uuid, strength, rel_type, is_outgoing) in edges {
let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default();
let (src_uuid, tgt_uuid, src_key, tgt_key) = if is_outgoing {
(node_uuid, other_uuid, new_key.to_string(), other_key)
} else {
(other_uuid, node_uuid, other_key, new_key.to_string())
};
let mut rel = new_relation(src_uuid, tgt_uuid,
RelationType::from_u8(rel_type), strength,
&src_key, &tgt_key);
rel.version = 2; // indicate update
updated_rels.push(rel);
}
// Single transaction for all index updates
let txn = db.begin_write()?;
let offset = self.append_nodes(&[renamed.clone(), tombstone])?;
index::remove_node(&txn, old_key)?;
index::index_node(&txn, new_key, offset, &renamed.uuid)?;
if !updated_rels.is_empty() {
self.append_relations(&updated_rels)?;
}
txn.commit()?;
Ok(())
}
@ -284,18 +286,20 @@ impl Store {
}
}
// Now mutate: remove from index and persist tombstones
// Now mutate: remove from index and persist tombstones (single txn)
let pruned_count = to_remove.len();
for (source_uuid, target_uuid, strength, rel_type, source_key, target_key) in to_remove {
if let Some(db) = &self.db {
index::remove_relation(db, &source_uuid, &target_uuid, strength, rel_type)?;
if !to_remove.is_empty() {
let txn = db.begin_write()?;
for (source_uuid, target_uuid, strength, rel_type, source_key, target_key) in to_remove {
index::remove_relation(&txn, &source_uuid, &target_uuid, strength, rel_type)?;
let mut rel = new_relation(source_uuid, target_uuid,
RelationType::from_u8(rel_type), strength,
&source_key, &target_key);
rel.deleted = true;
rel.version = 2;
self.append_relations(std::slice::from_ref(&rel))?;
}
let mut rel = new_relation(source_uuid, target_uuid,
RelationType::from_u8(rel_type), strength,
&source_key, &target_key);
rel.deleted = true;
rel.version = 2;
self.append_relations(std::slice::from_ref(&rel))?;
txn.commit()?;
}
Ok((hubs_capped, pruned_count))
@ -304,6 +308,7 @@ impl Store {
/// Set a node's weight directly. Returns (old, new).
pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> {
let weight = weight.clamp(0.01, 1.0);
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let mut node = self.get_node(key)?
.ok_or_else(|| anyhow!("node not found: {}", key))?;
let old = node.weight;
@ -313,10 +318,10 @@ impl Store {
node.weight = weight;
node.version += 1;
node.timestamp = now_epoch();
let txn = db.begin_write()?;
let offset = self.append_nodes(std::slice::from_ref(&node))?;
if let Some(ref database) = self.db {
index::index_node(database, key, offset, &node.uuid)?;
}
index::index_node(&txn, key, offset, &node.uuid)?;
txn.commit()?;
Ok((old, weight))
}
@ -332,30 +337,35 @@ impl Store {
.map(|n| n.uuid)
.ok_or_else(|| anyhow!("target not found: {}", target))?;
// Find existing edge via index
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let edges = index::edges_for_node(db, &source_uuid)?;
let existing = edges.iter().find(|(other, _, _, _)| *other == target_uuid);
// Find existing edge via index (scope the borrow)
let existing = {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let edges = index::edges_for_node(db, &source_uuid)?;
edges.iter().find(|(other, _, _, _)| *other == target_uuid)
.map(|(_, s, t, _)| (*s, *t))
};
if let Some((_, old_strength, rel_type, _)) = existing {
let old = *old_strength;
if let Some((old_strength, rel_type)) = existing {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let txn = db.begin_write()?;
// Remove old edge from index, add updated one
index::remove_relation(db, &source_uuid, &target_uuid, old, *rel_type)?;
index::index_relation(db, &source_uuid, &target_uuid, strength, *rel_type)?;
index::remove_relation(&txn, &source_uuid, &target_uuid, old_strength, rel_type)?;
index::index_relation(&txn, &source_uuid, &target_uuid, strength, rel_type)?;
// Append updated relation to log
let mut rel = new_relation(source_uuid, target_uuid,
RelationType::from_u8(*rel_type), strength, source, target);
RelationType::from_u8(rel_type), strength, source, target);
rel.version = 2; // indicate update
self.append_relations(std::slice::from_ref(&rel))?;
Ok(old)
txn.commit()?;
Ok(old_strength)
} else {
// Create new link
// Create new link then update its strength
self.add_link(source, target, "link_set")?;
// Update its strength
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
index::remove_relation(db, &source_uuid, &target_uuid, 0.1, RelationType::Link as u8)?;
index::index_relation(db, &source_uuid, &target_uuid, strength, RelationType::Link as u8)?;
let txn = db.begin_write()?;
index::remove_relation(&txn, &source_uuid, &target_uuid, 0.1, RelationType::Link as u8)?;
index::index_relation(&txn, &source_uuid, &target_uuid, strength, RelationType::Link as u8)?;
txn.commit()?;
Ok(0.0)
}
}