With a singleton Store (one daemon, RPC for clients), there are no concurrent writers to the capnp log. The file-based flock and incremental refresh logic were for multi-process coordination we no longer need. -110 lines of dead concurrency code. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
454 lines
16 KiB
Rust
454 lines
16 KiB
Rust
// Persistence layer: load, replay, append
|
|
//
|
|
// capnp logs are the source of truth; redb provides indexed access.
|
|
|
|
use super::{index, types::*};
|
|
use redb::ReadableTableMetadata;
|
|
|
|
use crate::memory_capnp;
|
|
|
|
use anyhow::{Context, Result};
|
|
use capnp::message;
|
|
use capnp::serialize;
|
|
|
|
use std::collections::HashMap;
|
|
use std::fs;
|
|
use std::io::{BufReader, Seek};
|
|
use std::path::Path;
|
|
|
|
impl Store {
|
|
/// Load store by replaying capnp logs, then open/verify redb indices.
|
|
pub fn load() -> Result<Store> {
|
|
let nodes_p = nodes_path();
|
|
let rels_p = relations_path();
|
|
|
|
let mut store = Store::default();
|
|
|
|
if nodes_p.exists() {
|
|
store.replay_nodes(&nodes_p)?;
|
|
}
|
|
if rels_p.exists() {
|
|
store.replay_relations(&rels_p)?;
|
|
}
|
|
|
|
// Record log sizes after replay
|
|
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
|
|
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
|
|
|
|
// Drop edges referencing deleted/missing nodes
|
|
store.relations.retain(|r|
|
|
store.nodes.contains_key(&r.source_key) &&
|
|
store.nodes.contains_key(&r.target_key)
|
|
);
|
|
|
|
// Open redb and verify/rebuild indices
|
|
let db_p = db_path();
|
|
store.db = Some(store.open_or_rebuild_db(&db_p)?);
|
|
|
|
Ok(store)
|
|
}
|
|
|
|
/// Open redb database, rebuilding if unhealthy.
|
|
fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
|
|
// Try opening existing database
|
|
if path.exists() {
|
|
match index::open_db(path) {
|
|
Ok(database) => {
|
|
if self.db_is_healthy(&database)? {
|
|
return Ok(database);
|
|
}
|
|
eprintln!("redb index stale, rebuilding...");
|
|
}
|
|
Err(e) => {
|
|
eprintln!("redb open failed ({}), rebuilding...", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Rebuild index from capnp log
|
|
rebuild_index(path, &nodes_path())
|
|
}
|
|
|
|
/// Check if redb indices match in-memory state.
|
|
fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
|
|
use redb::ReadableDatabase;
|
|
|
|
let txn = database.begin_read()?;
|
|
|
|
// Quick check: node count should match
|
|
let nodes_table = txn.open_table(index::NODES)?;
|
|
let db_count = nodes_table.len()?;
|
|
|
|
if db_count != self.nodes.len() as u64 {
|
|
return Ok(false);
|
|
}
|
|
|
|
// Spot check: verify a few random nodes exist with matching keys
|
|
// (full verification would be too slow)
|
|
for (i, key) in self.nodes.keys().enumerate() {
|
|
if i >= 10 { break; } // check first 10
|
|
if nodes_table.get(key.as_str())?.is_none() {
|
|
return Ok(false);
|
|
}
|
|
}
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
/// Replay node log, keeping latest version per UUID.
|
|
/// Tracks all UUIDs seen per key to detect duplicates.
|
|
fn replay_nodes(&mut self, path: &Path) -> Result<()> {
|
|
let file = fs::File::open(path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
let mut reader = BufReader::new(file);
|
|
|
|
// Track all non-deleted UUIDs per key to detect duplicates
|
|
let mut key_uuids: HashMap<String, Vec<[u8; 16]>> = HashMap::new();
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
let log = msg.get_root::<memory_capnp::node_log::Reader>()
|
|
.with_context(|| format!("read node log"))?;
|
|
for node_reader in log.get_nodes()
|
|
.with_context(|| format!("get nodes"))? {
|
|
let node = Node::from_capnp_migrate(node_reader)?;
|
|
let existing_version = self.nodes.get(&node.key)
|
|
.map(|n| n.version)
|
|
.unwrap_or(0);
|
|
if node.version >= existing_version {
|
|
if node.deleted {
|
|
self.nodes.remove(&node.key);
|
|
self.uuid_to_key.remove(&node.uuid);
|
|
if let Some(uuids) = key_uuids.get_mut(&node.key) {
|
|
uuids.retain(|u| *u != node.uuid);
|
|
}
|
|
} else {
|
|
self.uuid_to_key.insert(node.uuid, node.key.clone());
|
|
self.nodes.insert(node.key.clone(), node.clone());
|
|
let uuids = key_uuids.entry(node.key).or_default();
|
|
if !uuids.contains(&node.uuid) {
|
|
uuids.push(node.uuid);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Report duplicate keys
|
|
for (key, uuids) in &key_uuids {
|
|
if uuids.len() > 1 {
|
|
dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len());
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Replay relation log, keeping latest version per UUID
|
|
fn replay_relations(&mut self, path: &Path) -> Result<()> {
|
|
let file = fs::File::open(path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
let mut reader = BufReader::new(file);
|
|
|
|
// Collect all, then deduplicate by UUID keeping latest version
|
|
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
|
|
.with_context(|| format!("read relation log"))?;
|
|
for rel_reader in log.get_relations()
|
|
.with_context(|| format!("get relations"))? {
|
|
let rel = Relation::from_capnp_migrate(rel_reader)?;
|
|
let existing_version = by_uuid.get(&rel.uuid)
|
|
.map(|r| r.version)
|
|
.unwrap_or(0);
|
|
if rel.version >= existing_version {
|
|
by_uuid.insert(rel.uuid, rel);
|
|
}
|
|
}
|
|
}
|
|
|
|
self.relations = by_uuid.into_values()
|
|
.filter(|r| !r.deleted)
|
|
.collect();
|
|
Ok(())
|
|
}
|
|
|
|
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
|
|
/// Returns a map from key → vec of all live Node versions (one per UUID).
|
|
/// The "winner" in self.nodes is always one of them.
|
|
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>> {
|
|
let path = nodes_path();
|
|
if !path.exists() { return Ok(HashMap::new()); }
|
|
|
|
let file = fs::File::open(&path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
let mut reader = BufReader::new(file);
|
|
|
|
// Track latest version of each UUID
|
|
let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
let log = msg.get_root::<memory_capnp::node_log::Reader>()
|
|
.with_context(|| format!("read node log"))?;
|
|
for node_reader in log.get_nodes()
|
|
.with_context(|| format!("get nodes"))? {
|
|
let node = Node::from_capnp_migrate(node_reader)?;
|
|
let dominated = by_uuid.get(&node.uuid)
|
|
.map(|n| node.version >= n.version)
|
|
.unwrap_or(true);
|
|
if dominated {
|
|
by_uuid.insert(node.uuid, node);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Group live (non-deleted) nodes by key
|
|
let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
|
|
for node in by_uuid.into_values() {
|
|
if !node.deleted {
|
|
by_key.entry(node.key.clone()).or_default().push(node);
|
|
}
|
|
}
|
|
|
|
// Keep only duplicates
|
|
by_key.retain(|_, nodes| nodes.len() > 1);
|
|
Ok(by_key)
|
|
}
|
|
|
|
/// Append nodes to the log file. Returns the offset where the message was written.
|
|
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<u64> {
|
|
let mut msg = message::Builder::new_default();
|
|
{
|
|
let log = msg.init_root::<memory_capnp::node_log::Builder>();
|
|
let mut list = log.init_nodes(nodes.len() as u32);
|
|
for (i, node) in nodes.iter().enumerate() {
|
|
node.to_capnp(list.reborrow().get(i as u32));
|
|
}
|
|
}
|
|
let mut buf = Vec::new();
|
|
serialize::write_message(&mut buf, &msg)
|
|
.with_context(|| format!("serialize nodes"))?;
|
|
|
|
let path = nodes_path();
|
|
let file = fs::OpenOptions::new()
|
|
.create(true).append(true).open(&path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
|
|
// Get offset before writing
|
|
let offset = file.metadata().map(|m| m.len()).unwrap_or(0);
|
|
|
|
use std::io::Write;
|
|
(&file).write_all(&buf)
|
|
.with_context(|| format!("write nodes"))?;
|
|
|
|
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
|
|
Ok(offset)
|
|
}
|
|
|
|
/// Append relations to the log file.
|
|
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> {
|
|
let mut msg = message::Builder::new_default();
|
|
{
|
|
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
|
|
let mut list = log.init_relations(relations.len() as u32);
|
|
for (i, rel) in relations.iter().enumerate() {
|
|
rel.to_capnp(list.reborrow().get(i as u32));
|
|
}
|
|
}
|
|
let mut buf = Vec::new();
|
|
serialize::write_message(&mut buf, &msg)
|
|
.with_context(|| format!("serialize relations"))?;
|
|
|
|
let path = relations_path();
|
|
let file = fs::OpenOptions::new()
|
|
.create(true).append(true).open(&path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
use std::io::Write;
|
|
(&file).write_all(&buf)
|
|
.with_context(|| format!("write relations"))?;
|
|
|
|
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
|
|
Ok(())
|
|
}
|
|
|
|
/// Placeholder - indices will be updated on write with redb.
|
|
pub fn save(&self) -> Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Check and repair corrupt capnp log files.
|
|
///
|
|
/// Reads each message sequentially, tracking file position. On the first
|
|
/// corrupt message, truncates the file to the last good position. Also
|
|
/// removes stale caches so the next load replays from the repaired log.
|
|
pub fn fsck() -> Result<()> {
|
|
let mut any_corrupt = false;
|
|
|
|
for (path, kind) in [
|
|
(nodes_path(), "node"),
|
|
(relations_path(), "relation"),
|
|
] {
|
|
if !path.exists() { continue; }
|
|
|
|
let file = fs::File::open(&path)
|
|
.with_context(|| format!("open {}", path.display()))?;
|
|
let file_len = file.metadata()
|
|
.with_context(|| format!("stat {}", path.display()))?.len();
|
|
let mut reader = BufReader::new(file);
|
|
|
|
let mut good_messages = 0u64;
|
|
let mut last_good_pos = 0u64;
|
|
|
|
loop {
|
|
let pos = reader.stream_position()
|
|
.with_context(|| format!("tell {}", path.display()))?;
|
|
|
|
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
Ok(m) => m,
|
|
Err(_) => {
|
|
// read_message fails at EOF (normal) or on corrupt framing
|
|
if pos < file_len {
|
|
// Not at EOF — corrupt framing
|
|
eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
|
|
any_corrupt = true;
|
|
drop(reader);
|
|
let file = fs::OpenOptions::new().write(true).open(&path)
|
|
.with_context(|| format!("open for truncate"))?;
|
|
file.set_len(pos)
|
|
.with_context(|| format!("truncate {}", path.display()))?;
|
|
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
|
|
kind, file_len, pos, good_messages);
|
|
}
|
|
break;
|
|
}
|
|
};
|
|
|
|
// Validate the message content too
|
|
let valid = if kind == "node" {
|
|
msg.get_root::<memory_capnp::node_log::Reader>()
|
|
.and_then(|l| l.get_nodes().map(|_| ()))
|
|
.is_ok()
|
|
} else {
|
|
msg.get_root::<memory_capnp::relation_log::Reader>()
|
|
.and_then(|l| l.get_relations().map(|_| ()))
|
|
.is_ok()
|
|
};
|
|
|
|
if valid {
|
|
good_messages += 1;
|
|
last_good_pos = reader.stream_position()
|
|
.with_context(|| format!("tell {}", path.display()))?;
|
|
} else {
|
|
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
|
|
kind, pos, last_good_pos);
|
|
any_corrupt = true;
|
|
drop(reader);
|
|
let file = fs::OpenOptions::new().write(true).open(&path)
|
|
.with_context(|| format!("open for truncate"))?;
|
|
file.set_len(last_good_pos)
|
|
.with_context(|| format!("truncate {}", path.display()))?;
|
|
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
|
|
kind, file_len, last_good_pos, good_messages);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if !any_corrupt {
|
|
eprintln!("{}: {} messages, all clean", kind, good_messages);
|
|
}
|
|
}
|
|
|
|
if any_corrupt {
|
|
eprintln!("repair complete — run `poc-memory status` to verify");
|
|
} else {
|
|
eprintln!("store is clean");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Rebuild redb index from capnp log.
|
|
/// Scans the log, tracking offsets, and records latest version of each node.
|
|
fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
|
// Remove old database if it exists
|
|
if db_path.exists() {
|
|
fs::remove_file(db_path)
|
|
.with_context(|| format!("remove old db {}", db_path.display()))?;
|
|
}
|
|
|
|
let database = index::open_db(db_path)?;
|
|
|
|
if !capnp_path.exists() {
|
|
return Ok(database);
|
|
}
|
|
|
|
// Track latest (offset, uuid, version, deleted) per key
|
|
let mut latest: HashMap<String, (u64, [u8; 16], u32, bool)> = HashMap::new();
|
|
|
|
let file = fs::File::open(capnp_path)
|
|
.with_context(|| format!("open {}", capnp_path.display()))?;
|
|
let mut reader = BufReader::new(file);
|
|
|
|
loop {
|
|
let offset = reader.stream_position()?;
|
|
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
Ok(m) => m,
|
|
Err(_) => break,
|
|
};
|
|
|
|
let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
|
|
Ok(l) => l,
|
|
Err(_) => continue,
|
|
};
|
|
|
|
let nodes = match log.get_nodes() {
|
|
Ok(n) => n,
|
|
Err(_) => continue,
|
|
};
|
|
for node_reader in nodes {
|
|
let key = node_reader.get_key().ok()
|
|
.and_then(|t| t.to_str().ok())
|
|
.unwrap_or("")
|
|
.to_string();
|
|
if key.is_empty() { continue; }
|
|
|
|
let version = node_reader.get_version();
|
|
let deleted = node_reader.get_deleted();
|
|
|
|
let mut uuid = [0u8; 16];
|
|
if let Ok(data) = node_reader.get_uuid() {
|
|
if data.len() >= 16 {
|
|
uuid.copy_from_slice(&data[..16]);
|
|
}
|
|
}
|
|
|
|
// Keep if newer version
|
|
let dominated = latest.get(&key)
|
|
.map(|(_, _, v, _)| version >= *v)
|
|
.unwrap_or(true);
|
|
if dominated {
|
|
latest.insert(key, (offset, uuid, version, deleted));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Write index entries for non-deleted nodes
|
|
{
|
|
let txn = database.begin_write()?;
|
|
{
|
|
let mut nodes_table = txn.open_table(index::NODES)?;
|
|
let mut uuid_table = txn.open_table(index::UUID_TO_KEY)?;
|
|
|
|
for (key, (offset, uuid, _, deleted)) in latest {
|
|
if !deleted {
|
|
nodes_table.insert(key.as_str(), offset)?;
|
|
uuid_table.insert(uuid.as_slice(), key.as_str())?;
|
|
}
|
|
}
|
|
}
|
|
txn.commit()?;
|
|
}
|
|
|
|
Ok(database)
|
|
}
|