// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
//   1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
//   2. bincode cache (state.bin) — ~10ms
//   3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.

use super::types::*;

use crate::memory_capnp;

use capnp::message;
use capnp::serialize;

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, BufWriter, Seek, Write as IoWrite};
use std::path::Path;

impl Store {
    /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
    ///
    /// Staleness check uses log file sizes (not mtimes). Since logs are
    /// append-only, any write grows the file, invalidating the cache.
    /// This avoids the mtime race that caused data loss with concurrent
    /// writers (dream loop, link audit, journal enrichment).
    ///
    /// Sources are tried in order: the rkyv snapshot, then the bincode cache,
    /// then a full replay of the node and relation logs. After a replay the
    /// rebuilt store is written back with `save()`.
    ///
    /// # Errors
    /// Returns an error if a log cannot be opened or decoded during replay,
    /// or if saving the rebuilt cache fails. A failing snapshot load is only
    /// reported on stderr and falls through to the next tier.
    pub fn load() -> Result<Store, String> {
        // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
        match Self::load_snapshot_mmap() {
            Ok(Some(store)) => return Ok(store),
            Ok(None) => {}
            Err(e) => eprintln!("rkyv snapshot: {}", e),
        }

        // 2. Try bincode state.bin cache (~10ms)
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let state_p = state_path();

        // A missing log counts as size 0.
        let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        if let Ok(data) = fs::read(&state_p) {
            if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
                // Header layout: magic[0..4], nodes log size[4..12], relations log size[12..20].
                // The slices are exactly 8 bytes, so the conversions cannot fail.
                let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
                let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());

                if cached_nodes == nodes_size && cached_rels == rels_size {
                    if let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
                        // Rebuild uuid_to_key (skipped by serde)
                        for (key, node) in &store.nodes {
                            store.uuid_to_key.insert(node.uuid, key.clone());
                        }
                        store.loaded_nodes_size = nodes_size;
                        store.loaded_rels_size = rels_size;

                        // Bootstrap: write rkyv snapshot if missing.
                        // NOTE(review): save_snapshot() documents that the caller holds
                        // StoreLock; this path calls it without the lock — confirm intended.
                        if !snapshot_path().exists() {
                            if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
                                eprintln!("rkyv bootstrap: {}", e);
                            }
                        }
                        return Ok(store);
                    }
                }
            }
        }

        // 3. Stale or no cache — rebuild from capnp logs
        let mut store = Store::default();

        if nodes_p.exists() {
            store.replay_nodes(&nodes_p)?;
        }
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }

        // Record the log sizes measured *before* the replay. If another writer
        // appended while we were replaying, a size taken afterwards could cover
        // bytes we never read, and the cache written below would look fresh while
        // missing those records. The earlier size can only under-report, which
        // merely makes the next reader replay again.
        store.loaded_nodes_size = nodes_size;
        store.loaded_rels_size = rels_size;

        // Drop edges referencing deleted/missing nodes
        let nodes = &store.nodes;
        store.relations.retain(|r| {
            nodes.contains_key(&r.source_key) && nodes.contains_key(&r.target_key)
        });

        store.save()?;
        Ok(store)
    }

    /// Replay node log, keeping the latest version seen for each node key.
    ///
    /// A record replaces the stored node when its version is greater than or
    /// equal to the stored one; a record flagged `deleted` removes the node and
    /// its uuid mapping instead.
    ///
    /// Reading stops at the first message that `read_message` rejects, which
    /// covers both end-of-file and corrupt framing; only decode errors inside a
    /// successfully framed message are returned as `Err`.
    fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            // NOTE(review): root type restored as node_log::Reader, inferred from the
            // get_nodes() accessor — confirm against the capnp schema.
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log: {}", e))?;
            let entries = log.get_nodes()
                .map_err(|e| format!("get nodes: {}", e))?;

            for node_reader in entries {
                let node = Node::from_capnp(node_reader)?;
                let existing_version = self.nodes.get(&node.key).map_or(0, |n| n.version);
                if node.version < existing_version {
                    continue;
                }
                if node.deleted {
                    self.nodes.remove(&node.key);
                    self.uuid_to_key.remove(&node.uuid);
                } else {
                    self.uuid_to_key.insert(node.uuid, node.key.clone());
                    self.nodes.insert(node.key.clone(), node);
                }
            }
        }
        Ok(())
    }

    /// Replay relation log, keeping latest version per UUID
    ///
    /// All records are collected first and deduplicated by UUID; relations
    /// whose latest record is flagged `deleted` are dropped from the result.
    /// As in `replay_nodes`, reading stops at the first message that
    /// `read_message` rejects.
    fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Collect all, then deduplicate by UUID keeping latest version
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            // NOTE(review): root type restored as relation_log::Reader, inferred from
            // the get_relations() accessor — confirm against the capnp schema.
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .map_err(|e| format!("read relation log: {}", e))?;
            let entries = log.get_relations()
                .map_err(|e| format!("get relations: {}", e))?;

            for rel_reader in entries {
                let rel = Relation::from_capnp(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid).map_or(0, |r| r.version);
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }

        self.relations = by_uuid.into_values()
            .filter(|r| !r.deleted)
            .collect();
        Ok(())
    }

    /// Append nodes to the log file.
    /// Serializes to a Vec first, then does a single write() syscall
    /// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> { let _lock = StoreLock::acquire()?; let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_nodes(nodes.len() as u32); for (i, node) in nodes.iter().enumerate() { node.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .map_err(|e| format!("serialize nodes: {}", e))?; let path = nodes_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; use std::io::Write; (&file).write_all(&buf) .map_err(|e| format!("write nodes: {}", e))?; self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } /// Append relations to the log file. /// Single write() syscall for atomic O_APPEND. pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> { let _lock = StoreLock::acquire()?; let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_relations(relations.len() as u32); for (i, rel) in relations.iter().enumerate() { rel.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .map_err(|e| format!("serialize relations: {}", e))?; let path = relations_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; use std::io::Write; (&file).write_all(&buf) .map_err(|e| format!("write relations: {}", e))?; self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } /// Save the derived cache with log size header for staleness detection. /// Uses atomic write (tmp + rename) to prevent partial reads. 
