forked from kent/consciousness
store: convert more callers to use RELS index
Convert remaining Vec users to index-based access:
- memory.rs: MemoryNode::from_store uses Store::neighbors()
- graph.rs: orphan detection uses for_each_relation
- local.rs: normalize_strengths uses for_each_relation + set_link_strength

Add Store::neighbors() method and index::get_offsets_for_uuid().

Cleanup:
- for_each_relation: build both uuid↔key maps in one pass
- cap_degree: consolidate key/uuid/degree collection

Remaining Vec uses: admin.rs (fsck, dedup), capnp.rs (load path).

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
parent 5fe51fbfda
commit 5832e57970
8 changed files with 109 additions and 81 deletions
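
The conversion follows one pattern: callers that used to scan the in-memory store.relations Vec now go through the RELS index, either via the StoreView::for_each_relation callback or the new Store::neighbors() accessor. Below is a minimal sketch of what a converted caller can look like, assuming the Store, StoreView, and RelationType items from the diff; count_strong_edges and strongest_neighbor are hypothetical helpers for illustration only, not part of this commit.

use anyhow::Result;
use crate::store::{Store, StoreView, RelationType};

// Hypothetical helper: count non-temporal edges at or above a strength threshold
// by iterating the RELS index instead of store.relations.
fn count_strong_edges(store: &Store, threshold: f32) -> usize {
    let mut count = 0usize;
    store.for_each_relation(|_source, _target, strength, rel_type| {
        // Skip temporal links (strength 1.0, RelationType::Auto), as in the hunks below.
        if strength == 1.0 && rel_type == RelationType::Auto {
            return;
        }
        if strength >= threshold {
            count += 1;
        }
    });
    count
}

// Hypothetical helper: strongest neighbor of a node via the new Store::neighbors().
fn strongest_neighbor(store: &Store, key: &str) -> Result<Option<(String, f32)>> {
    let mut neighbors = store.neighbors(key)?;
    neighbors.sort_by(|a, b| b.1.total_cmp(&a.1));
    Ok(neighbors.into_iter().next())
}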
@@ -882,18 +882,18 @@ pub fn health_report(graph: &Graph, store: &Store) -> String {
         .count();

     // Orphan edges: relations referencing non-existent nodes
+    // With index-based lookup, we count edges where endpoints don't resolve
     let mut orphan_edges = 0usize;
     let mut missing_nodes: HashSet<String> = HashSet::new();
-    for rel in &store.relations {
-        if rel.deleted { continue; }
-        let s_missing = !store.contains_key(&rel.source_key).unwrap_or(false);
-        let t_missing = !store.contains_key(&rel.target_key).unwrap_or(false);
+    store.for_each_relation(|source, target, _, _| {
+        let s_missing = !store.contains_key(source).unwrap_or(false);
+        let t_missing = !store.contains_key(target).unwrap_or(false);
         if s_missing || t_missing {
             orphan_edges += 1;
-            if s_missing { missing_nodes.insert(rel.source_key.clone()); }
-            if t_missing { missing_nodes.insert(rel.target_key.clone()); }
+            if s_missing { missing_nodes.insert(source.to_string()); }
+            if t_missing { missing_nodes.insert(target.to_string()); }
         }
-    }
+    });

     // NodeType breakdown
     let mut type_counts: HashMap<&str, usize> = HashMap::new();
@@ -397,43 +397,46 @@ pub fn graph_communities(store: &Store, _provenance: &str, top_n: Option<usize>,
 }

 pub fn graph_normalize_strengths(store: &mut Store, _provenance: &str, apply: Option<bool>) -> Result<String> {
+    use crate::store::{StoreView, RelationType};
+
     let apply = apply.unwrap_or(false);
     let graph = store.build_graph();
     let strengths = graph.jaccard_strengths();

     // Build lookup from (source_key, target_key) → new_strength
-    let mut updates: std::collections::HashMap<(String, String), f32> = std::collections::HashMap::new();
+    let mut target_strengths: std::collections::HashMap<(String, String), f32> = std::collections::HashMap::new();
     for (a, b, s) in &strengths {
-        updates.insert((a.clone(), b.clone()), *s);
-        updates.insert((b.clone(), a.clone()), *s);
+        target_strengths.insert((a.clone(), b.clone()), *s);
+        target_strengths.insert((b.clone(), a.clone()), *s);
     }

-    let mut changed = 0usize;
+    // Collect edges and compute changes
+    let mut to_update: Vec<(String, String, f32)> = Vec::new();
     let mut unchanged = 0usize;
     let mut temporal_skipped = 0usize;
     let mut delta_sum: f64 = 0.0;
     let mut buckets = [0usize; 10];

-    for rel in &mut store.relations {
-        if rel.deleted { continue; }
-        if rel.strength == 1.0 && rel.rel_type == crate::store::RelationType::Auto {
+    store.for_each_relation(|source, target, strength, rel_type| {
+        // Skip temporal links
+        if strength == 1.0 && rel_type == RelationType::Auto {
             temporal_skipped += 1;
-            continue;
+            return;
         }
-        if let Some(&new_s) = updates.get(&(rel.source_key.clone(), rel.target_key.clone())) {
-            let old_s = rel.strength;
-            let delta = (new_s - old_s).abs();
+        if let Some(&new_s) = target_strengths.get(&(source.to_string(), target.to_string())) {
+            let delta = (new_s - strength).abs();
             if delta > 0.001 {
                 delta_sum += delta as f64;
-                if apply { rel.strength = new_s; }
-                changed += 1;
+                to_update.push((source.to_string(), target.to_string(), new_s));
             } else {
                 unchanged += 1;
             }
             let bucket = ((new_s * 10.0) as usize).min(9);
             buckets[bucket] += 1;
         }
-    }
+    });
+
+    let changed = to_update.len();

     use std::fmt::Write;
     let mut out = String::new();
@@ -455,7 +458,9 @@ pub fn graph_normalize_strengths(store: &mut Store, _provenance: &str, apply: Op
     }

     if apply {
-        store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
+        for (source, target, new_strength) in to_update {
+            store.set_link_strength(&source, &target, new_strength)?;
+        }
         writeln!(out, "\nApplied {} strength updates.", changed).ok();
     } else {
         writeln!(out, "\nDry run. Pass apply:true to write changes.").ok();
@@ -34,30 +34,24 @@ impl MemoryNode {
             .and_then(|s| s.parse().ok())
             .unwrap_or(0);

-        let mut neighbors: std::collections::HashMap<&str, (f32, bool)> = std::collections::HashMap::new();
-        for r in &store.relations {
-            if r.deleted { continue; }
-            let neighbor_key = if r.source_key == key {
-                &r.target_key
-            } else if r.target_key == key {
-                &r.source_key
-            } else {
-                continue;
-            };
-            let is_new = older_than > 0 && store.get_node(neighbor_key)
-                .ok()
-                .flatten()
-                .map(|n| n.created_at > older_than)
-                .unwrap_or(false);
+        // Get neighbors via index
+        let mut neighbors: std::collections::HashMap<String, (f32, bool)> = std::collections::HashMap::new();
+        if let Ok(neighbor_list) = store.neighbors(key) {
+            for (neighbor_key, strength) in neighbor_list {
+                let is_new = older_than > 0 && store.get_node(&neighbor_key)
+                    .ok()
+                    .flatten()
+                    .map(|n| n.created_at > older_than)
+                    .unwrap_or(false);

-            let e = neighbors.entry(neighbor_key.as_str()).or_insert((0.0, false));
-            e.0 = e.0.max(r.strength);
-            e.1 = e.1 || is_new;
-        }
+                let e = neighbors.entry(neighbor_key).or_insert((0.0, false));
+                e.0 = e.0.max(strength);
+                e.1 = e.1 || is_new;
+            }
+        }

         let mut links: Vec<(String, f32, bool)> = neighbors.into_iter()
-            .map(|(k, (s, new))| (k.to_string(), s, new))
+            .map(|(k, (s, new))| (k, s, new))
             .collect();
         links.sort_by(|a, b| b.1.total_cmp(&a.1));

@@ -99,6 +99,19 @@ pub fn get_uuid_for_key(db: &Database, key: &str) -> Result<Option<[u8; 16]>> {
     }
 }

+/// Get all offsets for a UUID (all versions). Returns newest first.
+pub fn get_offsets_for_uuid(db: &Database, uuid: &[u8; 16]) -> Result<Vec<u64>> {
+    let txn = db.begin_read()?;
+    let table = txn.open_multimap_table(UUID_OFFSETS)?;
+    let mut offsets = Vec::new();
+    for entry in table.get(uuid.as_slice())? {
+        offsets.push(entry?.value());
+    }
+    // Sort descending so newest (highest offset) is first
+    offsets.sort_by(|a, b| b.cmp(a));
+    Ok(offsets)
+}
+
 /// Remove a node from the index (key mappings only; UUID history preserved).
 pub fn remove_node(db: &Database, key: &str, _uuid: &[u8; 16]) -> Result<()> {
     let txn = db.begin_write()?;
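
Because get_offsets_for_uuid returns offsets newest-first, resolving the current version of a node from its UUID means walking the list from the front. A hedged sketch, for illustration only: the newest_key helper is hypothetical, the redb::Database import is an assumption about the index module's handle type, and read_node_at_offset plus the node's key/deleted fields are taken from the Store::neighbors() hunk below.

use anyhow::Result;
use redb::Database;
use crate::{capnp, index};

// Hypothetical helper: resolve the newest non-deleted key recorded for a UUID.
fn newest_key(db: &Database, uuid: &[u8; 16]) -> Result<Option<String>> {
    for offset in index::get_offsets_for_uuid(db, uuid)? {
        // Offsets are sorted descending, so the first readable, non-deleted
        // node is the most recent version.
        if let Ok(node) = capnp::read_node_at_offset(offset) {
            if !node.deleted {
                return Ok(Some(node.key));
            }
        }
    }
    Ok(None)
}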
@@ -96,6 +96,32 @@ impl Store {
         index::all_keys(db)
     }

+    /// Get neighbors of a node: (key, strength) pairs.
+    pub fn neighbors(&self, key: &str) -> Result<Vec<(String, f32)>> {
+        let db = self.db.as_ref()
+            .ok_or_else(|| anyhow::anyhow!("store not loaded"))?;
+
+        let uuid = match index::get_uuid_for_key(db, key)? {
+            Some(u) => u,
+            None => return Ok(Vec::new()),
+        };
+
+        let edges = index::edges_for_node(db, &uuid)?;
+        let mut neighbors = Vec::new();
+
+        for (other_uuid, strength, _, _) in edges {
+            // Look up key for other_uuid
+            let offsets = index::get_offsets_for_uuid(db, &other_uuid)?;
+            if offsets.is_empty() { continue; }
+            match capnp::read_node_at_offset(offsets[0]) {
+                Ok(n) if !n.deleted => neighbors.push((n.key, strength)),
+                _ => continue,
+            }
+        }
+
+        Ok(neighbors)
+    }
+
     /// Remove a node from the index (used after appending a tombstone).
     pub fn remove_from_index(&self, key: &str, uuid: &[u8; 16]) -> Result<()> {
         if let Some(db) = self.db.as_ref() {
@@ -202,35 +202,28 @@ impl Store {
         let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
         let keys = index::all_keys(db)?;

-        // Build uuid ↔ key maps
+        // Build uuid ↔ key maps and count degrees in one pass
         let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new();
         let mut key_to_uuid: HashMap<String, [u8; 16]> = HashMap::new();
+        let mut node_info: Vec<(String, [u8; 16], usize)> = Vec::new(); // (key, uuid, degree)
         for key in &keys {
             if let Ok(Some(uuid)) = index::get_uuid_for_key(db, key) {
+                let degree = index::edges_for_node(db, &uuid)?.len();
                 uuid_to_key.insert(uuid, key.clone());
                 key_to_uuid.insert(key.clone(), uuid);
+                node_info.push((key.clone(), uuid, degree));
             }
         }

-        // Count degrees per node
-        let mut node_degree: HashMap<String, usize> = HashMap::new();
-        for key in &keys {
-            let uuid = match key_to_uuid.get(key) {
-                Some(u) => u,
-                None => continue,
-            };
-            let edges = index::edges_for_node(db, uuid)?;
-            node_degree.insert(key.clone(), edges.len());
-        }
+        // Build degree lookup
+        let node_degree: HashMap<&str, usize> = node_info.iter()
+            .map(|(k, _, d)| (k.as_str(), *d))
+            .collect();

         let mut to_delete: HashSet<([u8; 16], [u8; 16])> = HashSet::new();
         let mut hubs_capped = 0;

-        for key in &keys {
-            let uuid = match key_to_uuid.get(key) {
-                Some(u) => *u,
-                None => continue,
-            };
-            let edges = index::edges_for_node(db, &uuid)?;
-            if edges.len() <= max_degree { continue; }
+        for (_key, uuid, degree) in &node_info {
+            if *degree <= max_degree { continue; }
+            let uuid = *uuid;
+            let edges = index::edges_for_node(db, &uuid)?;
@@ -239,9 +232,9 @@ impl Store {
             let mut link_edges: Vec<([u8; 16], [u8; 16], usize)> = Vec::new();

             for (other_uuid, strength, rel_type, is_outgoing) in &edges {
-                // Canonical edge direction (source < target by outgoing flag)
+                // Canonical edge direction
                 let (src, tgt) = if *is_outgoing { (uuid, *other_uuid) } else { (*other_uuid, uuid) };
-                if to_delete.contains(&(src, tgt)) { continue; }
+                if to_delete.contains(&(src, tgt)) || to_delete.contains(&(tgt, src)) { continue; }

                 let other_key = match uuid_to_key.get(other_uuid) {
                     Some(k) => k,
@@ -251,7 +244,7 @@ impl Store {
                 if *rel_type == RelationType::Auto as u8 {
                     auto_edges.push((src, tgt, *strength));
                 } else {
-                    let other_deg = node_degree.get(other_key).copied().unwrap_or(0);
+                    let other_deg = node_degree.get(other_key.as_str()).copied().unwrap_or(0);
                     link_edges.push((src, tgt, other_deg));
                 }
             }
@@ -64,36 +64,33 @@ impl StoreView for Store {
             None => return,
         };

-        // Build uuid → key map by iterating all nodes once
-        let mut uuid_to_key: std::collections::HashMap<[u8; 16], String> = std::collections::HashMap::new();
+        // Build uuid ↔ key maps in one pass
         let keys = match index::all_keys(db) {
             Ok(keys) => keys,
             Err(_) => return,
         };
+        let mut uuid_to_key: std::collections::HashMap<[u8; 16], String> = std::collections::HashMap::new();
+        let mut key_to_uuid: std::collections::HashMap<String, [u8; 16]> = std::collections::HashMap::new();
         for key in &keys {
             if let Ok(Some(uuid)) = index::get_uuid_for_key(db, key) {
                 uuid_to_key.insert(uuid, key.clone());
+                key_to_uuid.insert(key.clone(), uuid);
             }
         }

         // Iterate edges: only process outgoing to avoid duplicates
-        for key in &keys {
-            let uuid = match index::get_uuid_for_key(db, key) {
-                Ok(Some(u)) => u,
-                _ => continue,
-            };
-            let edges = match index::edges_for_node(db, &uuid) {
+        for (key, uuid) in &key_to_uuid {
+            let edges = match index::edges_for_node(db, uuid) {
                 Ok(e) => e,
                 Err(_) => continue,
             };
             for (other_uuid, strength, rel_type_byte, is_outgoing) in edges {
-                if !is_outgoing { continue; } // only process outgoing
+                if !is_outgoing { continue; }
                 let target_key = match uuid_to_key.get(&other_uuid) {
                     Some(k) => k,
-                    None => continue, // orphan edge
+                    None => continue,
                 };
-                let rel_type = RelationType::from_u8(rel_type_byte);
-                f(key, target_key, strength, rel_type);
+                f(key, target_key, strength, RelationType::from_u8(rel_type_byte));
             }
         }
     }
@@ -176,12 +176,6 @@ pub fn apply_digest_links(store: &mut Store, links: &[DigestLink]) -> (usize, us

         if source == target { skipped += 1; continue; }

-        // Check if link already exists
-        let exists = store.relations.iter().any(|r|
-            r.source_key == source && r.target_key == target && !r.deleted
-        );
-        if exists { skipped += 1; continue; }
-
         let source_uuid = match store.get_node(&source).ok().flatten() {
             Some(n) => n.uuid,
             None => { skipped += 1; continue; }
@@ -191,6 +185,12 @@ pub fn apply_digest_links(store: &mut Store, links: &[DigestLink]) -> (usize, us
             None => { skipped += 1; continue; }
         };

+        // Check if link already exists via index
+        let exists = store.neighbors(&source).ok()
+            .map(|n| n.iter().any(|(k, _)| k == &target))
+            .unwrap_or(false);
+        if exists { skipped += 1; continue; }
+
         let rel = new_relation(
             source_uuid, target_uuid,
             store::RelationType::Link,