store: add weight to index, index-only key matching

- KEY_TO_UUID now stores weight (30 bytes: uuid+type+ts+deleted+weight; layout sketched below)
- UUID_OFFSETS changed to a composite key for O(log n) max-offset lookup (see sketch below)
- Add NODES_BY_TYPE index for efficient type+date range queries
- Add for_each_key_weight() to StoreView for index-only iteration (sketched below)
- match_seeds uses index-only path when content not needed
- Fix transaction consistency in ops (single txn for related updates)
- rebuild() now records all uuid→offset mappings for version history
- Backwards compatible: old index formats decoded with default weight
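
Illustrative sketch of the new KEY_TO_UUID value and its backwards-compatible
decode. Field order follows the bullet above; the weight encoding (f32,
big-endian) and the 1.0 default for old 25-byte entries are assumptions, not
taken from the diff:

    const V1_LEN: usize = 25; // [uuid:16][node_type:1][timestamp:8]
    const V2_LEN: usize = 30; // v1 + [deleted:1][weight:4]

    fn pack_key_meta(uuid: [u8; 16], node_type: u8, timestamp: i64,
                     deleted: bool, weight: f32) -> [u8; V2_LEN] {
        let mut out = [0u8; V2_LEN];
        out[0..16].copy_from_slice(&uuid);
        out[16] = node_type;
        out[17..25].copy_from_slice(&timestamp.to_be_bytes());
        out[25] = deleted as u8;
        out[26..30].copy_from_slice(&weight.to_be_bytes());
        out
    }

    fn unpack_key_meta(raw: &[u8]) -> Option<([u8; 16], u8, i64, bool, f32)> {
        if raw.len() < V1_LEN { return None; }
        let mut uuid = [0u8; 16];
        uuid.copy_from_slice(&raw[0..16]);
        let timestamp = i64::from_be_bytes(raw[17..25].try_into().ok()?);
        let (deleted, weight) = if raw.len() >= V2_LEN {
            (raw[25] != 0, f32::from_be_bytes(raw[26..30].try_into().ok()?))
        } else {
            (false, 1.0) // old-format entry: assume live, default weight
        };
        Some((uuid, raw[16], timestamp, deleted, weight))
    }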
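
A sketch of the composite-key lookup behind the UUID_OFFSETS change: keying by
[uuid:16][offset:8 be] makes the newest offset for a uuid the last entry in
that uuid's key range, so one bounded range scan replaces walking a multimap.
Table shape, value type, and helper names below are illustrative, not the
actual definitions in this commit:

    use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};

    // Assumed shape: one entry per (uuid, offset); the unit value carries no data.
    const UUID_OFFSETS: TableDefinition<&[u8], ()> = TableDefinition::new("uuid_offsets");

    fn composite_key(uuid: &[u8; 16], offset: u64) -> [u8; 24] {
        let mut key = [0u8; 24];
        key[0..16].copy_from_slice(uuid);
        key[16..24].copy_from_slice(&offset.to_be_bytes()); // big-endian keeps offsets ordered
        key
    }

    /// Largest log offset recorded for `uuid`: seek to the end of the uuid's
    /// key range instead of scanning every offset under the uuid.
    fn max_offset_for_uuid(db: &Database, uuid: &[u8; 16]) -> anyhow::Result<Option<u64>> {
        let txn = db.begin_read()?;
        let table = txn.open_table(UUID_OFFSETS)?;
        let lo = composite_key(uuid, 0);
        let hi = composite_key(uuid, u64::MAX);
        let last = table.range(lo.as_slice()..=hi.as_slice())?.next_back().transpose()?;
        Ok(last.map(|(k, _)| {
            let bytes: [u8; 8] = k.value()[16..24].try_into().expect("8-byte offset suffix");
            u64::from_be_bytes(bytes)
        }))
    }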
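
And a sketch of the index-only iteration that for_each_key_weight() and the
match_seeds fast path rely on: the key, deleted flag, and weight all come out
of the packed KEY_TO_UUID value, so the capnp log is never touched. The
signature is hypothetical, not the one on StoreView:

    fn for_each_key_weight(
        table: &impl redb::ReadableTable<&'static str, &'static [u8]>,
        mut f: impl FnMut(&str, f32),
    ) -> anyhow::Result<()> {
        for entry in table.iter()? {
            let (key, packed) = entry?;
            let raw = packed.value();
            // deleted flag at byte 25, weight in bytes 26..30; old 25-byte
            // entries fall back to the assumed default weight of 1.0
            let deleted = raw.len() >= 26 && raw[25] != 0;
            let weight = if raw.len() >= 30 {
                f32::from_be_bytes(raw[26..30].try_into()?)
            } else {
                1.0
            };
            if !deleted {
                f(key.value(), weight); // node content never read from the log
            }
        }
        Ok(())
    }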

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
Kent Overstreet 2026-04-15 05:03:32 -04:00
parent fc978e2f2e
commit ba4e01b6f3
9 changed files with 776 additions and 502 deletions


@@ -8,8 +8,6 @@
// - fsck (corruption repair)
use super::{index, types::*};
use redb::ReadableTableMetadata;
use crate::memory_capnp;
use super::Store;
@@ -262,6 +260,47 @@ pub fn read_node_at_offset(offset: u64) -> Result<Node> {
    read_node_at_offset_for_key(offset, None)
}
/// Iterate over all nodes in the capnp log, yielding (offset, Node) pairs.
/// Nodes are yielded in log order (oldest first).
/// Multiple nodes in the same message share the same offset.
pub fn iter_nodes() -> Result<Vec<(u64, Node)>> {
    let path = nodes_path();
    if !path.exists() {
        return Ok(Vec::new());
    }
    let file = fs::File::open(&path)
        .with_context(|| format!("open {}", path.display()))?;
    let mut reader = BufReader::new(file);
    let mut results = Vec::new();
    loop {
        let offset = reader.stream_position()?;
        let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            Ok(m) => m,
            Err(_) => break, // EOF or corrupt
        };
        let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
            Ok(l) => l,
            Err(_) => continue,
        };
        let nodes = match log.get_nodes() {
            Ok(n) => n,
            Err(_) => continue,
        };
        for node_reader in nodes {
            if let Ok(node) = Node::from_capnp_migrate(node_reader) {
                results.push((offset, node));
            }
        }
    }
    Ok(results)
}
// ---------------------------------------------------------------------------
// Store persistence methods
// ---------------------------------------------------------------------------
@@ -274,9 +313,9 @@ impl Store {
        let mut store = Store::default();
        // Open redb index first (rebuilds from capnp if needed)
        // Open redb index (rebuilds from capnp if needed)
        let db_p = db_path();
        store.db = Some(store.open_or_rebuild_db(&db_p)?);
        store.db = Some(index::open_or_rebuild(&db_p)?);
        // Replay relations
        if rels_p.exists() {
@@ -294,64 +333,9 @@ impl Store {
            Ordering::Relaxed
        );
        // Orphan edges filtered naturally during for_each_relation (unresolvable UUIDs skipped)
        Ok(store)
    }
    /// Open redb database, rebuilding if unhealthy.
    fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
        // Try opening existing database
        if path.exists() {
            match index::open_db(path) {
                Ok(database) => {
                    if self.db_is_healthy(&database)? {
                        return Ok(database);
                    }
                    eprintln!("redb index stale, rebuilding...");
                }
                Err(e) => {
                    eprintln!("redb open failed ({}), rebuilding...", e);
                }
            }
        }
        // Rebuild index from capnp log
        rebuild_index(path, &nodes_path())
    }
    /// Check if redb index is healthy by verifying some offsets are valid.
    fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
        use redb::{ReadableDatabase, ReadableTable};
        let txn = database.begin_read()?;
        let nodes_table = txn.open_table(index::NODES)?;
        // Check that we can read the table and it has entries
        if nodes_table.len()? == 0 {
            // Empty database - might be stale or new
            let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
            return Ok(capnp_size == 0); // healthy only if capnp is also empty
        }
        // Spot check: verify a few offsets point to valid messages
        let mut checked = 0;
        for entry in nodes_table.iter()? {
            if checked >= 5 { break; }
            let (key, offset) = entry?;
            let offset = offset.value();
            // Try to read the node at this offset
            if read_node_at_offset(offset).is_err() {
                return Ok(false);
            }
            checked += 1;
            let _ = key; // silence unused warning
        }
        Ok(true)
    }
    /// Replay relation log, keeping latest version per UUID
    fn replay_relations(&mut self, path: &Path) -> Result<()> {
        let file = fs::File::open(path)
@@ -429,88 +413,6 @@ impl Store {
        Ok(by_key)
    }
    /// Find the most recent version of a node by key (including deleted).
    /// Scans the entire log. Used for version continuity when recreating deleted nodes.
    pub fn find_latest_by_key(&self, target_key: &str) -> Result<Option<Node>> {
        let path = nodes_path();
        if !path.exists() { return Ok(None); }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        let mut latest: Option<Node> = None;
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
                Ok(l) => l,
                Err(_) => continue,
            };
            let nodes = match log.get_nodes() {
                Ok(n) => n,
                Err(_) => continue,
            };
            for node_reader in nodes {
                let node = match Node::from_capnp_migrate(node_reader) {
                    Ok(n) => n,
                    Err(_) => continue,
                };
                if node.key != target_key { continue; }
                // Keep if newer timestamp (handles version resets)
                let dominated = latest.as_ref()
                    .map(|l| node.timestamp >= l.timestamp)
                    .unwrap_or(true);
                if dominated {
                    latest = Some(node);
                }
            }
        }
        Ok(latest)
    }
    /// Find the last non-deleted version of a node by key.
    /// Scans the entire log. Used for restore operations.
    pub fn find_last_live_version(&self, target_key: &str) -> Result<Option<Node>> {
        let path = nodes_path();
        if !path.exists() { return Ok(None); }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        let mut last_live: Option<Node> = None;
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
                Ok(l) => l,
                Err(_) => continue,
            };
            let nodes = match log.get_nodes() {
                Ok(n) => n,
                Err(_) => continue,
            };
            for node_reader in nodes {
                let node = match Node::from_capnp_migrate(node_reader) {
                    Ok(n) => n,
                    Err(_) => continue,
                };
                if node.key != target_key { continue; }
                if !node.deleted {
                    // Keep the most recent non-deleted version by timestamp
                    let dominated = last_live.as_ref()
                        .map(|l| node.timestamp >= l.timestamp)
                        .unwrap_or(true);
                    if dominated {
                        last_live = Some(node);
                    }
                }
            }
        }
        Ok(last_live)
    }
    /// Append nodes to the log file. Returns the offset where the message was written.
    pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> {
        use std::sync::atomic::Ordering;
@@ -680,207 +582,3 @@ pub fn fsck() -> Result<()> {
    Ok(())
}
/// Rebuild redb index from capnp log.
/// Scans the log, tracking offsets, and records latest version of each node.
fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
    // Remove old database if it exists
    if db_path.exists() {
        fs::remove_file(db_path)
            .with_context(|| format!("remove old db {}", db_path.display()))?;
    }
    let database = index::open_db(db_path)?;
    if !capnp_path.exists() {
        return Ok(database);
    }
    // Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key
    let mut latest: HashMap<String, (u64, [u8; 16], u32, bool, u8, i64, String)> = HashMap::new();
    let file = fs::File::open(capnp_path)
        .with_context(|| format!("open {}", capnp_path.display()))?;
    let mut reader = BufReader::new(file);
    loop {
        let offset = reader.stream_position()?;
        let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            Ok(m) => m,
            Err(_) => break,
        };
        let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
            Ok(l) => l,
            Err(_) => continue,
        };
        let nodes = match log.get_nodes() {
            Ok(n) => n,
            Err(_) => continue,
        };
        for node_reader in nodes {
            let key = node_reader.get_key().ok()
                .and_then(|t| t.to_str().ok())
                .unwrap_or("")
                .to_string();
            if key.is_empty() { continue; }
            let version = node_reader.get_version();
            let deleted = node_reader.get_deleted();
            let node_type = node_reader.get_node_type()
                .map(|t| t as u8)
                .unwrap_or(0);
            let timestamp = node_reader.get_timestamp();
            let provenance = node_reader.get_provenance().ok()
                .and_then(|t| t.to_str().ok())
                .unwrap_or("manual")
                .to_string();
            let mut uuid = [0u8; 16];
            if let Ok(data) = node_reader.get_uuid() {
                if data.len() >= 16 {
                    uuid.copy_from_slice(&data[..16]);
                }
            }
            // Keep if newer timestamp (not version - version can reset after delete/recreate)
            let dominated = latest.get(&key)
                .map(|(_, _, _, _, _, ts, _)| timestamp >= *ts)
                .unwrap_or(true);
            if dominated {
                latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance));
            }
        }
    }
    // Write index entries for non-deleted nodes
    {
        let txn = database.begin_write()?;
        {
            let mut nodes_table = txn.open_table(index::NODES)?;
            let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?;
            let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?;
            let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?;
            for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest {
                if !deleted {
                    nodes_table.insert(key.as_str(), offset)?;
                    // Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes
                    let mut packed = [0u8; 25];
                    packed[0..16].copy_from_slice(&uuid);
                    packed[16] = node_type;
                    packed[17..25].copy_from_slice(&timestamp.to_be_bytes());
                    key_uuid_table.insert(key.as_str(), packed.as_slice())?;
                    // Pack: [negated_timestamp:8][key] for descending sort
                    let neg_ts = (!timestamp).to_be_bytes();
                    let mut prov_val = Vec::with_capacity(8 + key.len());
                    prov_val.extend_from_slice(&neg_ts);
                    prov_val.extend_from_slice(key.as_bytes());
                    by_provenance.insert(provenance.as_str(), prov_val.as_slice())?;
                }
                // Always record offset in UUID history (even for deleted)
                uuid_offsets.insert(uuid.as_slice(), offset)?;
            }
        }
        txn.commit()?;
    }
    Ok(database)
}
/// Fsck report — discrepancies found between capnp logs and redb index.
#[derive(Debug, Default)]
pub struct FsckReport {
    /// Keys in current index but not in rebuilt (zombie entries)
    pub zombies: Vec<String>,
    /// Keys in rebuilt but not in current index (missing from index)
    pub missing: Vec<String>,
    /// Was capnp log repaired?
    pub capnp_repaired: bool,
}
impl FsckReport {
    pub fn is_clean(&self) -> bool {
        self.zombies.is_empty() && self.missing.is_empty() && !self.capnp_repaired
    }
}
/// Full fsck: verify capnp logs, rebuild index to temp, compare with current.
/// Returns a report of discrepancies found.
pub fn fsck_full() -> Result<FsckReport> {
    use redb::{ReadableDatabase, ReadableTable};
    use tempfile::TempDir;
    let mut report = FsckReport::default();
    // Step 1: Run capnp log fsck (may truncate corrupt messages)
    // We need to check if it did repairs — currently fsck() just prints to stderr
    // For now, we'll re-check after by comparing file sizes
    let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
    fsck()?;
    let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
    report.capnp_repaired = nodes_size_after != nodes_size_before;
    // Step 2: Rebuild index to temp file
    let temp_dir = TempDir::new().context("create temp dir")?;
    let temp_db_path = temp_dir.path().join("rebuilt.redb");
    let rebuilt_db = rebuild_index(&temp_db_path, &nodes_path())?;
    // Step 3: Copy current index to temp and open (avoids write lock contention)
    let current_db_path = db_path();
    if !current_db_path.exists() {
        // No current index — all rebuilt keys are "missing"
        let txn = rebuilt_db.begin_read()?;
        let table = txn.open_table(index::NODES)?;
        for entry in table.iter()? {
            let (key, _) = entry?;
            report.missing.push(key.value().to_string());
        }
        return Ok(report);
    }
    // Copy to temp to avoid lock contention with running daemon
    let current_copy_path = temp_dir.path().join("current.redb");
    fs::copy(&current_db_path, &current_copy_path)
        .with_context(|| format!("copy {} to temp", current_db_path.display()))?;
    let current_db = redb::Database::open(&current_copy_path)
        .with_context(|| format!("open current db copy"))?;
    // Step 4: Compare NODES tables
    // Collect all keys from both
    let rebuilt_keys: std::collections::HashSet<String> = {
        let txn = rebuilt_db.begin_read()?;
        let table = txn.open_table(index::NODES)?;
        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
    };
    let current_keys: std::collections::HashSet<String> = {
        let txn = current_db.begin_read()?;
        let table = txn.open_table(index::NODES)?;
        table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
    };
    // Keys in current but not rebuilt = zombies (shouldn't exist)
    for key in current_keys.difference(&rebuilt_keys) {
        report.zombies.push(key.clone());
    }
    report.zombies.sort();
    // Keys in rebuilt but not current = missing (should exist but don't)
    for key in rebuilt_keys.difference(&current_keys) {
        report.missing.push(key.clone());
    }
    report.missing.sort();
    Ok(report)
}
/// Repair the index by rebuilding from capnp logs.
/// Use after fsck_full() reports discrepancies.
pub fn repair_index() -> Result<()> {
    let db_path = db_path();
    rebuild_index(&db_path, &nodes_path())?;
    eprintln!("index rebuilt from capnp log");
    Ok(())
}