pub fn save(&self) -> Result<(), String> { let _lock = StoreLock::acquire()?; let path = state_path(); if let Some(parent) = path.parent() { fs::create_dir_all(parent).ok(); } // Use log sizes from load time, not current filesystem sizes. // If another writer appended since we loaded, our recorded size // will be smaller than the actual log → next reader detects stale // cache and replays the (correct, append-only) log. let nodes_size = self.loaded_nodes_size; let rels_size = self.loaded_rels_size; let bincode_data = bincode::serialize(self) .map_err(|e| format!("bincode serialize: {}", e))?; let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len()); data.extend_from_slice(&CACHE_MAGIC); data.extend_from_slice(&nodes_size.to_le_bytes()); data.extend_from_slice(&rels_size.to_le_bytes()); data.extend_from_slice(&bincode_data); // Atomic write: tmp file + rename let tmp_path = path.with_extension("bin.tmp"); fs::write(&tmp_path, &data) .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; fs::rename(&tmp_path, &path) .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?; // Also write rkyv snapshot (mmap-friendly) if let Err(e) = self.save_snapshot(nodes_size, rels_size) { eprintln!("rkyv snapshot save: {}", e); } Ok(()) } /// Serialize store as rkyv snapshot with staleness header. /// Assumes StoreLock is already held by caller. 
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> { let snap = Snapshot { nodes: self.nodes.clone(), relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(), gaps: self.gaps.clone(), params: self.params, }; let rkyv_data = rkyv::to_bytes::<_, 256>(&snap) .map_err(|e| format!("rkyv serialize: {}", e))?; let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len()); data.extend_from_slice(&RKYV_MAGIC); data.extend_from_slice(&1u32.to_le_bytes()); // format version data.extend_from_slice(&nodes_size.to_le_bytes()); data.extend_from_slice(&rels_size.to_le_bytes()); data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes()); data.extend_from_slice(&rkyv_data); let path = snapshot_path(); let tmp_path = path.with_extension("rkyv.tmp"); fs::write(&tmp_path, &data) .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; fs::rename(&tmp_path, &path) .map_err(|e| format!("rename: {}", e))?; Ok(()) } /// Try loading store from mmap'd rkyv snapshot. /// Returns None if snapshot is missing or stale (log sizes don't match). 
fn load_snapshot_mmap() -> Result, String> { let path = snapshot_path(); if !path.exists() { return Ok(None); } let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); let file = fs::File::open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; let mmap = unsafe { memmap2::Mmap::map(&file) } .map_err(|e| format!("mmap {}: {}", path.display(), e))?; if mmap.len() < RKYV_HEADER_LEN { return Ok(None); } if mmap[..4] != RKYV_MAGIC { return Ok(None); } // [4..8] = version, skip for now let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap()); let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap()); let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize; if cached_nodes != nodes_size || cached_rels != rels_size { return Ok(None); // stale } if mmap.len() < RKYV_HEADER_LEN + data_len { return Ok(None); // truncated } let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len]; // SAFETY: we wrote this file ourselves via save_snapshot(). // Skip full validation (check_archived_root) — the staleness header // already confirms this snapshot matches the current log state. let archived = unsafe { rkyv::archived_root::(rkyv_data) }; let snap: Snapshot = > ::deserialize(archived, &mut rkyv::Infallible).unwrap(); let mut store = Store { nodes: snap.nodes, relations: snap.relations, gaps: snap.gaps, params: snap.params, ..Default::default() }; // Rebuild uuid_to_key (not serialized) for (key, node) in &store.nodes { store.uuid_to_key.insert(node.uuid, key.clone()); } store.loaded_nodes_size = nodes_size; store.loaded_rels_size = rels_size; Ok(Some(store)) } } /// Check and repair corrupt capnp log files. /// /// Reads each message sequentially, tracking file position. On the first /// corrupt message, truncates the file to the last good position. 
Also /// removes stale caches so the next load replays from the repaired log. pub fn fsck() -> Result<(), String> { let mut any_corrupt = false; for (path, kind) in [ (nodes_path(), "node"), (relations_path(), "relation"), ] { if !path.exists() { continue; } let file = fs::File::open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; let file_len = file.metadata() .map_err(|e| format!("stat {}: {}", path.display(), e))?.len(); let mut reader = BufReader::new(file); let mut good_messages = 0u64; let mut last_good_pos = 0u64; loop { let pos = reader.stream_position() .map_err(|e| format!("tell {}: {}", path.display(), e))?; let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { Ok(m) => m, Err(_) => { // read_message fails at EOF (normal) or on corrupt framing if pos < file_len { // Not at EOF — corrupt framing eprintln!("{}: corrupt message at offset {}, truncating", kind, pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .map_err(|e| format!("open for truncate: {}", e))?; file.set_len(pos) .map_err(|e| format!("truncate {}: {}", path.display(), e))?; eprintln!("{}: truncated from {} to {} bytes ({} good messages)", kind, file_len, pos, good_messages); } break; } }; // Validate the message content too let valid = if kind == "node" { msg.get_root::() .and_then(|l| l.get_nodes().map(|_| ())) .is_ok() } else { msg.get_root::() .and_then(|l| l.get_relations().map(|_| ())) .is_ok() }; if valid { good_messages += 1; last_good_pos = reader.stream_position() .map_err(|e| format!("tell {}: {}", path.display(), e))?; } else { eprintln!("{}: corrupt message content at offset {}, truncating to {}", kind, pos, last_good_pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .map_err(|e| format!("open for truncate: {}", e))?; file.set_len(last_good_pos) .map_err(|e| format!("truncate {}: {}", path.display(), e))?; eprintln!("{}: truncated from {} 
to {} bytes ({} good messages)", kind, file_len, last_good_pos, good_messages); break; } } if !any_corrupt { eprintln!("{}: {} messages, all clean", kind, good_messages); } } if any_corrupt { // Nuke caches so next load replays from the repaired logs for p in [state_path(), snapshot_path()] { if p.exists() { fs::remove_file(&p) .map_err(|e| format!("remove {}: {}", p.display(), e))?; eprintln!("removed stale cache: {}", p.display()); } } eprintln!("repair complete — run `poc-memory status` to verify"); } else { eprintln!("store is clean"); } Ok(()) }