diff --git a/src/store/persist.rs b/src/store/persist.rs index f1b4d39..cac15dd 100644 --- a/src/store/persist.rs +++ b/src/store/persist.rs @@ -53,6 +53,8 @@ impl Store { for (key, node) in &store.nodes { store.uuid_to_key.insert(node.uuid, key.clone()); } + store.loaded_nodes_size = nodes_size; + store.loaded_rels_size = rels_size; // Bootstrap: write rkyv snapshot if missing if !snapshot_path().exists() { if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) { @@ -75,6 +77,10 @@ impl Store { store.replay_relations(&rels_p)?; } + // Record log sizes after replay — this is the state we reflect + store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); + store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); + // Drop edges referencing deleted/missing nodes store.relations.retain(|r| store.nodes.contains_key(&r.source_key) && @@ -144,16 +150,12 @@ impl Store { Ok(()) } - /// Append nodes to the log file - pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> { + /// Append nodes to the log file. + /// Serializes to a Vec first, then does a single write() syscall + /// so the append is atomic with O_APPEND even without flock. + pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> { let _lock = StoreLock::acquire()?; - let path = nodes_path(); - let file = fs::OpenOptions::new() - .create(true).append(true).open(&path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); - let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); @@ -162,21 +164,26 @@ impl Store { node.to_capnp(list.reborrow().get(i as u32)); } } - serialize::write_message(&mut writer, &msg) - .map_err(|e| format!("write nodes: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; - Ok(()) - } + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize nodes: {}", e))?; - /// Append relations to the log file - pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> { - let _lock = StoreLock::acquire()?; - - let path = relations_path(); + let path = nodes_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); + use std::io::Write; + (&file).write_all(&buf) + .map_err(|e| format!("write nodes: {}", e))?; + + self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); + Ok(()) + } + + /// Append relations to the log file. + /// Single write() syscall for atomic O_APPEND. + pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> { + let _lock = StoreLock::acquire()?; let mut msg = message::Builder::new_default(); { @@ -186,9 +193,19 @@ impl Store { rel.to_capnp(list.reborrow().get(i as u32)); } } - serialize::write_message(&mut writer, &msg) + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize relations: {}", e))?; + + let path = relations_path(); + let file = fs::OpenOptions::new() + .create(true).append(true).open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + use std::io::Write; + (&file).write_all(&buf) .map_err(|e| format!("write relations: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; + + self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } @@ -202,8 +219,12 @@ impl Store { fs::create_dir_all(parent).ok(); } - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); + // Use log sizes from load time, not current filesystem sizes. + // If another writer appended since we loaded, our recorded size + // will be smaller than the actual log → next reader detects stale + // cache and replays the (correct, append-only) log. + let nodes_size = self.loaded_nodes_size; + let rels_size = self.loaded_rels_size; let bincode_data = bincode::serialize(self) .map_err(|e| format!("bincode serialize: {}", e))?; @@ -312,6 +333,8 @@ impl Store { for (key, node) in &store.nodes { store.uuid_to_key.insert(node.uuid, key.clone()); } + store.loaded_nodes_size = nodes_size; + store.loaded_rels_size = rels_size; Ok(Some(store)) } diff --git a/src/store/types.rs b/src/store/types.rs index cce4140..f47982d 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -59,7 +59,7 @@ macro_rules! capnp_message { skip: [$($sf:ident),* $(,)?] $(,)? ) => { impl $struct { - pub(crate) fn from_capnp(r: $reader) -> Result { + pub fn from_capnp(r: $reader) -> Result { paste::paste! { Ok(Self { $($tf: read_text(r.[]()),)* @@ -89,7 +89,7 @@ pub fn memory_dir() -> PathBuf { crate::config::get().data_dir.clone() } -pub(crate) fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } +pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") } pub(crate) fn state_path() -> PathBuf { memory_dir().join("state.bin") } pub(crate) fn snapshot_path() -> PathBuf { memory_dir().join("snapshot.rkyv") } @@ -388,6 +388,13 @@ pub struct Store { pub retrieval_log: Vec, pub gaps: Vec, pub params: Params, + /// Log sizes at load time — used by save() to write correct staleness header. + /// If another writer appended since we loaded, our cache will be marked stale + /// (recorded size < actual size), forcing the next reader to replay the log. + #[serde(skip)] + pub(crate) loaded_nodes_size: u64, + #[serde(skip)] + pub(crate) loaded_rels_size: u64, } /// Snapshot for mmap: full store state minus retrieval_log (which