diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index ae8081b..58eaa20 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -1,15 +1,38 @@ use std::sync::Arc; // tools/memory.rs — Native memory graph operations // -// Direct library calls into the store — no subprocess spawning. -// One function per tool for use in the Tool registry. +// If running in the daemon process (STORE_HANDLE set), accesses +// the store directly. Otherwise forwards to the daemon via socket. use anyhow::{Context, Result}; +use std::sync::OnceLock; use crate::hippocampus::memory::MemoryNode; use crate::store::StoreView; use crate::store::Store; +// ── Store handle ─────────────────────────────────────────────── + +/// Global store handle. Set by daemon at startup. +/// If None, tools forward to daemon socket. +static STORE_HANDLE: OnceLock>> = OnceLock::new(); + +// Thread-local store for rpc_local fallback path. +thread_local! { + static LOCAL_STORE: std::cell::RefCell>>> = + const { std::cell::RefCell::new(None) }; +} + +/// Set the global store handle. Call once at daemon startup. +pub fn set_store(store: Arc>) { + STORE_HANDLE.set(store).ok(); +} + +/// Check if we're running in daemon mode (have direct store access). +pub fn is_daemon() -> bool { + STORE_HANDLE.get().is_some() || LOCAL_STORE.with(|s| s.borrow().is_some()) +} + // ── Helpers ──────────────────────────────────────────────────── fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> { @@ -20,17 +43,92 @@ fn get_f64(args: &serde_json::Value, name: &str) -> Result { args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name)) } -async fn cached_store() -> Result>> { +async fn cached_store() -> Result>> { + // Check thread-local first (rpc_local fallback path) + if let Some(store) = LOCAL_STORE.with(|s| s.borrow().clone()) { + return Ok(store); + } + // Use global handle if set (daemon mode) + if let Some(store) = STORE_HANDLE.get() { + return Ok(store.clone()); + } + // Fallback to loading (for backwards compat during transition) Store::cached().await.map_err(|e| anyhow::anyhow!("{}", e)) } -async fn get_provenance(agent: &Option>) -> String { +/// Run a tool with a temporarily-opened store (for rpc_local fallback). +pub fn run_with_local_store(tool_name: &str, args: serde_json::Value) -> Result { + let store = Store::load().map_err(|e| anyhow::anyhow!("{}", e))?; + let arc = Arc::new(crate::Mutex::new(store)); + + LOCAL_STORE.with(|s| *s.borrow_mut() = Some(arc)); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let name = tool_name.to_string(); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(dispatch(&name, &None, args)) + })); + LOCAL_STORE.with(|s| *s.borrow_mut() = None); + + result.map_err(|_| anyhow::anyhow!("tool panicked"))? +} + +/// Get provenance from agent, or from args._provenance, or "manual". +async fn get_provenance(agent: &Option>, args: &serde_json::Value) -> String { + // Check args first (set by RPC path) + if let Some(p) = args.get("_provenance").and_then(|v| v.as_str()) { + return p.to_string(); + } match agent { Some(a) => a.state.lock().await.provenance.clone(), None => "manual".to_string(), } } +/// Single entry point for all memory/journal tool calls. +/// If not daemon, forwards to daemon with provenance attached. +async fn dispatch( + tool_name: &str, + agent: &Option>, + args: serde_json::Value, +) -> Result { + if !is_daemon() { + // Forward to daemon, attaching provenance + let mut args = args; + if let Some(a) = agent { + let prov = a.state.lock().await.provenance.clone(); + args.as_object_mut().map(|o| o.insert("_provenance".into(), prov.into())); + } + let name = tool_name.to_string(); + return tokio::task::spawn_blocking(move || { + crate::mcp_server::memory_rpc(&name, args) + }).await.map_err(|e| anyhow::anyhow!("spawn_blocking: {}", e))?; + } + + // Daemon path - dispatch to implementation + match tool_name { + "memory_render" => render(&args).await, + "memory_write" => write(agent, &args).await, + "memory_search" => search(&args).await, + "memory_links" => links(&args).await, + "memory_link_set" => link_set(&args).await, + "memory_link_add" => link_add(agent, &args).await, + "memory_used" => used(&args).await, + "memory_weight_set" => weight_set(&args).await, + "memory_rename" => rename(&args).await, + "memory_supersede" => supersede(agent, &args).await, + "memory_query" => query(&args).await, + "graph_topology" => graph_topology().await, + "graph_health" => graph_health().await, + "journal_tail" => journal_tail(&args).await, + "journal_new" => journal_new(agent, &args).await, + "journal_update" => journal_update(agent, &args).await, + _ => anyhow::bail!("unknown tool: {}", tool_name), + } +} + // ── Definitions ──────────────────────────────────────────────── pub fn memory_tools() -> [super::Tool; 13] { @@ -38,34 +136,34 @@ pub fn memory_tools() -> [super::Tool; 13] { [ Tool { name: "memory_render", description: "Read a memory node's content and links.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { render(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_render", &a, v).await })) }, Tool { name: "memory_write", description: "Create or update a memory node.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"},"content":{"type":"string","description":"Full content (markdown)"}},"required":["key","content"]}"#, - handler: Arc::new(|a, v| Box::pin(async move { write(&a, &v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_write", &a, v).await })) }, Tool { name: "memory_search", description: "Search the memory graph via spreading activation. Give 2-4 seed node keys.", parameters_json: r#"{"type":"object","properties":{"keys":{"type":"array","items":{"type":"string"},"description":"Seed node keys to activate from"}},"required":["keys"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { search(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_search", &a, v).await })) }, Tool { name: "memory_links", description: "Show a node's neighbors with link strengths.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { links(&v) })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_links", &a, v).await })) }, Tool { name: "memory_link_set", description: "Set link strength between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"},"strength":{"type":"number","description":"0.01 to 1.0"}},"required":["source","target","strength"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { link_set(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_link_set", &a, v).await })) }, Tool { name: "memory_link_add", description: "Add a new link between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"}},"required":["source","target"]}"#, - handler: Arc::new(|a, v| Box::pin(async move { link_add(&a, &v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_link_add", &a, v).await })) }, Tool { name: "memory_used", description: "Mark a node as useful (boosts weight).", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { used(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_used", &a, v).await })) }, Tool { name: "memory_weight_set", description: "Set a node's weight directly (0.01 to 1.0).", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string"},"weight":{"type":"number","description":"0.01 to 1.0"}},"required":["key","weight"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { weight_set(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_weight_set", &a, v).await })) }, Tool { name: "memory_rename", description: "Rename a node key in place.", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"}},"required":["old_key","new_key"]}"#, - handler: Arc::new(|_a, v| Box::pin(async move { rename(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_rename", &a, v).await })) }, Tool { name: "memory_supersede", description: "Mark a node as superseded by another (sets weight to 0.01).", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"},"reason":{"type":"string"}},"required":["old_key","new_key"]}"#, - handler: Arc::new(|a, v| Box::pin(async move { supersede(&a, &v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_supersede", &a, v).await })) }, Tool { name: "memory_query", description: "Run a structured query against the memory graph.", parameters_json: r#"{ @@ -76,13 +174,13 @@ pub fn memory_tools() -> [super::Tool; 13] { }, "required": ["query"] }"#, - handler: Arc::new(|_a, v| Box::pin(async move { query(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_query", &a, v).await })) }, Tool { name: "graph_topology", description: "Show graph topology stats (nodes, edges, clustering, hubs).", parameters_json: r#"{"type":"object","properties":{}}"#, - handler: Arc::new(|_a, _v| Box::pin(async { graph_topology().await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("graph_topology", &a, v).await })) }, Tool { name: "graph_health", description: "Show graph health report with maintenance recommendations.", parameters_json: r#"{"type":"object","properties":{}}"#, - handler: Arc::new(|_a, _v| Box::pin(async { graph_health().await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("graph_health", &a, v).await })) }, ] } @@ -100,7 +198,7 @@ pub fn journal_tools() -> [super::Tool; 3] { "after": {"type": "string", "description": "Only entries after this date (YYYY-MM-DD)"} } }"#, - handler: Arc::new(|_a, v| Box::pin(async move { journal_tail(&v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_tail", &a, v).await })) }, Tool { name: "journal_new", description: "Start a new journal/digest entry.", parameters_json: r#"{ "type": "object", @@ -112,7 +210,7 @@ pub fn journal_tools() -> [super::Tool; 3] { }, "required": ["name", "title", "body"] }"#, - handler: Arc::new(|a, v| Box::pin(async move { journal_new(&a, &v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_new", &a, v).await })) }, Tool { name: "journal_update", description: "Append text to the most recent entry at a level.", parameters_json: r#"{ "type": "object", @@ -122,7 +220,7 @@ pub fn journal_tools() -> [super::Tool; 3] { }, "required": ["body"] }"#, - handler: Arc::new(|a, v| Box::pin(async move { journal_update(&a, &v).await })) }, + handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_update", &a, v).await })) }, ] } @@ -140,7 +238,7 @@ async fn render(args: &serde_json::Value) -> Result { async fn write(agent: &Option>, args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let content = get_str(args, "content")?; - let prov = get_provenance(agent).await; + let prov = get_provenance(agent, args).await; let arc = cached_store().await?; let mut store = arc.lock().await; let result = store.upsert_provenance(key, content, &prov) @@ -183,7 +281,7 @@ async fn search(args: &serde_json::Value) -> Result { .collect::>().join("\n")) } -fn links(args: &serde_json::Value) -> Result { +async fn links(args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let node = MemoryNode::load(key) .ok_or_else(|| anyhow::anyhow!("node not found: {}", key))?; @@ -211,7 +309,7 @@ async fn link_add(agent: &Option>, args: &se let mut store = arc.lock().await; let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?; let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?; - let prov = get_provenance(agent).await; + let prov = get_provenance(agent, args).await; let strength = store.add_link(&s, &t, &prov).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("linked {} → {} (strength={:.2})", s, t, strength)) @@ -261,7 +359,7 @@ async fn supersede(agent: &Option>, args: &s .ok_or_else(|| anyhow::anyhow!("node not found: {}", old_key))?; let notice = format!("**SUPERSEDED** by `{}` — {}\n\n---\n\n{}", new_key, reason, content.trim()); - let prov = get_provenance(agent).await; + let prov = get_provenance(agent, args).await; store.upsert_provenance(old_key, ¬ice, &prov) .map_err(|e| anyhow::anyhow!("{}", e))?; store.set_weight(old_key, 0.01).map_err(|e| anyhow::anyhow!("{}", e))?; @@ -363,7 +461,7 @@ async fn journal_new(agent: &Option>, args: }; let mut node = crate::store::new_node(&key, &content); node.node_type = level_to_node_type(level); - node.provenance = get_provenance(agent).await; + node.provenance = get_provenance(agent, args).await; store.upsert_node(node).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; let word_count = body.split_whitespace().count(); @@ -385,7 +483,7 @@ async fn journal_update(agent: &Option>, arg }; let existing = store.nodes.get(&key).unwrap().content.clone(); let new_content = format!("{}\n\n{}", existing.trim_end(), body); - let prov = get_provenance(agent).await; + let prov = get_provenance(agent, args).await; store.upsert_provenance(&key, &new_content, &prov) .map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; diff --git a/src/user/mod.rs b/src/user/mod.rs index 4389924..0a50a6e 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -212,6 +212,12 @@ async fn start(cli: crate::user::CliArgs) -> Result<()> { }) .expect("spawn UI thread"); + // Initialize store and set global handle for memory tools + match crate::store::Store::cached().await { + Ok(store) => crate::agent::tools::memory::set_store(store), + Err(e) => eprintln!("Store init failed: {}", e), + } + // Start MCP server for external tool access let mut tools: Vec = Vec::new(); tools.extend(crate::agent::tools::memory::memory_tools());