diff --git a/src/cli/admin.rs b/src/cli/admin.rs index a9cca43..765c583 100644 --- a/src/cli/admin.rs +++ b/src/cli/admin.rs @@ -113,7 +113,7 @@ pub async fn cmd_fsck() -> Result<()> { } pub async fn cmd_dedup(apply: bool) -> Result<()> { - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; let arc = memory::access_local()?; let mut store = arc.lock().await; @@ -210,6 +210,14 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> { .chain(diverged_groups) .collect(); + // Build uuid → key map for relation key strings + let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new(); + for key in store.all_keys()? { + if let Ok(Some(node)) = store.get_node(&key) { + uuid_to_key.insert(node.uuid, key); + } + } + let mut merged = 0usize; let mut edges_redirected = 0usize; let mut edges_deduped = 0usize; @@ -219,50 +227,87 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> { copies.sort_by(|a, b| b.1.cmp(&a.1).then(b.0.version.cmp(&a.0.version))); let survivor_uuid = copies[0].0.uuid; + let survivor_key = uuid_to_key.get(&survivor_uuid).cloned().unwrap_or_default(); let doomed_uuids: Vec<[u8; 16]> = copies[1..].iter().map(|c| c.0.uuid).collect(); - // Redirect edges from doomed UUIDs to survivor - let mut updated_rels = Vec::new(); - for rel in &mut store.relations { - if rel.deleted { continue; } - let mut changed = false; - if doomed_uuids.contains(&rel.source) { - rel.source = survivor_uuid; - changed = true; - } - if doomed_uuids.contains(&rel.target) { - rel.target = survivor_uuid; - changed = true; - } - if changed { - rel.version += 1; - updated_rels.push(rel.clone()); + // Redirect edges from doomed UUIDs to survivor via index iteration + for doomed_uuid in &doomed_uuids { + let edges = store.edges_for_uuid(doomed_uuid)?; + for (other_uuid, strength, rel_type, is_outgoing) in edges { + let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default(); + + // Remove old edge from index + let (old_src, old_tgt) = if is_outgoing { + 
(*doomed_uuid, other_uuid) + } else { + (other_uuid, *doomed_uuid) + }; + store.remove_relation_from_index(&old_src, &old_tgt, strength, rel_type)?; + + // Add redirected edge + let (new_src, new_tgt, src_key, tgt_key) = if is_outgoing { + (survivor_uuid, other_uuid, survivor_key.clone(), other_key) + } else { + (other_uuid, survivor_uuid, other_key, survivor_key.clone()) + }; + store.index_relation(&new_src, &new_tgt, strength, rel_type)?; + + // Append tombstone for old + new relation to log + let mut tombstone = store::new_relation( + old_src, old_tgt, + store::RelationType::from_u8(rel_type), strength, + &uuid_to_key.get(&old_src).cloned().unwrap_or_default(), + &uuid_to_key.get(&old_tgt).cloned().unwrap_or_default(), + ); + tombstone.deleted = true; + tombstone.version = 2; + + let mut redirected = store::new_relation( + new_src, new_tgt, + store::RelationType::from_u8(rel_type), strength, + &src_key, &tgt_key, + ); + redirected.version = 2; + + store.append_relations(&[tombstone, redirected])?; edges_redirected += 1; } } - // Dedup edges: same (source, target, rel_type) → keep highest strength - let mut seen: HashSet<([u8; 16], [u8; 16], String)> = HashSet::new(); - let mut to_tombstone_rels = Vec::new(); - // Sort by strength descending so we keep the strongest - let mut rels_with_idx: Vec<(usize, &store::Relation)> = store.relations.iter() - .enumerate() - .filter(|(_, r)| !r.deleted && (r.source == survivor_uuid || r.target == survivor_uuid)) - .collect(); - rels_with_idx.sort_by(|a, b| b.1.strength.total_cmp(&a.1.strength)); - - for (idx, rel) in &rels_with_idx { - let edge_key = (rel.source, rel.target, format!("{:?}", rel.rel_type)); - if !seen.insert(edge_key) { - to_tombstone_rels.push(*idx); - edges_deduped += 1; - } + // Dedup edges: same (other_uuid, rel_type) → keep highest strength + // Group edges by (other, type), sort each group by strength desc, tombstone extras + let edges = store.edges_for_uuid(&survivor_uuid)?; + let mut by_endpoint: 
HashMap<([u8; 16], u8), Vec<(f32, bool)>> = HashMap::new(); + for (other_uuid, strength, rel_type, is_outgoing) in edges { + by_endpoint.entry((other_uuid, rel_type)) + .or_default() + .push((strength, is_outgoing)); } - for &idx in &to_tombstone_rels { - store.relations[idx].deleted = true; - store.relations[idx].version += 1; - updated_rels.push(store.relations[idx].clone()); + for ((other_uuid, rel_type), mut variants) in by_endpoint { + if variants.len() <= 1 { continue; } + // Sort by strength descending, keep first + variants.sort_by(|a, b| b.0.total_cmp(&a.0)); + let other_key = uuid_to_key.get(&other_uuid).cloned().unwrap_or_default(); + + for (strength, is_outgoing) in variants.into_iter().skip(1) { + let (src, tgt, src_key, tgt_key) = if is_outgoing { + (survivor_uuid, other_uuid, survivor_key.clone(), other_key.clone()) + } else { + (other_uuid, survivor_uuid, other_key.clone(), survivor_key.clone()) + }; + store.remove_relation_from_index(&src, &tgt, strength, rel_type)?; + + let mut tombstone = store::new_relation( + src, tgt, + store::RelationType::from_u8(rel_type), strength, + &src_key, &tgt_key, + ); + tombstone.deleted = true; + tombstone.version = 2; + store.append_relations(&[tombstone])?; + edges_deduped += 1; + } } // Tombstone doomed nodes @@ -275,9 +320,6 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> { } store.append_nodes(&tombstones)?; - if !updated_rels.is_empty() { - store.append_relations(&updated_rels)?; - } // Remove doomed nodes from index for (doomed_node, _) in &copies[1..] 
{ @@ -287,9 +329,6 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> { merged += doomed_uuids.len(); } - // Remove tombstoned relations from cache and rebuild index - store.relations.retain(|r| !r.deleted); - store.reindex_relations()?; store.save()?; println!("Merged {} duplicates, redirected {} edges, deduped {} duplicate edges", diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index 0abc5b4..b194ad4 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -272,12 +272,7 @@ impl Store { store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); - // Drop edges referencing deleted/missing nodes - let db = store.db.as_ref().unwrap(); - store.relations.retain(|r| - index::contains_key(db, &r.source_key).unwrap_or(false) && - index::contains_key(db, &r.target_key).unwrap_or(false) - ); + // Orphan edges filtered naturally during for_each_relation (unresolvable UUIDs skipped) Ok(store) } @@ -359,13 +354,10 @@ impl Store { } } - self.relations = by_uuid.into_values() - .filter(|r| !r.deleted) - .collect(); - - // Index relations in redb + // Index relations directly (no Vec intermediate) if let Some(db) = &self.db { - for rel in &self.relations { + for rel in by_uuid.into_values() { + if rel.deleted { continue; } index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; } } diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs index 6615f72..43bbb4f 100644 --- a/src/hippocampus/store/index.rs +++ b/src/hippocampus/store/index.rs @@ -209,18 +209,6 @@ pub fn remove_relation( Ok(()) } -/// Clear all relations from the index. 
-pub fn clear_relations(db: &Database) -> Result<()> { - let txn = db.begin_write()?; - { - // Drop and recreate the table - txn.delete_multimap_table(RELS)?; - let _ = txn.open_multimap_table(RELS)?; - } - txn.commit()?; - Ok(()) -} - /// Get all edges for a node. Returns (other_uuid, strength, rel_type, is_outgoing). pub fn edges_for_node(db: &Database, node_uuid: &[u8; 16]) -> Result<Vec<([u8; 16], f32, u8, bool)>> { let txn = db.begin_read()?; diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index eef031a..b5e7ddd 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -47,7 +47,6 @@ pub fn strip_md_suffix(key: &str) -> String { // The full in-memory store pub struct Store { - pub relations: Vec<Relation>, // all active relations /// Log sizes at load time — used for staleness detection. pub(crate) loaded_nodes_size: u64, pub(crate) loaded_rels_size: u64, @@ -58,7 +57,6 @@ impl Default for Store { fn default() -> Self { Store { - relations: Vec::new(), loaded_nodes_size: 0, loaded_rels_size: 0, db: None, @@ -130,14 +128,25 @@ impl Store { Ok(()) } - /// Rebuild relation index from Vec. Call after mutations that modify relations. - pub fn reindex_relations(&self) -> Result<()> { + /// Get all edges for a node by UUID. Returns (other_uuid, strength, rel_type, is_outgoing). + pub fn edges_for_uuid(&self, uuid: &[u8; 16]) -> Result<Vec<([u8; 16], f32, u8, bool)>> { + let db = self.db.as_ref() + .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; + index::edges_for_node(db, uuid) + } + + /// Add a relation to the index. 
+ pub fn index_relation(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> { if let Some(db) = self.db.as_ref() { - index::clear_relations(db)?; - for rel in &self.relations { - if rel.deleted { continue; } - index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; - } + index::index_relation(db, source, target, strength, rel_type)?; + } + Ok(()) + } + + /// Remove a relation from the index. + pub fn remove_relation_from_index(&self, source: &[u8; 16], target: &[u8; 16], strength: f32, rel_type: u8) -> Result<()> { + if let Some(db) = self.db.as_ref() { + index::remove_relation(db, source, target, strength, rel_type)?; } Ok(()) } diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index 92a380d..a978ab6 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -28,13 +28,12 @@ impl Store { Ok(()) } - /// Add a relation (appends to log + updates cache + indexes) + /// Add a relation (appends to log + indexes) pub fn add_relation(&mut self, rel: Relation) -> Result<()> { self.append_relations(std::slice::from_ref(&rel))?; if let Some(db) = &self.db { index::index_relation(db, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?; } - self.relations.push(rel); Ok(()) }