// consciousness/src/hippocampus/store/capnp.rs

// Cap'n Proto serialization and persistence
//
// capnp logs are the source of truth; redb provides indexed access.
// This module contains:
// - Serialization macros (capnp_enum!, capnp_message!)
// - Load/replay from capnp logs
// - Append to capnp logs
// - fsck (corruption repair)
use super::{index, types::*};
use redb::ReadableTableMetadata;
use crate::memory_capnp;
use super::Store;
use anyhow::{anyhow, Context, Result};
use capnp::message;
use capnp::serialize;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Seek};
use std::path::Path;
// ---------------------------------------------------------------------------
// Capnp serialization macros
//
// Declarative mapping between Rust types and capnp generated types.
// Adding a field to the schema means adding it in one place below;
// both read and write are generated from the same declaration.
// ---------------------------------------------------------------------------
/// Generate to_capnp/from_capnp conversion methods for an enum.
/// Generate to_capnp/from_capnp conversion methods for an enum.
///
/// Expands to an `impl` on `$rust_type` with a 1:1 variant mapping in both
/// directions. Both generated matches are exhaustive, so listing a variant
/// on only one side is a compile error — that is what keeps the Rust enum
/// and the capnp schema in sync.
macro_rules! capnp_enum {
    ($rust_type:ident, $capnp_type:path, [$($variant:ident),+ $(,)?]) => {
        impl $rust_type {
            // `to_capnp(&self)` deliberately takes &self; clippy's naming
            // convention for `to_*` is suppressed for these generated fns.
            #[allow(clippy::wrong_self_convention, dead_code)]
            pub(crate) fn to_capnp(&self) -> $capnp_type {
                match self {
                    $(Self::$variant => <$capnp_type>::$variant,)+
                }
            }
            pub(crate) fn from_capnp(v: $capnp_type) -> Self {
                match v {
                    $(<$capnp_type>::$variant => Self::$variant,)+
                }
            }
        }
    };
}
/// Generate from_capnp/to_capnp methods for a struct with capnp serialization.
/// Fields are grouped by serialization kind:
/// text - capnp Text fields (String in Rust)
/// uuid - capnp Data fields ([u8; 16] in Rust)
/// prim - copy types (u32, f32, f64, bool)
/// enm - enums with to_capnp/from_capnp methods
/// skip - Rust-only fields not in capnp (set to Default on read)
/// Generate from_capnp/to_capnp methods for a struct with capnp serialization.
/// Fields are grouped by serialization kind:
/// text - capnp Text fields (String in Rust)
/// uuid - capnp Data fields ([u8; 16] in Rust)
/// prim - copy types (u32, f32, f64, bool)
/// enm - enums with to_capnp/from_capnp methods
/// skip - Rust-only fields not in capnp (set to Default on read)
///
/// Uses `paste::paste!` to splice field names into the generated
/// `get_*`/`set_*` accessor calls, so a single field list drives both
/// the reader and the writer side.
macro_rules! capnp_message {
    (
        $struct:ident,
        reader: $reader:ty,
        builder: $builder:ty,
        text: [$($tf:ident),* $(,)?],
        uuid: [$($uf:ident),* $(,)?],
        prim: [$($pf:ident),* $(,)?],
        enm: [$($ef:ident: $et:ident),* $(,)?],
        skip: [$($sf:ident),* $(,)?] $(,)?
    ) => {
        impl $struct {
            /// Decode from a capnp reader. Text fields degrade to "" and
            /// uuid fields to zeroes on read errors (via read_text/read_uuid);
            /// only a bad enum discriminant produces an Err.
            pub fn from_capnp(r: $reader) -> Result<Self> {
                paste::paste! {
                    Ok(Self {
                        $($tf: read_text(r.[<get_ $tf>]()),)*
                        $($uf: read_uuid(r.[<get_ $uf>]()),)*
                        $($pf: r.[<get_ $pf>](),)*
                        $($ef: $et::from_capnp(
                            r.[<get_ $ef>]().map_err(|_| anyhow!(concat!("bad ", stringify!($ef))))?
                        ),)*
                        $($sf: Default::default(),)*
                    })
                }
            }
            /// Encode into a capnp builder; `skip` fields are not written.
            pub fn to_capnp(&self, mut b: $builder) {
                paste::paste! {
                    $(b.[<set_ $tf>](&self.$tf);)*
                    $(b.[<set_ $uf>](&self.$uf);)*
                    $(b.[<set_ $pf>](self.$pf);)*
                    $(b.[<set_ $ef>](self.$ef.to_capnp());)*
                }
            }
        }
    };
}
// ---------------------------------------------------------------------------
// Capnp helpers
// ---------------------------------------------------------------------------
/// Read a capnp text field, returning an empty String on any error
/// (missing field, decode failure, or invalid UTF-8).
fn read_text(result: capnp::Result<capnp::text::Reader>) -> String {
    match result.ok().and_then(|text| text.to_str().ok()) {
        Some(s) => s.to_string(),
        None => String::new(),
    }
}
/// Read a capnp data field as [u8; 16], zero-padded
fn read_uuid(result: capnp::Result<&[u8]>) -> [u8; 16] {
let mut out = [0u8; 16];
if let Ok(data) = result
&& data.len() >= 16 {
out.copy_from_slice(&data[..16]);
}
out
}
// ---------------------------------------------------------------------------
// Type-to-capnp mappings
// ---------------------------------------------------------------------------
// Variant lists must mirror the capnp schema; the macro's exhaustive
// matches turn any mismatch into a compile error.
capnp_enum!(NodeType, memory_capnp::NodeType,
    [EpisodicSession, EpisodicDaily, EpisodicWeekly, Semantic, EpisodicMonthly]);
