From a8c239f3dee66c787696fd968ef93863bfe24174 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Tue, 7 Apr 2026 03:35:08 -0400 Subject: [PATCH] =?UTF-8?q?Cache=20Store=20in=20process=20=E2=80=94=20stop?= =?UTF-8?q?=20reloading=20on=20every=20tool=20call?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store::cached() returns a process-global Arc> that loads once and reloads only when log files change (is_stale() checks file sizes). All memory and journal tools use cached_store() instead of Store::load() per invocation. Fixes CPU saturation from HashMap hashing when multiple subconscious agents make concurrent tool calls. Co-Authored-By: Proof of Concept --- src/agent/tools/memory.rs | 92 ++++++++++++++++++-------------- src/hippocampus/store/persist.rs | 28 ++++++++++ 2 files changed, 80 insertions(+), 40 deletions(-) diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index ae9c036..6cc67b5 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -19,8 +19,8 @@ fn get_f64(args: &serde_json::Value, name: &str) -> Result { args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name)) } -fn load_store() -> Result { - Store::load().map_err(|e| anyhow::anyhow!("{}", e)) +async fn cached_store() -> Result>> { + Store::cached().await.map_err(|e| anyhow::anyhow!("{}", e)) } fn provenance() -> &'static str { "manual" } @@ -35,34 +35,34 @@ pub fn memory_tools() -> [super::Tool; 12] { handler: |_a, v| Box::pin(async move { render(&v) }) }, Tool { name: "memory_write", description: "Create or update a memory node.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"},"content":{"type":"string","description":"Full content (markdown)"}},"required":["key","content"]}"#, - handler: |_a, v| Box::pin(async move { write(&v) }) }, + handler: |_a, v| Box::pin(async move { write(&v).await }) }, Tool { name: "memory_search", description: 
"Search the memory graph via spreading activation. Give 2-4 seed node keys.", parameters_json: r#"{"type":"object","properties":{"keys":{"type":"array","items":{"type":"string"},"description":"Seed node keys to activate from"}},"required":["keys"]}"#, - handler: |_a, v| Box::pin(async move { search(&v) }) }, + handler: |_a, v| Box::pin(async move { search(&v).await }) }, Tool { name: "memory_links", description: "Show a node's neighbors with link strengths.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, handler: |_a, v| Box::pin(async move { links(&v) }) }, Tool { name: "memory_link_set", description: "Set link strength between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"},"strength":{"type":"number","description":"0.01 to 1.0"}},"required":["source","target","strength"]}"#, - handler: |_a, v| Box::pin(async move { link_set(&v) }) }, + handler: |_a, v| Box::pin(async move { link_set(&v).await }) }, Tool { name: "memory_link_add", description: "Add a new link between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"}},"required":["source","target"]}"#, - handler: |_a, v| Box::pin(async move { link_add(&v) }) }, + handler: |_a, v| Box::pin(async move { link_add(&v).await }) }, Tool { name: "memory_used", description: "Mark a node as useful (boosts weight).", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, - handler: |_a, v| Box::pin(async move { used(&v) }) }, + handler: |_a, v| Box::pin(async move { used(&v).await }) }, Tool { name: "memory_weight_set", description: "Set a node's weight directly (0.01 to 1.0).", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string"},"weight":{"type":"number","description":"0.01 to 1.0"}},"required":["key","weight"]}"#, - 
handler: |_a, v| Box::pin(async move { weight_set(&v) }) }, + handler: |_a, v| Box::pin(async move { weight_set(&v).await }) }, Tool { name: "memory_rename", description: "Rename a node key in place.", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"}},"required":["old_key","new_key"]}"#, - handler: |_a, v| Box::pin(async move { rename(&v) }) }, + handler: |_a, v| Box::pin(async move { rename(&v).await }) }, Tool { name: "memory_supersede", description: "Mark a node as superseded by another (sets weight to 0.01).", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"},"reason":{"type":"string"}},"required":["old_key","new_key"]}"#, - handler: |_a, v| Box::pin(async move { supersede(&v) }) }, + handler: |_a, v| Box::pin(async move { supersede(&v).await }) }, Tool { name: "memory_query", description: "Run a structured query against the memory graph.", parameters_json: r#"{"type":"object","properties":{"query":{"type":"string","description":"Query expression"}},"required":["query"]}"#, - handler: |_a, v| Box::pin(async move { query(&v) }) }, + handler: |_a, v| Box::pin(async move { query(&v).await }) }, Tool { name: "output", description: "Produce a named output value for passing between steps.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Output name"},"value":{"type":"string","description":"Output value"}},"required":["key","value"]}"#, handler: |_a, v| Box::pin(async move { output(&v) }) }, @@ -74,13 +74,13 @@ pub fn journal_tools() -> [super::Tool; 3] { [ Tool { name: "journal_tail", description: "Read the last N journal entries (default 1).", parameters_json: r#"{"type":"object","properties":{"count":{"type":"integer","description":"Number of entries (default 1)"}}}"#, - handler: |_a, v| Box::pin(async move { journal_tail(&v) }) }, + handler: |_a, v| Box::pin(async move { journal_tail(&v).await }) }, Tool { 
name: "journal_new", description: "Start a new journal entry.", parameters_json: r#"{"type":"object","properties":{"name":{"type":"string","description":"Short node name (becomes the key)"},"title":{"type":"string","description":"Descriptive title"},"body":{"type":"string","description":"Entry body"}},"required":["name","title","body"]}"#, - handler: |_a, v| Box::pin(async move { journal_new(&v) }) }, + handler: |_a, v| Box::pin(async move { journal_new(&v).await }) }, Tool { name: "journal_update", description: "Append text to the most recent journal entry.", parameters_json: r#"{"type":"object","properties":{"body":{"type":"string","description":"Text to append"}},"required":["body"]}"#, - handler: |_a, v| Box::pin(async move { journal_update(&v) }) }, + handler: |_a, v| Box::pin(async move { journal_update(&v).await }) }, ] } @@ -93,17 +93,18 @@ fn render(args: &serde_json::Value) -> Result<String> { .render()) } -fn write(args: &serde_json::Value) -> Result<String> { +async fn write(args: &serde_json::Value) -> Result<String> { let key = get_str(args, "key")?; let content = get_str(args, "content")?; - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = arc.lock().await; let result = store.upsert_provenance(key, content, provenance()) .map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("{} '{}'", result, key)) } -fn search(args: &serde_json::Value) -> Result<String> { +async fn search(args: &serde_json::Value) -> Result<String> { let keys: Vec<String> = args.get("keys") .and_then(|v| v.as_array()) .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect()) @@ -111,8 +112,9 @@ fn search(args: &serde_json::Value) -> Result<String> { if keys.is_empty() { anyhow::bail!("memory_search requires at least one seed key"); } - let store = load_store()?; - let graph = crate::graph::build_graph_fast(&store); + let arc = cached_store().await?; + let store = arc.lock().await; + let graph = 
crate::graph::build_graph_fast(&*store); let params = store.params(); let seeds: Vec<(String, f64)> = keys.iter() .filter_map(|k| { @@ -126,7 +128,7 @@ fn search(args: &serde_json::Value) -> Result<String> { let seed_set: std::collections::HashSet<&str> = seeds.iter() .map(|(k, _)| k.as_str()).collect(); let results = crate::search::spreading_activation( - &seeds, &graph, &store, + &seeds, &graph, &*store, params.max_hops, params.edge_decay, params.min_activation, ); Ok(results.iter() @@ -148,8 +150,9 @@ fn links(args: &serde_json::Value) -> Result<String> { Ok(out) } -fn link_set(args: &serde_json::Value) -> Result<String> { - let mut store = load_store()?; +async fn link_set(args: &serde_json::Value) -> Result<String> { + let arc = cached_store().await?; + let mut store = arc.lock().await; let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?; let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?; let strength = get_f64(args, "strength")? as f32; @@ -158,8 +161,9 @@ fn link_set(args: &serde_json::Value) -> Result<String> { Ok(format!("{} ↔ {} strength {:.2} → {:.2}", s, t, old, strength)) } -fn link_add(args: &serde_json::Value) -> Result<String> { - let mut store = load_store()?; +async fn link_add(args: &serde_json::Value) -> Result<String> { + let arc = cached_store().await?; + let mut store = arc.lock().await; let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?; let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?; let strength = store.add_link(&s, &t, provenance()).map_err(|e| anyhow::anyhow!("{}", e))?; @@ -167,9 +171,10 @@ fn link_add(args: &serde_json::Value) -> Result<String> { Ok(format!("linked {} → {} (strength={:.2})", s, t, strength)) } -fn used(args: &serde_json::Value) -> Result<String> { +async fn used(args: &serde_json::Value) -> Result<String> { let key = get_str(args, "key")?; - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = 
arc.lock().await; if !store.nodes.contains_key(key) { anyhow::bail!("node not found: {}", key); } @@ -178,8 +183,9 @@ fn used(args: &serde_json::Value) -> Result<String> { Ok(format!("marked {} as used", key)) } -fn weight_set(args: &serde_json::Value) -> Result<String> { - let mut store = load_store()?; +async fn weight_set(args: &serde_json::Value) -> Result<String> { + let arc = cached_store().await?; + let mut store = arc.lock().await; let key = store.resolve_key(get_str(args, "key")?).map_err(|e| anyhow::anyhow!("{}", e))?; let weight = get_f64(args, "weight")? as f32; let (old, new) = store.set_weight(&key, weight).map_err(|e| anyhow::anyhow!("{}", e))?; @@ -187,21 +193,23 @@ fn weight_set(args: &serde_json::Value) -> Result<String> { Ok(format!("weight {} {:.2} → {:.2}", key, old, new)) } -fn rename(args: &serde_json::Value) -> Result<String> { +async fn rename(args: &serde_json::Value) -> Result<String> { let old_key = get_str(args, "old_key")?; let new_key = get_str(args, "new_key")?; - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = arc.lock().await; let resolved = store.resolve_key(old_key).map_err(|e| anyhow::anyhow!("{}", e))?; store.rename_node(&resolved, new_key).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("Renamed '{}' → '{}'", resolved, new_key)) } -fn supersede(args: &serde_json::Value) -> Result<String> { +async fn supersede(args: &serde_json::Value) -> Result<String> { let old_key = get_str(args, "old_key")?; let new_key = get_str(args, "new_key")?; let reason = args.get("reason").and_then(|v| v.as_str()).unwrap_or("superseded"); - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = arc.lock().await; let content = store.nodes.get(old_key) .map(|n| n.content.clone()) .ok_or_else(|| anyhow::anyhow!("node not found: {}", old_key))?; @@ -214,9 +222,10 @@ fn supersede(args: &serde_json::Value) -> Result<String> { Ok(format!("superseded {} → {} ({})", old_key, new_key, reason)) } -fn query(args: 
&serde_json::Value) -> Result<String> { +async fn query(args: &serde_json::Value) -> Result<String> { let query_str = get_str(args, "query")?; - let store = load_store()?; + let arc = cached_store().await?; + let store = arc.lock().await; let graph = store.build_graph(); crate::query_parser::query_to_string(&store, &graph, query_str) .map_err(|e| anyhow::anyhow!("{}", e)) @@ -238,9 +247,10 @@ fn output(args: &serde_json::Value) -> Result<String> { // ── Journal tools ────────────────────────────────────────────── -fn journal_tail(args: &serde_json::Value) -> Result<String> { +async fn journal_tail(args: &serde_json::Value) -> Result<String> { let count = args.get("count").and_then(|v| v.as_u64()).unwrap_or(1) as usize; - let store = load_store()?; + let arc = cached_store().await?; + let store = arc.lock().await; let mut entries: Vec<&crate::store::Node> = store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .collect(); @@ -256,7 +266,7 @@ fn journal_tail(args: &serde_json::Value) -> Result<String> { } } -fn journal_new(args: &serde_json::Value) -> Result<String> { +async fn journal_new(args: &serde_json::Value) -> Result<String> { let name = get_str(args, "name")?; let title = get_str(args, "title")?; let body = get_str(args, "body")?; @@ -272,7 +282,8 @@ fn journal_new(args: &serde_json::Value) -> Result<String> { .join("-"); let base_key = if base_key.len() > 80 { &base_key[..80] } else { base_key.as_str() }; - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = arc.lock().await; let key = if store.nodes.contains_key(base_key) { let mut n = 2; loop { @@ -292,9 +303,10 @@ fn journal_new(args: &serde_json::Value) -> Result<String> { Ok(format!("New entry '{}' ({} words)", title, word_count)) } -fn journal_update(args: &serde_json::Value) -> Result<String> { +async fn journal_update(args: &serde_json::Value) -> Result<String> { let body = get_str(args, "body")?; - let mut store = load_store()?; + let arc = cached_store().await?; + let mut store = arc.lock().await; let latest_key = 
store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .max_by_key(|n| n.created_at) diff --git a/src/hippocampus/store/persist.rs b/src/hippocampus/store/persist.rs index 5433df8..5668dd2 100644 --- a/src/hippocampus/store/persist.rs +++ b/src/hippocampus/store/persist.rs @@ -18,8 +18,36 @@ use std::collections::HashMap; use std::fs; use std::io::{BufReader, Seek}; use std::path::Path; +use std::sync::Arc; + +/// Process-global cached store. Reloads only when log files change. +static CACHED_STORE: tokio::sync::OnceCell<Arc<tokio::sync::Mutex<Store>>> = + tokio::sync::OnceCell::const_new(); impl Store { + /// Get or create the process-global cached store. + /// Reloads from disk if log files have changed since last load. + pub async fn cached() -> Result<Arc<tokio::sync::Mutex<Store>>, String> { + let store = CACHED_STORE.get_or_try_init(|| async { + let s = Store::load()?; + Ok::<_, String>(Arc::new(tokio::sync::Mutex::new(s))) + }).await?; + { + let mut guard = store.lock().await; + if guard.is_stale() { + *guard = Store::load()?; + } + } + Ok(store.clone()) + } + + /// Check if the on-disk logs have grown since we loaded. + pub fn is_stale(&self) -> bool { + let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); + let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); + nodes_size != self.loaded_nodes_size || rels_size != self.loaded_rels_size + } + /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs. /// /// Staleness check uses log file sizes (not mtimes). Since logs are