From ba4e01b6f37ce78d64d5f8c50590c68c55771631 Mon Sep 17 00:00:00 2001
From: Kent Overstreet
Date: Wed, 15 Apr 2026 05:03:32 -0400
Subject: [PATCH] store: add weight to index, index-only key matching
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- KEY_TO_UUID now stores weight (30 bytes: uuid+type+ts+deleted+weight)
- UUID_OFFSETS changed to composite key for O(log n) max-offset lookup
- Add NODES_BY_TYPE index for efficient type+date range queries
- Add for_each_key_weight() to StoreView for index-only iteration
- match_seeds uses index-only path when content not needed
- Fix transaction consistency in ops (single txn for related updates)
- rebuild() now records all uuid→offset mappings for version history
- Backwards compatible: old index formats decoded with default weight

Co-Authored-By: Proof of Concept
Signed-off-by: Kent Overstreet
---
 Cargo.toml                      |   3 +
 src/bin/dump-table.rs           | 105 +++++++
 src/hippocampus/local.rs        |  77 ++---
 src/hippocampus/query/engine.rs |   3 +-
 src/hippocampus/store/capnp.rs  | 388 +++--------------------
 src/hippocampus/store/index.rs  | 529 +++++++++++++++++++++++++++-----
 src/hippocampus/store/mod.rs    |  79 ++++-
 src/hippocampus/store/ops.rs    |  60 ++--
 src/hippocampus/store/view.rs   |  34 +-
 9 files changed, 776 insertions(+), 502 deletions(-)
 create mode 100644 src/bin/dump-table.rs

diff --git a/Cargo.toml b/Cargo.toml
index d7c818b..c253bd7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -99,3 +99,6 @@ path = "src/bin/diag-key.rs"
 [[bin]]
 name = "find-deleted"
 path = "src/bin/find-deleted.rs"
+[[bin]]
+name = "dump-table"
+path = "src/bin/dump-table.rs"
diff --git a/src/bin/dump-table.rs b/src/bin/dump-table.rs
new file mode 100644
index 0000000..2be0540
--- /dev/null
+++ b/src/bin/dump-table.rs
@@ -0,0 +1,105 @@
+// Dump a redb table in text form
+// Usage: dump-table <table>
+// Tables: key_to_uuid, uuid_offsets, nodes_by_provenance, nodes_by_type, rels
+
+use consciousness::store::{
+    memory_dir,
+    KEY_TO_UUID, UUID_OFFSETS, NODES_BY_PROVENANCE, NODES_BY_TYPE, RELS,
+    unpack_node_meta, unpack_provenance_value, unpack_rel,
+};
+use redb::{Database, ReadableDatabase, ReadableTable, ReadableMultimapTable};
+
+fn format_uuid(uuid: &[u8; 16]) -> String {
+    format!("{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
+        uuid[0], uuid[1], uuid[2], uuid[3], uuid[4], uuid[5], uuid[6], uuid[7],
+        uuid[8], uuid[9], uuid[10], uuid[11], uuid[12], uuid[13], uuid[14], uuid[15])
+}
+
+fn main() {
+    let args: Vec<String> = std::env::args().collect();
+    if args.len() != 2 {
+        eprintln!("usage: dump-table <table>");
+        eprintln!("tables: key_to_uuid, uuid_offsets, nodes_by_provenance, nodes_by_type, rels");
+        std::process::exit(1);
+    }
+    let table_name = &args[1];
+
+    let db_path = memory_dir().join("index.redb");
+    let db = Database::open(&db_path).expect("open db");
+    let txn = db.begin_read().expect("begin read");
+
+    match table_name.as_str() {
+        "key_to_uuid" => {
+            let table = txn.open_table(KEY_TO_UUID).expect("open");
+            for entry in table.iter().expect("iter") {
+                let (key, data) = entry.expect("entry");
+                let (uuid, node_type, ts, deleted, weight) = unpack_node_meta(data.value());
+                println!("{}\t{}\ttype={}\tts={}\tdel={}\tw={:.3}", key.value(), format_uuid(&uuid), node_type, ts, deleted, weight);
+            }
+        }
+        "uuid_offsets" => {
+            // Key: [uuid:16][offset:8 BE], Value: ()
+            let table = txn.open_table(UUID_OFFSETS).expect("open");
+            for entry in table.iter().expect("iter") {
+                let (key_bytes, _) = entry.expect("entry");
+                let key = key_bytes.value();
+                if key.len() >= 24 {
+                    let mut uuid = [0u8; 16];
+                    uuid.copy_from_slice(&key[0..16]);
+                    let offset = u64::from_be_bytes([
+                        key[16], key[17], key[18], key[19],
+                        key[20], key[21], key[22], key[23],
+                    ]);
+                    println!("{}\t{}", format_uuid(&uuid), offset);
+                }
+            }
+        }
+        "nodes_by_provenance" => {
+            let table = txn.open_multimap_table(NODES_BY_PROVENANCE).expect("open");
+            for entry in table.iter().expect("iter") {
+                let (prov, values) = entry.expect("entry");
+                for val in values {
+                    let (ts, uuid) = unpack_provenance_value(val.expect("val").value());
+                    println!("{}\t{}\t{}", prov.value(), ts, format_uuid(&uuid));
+                }
+            }
+        }
+        "nodes_by_type" => {
+            // Key: [type:1][neg_timestamp:8], Value: uuid
+            let table = txn.open_table(NODES_BY_TYPE).expect("open");
+            for entry in table.iter().expect("iter") {
+                let (key_bytes, uuid_bytes) = entry.expect("entry");
+                let key = key_bytes.value();
+                let node_type = key[0];
+                let neg_ts = i64::from_be_bytes([key[1], key[2], key[3], key[4], key[5], key[6], key[7], key[8]]);
+                let ts = !neg_ts;
+                let mut uuid = [0u8; 16];
+                uuid.copy_from_slice(uuid_bytes.value());
+                println!("type={}\tts={}\t{}", node_type, ts, format_uuid(&uuid));
+            }
+        }
+        "rels" => {
+            let table = txn.open_multimap_table(RELS).expect("open");
+            for entry in table.iter().expect("iter") {
+                let (uuid_bytes, values) = entry.expect("entry");
+                let uuid = uuid_bytes.value();
+                let uuid_str = if uuid.len() >= 16 {
+                    let mut arr = [0u8; 16];
+                    arr.copy_from_slice(&uuid[..16]);
+                    format_uuid(&arr)
+                } else {
+                    format!("{:02x?}", uuid)
+                };
+                for val in values {
+                    let (other, strength, rel_type, is_out) = unpack_rel(val.expect("val").value());
+                    println!("{}\t{}\tstr={:.3}\ttype={}\tout={}",
+                        uuid_str, format_uuid(&other), strength, rel_type, is_out);
+                }
+            }
+        }
+        _ => {
+            eprintln!("unknown table: {}", table_name);
+            std::process::exit(1);
+        }
+    }
+}
diff --git a/src/hippocampus/local.rs b/src/hippocampus/local.rs
index a42b122..66b4803 100644
--- a/src/hippocampus/local.rs
+++ b/src/hippocampus/local.rs
@@ -4,6 +4,10 @@ use super::store::Store;
 use crate::graph::Graph;
 use crate::neuro::{consolidation_priority, ReplayItem};
 
+// All functions take `provenance: &str` for interface uniformity (MCP tools
+// pass it to everything), but read-only operations ignore it (_provenance).
+// Only write operations actually record the provenance string.
+
 // ── Memory operations ──────────────────────────────────────────
 
 pub fn memory_render(store: &Store, _provenance: &str, key: &str, raw: Option<bool>) -> Result<String> {
@@ -125,30 +129,7 @@ pub fn memory_history(store: &Store, _provenance: &str, key: &str, full: Option<
     let key = store.resolve_key(key).unwrap_or_else(|_| key.to_string());
     let full = full.unwrap_or(false);
 
-    let path = crate::store::nodes_path();
-    if !path.exists() {
-        anyhow::bail!("No node log found");
-    }
-
-    use std::io::BufReader;
-    let file = std::fs::File::open(&path)
-        .map_err(|e| anyhow::anyhow!("open {}: {}", path.display(), e))?;
-    let mut reader = BufReader::new(file);
-
-    let mut versions: Vec<crate::store::Node> = Vec::new();
-    while let Ok(msg) = capnp::serialize::read_message(&mut reader, capnp::message::ReaderOptions::new()) {
-        let log = msg.get_root::<crate::memory_capnp::node_log::Reader>()
-            .map_err(|e| anyhow::anyhow!("read log: {}", e))?;
-        for node_reader in log.get_nodes()
-            .map_err(|e| anyhow::anyhow!("get nodes: {}", e))?
-        {
-            let node = crate::store::Node::from_capnp_migrate(node_reader)
-                .map_err(|e| anyhow::anyhow!("{}", e))?;
-            if node.key == key {
-                versions.push(node);
-            }
-        }
-    }
-
+    let versions = store.get_history(&key)?;
     if versions.is_empty() {
         anyhow::bail!("No history found for '{}'", key);
     }
@@ -305,19 +286,23 @@ pub fn journal_tail(store: &Store, _provenance: &str, count: Option<usize>, level:
             .map(|dt| dt.and_utc().timestamp())
     });
 
-    let all_keys = store.all_keys()?;
-    let mut entries: Vec<_> = all_keys.iter()
-        .filter_map(|key| store.get_node(key).ok()?)
-        .filter(|n| n.node_type == node_type)
-        .filter(|n| after_ts.map(|ts| n.created_at >= ts).unwrap_or(true))
-        .map(|n| JournalEntry {
-            key: n.key.clone(),
-            content: n.content,
-            created_at: n.created_at,
-        })
-        .collect();
-    entries.sort_by_key(|e| std::cmp::Reverse(e.created_at));
-    entries.truncate(count);
+    // Use NODES_BY_TYPE index: O(log n + k) instead of O(n)
+    let db = store.db()?;
+    let uuids = crate::store::nodes_by_type(db, node_type as u8, count, after_ts)?;
+
+    let mut entries = Vec::with_capacity(uuids.len());
+    for uuid in uuids {
+        if let Ok(Some(node)) = store.get_node_by_uuid(&uuid) {
+            if !node.deleted {
+                entries.push(JournalEntry {
+                    key: node.key.clone(),
+                    content: node.content.clone(),
+                    created_at: node.created_at,
+                });
+            }
+        }
+    }
+    // Already sorted by timestamp from index, no need to sort again
 
     Ok(entries)
 }
@@ -366,13 +351,17 @@ pub fn journal_new(store: &Store, provenance: &str, name: &str, title: &str, bod
 pub fn journal_update(store: &Store, provenance: &str, body: &str, level: Option<u8>) -> Result<String> {
     let level = level.unwrap_or(0);
     let node_type = level_to_node_type(level);
-    let all_keys = store.all_keys()?;
-    let latest_key = all_keys.iter()
-        .filter_map(|key| store.get_node(key).ok()?)
-        .filter(|n| n.node_type == node_type)
-        .max_by_key(|n| n.created_at)
-        .map(|n| n.key.clone());
-    let Some(key) = latest_key else {
+
+    // Use NODES_BY_TYPE index to find most recent
+    let db = store.db()?;
+    let uuids = crate::store::nodes_by_type(db, node_type as u8, 1, None)?;
+    let key = match uuids.first() {
+        Some(uuid) => store.get_node_by_uuid(uuid)?
+            .filter(|n| !n.deleted)
+            .map(|n| n.key),
+        None => None,
+    };
+    let Some(key) = key else {
         anyhow::bail!("no entry at level {} to update — use journal_new first", level);
     };
     let existing = store.get_node(&key)?.ok_or_else(|| anyhow::anyhow!("node not found"))?.content;
diff --git a/src/hippocampus/query/engine.rs b/src/hippocampus/query/engine.rs
index b6d85b6..3006c8a 100644
--- a/src/hippocampus/query/engine.rs
+++ b/src/hippocampus/query/engine.rs
@@ -633,7 +633,8 @@ pub fn match_seeds_opts(
     // Build component index: word → vec of (original key, weight)
     let mut component_map: HashMap<String, Vec<(String, f64)>> = HashMap::new();
 
-    store.for_each_node(|key, _content, weight| {
+    // Index-only pass: no capnp reads needed for key matching
+    store.for_each_key_weight(|key, weight| {
         let lkey = key.to_lowercase();
         key_map.insert(lkey.clone(), (key.to_owned(), weight as f64));
 
diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs
index c41212e..1d221b1 100644
--- a/src/hippocampus/store/capnp.rs
+++ b/src/hippocampus/store/capnp.rs
@@ -8,8 +8,6 @@
 // - fsck (corruption repair)
 
 use super::{index, types::*};
-use redb::ReadableTableMetadata;
-
 use crate::memory_capnp;
 use super::Store;
 
@@ -262,6 +260,47 @@
 pub fn read_node_at_offset(offset: u64) -> Result<Node> {
     read_node_at_offset_for_key(offset, None)
 }
 
+/// Iterate over all nodes in the capnp log, yielding (offset, Node) pairs.
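+///
+/// A minimal usage sketch (hypothetical caller; error handling elided):
+///
+/// ```ignore
+/// for (offset, node) in iter_nodes()? {
+///     // each offset can be passed back to read_node_at_offset()
+///     println!("{}\t{}", offset, node.key);
+/// }
+/// ```
+///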
+/// Nodes are yielded in log order (oldest first).
+/// Multiple nodes in the same message share the same offset.
+pub fn iter_nodes() -> Result<Vec<(u64, Node)>> {
+    let path = nodes_path();
+    if !path.exists() {
+        return Ok(Vec::new());
+    }
+
+    let file = fs::File::open(&path)
+        .with_context(|| format!("open {}", path.display()))?;
+    let mut reader = BufReader::new(file);
+    let mut results = Vec::new();
+
+    loop {
+        let offset = reader.stream_position()?;
+        let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
+            Ok(m) => m,
+            Err(_) => break, // EOF or corrupt
+        };
+
+        let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
+            Ok(l) => l,
+            Err(_) => continue,
+        };
+
+        let nodes = match log.get_nodes() {
+            Ok(n) => n,
+            Err(_) => continue,
+        };
+
+        for node_reader in nodes {
+            if let Ok(node) = Node::from_capnp_migrate(node_reader) {
+                results.push((offset, node));
+            }
+        }
+    }
+
+    Ok(results)
+}
+
 // ---------------------------------------------------------------------------
 // Store persistence methods
 // ---------------------------------------------------------------------------
@@ -274,9 +313,9 @@
 
         let mut store = Store::default();
 
-        // Open redb index first (rebuilds from capnp if needed)
+        // Open redb index (rebuilds from capnp if needed)
         let db_p = db_path();
-        store.db = Some(store.open_or_rebuild_db(&db_p)?);
+        store.db = Some(index::open_or_rebuild(&db_p)?);
 
         // Replay relations
         if rels_p.exists() {
@@ -294,64 +333,9 @@
             Ordering::Relaxed
         );
 
-        // Orphan edges filtered naturally during for_each_relation (unresolvable UUIDs skipped)
-
         Ok(store)
     }
 
-    /// Open redb database, rebuilding if unhealthy.
-    fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
-        // Try opening existing database
-        if path.exists() {
-            match index::open_db(path) {
-                Ok(database) => {
-                    if self.db_is_healthy(&database)? {
-                        return Ok(database);
-                    }
-                    eprintln!("redb index stale, rebuilding...");
-                }
-                Err(e) => {
-                    eprintln!("redb open failed ({}), rebuilding...", e);
-                }
-            }
-        }
-
-        // Rebuild index from capnp log
-        rebuild_index(path, &nodes_path())
-    }
-
-    /// Check if redb index is healthy by verifying some offsets are valid.
-    fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
-        use redb::{ReadableDatabase, ReadableTable};
-
-        let txn = database.begin_read()?;
-        let nodes_table = txn.open_table(index::NODES)?;
-
-        // Check that we can read the table and it has entries
-        if nodes_table.len()? == 0 {
-            // Empty database - might be stale or new
-            let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
-            return Ok(capnp_size == 0); // healthy only if capnp is also empty
-        }
-
-        // Spot check: verify a few offsets point to valid messages
-        let mut checked = 0;
-        for entry in nodes_table.iter()? {
-            if checked >= 5 { break; }
-            let (key, offset) = entry?;
-            let offset = offset.value();
-
-            // Try to read the node at this offset
-            if read_node_at_offset(offset).is_err() {
-                return Ok(false);
-            }
-            checked += 1;
-            let _ = key; // silence unused warning
-        }
-
-        Ok(true)
-    }
-
     /// Replay relation log, keeping latest version per UUID
     fn replay_relations(&mut self, path: &Path) -> Result<()> {
         let file = fs::File::open(path)
@@ -429,88 +413,6 @@
         Ok(by_key)
     }
 
-    /// Find the most recent version of a node by key (including deleted).
-    /// Scans the entire log. Used for version continuity when recreating deleted nodes.
-    pub fn find_latest_by_key(&self, target_key: &str) -> Result<Option<Node>> {
-        let path = nodes_path();
-        if !path.exists() { return Ok(None); }
-
-        let file = fs::File::open(&path)
-            .with_context(|| format!("open {}", path.display()))?;
-        let mut reader = BufReader::new(file);
-
-        let mut latest: Option<Node> = None;
-
-        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
-            let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
-                Ok(l) => l,
-                Err(_) => continue,
-            };
-            let nodes = match log.get_nodes() {
-                Ok(n) => n,
-                Err(_) => continue,
-            };
-            for node_reader in nodes {
-                let node = match Node::from_capnp_migrate(node_reader) {
-                    Ok(n) => n,
-                    Err(_) => continue,
-                };
-                if node.key != target_key { continue; }
-                // Keep if newer timestamp (handles version resets)
-                let dominated = latest.as_ref()
-                    .map(|l| node.timestamp >= l.timestamp)
-                    .unwrap_or(true);
-                if dominated {
-                    latest = Some(node);
-                }
-            }
-        }
-
-        Ok(latest)
-    }
-
-    /// Find the last non-deleted version of a node by key.
-    /// Scans the entire log. Used for restore operations.
-    pub fn find_last_live_version(&self, target_key: &str) -> Result<Option<Node>> {
-        let path = nodes_path();
-        if !path.exists() { return Ok(None); }
-
-        let file = fs::File::open(&path)
-            .with_context(|| format!("open {}", path.display()))?;
-        let mut reader = BufReader::new(file);
-
-        let mut last_live: Option<Node> = None;
-
-        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
-            let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
-                Ok(l) => l,
-                Err(_) => continue,
-            };
-            let nodes = match log.get_nodes() {
-                Ok(n) => n,
-                Err(_) => continue,
-            };
-            for node_reader in nodes {
-                let node = match Node::from_capnp_migrate(node_reader) {
-                    Ok(n) => n,
-                    Err(_) => continue,
-                };
-                if node.key != target_key { continue; }
-                if !node.deleted {
-                    // Keep the most recent non-deleted version by timestamp
-                    let dominated = last_live.as_ref()
-                        .map(|l| node.timestamp >= l.timestamp)
-                        .unwrap_or(true);
-                    if dominated {
-                        last_live = Some(node);
-                    }
-                }
-            }
-        }
-
-        Ok(last_live)
-    }
-
     /// Append nodes to the log file. Returns the offset where the message was written.
     pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> {
         use std::sync::atomic::Ordering;
@@ -680,207 +582,3 @@ pub fn fsck() -> Result<()> {
 
     Ok(())
 }
-
-/// Rebuild redb index from capnp log.
-/// Scans the log, tracking offsets, and records latest version of each node.
-fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
-    // Remove old database if it exists
-    if db_path.exists() {
-        fs::remove_file(db_path)
-            .with_context(|| format!("remove old db {}", db_path.display()))?;
-    }
-
-    let database = index::open_db(db_path)?;
-
-    if !capnp_path.exists() {
-        return Ok(database);
-    }
-
-    // Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key
-    let mut latest: HashMap<String, (u64, [u8; 16], u64, bool, u8, i64, String)> = HashMap::new();
-
-    let file = fs::File::open(capnp_path)
-        .with_context(|| format!("open {}", capnp_path.display()))?;
-    let mut reader = BufReader::new(file);
-
-    loop {
-        let offset = reader.stream_position()?;
-        let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
-            Ok(m) => m,
-            Err(_) => break,
-        };
-
-        let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
-            Ok(l) => l,
-            Err(_) => continue,
-        };
-
-        let nodes = match log.get_nodes() {
-            Ok(n) => n,
-            Err(_) => continue,
-        };
-        for node_reader in nodes {
-            let key = node_reader.get_key().ok()
-                .and_then(|t| t.to_str().ok())
-                .unwrap_or("")
-                .to_string();
-            if key.is_empty() { continue; }
-
-            let version = node_reader.get_version();
-            let deleted = node_reader.get_deleted();
-            let node_type = node_reader.get_node_type()
-                .map(|t| t as u8)
-                .unwrap_or(0);
-            let timestamp = node_reader.get_timestamp();
-            let provenance = node_reader.get_provenance().ok()
-                .and_then(|t| t.to_str().ok())
-                .unwrap_or("manual")
-                .to_string();
-
-            let mut uuid = [0u8; 16];
-            if let Ok(data) = node_reader.get_uuid() {
-                if data.len() >= 16 {
-                    uuid.copy_from_slice(&data[..16]);
-                }
-            }
-
-            // Keep if newer timestamp (not version - version can reset after delete/recreate)
-            let dominated = latest.get(&key)
-                .map(|(_, _, _, _, _, ts, _)| timestamp >= *ts)
-                .unwrap_or(true);
-            if dominated {
-                latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance));
-            }
-        }
-    }
-
-    // Write index entries for non-deleted nodes
-    {
-        let txn = database.begin_write()?;
-        {
-            let mut nodes_table = txn.open_table(index::NODES)?;
-            let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?;
-            let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?;
-            let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?;
-
-            for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest {
-                if !deleted {
-                    nodes_table.insert(key.as_str(), offset)?;
-                    // Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes
-                    let mut packed = [0u8; 25];
-                    packed[0..16].copy_from_slice(&uuid);
-                    packed[16] = node_type;
-                    packed[17..25].copy_from_slice(&timestamp.to_be_bytes());
-                    key_uuid_table.insert(key.as_str(), packed.as_slice())?;
-                    // Pack: [negated_timestamp:8][key] for descending sort
-                    let neg_ts = (!timestamp).to_be_bytes();
-                    let mut prov_val = Vec::with_capacity(8 + key.len());
-                    prov_val.extend_from_slice(&neg_ts);
-                    prov_val.extend_from_slice(key.as_bytes());
-                    by_provenance.insert(provenance.as_str(), prov_val.as_slice())?;
-                }
-                // Always record offset in UUID history (even for deleted)
-                uuid_offsets.insert(uuid.as_slice(), offset)?;
-            }
-        }
-        txn.commit()?;
-    }
-
-    Ok(database)
-}
-
-/// Fsck report — discrepancies found between capnp logs and redb index.
-#[derive(Debug, Default)]
-pub struct FsckReport {
-    /// Keys in current index but not in rebuilt (zombie entries)
-    pub zombies: Vec<String>,
-    /// Keys in rebuilt but not in current index (missing from index)
-    pub missing: Vec<String>,
-    /// Was capnp log repaired?
-    pub capnp_repaired: bool,
-}
-
-impl FsckReport {
-    pub fn is_clean(&self) -> bool {
-        self.zombies.is_empty() && self.missing.is_empty() && !self.capnp_repaired
-    }
-}
-
-/// Full fsck: verify capnp logs, rebuild index to temp, compare with current.
-/// Returns a report of discrepancies found.
-pub fn fsck_full() -> Result<FsckReport> {
-    use redb::{ReadableDatabase, ReadableTable};
-    use tempfile::TempDir;
-
-    let mut report = FsckReport::default();
-
-    // Step 1: Run capnp log fsck (may truncate corrupt messages)
-    // We need to check if it did repairs — currently fsck() just prints to stderr
-    // For now, we'll re-check after by comparing file sizes
-    let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
-    fsck()?;
-    let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
-    report.capnp_repaired = nodes_size_after != nodes_size_before;
-
-    // Step 2: Rebuild index to temp file
-    let temp_dir = TempDir::new().context("create temp dir")?;
-    let temp_db_path = temp_dir.path().join("rebuilt.redb");
-    let rebuilt_db = rebuild_index(&temp_db_path, &nodes_path())?;
-
-    // Step 3: Copy current index to temp and open (avoids write lock contention)
-    let current_db_path = db_path();
-    if !current_db_path.exists() {
-        // No current index — all rebuilt keys are "missing"
-        let txn = rebuilt_db.begin_read()?;
-        let table = txn.open_table(index::NODES)?;
-        for entry in table.iter()? {
-            let (key, _) = entry?;
-            report.missing.push(key.value().to_string());
-        }
-        return Ok(report);
-    }
-
-    // Copy to temp to avoid lock contention with running daemon
-    let current_copy_path = temp_dir.path().join("current.redb");
-    fs::copy(&current_db_path, &current_copy_path)
-        .with_context(|| format!("copy {} to temp", current_db_path.display()))?;
-
-    let current_db = redb::Database::open(&current_copy_path)
-        .with_context(|| format!("open current db copy"))?;
-
-    // Step 4: Compare NODES tables
-    // Collect all keys from both
-    let rebuilt_keys: std::collections::HashSet<String> = {
-        let txn = rebuilt_db.begin_read()?;
-        let table = txn.open_table(index::NODES)?;
-        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
-    };
-
-    let current_keys: std::collections::HashSet<String> = {
-        let txn = current_db.begin_read()?;
-        let table = txn.open_table(index::NODES)?;
-        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
-    };
-
-    // Keys in current but not rebuilt = zombies (shouldn't exist)
-    for key in current_keys.difference(&rebuilt_keys) {
-        report.zombies.push(key.clone());
-    }
-    report.zombies.sort();
-
-    // Keys in rebuilt but not current = missing (should exist but don't)
-    for key in rebuilt_keys.difference(&current_keys) {
-        report.missing.push(key.clone());
-    }
-    report.missing.sort();
-
-    Ok(report)
-}
-
-/// Repair the index by rebuilding from capnp logs.
-/// Use after fsck_full() reports discrepancies.
-pub fn repair_index() -> Result<()> {
-    let db_path = db_path();
-    rebuild_index(&db_path, &nodes_path())?;
-    eprintln!("index rebuilt from capnp log");
-    Ok(())
-}
diff --git a/src/hippocampus/store/index.rs b/src/hippocampus/store/index.rs
index 95ebe19..aae42a8 100644
--- a/src/hippocampus/store/index.rs
+++ b/src/hippocampus/store/index.rs
@@ -3,31 +3,35 @@
 // capnp logs are source of truth; redb provides indexed access.
 //
 // Node tables:
-// NODES: key → offset (current version)
-// KEY_TO_UUID: key → uuid
-// UUID_OFFSETS: uuid → offsets (multimap, all versions)
-// NODES_BY_PROVENANCE: provenance → keys (multimap)
-// NODES_BY_TYPE: [type_byte][timestamp_be] → key (for range queries by type+date)
+// KEY_TO_UUID: key → (uuid, node_type, timestamp, deleted)
+//   Keeps entries for deleted nodes to enable index-based restore.
+// UUID_OFFSETS: [uuid:16][offset:8 BE] → () composite key for O(log n) max-offset lookup
+// NODES_BY_PROVENANCE: provenance → (timestamp, uuid) (multimap)
 //
 // Relation tables:
 // RELS: node_uuid → (other_uuid, strength, rel_type, is_outgoing) packed (multimap)
 //   Each relation stored twice — once per endpoint with direction bit.
 //
-// To get key from uuid: UUID_OFFSETS → read_node_at_offset() → node.key
+// To get current offset: KEY_TO_UUID[key] → uuid → max(UUID_OFFSETS[uuid][*])
+// To get key from uuid: read_node_at_offset(max_offset) → node.key
 
 use anyhow::{Context, Result};
-use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition, WriteTransaction};
+use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition, WriteTransaction};
+use std::collections::HashMap;
 use std::path::Path;
 
+use super::types::Node;
+use super::capnp::read_node_at_offset;
+
 // Node tables
-pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes");
-// KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes
+// KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8][deleted:1][weight:4] = 30 bytes
 pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid");
-pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = MultimapTableDefinition::new("uuid_offsets");
-// NODES_BY_PROVENANCE: provenance → [timestamp:8 BE][key] (sorted by timestamp desc via negated ts)
+// UUID_OFFSETS: [uuid:16][offset:8 BE] → () — offset in key for range scans
+pub const UUID_OFFSETS: TableDefinition<&[u8], ()> = TableDefinition::new("uuid_offsets");
+// NODES_BY_PROVENANCE: provenance → [negated_timestamp:8][uuid:16] = 24 bytes (sorted by timestamp desc)
 pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &[u8]> = MultimapTableDefinition::new("nodes_by_provenance");
-// Composite key: [node_type: u8][timestamp: i64 BE] for range queries
-pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type");
+// NODES_BY_TYPE: [type:1][neg_timestamp:8] → uuid (for type+date range queries, newest first)
+pub const NODES_BY_TYPE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("nodes_by_type");
 
 // Relations table - each relation stored twice (once per endpoint)
 // Value: (other_uuid: [u8;16], strength: f32, rel_type: u8, is_outgoing: bool)
@@ -43,9 +47,8 @@ pub fn open_db(path: &Path) -> Result<Database> {
     let txn = db.begin_write()?;
     {
         // Node tables
-        let _ = txn.open_table(NODES)?;
         let _ = txn.open_table(KEY_TO_UUID)?;
-        let _ = txn.open_multimap_table(UUID_OFFSETS)?;
+        let _ = txn.open_table(UUID_OFFSETS)?;
         let _ = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
         let _ = txn.open_table(NODES_BY_TYPE)?;
         // Relations
@@ -56,150 +59,297 @@ pub fn open_db(path: &Path) -> Result<Database> {
     Ok(db)
 }
 
-/// Pack node metadata: [uuid:16][node_type:1][timestamp:8] = 25 bytes
-fn pack_node_meta(uuid: &[u8; 16], node_type: u8, timestamp: i64) -> [u8; 25] {
-    let mut buf = [0u8; 25];
+/// Pack node metadata: [uuid:16][node_type:1][timestamp:8][deleted:1][weight:4] = 30 bytes
+fn pack_node_meta(uuid: &[u8; 16], node_type: u8, timestamp: i64, deleted: bool, weight: f32) -> [u8; 30] {
+    let mut buf = [0u8; 30];
     buf[0..16].copy_from_slice(uuid);
     buf[16] = node_type;
     buf[17..25].copy_from_slice(&timestamp.to_be_bytes());
+    buf[25] = if deleted { 1 } else { 0 };
+    buf[26..30].copy_from_slice(&weight.to_be_bytes());
     buf
 }
 
-/// Unpack node metadata. Handles both old (16-byte) and new (25-byte) formats.
-pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64) {
+/// Unpack node metadata. Returns (uuid, node_type, timestamp, deleted, weight).
+/// Handles old formats (16-byte, 25-byte, 26-byte) and new (30-byte).
+pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64, bool, f32) {
     let mut uuid = [0u8; 16];
     uuid.copy_from_slice(&data[0..16]);
-    if data.len() >= 25 {
+    if data.len() >= 30 {
         let node_type = data[16];
         let timestamp = i64::from_be_bytes([
             data[17], data[18], data[19], data[20],
             data[21], data[22], data[23], data[24],
         ]);
-        (uuid, node_type, timestamp)
+        let deleted = data[25] != 0;
+        let weight = f32::from_be_bytes([data[26], data[27], data[28], data[29]]);
+        (uuid, node_type, timestamp, deleted, weight)
+    } else if data.len() >= 26 {
+        let node_type = data[16];
+        let timestamp = i64::from_be_bytes([
+            data[17], data[18], data[19], data[20],
+            data[21], data[22], data[23], data[24],
+        ]);
+        let deleted = data[25] != 0;
+        (uuid, node_type, timestamp, deleted, 0.5) // default weight
+    } else if data.len() >= 25 {
+        let node_type = data[16];
+        let timestamp = i64::from_be_bytes([
+            data[17], data[18], data[19], data[20],
+            data[21], data[22], data[23], data[24],
+        ]);
+        (uuid, node_type, timestamp, false, 0.5)
     } else {
         // Old format: just uuid, default metadata
-        (uuid, 0, 0)
+        (uuid, 0, 0, false, 0.5)
     }
 }
 
-/// Pack provenance value: [negated_timestamp:8][key] for descending sort
-fn pack_provenance_value(timestamp: i64, key: &str) -> Vec<u8> {
+/// Pack provenance value: [negated_timestamp:8][uuid:16] = 24 bytes for descending sort
+fn pack_provenance_value(timestamp: i64, uuid: &[u8; 16]) -> [u8; 24] {
+    let mut buf = [0u8; 24];
     let neg_ts = (!timestamp).to_be_bytes(); // negate for descending order
-    let mut buf = Vec::with_capacity(8 + key.len());
-    buf.extend_from_slice(&neg_ts);
-    buf.extend_from_slice(key.as_bytes());
+    buf[0..8].copy_from_slice(&neg_ts);
+    buf[8..24].copy_from_slice(uuid);
     buf
 }
 
-/// Unpack provenance value: returns (timestamp, key)
-fn unpack_provenance_value(data: &[u8]) -> (i64, String) {
+/// Unpack provenance value: returns (timestamp, uuid)
+pub fn unpack_provenance_value(data: &[u8]) -> (i64, [u8; 16]) {
     let neg_ts = i64::from_be_bytes([data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]]);
     let timestamp = !neg_ts;
-    let key = String::from_utf8_lossy(&data[8..]).to_string();
-    (timestamp, key)
+    let mut uuid = [0u8; 16];
+    uuid.copy_from_slice(&data[8..24]);
+    (timestamp, uuid)
 }
 
-/// Record a node's location in the index.
-pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str) -> Result<()> {
-    let mut nodes_table = txn.open_table(NODES)?;
-    let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
-    let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?;
-    let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
+/// Pack UUID_OFFSETS key: [uuid:16][offset:8 BE] = 24 bytes
+fn pack_uuid_offset(uuid: &[u8; 16], offset: u64) -> [u8; 24] {
+    let mut buf = [0u8; 24];
+    buf[0..16].copy_from_slice(uuid);
+    buf[16..24].copy_from_slice(&offset.to_be_bytes());
+    buf
+}
 
-    nodes_table.insert(key, offset)?;
-    let packed = pack_node_meta(uuid, node_type, timestamp);
+/// Pack NODES_BY_TYPE key: [type:1][neg_timestamp:8] = 9 bytes (newest first within type)
+fn pack_type_key(node_type: u8, timestamp: i64) -> [u8; 9] {
+    let mut buf = [0u8; 9];
+    buf[0] = node_type;
+    buf[1..9].copy_from_slice(&(!timestamp).to_be_bytes());
+    buf
+}
+
+/// Unpack offset from UUID_OFFSETS key
+fn unpack_uuid_offset_key(key: &[u8]) -> ([u8; 16], u64) {
+    let mut uuid = [0u8; 16];
+    uuid.copy_from_slice(&key[0..16]);
+    let offset = u64::from_be_bytes([key[16], key[17], key[18], key[19], key[20], key[21], key[22], key[23]]);
+    (uuid, offset)
+}
+
+/// Record a node's location in the index (for live nodes).
+pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str, weight: f32) -> Result<()> {
+    let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
+    let mut uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+    let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
+    let mut by_type = txn.open_table(NODES_BY_TYPE)?;
+
+    let packed = pack_node_meta(uuid, node_type, timestamp, false, weight);
     key_uuid_table.insert(key, packed.as_slice())?;
-    uuid_offsets.insert(uuid.as_slice(), offset)?;
-    let prov_val = pack_provenance_value(timestamp, key);
+    let uuid_offset_key = pack_uuid_offset(uuid, offset);
+    uuid_offsets.insert(uuid_offset_key.as_slice(), ())?;
+    let prov_val = pack_provenance_value(timestamp, uuid);
     by_provenance.insert(provenance, prov_val.as_slice())?;
+    let type_key = pack_type_key(node_type, timestamp);
+    by_type.insert(type_key.as_slice(), uuid.as_slice())?;
     Ok(())
 }
 
+/// Record a uuid→offset mapping only (for deleted nodes - preserves version history).
+pub fn record_uuid_offset(txn: &WriteTransaction, uuid: &[u8; 16], offset: u64) -> Result<()> {
+    let mut uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+    let uuid_offset_key = pack_uuid_offset(uuid, offset);
+    uuid_offsets.insert(uuid_offset_key.as_slice(), ())?;
+    Ok(())
+}
+
+/// Get max offset for a UUID from an already-opened table.
+/// Uses reverse range scan to find the highest offset (last key in range).
+fn max_offset_for_uuid_in_table(
+    table: &redb::ReadOnlyTable<&[u8], ()>,
+    uuid: &[u8; 16],
+) -> Result<Option<u64>> {
+    let start = pack_uuid_offset(uuid, 0);
+    let end = pack_uuid_offset(uuid, u64::MAX);
+
+    // Get last entry in range (highest offset)
+    if let Some(entry) = table.range(start.as_slice()..=end.as_slice())?.next_back() {
+        let (key, _) = entry?;
+        let (_, offset) = unpack_uuid_offset_key(key.value());
+        Ok(Some(offset))
+    } else {
+        Ok(None)
+    }
+}
+
 /// Get recent keys for a given provenance, sorted by timestamp descending.
+/// Resolves UUID → current key by reading node at latest offset.
+/// Single transaction for all index lookups.
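+///
+/// Usage sketch ("manual" is the default provenance used elsewhere in this
+/// patch; the limit is illustrative):
+///
+/// ```ignore
+/// // Newest first, because NODES_BY_PROVENANCE stores negated timestamps.
+/// for (key, ts) in recent_by_provenance(&db, "manual", 10)? {
+///     println!("{ts}\t{key}");
+/// }
+/// ```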
 pub fn recent_by_provenance(db: &Database, provenance: &str, limit: usize) -> Result<Vec<(String, i64)>> {
     let txn = db.begin_read()?;
-    let table = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
+    let prov_table = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
+    let uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+
     let mut results = Vec::new();
-    for entry in table.get(provenance)? {
+    for entry in prov_table.get(provenance)? {
         if results.len() >= limit { break; }
-        let (timestamp, key) = unpack_provenance_value(entry?.value());
-        results.push((key, timestamp));
+        let (timestamp, uuid) = unpack_provenance_value(entry?.value());
+
+        if let Some(offset) = max_offset_for_uuid_in_table(&uuid_offsets, &uuid)? {
+            if let Ok(node) = read_node_at_offset(offset) {
+                results.push((node.key, timestamp));
+            }
+        }
     }
     Ok(results)
 }
 
-/// Get offset for a node by key.
+/// Get UUIDs for nodes of a given type, sorted by timestamp descending (newest first).
+/// Optionally filter to timestamps >= after_ts.
+/// Returns up to `limit` UUIDs.
+pub fn nodes_by_type(db: &Database, node_type: u8, limit: usize, after_ts: Option<i64>) -> Result<Vec<[u8; 16]>> {
+    let txn = db.begin_read()?;
+    let by_type = txn.open_table(NODES_BY_TYPE)?;
+
+    // Range: [type][0x80..] to [type][0xFF..] for positive timestamps (newest first)
+    // !i64::MAX = 0x8000... (far future, smallest), !0 = 0xFFFF... (epoch, largest)
+    let start = pack_type_key(node_type, i64::MAX); // !MAX = 0x8000... = smallest
+    let end = pack_type_key(node_type, 0);          // !0 = 0xFFFF... = largest
+
+    let mut results = Vec::new();
+    for entry in by_type.range(start.as_slice()..=end.as_slice())? {
+        if results.len() >= limit { break; }
+        let (key_bytes, uuid_bytes) = entry?;
+
+        // Decode timestamp from key to check after_ts filter
+        let key = key_bytes.value();
+        let neg_ts = i64::from_be_bytes([key[1], key[2], key[3], key[4], key[5], key[6], key[7], key[8]]);
+        let timestamp = !neg_ts;
+
+        if let Some(after) = after_ts {
+            if timestamp < after { continue; }
+        }
+
+        let mut uuid = [0u8; 16];
+        uuid.copy_from_slice(uuid_bytes.value());
+        results.push(uuid);
+    }
+    Ok(results)
+}
+
+/// Get offset for a node by key (via KEY_TO_UUID → UUID_OFFSETS).
+/// Single transaction, returns the newest offset.
 pub fn get_offset(db: &Database, key: &str) -> Result<Option<u64>> {
     let txn = db.begin_read()?;
-    let table = txn.open_table(NODES)?;
-    Ok(table.get(key)?.map(|v| v.value()))
+    let key_uuid = txn.open_table(KEY_TO_UUID)?;
+    let uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+
+    let uuid = match key_uuid.get(key)? {
+        Some(data) => {
+            let (uuid, _, _, deleted, _) = unpack_node_meta(data.value());
+            if deleted { return Ok(None); }
+            uuid
+        }
+        None => return Ok(None),
+    };
+
+    max_offset_for_uuid_in_table(&uuid_offsets, &uuid)
 }
 
-/// Check if a key exists in the index.
+/// Check if a key exists in the index (and is not deleted).
 pub fn contains_key(db: &Database, key: &str) -> Result<bool> {
     let txn = db.begin_read()?;
-    let table = txn.open_table(NODES)?;
-    Ok(table.get(key)?.is_some())
+    let table = txn.open_table(KEY_TO_UUID)?;
+    match table.get(key)? {
+        Some(data) => {
+            let (_, _, _, deleted, _) = unpack_node_meta(data.value());
+            Ok(!deleted)
+        }
+        None => Ok(false),
+    }
 }
 
-/// Get a node's UUID from its key.
+/// Get a node's UUID from its key (returns UUID even for deleted nodes).
 pub fn get_uuid_for_key(db: &Database, key: &str) -> Result<Option<[u8; 16]>> {
     let txn = db.begin_read()?;
     let table = txn.open_table(KEY_TO_UUID)?;
     match table.get(key)? {
         Some(data) => {
-            let (uuid, _, _) = unpack_node_meta(data.value());
+            let (uuid, _, _, _, _) = unpack_node_meta(data.value());
             Ok(Some(uuid))
         }
         None => Ok(None),
     }
 }
 
-/// Get all offsets for a UUID (all versions). Returns newest first.
+/// Get all offsets for a UUID (all versions). Returns newest (highest) first.
 pub fn get_offsets_for_uuid(db: &Database, uuid: &[u8; 16]) -> Result<Vec<u64>> {
     let txn = db.begin_read()?;
-    let table = txn.open_multimap_table(UUID_OFFSETS)?;
+    let table = txn.open_table(UUID_OFFSETS)?;
+
+    // Range scan: [uuid][0x00..] to [uuid][0xFF..]
+    let start = pack_uuid_offset(uuid, 0);
+    let end = pack_uuid_offset(uuid, u64::MAX);
+
     let mut offsets = Vec::new();
-    for entry in table.get(uuid.as_slice())? {
-        offsets.push(entry?.value());
+    for entry in table.range(start.as_slice()..=end.as_slice())? {
+        let (key, _) = entry?;
+        let (_, offset) = unpack_uuid_offset_key(key.value());
+        offsets.push(offset);
     }
-    // Sort descending so newest (highest offset) is first
-    offsets.sort_by(|a, b| b.cmp(a));
+    // Already sorted ascending by key; reverse for newest first
+    offsets.reverse();
    Ok(offsets)
 }
 
-/// Remove a node from the index (key mappings only; UUID history preserved).
+/// Mark a node as deleted in the index (key stays for history; UUID_OFFSETS preserved).
 pub fn remove_node(txn: &WriteTransaction, key: &str) -> Result<()> {
-    let mut nodes_table = txn.open_table(NODES)?;
     let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
-    // Note: UUID_OFFSETS is not cleared - preserves version history
-
-    nodes_table.remove(key)?;
-    key_uuid_table.remove(key)?;
+    // Copy out data to avoid borrow conflict
+    let meta = key_uuid_table.get(key)?.map(|data| {
+        unpack_node_meta(data.value())
+    });
+    if let Some((uuid, node_type, timestamp, _, weight)) = meta {
+        let packed = pack_node_meta(&uuid, node_type, timestamp, true, weight);
+        key_uuid_table.insert(key, packed.as_slice())?;
+    }
     Ok(())
 }
 
-/// Collect all keys from the index.
+/// Collect all keys from the index (excludes deleted nodes).
 pub fn all_keys(db: &Database) -> Result<Vec<String>> {
     let txn = db.begin_read()?;
-    let table = txn.open_table(NODES)?;
+    let table = txn.open_table(KEY_TO_UUID)?;
     let mut keys = Vec::new();
     for entry in table.iter()? {
-        let (key, _) = entry?;
-        keys.push(key.value().to_string());
+        let (key, data) = entry?;
+        let (_, _, _, deleted, _) = unpack_node_meta(data.value());
+        if !deleted {
+            keys.push(key.value().to_string());
+        }
     }
     Ok(keys)
 }
 
-/// Collect all (key, uuid, node_type, timestamp) in a single table scan.
-pub fn all_key_uuid_pairs(db: &Database) -> Result<Vec<(String, [u8; 16], u8, i64)>> {
+/// Collect all (key, uuid, node_type, timestamp, deleted, weight) in a single table scan.
+pub fn all_key_uuid_pairs(db: &Database) -> Result<Vec<(String, [u8; 16], u8, i64, bool, f32)>> {
     let txn = db.begin_read()?;
     let table = txn.open_table(KEY_TO_UUID)?;
     let mut pairs = Vec::new();
     for entry in table.iter()? {
         let (key, data) = entry?;
-        let (uuid, node_type, timestamp) = unpack_node_meta(data.value());
-        pairs.push((key.value().to_string(), uuid, node_type, timestamp));
+        let (uuid, node_type, timestamp, deleted, weight) = unpack_node_meta(data.value());
+        pairs.push((key.value().to_string(), uuid, node_type, timestamp, deleted, weight));
     }
     Ok(pairs)
 }
@@ -281,3 +431,234 @@ pub fn edges_for_node(db: &Database, node_uuid: &[u8; 16]) -> Result<Vec<([u8; 16], f32, u8, bool)>>
+
+/// Rebuild the index from scanned (offset, node) pairs (see iter_nodes()).
+pub fn rebuild(db: &Database, nodes: Vec<(u64, Node)>) -> Result<()> {
+    // Track latest (offset, node) per key - newest timestamp wins
+    let mut latest: HashMap<String, (u64, Node)> = HashMap::new();
+    // Track ALL uuid→offset mappings for history
+    let mut all_offsets: Vec<([u8; 16], u64)> = Vec::new();
+
+    for (offset, node) in nodes {
+        // Record every offset for history
+        all_offsets.push((node.uuid, offset));
+
+        let dominated = latest.get(&node.key)
+            .map(|(_, existing)| node.timestamp >= existing.timestamp)
+            .unwrap_or(true);
+        if dominated {
+            latest.insert(node.key.clone(), (offset, node));
+        }
+    }
+
+    // Write to index
+    let txn = db.begin_write()?;
+    {
+        // Record all uuid→offset mappings
+        let mut uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+        for (uuid, offset) in &all_offsets {
+            let key = pack_uuid_offset(uuid, *offset);
+            uuid_offsets.insert(key.as_slice(), ())?;
+        }
+        drop(uuid_offsets);
+
+        // Record KEY_TO_UUID and NODES_BY_PROVENANCE for latest version of each key
+        for (key, (_offset, node)) in &latest {
+            if !node.deleted {
+                index_node_no_offset(&txn, key, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance, node.weight)?;
+            } else {
+                // For deleted nodes, just mark KEY_TO_UUID as deleted
+                let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
+                let packed = pack_node_meta(&node.uuid, node.node_type as u8, node.timestamp, true, node.weight);
+                key_uuid_table.insert(key.as_str(), packed.as_slice())?;
+            }
+        }
+    }
+    txn.commit()?;
+
+    Ok(())
+}
+
+/// Record a node in KEY_TO_UUID, NODES_BY_PROVENANCE, and NODES_BY_TYPE (but not UUID_OFFSETS - for rebuild use).
+fn index_node_no_offset(txn: &WriteTransaction, key: &str, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str, weight: f32) -> Result<()> {
+    let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
+    let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
+    let mut by_type = txn.open_table(NODES_BY_TYPE)?;
+
+    let packed = pack_node_meta(uuid, node_type, timestamp, false, weight);
+    key_uuid_table.insert(key, packed.as_slice())?;
+    let prov_val = pack_provenance_value(timestamp, uuid);
+    by_provenance.insert(provenance, prov_val.as_slice())?;
+    let type_key = pack_type_key(node_type, timestamp);
+    by_type.insert(type_key.as_slice(), uuid.as_slice())?;
+    Ok(())
+}
+
+/// Fsck report — discrepancies found between capnp logs and redb index.
+#[derive(Debug, Default)]
+pub struct FsckReport {
+    /// Keys in current index but not in rebuilt (zombie entries)
+    pub zombies: Vec<String>,
+    /// Keys in rebuilt but not in current index (missing from index)
+    pub missing: Vec<String>,
+    /// Was capnp log repaired?
+    pub capnp_repaired: bool,
+}
+
+impl FsckReport {
+    pub fn is_clean(&self) -> bool {
+        self.zombies.is_empty() && self.missing.is_empty() && !self.capnp_repaired
+    }
+}
+
+/// Full fsck: verify capnp logs, rebuild index to temp, compare with current.
+/// Returns a report of discrepancies found.
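+///
+/// Sketch of a check-then-repair flow built from this report (uses the
+/// repair_index() defined below; error handling elided):
+///
+/// ```ignore
+/// let report = fsck_full()?;
+/// if !report.is_clean() {
+///     eprintln!("zombies: {:?}, missing: {:?}", report.zombies, report.missing);
+///     repair_index()?;
+/// }
+/// ```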
+pub fn fsck_full() -> Result<FsckReport> {
+    use std::collections::HashSet;
+    use tempfile::TempDir;
+    use super::capnp::{fsck, iter_nodes};
+    use super::types::{nodes_path, db_path};
+
+    let mut report = FsckReport::default();
+
+    // Step 1: Run capnp log fsck (may truncate corrupt messages)
+    let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
+    fsck()?;
+    let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
+    report.capnp_repaired = nodes_size_after != nodes_size_before;
+
+    // Step 2: Rebuild index to temp file
+    let temp_dir = TempDir::new().context("create temp dir")?;
+    let temp_db_path = temp_dir.path().join("rebuilt.redb");
+    let rebuilt_db = open_db(&temp_db_path)?;
+    rebuild(&rebuilt_db, iter_nodes()?)?;
+
+    // Step 3: Copy current index to temp and open (avoids write lock contention)
+    let current_db_path = db_path();
+    if !current_db_path.exists() {
+        // No current index — all rebuilt keys are "missing"
+        let txn = rebuilt_db.begin_read()?;
+        let table = txn.open_table(KEY_TO_UUID)?;
+        for entry in table.iter()? {
+            let (key, _) = entry?;
+            report.missing.push(key.value().to_string());
+        }
+        return Ok(report);
+    }
+
+    // Copy to temp to avoid lock contention with running daemon
+    let current_copy_path = temp_dir.path().join("current.redb");
+    std::fs::copy(&current_db_path, &current_copy_path)
+        .with_context(|| format!("copy {} to temp", current_db_path.display()))?;
+
+    let current_db = Database::open(&current_copy_path)
+        .with_context(|| "open current db copy")?;
+
+    // Step 4: Compare KEY_TO_UUID tables
+    let rebuilt_keys: HashSet<String> = {
+        let txn = rebuilt_db.begin_read()?;
+        let table = txn.open_table(KEY_TO_UUID)?;
+        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
+    };
+
+    let current_keys: HashSet<String> = {
+        let txn = current_db.begin_read()?;
+        let table = txn.open_table(KEY_TO_UUID)?;
+        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
+    };
+
+    // Keys in current but not rebuilt = zombies (shouldn't exist)
+    for key in current_keys.difference(&rebuilt_keys) {
+        report.zombies.push(key.clone());
+    }
+    report.zombies.sort();
+
+    // Keys in rebuilt but not current = missing (should exist but don't)
+    for key in rebuilt_keys.difference(&current_keys) {
+        report.missing.push(key.clone());
+    }
+    report.missing.sort();
+
+    Ok(report)
+}
+
+/// Repair the index by rebuilding from capnp logs.
+pub fn repair_index() -> Result<()> {
+    use super::capnp::iter_nodes;
+    use super::types::db_path;
+    use std::fs;
+
+    let db_p = db_path();
+    if db_p.exists() {
+        fs::remove_file(&db_p).context("remove old index")?;
+    }
+    let db = open_db(&db_p)?;
+    rebuild(&db, iter_nodes()?)?;
+    eprintln!("index rebuilt from capnp log");
+    Ok(())
+}
+
+/// Check if redb index is healthy by verifying some offsets are valid.
+pub fn is_healthy(db: &Database) -> Result<bool> {
+    use super::types::nodes_path;
+    use std::fs;
+
+    let txn = db.begin_read()?;
+    let key_uuid_table = txn.open_table(KEY_TO_UUID)?;
+
+    // Check that we can read the table and it has entries
+    if key_uuid_table.len()? == 0 {
+        let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
+        return Ok(capnp_size == 0); // healthy only if capnp is also empty
+    }
+
+    // Spot check: verify a few offsets point to valid messages
+    let uuid_offsets = txn.open_table(UUID_OFFSETS)?;
+    let mut checked = 0;
+    for entry in key_uuid_table.iter()? {
+        if checked >= 5 { break; }
+        let (_key, data) = entry?;
+        let (uuid, _, _, _, _) = unpack_node_meta(data.value());
+
+        if let Some(offset) = max_offset_for_uuid_in_table(&uuid_offsets, &uuid)? {
+            if read_node_at_offset(offset).is_err() {
+                return Ok(false);
+            }
+        }
+        checked += 1;
+    }
+
+    Ok(true)
+}
+
+/// Open redb database, rebuilding if unhealthy.
+pub fn open_or_rebuild(path: &Path) -> Result<Database> {
+    use super::capnp::iter_nodes;
+    use std::fs;
+
+    // Try opening existing database
+    if path.exists() {
+        match open_db(path) {
+            Ok(database) => {
+                if is_healthy(&database)? {
+                    return Ok(database);
+                }
+                eprintln!("redb index stale, rebuilding...");
+            }
+            Err(e) => {
+                eprintln!("redb open failed ({}), rebuilding...", e);
+            }
+        }
+    }
+
+    // Rebuild index from capnp log
+    if path.exists() {
+        fs::remove_file(path).with_context(|| format!("remove old db {}", path.display()))?;
+    }
+    let database = open_db(path)?;
+    rebuild(&database, iter_nodes()?)?;
+    Ok(database)
+}
diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs
index 8e5fec0..0260bb7 100644
--- a/src/hippocampus/store/mod.rs
+++ b/src/hippocampus/store/mod.rs
@@ -27,7 +27,13 @@ pub use types::{
     new_node, new_relation,
 };
 pub use view::StoreView;
-pub use capnp::{fsck, fsck_full, repair_index, FsckReport};
+pub use capnp::fsck;
+pub use index::{
+    KEY_TO_UUID, UUID_OFFSETS, NODES_BY_PROVENANCE, NODES_BY_TYPE, RELS,
+    unpack_node_meta, unpack_provenance_value, unpack_rel,
+    fsck_full, repair_index, FsckReport,
+    nodes_by_type,
+};
 
 use crate::graph::{self, Graph};
 
@@ -119,6 +125,77 @@ impl Store {
         self.db.as_ref().ok_or_else(|| anyhow::anyhow!("store not loaded"))
     }
 
+    /// Get all versions of a node by key (for history display).
+    /// Uses UUID_OFFSETS index - no full log scan.
+    pub fn get_history(&self, key: &str) -> Result<Vec<Node>> {
+        let db = self.db()?;
+
+        let uuid = index::get_uuid_for_key(db, key)?
+            .ok_or_else(|| anyhow::anyhow!("No history found for '{}'", key))?;
+        let offsets = index::get_offsets_for_uuid(db, &uuid)?;
+
+        let mut versions = Vec::new();
+        for offset in offsets {
+            if let Ok(node) = capnp::read_node_at_offset(offset) {
+                versions.push(node);
+            }
+        }
+        // Sort by timestamp (oldest first)
+        versions.sort_by_key(|n| n.timestamp);
+        Ok(versions)
+    }
+
+    /// Get the latest version of a node by UUID.
+    pub fn get_node_by_uuid(&self, uuid: &[u8; 16]) -> Result<Option<Node>> {
+        let db = self.db()?;
+        let offsets = index::get_offsets_for_uuid(db, uuid)?;
+        if let Some(&offset) = offsets.first() {
+            Ok(Some(capnp::read_node_at_offset(offset)?))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Find the most recent version of a node (including deleted).
+    /// Uses index - O(log n) lookup instead of full log scan.
+    pub fn find_latest_by_key(&self, key: &str) -> Result<Option<Node>> {
+        let db = self.db()?;
+
+        let uuid = match index::get_uuid_for_key(db, key)? {
+            Some(u) => u,
+            None => return Ok(None),
+        };
+        let offsets = index::get_offsets_for_uuid(db, &uuid)?;
+
+        // offsets are newest first (highest offset = most recent)
+        if let Some(&offset) = offsets.first() {
+            return Ok(Some(capnp::read_node_at_offset(offset)?));
+        }
+        Ok(None)
+    }
+
+    /// Find the last non-deleted version of a node.
+    /// Uses index - walks backwards through versions until finding non-deleted.
+    pub fn find_last_live_version(&self, key: &str) -> Result<Option<Node>> {
+        let db = self.db()?;
+
+        let uuid = match index::get_uuid_for_key(db, key)?
{ + Some(u) => u, + None => return Ok(None), + }; + let offsets = index::get_offsets_for_uuid(db, &uuid)?; + + // offsets are newest first - find first non-deleted + for offset in offsets { + if let Ok(node) = capnp::read_node_at_offset(offset) { + if !node.deleted { + return Ok(Some(node)); + } + } + } + Ok(None) + } + /// Remove a node from the index (used after appending a tombstone). /// For batched operations, use index::remove_node with a WriteTransaction directly. pub fn remove_from_index(&self, key: &str) -> Result<()> { diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index f45ac88..d8baab7 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -25,7 +25,7 @@ impl Store { let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?; let txn = db.begin_write()?; let offset = self.append_nodes(&[node.clone()])?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance, node.weight)?; txn.commit()?; Ok(()) } @@ -76,7 +76,7 @@ impl Store { node.version += 1; let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance, node.weight)?; txn.commit()?; Ok("updated") } else { @@ -95,13 +95,13 @@ impl Store { node.provenance = provenance.to_string(); let txn = db.begin_write()?; let offset = self.append_nodes(std::slice::from_ref(&node))?; - index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?; + index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance, node.weight)?; txn.commit()?; Ok("created") } } - /// Soft-delete a node (appends deleted version, removes from index). + /// Soft-delete a node (appends deleted version, marks deleted in index). /// Fails if node is in protected_nodes list. 
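+    ///
+    /// Delete keeps history: the tombstone is appended to the log and the
+    /// KEY_TO_UUID entry is only marked deleted, so a later restore can still
+    /// find the prior content. Sketch (key and provenance are illustrative):
+    ///
+    /// ```ignore
+    /// store.delete_node("scratch-note", "manual")?;
+    /// // UUID_OFFSETS history is preserved, so this still resolves:
+    /// let prev = store.find_last_live_version("scratch-note")?;
+    /// ```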
     pub fn delete_node(&self, key: &str, provenance: &str) -> Result<()> {
         if is_protected(key) {
@@ -118,7 +118,8 @@
         deleted.timestamp = now_epoch();
 
         let txn = db.begin_write()?;
-        self.append_nodes(std::slice::from_ref(&deleted))?;
+        let offset = self.append_nodes(std::slice::from_ref(&deleted))?;
+        index::record_uuid_offset(&txn, &deleted.uuid, offset)?;
         index::remove_node(&txn, key)?;
         txn.commit()?;
         Ok(())
@@ -151,7 +152,7 @@
 
         let txn = db.begin_write()?;
         let offset = self.append_nodes(std::slice::from_ref(&restored))?;
-        index::index_node(&txn, &restored.key, offset, &restored.uuid, restored.node_type as u8, restored.timestamp, &restored.provenance)?;
+        index::index_node(&txn, &restored.key, offset, &restored.uuid, restored.node_type as u8, restored.timestamp, &restored.provenance, restored.weight)?;
         txn.commit()?;
 
         let preview: String = restored.content.chars().take(100).collect();
@@ -224,7 +225,7 @@
         let txn = db.begin_write()?;
         let offset = self.append_nodes(&[renamed.clone(), tombstone])?;
         index::remove_node(&txn, old_key)?;
-        index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance)?;
+        index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance, renamed.weight)?;
         if !updated_rels.is_empty() {
             self.append_relations(&updated_rels)?;
         }
@@ -355,7 +356,7 @@
         node.timestamp = now_epoch();
         let txn = db.begin_write()?;
         let offset = self.append_nodes(std::slice::from_ref(&node))?;
-        index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?;
+        index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance, node.weight)?;
         txn.commit()?;
         Ok((old, weight))
     }
@@ -364,6 +365,7 @@
     /// Returns the old strength. Creates link if it doesn't exist.
     pub fn set_link_strength(&self, source: &str, target: &str, strength: f32, provenance: &str) -> Result<f32> {
         let strength = strength.clamp(0.01, 1.0);
+        let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
 
         let source_uuid = self.get_node(source)?
             .map(|n| n.uuid)
             .ok_or_else(|| anyhow!("source not found: {}", source))?;
         let target_uuid = self.get_node(target)?
             .map(|n| n.uuid)
             .ok_or_else(|| anyhow!("target not found: {}", target))?;
 
-        // Find existing edge via index (scope the borrow)
-        let existing = {
-            let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
-            let edges = index::edges_for_node(db, &source_uuid)?;
-            edges.iter().find(|(other, _, _, _)| *other == target_uuid)
-                .map(|(_, s, t, _)| (*s, *t))
-        };
+        // Find existing edge via index
+        let edges = index::edges_for_node(db, &source_uuid)?;
+        let existing = edges.iter()
+            .find(|(other, _, _, _)| *other == target_uuid)
+            .map(|(_, s, t, _)| (*s, *t));
 
-        if let Some((old_strength, rel_type)) = existing {
-            let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
-            let txn = db.begin_write()?;
-            // Remove old edge from index, add updated one
-            index::remove_relation(&txn, &source_uuid, &target_uuid, old_strength, rel_type)?;
+        let txn = db.begin_write()?;
+        let old_strength = if let Some((old_str, rel_type)) = existing {
+            index::remove_relation(&txn, &source_uuid, &target_uuid, old_str, rel_type)?;
             index::index_relation(&txn, &source_uuid, &target_uuid, strength, rel_type)?;
-            // Append updated relation to log
             let mut rel = new_relation(source_uuid, target_uuid,
                 RelationType::from_u8(rel_type), strength, source, target, provenance);
-            rel.version = 2; // indicate update
+            rel.version = 2;
             self.append_relations(std::slice::from_ref(&rel))?;
-            txn.commit()?;
-            Ok(old_strength)
+            old_str
         } else {
-            // Create new link then update its strength
-            self.add_link(source, target, provenance)?;
-            let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
-            let txn = db.begin_write()?;
-            index::remove_relation(&txn, &source_uuid, &target_uuid, 0.1, RelationType::Link as u8)?;
+            // Create new link with specified strength
             index::index_relation(&txn, &source_uuid, &target_uuid, strength, RelationType::Link as u8)?;
-            txn.commit()?;
-            Ok(0.0)
-        }
+            let rel = new_relation(source_uuid, target_uuid,
+                RelationType::Link, strength, source, target, provenance);
+            self.append_relations(std::slice::from_ref(&rel))?;
+            0.0
+        };
+        txn.commit()?;
+        Ok(old_strength)
     }
 
     /// Add a link between two nodes with Jaccard-based initial strength.
diff --git a/src/hippocampus/store/view.rs b/src/hippocampus/store/view.rs
index aa1d0ad..1584ba9 100644
--- a/src/hippocampus/store/view.rs
+++ b/src/hippocampus/store/view.rs
@@ -11,6 +11,9 @@ pub trait StoreView {
     /// Get all node keys (from index, no deserialization).
     fn all_keys(&self) -> Vec<String>;
 
+    /// Iterate keys and weights only (index-only, no capnp reads).
+    fn for_each_key_weight<F: FnMut(&str, f32)>(&self, f: F);
+
     /// Iterate all nodes. Callback receives (key, content, weight).
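+    /// Unlike for_each_key_weight() above, this deserializes node content
+    /// from the capnp log. Sketch of the trade-off (closures illustrative):
+    ///
+    /// ```ignore
+    /// store.for_each_key_weight(|key, w| { /* index-only, no capnp read */ });
+    /// store.for_each_node(|key, content, w| { /* reads full node content */ });
+    /// ```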
     fn for_each_node<F: FnMut(&str, &str, f32)>(&self, f: F);
 
@@ -33,6 +36,22 @@ impl StoreView for Store {
         index::all_keys(db).unwrap_or_default()
     }
 
+    fn for_each_key_weight<F: FnMut(&str, f32)>(&self, mut f: F) {
+        let db = match self.db.as_ref() {
+            Some(db) => db,
+            None => return,
+        };
+        let pairs = match index::all_key_uuid_pairs(db) {
+            Ok(p) => p,
+            Err(_) => return,
+        };
+        for (key, _, _, _, deleted, weight) in pairs {
+            if !deleted {
+                f(&key, weight);
+            }
+        }
+    }
+
     fn for_each_node<F: FnMut(&str, &str, f32)>(&self, mut f: F) {
         let db = match self.db.as_ref() {
             Some(db) => db,
@@ -61,8 +80,10 @@
             Ok(p) => p,
             Err(_) => return,
         };
-        for (key, _uuid, node_type, timestamp) in pairs {
-            f(&key, NodeType::from_u8(node_type), timestamp);
+        for (key, _uuid, node_type, timestamp, deleted, _weight) in pairs {
+            if !deleted {
+                f(&key, NodeType::from_u8(node_type), timestamp);
+            }
         }
     }
 
@@ -78,12 +99,15 @@
             Err(_) => return,
         };
         let mut uuid_to_key: std::collections::HashMap<[u8; 16], String> = std::collections::HashMap::new();
-        for (key, uuid, _, _) in &pairs {
-            uuid_to_key.insert(*uuid, key.clone());
+        for (key, uuid, _, _, deleted, _) in &pairs {
+            if !deleted {
+                uuid_to_key.insert(*uuid, key.clone());
+            }
         }
 
         // Iterate edges: only process outgoing to avoid duplicates
-        for (key, uuid, _, _) in &pairs {
+        for (key, uuid, _, _, deleted, _) in &pairs {
+            if *deleted { continue; }
             let edges = match index::edges_for_node(db, uuid) {
                 Ok(e) => e,
                 Err(_) => continue,