Cache Store in process — stop reloading on every tool call

Store::cached() returns a process-global Arc<tokio::sync::Mutex<Store>>
that loads once and reloads only when log files change (is_stale()
checks file sizes). All memory and journal tools use cached_store()
instead of Store::load() per invocation.

Fixes CPU saturation caused by re-hashing every node into fresh HashMaps
on each Store::load() when multiple subconscious agents make concurrent
tool calls.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-07 03:35:08 -04:00
parent 39dcf27bd0
commit a8c239f3de
2 changed files with 80 additions and 40 deletions

View file

@ -19,8 +19,8 @@ fn get_f64(args: &serde_json::Value, name: &str) -> Result<f64> {
args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name))
}
fn load_store() -> Result<Store> {
Store::load().map_err(|e| anyhow::anyhow!("{}", e))
async fn cached_store() -> Result<std::sync::Arc<tokio::sync::Mutex<Store>>> {
Store::cached().await.map_err(|e| anyhow::anyhow!("{}", e))
}
fn provenance() -> &'static str { "manual" }
@ -35,34 +35,34 @@ pub fn memory_tools() -> [super::Tool; 12] {
handler: |_a, v| Box::pin(async move { render(&v) }) },
Tool { name: "memory_write", description: "Create or update a memory node.",
parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"},"content":{"type":"string","description":"Full content (markdown)"}},"required":["key","content"]}"#,
handler: |_a, v| Box::pin(async move { write(&v) }) },
handler: |_a, v| Box::pin(async move { write(&v).await }) },
Tool { name: "memory_search", description: "Search the memory graph via spreading activation. Give 2-4 seed node keys.",
parameters_json: r#"{"type":"object","properties":{"keys":{"type":"array","items":{"type":"string"},"description":"Seed node keys to activate from"}},"required":["keys"]}"#,
handler: |_a, v| Box::pin(async move { search(&v) }) },
handler: |_a, v| Box::pin(async move { search(&v).await }) },
Tool { name: "memory_links", description: "Show a node's neighbors with link strengths.",
parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#,
handler: |_a, v| Box::pin(async move { links(&v) }) },
Tool { name: "memory_link_set", description: "Set link strength between two nodes.",
parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"},"strength":{"type":"number","description":"0.01 to 1.0"}},"required":["source","target","strength"]}"#,
handler: |_a, v| Box::pin(async move { link_set(&v) }) },
handler: |_a, v| Box::pin(async move { link_set(&v).await }) },
Tool { name: "memory_link_add", description: "Add a new link between two nodes.",
parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"}},"required":["source","target"]}"#,
handler: |_a, v| Box::pin(async move { link_add(&v) }) },
handler: |_a, v| Box::pin(async move { link_add(&v).await }) },
Tool { name: "memory_used", description: "Mark a node as useful (boosts weight).",
parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#,
handler: |_a, v| Box::pin(async move { used(&v) }) },
handler: |_a, v| Box::pin(async move { used(&v).await }) },
Tool { name: "memory_weight_set", description: "Set a node's weight directly (0.01 to 1.0).",
parameters_json: r#"{"type":"object","properties":{"key":{"type":"string"},"weight":{"type":"number","description":"0.01 to 1.0"}},"required":["key","weight"]}"#,
handler: |_a, v| Box::pin(async move { weight_set(&v) }) },
handler: |_a, v| Box::pin(async move { weight_set(&v).await }) },
Tool { name: "memory_rename", description: "Rename a node key in place.",
parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"}},"required":["old_key","new_key"]}"#,
handler: |_a, v| Box::pin(async move { rename(&v) }) },
handler: |_a, v| Box::pin(async move { rename(&v).await }) },
Tool { name: "memory_supersede", description: "Mark a node as superseded by another (sets weight to 0.01).",
parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"},"reason":{"type":"string"}},"required":["old_key","new_key"]}"#,
handler: |_a, v| Box::pin(async move { supersede(&v) }) },
handler: |_a, v| Box::pin(async move { supersede(&v).await }) },
Tool { name: "memory_query", description: "Run a structured query against the memory graph.",
parameters_json: r#"{"type":"object","properties":{"query":{"type":"string","description":"Query expression"}},"required":["query"]}"#,
handler: |_a, v| Box::pin(async move { query(&v) }) },
handler: |_a, v| Box::pin(async move { query(&v).await }) },
Tool { name: "output", description: "Produce a named output value for passing between steps.",
parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Output name"},"value":{"type":"string","description":"Output value"}},"required":["key","value"]}"#,
handler: |_a, v| Box::pin(async move { output(&v) }) },
@ -74,13 +74,13 @@ pub fn journal_tools() -> [super::Tool; 3] {
[
Tool { name: "journal_tail", description: "Read the last N journal entries (default 1).",
parameters_json: r#"{"type":"object","properties":{"count":{"type":"integer","description":"Number of entries (default 1)"}}}"#,
handler: |_a, v| Box::pin(async move { journal_tail(&v) }) },
handler: |_a, v| Box::pin(async move { journal_tail(&v).await }) },
Tool { name: "journal_new", description: "Start a new journal entry.",
parameters_json: r#"{"type":"object","properties":{"name":{"type":"string","description":"Short node name (becomes the key)"},"title":{"type":"string","description":"Descriptive title"},"body":{"type":"string","description":"Entry body"}},"required":["name","title","body"]}"#,
handler: |_a, v| Box::pin(async move { journal_new(&v) }) },
handler: |_a, v| Box::pin(async move { journal_new(&v).await }) },
Tool { name: "journal_update", description: "Append text to the most recent journal entry.",
parameters_json: r#"{"type":"object","properties":{"body":{"type":"string","description":"Text to append"}},"required":["body"]}"#,
handler: |_a, v| Box::pin(async move { journal_update(&v) }) },
handler: |_a, v| Box::pin(async move { journal_update(&v).await }) },
]
}
@ -93,17 +93,18 @@ fn render(args: &serde_json::Value) -> Result<String> {
.render())
}
fn write(args: &serde_json::Value) -> Result<String> {
async fn write(args: &serde_json::Value) -> Result<String> {
let key = get_str(args, "key")?;
let content = get_str(args, "content")?;
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
let result = store.upsert_provenance(key, content, provenance())
.map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(format!("{} '{}'", result, key))
}
fn search(args: &serde_json::Value) -> Result<String> {
async fn search(args: &serde_json::Value) -> Result<String> {
let keys: Vec<String> = args.get("keys")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())
@ -111,8 +112,9 @@ fn search(args: &serde_json::Value) -> Result<String> {
if keys.is_empty() {
anyhow::bail!("memory_search requires at least one seed key");
}
let store = load_store()?;
let graph = crate::graph::build_graph_fast(&store);
let arc = cached_store().await?;
let store = arc.lock().await;
let graph = crate::graph::build_graph_fast(&*store);
let params = store.params();
let seeds: Vec<(String, f64)> = keys.iter()
.filter_map(|k| {
@ -126,7 +128,7 @@ fn search(args: &serde_json::Value) -> Result<String> {
let seed_set: std::collections::HashSet<&str> = seeds.iter()
.map(|(k, _)| k.as_str()).collect();
let results = crate::search::spreading_activation(
&seeds, &graph, &store,
&seeds, &graph, &*store,
params.max_hops, params.edge_decay, params.min_activation,
);
Ok(results.iter()
@ -148,8 +150,9 @@ fn links(args: &serde_json::Value) -> Result<String> {
Ok(out)
}
fn link_set(args: &serde_json::Value) -> Result<String> {
let mut store = load_store()?;
async fn link_set(args: &serde_json::Value) -> Result<String> {
let arc = cached_store().await?;
let mut store = arc.lock().await;
let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?;
let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?;
let strength = get_f64(args, "strength")? as f32;
@ -158,8 +161,9 @@ fn link_set(args: &serde_json::Value) -> Result<String> {
Ok(format!("{}{} strength {:.2}{:.2}", s, t, old, strength))
}
fn link_add(args: &serde_json::Value) -> Result<String> {
let mut store = load_store()?;
async fn link_add(args: &serde_json::Value) -> Result<String> {
let arc = cached_store().await?;
let mut store = arc.lock().await;
let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?;
let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?;
let strength = store.add_link(&s, &t, provenance()).map_err(|e| anyhow::anyhow!("{}", e))?;
@ -167,9 +171,10 @@ fn link_add(args: &serde_json::Value) -> Result<String> {
Ok(format!("linked {}{} (strength={:.2})", s, t, strength))
}
fn used(args: &serde_json::Value) -> Result<String> {
async fn used(args: &serde_json::Value) -> Result<String> {
let key = get_str(args, "key")?;
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
if !store.nodes.contains_key(key) {
anyhow::bail!("node not found: {}", key);
}
@ -178,8 +183,9 @@ fn used(args: &serde_json::Value) -> Result<String> {
Ok(format!("marked {} as used", key))
}
fn weight_set(args: &serde_json::Value) -> Result<String> {
let mut store = load_store()?;
async fn weight_set(args: &serde_json::Value) -> Result<String> {
let arc = cached_store().await?;
let mut store = arc.lock().await;
let key = store.resolve_key(get_str(args, "key")?).map_err(|e| anyhow::anyhow!("{}", e))?;
let weight = get_f64(args, "weight")? as f32;
let (old, new) = store.set_weight(&key, weight).map_err(|e| anyhow::anyhow!("{}", e))?;
@ -187,21 +193,23 @@ fn weight_set(args: &serde_json::Value) -> Result<String> {
Ok(format!("weight {} {:.2}{:.2}", key, old, new))
}
fn rename(args: &serde_json::Value) -> Result<String> {
async fn rename(args: &serde_json::Value) -> Result<String> {
let old_key = get_str(args, "old_key")?;
let new_key = get_str(args, "new_key")?;
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
let resolved = store.resolve_key(old_key).map_err(|e| anyhow::anyhow!("{}", e))?;
store.rename_node(&resolved, new_key).map_err(|e| anyhow::anyhow!("{}", e))?;
store.save().map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(format!("Renamed '{}' → '{}'", resolved, new_key))
}
fn supersede(args: &serde_json::Value) -> Result<String> {
async fn supersede(args: &serde_json::Value) -> Result<String> {
let old_key = get_str(args, "old_key")?;
let new_key = get_str(args, "new_key")?;
let reason = args.get("reason").and_then(|v| v.as_str()).unwrap_or("superseded");
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
let content = store.nodes.get(old_key)
.map(|n| n.content.clone())
.ok_or_else(|| anyhow::anyhow!("node not found: {}", old_key))?;
@ -214,9 +222,10 @@ fn supersede(args: &serde_json::Value) -> Result<String> {
Ok(format!("superseded {}{} ({})", old_key, new_key, reason))
}
fn query(args: &serde_json::Value) -> Result<String> {
async fn query(args: &serde_json::Value) -> Result<String> {
let query_str = get_str(args, "query")?;
let store = load_store()?;
let arc = cached_store().await?;
let store = arc.lock().await;
let graph = store.build_graph();
crate::query_parser::query_to_string(&store, &graph, query_str)
.map_err(|e| anyhow::anyhow!("{}", e))
@ -238,9 +247,10 @@ fn output(args: &serde_json::Value) -> Result<String> {
// ── Journal tools ──────────────────────────────────────────────
fn journal_tail(args: &serde_json::Value) -> Result<String> {
async fn journal_tail(args: &serde_json::Value) -> Result<String> {
let count = args.get("count").and_then(|v| v.as_u64()).unwrap_or(1) as usize;
let store = load_store()?;
let arc = cached_store().await?;
let store = arc.lock().await;
let mut entries: Vec<&crate::store::Node> = store.nodes.values()
.filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
.collect();
@ -256,7 +266,7 @@ fn journal_tail(args: &serde_json::Value) -> Result<String> {
}
}
fn journal_new(args: &serde_json::Value) -> Result<String> {
async fn journal_new(args: &serde_json::Value) -> Result<String> {
let name = get_str(args, "name")?;
let title = get_str(args, "title")?;
let body = get_str(args, "body")?;
@ -272,7 +282,8 @@ fn journal_new(args: &serde_json::Value) -> Result<String> {
.join("-");
let base_key = if base_key.len() > 80 { &base_key[..80] } else { base_key.as_str() };
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
let key = if store.nodes.contains_key(base_key) {
let mut n = 2;
loop {
@ -292,9 +303,10 @@ fn journal_new(args: &serde_json::Value) -> Result<String> {
Ok(format!("New entry '{}' ({} words)", title, word_count))
}
fn journal_update(args: &serde_json::Value) -> Result<String> {
async fn journal_update(args: &serde_json::Value) -> Result<String> {
let body = get_str(args, "body")?;
let mut store = load_store()?;
let arc = cached_store().await?;
let mut store = arc.lock().await;
let latest_key = store.nodes.values()
.filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
.max_by_key(|n| n.created_at)