Integrate redb into Store::load() with health check
- Add db: Option<Database> field to Store - Store::load() opens redb after replaying capnp logs - Health check compares node count + spot checks keys - Rebuilds automatically if db is missing, corrupt, or stale - Make table definitions public for cross-module access Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
2caccf875d
commit
6104c63890
3 changed files with 78 additions and 7 deletions
|
|
@ -15,10 +15,10 @@ use redb::{Database, ReadableDatabase, TableDefinition};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
// Table definitions
|
// Table definitions
|
||||||
const NODES: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
|
pub const NODES: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
|
||||||
const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key");
|
pub const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key");
|
||||||
const VISITS: TableDefinition<(&str, &str), i64> = TableDefinition::new("visits");
|
pub const VISITS: TableDefinition<(&str, &str), i64> = TableDefinition::new("visits");
|
||||||
const TRANSCRIPT_PROGRESS: TableDefinition<(&str, u32, &str), i64> =
|
pub const TRANSCRIPT_PROGRESS: TableDefinition<(&str, u32, &str), i64> =
|
||||||
TableDefinition::new("transcript_progress");
|
TableDefinition::new("transcript_progress");
|
||||||
|
|
||||||
/// Open or create the redb database, ensuring all tables exist.
|
/// Open or create the redb database, ensuring all tables exist.
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@
|
||||||
//
|
//
|
||||||
// capnp logs are the source of truth; redb provides indexed access.
|
// capnp logs are the source of truth; redb provides indexed access.
|
||||||
|
|
||||||
use super::types::*;
|
use super::{db, types::*};
|
||||||
|
use redb::ReadableTableMetadata;
|
||||||
|
|
||||||
use crate::memory_capnp;
|
use crate::memory_capnp;
|
||||||
|
|
||||||
|
|
@ -16,7 +17,7 @@ use std::io::{BufReader, Seek};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
/// Load store by replaying capnp logs.
|
/// Load store by replaying capnp logs, then open/verify redb indices.
|
||||||
pub fn load() -> Result<Store> {
|
pub fn load() -> Result<Store> {
|
||||||
let nodes_p = nodes_path();
|
let nodes_p = nodes_path();
|
||||||
let rels_p = relations_path();
|
let rels_p = relations_path();
|
||||||
|
|
@ -48,9 +49,60 @@ impl Store {
|
||||||
store.nodes.contains_key(&r.target_key)
|
store.nodes.contains_key(&r.target_key)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Open redb and verify/rebuild indices
|
||||||
|
let db_p = db_path();
|
||||||
|
store.db = Some(store.open_or_rebuild_db(&db_p)?);
|
||||||
|
|
||||||
Ok(store)
|
Ok(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Open redb database, rebuilding if unhealthy.
|
||||||
|
fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
|
||||||
|
// Try opening existing database
|
||||||
|
if path.exists() {
|
||||||
|
match db::open_db(path) {
|
||||||
|
Ok(database) => {
|
||||||
|
if self.db_is_healthy(&database)? {
|
||||||
|
return Ok(database);
|
||||||
|
}
|
||||||
|
eprintln!("redb index stale, rebuilding...");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("redb open failed ({}), rebuilding...", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild from in-memory state
|
||||||
|
db::rebuild_from_store(path, self)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if redb indices match in-memory state.
|
||||||
|
fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
|
||||||
|
use redb::ReadableDatabase;
|
||||||
|
|
||||||
|
let txn = database.begin_read()?;
|
||||||
|
|
||||||
|
// Quick check: node count should match
|
||||||
|
let nodes_table = txn.open_table(db::NODES)?;
|
||||||
|
let db_count = nodes_table.len()?;
|
||||||
|
|
||||||
|
if db_count != self.nodes.len() as u64 {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spot check: verify a few random nodes exist with matching keys
|
||||||
|
// (full verification would be too slow)
|
||||||
|
for (i, key) in self.nodes.keys().enumerate() {
|
||||||
|
if i >= 10 { break; } // check first 10
|
||||||
|
if nodes_table.get(key.as_str())?.is_none() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|
||||||
/// Replay node log, keeping latest version per UUID.
|
/// Replay node log, keeping latest version per UUID.
|
||||||
/// Tracks all UUIDs seen per key to detect duplicates.
|
/// Tracks all UUIDs seen per key to detect duplicates.
|
||||||
fn replay_nodes(&mut self, path: &Path) -> Result<()> {
|
fn replay_nodes(&mut self, path: &Path) -> Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -433,7 +433,6 @@ pub struct GapRecord {
|
||||||
pub(super) type VisitIndex = HashMap<String, HashMap<String, i64>>;
|
pub(super) type VisitIndex = HashMap<String, HashMap<String, i64>>;
|
||||||
|
|
||||||
// The full in-memory store
|
// The full in-memory store
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
pub nodes: HashMap<String, Node>, // key → latest node
|
pub nodes: HashMap<String, Node>, // key → latest node
|
||||||
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
|
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
|
||||||
|
|
@ -448,6 +447,26 @@ pub struct Store {
|
||||||
/// Log sizes at load time — used for staleness detection.
|
/// Log sizes at load time — used for staleness detection.
|
||||||
pub(crate) loaded_nodes_size: u64,
|
pub(crate) loaded_nodes_size: u64,
|
||||||
pub(crate) loaded_rels_size: u64,
|
pub(crate) loaded_rels_size: u64,
|
||||||
|
/// redb index database
|
||||||
|
pub(crate) db: Option<redb::Database>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Store {
|
||||||
|
fn default() -> Self {
|
||||||
|
Store {
|
||||||
|
nodes: HashMap::new(),
|
||||||
|
uuid_to_key: HashMap::new(),
|
||||||
|
relations: Vec::new(),
|
||||||
|
retrieval_log: Vec::new(),
|
||||||
|
gaps: Vec::new(),
|
||||||
|
params: Params::default(),
|
||||||
|
visits: HashMap::new(),
|
||||||
|
transcript_progress: HashMap::new(),
|
||||||
|
loaded_nodes_size: 0,
|
||||||
|
loaded_rels_size: 0,
|
||||||
|
db: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cap'n Proto serialization helpers
|
// Cap'n Proto serialization helpers
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue