// mcp_server.rs — MCP server over Unix domain socket // // Exposes memory tools to external processes (consciousness-mcp, poc-memory) // via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock. // // Also provides memory_rpc() for use by external callers. use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::path::PathBuf; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{UnixListener, UnixStream}; use crate::agent::tools::Tool; pub fn socket_path() -> PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/mcp.sock") } /// Forward a tool call to the daemon socket, or execute locally if daemon is down. /// Used by external processes that don't have direct store access. pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result { use std::os::unix::net::UnixStream; use std::io::{BufRead, BufReader, BufWriter, Write}; let path = socket_path(); let stream = match UnixStream::connect(&path) { Ok(s) => s, Err(_) => return rpc_local(tool_name, &args), }; let mut reader = BufReader::new(stream.try_clone()?); let mut writer = BufWriter::new(stream); // Initialize let init = json!({"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "forward", "version": "0.1"}}}); writeln!(writer, "{}", init)?; writer.flush()?; let mut buf = String::new(); reader.read_line(&mut buf)?; // Call tool let call = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": tool_name, "arguments": args}}); writeln!(writer, "{}", call)?; writer.flush()?; buf.clear(); reader.read_line(&mut buf)?; let resp: serde_json::Value = serde_json::from_str(&buf)?; if let Some(err) = resp.get("error") { anyhow::bail!("daemon error: {}", err); } let result = resp.get("result").cloned().unwrap_or(json!({})); let text = result.get("content") .and_then(|c| c.as_array()) .and_then(|arr| arr.first()) .and_then(|c| c.get("text")) .and_then(|t| t.as_str()) .unwrap_or(""); Ok(text.to_string()) } /// Execute a tool locally when daemon isn't running. fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result { crate::agent::tools::memory::run_with_local_store(tool_name, args.clone()) } #[derive(Debug, Deserialize)] #[allow(dead_code)] struct JsonRpcRequest { jsonrpc: String, id: Option, method: String, params: Option, } #[derive(Debug, Serialize)] struct JsonRpcResponse { jsonrpc: &'static str, id: serde_json::Value, #[serde(skip_serializing_if = "Option::is_none")] result: Option, #[serde(skip_serializing_if = "Option::is_none")] error: Option, } #[derive(Debug, Serialize)] struct JsonRpcError { code: i64, message: String, } impl JsonRpcResponse { fn success(id: serde_json::Value, result: serde_json::Value) -> Self { Self { jsonrpc: "2.0", id, result: Some(result), error: None } } fn error(id: serde_json::Value, code: i64, message: impl Into) -> Self { Self { jsonrpc: "2.0", id, result: None, error: Some(JsonRpcError { code, message: message.into() }), } } } /// Start the MCP server. Call once at daemon startup. pub async fn start(tools: Vec) -> Result<()> { let path = socket_path(); // Clean up stale socket if path.exists() { std::fs::remove_file(&path).ok(); } // Ensure parent directory exists if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; } let listener = UnixListener::bind(&path) .with_context(|| format!("binding MCP socket at {:?}", path))?; dbglog!("[mcp-server] listening on {:?}", path); let tools = Arc::new(tools); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _addr)) => { let tools = tools.clone(); tokio::spawn(async move { if let Err(e) = handle_connection(stream, &tools).await { dbglog!("[mcp-server] connection error: {:#}", e); } }); } Err(e) => { dbglog!("[mcp-server] accept error: {}", e); } } } }); Ok(()) } async fn handle_connection(stream: UnixStream, tools: &[Tool]) -> Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); let mut writer = BufWriter::new(writer); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { break; // EOF } let trimmed = line.trim(); if trimmed.is_empty() { continue; } let response = match serde_json::from_str::(trimmed) { Ok(req) => handle_request(req, tools).await, Err(e) => JsonRpcResponse::error( serde_json::Value::Null, -32700, format!("Parse error: {}", e), ), }; let mut out = serde_json::to_string(&response)?; out.push('\n'); writer.write_all(out.as_bytes()).await?; writer.flush().await?; } Ok(()) } async fn handle_request(req: JsonRpcRequest, tools: &[Tool]) -> JsonRpcResponse { let id = req.id.unwrap_or(serde_json::Value::Null); match req.method.as_str() { "initialize" => { JsonRpcResponse::success(id, json!({ "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "consciousness", "version": env!("CARGO_PKG_VERSION") } })) } "notifications/initialized" => { // Notification, no response needed but we return success anyway JsonRpcResponse::success(id, json!({})) } "tools/list" => { let tool_list: Vec = tools.iter().map(|t| { json!({ "name": t.name, "description": t.description, "inputSchema": serde_json::from_str::(t.parameters_json) .unwrap_or(json!({"type": "object"})) }) }).collect(); JsonRpcResponse::success(id, json!({ "tools": tool_list })) } "tools/call" => { let params = req.params.unwrap_or(json!({})); let name = params.get("name").and_then(|v| v.as_str()).unwrap_or(""); let args = params.get("arguments").cloned().unwrap_or(json!({})); match tools.iter().find(|t| t.name == name) { Some(tool) => { match (tool.handler)(None, args).await { Ok(result) => JsonRpcResponse::success(id, json!({ "content": [{ "type": "text", "text": result }] })), Err(e) => JsonRpcResponse::error(id, -32000, format!("{:#}", e)), } } None => JsonRpcResponse::error(id, -32601, format!("Unknown tool: {}", name)), } } _ => JsonRpcResponse::error(id, -32601, format!("Method not found: {}", req.method)), } } /// Remove the socket file on shutdown. pub fn cleanup() { let path = socket_path(); if path.exists() { std::fs::remove_file(&path).ok(); } }