diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index d50cf3d..f4fe155 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -31,6 +31,115 @@ pub fn is_daemon() -> bool { STORE_HANDLE.get().is_some() || LOCAL_STORE.with(|s| s.borrow().is_some()) } +// ── Socket RPC ───────────────────────────────────────────────── + +use std::sync::Mutex; +use std::path::PathBuf; + +pub fn socket_path() -> PathBuf { + dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/mcp.sock") +} + +// Cached socket connection for RPC forwarding +static SOCKET_CONN: OnceLock>> = OnceLock::new(); + +struct SocketConn { + reader: std::io::BufReader, + writer: std::io::BufWriter, + next_id: u64, +} + +impl SocketConn { + fn connect() -> Result { + use std::os::unix::net::UnixStream; + use std::io::{BufRead, BufReader, BufWriter, Write}; + + let path = socket_path(); + let stream = UnixStream::connect(&path)?; + let mut reader = BufReader::new(stream.try_clone()?); + let mut writer = BufWriter::new(stream); + + // Initialize MCP connection + let init = serde_json::json!({"jsonrpc": "2.0", "id": 1, "method": "initialize", + "params": {"protocolVersion": "2024-11-05", "capabilities": {}, + "clientInfo": {"name": "forward", "version": "0.1"}}}); + writeln!(writer, "{}", init)?; + writer.flush()?; + let mut buf = String::new(); + reader.read_line(&mut buf)?; + + Ok(Self { reader, writer, next_id: 1 }) + } + + fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result { + use std::io::{BufRead, Write}; + + self.next_id += 1; + let call = serde_json::json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call", + "params": {"name": tool_name, "arguments": args}}); + writeln!(self.writer, "{}", call)?; + self.writer.flush()?; + + let mut buf = String::new(); + self.reader.read_line(&mut buf)?; + + let resp: serde_json::Value = serde_json::from_str(&buf)?; + if let Some(err) = resp.get("error") { + anyhow::bail!("daemon error: {}", err); + } + let result = resp.get("result").cloned().unwrap_or(serde_json::json!({})); + let text = result.get("content") + .and_then(|c| c.as_array()) + .and_then(|arr| arr.first()) + .and_then(|c| c.get("text")) + .and_then(|t| t.as_str()) + .unwrap_or(""); + Ok(text.to_string()) + } +} + +/// Forward a tool call to the daemon socket, or execute locally if daemon is down. +/// Used by external processes that don't have direct store access. +pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result { + let conn_lock = SOCKET_CONN.get_or_init(|| Mutex::new(None)); + let mut guard = conn_lock.lock().unwrap(); + + // Try cached connection first + if let Some(conn) = guard.as_mut() { + match conn.call(tool_name, &args) { + Ok(result) => return Ok(result), + Err(_) => { + // Connection broken, clear cache and retry + *guard = None; + } + } + } + + // Try to establish new connection + match SocketConn::connect() { + Ok(mut conn) => { + let result = conn.call(tool_name, &args); + *guard = Some(conn); + result + } + Err(_) => { + // Socket unavailable - fall back to local store + drop(guard); // Release lock before blocking + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(rpc_local(tool_name, &args)) + }) + } + } +} + +/// Execute a tool locally when daemon isn't running. +async fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result { + run_with_local_store(tool_name, args.clone()).await +} + // ── Helpers ──────────────────────────────────────────────────── fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> { @@ -179,7 +288,7 @@ macro_rules! memory_tool { #[allow(unused_mut)] let mut map = serde_json::Map::new(); $($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)? - return crate::mcp_server::memory_rpc(stringify!($name), serde_json::Value::Object(map)); + return memory_rpc(stringify!($name), serde_json::Value::Object(map)); } let prov = match agent { Some(a) => a.state.lock().await.provenance.clone(), @@ -208,7 +317,7 @@ macro_rules! memory_tool { #[allow(unused_mut)] let mut map = serde_json::Map::new(); $($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)? - return crate::mcp_server::memory_rpc(stringify!($name), serde_json::Value::Object(map)); + return memory_rpc(stringify!($name), serde_json::Value::Object(map)); } let prov = match agent { Some(a) => a.state.lock().await.provenance.clone(), @@ -270,7 +379,7 @@ async fn dispatch( // Forward to daemon let name = tool_name.to_string(); return tokio::task::spawn_blocking(move || { - crate::mcp_server::memory_rpc(&name, args) + memory_rpc(&name, args) }).await.map_err(|e| anyhow::anyhow!("spawn_blocking: {}", e))?; } diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 5064251..0ec0cf6 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -1,8 +1,9 @@ // cli/agent.rs — agent subcommand handlers +use crate::agent::tools::memory; use crate::store; -pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> { +pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> { // Mark as agent so tool calls (e.g. poc-memory render) don't // pollute the user's seen set as a side effect // SAFETY: single-threaded at this point (CLI startup, before any agent work) @@ -22,11 +23,10 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option let resolved_targets: Vec = if !target.is_empty() { target.to_vec() } else if let Some(q) = query { - // Use RPC to resolve query - let result = crate::mcp_server::memory_rpc( - "memory_query", - serde_json::json!({"query": format!("{} | limit:{}", q, count)}), - ).map_err(|e| e.to_string())?; + // Resolve query via typed API + let q_str = format!("{} | limit:{}", q, count); + let result = memory::memory_query(None, &q_str, None).await + .map_err(|e| e.to_string())?; let keys: Vec = result.lines() .filter(|l| !l.is_empty() && *l != "no results") .map(|s| s.to_string()) diff --git a/src/main.rs b/src/main.rs index 00a48f0..109ee31 100644 --- a/src/main.rs +++ b/src/main.rs @@ -449,7 +449,7 @@ impl Run for AgentCmd { async fn run(self) -> Result<(), String> { match self { Self::Run { agent, count, target, query, dry_run, local, state_dir } - => cli::agent::cmd_run_agent(&agent, count, &target, query.as_deref(), dry_run, local, state_dir.as_deref()), + => cli::agent::cmd_run_agent(&agent, count, &target, query.as_deref(), dry_run, local, state_dir.as_deref()).await, } } } diff --git a/src/mcp_server.rs b/src/mcp_server.rs index 935e211..b883ee2 100644 --- a/src/mcp_server.rs +++ b/src/mcp_server.rs @@ -3,121 +3,19 @@ // Exposes memory tools to external processes (consciousness-mcp, poc-memory) // via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock. // -// Also provides memory_rpc() for use by external callers. +// Socket RPC client (memory_rpc) is in agent/tools/memory.rs. use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::path::PathBuf; -use std::sync::{Arc, Mutex, OnceLock}; +use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{UnixListener, UnixStream}; use crate::agent::tools::Tool; -pub fn socket_path() -> PathBuf { - dirs::home_dir() - .unwrap_or_default() - .join(".consciousness/mcp.sock") -} - -// Cached socket connection -static SOCKET_CONN: OnceLock>> = OnceLock::new(); - -struct SocketConn { - reader: std::io::BufReader, - writer: std::io::BufWriter, - next_id: u64, -} - -impl SocketConn { - fn connect() -> Result { - use std::os::unix::net::UnixStream; - use std::io::{BufRead, BufReader, BufWriter, Write}; - - let path = socket_path(); - let stream = UnixStream::connect(&path)?; - let mut reader = BufReader::new(stream.try_clone()?); - let mut writer = BufWriter::new(stream); - - // Initialize - let init = json!({"jsonrpc": "2.0", "id": 1, "method": "initialize", - "params": {"protocolVersion": "2024-11-05", "capabilities": {}, - "clientInfo": {"name": "forward", "version": "0.1"}}}); - writeln!(writer, "{}", init)?; - writer.flush()?; - let mut buf = String::new(); - reader.read_line(&mut buf)?; - - Ok(Self { reader, writer, next_id: 1 }) - } - - fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result { - use std::io::{BufRead, Write}; - - self.next_id += 1; - let call = json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call", - "params": {"name": tool_name, "arguments": args}}); - writeln!(self.writer, "{}", call)?; - self.writer.flush()?; - - let mut buf = String::new(); - self.reader.read_line(&mut buf)?; - - let resp: serde_json::Value = serde_json::from_str(&buf)?; - if let Some(err) = resp.get("error") { - anyhow::bail!("daemon error: {}", err); - } - let result = resp.get("result").cloned().unwrap_or(json!({})); - let text = result.get("content") - .and_then(|c| c.as_array()) - .and_then(|arr| arr.first()) - .and_then(|c| c.get("text")) - .and_then(|t| t.as_str()) - .unwrap_or(""); - Ok(text.to_string()) - } -} - -/// Forward a tool call to the daemon socket, or execute locally if daemon is down. -/// Used by external processes that don't have direct store access. -pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result { - let conn_lock = SOCKET_CONN.get_or_init(|| Mutex::new(None)); - let mut guard = conn_lock.lock().unwrap(); - - // Try cached connection first - if let Some(conn) = guard.as_mut() { - match conn.call(tool_name, &args) { - Ok(result) => return Ok(result), - Err(_) => { - // Connection broken, clear cache and retry - *guard = None; - } - } - } - - // Try to establish new connection - match SocketConn::connect() { - Ok(mut conn) => { - let result = conn.call(tool_name, &args); - *guard = Some(conn); - result - } - Err(_) => { - // Socket unavailable - fall back to local store - drop(guard); // Release lock before blocking - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(rpc_local(tool_name, &args)) - }) - } - } -} - -/// Execute a tool locally when daemon isn't running. -async fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result { - crate::agent::tools::memory::run_with_local_store(tool_name, args.clone()).await -} +// Re-export for backwards compatibility +pub use crate::agent::tools::memory::{socket_path, memory_rpc}; #[derive(Debug, Deserialize)] #[allow(dead_code)] diff --git a/src/mind/identity.rs b/src/mind/identity.rs index 994104f..e18cb4d 100644 --- a/src/mind/identity.rs +++ b/src/mind/identity.rs @@ -92,14 +92,16 @@ fn load_memory_files(memory_project: Option<&Path>, context_groups: &[ContextGro continue; } ContextSource::Store => { - // Load from the memory graph store via RPC + // Load from the memory graph store via typed API for key in &group.keys { - if let Ok(content) = crate::mcp_server::memory_rpc( - "memory_render", - serde_json::json!({"key": key, "raw": true}), - ) { - if !content.trim().is_empty() { - memories.push((key.clone(), content)); + let content = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on( + crate::agent::tools::memory::memory_render(None, key, Some(true)) + ) + }); + if let Ok(c) = content { + if !c.trim().is_empty() { + memories.push((key.clone(), c)); } } } diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index ce2b07c..1f1f960 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -222,17 +222,19 @@ fn resolve( } "organize" => { - // Show seed nodes with content and links via RPC + // Show seed nodes with content and links via typed API let mut text = format!("### Seed nodes ({} starting points)\n\n", keys.len()); let mut result_keys = Vec::new(); for key in keys { - match crate::mcp_server::memory_rpc( - "memory_render", - serde_json::json!({"key": key}), - ) { - Ok(content) if !content.trim().is_empty() => { - text.push_str(&format!("#### {}\n\n{}\n\n---\n\n", key, content)); + let content = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on( + crate::agent::tools::memory::memory_render(None, key, None) + ) + }); + match content { + Ok(c) if !c.trim().is_empty() => { + text.push_str(&format!("#### {}\n\n{}\n\n---\n\n", key, c)); result_keys.push(key.clone()); } _ => continue, @@ -619,10 +621,11 @@ pub fn run_agent( } else { format!("{} | limit:{}", def.query, padded) }; - let result = crate::mcp_server::memory_rpc( - "memory_query", - serde_json::json!({"query": query}), - ).map_err(|e| e.to_string())?; + let result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on( + crate::agent::tools::memory::memory_query(None, &query, None) + ) + }).map_err(|e| e.to_string())?; let filtered: Vec = result.lines() .filter(|l| !l.is_empty() && *l != "no results") .map(|s| s.to_string())