store: lock-refresh-write pattern to prevent duplicate UUIDs

All write paths (upsert_node, upsert_provenance, delete_node,
rename_node, ingest_units) now hold StoreLock across the full
refresh→check→write cycle. This prevents the race where two
concurrent processes both see a key as "new" and create separate
UUIDs for it.

Adds append_nodes_unlocked() and append_relations_unlocked() for
callers already holding the lock. Adds refresh_nodes() to replay
log tail under lock before deciding create vs update.

Also adds find_duplicates() for detecting existing duplicates
in the log (replays full log, groups live nodes by key).
This commit is contained in:
ProofOfConcept 2026-03-10 14:30:21 -04:00
parent 8bbc246b3d
commit 37ae37667b
3 changed files with 121 additions and 10 deletions

View file

@ -191,7 +191,11 @@ impl Store {
} }
/// Process parsed memory units: diff against existing nodes, persist changes. /// Process parsed memory units: diff against existing nodes, persist changes.
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize), String> { fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize), String> {
let _lock = types::StoreLock::acquire()?;
self.refresh_nodes()?;
let node_type = classify_filename(filename); let node_type = classify_filename(filename);
let mut new_nodes = Vec::new(); let mut new_nodes = Vec::new();
let mut updated_nodes = Vec::new(); let mut updated_nodes = Vec::new();
@ -218,14 +222,14 @@ impl Store {
} }
if !new_nodes.is_empty() { if !new_nodes.is_empty() {
self.append_nodes(&new_nodes)?; self.append_nodes_unlocked(&new_nodes)?;
for node in &new_nodes { for node in &new_nodes {
self.uuid_to_key.insert(node.uuid, node.key.clone()); self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone()); self.nodes.insert(node.key.clone(), node.clone());
} }
} }
if !updated_nodes.is_empty() { if !updated_nodes.is_empty() {
self.append_nodes(&updated_nodes)?; self.append_nodes_unlocked(&updated_nodes)?;
for node in &updated_nodes { for node in &updated_nodes {
self.nodes.insert(node.key.clone(), node.clone()); self.nodes.insert(node.key.clone(), node.clone());
} }

View file

@ -8,13 +8,17 @@ use super::types::*;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
impl Store { impl Store {
/// Add or update a node (appends to log + updates cache) /// Add or update a node (appends to log + updates cache).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> { pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if let Some(existing) = self.nodes.get(&node.key) { if let Some(existing) = self.nodes.get(&node.key) {
node.uuid = existing.uuid; node.uuid = existing.uuid;
node.version = existing.version + 1; node.version = existing.version + 1;
} }
self.append_nodes(&[node.clone()])?; self.append_nodes_unlocked(&[node.clone()])?;
self.uuid_to_key.insert(node.uuid, node.key.clone()); self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node); self.nodes.insert(node.key.clone(), node);
Ok(()) Ok(())
@ -38,7 +42,11 @@ impl Store {
} }
/// Upsert with explicit provenance (for agent-created nodes). /// Upsert with explicit provenance (for agent-created nodes).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: Provenance) -> Result<&'static str, String> { pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: Provenance) -> Result<&'static str, String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if let Some(existing) = self.nodes.get(key) { if let Some(existing) = self.nodes.get(key) {
if existing.content == content { if existing.content == content {
return Ok("unchanged"); return Ok("unchanged");
@ -47,13 +55,13 @@ impl Store {
node.content = content.to_string(); node.content = content.to_string();
node.provenance = provenance; node.provenance = provenance;
node.version += 1; node.version += 1;
self.append_nodes(std::slice::from_ref(&node))?; self.append_nodes_unlocked(std::slice::from_ref(&node))?;
self.nodes.insert(key.to_string(), node); self.nodes.insert(key.to_string(), node);
Ok("updated") Ok("updated")
} else { } else {
let mut node = new_node(key, content); let mut node = new_node(key, content);
node.provenance = provenance; node.provenance = provenance;
self.append_nodes(std::slice::from_ref(&node))?; self.append_nodes_unlocked(std::slice::from_ref(&node))?;
self.uuid_to_key.insert(node.uuid, node.key.clone()); self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(key.to_string(), node); self.nodes.insert(key.to_string(), node);
Ok("created") Ok("created")
@ -61,13 +69,17 @@ impl Store {
} }
/// Soft-delete a node (appends deleted version, removes from cache). /// Soft-delete a node (appends deleted version, removes from cache).
/// Holds StoreLock across refresh + write to see concurrent creates.
pub fn delete_node(&mut self, key: &str) -> Result<(), String> { pub fn delete_node(&mut self, key: &str) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
let node = self.nodes.get(key) let node = self.nodes.get(key)
.ok_or_else(|| format!("No node '{}'", key))?; .ok_or_else(|| format!("No node '{}'", key))?;
let mut deleted = node.clone(); let mut deleted = node.clone();
deleted.deleted = true; deleted.deleted = true;
deleted.version += 1; deleted.version += 1;
self.append_nodes(std::slice::from_ref(&deleted))?; self.append_nodes_unlocked(std::slice::from_ref(&deleted))?;
self.nodes.remove(key); self.nodes.remove(key);
Ok(()) Ok(())
} }
@ -79,10 +91,15 @@ impl Store {
/// on relations, and created_at is preserved untouched. /// on relations, and created_at is preserved untouched.
/// ///
/// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations. /// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations.
/// Holds StoreLock across refresh + write to prevent races.
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<(), String> { pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<(), String> {
if old_key == new_key { if old_key == new_key {
return Ok(()); return Ok(());
} }
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if self.nodes.contains_key(new_key) { if self.nodes.contains_key(new_key) {
return Err(format!("Key '{}' already exists", new_key)); return Err(format!("Key '{}' already exists", new_key));
} }
@ -112,10 +129,10 @@ impl Store {
}) })
.collect(); .collect();
// Persist (each append acquires its own file lock) // Persist under single lock
self.append_nodes(&[renamed.clone(), tombstone])?; self.append_nodes_unlocked(&[renamed.clone(), tombstone])?;
if !updated_rels.is_empty() { if !updated_rels.is_empty() {
self.append_relations(&updated_rels)?; self.append_relations_unlocked(&updated_rels)?;
} }
// Update in-memory cache // Update in-memory cache

View file

@ -169,12 +169,58 @@ impl Store {
Ok(()) Ok(())
} }
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
/// Returns a map from key → vec of all live Node versions (one per UUID).
/// The "winner" in self.nodes is always one of them.
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>, String> {
let path = nodes_path();
if !path.exists() { return Ok(HashMap::new()); }
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Track latest version of each UUID
let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = Node::from_capnp(node_reader)?;
let dominated = by_uuid.get(&node.uuid)
.map(|n| node.version >= n.version)
.unwrap_or(true);
if dominated {
by_uuid.insert(node.uuid, node);
}
}
}
// Group live (non-deleted) nodes by key
let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
for node in by_uuid.into_values() {
if !node.deleted {
by_key.entry(node.key.clone()).or_default().push(node);
}
}
// Keep only duplicates
by_key.retain(|_, nodes| nodes.len() > 1);
Ok(by_key)
}
/// Append nodes to the log file. /// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall /// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock. /// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> { pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
let _lock = StoreLock::acquire()?; let _lock = StoreLock::acquire()?;
self.append_nodes_unlocked(nodes)
}
/// Append nodes without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> {
let mut msg = message::Builder::new_default(); let mut msg = message::Builder::new_default();
{ {
let log = msg.init_root::<memory_capnp::node_log::Builder>(); let log = msg.init_root::<memory_capnp::node_log::Builder>();
@ -199,11 +245,55 @@ impl Store {
Ok(()) Ok(())
} }
/// Replay only new entries appended to the node log since we last loaded.
/// Call under StoreLock to catch writes from concurrent processes.
pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> {
let path = nodes_path();
let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
if current_size <= self.loaded_nodes_size {
return Ok(()); // no new data
}
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size))
.map_err(|e| format!("seek nodes log: {}", e))?;
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log delta: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes delta: {}", e))? {
let node = Node::from_capnp(node_reader)?;
let dominated = self.nodes.get(&node.key)
.map(|n| node.version >= n.version)
.unwrap_or(true);
if dominated {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
}
}
}
}
self.loaded_nodes_size = current_size;
Ok(())
}
/// Append relations to the log file. /// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND. /// Single write() syscall for atomic O_APPEND.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> { pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?; let _lock = StoreLock::acquire()?;
self.append_relations_unlocked(relations)
}
/// Append relations without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> {
let mut msg = message::Builder::new_default(); let mut msg = message::Builder::new_default();
{ {
let log = msg.init_root::<memory_capnp::relation_log::Builder>(); let log = msg.init_root::<memory_capnp::relation_log::Builder>();