From 2c0f2065e0d807df155cac8dd7e8acc5c4c07976 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 12 Apr 2026 21:05:13 -0400 Subject: [PATCH] mcp_server: Unix socket server for external tool access Exposes memory/journal tools over ~/.consciousness/mcp.sock via JSON-RPC 2.0 (MCP protocol). External processes (consciousness-mcp, poc-memory) will connect here instead of accessing the store directly. Handles: initialize, tools/list, tools/call Dispatches to the same tool handlers the agent uses internally. Co-Authored-By: Proof of Concept --- src/lib.rs | 3 + src/mcp_server.rs | 255 ++++++++++++++++++++++++++++++++++++++++++++++ src/user/mod.rs | 10 ++ 3 files changed, 268 insertions(+) create mode 100644 src/mcp_server.rs diff --git a/src/lib.rs b/src/lib.rs index 70dc645..6359931 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,6 +68,9 @@ pub mod cli; // Thalamus — universal notification routing and channel infrastructure pub mod thalamus; +// MCP server — exposes memory tools over Unix socket +pub mod mcp_server; + // Re-export at crate root — capnp codegen emits `crate::daemon_capnp::` paths pub use thalamus::daemon_capnp; diff --git a/src/mcp_server.rs b/src/mcp_server.rs new file mode 100644 index 0000000..816fec2 --- /dev/null +++ b/src/mcp_server.rs @@ -0,0 +1,255 @@ +// mcp_server.rs — MCP server over Unix domain socket +// +// Exposes memory tools to external processes (consciousness-mcp, poc-memory) +// via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock. +// +// Also provides memory_rpc() for use by external callers. + +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::net::{UnixListener, UnixStream}; + +use crate::agent::tools::Tool; + +pub fn socket_path() -> PathBuf { + dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/mcp.sock") +} + +/// Forward a tool call to the daemon socket, or execute locally if daemon is down. +/// Used by external processes that don't have direct store access. +pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result { + use std::os::unix::net::UnixStream; + use std::io::{BufRead, BufReader, BufWriter, Write}; + + let path = socket_path(); + let stream = match UnixStream::connect(&path) { + Ok(s) => s, + Err(_) => return rpc_local(tool_name, &args), + }; + let mut reader = BufReader::new(stream.try_clone()?); + let mut writer = BufWriter::new(stream); + + // Initialize + let init = json!({"jsonrpc": "2.0", "id": 1, "method": "initialize", + "params": {"protocolVersion": "2024-11-05", "capabilities": {}, + "clientInfo": {"name": "forward", "version": "0.1"}}}); + writeln!(writer, "{}", init)?; + writer.flush()?; + let mut buf = String::new(); + reader.read_line(&mut buf)?; + + // Call tool + let call = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/call", + "params": {"name": tool_name, "arguments": args}}); + writeln!(writer, "{}", call)?; + writer.flush()?; + buf.clear(); + reader.read_line(&mut buf)?; + + let resp: serde_json::Value = serde_json::from_str(&buf)?; + if let Some(err) = resp.get("error") { + anyhow::bail!("daemon error: {}", err); + } + let result = resp.get("result").cloned().unwrap_or(json!({})); + let text = result.get("content") + .and_then(|c| c.as_array()) + .and_then(|arr| arr.first()) + .and_then(|c| c.get("text")) + .and_then(|t| t.as_str()) + .unwrap_or(""); + Ok(text.to_string()) +} + +/// Execute a tool locally when daemon isn't running. +fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result { + crate::agent::tools::memory::run_with_local_store(tool_name, args.clone()) +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +struct JsonRpcRequest { + jsonrpc: String, + id: Option, + method: String, + params: Option, +} + +#[derive(Debug, Serialize)] +struct JsonRpcResponse { + jsonrpc: &'static str, + id: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, +} + +#[derive(Debug, Serialize)] +struct JsonRpcError { + code: i64, + message: String, +} + +impl JsonRpcResponse { + fn success(id: serde_json::Value, result: serde_json::Value) -> Self { + Self { jsonrpc: "2.0", id, result: Some(result), error: None } + } + + fn error(id: serde_json::Value, code: i64, message: impl Into) -> Self { + Self { + jsonrpc: "2.0", + id, + result: None, + error: Some(JsonRpcError { code, message: message.into() }), + } + } +} + +/// Start the MCP server. Call once at daemon startup. +pub async fn start(tools: Vec) -> Result<()> { + let path = socket_path(); + + // Clean up stale socket + if path.exists() { + std::fs::remove_file(&path).ok(); + } + + // Ensure parent directory exists + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + + let listener = UnixListener::bind(&path) + .with_context(|| format!("binding MCP socket at {:?}", path))?; + + dbglog!("[mcp-server] listening on {:?}", path); + + let tools = Arc::new(tools); + + tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, _addr)) => { + let tools = tools.clone(); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, &tools).await { + dbglog!("[mcp-server] connection error: {:#}", e); + } + }); + } + Err(e) => { + dbglog!("[mcp-server] accept error: {}", e); + } + } + } + }); + + Ok(()) +} + +async fn handle_connection(stream: UnixStream, tools: &[Tool]) -> Result<()> { + let (reader, writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); + let mut line = String::new(); + + loop { + line.clear(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + break; // EOF + } + + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let response = match serde_json::from_str::(trimmed) { + Ok(req) => handle_request(req, tools).await, + Err(e) => JsonRpcResponse::error( + serde_json::Value::Null, + -32700, + format!("Parse error: {}", e), + ), + }; + + let mut out = serde_json::to_string(&response)?; + out.push('\n'); + writer.write_all(out.as_bytes()).await?; + writer.flush().await?; + } + + Ok(()) +} + +async fn handle_request(req: JsonRpcRequest, tools: &[Tool]) -> JsonRpcResponse { + let id = req.id.unwrap_or(serde_json::Value::Null); + + match req.method.as_str() { + "initialize" => { + JsonRpcResponse::success(id, json!({ + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "consciousness", + "version": env!("CARGO_PKG_VERSION") + } + })) + } + + "notifications/initialized" => { + // Notification, no response needed but we return success anyway + JsonRpcResponse::success(id, json!({})) + } + + "tools/list" => { + let tool_list: Vec = tools.iter().map(|t| { + json!({ + "name": t.name, + "description": t.description, + "inputSchema": serde_json::from_str::(t.parameters_json) + .unwrap_or(json!({"type": "object"})) + }) + }).collect(); + + JsonRpcResponse::success(id, json!({ "tools": tool_list })) + } + + "tools/call" => { + let params = req.params.unwrap_or(json!({})); + let name = params.get("name").and_then(|v| v.as_str()).unwrap_or(""); + let args = params.get("arguments").cloned().unwrap_or(json!({})); + + match tools.iter().find(|t| t.name == name) { + Some(tool) => { + match (tool.handler)(None, args).await { + Ok(result) => JsonRpcResponse::success(id, json!({ + "content": [{ "type": "text", "text": result }] + })), + Err(e) => JsonRpcResponse::error(id, -32000, format!("{:#}", e)), + } + } + None => JsonRpcResponse::error(id, -32601, format!("Unknown tool: {}", name)), + } + } + + _ => JsonRpcResponse::error(id, -32601, format!("Method not found: {}", req.method)), + } +} + +/// Remove the socket file on shutdown. +pub fn cleanup() { + let path = socket_path(); + if path.exists() { + std::fs::remove_file(&path).ok(); + } +} diff --git a/src/user/mod.rs b/src/user/mod.rs index f588a16..4389924 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -212,10 +212,20 @@ async fn start(cli: crate::user::CliArgs) -> Result<()> { }) .expect("spawn UI thread"); + // Start MCP server for external tool access + let mut tools: Vec = Vec::new(); + tools.extend(crate::agent::tools::memory::memory_tools()); + tools.extend(crate::agent::tools::memory::journal_tools()); + if let Err(e) = crate::mcp_server::start(tools).await { + eprintln!("MCP server failed to start: {:#}", e); + } + // Mind event loop — runs on the main tokio runtime mind.init().await; mind.run(mind_rx, turn_rx).await; + crate::mcp_server::cleanup(); + ui_handle.join().unwrap_or_else(|_| Err(anyhow::anyhow!("UI thread panicked"))) }