store: remove nodes and uuid_to_key HashMaps

All node access now goes through index → capnp:
- scoring.rs: consolidation_priority, replay_queue, consolidation_plan
- admin.rs: cmd_init, cmd_fsck, cmd_dedup
- engine.rs: run_generator, eval_filter, run_transform
- parser.rs: resolve_field, execute_query

Added Store::remove_from_index() for dedup cleanup.

The relations Vec remains for now (used for graph building).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
Commit 5877fd857a (parent af3e41f1d9)
Author: Kent Overstreet
Date: 2026-04-13 19:49:09 -04:00
5 changed files with 65 additions and 47 deletions

View file

@ -28,13 +28,13 @@ pub async fn cmd_init() -> Result<()> {
// Seed identity node if empty // Seed identity node if empty
let arc = memory::access_local()?; let arc = memory::access_local()?;
let mut store = arc.lock().await; let mut store = arc.lock().await;
if !store.nodes.contains_key("identity") { if !store.contains_key("identity").unwrap_or(false) {
let default = include_str!("../../defaults/identity.md"); let default = include_str!("../../defaults/identity.md");
store.upsert("identity", default)?; store.upsert("identity", default)?;
println!("Seeded identity in store"); println!("Seeded identity in store");
} }
store.save()?; store.save()?;
println!("Initialized with {} nodes", store.nodes.len()); println!("Initialized with {} nodes", store.all_keys().unwrap_or_default().len());
// Create config if none exists // Create config if none exists
let config_path = std::env::var("POC_MEMORY_CONFIG") let config_path = std::env::var("POC_MEMORY_CONFIG")
@ -65,22 +65,25 @@ pub async fn cmd_fsck() -> Result<()> {
// Check node-key consistency // Check node-key consistency
let mut issues = 0; let mut issues = 0;
for (key, node) in &store.nodes { let all_keys = store.all_keys().unwrap_or_default();
for key in &all_keys {
if let Ok(Some(node)) = store.get_node(key) {
if key != &node.key { if key != &node.key {
eprintln!("MISMATCH: map key '{}' vs node.key '{}'", key, node.key); eprintln!("MISMATCH: map key '{}' vs node.key '{}'", key, node.key);
issues += 1; issues += 1;
} }
} }
}
// Check edge endpoints // Check edge endpoints
let mut dangling = 0; let mut dangling = 0;
for rel in &store.relations { for rel in &store.relations {
if rel.deleted { continue; } if rel.deleted { continue; }
if !store.nodes.contains_key(&rel.source_key) { if !store.contains_key(&rel.source_key).unwrap_or(false) {
eprintln!("DANGLING: edge source '{}'", rel.source_key); eprintln!("DANGLING: edge source '{}'", rel.source_key);
dangling += 1; dangling += 1;
} }
if !store.nodes.contains_key(&rel.target_key) { if !store.contains_key(&rel.target_key).unwrap_or(false) {
eprintln!("DANGLING: edge target '{}'", rel.target_key); eprintln!("DANGLING: edge target '{}'", rel.target_key);
dangling += 1; dangling += 1;
} }
@ -90,8 +93,8 @@ pub async fn cmd_fsck() -> Result<()> {
let mut to_tombstone = Vec::new(); let mut to_tombstone = Vec::new();
for rel in &store.relations { for rel in &store.relations {
if rel.deleted { continue; } if rel.deleted { continue; }
if !store.nodes.contains_key(&rel.source_key) if !store.contains_key(&rel.source_key).unwrap_or(false)
|| !store.nodes.contains_key(&rel.target_key) { || !store.contains_key(&rel.target_key).unwrap_or(false) {
let mut tombstone = rel.clone(); let mut tombstone = rel.clone();
tombstone.deleted = true; tombstone.deleted = true;
tombstone.version += 1; tombstone.version += 1;
@ -112,7 +115,7 @@ pub async fn cmd_fsck() -> Result<()> {
let g = store.build_graph(); let g = store.build_graph();
println!("fsck: {} nodes, {} edges, {} issues, {} dangling", println!("fsck: {} nodes, {} edges, {} issues, {} dangling",
store.nodes.len(), g.edge_count(), issues, dangling); all_keys.len(), g.edge_count(), issues, dangling);
Ok(()) Ok(())
} }
@ -276,8 +279,9 @@ pub async fn cmd_dedup(apply: bool) -> Result<()> {
store.append_relations(&updated_rels)?; store.append_relations(&updated_rels)?;
} }
for uuid in &doomed_uuids { // Remove doomed nodes from index
store.uuid_to_key.remove(uuid); for (doomed_node, _) in &copies[1..] {
store.remove_from_index(&doomed_node.key, &doomed_node.uuid)?;
} }
merged += doomed_uuids.len(); merged += doomed_uuids.len();

View file

@ -26,7 +26,7 @@ pub fn consolidation_priority(
graph: &Graph, graph: &Graph,
spectral_outlier: Option<f64>, spectral_outlier: Option<f64>,
) -> f64 { ) -> f64 {
let node = match store.nodes.get(key) { let node = match store.get_node(key).ok().flatten() {
Some(n) => n, Some(n) => n,
None => return 0.0, None => return 0.0,
}; };
@ -97,8 +97,10 @@ pub fn replay_queue_with_graph(
HashMap::new() HashMap::new()
}; };
let mut items: Vec<ReplayItem> = store.nodes.iter() let all_keys = store.all_keys().unwrap_or_default();
.map(|(key, node)| { let mut items: Vec<ReplayItem> = all_keys.iter()
.filter_map(|key| {
let node = store.get_node(key).ok()??;
let pos = positions.get(key); let pos = positions.get(key);
let outlier_score = pos.map(|p| p.outlier_score).unwrap_or(0.0); let outlier_score = pos.map(|p| p.outlier_score).unwrap_or(0.0);
let classification = pos let classification = pos
@ -109,7 +111,7 @@ pub fn replay_queue_with_graph(
store, key, graph, store, key, graph,
pos.map(|p| p.outlier_score), pos.map(|p| p.outlier_score),
); );
ReplayItem { Some(ReplayItem {
key: key.clone(), key: key.clone(),
priority, priority,
interval_days: node.spaced_repetition_interval, interval_days: node.spaced_repetition_interval,
@ -117,7 +119,7 @@ pub fn replay_queue_with_graph(
cc: graph.clustering_coefficient(key), cc: graph.clustering_coefficient(key),
classification, classification,
outlier_score, outlier_score,
} })
}) })
.collect(); .collect();
@ -214,11 +216,13 @@ fn consolidation_plan_inner(store: &Store, _detect_interf: bool) -> Consolidatio
let gini = graph.degree_gini(); let gini = graph.degree_gini();
let _avg_cc = graph.avg_clustering_coefficient(); let _avg_cc = graph.avg_clustering_coefficient();
let episodic_count = store.nodes.iter() let all_keys = store.all_keys().unwrap_or_default();
.filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession)) let episodic_count = all_keys.iter()
.filter_map(|k| store.get_node(k).ok()?)
.filter(|n| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
.count(); .count();
let _episodic_ratio = if store.nodes.is_empty() { 0.0 } let _episodic_ratio = if all_keys.is_empty() { 0.0 }
else { episodic_count as f32 / store.nodes.len() as f32 }; else { episodic_count as f32 / all_keys.len() as f32 };
let mut plan = ConsolidationPlan { let mut plan = ConsolidationPlan {
counts: std::collections::HashMap::new(), counts: std::collections::HashMap::new(),

View file

@ -227,10 +227,10 @@ fn score_field(
(d / max).min(1.0) (d / max).min(1.0)
} }
ScoreField::Weight => { ScoreField::Weight => {
store.nodes.get(key).map(|n| n.weight as f64).unwrap_or(0.0) store.get_node(key).ok().flatten().map(|n| n.weight as f64).unwrap_or(0.0)
} }
ScoreField::ContentLen => { ScoreField::ContentLen => {
let len = store.nodes.get(key).map(|n| n.content.len()).unwrap_or(0) as f64; let len = store.get_node(key).ok().flatten().map(|n| n.content.len()).unwrap_or(0) as f64;
let max = precomputed.max_content_len.max(1.0); let max = precomputed.max_content_len.max(1.0);
(len / max).min(1.0) (len / max).min(1.0)
} }
@ -255,7 +255,7 @@ impl CompositeCache {
.map(|(k, _)| graph.degree(k) as f64) .map(|(k, _)| graph.degree(k) as f64)
.fold(0.0f64, f64::max); .fold(0.0f64, f64::max);
let max_content_len = items.iter() let max_content_len = items.iter()
.map(|(k, _)| store.nodes.get(k).map(|n| n.content.len()).unwrap_or(0) as f64) .map(|(k, _)| store.get_node(k).ok().flatten().map(|n| n.content.len()).unwrap_or(0) as f64)
.fold(0.0f64, f64::max); .fold(0.0f64, f64::max);
Self { Self {
isolation: graph.community_isolation(), isolation: graph.community_isolation(),
@ -393,9 +393,12 @@ pub fn run_query(
fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> { fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> {
match g { match g {
Generator::All => { Generator::All => {
store.nodes.iter() store.all_keys().unwrap_or_default().into_iter()
.filter(|(_, n)| !n.deleted) .filter_map(|key| {
.map(|(key, n)| (key.clone(), n.weight as f64)) let n = store.get_node(&key).ok()??;
if n.deleted { return None; }
Some((key, n.weight as f64))
})
.collect() .collect()
} }
Generator::Match(terms) => { Generator::Match(terms) => {
@ -409,7 +412,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> {
} }
pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
let node = match store.nodes.get(key) { let node = match store.get_node(key).ok().flatten() {
Some(n) => n, Some(n) => n,
None => return false, None => return false,
}; };
@ -442,15 +445,15 @@ pub fn run_transform(
} }
SortField::Timestamp => { SortField::Timestamp => {
items.sort_by(|a, b| { items.sort_by(|a, b| {
let ta = store.nodes.get(&a.0).map(|n| n.timestamp).unwrap_or(0); let ta = store.get_node(&a.0).ok().flatten().map(|n| n.timestamp).unwrap_or(0);
let tb = store.nodes.get(&b.0).map(|n| n.timestamp).unwrap_or(0); let tb = store.get_node(&b.0).ok().flatten().map(|n| n.timestamp).unwrap_or(0);
tb.cmp(&ta) // desc tb.cmp(&ta) // desc
}); });
} }
SortField::ContentLen => { SortField::ContentLen => {
items.sort_by(|a, b| { items.sort_by(|a, b| {
let la = store.nodes.get(&a.0).map(|n| n.content.len()).unwrap_or(0); let la = store.get_node(&a.0).ok().flatten().map(|n| n.content.len()).unwrap_or(0);
let lb = store.nodes.get(&b.0).map(|n| n.content.len()).unwrap_or(0); let lb = store.get_node(&b.0).ok().flatten().map(|n| n.content.len()).unwrap_or(0);
lb.cmp(&la) // desc lb.cmp(&la) // desc
}); });
} }
@ -480,7 +483,7 @@ pub fn run_transform(
SortField::Named(field, asc) => { SortField::Named(field, asc) => {
// Resolve field from node properties // Resolve field from node properties
let resolve = |key: &str| -> Option<f64> { let resolve = |key: &str| -> Option<f64> {
let node = store.nodes.get(key)?; let node = store.get_node(key).ok()??;
match field.as_str() { match field.as_str() {
"weight" => Some(node.weight as f64), "weight" => Some(node.weight as f64),
"emotion" => Some(node.emotion as f64), "emotion" => Some(node.emotion as f64),

View file

@ -300,7 +300,7 @@ pub fn parse_stages(s: &str) -> Result<Vec<Stage>, String> {
/// Resolve a field value from a node + graph context, returning a comparable Value. /// Resolve a field value from a node + graph context, returning a comparable Value.
fn resolve_field(field: &str, key: &str, store: &Store, graph: &Graph) -> Option<Value> { fn resolve_field(field: &str, key: &str, store: &Store, graph: &Graph) -> Option<Value> {
let node = store.nodes.get(key)?; let node = store.get_node(key).ok()??;
match field { match field {
"key" => Some(Value::Str(key.to_string())), "key" => Some(Value::Str(key.to_string())),
"weight" => Some(Value::Num(node.weight as f64)), "weight" => Some(Value::Num(node.weight as f64)),
@ -491,9 +491,13 @@ fn execute_parsed(
} }
_ => { _ => {
let mut out = Vec::new(); let mut out = Vec::new();
for key in store.nodes.keys() { for key in store.all_keys().unwrap_or_default() {
if store.nodes[key].deleted { continue; } let node = match store.get_node(&key).ok().flatten() {
if eval(&q.expr, &|f| resolve_field(f, key, store, graph), store, graph) { Some(n) => n,
None => continue,
};
if node.deleted { continue; }
if eval(&q.expr, &|f| resolve_field(f, &key, store, graph), store, graph) {
out.push(QueryResult { key: key.clone(), fields: BTreeMap::new() }); out.push(QueryResult { key: key.clone(), fields: BTreeMap::new() });
} }
} }
@ -565,15 +569,15 @@ fn execute_parsed(
} }
SortField::Weight => { SortField::Weight => {
results.sort_by(|a, b| { results.sort_by(|a, b| {
let wa = store.nodes.get(&a.key).map(|n| n.weight).unwrap_or(0.0); let wa = store.get_node(&a.key).ok().flatten().map(|n| n.weight).unwrap_or(0.0);
let wb = store.nodes.get(&b.key).map(|n| n.weight).unwrap_or(0.0); let wb = store.get_node(&b.key).ok().flatten().map(|n| n.weight).unwrap_or(0.0);
wb.total_cmp(&wa) wb.total_cmp(&wa)
}); });
} }
SortField::Timestamp => { SortField::Timestamp => {
results.sort_by(|a, b| { results.sort_by(|a, b| {
let ta = store.nodes.get(&a.key).map(|n| n.timestamp).unwrap_or(0); let ta = store.get_node(&a.key).ok().flatten().map(|n| n.timestamp).unwrap_or(0);
let tb = store.nodes.get(&b.key).map(|n| n.timestamp).unwrap_or(0); let tb = store.get_node(&b.key).ok().flatten().map(|n| n.timestamp).unwrap_or(0);
tb.cmp(&ta) tb.cmp(&ta)
}); });
} }

View file

@ -32,7 +32,6 @@ pub use ops::current_provenance;
use crate::graph::{self, Graph}; use crate::graph::{self, Graph};
use std::collections::HashMap;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
/// Strip .md suffix from a key, handling both bare keys and section keys. /// Strip .md suffix from a key, handling both bare keys and section keys.
@ -48,8 +47,6 @@ pub fn strip_md_suffix(key: &str) -> String {
// The full in-memory store // The full in-memory store
pub struct Store { pub struct Store {
pub nodes: HashMap<String, Node>, // key → latest node
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
pub relations: Vec<Relation>, // all active relations pub relations: Vec<Relation>, // all active relations
/// Log sizes at load time — used for staleness detection. /// Log sizes at load time — used for staleness detection.
pub(crate) loaded_nodes_size: u64, pub(crate) loaded_nodes_size: u64,
@ -61,8 +58,6 @@ pub struct Store {
impl Default for Store { impl Default for Store {
fn default() -> Self { fn default() -> Self {
Store { Store {
nodes: HashMap::new(),
uuid_to_key: HashMap::new(),
relations: Vec::new(), relations: Vec::new(),
loaded_nodes_size: 0, loaded_nodes_size: 0,
loaded_rels_size: 0, loaded_rels_size: 0,
@ -101,6 +96,14 @@ impl Store {
index::all_keys(db) index::all_keys(db)
} }
/// Remove a node from the index (used after appending a tombstone).
///
/// Silently succeeds when the store has no backing database handle —
/// there is nothing to remove from in that case.
pub fn remove_from_index(&self, key: &str, uuid: &[u8; 16]) -> Result<()> {
let Some(db) = self.db.as_ref() else {
return Ok(());
};
index::remove_node(db, key, uuid)?;
Ok(())
}
pub fn resolve_key(&self, target: &str) -> Result<String> { pub fn resolve_key(&self, target: &str) -> Result<String> {
// Strip .md suffix if present — keys no longer use it // Strip .md suffix if present — keys no longer use it
let bare = strip_md_suffix(target); let bare = strip_md_suffix(target);