// Persistence layer: load, replay, append // // capnp logs are the source of truth; redb provides indexed access. use super::{index, types::*}; use redb::ReadableTableMetadata; use crate::memory_capnp; use anyhow::{Context, Result}; use capnp::message; use capnp::serialize; use std::collections::HashMap; use std::fs; use std::io::{BufReader, Seek}; use std::path::Path; impl Store { /// Load store by replaying capnp logs, then open/verify redb indices. pub fn load() -> Result { let nodes_p = nodes_path(); let rels_p = relations_path(); let mut store = Store::default(); if nodes_p.exists() { store.replay_nodes(&nodes_p)?; } if rels_p.exists() { store.replay_relations(&rels_p)?; } // Record log sizes after replay store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); // Drop edges referencing deleted/missing nodes store.relations.retain(|r| store.nodes.contains_key(&r.source_key) && store.nodes.contains_key(&r.target_key) ); // Open redb and verify/rebuild indices let db_p = db_path(); store.db = Some(store.open_or_rebuild_db(&db_p)?); Ok(store) } /// Open redb database, rebuilding if unhealthy. fn open_or_rebuild_db(&self, path: &Path) -> Result { // Try opening existing database if path.exists() { match index::open_db(path) { Ok(database) => { if self.db_is_healthy(&database)? { return Ok(database); } eprintln!("redb index stale, rebuilding..."); } Err(e) => { eprintln!("redb open failed ({}), rebuilding...", e); } } } // Rebuild index from capnp log rebuild_index(path, &nodes_path()) } /// Check if redb indices match in-memory state. fn db_is_healthy(&self, database: &redb::Database) -> Result { use redb::ReadableDatabase; let txn = database.begin_read()?; // Quick check: node count should match let nodes_table = txn.open_table(index::NODES)?; let db_count = nodes_table.len()?; if db_count != self.nodes.len() as u64 { return Ok(false); } // Spot check: verify a few random nodes exist with matching keys // (full verification would be too slow) for (i, key) in self.nodes.keys().enumerate() { if i >= 10 { break; } // check first 10 if nodes_table.get(key.as_str())?.is_none() { return Ok(false); } } Ok(true) } /// Replay node log, keeping latest version per UUID. /// Tracks all UUIDs seen per key to detect duplicates. fn replay_nodes(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Track all non-deleted UUIDs per key to detect duplicates let mut key_uuids: HashMap> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes"))? { let node = Node::from_capnp_migrate(node_reader)?; let existing_version = self.nodes.get(&node.key) .map(|n| n.version) .unwrap_or(0); if node.version >= existing_version { if node.deleted { self.nodes.remove(&node.key); self.uuid_to_key.remove(&node.uuid); if let Some(uuids) = key_uuids.get_mut(&node.key) { uuids.retain(|u| *u != node.uuid); } } else { self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node.clone()); let uuids = key_uuids.entry(node.key).or_default(); if !uuids.contains(&node.uuid) { uuids.push(node.uuid); } } } } } // Report duplicate keys for (key, uuids) in &key_uuids { if uuids.len() > 1 { dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len()); } } Ok(()) } /// Replay relation log, keeping latest version per UUID fn replay_relations(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Collect all, then deduplicate by UUID keeping latest version let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read relation log"))?; for rel_reader in log.get_relations() .with_context(|| format!("get relations"))? { let rel = Relation::from_capnp_migrate(rel_reader)?; let existing_version = by_uuid.get(&rel.uuid) .map(|r| r.version) .unwrap_or(0); if rel.version >= existing_version { by_uuid.insert(rel.uuid, rel); } } } self.relations = by_uuid.into_values() .filter(|r| !r.deleted) .collect(); Ok(()) } /// Find all duplicate keys: keys with multiple live UUIDs in the log. /// Returns a map from key → vec of all live Node versions (one per UUID). /// The "winner" in self.nodes is always one of them. pub fn find_duplicates(&self) -> Result>> { let path = nodes_path(); if !path.exists() { return Ok(HashMap::new()); } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Track latest version of each UUID let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes"))? { let node = Node::from_capnp_migrate(node_reader)?; let dominated = by_uuid.get(&node.uuid) .map(|n| node.version >= n.version) .unwrap_or(true); if dominated { by_uuid.insert(node.uuid, node); } } } // Group live (non-deleted) nodes by key let mut by_key: HashMap> = HashMap::new(); for node in by_uuid.into_values() { if !node.deleted { by_key.entry(node.key.clone()).or_default().push(node); } } // Keep only duplicates by_key.retain(|_, nodes| nodes.len() > 1); Ok(by_key) } /// Append nodes to the log file. /// Serializes to a Vec first, then does a single write() syscall /// so the append is atomic with O_APPEND even without flock. /// Returns the offset where the message was written. pub fn append_nodes(&mut self, nodes: &[Node]) -> Result { let _lock = StoreLock::acquire()?; self.append_nodes_unlocked(nodes) } /// Append nodes without acquiring the lock. Caller must hold StoreLock. /// Returns the offset where the message was written. pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_nodes(nodes.len() as u32); for (i, node) in nodes.iter().enumerate() { node.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize nodes"))?; let path = nodes_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; // Get offset before writing let offset = file.metadata().map(|m| m.len()).unwrap_or(0); use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write nodes"))?; self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(offset) } /// Replay only new entries appended to the node log since we last loaded. /// Call under StoreLock to catch writes from concurrent processes. pub(crate) fn refresh_nodes(&mut self) -> Result<()> { let path = nodes_path(); let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); if current_size <= self.loaded_nodes_size { return Ok(()); // no new data } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size)) .with_context(|| format!("seek nodes log"))?; while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log delta"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes delta"))? { let node = Node::from_capnp_migrate(node_reader)?; let dominated = self.nodes.get(&node.key) .map(|n| node.version >= n.version) .unwrap_or(true); if dominated { if node.deleted { self.nodes.remove(&node.key); self.uuid_to_key.remove(&node.uuid); } else { self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node); } } } } self.loaded_nodes_size = current_size; Ok(()) } /// Append relations to the log file. /// Single write() syscall for atomic O_APPEND. pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> { let _lock = StoreLock::acquire()?; self.append_relations_unlocked(relations) } /// Append relations without acquiring the lock. Caller must hold StoreLock. pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<()> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_relations(relations.len() as u32); for (i, rel) in relations.iter().enumerate() { rel.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize relations"))?; let path = relations_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write relations"))?; self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } /// Placeholder - indices will be updated on write with redb. pub fn save(&self) -> Result<()> { Ok(()) } } /// Check and repair corrupt capnp log files. /// /// Reads each message sequentially, tracking file position. On the first /// corrupt message, truncates the file to the last good position. Also /// removes stale caches so the next load replays from the repaired log. pub fn fsck() -> Result<()> { let mut any_corrupt = false; for (path, kind) in [ (nodes_path(), "node"), (relations_path(), "relation"), ] { if !path.exists() { continue; } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let file_len = file.metadata() .with_context(|| format!("stat {}", path.display()))?.len(); let mut reader = BufReader::new(file); let mut good_messages = 0u64; let mut last_good_pos = 0u64; loop { let pos = reader.stream_position() .with_context(|| format!("tell {}", path.display()))?; let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { Ok(m) => m, Err(_) => { // read_message fails at EOF (normal) or on corrupt framing if pos < file_len { // Not at EOF — corrupt framing eprintln!("{}: corrupt message at offset {}, truncating", kind, pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .with_context(|| format!("open for truncate"))?; file.set_len(pos) .with_context(|| format!("truncate {}", path.display()))?; eprintln!("{}: truncated from {} to {} bytes ({} good messages)", kind, file_len, pos, good_messages); } break; } }; // Validate the message content too let valid = if kind == "node" { msg.get_root::() .and_then(|l| l.get_nodes().map(|_| ())) .is_ok() } else { msg.get_root::() .and_then(|l| l.get_relations().map(|_| ())) .is_ok() }; if valid { good_messages += 1; last_good_pos = reader.stream_position() .with_context(|| format!("tell {}", path.display()))?; } else { eprintln!("{}: corrupt message content at offset {}, truncating to {}", kind, pos, last_good_pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .with_context(|| format!("open for truncate"))?; file.set_len(last_good_pos) .with_context(|| format!("truncate {}", path.display()))?; eprintln!("{}: truncated from {} to {} bytes ({} good messages)", kind, file_len, last_good_pos, good_messages); break; } } if !any_corrupt { eprintln!("{}: {} messages, all clean", kind, good_messages); } } if any_corrupt { eprintln!("repair complete — run `poc-memory status` to verify"); } else { eprintln!("store is clean"); } Ok(()) } /// Rebuild redb index from capnp log. /// Scans the log, tracking offsets, and records latest version of each node. fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result { // Remove old database if it exists if db_path.exists() { fs::remove_file(db_path) .with_context(|| format!("remove old db {}", db_path.display()))?; } let database = index::open_db(db_path)?; if !capnp_path.exists() { return Ok(database); } // Track latest (offset, uuid, version, deleted) per key let mut latest: HashMap = HashMap::new(); let file = fs::File::open(capnp_path) .with_context(|| format!("open {}", capnp_path.display()))?; let mut reader = BufReader::new(file); loop { let offset = reader.stream_position()?; let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { Ok(m) => m, Err(_) => break, }; let log = match msg.get_root::() { Ok(l) => l, Err(_) => continue, }; let nodes = match log.get_nodes() { Ok(n) => n, Err(_) => continue, }; for node_reader in nodes { let key = node_reader.get_key().ok() .and_then(|t| t.to_str().ok()) .unwrap_or("") .to_string(); if key.is_empty() { continue; } let version = node_reader.get_version(); let deleted = node_reader.get_deleted(); let mut uuid = [0u8; 16]; if let Ok(data) = node_reader.get_uuid() { if data.len() >= 16 { uuid.copy_from_slice(&data[..16]); } } // Keep if newer version let dominated = latest.get(&key) .map(|(_, _, v, _)| version >= *v) .unwrap_or(true); if dominated { latest.insert(key, (offset, uuid, version, deleted)); } } } // Write index entries for non-deleted nodes { let txn = database.begin_write()?; { let mut nodes_table = txn.open_table(index::NODES)?; let mut uuid_table = txn.open_table(index::UUID_TO_KEY)?; for (key, (offset, uuid, _, deleted)) in latest { if !deleted { nodes_table.insert(key.as_str(), offset)?; uuid_table.insert(uuid.as_slice(), key.as_str())?; } } } txn.commit()?; } Ok(database) }