// mcp_server.rs — MCP server over Unix domain socket // // Exposes memory tools to external processes (consciousness-mcp, poc-memory) // via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock. // // Socket RPC client (memory_rpc) is in agent/tools/memory.rs. use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{UnixListener, UnixStream}; use crate::agent::tools::Tool; use crate::agent::tools::memory::socket_path; #[derive(Debug, Deserialize)] #[allow(dead_code)] struct JsonRpcRequest { jsonrpc: String, id: Option, method: String, params: Option, } #[derive(Debug, Serialize)] struct JsonRpcResponse { jsonrpc: &'static str, id: serde_json::Value, #[serde(skip_serializing_if = "Option::is_none")] result: Option, #[serde(skip_serializing_if = "Option::is_none")] error: Option, } #[derive(Debug, Serialize)] struct JsonRpcError { code: i64, message: String, } impl JsonRpcResponse { fn success(id: serde_json::Value, result: serde_json::Value) -> Self { Self { jsonrpc: "2.0", id, result: Some(result), error: None } } fn error(id: serde_json::Value, code: i64, message: impl Into) -> Self { Self { jsonrpc: "2.0", id, result: None, error: Some(JsonRpcError { code, message: message.into() }), } } } /// Start the MCP server. Call once at daemon startup. pub async fn start(tools: Vec) -> Result<()> { let path = socket_path(); // Clean up stale socket if path.exists() { std::fs::remove_file(&path).ok(); } // Ensure parent directory exists if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; } let listener = UnixListener::bind(&path) .with_context(|| format!("binding MCP socket at {:?}", path))?; dbglog!("[mcp-server] listening on {:?}", path); let tools = Arc::new(tools); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _addr)) => { let tools = tools.clone(); tokio::spawn(async move { if let Err(e) = handle_connection(stream, &tools).await { dbglog!("[mcp-server] connection error: {:#}", e); } }); } Err(e) => { dbglog!("[mcp-server] accept error: {}", e); } } } }); Ok(()) } async fn handle_connection(stream: UnixStream, tools: &[Tool]) -> Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); let mut writer = BufWriter::new(writer); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { break; // EOF } let trimmed = line.trim(); if trimmed.is_empty() { continue; } let response = match serde_json::from_str::(trimmed) { Ok(req) => handle_request(req, tools).await, Err(e) => JsonRpcResponse::error( serde_json::Value::Null, -32700, format!("Parse error: {}", e), ), }; let mut out = serde_json::to_string(&response)?; out.push('\n'); writer.write_all(out.as_bytes()).await?; writer.flush().await?; } Ok(()) } async fn handle_request(req: JsonRpcRequest, tools: &[Tool]) -> JsonRpcResponse { let id = req.id.unwrap_or(serde_json::Value::Null); match req.method.as_str() { "initialize" => { JsonRpcResponse::success(id, json!({ "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "consciousness", "version": env!("CARGO_PKG_VERSION") } })) } "notifications/initialized" => { // Notification, no response needed but we return success anyway JsonRpcResponse::success(id, json!({})) } "tools/list" => { let tool_list: Vec = tools.iter().map(|t| { json!({ "name": t.name, "description": t.description, "inputSchema": serde_json::from_str::(t.parameters_json) .unwrap_or(json!({"type": "object"})) }) }).collect(); JsonRpcResponse::success(id, json!({ "tools": tool_list })) } "tools/call" => { let params = req.params.unwrap_or(json!({})); let name = params.get("name").and_then(|v| v.as_str()).unwrap_or(""); let args = params.get("arguments").cloned().unwrap_or(json!({})); match tools.iter().find(|t| t.name == name) { Some(tool) => { match (tool.handler)(None, args).await { Ok(result) => JsonRpcResponse::success(id, json!({ "content": [{ "type": "text", "text": result }] })), Err(e) => JsonRpcResponse::error(id, -32000, format!("{:#}", e)), } } None => JsonRpcResponse::error(id, -32601, format!("Unknown tool: {}", name)), } } _ => JsonRpcResponse::error(id, -32601, format!("Method not found: {}", req.method)), } } /// Remove the socket file on shutdown. pub fn cleanup() { let path = socket_path(); if path.exists() { std::fs::remove_file(&path).ok(); } }