From f4c4e1bb396be0183709ce739aa24c7f27803bdb Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 6 Mar 2026 21:38:26 -0500 Subject: [PATCH] persist: fix store race condition with concurrent writers The cache staleness mechanism (log-size headers, tmp+rename) was sound, but save() was re-reading the current log size from the filesystem instead of using the size at load time. With concurrent writers, this caused the cache to claim validity for log data it didn't contain. Fix: track loaded_nodes_size/loaded_rels_size through the Store lifecycle. Set them on load (all three paths: rkyv snapshot, bincode cache, log replay) and update after each append via fstat on the already-open fd. Also fix append atomicity: replace BufWriter (which may issue multiple write() syscalls) with serialize-to-Vec + single write_all(), ensuring O_APPEND atomicity without depending on flock. Make from_capnp() pub for use by the history command. --- src/store/persist.rs | 69 +++++++++++++++++++++++++++++--------------- src/store/types.rs | 11 +++++-- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/store/persist.rs b/src/store/persist.rs index f1b4d39..cac15dd 100644 --- a/src/store/persist.rs +++ b/src/store/persist.rs @@ -53,6 +53,8 @@ impl Store { for (key, node) in &store.nodes { store.uuid_to_key.insert(node.uuid, key.clone()); } + store.loaded_nodes_size = nodes_size; + store.loaded_rels_size = rels_size; // Bootstrap: write rkyv snapshot if missing if !snapshot_path().exists() { if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) { @@ -75,6 +77,10 @@ impl Store { store.replay_relations(&rels_p)?; } + // Record log sizes after replay — this is the state we reflect + store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); + store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); + // Drop edges referencing deleted/missing nodes store.relations.retain(|r| store.nodes.contains_key(&r.source_key) && @@ -144,16 
+150,12 @@ impl Store { Ok(()) } - /// Append nodes to the log file - pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> { + /// Append nodes to the log file. + /// Serializes to a Vec first, then does a single write() syscall + /// so the append is atomic with O_APPEND even without flock. + pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> { let _lock = StoreLock::acquire()?; - let path = nodes_path(); - let file = fs::OpenOptions::new() - .create(true).append(true).open(&path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); - let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); @@ -162,21 +164,26 @@ impl Store { node.to_capnp(list.reborrow().get(i as u32)); } } - serialize::write_message(&mut writer, &msg) - .map_err(|e| format!("write nodes: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; - Ok(()) - } + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize nodes: {}", e))?; - /// Append relations to the log file - pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> { - let _lock = StoreLock::acquire()?; - - let path = relations_path(); + let path = nodes_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); + use std::io::Write; + (&file).write_all(&buf) + .map_err(|e| format!("write nodes: {}", e))?; + + self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); + Ok(()) + } + + /// Append relations to the log file. + /// Single write() syscall for atomic O_APPEND. 
+ pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> { + let _lock = StoreLock::acquire()?; let mut msg = message::Builder::new_default(); { @@ -186,9 +193,19 @@ impl Store { rel.to_capnp(list.reborrow().get(i as u32)); } } - serialize::write_message(&mut writer, &msg) + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize relations: {}", e))?; + + let path = relations_path(); + let file = fs::OpenOptions::new() + .create(true).append(true).open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + use std::io::Write; + (&file).write_all(&buf) .map_err(|e| format!("write relations: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; + + self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } @@ -202,8 +219,12 @@ impl Store { fs::create_dir_all(parent).ok(); } - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); + // Use log sizes from load time, not current filesystem sizes. + // If another writer appended since we loaded, our recorded size + // will be smaller than the actual log → next reader detects stale + // cache and replays the (correct, append-only) log. + let nodes_size = self.loaded_nodes_size; + let rels_size = self.loaded_rels_size; let bincode_data = bincode::serialize(self) .map_err(|e| format!("bincode serialize: {}", e))?; @@ -312,6 +333,8 @@ impl Store { for (key, node) in &store.nodes { store.uuid_to_key.insert(node.uuid, key.clone()); } + store.loaded_nodes_size = nodes_size; + store.loaded_rels_size = rels_size; Ok(Some(store)) } diff --git a/src/store/types.rs b/src/store/types.rs index cce4140..f47982d 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -59,7 +59,7 @@ macro_rules! capnp_message { skip: [$($sf:ident),* $(,)?] $(,)? 
) => { impl $struct { - pub(crate) fn from_capnp(r: $reader) -> Result { + pub fn from_capnp(r: $reader) -> Result { paste::paste! { Ok(Self { $($tf: read_text(r.[<get_ $tf>]()),)* @@ -89,7 +89,7 @@ pub fn memory_dir() -> PathBuf { crate::config::get().data_dir.clone() } -pub(crate) fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } +pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") } pub(crate) fn state_path() -> PathBuf { memory_dir().join("state.bin") } pub(crate) fn snapshot_path() -> PathBuf { memory_dir().join("snapshot.rkyv") } @@ -388,6 +388,13 @@ pub struct Store { pub retrieval_log: Vec, pub gaps: Vec, pub params: Params, + /// Log sizes at load time — used by save() to write correct staleness header. + /// If another writer appended since we loaded, our cache will be marked stale + /// (recorded size < actual size), forcing the next reader to replay the log. + #[serde(skip)] + pub(crate) loaded_nodes_size: u64, + #[serde(skip)] + pub(crate) loaded_rels_size: u64, } /// Snapshot for mmap: full store state minus retrieval_log (which