diff --git a/poc-memory/src/store/mod.rs b/poc-memory/src/store/mod.rs index 5f0d71e..121f361 100644 --- a/poc-memory/src/store/mod.rs +++ b/poc-memory/src/store/mod.rs @@ -191,7 +191,11 @@ impl Store { } /// Process parsed memory units: diff against existing nodes, persist changes. + /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize), String> { + let _lock = types::StoreLock::acquire()?; + self.refresh_nodes()?; + let node_type = classify_filename(filename); let mut new_nodes = Vec::new(); let mut updated_nodes = Vec::new(); @@ -218,14 +222,14 @@ impl Store { } if !new_nodes.is_empty() { - self.append_nodes(&new_nodes)?; + self.append_nodes_unlocked(&new_nodes)?; for node in &new_nodes { self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node.clone()); } } if !updated_nodes.is_empty() { - self.append_nodes(&updated_nodes)?; + self.append_nodes_unlocked(&updated_nodes)?; for node in &updated_nodes { self.nodes.insert(node.key.clone(), node.clone()); } diff --git a/poc-memory/src/store/ops.rs b/poc-memory/src/store/ops.rs index 21363b6..c27aa11 100644 --- a/poc-memory/src/store/ops.rs +++ b/poc-memory/src/store/ops.rs @@ -8,13 +8,17 @@ use super::types::*; use std::collections::{HashMap, HashSet}; impl Store { - /// Add or update a node (appends to log + updates cache) + /// Add or update a node (appends to log + updates cache). + /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> { + let _lock = StoreLock::acquire()?; + self.refresh_nodes()?; + if let Some(existing) = self.nodes.get(&node.key) { node.uuid = existing.uuid; node.version = existing.version + 1; } - self.append_nodes(&[node.clone()])?; + self.append_nodes_unlocked(&[node.clone()])?; self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node); Ok(()) @@ -38,7 +42,11 @@ impl Store { } /// Upsert with explicit provenance (for agent-created nodes). + /// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs. pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: Provenance) -> Result<&'static str, String> { + let _lock = StoreLock::acquire()?; + self.refresh_nodes()?; + if let Some(existing) = self.nodes.get(key) { if existing.content == content { return Ok("unchanged"); @@ -47,13 +55,13 @@ impl Store { node.content = content.to_string(); node.provenance = provenance; node.version += 1; - self.append_nodes(std::slice::from_ref(&node))?; + self.append_nodes_unlocked(std::slice::from_ref(&node))?; self.nodes.insert(key.to_string(), node); Ok("updated") } else { let mut node = new_node(key, content); node.provenance = provenance; - self.append_nodes(std::slice::from_ref(&node))?; + self.append_nodes_unlocked(std::slice::from_ref(&node))?; self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(key.to_string(), node); Ok("created") @@ -61,13 +69,17 @@ impl Store { } /// Soft-delete a node (appends deleted version, removes from cache). + /// Holds StoreLock across refresh + write to see concurrent creates. pub fn delete_node(&mut self, key: &str) -> Result<(), String> { + let _lock = StoreLock::acquire()?; + self.refresh_nodes()?; + let node = self.nodes.get(key) .ok_or_else(|| format!("No node '{}'", key))?; let mut deleted = node.clone(); deleted.deleted = true; deleted.version += 1; - self.append_nodes(std::slice::from_ref(&deleted))?; + self.append_nodes_unlocked(std::slice::from_ref(&deleted))?; self.nodes.remove(key); Ok(()) } @@ -79,10 +91,15 @@ impl Store { /// on relations, and created_at is preserved untouched. /// /// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations. + /// Holds StoreLock across refresh + write to prevent races. pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<(), String> { if old_key == new_key { return Ok(()); } + + let _lock = StoreLock::acquire()?; + self.refresh_nodes()?; + if self.nodes.contains_key(new_key) { return Err(format!("Key '{}' already exists", new_key)); } @@ -112,10 +129,10 @@ impl Store { }) .collect(); - // Persist (each append acquires its own file lock) - self.append_nodes(&[renamed.clone(), tombstone])?; + // Persist under single lock + self.append_nodes_unlocked(&[renamed.clone(), tombstone])?; if !updated_rels.is_empty() { - self.append_relations(&updated_rels)?; + self.append_relations_unlocked(&updated_rels)?; } // Update in-memory cache diff --git a/poc-memory/src/store/persist.rs b/poc-memory/src/store/persist.rs index f0b3ae5..46185c8 100644 --- a/poc-memory/src/store/persist.rs +++ b/poc-memory/src/store/persist.rs @@ -169,12 +169,58 @@ impl Store { Ok(()) } + /// Find all duplicate keys: keys with multiple live UUIDs in the log. + /// Returns a map from key → vec of all live Node versions (one per UUID). + /// The "winner" in self.nodes is always one of them. + pub fn find_duplicates(&self) -> Result>, String> { + let path = nodes_path(); + if !path.exists() { return Ok(HashMap::new()); } + + let file = fs::File::open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let mut reader = BufReader::new(file); + + // Track latest version of each UUID + let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new(); + + while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { + let log = msg.get_root::() + .map_err(|e| format!("read node log: {}", e))?; + for node_reader in log.get_nodes() + .map_err(|e| format!("get nodes: {}", e))? { + let node = Node::from_capnp(node_reader)?; + let dominated = by_uuid.get(&node.uuid) + .map(|n| node.version >= n.version) + .unwrap_or(true); + if dominated { + by_uuid.insert(node.uuid, node); + } + } + } + + // Group live (non-deleted) nodes by key + let mut by_key: HashMap> = HashMap::new(); + for node in by_uuid.into_values() { + if !node.deleted { + by_key.entry(node.key.clone()).or_default().push(node); + } + } + + // Keep only duplicates + by_key.retain(|_, nodes| nodes.len() > 1); + Ok(by_key) + } + /// Append nodes to the log file. /// Serializes to a Vec first, then does a single write() syscall /// so the append is atomic with O_APPEND even without flock. pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> { let _lock = StoreLock::acquire()?; + self.append_nodes_unlocked(nodes) + } + /// Append nodes without acquiring the lock. Caller must hold StoreLock. + pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); @@ -199,11 +245,55 @@ impl Store { Ok(()) } + /// Replay only new entries appended to the node log since we last loaded. + /// Call under StoreLock to catch writes from concurrent processes. + pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> { + let path = nodes_path(); + let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); + if current_size <= self.loaded_nodes_size { + return Ok(()); // no new data + } + + let file = fs::File::open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let mut reader = BufReader::new(file); + reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size)) + .map_err(|e| format!("seek nodes log: {}", e))?; + + while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { + let log = msg.get_root::() + .map_err(|e| format!("read node log delta: {}", e))?; + for node_reader in log.get_nodes() + .map_err(|e| format!("get nodes delta: {}", e))? { + let node = Node::from_capnp(node_reader)?; + let dominated = self.nodes.get(&node.key) + .map(|n| node.version >= n.version) + .unwrap_or(true); + if dominated { + if node.deleted { + self.nodes.remove(&node.key); + self.uuid_to_key.remove(&node.uuid); + } else { + self.uuid_to_key.insert(node.uuid, node.key.clone()); + self.nodes.insert(node.key.clone(), node); + } + } + } + } + + self.loaded_nodes_size = current_size; + Ok(()) + } + /// Append relations to the log file. /// Single write() syscall for atomic O_APPEND. pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> { let _lock = StoreLock::acquire()?; + self.append_relations_unlocked(relations) + } + /// Append relations without acquiring the lock. Caller must hold StoreLock. + pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::();