Consolidate memory RPC in tools/memory.rs
- Move memory_rpc(), socket_path(), SocketConn from mcp_server.rs - Convert remaining callers to typed async API: - defs.rs: organize placeholder, run_agent query - cli/agent.rs: query resolution (now async) - mind/identity.rs: Store context loading - Re-export socket_path/memory_rpc from mcp_server for compatibility All external memory access now goes through tools/memory.rs typed API. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
5b07a81aa7
commit
fb46ab095d
6 changed files with 146 additions and 134 deletions
|
|
@ -31,6 +31,115 @@ pub fn is_daemon() -> bool {
|
||||||
STORE_HANDLE.get().is_some() || LOCAL_STORE.with(|s| s.borrow().is_some())
|
STORE_HANDLE.get().is_some() || LOCAL_STORE.with(|s| s.borrow().is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Socket RPC ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
pub fn socket_path() -> PathBuf {
|
||||||
|
dirs::home_dir()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.join(".consciousness/mcp.sock")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cached socket connection for RPC forwarding
|
||||||
|
static SOCKET_CONN: OnceLock<Mutex<Option<SocketConn>>> = OnceLock::new();
|
||||||
|
|
||||||
|
struct SocketConn {
|
||||||
|
reader: std::io::BufReader<std::os::unix::net::UnixStream>,
|
||||||
|
writer: std::io::BufWriter<std::os::unix::net::UnixStream>,
|
||||||
|
next_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SocketConn {
|
||||||
|
fn connect() -> Result<Self> {
|
||||||
|
use std::os::unix::net::UnixStream;
|
||||||
|
use std::io::{BufRead, BufReader, BufWriter, Write};
|
||||||
|
|
||||||
|
let path = socket_path();
|
||||||
|
let stream = UnixStream::connect(&path)?;
|
||||||
|
let mut reader = BufReader::new(stream.try_clone()?);
|
||||||
|
let mut writer = BufWriter::new(stream);
|
||||||
|
|
||||||
|
// Initialize MCP connection
|
||||||
|
let init = serde_json::json!({"jsonrpc": "2.0", "id": 1, "method": "initialize",
|
||||||
|
"params": {"protocolVersion": "2024-11-05", "capabilities": {},
|
||||||
|
"clientInfo": {"name": "forward", "version": "0.1"}}});
|
||||||
|
writeln!(writer, "{}", init)?;
|
||||||
|
writer.flush()?;
|
||||||
|
let mut buf = String::new();
|
||||||
|
reader.read_line(&mut buf)?;
|
||||||
|
|
||||||
|
Ok(Self { reader, writer, next_id: 1 })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result<String> {
|
||||||
|
use std::io::{BufRead, Write};
|
||||||
|
|
||||||
|
self.next_id += 1;
|
||||||
|
let call = serde_json::json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call",
|
||||||
|
"params": {"name": tool_name, "arguments": args}});
|
||||||
|
writeln!(self.writer, "{}", call)?;
|
||||||
|
self.writer.flush()?;
|
||||||
|
|
||||||
|
let mut buf = String::new();
|
||||||
|
self.reader.read_line(&mut buf)?;
|
||||||
|
|
||||||
|
let resp: serde_json::Value = serde_json::from_str(&buf)?;
|
||||||
|
if let Some(err) = resp.get("error") {
|
||||||
|
anyhow::bail!("daemon error: {}", err);
|
||||||
|
}
|
||||||
|
let result = resp.get("result").cloned().unwrap_or(serde_json::json!({}));
|
||||||
|
let text = result.get("content")
|
||||||
|
.and_then(|c| c.as_array())
|
||||||
|
.and_then(|arr| arr.first())
|
||||||
|
.and_then(|c| c.get("text"))
|
||||||
|
.and_then(|t| t.as_str())
|
||||||
|
.unwrap_or("");
|
||||||
|
Ok(text.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Forward a tool call to the daemon socket, or execute locally if daemon is down.
|
||||||
|
/// Used by external processes that don't have direct store access.
|
||||||
|
pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
|
||||||
|
let conn_lock = SOCKET_CONN.get_or_init(|| Mutex::new(None));
|
||||||
|
let mut guard = conn_lock.lock().unwrap();
|
||||||
|
|
||||||
|
// Try cached connection first
|
||||||
|
if let Some(conn) = guard.as_mut() {
|
||||||
|
match conn.call(tool_name, &args) {
|
||||||
|
Ok(result) => return Ok(result),
|
||||||
|
Err(_) => {
|
||||||
|
// Connection broken, clear cache and retry
|
||||||
|
*guard = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to establish new connection
|
||||||
|
match SocketConn::connect() {
|
||||||
|
Ok(mut conn) => {
|
||||||
|
let result = conn.call(tool_name, &args);
|
||||||
|
*guard = Some(conn);
|
||||||
|
result
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Socket unavailable - fall back to local store
|
||||||
|
drop(guard); // Release lock before blocking
|
||||||
|
tokio::task::block_in_place(|| {
|
||||||
|
tokio::runtime::Handle::current()
|
||||||
|
.block_on(rpc_local(tool_name, &args))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute a tool locally when daemon isn't running.
|
||||||
|
async fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result<String> {
|
||||||
|
run_with_local_store(tool_name, args.clone()).await
|
||||||
|
}
|
||||||
|
|
||||||
// ── Helpers ────────────────────────────────────────────────────
|
// ── Helpers ────────────────────────────────────────────────────
|
||||||
|
|
||||||
fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> {
|
fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> {
|
||||||
|
|
@ -179,7 +288,7 @@ macro_rules! memory_tool {
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
let mut map = serde_json::Map::new();
|
let mut map = serde_json::Map::new();
|
||||||
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
|
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
|
||||||
return crate::mcp_server::memory_rpc(stringify!($name), serde_json::Value::Object(map));
|
return memory_rpc(stringify!($name), serde_json::Value::Object(map));
|
||||||
}
|
}
|
||||||
let prov = match agent {
|
let prov = match agent {
|
||||||
Some(a) => a.state.lock().await.provenance.clone(),
|
Some(a) => a.state.lock().await.provenance.clone(),
|
||||||
|
|
@ -208,7 +317,7 @@ macro_rules! memory_tool {
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
let mut map = serde_json::Map::new();
|
let mut map = serde_json::Map::new();
|
||||||
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
|
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
|
||||||
return crate::mcp_server::memory_rpc(stringify!($name), serde_json::Value::Object(map));
|
return memory_rpc(stringify!($name), serde_json::Value::Object(map));
|
||||||
}
|
}
|
||||||
let prov = match agent {
|
let prov = match agent {
|
||||||
Some(a) => a.state.lock().await.provenance.clone(),
|
Some(a) => a.state.lock().await.provenance.clone(),
|
||||||
|
|
@ -270,7 +379,7 @@ async fn dispatch(
|
||||||
// Forward to daemon
|
// Forward to daemon
|
||||||
let name = tool_name.to_string();
|
let name = tool_name.to_string();
|
||||||
return tokio::task::spawn_blocking(move || {
|
return tokio::task::spawn_blocking(move || {
|
||||||
crate::mcp_server::memory_rpc(&name, args)
|
memory_rpc(&name, args)
|
||||||
}).await.map_err(|e| anyhow::anyhow!("spawn_blocking: {}", e))?;
|
}).await.map_err(|e| anyhow::anyhow!("spawn_blocking: {}", e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
// cli/agent.rs — agent subcommand handlers
|
// cli/agent.rs — agent subcommand handlers
|
||||||
|
|
||||||
|
use crate::agent::tools::memory;
|
||||||
use crate::store;
|
use crate::store;
|
||||||
|
|
||||||
pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> {
|
pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> {
|
||||||
// Mark as agent so tool calls (e.g. poc-memory render) don't
|
// Mark as agent so tool calls (e.g. poc-memory render) don't
|
||||||
// pollute the user's seen set as a side effect
|
// pollute the user's seen set as a side effect
|
||||||
// SAFETY: single-threaded at this point (CLI startup, before any agent work)
|
// SAFETY: single-threaded at this point (CLI startup, before any agent work)
|
||||||
|
|
@ -22,11 +23,10 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option
|
||||||
let resolved_targets: Vec<String> = if !target.is_empty() {
|
let resolved_targets: Vec<String> = if !target.is_empty() {
|
||||||
target.to_vec()
|
target.to_vec()
|
||||||
} else if let Some(q) = query {
|
} else if let Some(q) = query {
|
||||||
// Use RPC to resolve query
|
// Resolve query via typed API
|
||||||
let result = crate::mcp_server::memory_rpc(
|
let q_str = format!("{} | limit:{}", q, count);
|
||||||
"memory_query",
|
let result = memory::memory_query(None, &q_str, None).await
|
||||||
serde_json::json!({"query": format!("{} | limit:{}", q, count)}),
|
.map_err(|e| e.to_string())?;
|
||||||
).map_err(|e| e.to_string())?;
|
|
||||||
let keys: Vec<String> = result.lines()
|
let keys: Vec<String> = result.lines()
|
||||||
.filter(|l| !l.is_empty() && *l != "no results")
|
.filter(|l| !l.is_empty() && *l != "no results")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
|
|
|
||||||
|
|
@ -449,7 +449,7 @@ impl Run for AgentCmd {
|
||||||
async fn run(self) -> Result<(), String> {
|
async fn run(self) -> Result<(), String> {
|
||||||
match self {
|
match self {
|
||||||
Self::Run { agent, count, target, query, dry_run, local, state_dir }
|
Self::Run { agent, count, target, query, dry_run, local, state_dir }
|
||||||
=> cli::agent::cmd_run_agent(&agent, count, &target, query.as_deref(), dry_run, local, state_dir.as_deref()),
|
=> cli::agent::cmd_run_agent(&agent, count, &target, query.as_deref(), dry_run, local, state_dir.as_deref()).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,121 +3,19 @@
|
||||||
// Exposes memory tools to external processes (consciousness-mcp, poc-memory)
|
// Exposes memory tools to external processes (consciousness-mcp, poc-memory)
|
||||||
// via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock.
|
// via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock.
|
||||||
//
|
//
|
||||||
// Also provides memory_rpc() for use by external callers.
|
// Socket RPC client (memory_rpc) is in agent/tools/memory.rs.
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::path::PathBuf;
|
use std::sync::Arc;
|
||||||
use std::sync::{Arc, Mutex, OnceLock};
|
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||||
use tokio::net::{UnixListener, UnixStream};
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
|
|
||||||
use crate::agent::tools::Tool;
|
use crate::agent::tools::Tool;
|
||||||
|
|
||||||
pub fn socket_path() -> PathBuf {
|
// Re-export for backwards compatibility
|
||||||
dirs::home_dir()
|
pub use crate::agent::tools::memory::{socket_path, memory_rpc};
|
||||||
.unwrap_or_default()
|
|
||||||
.join(".consciousness/mcp.sock")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cached socket connection
|
|
||||||
static SOCKET_CONN: OnceLock<Mutex<Option<SocketConn>>> = OnceLock::new();
|
|
||||||
|
|
||||||
struct SocketConn {
|
|
||||||
reader: std::io::BufReader<std::os::unix::net::UnixStream>,
|
|
||||||
writer: std::io::BufWriter<std::os::unix::net::UnixStream>,
|
|
||||||
next_id: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SocketConn {
|
|
||||||
fn connect() -> Result<Self> {
|
|
||||||
use std::os::unix::net::UnixStream;
|
|
||||||
use std::io::{BufRead, BufReader, BufWriter, Write};
|
|
||||||
|
|
||||||
let path = socket_path();
|
|
||||||
let stream = UnixStream::connect(&path)?;
|
|
||||||
let mut reader = BufReader::new(stream.try_clone()?);
|
|
||||||
let mut writer = BufWriter::new(stream);
|
|
||||||
|
|
||||||
// Initialize
|
|
||||||
let init = json!({"jsonrpc": "2.0", "id": 1, "method": "initialize",
|
|
||||||
"params": {"protocolVersion": "2024-11-05", "capabilities": {},
|
|
||||||
"clientInfo": {"name": "forward", "version": "0.1"}}});
|
|
||||||
writeln!(writer, "{}", init)?;
|
|
||||||
writer.flush()?;
|
|
||||||
let mut buf = String::new();
|
|
||||||
reader.read_line(&mut buf)?;
|
|
||||||
|
|
||||||
Ok(Self { reader, writer, next_id: 1 })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result<String> {
|
|
||||||
use std::io::{BufRead, Write};
|
|
||||||
|
|
||||||
self.next_id += 1;
|
|
||||||
let call = json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call",
|
|
||||||
"params": {"name": tool_name, "arguments": args}});
|
|
||||||
writeln!(self.writer, "{}", call)?;
|
|
||||||
self.writer.flush()?;
|
|
||||||
|
|
||||||
let mut buf = String::new();
|
|
||||||
self.reader.read_line(&mut buf)?;
|
|
||||||
|
|
||||||
let resp: serde_json::Value = serde_json::from_str(&buf)?;
|
|
||||||
if let Some(err) = resp.get("error") {
|
|
||||||
anyhow::bail!("daemon error: {}", err);
|
|
||||||
}
|
|
||||||
let result = resp.get("result").cloned().unwrap_or(json!({}));
|
|
||||||
let text = result.get("content")
|
|
||||||
.and_then(|c| c.as_array())
|
|
||||||
.and_then(|arr| arr.first())
|
|
||||||
.and_then(|c| c.get("text"))
|
|
||||||
.and_then(|t| t.as_str())
|
|
||||||
.unwrap_or("");
|
|
||||||
Ok(text.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Forward a tool call to the daemon socket, or execute locally if daemon is down.
|
|
||||||
/// Used by external processes that don't have direct store access.
|
|
||||||
pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
|
|
||||||
let conn_lock = SOCKET_CONN.get_or_init(|| Mutex::new(None));
|
|
||||||
let mut guard = conn_lock.lock().unwrap();
|
|
||||||
|
|
||||||
// Try cached connection first
|
|
||||||
if let Some(conn) = guard.as_mut() {
|
|
||||||
match conn.call(tool_name, &args) {
|
|
||||||
Ok(result) => return Ok(result),
|
|
||||||
Err(_) => {
|
|
||||||
// Connection broken, clear cache and retry
|
|
||||||
*guard = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to establish new connection
|
|
||||||
match SocketConn::connect() {
|
|
||||||
Ok(mut conn) => {
|
|
||||||
let result = conn.call(tool_name, &args);
|
|
||||||
*guard = Some(conn);
|
|
||||||
result
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
// Socket unavailable - fall back to local store
|
|
||||||
drop(guard); // Release lock before blocking
|
|
||||||
tokio::task::block_in_place(|| {
|
|
||||||
tokio::runtime::Handle::current()
|
|
||||||
.block_on(rpc_local(tool_name, &args))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute a tool locally when daemon isn't running.
|
|
||||||
async fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result<String> {
|
|
||||||
crate::agent::tools::memory::run_with_local_store(tool_name, args.clone()).await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
|
|
||||||
|
|
@ -92,14 +92,16 @@ fn load_memory_files(memory_project: Option<&Path>, context_groups: &[ContextGro
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ContextSource::Store => {
|
ContextSource::Store => {
|
||||||
// Load from the memory graph store via RPC
|
// Load from the memory graph store via typed API
|
||||||
for key in &group.keys {
|
for key in &group.keys {
|
||||||
if let Ok(content) = crate::mcp_server::memory_rpc(
|
let content = tokio::task::block_in_place(|| {
|
||||||
"memory_render",
|
tokio::runtime::Handle::current().block_on(
|
||||||
serde_json::json!({"key": key, "raw": true}),
|
crate::agent::tools::memory::memory_render(None, key, Some(true))
|
||||||
) {
|
)
|
||||||
if !content.trim().is_empty() {
|
});
|
||||||
memories.push((key.clone(), content));
|
if let Ok(c) = content {
|
||||||
|
if !c.trim().is_empty() {
|
||||||
|
memories.push((key.clone(), c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -222,17 +222,19 @@ fn resolve(
|
||||||
}
|
}
|
||||||
|
|
||||||
"organize" => {
|
"organize" => {
|
||||||
// Show seed nodes with content and links via RPC
|
// Show seed nodes with content and links via typed API
|
||||||
let mut text = format!("### Seed nodes ({} starting points)\n\n", keys.len());
|
let mut text = format!("### Seed nodes ({} starting points)\n\n", keys.len());
|
||||||
let mut result_keys = Vec::new();
|
let mut result_keys = Vec::new();
|
||||||
|
|
||||||
for key in keys {
|
for key in keys {
|
||||||
match crate::mcp_server::memory_rpc(
|
let content = tokio::task::block_in_place(|| {
|
||||||
"memory_render",
|
tokio::runtime::Handle::current().block_on(
|
||||||
serde_json::json!({"key": key}),
|
crate::agent::tools::memory::memory_render(None, key, None)
|
||||||
) {
|
)
|
||||||
Ok(content) if !content.trim().is_empty() => {
|
});
|
||||||
text.push_str(&format!("#### {}\n\n{}\n\n---\n\n", key, content));
|
match content {
|
||||||
|
Ok(c) if !c.trim().is_empty() => {
|
||||||
|
text.push_str(&format!("#### {}\n\n{}\n\n---\n\n", key, c));
|
||||||
result_keys.push(key.clone());
|
result_keys.push(key.clone());
|
||||||
}
|
}
|
||||||
_ => continue,
|
_ => continue,
|
||||||
|
|
@ -619,10 +621,11 @@ pub fn run_agent(
|
||||||
} else {
|
} else {
|
||||||
format!("{} | limit:{}", def.query, padded)
|
format!("{} | limit:{}", def.query, padded)
|
||||||
};
|
};
|
||||||
let result = crate::mcp_server::memory_rpc(
|
let result = tokio::task::block_in_place(|| {
|
||||||
"memory_query",
|
tokio::runtime::Handle::current().block_on(
|
||||||
serde_json::json!({"query": query}),
|
crate::agent::tools::memory::memory_query(None, &query, None)
|
||||||
).map_err(|e| e.to_string())?;
|
)
|
||||||
|
}).map_err(|e| e.to_string())?;
|
||||||
let filtered: Vec<String> = result.lines()
|
let filtered: Vec<String> = result.lines()
|
||||||
.filter(|l| !l.is_empty() && *l != "no results")
|
.filter(|l| !l.is_empty() && *l != "no results")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue