diff --git a/src/hippocampus/store/capnp.rs b/src/hippocampus/store/capnp.rs index a8eee3c..1f997d3 100644 --- a/src/hippocampus/store/capnp.rs +++ b/src/hippocampus/store/capnp.rs @@ -215,18 +215,8 @@ impl Store { Ok(by_key) } - /// Append nodes to the log file. - /// Serializes to a Vec first, then does a single write() syscall - /// so the append is atomic with O_APPEND even without flock. - /// Returns the offset where the message was written. + /// Append nodes to the log file. Returns the offset where the message was written. pub fn append_nodes(&mut self, nodes: &[Node]) -> Result { - let _lock = StoreLock::acquire()?; - self.append_nodes_unlocked(nodes) - } - - /// Append nodes without acquiring the lock. Caller must hold StoreLock. - /// Returns the offset where the message was written. - pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); @@ -255,55 +245,8 @@ impl Store { Ok(offset) } - /// Replay only new entries appended to the node log since we last loaded. - /// Call under StoreLock to catch writes from concurrent processes. - pub(crate) fn refresh_nodes(&mut self) -> Result<()> { - let path = nodes_path(); - let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); - if current_size <= self.loaded_nodes_size { - return Ok(()); // no new data - } - - let file = fs::File::open(&path) - .with_context(|| format!("open {}", path.display()))?; - let mut reader = BufReader::new(file); - reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size)) - .with_context(|| format!("seek nodes log"))?; - - while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { - let log = msg.get_root::() - .with_context(|| format!("read node log delta"))?; - for node_reader in log.get_nodes() - .with_context(|| format!("get nodes delta"))? { - let node = Node::from_capnp_migrate(node_reader)?; - let dominated = self.nodes.get(&node.key) - .map(|n| node.version >= n.version) - .unwrap_or(true); - if dominated { - if node.deleted { - self.nodes.remove(&node.key); - self.uuid_to_key.remove(&node.uuid); - } else { - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(node.key.clone(), node); - } - } - } - } - - self.loaded_nodes_size = current_size; - Ok(()) - } - /// Append relations to the log file. - /// Single write() syscall for atomic O_APPEND. pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> { - let _lock = StoreLock::acquire()?; - self.append_relations_unlocked(relations) - } - - /// Append relations without acquiring the lock. Caller must hold StoreLock. - pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<()> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index f45fdb0..cd3ee97 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -16,16 +16,12 @@ pub fn current_provenance() -> String { impl Store { /// Add or update a node (appends to log + updates index). - /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. pub fn upsert_node(&mut self, mut node: Node) -> Result<()> { - let _lock = StoreLock::acquire()?; - self.refresh_nodes()?; - if let Some(existing) = self.nodes.get(&node.key) { node.uuid = existing.uuid; node.version = existing.version + 1; } - let offset = self.append_nodes_unlocked(&[node.clone()])?; + let offset = self.append_nodes(&[node.clone()])?; if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } @@ -63,11 +59,7 @@ impl Store { } /// Upsert with explicit provenance (for agent-created nodes). - /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> { - let _lock = StoreLock::acquire()?; - self.refresh_nodes()?; - if let Some(existing) = self.nodes.get(key) { if existing.content == content { return Ok("unchanged"); @@ -77,7 +69,7 @@ impl Store { node.provenance = provenance.to_string(); node.timestamp = now_epoch(); node.version += 1; - let offset = self.append_nodes_unlocked(std::slice::from_ref(&node))?; + let offset = self.append_nodes(std::slice::from_ref(&node))?; if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } @@ -86,7 +78,7 @@ impl Store { } else { let mut node = new_node(key, content); node.provenance = provenance.to_string(); - let offset = self.append_nodes_unlocked(std::slice::from_ref(&node))?; + let offset = self.append_nodes(std::slice::from_ref(&node))?; if let Some(ref database) = self.db { index::index_node(database, &node.key, offset, &node.uuid)?; } @@ -96,12 +88,8 @@ impl Store { } } - /// Soft-delete a node (appends deleted version, removes from cache + redb). - /// Holds StoreLock across refresh + write to see concurrent creates. + /// Soft-delete a node (appends deleted version, removes from index). pub fn delete_node(&mut self, key: &str) -> Result<()> { - let _lock = StoreLock::acquire()?; - self.refresh_nodes()?; - let prov = current_provenance(); let node = self.nodes.get(key) @@ -112,7 +100,7 @@ impl Store { deleted.version += 1; deleted.provenance = prov; deleted.timestamp = now_epoch(); - self.append_nodes_unlocked(std::slice::from_ref(&deleted))?; + self.append_nodes(std::slice::from_ref(&deleted))?; if let Some(ref database) = self.db { index::remove_node(database, key, &uuid)?; } @@ -125,17 +113,10 @@ impl Store { /// Graph edges (source/target UUIDs) are unaffected — they're already /// UUID-based. We update the human-readable source_key/target_key strings /// on relations, and created_at is preserved untouched. - /// - /// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations. - /// Holds StoreLock across refresh + write to prevent races. pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<()> { if old_key == new_key { return Ok(()); } - - let _lock = StoreLock::acquire()?; - self.refresh_nodes()?; - if self.nodes.contains_key(new_key) { bail!("Key '{}' already exists", new_key); } @@ -172,9 +153,9 @@ impl Store { .collect(); // Persist under single lock - let offset = self.append_nodes_unlocked(&[renamed.clone(), tombstone.clone()])?; + let offset = self.append_nodes(&[renamed.clone(), tombstone.clone()])?; if !updated_rels.is_empty() { - self.append_relations_unlocked(&updated_rels)?; + self.append_relations(&updated_rels)?; } // Update index: remove old key, add renamed diff --git a/src/hippocampus/store/types.rs b/src/hippocampus/store/types.rs index cc5f138..4db374b 100644 --- a/src/hippocampus/store/types.rs +++ b/src/hippocampus/store/types.rs @@ -5,13 +5,11 @@ use crate::memory_capnp; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use uuid::Uuid; use std::collections::HashMap; -use std::fs; -use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; @@ -94,29 +92,6 @@ pub fn memory_dir() -> PathBuf { pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") } pub(crate) fn db_path() -> PathBuf { memory_dir().join("index.redb") } -fn lock_path() -> PathBuf { memory_dir().join(".store.lock") } - -/// RAII file lock using flock(2). Dropped when scope exits. -pub(crate) struct StoreLock { - _file: fs::File, -} - -impl StoreLock { - pub(crate) fn acquire() -> Result { - let path = lock_path(); - let file = fs::OpenOptions::new() - .create(true).truncate(false).write(true).open(&path) - .with_context(|| format!("open lock {}", path.display()))?; - - // Blocking exclusive lock - let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) }; - if ret != 0 { - bail!("flock: {}", std::io::Error::last_os_error()); - } - Ok(StoreLock { _file: file }) - } - // Lock released automatically when _file is dropped (flock semantics) -} pub fn now_epoch() -> i64 { SystemTime::now()