forked from kent/consciousness
store: fsck improvements, fix index rebuild and batch offset bug
- Add fsck_full(): compares current index with rebuilt, reports zombies/missing
- Add repair_index(): rebuilds index from capnp log
- Index rebuild now uses timestamp (not version) for "latest" detection.
  Fixes tombstones shadowing restored nodes when version numbers reset.
- Add read_node_at_offset_for_key() to handle batch writes correctly.
  When multiple nodes share an offset, filter by key to get the right one.
- Add find_latest_by_key() and find_last_live_version() for restore support

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
6ec7fcb777
commit
290505fc51
2 changed files with 203 additions and 10 deletions
|
|
@ -212,7 +212,9 @@ impl Relation {
|
|||
|
||||
/// Read a single node at the given offset in the capnp log.
|
||||
/// The offset must point to a valid message containing the node.
|
||||
pub fn read_node_at_offset(offset: u64) -> Result<Node> {
|
||||
/// Read a node at a given offset. If `target_key` is provided, find that specific
|
||||
/// node in the message (handles batch writes where multiple nodes share an offset).
|
||||
pub fn read_node_at_offset_for_key(offset: u64, target_key: Option<&str>) -> Result<Node> {
|
||||
let path = nodes_path();
|
||||
let mut file = fs::File::open(&path)
|
||||
.with_context(|| format!("open {}", path.display()))?;
|
||||
|
|
@ -229,13 +231,22 @@ pub fn read_node_at_offset(offset: u64) -> Result<Node> {
|
|||
let nodes = log.get_nodes()
|
||||
.with_context(|| "get nodes")?;
|
||||
|
||||
// A message at this offset should have exactly one node (from upsert),
|
||||
// or we take the last one if there are multiple (from batch operations like rename)
|
||||
if nodes.is_empty() {
|
||||
anyhow::bail!("no nodes in message at offset {}", offset);
|
||||
}
|
||||
|
||||
// Return the first non-deleted node, or the first one if all are deleted
|
||||
// If target_key specified, find that specific node
|
||||
if let Some(key) = target_key {
|
||||
for node_reader in nodes.iter() {
|
||||
let node = Node::from_capnp_migrate(node_reader)?;
|
||||
if node.key == key {
|
||||
return Ok(node);
|
||||
}
|
||||
}
|
||||
anyhow::bail!("node '{}' not found in message at offset {}", key, offset);
|
||||
}
|
||||
|
||||
// No target key - return first non-deleted, or first if all deleted
|
||||
for node_reader in nodes.iter() {
|
||||
let node = Node::from_capnp_migrate(node_reader)?;
|
||||
if !node.deleted {
|
||||
|
|
@ -243,10 +254,14 @@ pub fn read_node_at_offset(offset: u64) -> Result<Node> {
|
|||
}
|
||||
}
|
||||
|
||||
// All nodes in this message are deleted - shouldn't happen if index is correct
|
||||
Node::from_capnp_migrate(nodes.get(0))
|
||||
}
|
||||
|
||||
/// Read a node at offset (legacy, no key filtering)
|
||||
pub fn read_node_at_offset(offset: u64) -> Result<Node> {
|
||||
read_node_at_offset_for_key(offset, None)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Store persistence methods
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -414,6 +429,88 @@ impl Store {
|
|||
Ok(by_key)
|
||||
}
|
||||
|
||||
/// Find the most recent version of a node by key (including deleted).
|
||||
/// Scans the entire log. Used for version continuity when recreating deleted nodes.
|
||||
pub fn find_latest_by_key(&self, target_key: &str) -> Result<Option<Node>> {
|
||||
let path = nodes_path();
|
||||
if !path.exists() { return Ok(None); }
|
||||
|
||||
let file = fs::File::open(&path)
|
||||
.with_context(|| format!("open {}", path.display()))?;
|
||||
let mut reader = BufReader::new(file);
|
||||
|
||||
let mut latest: Option<Node> = None;
|
||||
|
||||
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
||||
let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
|
||||
Ok(l) => l,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let nodes = match log.get_nodes() {
|
||||
Ok(n) => n,
|
||||
Err(_) => continue,
|
||||
};
|
||||
for node_reader in nodes {
|
||||
let node = match Node::from_capnp_migrate(node_reader) {
|
||||
Ok(n) => n,
|
||||
Err(_) => continue,
|
||||
};
|
||||
if node.key != target_key { continue; }
|
||||
// Keep if newer timestamp (handles version resets)
|
||||
let dominated = latest.as_ref()
|
||||
.map(|l| node.timestamp >= l.timestamp)
|
||||
.unwrap_or(true);
|
||||
if dominated {
|
||||
latest = Some(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(latest)
|
||||
}
|
||||
|
||||
/// Find the last non-deleted version of a node by key.
|
||||
/// Scans the entire log. Used for restore operations.
|
||||
pub fn find_last_live_version(&self, target_key: &str) -> Result<Option<Node>> {
|
||||
let path = nodes_path();
|
||||
if !path.exists() { return Ok(None); }
|
||||
|
||||
let file = fs::File::open(&path)
|
||||
.with_context(|| format!("open {}", path.display()))?;
|
||||
let mut reader = BufReader::new(file);
|
||||
|
||||
let mut last_live: Option<Node> = None;
|
||||
|
||||
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
||||
let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
|
||||
Ok(l) => l,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let nodes = match log.get_nodes() {
|
||||
Ok(n) => n,
|
||||
Err(_) => continue,
|
||||
};
|
||||
for node_reader in nodes {
|
||||
let node = match Node::from_capnp_migrate(node_reader) {
|
||||
Ok(n) => n,
|
||||
Err(_) => continue,
|
||||
};
|
||||
if node.key != target_key { continue; }
|
||||
if !node.deleted {
|
||||
// Keep the most recent non-deleted version by timestamp
|
||||
let dominated = last_live.as_ref()
|
||||
.map(|l| node.timestamp >= l.timestamp)
|
||||
.unwrap_or(true);
|
||||
if dominated {
|
||||
last_live = Some(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(last_live)
|
||||
}
|
||||
|
||||
/// Append nodes to the log file. Returns the offset where the message was written.
|
||||
pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> {
|
||||
use std::sync::atomic::Ordering;
|
||||
|
|
@ -646,9 +743,9 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
|||
}
|
||||
}
|
||||
|
||||
// Keep if newer version
|
||||
// Keep if newer timestamp (not version - version can reset after delete/recreate)
|
||||
let dominated = latest.get(&key)
|
||||
.map(|(_, _, v, _, _, _, _)| version >= *v)
|
||||
.map(|(_, _, _, _, _, ts, _)| timestamp >= *ts)
|
||||
.unwrap_or(true);
|
||||
if dominated {
|
||||
latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance));
|
||||
|
|
@ -690,3 +787,100 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
|||
|
||||
Ok(database)
|
||||
}
|
||||
|
||||
/// Fsck report — discrepancies found between capnp logs and redb index.
#[derive(Debug, Default)]
pub struct FsckReport {
    /// Keys present in the current index but absent from the rebuilt one
    /// (zombie entries: the index points at data the log no longer supports).
    pub zombies: Vec<String>,
    /// Keys present in the rebuilt index but absent from the current one
    /// (the log has nodes the index never recorded).
    pub missing: Vec<String>,
    /// Whether the capnp log itself was repaired (truncated) during fsck.
    pub capnp_repaired: bool,
}

impl FsckReport {
    /// A report is clean when no zombies or missing keys were found
    /// and the capnp log needed no repair.
    pub fn is_clean(&self) -> bool {
        self.zombies.is_empty() && self.missing.is_empty() && !self.capnp_repaired
    }
}
||||
|
||||
/// Full fsck: verify capnp logs, rebuild index to temp, compare with current.
|
||||
/// Returns a report of discrepancies found.
|
||||
pub fn fsck_full() -> Result<FsckReport> {
|
||||
use redb::{ReadableDatabase, ReadableTable};
|
||||
use tempfile::TempDir;
|
||||
|
||||
let mut report = FsckReport::default();
|
||||
|
||||
// Step 1: Run capnp log fsck (may truncate corrupt messages)
|
||||
// We need to check if it did repairs — currently fsck() just prints to stderr
|
||||
// For now, we'll re-check after by comparing file sizes
|
||||
let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
|
||||
fsck()?;
|
||||
let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
|
||||
report.capnp_repaired = nodes_size_after != nodes_size_before;
|
||||
|
||||
// Step 2: Rebuild index to temp file
|
||||
let temp_dir = TempDir::new().context("create temp dir")?;
|
||||
let temp_db_path = temp_dir.path().join("rebuilt.redb");
|
||||
let rebuilt_db = rebuild_index(&temp_db_path, &nodes_path())?;
|
||||
|
||||
// Step 3: Copy current index to temp and open (avoids write lock contention)
|
||||
let current_db_path = db_path();
|
||||
if !current_db_path.exists() {
|
||||
// No current index — all rebuilt keys are "missing"
|
||||
let txn = rebuilt_db.begin_read()?;
|
||||
let table = txn.open_table(index::NODES)?;
|
||||
for entry in table.iter()? {
|
||||
let (key, _) = entry?;
|
||||
report.missing.push(key.value().to_string());
|
||||
}
|
||||
return Ok(report);
|
||||
}
|
||||
|
||||
// Copy to temp to avoid lock contention with running daemon
|
||||
let current_copy_path = temp_dir.path().join("current.redb");
|
||||
fs::copy(¤t_db_path, ¤t_copy_path)
|
||||
.with_context(|| format!("copy {} to temp", current_db_path.display()))?;
|
||||
|
||||
let current_db = redb::Database::open(¤t_copy_path)
|
||||
.with_context(|| format!("open current db copy"))?;
|
||||
|
||||
// Step 4: Compare NODES tables
|
||||
// Collect all keys from both
|
||||
let rebuilt_keys: std::collections::HashSet<String> = {
|
||||
let txn = rebuilt_db.begin_read()?;
|
||||
let table = txn.open_table(index::NODES)?;
|
||||
table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
|
||||
};
|
||||
|
||||
let current_keys: std::collections::HashSet<String> = {
|
||||
let txn = current_db.begin_read()?;
|
||||
let table = txn.open_table(index::NODES)?;
|
||||
table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
|
||||
};
|
||||
|
||||
// Keys in current but not rebuilt = zombies (shouldn't exist)
|
||||
for key in current_keys.difference(&rebuilt_keys) {
|
||||
report.zombies.push(key.clone());
|
||||
}
|
||||
report.zombies.sort();
|
||||
|
||||
// Keys in rebuilt but not current = missing (should exist but don't)
|
||||
for key in rebuilt_keys.difference(¤t_keys) {
|
||||
report.missing.push(key.clone());
|
||||
}
|
||||
report.missing.sort();
|
||||
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
/// Repair the index by rebuilding from capnp logs.
|
||||
/// Use after fsck_full() reports discrepancies.
|
||||
pub fn repair_index() -> Result<()> {
|
||||
let db_path = db_path();
|
||||
rebuild_index(&db_path, &nodes_path())?;
|
||||
eprintln!("index rebuilt from capnp log");
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,8 +27,7 @@ pub use types::{
|
|||
new_node, new_relation,
|
||||
};
|
||||
pub use view::StoreView;
|
||||
pub use capnp::fsck;
|
||||
pub use ops::current_provenance;
|
||||
pub use capnp::{fsck, fsck_full, repair_index, FsckReport};
|
||||
|
||||
use crate::graph::{self, Graph};
|
||||
|
||||
|
|
@ -81,7 +80,7 @@ impl Store {
|
|||
.ok_or_else(|| anyhow::anyhow!("store not loaded"))?;
|
||||
|
||||
match index::get_offset(db, key)? {
|
||||
Some(offset) => Ok(Some(capnp::read_node_at_offset(offset)?)),
|
||||
Some(offset) => Ok(Some(capnp::read_node_at_offset_for_key(offset, Some(key))?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue