persist: fix store race condition with concurrent writers

The cache staleness mechanism (log-size headers, tmp+rename) was sound,
but save() was re-reading the current log size from the filesystem
instead of using the size at load time. With concurrent writers, this
caused the cache to claim validity for log data it didn't contain.

Fix: track loaded_nodes_size/loaded_rels_size through the Store
lifecycle. Set them on load (all three paths: rkyv snapshot, bincode
cache, log replay) and update after each append via fstat on the
already-open fd.

Also fix append atomicity: replace BufWriter (which may issue multiple
write() syscalls) with serialize-to-Vec + single write_all(), ensuring
O_APPEND atomicity without depending on flock.

Make from_capnp() pub for use by the history command.
This commit is contained in:
ProofOfConcept 2026-03-06 21:38:26 -05:00
parent 7ed6d8622c
commit f4c4e1bb39
2 changed files with 55 additions and 25 deletions

View file

@ -53,6 +53,8 @@ impl Store {
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
// Bootstrap: write rkyv snapshot if missing
if !snapshot_path().exists() {
if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
@ -75,6 +77,10 @@ impl Store {
store.replay_relations(&rels_p)?;
}
// Record log sizes after replay — this is the state we reflect
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
// Drop edges referencing deleted/missing nodes
store.relations.retain(|r|
store.nodes.contains_key(&r.source_key) &&
@ -144,16 +150,12 @@ impl Store {
Ok(())
}
/// Append nodes to the log file
pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> {
/// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut writer = BufWriter::new(file);
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
@ -162,21 +164,26 @@ impl Store {
node.to_capnp(list.reborrow().get(i as u32));
}
}
serialize::write_message(&mut writer, &msg)
.map_err(|e| format!("write nodes: {}", e))?;
writer.flush().map_err(|e| format!("flush: {}", e))?;
Ok(())
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize nodes: {}", e))?;
/// Append relations to the log file
pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = relations_path();
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut writer = BufWriter::new(file);
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write nodes: {}", e))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let mut msg = message::Builder::new_default();
{
@ -186,9 +193,19 @@ impl Store {
rel.to_capnp(list.reborrow().get(i as u32));
}
}
serialize::write_message(&mut writer, &msg)
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize relations: {}", e))?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write relations: {}", e))?;
writer.flush().map_err(|e| format!("flush: {}", e))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
@ -202,8 +219,12 @@ impl Store {
fs::create_dir_all(parent).ok();
}
let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
// Use log sizes from load time, not current filesystem sizes.
// If another writer appended since we loaded, our recorded size
// will be smaller than the actual log → next reader detects stale
// cache and replays the (correct, append-only) log.
let nodes_size = self.loaded_nodes_size;
let rels_size = self.loaded_rels_size;
let bincode_data = bincode::serialize(self)
.map_err(|e| format!("bincode serialize: {}", e))?;
@ -312,6 +333,8 @@ impl Store {
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
Ok(Some(store))
}

View file

@ -59,7 +59,7 @@ macro_rules! capnp_message {
skip: [$($sf:ident),* $(,)?] $(,)?
) => {
impl $struct {
pub(crate) fn from_capnp(r: $reader) -> Result<Self, String> {
pub fn from_capnp(r: $reader) -> Result<Self, String> {
paste::paste! {
Ok(Self {
$($tf: read_text(r.[<get_ $tf>]()),)*
@ -89,7 +89,7 @@ pub fn memory_dir() -> PathBuf {
crate::config::get().data_dir.clone()
}
pub(crate) fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") }
pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") }
pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") }
pub(crate) fn state_path() -> PathBuf { memory_dir().join("state.bin") }
pub(crate) fn snapshot_path() -> PathBuf { memory_dir().join("snapshot.rkyv") }
@ -388,6 +388,13 @@ pub struct Store {
pub retrieval_log: Vec<RetrievalEvent>,
pub gaps: Vec<GapRecord>,
pub params: Params,
/// Log sizes at load time — used by save() to write correct staleness header.
/// If another writer appended since we loaded, our cache will be marked stale
/// (recorded size < actual size), forcing the next reader to replay the log.
#[serde(skip)]
pub(crate) loaded_nodes_size: u64,
#[serde(skip)]
pub(crate) loaded_rels_size: u64,
}
/// Snapshot for mmap: full store state minus retrieval_log (which