consciousness/src/store/persist.rs
ProofOfConcept f4c4e1bb39 persist: fix store race condition with concurrent writers
The cache staleness mechanism (log-size headers, tmp+rename) was sound,
but save() was re-reading the current log size from the filesystem
instead of using the size at load time. With concurrent writers, this
caused the cache to claim validity for log data it didn't contain.

Fix: track loaded_nodes_size/loaded_rels_size through the Store
lifecycle. Set them on load (all three paths: rkyv snapshot, bincode
cache, log replay) and update after each append via fstat on the
already-open fd.

Also fix append atomicity: replace BufWriter (which may issue multiple
write() syscalls) with serialize-to-Vec + single write_all(), ensuring
O_APPEND atomicity without depending on flock.

Make from_capnp() pub for use by the history command.
2026-03-06 21:38:26 -05:00

341 lines
14 KiB
Rust

// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.
use super::types::*;
use crate::memory_capnp;
use capnp::message;
use capnp::serialize;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, BufWriter, Write as IoWrite};
use std::path::Path;
impl Store {
/// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
///
/// Staleness check uses log file sizes (not mtimes). Since logs are
/// append-only, any write grows the file, invalidating the cache.
/// This avoids the mtime race that caused data loss with concurrent
/// writers (dream loop, link audit, journal enrichment).
pub fn load() -> Result<Store, String> {
// 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
match Self::load_snapshot_mmap() {
Ok(Some(store)) => return Ok(store),
Ok(None) => {},
Err(e) => eprintln!("rkyv snapshot: {}", e),
}
// 2. Try bincode state.bin cache (~10ms)
let nodes_p = nodes_path();
let rels_p = relations_path();
let state_p = state_path();
let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
if let Ok(data) = fs::read(&state_p) {
if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());
if cached_nodes == nodes_size && cached_rels == rels_size {
if let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
// Rebuild uuid_to_key (skipped by serde)
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
// Bootstrap: write rkyv snapshot if missing
if !snapshot_path().exists() {
if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
eprintln!("rkyv bootstrap: {}", e);
}
}
return Ok(store);
}
}
}
}
// Stale or no cache — rebuild from capnp logs
let mut store = Store::default();
if nodes_p.exists() {
store.replay_nodes(&nodes_p)?;
}
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
// Record log sizes after replay — this is the state we reflect
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
// Drop edges referencing deleted/missing nodes
store.relations.retain(|r|
store.nodes.contains_key(&r.source_key) &&
store.nodes.contains_key(&r.target_key)
);
store.save()?;
Ok(store)
}
/// Replay node log, keeping latest version per UUID
fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = Node::from_capnp(node_reader)?;
let existing_version = self.nodes.get(&node.key)
.map(|n| n.version)
.unwrap_or(0);
if node.version >= existing_version {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
}
}
}
}
Ok(())
}
/// Replay relation log, keeping latest version per UUID
fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Collect all, then deduplicate by UUID keeping latest version
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
.map_err(|e| format!("read relation log: {}", e))?;
for rel_reader in log.get_relations()
.map_err(|e| format!("get relations: {}", e))? {
let rel = Relation::from_capnp(rel_reader)?;
let existing_version = by_uuid.get(&rel.uuid)
.map(|r| r.version)
.unwrap_or(0);
if rel.version >= existing_version {
by_uuid.insert(rel.uuid, rel);
}
}
}
self.relations = by_uuid.into_values()
.filter(|r| !r.deleted)
.collect();
Ok(())
}
/// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
let mut list = log.init_nodes(nodes.len() as u32);
for (i, node) in nodes.iter().enumerate() {
node.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize nodes: {}", e))?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write nodes: {}", e))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
let mut list = log.init_relations(relations.len() as u32);
for (i, rel) in relations.iter().enumerate() {
rel.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize relations: {}", e))?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write relations: {}", e))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Save the derived cache with log size header for staleness detection.
/// Uses atomic write (tmp + rename) to prevent partial reads.
pub fn save(&self) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = state_path();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).ok();
}
// Use log sizes from load time, not current filesystem sizes.
// If another writer appended since we loaded, our recorded size
// will be smaller than the actual log → next reader detects stale
// cache and replays the (correct, append-only) log.
let nodes_size = self.loaded_nodes_size;
let rels_size = self.loaded_rels_size;
let bincode_data = bincode::serialize(self)
.map_err(|e| format!("bincode serialize: {}", e))?;
let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
data.extend_from_slice(&CACHE_MAGIC);
data.extend_from_slice(&nodes_size.to_le_bytes());
data.extend_from_slice(&rels_size.to_le_bytes());
data.extend_from_slice(&bincode_data);
// Atomic write: tmp file + rename
let tmp_path = path.with_extension("bin.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename {}{}: {}", tmp_path.display(), path.display(), e))?;
// Also write rkyv snapshot (mmap-friendly)
if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
eprintln!("rkyv snapshot save: {}", e);
}
Ok(())
}
/// Serialize store as rkyv snapshot with staleness header.
/// Assumes StoreLock is already held by caller.
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
let snap = Snapshot {
nodes: self.nodes.clone(),
relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
gaps: self.gaps.clone(),
params: self.params,
};
let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
.map_err(|e| format!("rkyv serialize: {}", e))?;
let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
data.extend_from_slice(&RKYV_MAGIC);
data.extend_from_slice(&1u32.to_le_bytes()); // format version
data.extend_from_slice(&nodes_size.to_le_bytes());
data.extend_from_slice(&rels_size.to_le_bytes());
data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes());
data.extend_from_slice(&rkyv_data);
let path = snapshot_path();
let tmp_path = path.with_extension("rkyv.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename: {}", e))?;
Ok(())
}
/// Try loading store from mmap'd rkyv snapshot.
/// Returns None if snapshot is missing or stale (log sizes don't match).
fn load_snapshot_mmap() -> Result<Option<Store>, String> {
let path = snapshot_path();
if !path.exists() { return Ok(None); }
let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mmap = unsafe { memmap2::Mmap::map(&file) }
.map_err(|e| format!("mmap {}: {}", path.display(), e))?;
if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
if mmap[..4] != RKYV_MAGIC { return Ok(None); }
// [4..8] = version, skip for now
let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;
if cached_nodes != nodes_size || cached_rels != rels_size {
return Ok(None); // stale
}
if mmap.len() < RKYV_HEADER_LEN + data_len {
return Ok(None); // truncated
}
let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len];
// SAFETY: we wrote this file ourselves via save_snapshot().
// Skip full validation (check_archived_root) — the staleness header
// already confirms this snapshot matches the current log state.
let archived = unsafe { rkyv::archived_root::<Snapshot>(rkyv_data) };
let snap: Snapshot = <ArchivedSnapshot as rkyv::Deserialize<Snapshot, rkyv::Infallible>>
::deserialize(archived, &mut rkyv::Infallible).unwrap();
let mut store = Store {
nodes: snap.nodes,
relations: snap.relations,
gaps: snap.gaps,
params: snap.params,
..Default::default()
};
// Rebuild uuid_to_key (not serialized)
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
Ok(Some(store))
}
}