// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.
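//
// On-disk layouts, as written by save() and save_snapshot() below:
//   state.bin:     [CACHE_MAGIC: 4][nodes log size: u64 LE][rels log size: u64 LE][bincode payload]
//   snapshot.rkyv: [RKYV_MAGIC: 4][version: u32 LE][nodes log size: u64 LE]
//                  [rels log size: u64 LE][payload len: u64 LE][rkyv payload]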

use super::types::*;

use crate::memory_capnp;

use capnp::message;
use capnp::serialize;

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, BufWriter, Write as IoWrite};
use std::path::Path;

impl Store {
    /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
    ///
    /// Staleness check uses log file sizes (not mtimes). Since logs are
    /// append-only, any write grows the file, invalidating the cache.
    /// This avoids the mtime race that caused data loss with concurrent
    /// writers (dream loop, link audit, journal enrichment).
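    ///
    /// # Example (illustrative sketch, not a compiled doctest)
    /// ```ignore
    /// let store = Store::load().expect("load store");
    /// println!("{} nodes, {} relations", store.nodes.len(), store.relations.len());
    /// ```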
    pub fn load() -> Result<Store, String> {
        // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
        match Self::load_snapshot_mmap() {
            Ok(Some(store)) => return Ok(store),
            Ok(None) => {},
            Err(e) => eprintln!("rkyv snapshot: {}", e),
        }

        // 2. Try bincode state.bin cache (~10ms)
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let state_p = state_path();

        let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        if let Ok(data) = fs::read(&state_p) {
            if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
                let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
                let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());

                if cached_nodes == nodes_size && cached_rels == rels_size {
                    if let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
                        // Rebuild uuid_to_key (skipped by serde)
                        for (key, node) in &store.nodes {
                            store.uuid_to_key.insert(node.uuid, key.clone());
                        }
                        // Bootstrap: write rkyv snapshot if missing
                        if !snapshot_path().exists() {
                            if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
                                eprintln!("rkyv bootstrap: {}", e);
                            }
                        }
                        return Ok(store);
                    }
                }
            }
        }

        // 3. Stale or no cache — rebuild from capnp logs
        let mut store = Store::default();

        if nodes_p.exists() {
            store.replay_nodes(&nodes_p)?;
        }
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }

        // Drop edges referencing deleted/missing nodes
        store.relations.retain(|r|
            store.nodes.contains_key(&r.source_key) &&
            store.nodes.contains_key(&r.target_key)
        );

        store.save()?;
        Ok(store)
    }

    /// Replay node log, keeping latest version per UUID.
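    ///
    /// The log is a flat sequence of framed capnp messages (one per
    /// append_nodes call); the read loop stops at EOF or on a malformed
    /// frame. Version ties (`>=`) resolve to the later log entry.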
    fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log: {}", e))?;
            for node_reader in log.get_nodes()
                .map_err(|e| format!("get nodes: {}", e))? {
                let node = Node::from_capnp(node_reader)?;
                let existing_version = self.nodes.get(&node.key)
                    .map(|n| n.version)
                    .unwrap_or(0);
                if node.version >= existing_version {
                    if node.deleted {
                        self.nodes.remove(&node.key);
                        self.uuid_to_key.remove(&node.uuid);
                    } else {
                        self.uuid_to_key.insert(node.uuid, node.key.clone());
                        self.nodes.insert(node.key.clone(), node);
                    }
                }
            }
        }
        Ok(())
    }

    /// Replay relation log, keeping latest version per UUID.
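    ///
    /// Unlike nodes, which dedupe in place against self.nodes, relations
    /// are collected into a by-UUID map first because self.relations is a
    /// flat Vec with no keyed lookup.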
    fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Collect all, then deduplicate by UUID keeping latest version
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .map_err(|e| format!("read relation log: {}", e))?;
            for rel_reader in log.get_relations()
                .map_err(|e| format!("get relations: {}", e))? {
                let rel = Relation::from_capnp(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid)
                    .map(|r| r.version)
                    .unwrap_or(0);
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }

        self.relations = by_uuid.into_values()
            .filter(|r| !r.deleted)
            .collect();
        Ok(())
    }

    /// Append nodes to the log file.
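    ///
    /// # Example (illustrative sketch, not a compiled doctest)
    /// ```ignore
    /// store.append_nodes(&[node])?; // one framed capnp message per call
    /// store.save()?;                // refresh the caches now that the log grew
    /// ```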
    pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = nodes_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut writer = BufWriter::new(file);

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::node_log::Builder>();
            let mut list = log.init_nodes(nodes.len() as u32);
            for (i, node) in nodes.iter().enumerate() {
                node.to_capnp(list.reborrow().get(i as u32));
            }
        }
        serialize::write_message(&mut writer, &msg)
            .map_err(|e| format!("write nodes: {}", e))?;
        writer.flush().map_err(|e| format!("flush: {}", e))?;
        Ok(())
    }

    /// Append relations to the log file.
    pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = relations_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut writer = BufWriter::new(file);

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::relation_log::Builder>();
            let mut list = log.init_relations(relations.len() as u32);
            for (i, rel) in relations.iter().enumerate() {
                rel.to_capnp(list.reborrow().get(i as u32));
            }
        }
        serialize::write_message(&mut writer, &msg)
            .map_err(|e| format!("write relations: {}", e))?;
        writer.flush().map_err(|e| format!("flush: {}", e))?;
        Ok(())
    }

    /// Save the derived cache with log size header for staleness detection.
    /// Uses atomic write (tmp + rename) to prevent partial reads.
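    ///
    /// The header written here ([magic:4][nodes size:8][rels size:8]) is
    /// exactly what load() parses back and compares against the live log
    /// sizes.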
    pub fn save(&self) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = state_path();
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).ok();
        }

        let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);

        let bincode_data = bincode::serialize(self)
            .map_err(|e| format!("bincode serialize: {}", e))?;

        let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
        data.extend_from_slice(&CACHE_MAGIC);
        data.extend_from_slice(&nodes_size.to_le_bytes());
        data.extend_from_slice(&rels_size.to_le_bytes());
        data.extend_from_slice(&bincode_data);

        // Atomic write: tmp file + rename
        let tmp_path = path.with_extension("bin.tmp");
        fs::write(&tmp_path, &data)
            .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
        fs::rename(&tmp_path, &path)
            .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?;

        // Also write rkyv snapshot (mmap-friendly)
        if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
            eprintln!("rkyv snapshot save: {}", e);
        }

        Ok(())
    }

    /// Serialize store as rkyv snapshot with staleness header.
    /// Assumes StoreLock is already held by caller.
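    ///
    /// Header offsets ([0..4] magic, [4..8] version, [8..16] nodes size,
    /// [16..24] rels size, [24..32] payload len) must stay in sync with
    /// load_snapshot_mmap() below.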
    fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
        let snap = Snapshot {
            nodes: self.nodes.clone(),
            relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
            gaps: self.gaps.clone(),
            params: self.params,
        };

        let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
            .map_err(|e| format!("rkyv serialize: {}", e))?;

        let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
        data.extend_from_slice(&RKYV_MAGIC);
        data.extend_from_slice(&1u32.to_le_bytes()); // format version
        data.extend_from_slice(&nodes_size.to_le_bytes());
        data.extend_from_slice(&rels_size.to_le_bytes());
        data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes());
        data.extend_from_slice(&rkyv_data);

        let path = snapshot_path();
        let tmp_path = path.with_extension("rkyv.tmp");
        fs::write(&tmp_path, &data)
            .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
        fs::rename(&tmp_path, &path)
            .map_err(|e| format!("rename: {}", e))?;

        Ok(())
    }

    /// Try loading store from mmap'd rkyv snapshot.
    /// Returns None if snapshot is missing or stale (log sizes don't match).
    fn load_snapshot_mmap() -> Result<Option<Store>, String> {
        let path = snapshot_path();
        if !path.exists() { return Ok(None); }

        let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);

        let file = fs::File::open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;

        let mmap = unsafe { memmap2::Mmap::map(&file) }
            .map_err(|e| format!("mmap {}: {}", path.display(), e))?;

        if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
        if mmap[..4] != RKYV_MAGIC { return Ok(None); }

        // [4..8] = version, skip for now
        let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
        let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
        let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;

        if cached_nodes != nodes_size || cached_rels != rels_size {
            return Ok(None); // stale
        }
        if mmap.len() < RKYV_HEADER_LEN + data_len {
            return Ok(None); // truncated
        }

        let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len];

        // SAFETY: we wrote this file ourselves via save_snapshot().
        // Skip full validation (check_archived_root) — the staleness header
        // already confirms this snapshot matches the current log state.
        let archived = unsafe { rkyv::archived_root::<Snapshot>(rkyv_data) };

        let snap: Snapshot = <ArchivedSnapshot as rkyv::Deserialize<Snapshot, rkyv::Infallible>>
            ::deserialize(archived, &mut rkyv::Infallible).unwrap();

        let mut store = Store {
            nodes: snap.nodes,
            relations: snap.relations,
            gaps: snap.gaps,
            params: snap.params,
            ..Default::default()
        };

        // Rebuild uuid_to_key (not serialized)
        for (key, node) in &store.nodes {
            store.uuid_to_key.insert(node.uuid, key.clone());
        }

        Ok(Some(store))
    }
}