diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index 3f0e229..c41212e 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -212,7 +212,9 @@ impl Relation { /// Read a single node at the given offset in the capnp log. /// The offset must point to a valid message containing the node. -pub fn read_node_at_offset(offset: u64) -> Result<Node> { +/// Read a node at a given offset. If `target_key` is provided, find that specific +/// node in the message (handles batch writes where multiple nodes share an offset). +pub fn read_node_at_offset_for_key(offset: u64, target_key: Option<&str>) -> Result<Node> { let path = nodes_path(); let mut file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; @@ -229,13 +231,22 @@ pub fn read_node_at_offset(offset: u64) -> Result<Node> { let nodes = log.get_nodes() .with_context(|| "get nodes")?; - // A message at this offset should have exactly one node (from upsert), - // or we take the last one if there are multiple (from batch operations like rename) if nodes.is_empty() { anyhow::bail!("no nodes in message at offset {}", offset); } - // Return the first non-deleted node, or the first one if all are deleted + // If target_key specified, find that specific node + if let Some(key) = target_key { + for node_reader in nodes.iter() { + let node = Node::from_capnp_migrate(node_reader)?; + if node.key == key { + return Ok(node); + } + } + anyhow::bail!("node '{}' not found in message at offset {}", key, offset); + } + + // No target key - return first non-deleted, or first if all deleted for node_reader in nodes.iter() { let node = Node::from_capnp_migrate(node_reader)?; if !node.deleted { @@ -243,10 +254,14 @@ pub fn read_node_at_offset(offset: u64) -> Result<Node> { } } - // All nodes in this message are deleted - shouldn't happen if index is correct Node::from_capnp_migrate(nodes.get(0)) } +/// Read a node at offset (legacy, no key filtering) +pub fn 
read_node_at_offset(offset: u64) -> Result<Node> { + read_node_at_offset_for_key(offset, None) +} + // --------------------------------------------------------------------------- // Store persistence methods // --------------------------------------------------------------------------- @@ -414,6 +429,88 @@ impl Store { Ok(by_key) } + /// Find the most recent version of a node by key (including deleted). + /// Scans the entire log. Used for version continuity when recreating deleted nodes. + pub fn find_latest_by_key(&self, target_key: &str) -> Result<Option<Node>> { + let path = nodes_path(); + if !path.exists() { return Ok(None); } + + let file = fs::File::open(&path) + .with_context(|| format!("open {}", path.display()))?; + let mut reader = BufReader::new(file); + + let mut latest: Option<Node> = None; + + while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { + let log = match msg.get_root::() { + Ok(l) => l, + Err(_) => continue, + }; + let nodes = match log.get_nodes() { + Ok(n) => n, + Err(_) => continue, + }; + for node_reader in nodes { + let node = match Node::from_capnp_migrate(node_reader) { + Ok(n) => n, + Err(_) => continue, + }; + if node.key != target_key { continue; } + // Keep if newer timestamp (handles version resets) + let dominated = latest.as_ref() + .map(|l| node.timestamp >= l.timestamp) + .unwrap_or(true); + if dominated { + latest = Some(node); + } + } + } + + Ok(latest) + } + + /// Find the last non-deleted version of a node by key. + /// Scans the entire log. Used for restore operations. 
+ pub fn find_last_live_version(&self, target_key: &str) -> Result<Option<Node>> { + let path = nodes_path(); + if !path.exists() { return Ok(None); } + + let file = fs::File::open(&path) + .with_context(|| format!("open {}", path.display()))?; + let mut reader = BufReader::new(file); + + let mut last_live: Option<Node> = None; + + while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { + let log = match msg.get_root::() { + Ok(l) => l, + Err(_) => continue, + }; + let nodes = match log.get_nodes() { + Ok(n) => n, + Err(_) => continue, + }; + for node_reader in nodes { + let node = match Node::from_capnp_migrate(node_reader) { + Ok(n) => n, + Err(_) => continue, + }; + if node.key != target_key { continue; } + if !node.deleted { + // Keep the most recent non-deleted version by timestamp + let dominated = last_live.as_ref() + .map(|l| node.timestamp >= l.timestamp) + .unwrap_or(true); + if dominated { + last_live = Some(node); + } + } + } + } + + Ok(last_live) + } + /// Append nodes to the log file. Returns the offset where the message was written. pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> { use std::sync::atomic::Ordering; @@ -646,9 +743,9 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { } } - // Keep if newer version + // Keep if newer timestamp (not version - version can reset after delete/recreate) let dominated = latest.get(&key) - .map(|(_, _, v, _, _, _, _)| version >= *v) + .map(|(_, _, _, _, _, ts, _)| timestamp >= *ts) .unwrap_or(true); if dominated { latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance)); @@ -690,3 +787,100 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { Ok(database) } + +/// Fsck report — discrepancies found between capnp logs and redb index. 
+#[derive(Debug, Default)] +pub struct FsckReport { + /// Keys in current index but not in rebuilt (zombie entries) + pub zombies: Vec<String>, + /// Keys in rebuilt but not in current index (missing from index) + pub missing: Vec<String>, + /// Was capnp log repaired? + pub capnp_repaired: bool, +} + +impl FsckReport { + pub fn is_clean(&self) -> bool { + self.zombies.is_empty() && self.missing.is_empty() && !self.capnp_repaired + } +} + +/// Full fsck: verify capnp logs, rebuild index to temp, compare with current. +/// Returns a report of discrepancies found. +pub fn fsck_full() -> Result<FsckReport> { + use redb::{ReadableDatabase, ReadableTable}; + use tempfile::TempDir; + + let mut report = FsckReport::default(); + + // Step 1: Run capnp log fsck (may truncate corrupt messages) + // We need to check if it did repairs — currently fsck() just prints to stderr + // For now, we'll re-check after by comparing file sizes + let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0); + fsck()?; + let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0); + report.capnp_repaired = nodes_size_after != nodes_size_before; + + // Step 2: Rebuild index to temp file + let temp_dir = TempDir::new().context("create temp dir")?; + let temp_db_path = temp_dir.path().join("rebuilt.redb"); + let rebuilt_db = rebuild_index(&temp_db_path, &nodes_path())?; + + // Step 3: Copy current index to temp and open (avoids write lock contention) + let current_db_path = db_path(); + if !current_db_path.exists() { + // No current index — all rebuilt keys are "missing" + let txn = rebuilt_db.begin_read()?; + let table = txn.open_table(index::NODES)?; + for entry in table.iter()? 
{ + let (key, _) = entry?; + report.missing.push(key.value().to_string()); + } + return Ok(report); + } + + // Copy to temp to avoid lock contention with running daemon + let current_copy_path = temp_dir.path().join("current.redb"); + fs::copy(&current_db_path, &current_copy_path) + .with_context(|| format!("copy {} to temp", current_db_path.display()))?; + + let current_db = redb::Database::open(&current_copy_path) + .with_context(|| format!("open current db copy"))?; + + // Step 4: Compare NODES tables + // Collect all keys from both + let rebuilt_keys: std::collections::HashSet<String> = { + let txn = rebuilt_db.begin_read()?; + let table = txn.open_table(index::NODES)?; + table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()? + }; + + let current_keys: std::collections::HashSet<String> = { + let txn = current_db.begin_read()?; + let table = txn.open_table(index::NODES)?; + table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()? + }; + + // Keys in current but not rebuilt = zombies (shouldn't exist) + for key in current_keys.difference(&rebuilt_keys) { + report.zombies.push(key.clone()); + } + report.zombies.sort(); + + // Keys in rebuilt but not current = missing (should exist but don't) + for key in rebuilt_keys.difference(&current_keys) { + report.missing.push(key.clone()); + } + report.missing.sort(); + + Ok(report) +} + +/// Repair the index by rebuilding from capnp logs. +/// Use after fsck_full() reports discrepancies. 
+pub fn repair_index() -> Result<()> { + let db_path = db_path(); + rebuild_index(&db_path, &nodes_path())?; + eprintln!("index rebuilt from capnp log"); + Ok(()) +} diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index 67326c3..fee0770 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -27,8 +27,7 @@ pub use types::{ new_node, new_relation, }; pub use view::StoreView; -pub use capnp::fsck; -pub use ops::current_provenance; +pub use capnp::{fsck, fsck_full, repair_index, FsckReport}; use crate::graph::{self, Graph}; @@ -81,7 +80,7 @@ impl Store { .ok_or_else(|| anyhow::anyhow!("store not loaded"))?; match index::get_offset(db, key)? { - Some(offset) => Ok(Some(capnp::read_node_at_offset(offset)?)), + Some(offset) => Ok(Some(capnp::read_node_at_offset_for_key(offset, Some(key))?)), None => Ok(None), } }