store: internal locking, remove Arc<Mutex<Store>> wrapper

Store now has internal Mutex for capnp appends and AtomicU64 for
size tracking. All methods take &self. The external Arc<Mutex<Store>>
is replaced with Arc<Store>.

- Store::append_lock protects file appends
- local.rs functions take &Store (not &mut Store)
- access_local() returns Arc<Store>
- All .lock().await calls removed from callers

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-13 21:49:54 -04:00
parent 4696bb8b7d
commit b3d0a3ab25
13 changed files with 86 additions and 70 deletions

View file

@ -16,7 +16,7 @@ pub fn memory_render(store: &Store, _provenance: &str, key: &str, raw: Option<bo
}
}
pub fn memory_write(store: &mut Store, provenance: &str, key: &str, content: &str) -> Result<String> {
pub fn memory_write(store: &Store, provenance: &str, key: &str, content: &str) -> Result<String> {
let result = store.upsert_provenance(key, content, provenance)
.map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
@ -91,7 +91,7 @@ pub fn memory_links(store: &Store, _provenance: &str, key: &str) -> Result<Vec<L
Ok(links)
}
pub fn memory_link_set(store: &mut Store, _provenance: &str, source: &str, target: &str, strength: f32) -> Result<String> {
pub fn memory_link_set(store: &Store, _provenance: &str, source: &str, target: &str, strength: f32) -> Result<String> {
let s = store.resolve_key(source).map_err(|e| anyhow::anyhow!("{}", e))?;
let t = store.resolve_key(target).map_err(|e| anyhow::anyhow!("{}", e))?;
let old = store.set_link_strength(&s, &t, strength).map_err(|e| anyhow::anyhow!("{}", e))?;
@ -99,7 +99,7 @@ pub fn memory_link_set(store: &mut Store, _provenance: &str, source: &str, targe
Ok(format!("{}{} strength {:.2}{:.2}", s, t, old, strength))
}
pub fn memory_link_add(store: &mut Store, provenance: &str, source: &str, target: &str) -> Result<String> {
pub fn memory_link_add(store: &Store, provenance: &str, source: &str, target: &str) -> Result<String> {
let s = store.resolve_key(source).map_err(|e| anyhow::anyhow!("{}", e))?;
let t = store.resolve_key(target).map_err(|e| anyhow::anyhow!("{}", e))?;
let strength = store.add_link(&s, &t, provenance).map_err(|e| anyhow::anyhow!("{}", e))?;
@ -107,7 +107,7 @@ pub fn memory_link_add(store: &mut Store, provenance: &str, source: &str, target
Ok(format!("linked {}{} (strength={:.2})", s, t, strength))
}
pub fn memory_delete(store: &mut Store, _provenance: &str, key: &str) -> Result<String> {
pub fn memory_delete(store: &Store, _provenance: &str, key: &str) -> Result<String> {
let resolved = store.resolve_key(key).map_err(|e| anyhow::anyhow!("{}", e))?;
store.delete_node(&resolved).map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
@ -164,21 +164,21 @@ pub fn memory_history(store: &Store, _provenance: &str, key: &str, full: Option<
Ok(out)
}
pub fn memory_weight_set(store: &mut Store, _provenance: &str, key: &str, weight: f32) -> Result<String> {
pub fn memory_weight_set(store: &Store, _provenance: &str, key: &str, weight: f32) -> Result<String> {
let resolved = store.resolve_key(key).map_err(|e| anyhow::anyhow!("{}", e))?;
let (old, new) = store.set_weight(&resolved, weight).map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(format!("weight {} {:.2}{:.2}", resolved, old, new))
}
pub fn memory_rename(store: &mut Store, _provenance: &str, old_key: &str, new_key: &str) -> Result<String> {
pub fn memory_rename(store: &Store, _provenance: &str, old_key: &str, new_key: &str) -> Result<String> {
let resolved = store.resolve_key(old_key).map_err(|e| anyhow::anyhow!("{}", e))?;
store.rename_node(&resolved, new_key).map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(format!("Renamed '{}' → '{}'", resolved, new_key))
}
pub fn memory_supersede(store: &mut Store, provenance: &str, old_key: &str, new_key: &str, reason: Option<&str>) -> Result<String> {
pub fn memory_supersede(store: &Store, provenance: &str, old_key: &str, new_key: &str, reason: Option<&str>) -> Result<String> {
let reason = reason.unwrap_or("superseded");
let content = store.get_node(old_key)
.map_err(|e| anyhow::anyhow!("{}", e))?
@ -293,7 +293,7 @@ fn level_to_node_type(level: i64) -> crate::store::NodeType {
}
}
pub fn journal_new(store: &mut Store, provenance: &str, name: &str, title: &str, body: &str, level: Option<i64>) -> Result<String> {
pub fn journal_new(store: &Store, provenance: &str, name: &str, title: &str, body: &str, level: Option<i64>) -> Result<String> {
let level = level.unwrap_or(0);
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M");
let content = format!("## {}{}\n\n{}", ts, title, body);
@ -326,7 +326,7 @@ pub fn journal_new(store: &mut Store, provenance: &str, name: &str, title: &str,
Ok(format!("New entry '{}' ({} words)", title, word_count))
}
pub fn journal_update(store: &mut Store, provenance: &str, body: &str, level: Option<i64>) -> Result<String> {
pub fn journal_update(store: &Store, provenance: &str, body: &str, level: Option<i64>) -> Result<String> {
let level = level.unwrap_or(0);
let node_type = level_to_node_type(level);
let all_keys = store.all_keys()?;
@ -396,7 +396,7 @@ pub fn graph_communities(store: &Store, _provenance: &str, top_n: Option<usize>,
Ok(out)
}
pub fn graph_normalize_strengths(store: &mut Store, _provenance: &str, apply: Option<bool>) -> Result<String> {
pub fn graph_normalize_strengths(store: &Store, _provenance: &str, apply: Option<bool>) -> Result<String> {
use crate::store::{StoreView, RelationType};
let apply = apply.unwrap_or(false);

View file

@ -19,8 +19,7 @@ pub struct MemoryNode {
impl MemoryNode {
/// Load a node from the store by key.
pub fn load(key: &str) -> Option<Self> {
let arc = super::access_local().ok()?;
let store = arc.try_lock().ok()?;
let store = super::access_local().ok()?;
Self::from_store(&store, key)
}

View file

@ -30,7 +30,7 @@ pub use local::{LinkInfo, JournalEntry};
// ── Store access ───────────────────────────────────────────────
/// Daemon's store (eager init) or client's fallback local store.
static STORE_ACCESS: OnceLock<Option<Arc<crate::Mutex<Store>>>> = OnceLock::new();
static STORE_ACCESS: OnceLock<Option<Arc<Store>>> = OnceLock::new();
// Client's socket connection (thread-local for lock-free access).
thread_local! {
@ -39,9 +39,9 @@ thread_local! {
/// How we access the memory store.
pub enum StoreAccess {
Daemon(Arc<crate::Mutex<Store>>), // Direct store access
Client, // Socket to daemon (in thread-local)
None(String), // Error: couldn't get access
Daemon(Arc<Store>), // Direct store access
Client, // Socket to daemon (in thread-local)
None(String), // Error: couldn't get access
}
/// Get store access: daemon's store, socket, or local fallback.
@ -65,7 +65,7 @@ pub fn access() -> StoreAccess {
// Socket failed - try local store as fallback (cached in STORE_ACCESS)
let store_opt = STORE_ACCESS.get_or_init(|| {
Store::load().ok().map(|s| Arc::new(crate::Mutex::new(s)))
Store::load().ok().map(Arc::new)
});
match store_opt {
@ -75,7 +75,7 @@ pub fn access() -> StoreAccess {
}
/// Get local store access. Returns error if only RPC available.
pub fn access_local() -> Result<Arc<crate::Mutex<Store>>> {
pub fn access_local() -> Result<Arc<Store>> {
match access() {
StoreAccess::Daemon(arc) => Ok(arc),
StoreAccess::Client => anyhow::bail!("direct store access not available via RPC"),
@ -248,12 +248,12 @@ macro_rules! memory_tool {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
// Call hippocampus with appropriate mutability
// Call hippocampus (all methods now take &self, deref Arc)
(@call mut, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
local::$name(&mut $store, $prov $(, $arg)*)
local::$name(&*$store, $prov $(, $arg)*)
};
(@call ref, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
local::$name(&$store, $prov $(, $arg)*)
local::$name(&*$store, $prov $(, $arg)*)
};
// ── Main rules ─────────────────────────────────────────────────
@ -273,9 +273,7 @@ macro_rules! memory_tool {
};
match access() {
StoreAccess::Daemon(arc) => {
#[allow(unused_mut)]
let mut store = arc.lock().await;
StoreAccess::Daemon(store) => {
memory_tool!(@call $m, $name, store, &prov $($(, $arg)*)?)
}
StoreAccess::Client => {

View file

@ -269,8 +269,15 @@ impl Store {
}
// Record log sizes
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
use std::sync::atomic::Ordering;
store.loaded_nodes_size.store(
fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0),
Ordering::Relaxed
);
store.loaded_rels_size.store(
fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0),
Ordering::Relaxed
);
// Orphan edges filtered naturally during for_each_relation (unresolvable UUIDs skipped)
@ -408,7 +415,9 @@ impl Store {
}
/// Append nodes to the log file. Returns the offset where the message was written.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<u64> {
pub fn append_nodes(&self, nodes: &[Node]) -> Result<u64> {
use std::sync::atomic::Ordering;
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
@ -421,6 +430,9 @@ impl Store {
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize nodes"))?;
// Lock for file append
let _guard = self.append_lock.lock().unwrap();
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
@ -433,12 +445,17 @@ impl Store {
(&file).write_all(&buf)
.with_context(|| format!("write nodes"))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
self.loaded_nodes_size.store(
file.metadata().map(|m| m.len()).unwrap_or(0),
Ordering::Relaxed
);
Ok(offset)
}
/// Append relations to the log file.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> {
pub fn append_relations(&self, relations: &[Relation]) -> Result<()> {
use std::sync::atomic::Ordering;
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
@ -451,6 +468,9 @@ impl Store {
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize relations"))?;
// Lock for file append
let _guard = self.append_lock.lock().unwrap();
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
@ -459,7 +479,10 @@ impl Store {
(&file).write_all(&buf)
.with_context(|| format!("write relations"))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
self.loaded_rels_size.store(
file.metadata().map(|m| m.len()).unwrap_or(0),
Ordering::Relaxed
);
Ok(())
}

View file

@ -34,6 +34,8 @@ use crate::graph::{self, Graph};
use anyhow::{bail, Result};
use redb::Database;
use std::sync::atomic::AtomicU64;
use std::sync::Mutex;
/// Strip .md suffix from a key, handling both bare keys and section keys.
/// "identity.md" → "identity", "foo.md#section" → "foo#section", "identity" → "identity"
@ -46,11 +48,13 @@ pub fn strip_md_suffix(key: &str) -> String {
}
}
// The full in-memory store
// The full in-memory store with internal locking
pub struct Store {
/// Log sizes at load time — used for staleness detection.
pub(crate) loaded_nodes_size: u64,
pub(crate) loaded_rels_size: u64,
loaded_nodes_size: AtomicU64,
loaded_rels_size: AtomicU64,
/// Protects capnp log appends (redb handles its own locking)
append_lock: Mutex<()>,
/// redb index database
pub(crate) db: Option<redb::Database>,
}
@ -58,8 +62,9 @@ pub struct Store {
impl Default for Store {
fn default() -> Self {
Store {
loaded_nodes_size: 0,
loaded_rels_size: 0,
loaded_nodes_size: AtomicU64::new(0),
loaded_rels_size: AtomicU64::new(0),
append_lock: Mutex::new(()),
db: None,
}
}

View file

@ -16,7 +16,7 @@ pub fn current_provenance() -> String {
impl Store {
/// Add or update a node (appends to log + updates index).
pub fn upsert_node(&mut self, mut node: Node) -> Result<()> {
pub fn upsert_node(&self, mut node: Node) -> Result<()> {
if let Some(existing) = self.get_node(&node.key)? {
node.uuid = existing.uuid;
node.version = existing.version + 1;
@ -30,7 +30,7 @@ impl Store {
}
/// Add a relation (appends to log + indexes)
pub fn add_relation(&mut self, rel: Relation) -> Result<()> {
pub fn add_relation(&self, rel: Relation) -> Result<()> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let txn = db.begin_write()?;
self.append_relations(std::slice::from_ref(&rel))?;
@ -70,13 +70,13 @@ impl Store {
///
/// Provenance is determined by the POC_PROVENANCE env var if set,
/// otherwise defaults to Manual.
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str> {
pub fn upsert(&self, key: &str, content: &str) -> Result<&'static str> {
let prov = current_provenance();
self.upsert_provenance(key, content, &prov)
}
/// Upsert with explicit provenance (for agent-created nodes).
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> {
pub fn upsert_provenance(&self, key: &str, content: &str, provenance: &str) -> Result<&'static str> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
if let Some(existing) = self.get_node(key)? {
@ -105,7 +105,7 @@ impl Store {
}
/// Soft-delete a node (appends deleted version, removes from index).
pub fn delete_node(&mut self, key: &str) -> Result<()> {
pub fn delete_node(&self, key: &str) -> Result<()> {
let prov = current_provenance();
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
@ -129,7 +129,7 @@ impl Store {
/// Graph edges (source/target UUIDs) are unaffected — they're already
/// UUID-based. We update the human-readable source_key/target_key strings
/// on relations, and created_at is preserved untouched.
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<()> {
pub fn rename_node(&self, old_key: &str, new_key: &str) -> Result<()> {
if old_key == new_key {
return Ok(());
}
@ -199,7 +199,7 @@ impl Store {
}
/// Cap node degree by soft-deleting edges from mega-hubs.
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize)> {
pub fn cap_degree(&self, max_degree: usize) -> Result<(usize, usize)> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let keys = index::all_keys(db)?;
@ -306,7 +306,7 @@ impl Store {
}
/// Set a node's weight directly. Returns (old, new).
pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> {
pub fn set_weight(&self, key: &str, weight: f32) -> Result<(f32, f32)> {
let weight = weight.clamp(0.01, 1.0);
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let mut node = self.get_node(key)?
@ -327,7 +327,7 @@ impl Store {
/// Set the strength of a link between two nodes.
/// Returns the old strength. Creates link if it doesn't exist.
pub fn set_link_strength(&mut self, source: &str, target: &str, strength: f32) -> Result<f32> {
pub fn set_link_strength(&self, source: &str, target: &str, strength: f32) -> Result<f32> {
let strength = strength.clamp(0.01, 1.0);
let source_uuid = self.get_node(source)?
@ -372,7 +372,7 @@ impl Store {
/// Add a link between two nodes with Jaccard-based initial strength.
/// Returns the strength, or a message if the link already exists.
pub fn add_link(&mut self, source: &str, target: &str, provenance: &str) -> Result<f32> {
pub fn add_link(&self, source: &str, target: &str, provenance: &str) -> Result<f32> {
let source_uuid = self.get_node(source)?
.map(|n| n.uuid)
.ok_or_else(|| anyhow!("source not found: {}", source))?;