capnp_enum!(RelationType, memory_capnp::RelationType,
    [Link, Causal, Auto]);
// Node: the skip group (community_id, clustering_coefficient, degree) is
// Rust-only and defaults on read — presumably derived graph metrics
// recomputed elsewhere; TODO confirm against the callers.
capnp_message!(Node,
    reader: memory_capnp::content_node::Reader<'_>,
    builder: memory_capnp::content_node::Builder<'_>,
    text: [key, content, source_ref, provenance],
    uuid: [uuid],
    prim: [version, timestamp, weight, emotion, deleted,
        retrievals, uses, wrongs, last_replayed,
        spaced_repetition_interval, created_at, last_scored],
    enm: [node_type: NodeType],
    skip: [community_id, clustering_coefficient, degree],
);
capnp_message!(Relation,
    reader: memory_capnp::relation::Reader<'_>,
    builder: memory_capnp::relation::Builder<'_>,
    text: [source_key, target_key, provenance],
    uuid: [uuid, source, target],
    prim: [version, timestamp, strength, deleted],
    enm: [rel_type: RelationType],
    skip: [],
);
// ---------------------------------------------------------------------------
// Migration helpers (legacy provenance enum → string)
// ---------------------------------------------------------------------------
/// Convert legacy capnp provenance enum to string label.
fn legacy_provenance_label(p: memory_capnp::Provenance) -> &'static str {
use memory_capnp::Provenance::*;
match p {
Manual => "manual",
Journal => "journal",
Agent => "agent",
Dream => "dream",
Derived => "derived",
AgentExperienceMine => "agent:experience-mine",
AgentKnowledgeObservation => "agent:knowledge-observation",
AgentKnowledgePattern => "agent:knowledge-pattern",
AgentKnowledgeConnector => "agent:knowledge-connector",
AgentKnowledgeChallenger => "agent:knowledge-challenger",
AgentConsolidate => "agent:consolidate",
AgentDigest => "agent:digest",
AgentFactMine => "agent:fact-mine",
AgentDecay => "agent:decay",
}
}
impl Node {
    /// Read from capnp with migration: if the new provenance text field
    /// is empty (old record), fall back to the deprecated provenanceOld enum.
    pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self> {
        let mut node = Self::from_capnp(r)?;
        if node.provenance.is_empty()
            && let Ok(old) = r.get_provenance_old() {
            node.provenance = legacy_provenance_label(old).to_string();
        }
        // Sanitize timestamps: old capnp records have raw offsets instead
        // of unix epoch. Anything past year 2100 (~4102444800) is bogus.
        const MAX_SANE_EPOCH: i64 = 4_102_444_800;
        // Order matters: timestamp is repaired from created_at first, then
        // created_at from the (possibly just-repaired) timestamp.
        // NOTE(review): if BOTH fields are out of range, timestamp inherits
        // the still-bogus created_at and only created_at gets clamped via
        // min(); confirm that is the intended behavior.
        if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 {
            node.timestamp = node.created_at;
        }
        if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 {
            node.created_at = node.timestamp.min(MAX_SANE_EPOCH);
        }
        Ok(node)
    }
}
impl Relation {
    /// Read from capnp with migration: an empty provenance text field
    /// (old record) falls back to the deprecated provenanceOld enum label.
    pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self> {
        let mut rel = Self::from_capnp(r)?;
        if rel.provenance.is_empty() {
            if let Ok(legacy) = r.get_provenance_old() {
                rel.provenance = legacy_provenance_label(legacy).to_string();
            }
        }
        Ok(rel)
    }
}
// ---------------------------------------------------------------------------
// Direct node access
// ---------------------------------------------------------------------------
/// Read a single node at the given offset in the capnp log.
///
/// The offset must point to a valid message containing the node. When
/// `target_key` is provided, that specific node is located within the
/// message (batch writes put multiple nodes at one offset); otherwise
/// the first non-deleted node is returned, falling back to the first
/// entry if every node in the message is deleted.
pub fn read_node_at_offset_for_key(offset: u64, target_key: Option<&str>) -> Result<Node> {
    use std::io::{Seek, SeekFrom};
    let path = nodes_path();
    let mut log_file = fs::File::open(&path)
        .with_context(|| format!("open {}", path.display()))?;
    log_file.seek(SeekFrom::Start(offset))?;
    let mut buffered = BufReader::new(log_file);
    let msg = serialize::read_message(&mut buffered, message::ReaderOptions::new())
        .with_context(|| format!("read message at offset {}", offset))?;
    let nodes = msg
        .get_root::<memory_capnp::node_log::Reader>()
        .with_context(|| "read node log")?
        .get_nodes()
        .with_context(|| "get nodes")?;
    if nodes.is_empty() {
        anyhow::bail!("no nodes in message at offset {}", offset);
    }
    match target_key {
        // A key was requested: it must be present in this message.
        Some(wanted) => {
            for node_reader in nodes.iter() {
                let node = Node::from_capnp_migrate(node_reader)?;
                if node.key == wanted {
                    return Ok(node);
                }
            }
            anyhow::bail!("node '{}' not found in message at offset {}", wanted, offset)
        }
        // No key: prefer the first live node, else fall back to the first.
        None => {
            for node_reader in nodes.iter() {
                let node = Node::from_capnp_migrate(node_reader)?;
                if !node.deleted {
                    return Ok(node);
                }
            }
            Node::from_capnp_migrate(nodes.get(0))
        }
    }
}
/// Read a node at offset (legacy, no key filtering).
/// Thin wrapper over `read_node_at_offset_for_key` with no target key:
/// yields the first non-deleted node in the message at `offset`.
pub fn read_node_at_offset(offset: u64) -> Result<Node> {
    read_node_at_offset_for_key(offset, None)
}
// ---------------------------------------------------------------------------
// Store persistence methods
// ---------------------------------------------------------------------------
impl Store {
    /// Load store by opening redb index and replaying relations.
    ///
    /// The capnp logs remain the source of truth; the redb index is
    /// (re)built from them when stale or missing. Orphan edges are
    /// filtered naturally during traversal (unresolvable UUIDs skipped).
    pub fn load() -> Result<Store> {
        use std::sync::atomic::Ordering;
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let mut store = Store::default();
        // Open redb index first (rebuilds from capnp if needed)
        let db_p = db_path();
        store.db = Some(store.open_or_rebuild_db(&db_p)?);
        // Replay relations into the freshly opened index
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }
        // Record log sizes so later appends can track growth
        store.loaded_nodes_size.store(
            fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0),
            Ordering::Relaxed,
        );
        store.loaded_rels_size.store(
            fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0),
            Ordering::Relaxed,
        );
        Ok(store)
    }

    /// Open redb database, rebuilding from the capnp log when the file is
    /// missing, fails to open, or fails the health spot-check.
    fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
        // Try opening existing database
        if path.exists() {
            match index::open_db(path) {
                Ok(database) => {
                    if self.db_is_healthy(&database)? {
                        return Ok(database);
                    }
                    eprintln!("redb index stale, rebuilding...");
                }
                Err(e) => {
                    eprintln!("redb open failed ({}), rebuilding...", e);
                }
            }
        }
        // Rebuild index from capnp log
        rebuild_index(path, &nodes_path())
    }

    /// Check if redb index is healthy by verifying some offsets are valid.
    ///
    /// An empty index is healthy only when the capnp log is also empty;
    /// otherwise up to five entries are spot-checked by reading the node
    /// stored at each recorded offset.
    fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
        use redb::{ReadableDatabase, ReadableTable};
        let txn = database.begin_read()?;
        let nodes_table = txn.open_table(index::NODES)?;
        // Check that we can read the table and it has entries
        if nodes_table.len()? == 0 {
            // Empty database - might be stale or new
            let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
            return Ok(capnp_size == 0); // healthy only if capnp is also empty
        }
        // Spot check: verify a few offsets point to valid messages
        let mut checked = 0;
        for entry in nodes_table.iter()? {
            if checked >= 5 {
                break;
            }
            let (_key, offset) = entry?;
            // Try to read the node at this offset
            if read_node_at_offset(offset.value()).is_err() {
                return Ok(false);
            }
            checked += 1;
        }
        Ok(true)
    }

    /// Replay relation log, keeping latest version per UUID, then index
    /// the surviving (non-deleted) relations in a single transaction.
    fn replay_relations(&mut self, path: &Path) -> Result<()> {
        let file = fs::File::open(path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        // Collect all, then deduplicate by UUID keeping latest version.
        // A read_message failure ends the scan (EOF or corrupt tail —
        // fsck handles repair); everything read so far is kept.
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .context("read relation log")?;
            for rel_reader in log.get_relations().context("get relations")? {
                let rel = Relation::from_capnp_migrate(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid)
                    .map(|r| r.version)
                    .unwrap_or(0);
                // `>=` so the later log entry wins on a version tie
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }
        // Index relations directly (single transaction)
        if let Some(db) = &self.db {
            let txn = db.begin_write()?;
            for rel in by_uuid.into_values() {
                if rel.deleted {
                    continue;
                }
                index::index_relation(&txn, &rel.source, &rel.target, rel.strength, rel.rel_type as u8)?;
            }
            txn.commit()?;
        }
        Ok(())
    }

    /// Find all duplicate keys: keys with multiple live UUIDs in the log.
    /// Returns a map from key → vec of all live Node versions (one per UUID).
    pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>> {
        let path = nodes_path();
        if !path.exists() {
            return Ok(HashMap::new());
        }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        // Track latest version of each UUID
        let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .context("read node log")?;
            for node_reader in log.get_nodes().context("get nodes")? {
                let node = Node::from_capnp_migrate(node_reader)?;
                // `>=` keeps the later log entry on a version tie
                let is_newest = by_uuid.get(&node.uuid)
                    .map(|n| node.version >= n.version)
                    .unwrap_or(true);
                if is_newest {
                    by_uuid.insert(node.uuid, node);
                }
            }
        }
        // Group live (non-deleted) nodes by key
        let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
        for node in by_uuid.into_values() {
            if !node.deleted {
                by_key.entry(node.key.clone()).or_default().push(node);
            }
        }
        // Keep only duplicates
        by_key.retain(|_, nodes| nodes.len() > 1);
        Ok(by_key)
    }

    /// Find the most recent version of a node by key (including deleted).
    /// Scans the entire log. Used for version continuity when recreating deleted nodes.
    pub fn find_latest_by_key(&self, target_key: &str) -> Result<Option<Node>> {
        let path = nodes_path();
        if !path.exists() {
            return Ok(None);
        }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        let mut latest: Option<Node> = None;
        // Unreadable messages/nodes are skipped rather than aborting:
        // this is a best-effort scan over a possibly dirty log.
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let Ok(log) = msg.get_root::<memory_capnp::node_log::Reader>() else { continue };
            let Ok(nodes) = log.get_nodes() else { continue };
            for node_reader in nodes {
                let Ok(node) = Node::from_capnp_migrate(node_reader) else { continue };
                if node.key != target_key {
                    continue;
                }
                // Compare by timestamp, not version (versions can reset
                // after delete/recreate); `>=` prefers later log entries.
                let is_newest = latest.as_ref()
                    .map(|l| node.timestamp >= l.timestamp)
                    .unwrap_or(true);
                if is_newest {
                    latest = Some(node);
                }
            }
        }
        Ok(latest)
    }

    /// Find the last non-deleted version of a node by key.
    /// Scans the entire log. Used for restore operations.
    pub fn find_last_live_version(&self, target_key: &str) -> Result<Option<Node>> {
        let path = nodes_path();
        if !path.exists() {
            return Ok(None);
        }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);
        let mut last_live: Option<Node> = None;
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let Ok(log) = msg.get_root::<memory_capnp::node_log::Reader>() else { continue };
            let Ok(nodes) = log.get_nodes() else { continue };
            for node_reader in nodes {
                let Ok(node) = Node::from_capnp_migrate(node_reader) else { continue };
                if node.key != target_key || node.deleted {
                    continue;
                }
                // Keep the most recent non-deleted version by timestamp
                let is_newest = last_live.as_ref()
                    .map(|l| node.timestamp >= l.timestamp)
                    .unwrap_or(true);
                if is_newest {
                    last_live = Some(node);
                }
            }
        }
        Ok(last_live)
    }

    /// Append nodes to the log file. Returns the offset where the message was written.
    ///
    /// Serialization happens before taking the append lock, so the lock
    /// is held only for the file write itself.
    pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> {
        use std::io::Write;
        use std::sync::atomic::Ordering;
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::node_log::Builder>();
            let mut list = log.init_nodes(nodes.len() as u32);
            for (i, node) in nodes.iter().enumerate() {
                node.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg).context("serialize nodes")?;
        // Lock for file append
        let _guard = self.append_lock.lock().unwrap();
        let path = nodes_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        // Get offset before writing: under the append lock, the current
        // end of file is exactly where this message lands.
        let offset = file.metadata().map(|m| m.len()).unwrap_or(0);
        (&file).write_all(&buf).context("write nodes")?;
        self.loaded_nodes_size.store(
            file.metadata().map(|m| m.len()).unwrap_or(0),
            Ordering::Relaxed,
        );
        Ok(offset)
    }

    /// Append relations to the log file.
    pub fn append_relations(&self, relations: &[Relation]) -> Result<()> {
        use std::io::Write;
        use std::sync::atomic::Ordering;
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::relation_log::Builder>();
            let mut list = log.init_relations(relations.len() as u32);
            for (i, rel) in relations.iter().enumerate() {
                rel.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg).context("serialize relations")?;
        // Lock for file append
        let _guard = self.append_lock.lock().unwrap();
        let path = relations_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        (&file).write_all(&buf).context("write relations")?;
        self.loaded_rels_size.store(
            file.metadata().map(|m| m.len()).unwrap_or(0),
            Ordering::Relaxed,
        );
        Ok(())
    }

    /// Placeholder - indices will be updated on write with redb.
    pub fn save(&self) -> Result<()> {
        Ok(())
    }
}
/// Check and repair corrupt capnp log files.
///
/// Reads each message sequentially, tracking file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<()> {
let mut any_corrupt = false;
for (path, kind) in [
(nodes_path(), "node"),
(relations_path(), "relation"),
] {
if !path.exists() { continue; }
let file = fs::File::open(&path)
.with_context(|| format!("open {}", path.display()))?;
let file_len = file.metadata()
.with_context(|| format!("stat {}", path.display()))?.len();
let mut reader = BufReader::new(file);
let mut good_messages = 0u64;
let mut last_good_pos = 0u64;
loop {
let pos = reader.stream_position()
.with_context(|| format!("tell {}", path.display()))?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
Err(_) => {
// read_message fails at EOF (normal) or on corrupt framing
if pos < file_len {
// Not at EOF — corrupt framing
eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.with_context(|| format!("open for truncate"))?;
file.set_len(pos)
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, pos, good_messages);
}
break;
}
};
// Validate the message content too
let valid = if kind == "node" {
msg.get_root::<memory_capnp::node_log::Reader>()
.and_then(|l| l.get_nodes().map(|_| ()))
.is_ok()
} else {
msg.get_root::<memory_capnp::relation_log::Reader>()
.and_then(|l| l.get_relations().map(|_| ()))
.is_ok()
};
if valid {
good_messages += 1;
last_good_pos = reader.stream_position()
.with_context(|| format!("tell {}", path.display()))?;
} else {
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
kind, pos, last_good_pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.with_context(|| format!("open for truncate"))?;
file.set_len(last_good_pos)
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, last_good_pos, good_messages);
break;
}
}
if !any_corrupt {
eprintln!("{}: {} messages, all clean", kind, good_messages);
}
}
if any_corrupt {
eprintln!("repair complete — run `poc-memory status` to verify");
} else {
eprintln!("store is clean");
}
Ok(())
}
/// Rebuild redb index from capnp log.
/// Scans the log, tracking offsets, and records latest version of each node.
fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
// Remove old database if it exists
if db_path.exists() {
fs::remove_file(db_path)
.with_context(|| format!("remove old db {}", db_path.display()))?;
}
let database = index::open_db(db_path)?;
if !capnp_path.exists() {
return Ok(database);
}
// Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key
let mut latest: HashMap<String, (u64, [u8; 16], u32, bool, u8, i64, String)> = HashMap::new();
let file = fs::File::open(capnp_path)
.with_context(|| format!("open {}", capnp_path.display()))?;
let mut reader = BufReader::new(file);
loop {
let offset = reader.stream_position()?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
Err(_) => break,
};
let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
Ok(l) => l,
Err(_) => continue,
};
let nodes = match log.get_nodes() {
Ok(n) => n,
Err(_) => continue,
};
for node_reader in nodes {
let key = node_reader.get_key().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
if key.is_empty() { continue; }
let version = node_reader.get_version();
let deleted = node_reader.get_deleted();
let node_type = node_reader.get_node_type()
.map(|t| t as u8)
.unwrap_or(0);
let timestamp = node_reader.get_timestamp();
let provenance = node_reader.get_provenance().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("manual")
.to_string();
let mut uuid = [0u8; 16];
if let Ok(data) = node_reader.get_uuid() {
if data.len() >= 16 {
uuid.copy_from_slice(&data[..16]);
}
}
// Keep if newer timestamp (not version - version can reset after delete/recreate)
let dominated = latest.get(&key)
.map(|(_, _, _, _, _, ts, _)| timestamp >= *ts)
.unwrap_or(true);
if dominated {
latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance));
}
}
}
// Write index entries for non-deleted nodes
{
let txn = database.begin_write()?;
{
let mut nodes_table = txn.open_table(index::NODES)?;
let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?;
let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?;
let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?;
for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest {
if !deleted {
nodes_table.insert(key.as_str(), offset)?;
// Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes
let mut packed = [0u8; 25];
packed[0..16].copy_from_slice(&uuid);
packed[16] = node_type;
packed[17..25].copy_from_slice(&timestamp.to_be_bytes());
key_uuid_table.insert(key.as_str(), packed.as_slice())?;
// Pack: [negated_timestamp:8][key] for descending sort
let neg_ts = (!timestamp).to_be_bytes();
let mut prov_val = Vec::with_capacity(8 + key.len());
prov_val.extend_from_slice(&neg_ts);
prov_val.extend_from_slice(key.as_bytes());
by_provenance.insert(provenance.as_str(), prov_val.as_slice())?;
}
// Always record offset in UUID history (even for deleted)
uuid_offsets.insert(uuid.as_slice(), offset)?;
}
}
txn.commit()?;
}
Ok(database)
}
/// Fsck report — discrepancies found between capnp logs and redb index.
///
/// Produced by `fsck_full`; a clean store yields `is_clean() == true`.
#[derive(Debug, Default)]
pub struct FsckReport {
    /// Keys in current index but not in rebuilt (zombie entries)
    pub zombies: Vec<String>,
    /// Keys in rebuilt but not in current index (missing from index)
    pub missing: Vec<String>,
    /// Was capnp log repaired?
    pub capnp_repaired: bool,
}
impl FsckReport {
    /// True when nothing was repaired and both discrepancy lists are empty.
    pub fn is_clean(&self) -> bool {
        !self.capnp_repaired && self.zombies.is_empty() && self.missing.is_empty()
    }
}
/// Full fsck: verify capnp logs, rebuild index to temp, compare with current.
/// Returns a report of discrepancies found.
pub fn fsck_full() -> Result<FsckReport> {
use redb::{ReadableDatabase, ReadableTable};
use tempfile::TempDir;
let mut report = FsckReport::default();
// Step 1: Run capnp log fsck (may truncate corrupt messages)
// We need to check if it did repairs — currently fsck() just prints to stderr
// For now, we'll re-check after by comparing file sizes
let nodes_size_before = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
fsck()?;
let nodes_size_after = nodes_path().metadata().map(|m| m.len()).unwrap_or(0);
report.capnp_repaired = nodes_size_after != nodes_size_before;
// Step 2: Rebuild index to temp file
let temp_dir = TempDir::new().context("create temp dir")?;
let temp_db_path = temp_dir.path().join("rebuilt.redb");
let rebuilt_db = rebuild_index(&temp_db_path, &nodes_path())?;
// Step 3: Copy current index to temp and open (avoids write lock contention)
let current_db_path = db_path();
if !current_db_path.exists() {
// No current index — all rebuilt keys are "missing"
let txn = rebuilt_db.begin_read()?;
let table = txn.open_table(index::NODES)?;
for entry in table.iter()? {
let (key, _) = entry?;
report.missing.push(key.value().to_string());
}
return Ok(report);
}
// Copy to temp to avoid lock contention with running daemon
let current_copy_path = temp_dir.path().join("current.redb");
fs::copy(&current_db_path, &current_copy_path)
.with_context(|| format!("copy {} to temp", current_db_path.display()))?;
let current_db = redb::Database::open(&current_copy_path)
.with_context(|| format!("open current db copy"))?;
// Step 4: Compare NODES tables
// Collect all keys from both
let rebuilt_keys: std::collections::HashSet<String> = {
let txn = rebuilt_db.begin_read()?;
let table = txn.open_table(index::NODES)?;
table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
};
let current_keys: std::collections::HashSet<String> = {
let txn = current_db.begin_read()?;
let table = txn.open_table(index::NODES)?;
table.iter()?.map(|e| e.map(|(k, _)| k.value().to_string())).collect::<Result<_, _>>()?
};
// Keys in current but not rebuilt = zombies (shouldn't exist)
for key in current_keys.difference(&rebuilt_keys) {
report.zombies.push(key.clone());
}
report.zombies.sort();
// Keys in rebuilt but not current = missing (should exist but don't)
for key in rebuilt_keys.difference(&current_keys) {
report.missing.push(key.clone());
}
report.missing.sort();
Ok(report)
}
/// Repair the index by rebuilding from capnp logs.
/// Use after fsck_full() reports discrepancies.
pub fn repair_index() -> Result<()> {
    // Local named `target` to avoid shadowing the db_path() helper.
    let target = db_path();
    rebuild_index(&target, &nodes_path())?;
    eprintln!("index rebuilt from capnp log");
    Ok(())
}