forked from kent/consciousness
store: remove Vec<Relation>, dedup uses index iteration
The relations Vec is gone from Store. dedup now iterates via edges_for_uuid() instead of mutating in-memory Vec — removes/re-adds edges through the index directly. Removed load_relations_vec() and clear_relations() — no longer needed. Added helper methods: edges_for_uuid, index_relation, remove_relation_from_index. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
c2de14dcab
commit
2548ca059d
5 changed files with 105 additions and 78 deletions
123
src/cli/admin.rs
123
src/cli/admin.rs
|
|
@ -113,7 +113,7 @@ pub async fn cmd_fsck() -> Result<()> {
|
|||
}
|
||||
|
||||
pub async fn cmd_dedup(apply: bool) -> Result<()> {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
let arc = memory::access_local()?;
|
||||
let mut store = arc.lock().await;
|
||||
|
|
@ -210,6 +210,14 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> {
|
|||
.chain(diverged_groups)
|
||||
.collect();
|
||||
|
||||
// Build uuid → key map for relation key strings
|
||||
let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new();
|
||||
for key in store.all_keys()? {
|
||||
if let Ok(Some(node)) = store.get_node(&key) {
|
||||
uuid_to_key.insert(node.uuid, key);
|
||||
}
|
||||
}
|
||||
|
||||
let mut merged = 0usize;
|
||||
let mut edges_redirected = 0usize;
|
||||
let mut edges_deduped = 0usize;
|
||||
|
|
@ -219,52 +227,89 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> {
|
|||
copies.sort_by(|a, b| b.1.cmp(&a.1).then(b.0.version.cmp(&a.0.version)));
|
||||
|
||||
let survivor_uuid = copies[0].0.uuid;
|
||||
let survivor_key = uuid_to_key.get(&survivor_uuid).cloned().unwrap_or_default();
|
||||
let doomed_uuids: Vec<[u8; 16]> = copies[1..].iter().map(|c| c.0.uuid).collect();
|
||||
|
||||
// Redirect edges from doomed UUIDs to survivor
|
||||
let mut updated_rels = Vec::new();
|
||||
for rel in &mut store.relations {
|
||||
if rel.deleted { continue; }
|
||||
let mut changed = false;
|
||||
if doomed_uuids.contains(&rel.source) {
|
||||
rel.source = survivor_uuid;
|
||||
changed = true;
|
||||
}
|
||||
if doomed_uuids.contains(&rel.target) {
|
||||
rel.target = survivor_uuid;
|
||||
changed = true;
|
||||
}
|
||||
if changed {
|
||||
rel.version += 1;
|
||||
updated_rels.push(rel.clone());
|
||||
// Redirect edges from doomed UUIDs to survivor via index iteration
|
||||
for doomed_uuid in &doomed_uuids {
|
||||
let edges = store.edges_for_uuid(doomed_uuid)?;
|
||||
for (other_uuid, strength, rel_type, is_outgoing) in edges {
|
||||
let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default();
|
||||
|
||||
// Remove old edge from index
|
||||
let (old_src, old_tgt) = if is_outgoing {
|
||||
(*doomed_uuid, other_uuid)
|
||||
} else {
|
||||
(other_uuid, *doomed_uuid)
|
||||
};
|
||||
store.remove_relation_from_index(&old_src, &old_tgt, strength, rel_type)?;
|
||||
|
||||
// Add redirected edge
|
||||
let (new_src, new_tgt, src_key, tgt_key) = if is_outgoing {
|
||||
(survivor_uuid, other_uuid, survivor_key.clone(), other_key)
|
||||
} else {
|
||||
(other_uuid, survivor_uuid, other_key, survivor_key.clone())
|
||||
};
|
||||
store.index_relation(&new_src, &new_tgt, strength, rel_type)?;
|
||||
|
||||
// Append tombstone for old + new relation to log
|
||||
let mut tombstone = store::new_relation(
|
||||
old_src, old_tgt,
|
||||
store::RelationType::from_u8(rel_type), strength,
|
||||
&uuid_to_key.get(&old_src).cloned().unwrap_or_default(),
|
||||
&uuid_to_key.get(&old_tgt).cloned().unwrap_or_default(),
|
||||
);
|
||||
tombstone.deleted = true;
|
||||
tombstone.version = 2;
|
||||
|
||||
let mut redirected = store::new_relation(
|
||||
new_src, new_tgt,
|
||||
store::RelationType::from_u8(rel_type), strength,
|
||||
&src_key, &tgt_key,
|
||||
);
|
||||
redirected.version = 2;
|
||||
|
||||
store.append_relations(&[tombstone, redirected])?;
|
||||
edges_redirected += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Dedup edges: same (source, target, rel_type) → keep highest strength
|
||||
let mut seen: HashSet<([u8; 16], [u8; 16], String)> = HashSet::new();
|
||||
let mut to_tombstone_rels = Vec::new();
|
||||
// Sort by strength descending so we keep the strongest
|
||||
let mut rels_with_idx: Vec<(usize, &store::Relation)> = store.relations.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, r)| !r.deleted && (r.source == survivor_uuid || r.target == survivor_uuid))
|
||||
.collect();
|
||||
rels_with_idx.sort_by(|a, b| b.1.strength.total_cmp(&a.1.strength));
|
||||
// Dedup edges: same (other_uuid, rel_type) → keep highest strength
|
||||
// Group edges by (other, type), sort each group by strength desc, tombstone extras
|
||||
let edges = store.edges_for_uuid(&survivor_uuid)?;
|
||||
let mut by_endpoint: HashMap<([u8; 16], u8), Vec<(f32, bool)>> = HashMap::new();
|
||||
for (other_uuid, strength, rel_type, is_outgoing) in edges {
|
||||
by_endpoint.entry((other_uuid, rel_type))
|
||||
.or_default()
|
||||
.push((strength, is_outgoing));
|
||||
}
|
||||
|
||||
for (idx, rel) in &rels_with_idx {
|
||||
let edge_key = (rel.source, rel.target, format!("{:?}", rel.rel_type));
|
||||
if !seen.insert(edge_key) {
|
||||
to_tombstone_rels.push(*idx);
|
||||
for ((other_uuid, rel_type), mut variants) in by_endpoint {
|
||||
if variants.len() <= 1 { continue; }
|
||||
// Sort by strength descending, keep first
|
||||
variants.sort_by(|a, b| b.0.total_cmp(&a.0));
|
||||
let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default();
|
||||
|
||||
for (strength, is_outgoing) in variants.into_iter().skip(1) {
|
||||
let (src, tgt, src_key, tgt_key) = if is_outgoing {
|
||||
(survivor_uuid, other_uuid, survivor_key.clone(), other_key.clone())
|
||||
} else {
|
||||
(other_uuid, survivor_uuid, other_key.clone(), survivor_key.clone())
|
||||
};
|
||||
store.remove_relation_from_index(&src, &tgt, strength, rel_type)?;
|
||||
|
||||
let mut tombstone = store::new_relation(
|
||||
src, tgt,
|
||||
store::RelationType::from_u8(rel_type), strength,
|
||||
&src_key, &tgt_key,
|
||||
);
|
||||
tombstone.deleted = true;
|
||||
tombstone.version = 2;
|
||||
store.append_relations(&[tombstone])?;
|
||||
edges_deduped += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for &idx in &to_tombstone_rels {
|
||||
store.relations[idx].deleted = true;
|
||||
store.relations[idx].version += 1;
|
||||
updated_rels.push(store.relations[idx].clone());
|
||||
}
|
||||
|
||||
// Tombstone doomed nodes
|
||||
let mut tombstones = Vec::new();
|
||||
for (doomed_node, _) in &copies[1..] {
|
||||
|
|
@ -275,9 +320,6 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> {
|
|||
}
|
||||
|
||||
store.append_nodes(&tombstones)?;
|
||||
if !updated_rels.is_empty() {
|
||||
store.append_relations(&updated_rels)?;
|
||||
}
|
||||
|
||||
// Remove doomed nodes from index
|
||||
for (doomed_node, _) in &copies[1..] {
|
||||
|
|
@ -287,9 +329,6 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> {
|
|||
merged += doomed_uuids.len();
|
||||
}
|
||||
|
||||
// Remove tombstoned relations from cache and rebuild index
|
||||
store.relations.retain(|r| !r.deleted);
|
||||
store.reindex_relations()?;
|
||||
store.save()?;
|
||||
|
||||
println!("Merged {} duplicates, redirected {} edges, deduped {} duplicate edges",
|
||||
|
|
|
|||
|
|
@ -272,12 +272,7 @@ impl Store {
|
|||
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
|
||||
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
// Drop edges referencing deleted/missing nodes
|
||||
let db = store.db.as_ref().unwrap();
|
||||
store.relations.retain(|r|
|
||||
index::contains_key(db, &r.source_key).unwrap_or(false) &&
|
||||
index::contains_key(db, &r.target_key).unwrap_or(false)
|
||||
);
|
||||
// Orphan edges filtered naturally during for_each_relation (unresolvable UUIDs skipped)
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
|
@ -359,13 +354,10 @@ impl Store {
|
|||
}
|
||||
}
|
||||
|
||||
self.relations = by_uuid.into_values()
|
||||
.filter(|r| !r.deleted)
|
||||
.collect();
|
||||
|
||||
// Index relations in redb
|
||||
// Index relations directly (no Vec intermediate)
|
||||
if let Some(db) = &self.db {
|
||||
for rel in &self.relations {
|
||||
for rel in by_uuid.into_values() {
|
||||
if rel.deleted { continue; }
|
||||
index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -209,18 +209,6 @@ pub fn remove_relation(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Clear all relations from the index.
|
||||
pub fn clear_relations(db: &Database) -> Result<()> {
|
||||
let txn = db.begin_write()?;
|
||||
{
|
||||
// Drop and recreate the table
|
||||
txn.delete_multimap_table(RELS)?;
|
||||
let _ = txn.open_multimap_table(RELS)?;
|
||||
}
|
||||
txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all edges for a node. Returns (other_uuid, strength, rel_type, is_outgoing).
|
||||
pub fn edges_for_node(db: &Database, node_uuid: &[u8; 16]) -> Result<Vec<([u8; 16], f32, u8, bool)>> {
|
||||
let txn = db.begin_read()?;
|
||||
|
|
|
|||
|
|
@ -47,7 +47,6 @@ pub fn strip_md_suffix(key: &str) -> String {
|
|||
|
||||
// The full in-memory store
|
||||
pub struct Store {
|
||||
pub relations: Vec<Relation>, // all active relations
|
||||
/// Log sizes at load time — used for staleness detection.
|
||||
pub(crate) loaded_nodes_size: u64,
|
||||
pub(crate) loaded_rels_size: u64,
|
||||
|
|
@ -58,7 +57,6 @@ pub struct Store {
|
|||
impl Default for Store {
|
||||
fn default() -> Self {
|
||||
Store {
|
||||
relations: Vec::new(),
|
||||
loaded_nodes_size: 0,
|
||||
loaded_rels_size: 0,
|
||||
db: None,
|
||||
|
|
@ -130,14 +128,25 @@ impl Store {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Rebuild relation index from Vec. Call after mutations that modify relations.
|
||||
pub fn reindex_relations(&self) -> Result<()> {
|
||||
if let Some(db) = self.db.as_ref() {
|
||||
index::clear_relations(db)?;
|
||||
for rel in &self.relations {
|
||||
if rel.deleted { continue; }
|
||||
index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
|
||||
/// Get all edges for a node by UUID. Returns (other_uuid, strength, rel_type, is_outgoing).
|
||||
pub fn edges_for_uuid(&self, uuid: &[u8; 16]) -> Result<Vec<([u8; 16], f32, u8, bool)>> {
|
||||
let db = self.db.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("store not loaded"))?;
|
||||
index::edges_for_node(db, uuid)
|
||||
}
|
||||
|
||||
/// Add a relation to the index.
|
||||
pub fn index_relation(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> {
|
||||
if let Some(db) = self.db.as_ref() {
|
||||
index::index_relation(db, source, target, strength, rel_type)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a relation from the index.
|
||||
pub fn remove_relation_from_index(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> {
|
||||
if let Some(db) = self.db.as_ref() {
|
||||
index::remove_relation(db, source, target, strength, rel_type)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,13 +28,12 @@ impl Store {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a relation (appends to log + updates cache + indexes)
|
||||
/// Add a relation (appends to log + indexes)
|
||||
pub fn add_relation(&mut self, rel: Relation) -> Result<()> {
|
||||
self.append_relations(std::slice::from_ref(&rel))?;
|
||||
if let Some(db) = &self.db {
|
||||
index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
|
||||
}
|
||||
self.relations.push(rel);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue