mcp_server: Unix socket server for external tool access

Exposes memory/journal tools over ~/.consciousness/mcp.sock via
JSON-RPC 2.0 (MCP protocol). External processes (consciousness-mcp,
poc-memory) will connect here instead of accessing the store directly.

Handles: initialize, tools/list, tools/call
Dispatches to the same tool handlers the agent uses internally.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-12 21:05:13 -04:00
parent 72f4f1b617
commit 2c0f2065e0
3 changed files with 268 additions and 0 deletions

View file

@ -68,6 +68,9 @@ pub mod cli;
// Thalamus — universal notification routing and channel infrastructure // Thalamus — universal notification routing and channel infrastructure
pub mod thalamus; pub mod thalamus;
// MCP server — exposes memory tools over Unix socket
pub mod mcp_server;
// Re-export at crate root — capnp codegen emits `crate::daemon_capnp::` paths // Re-export at crate root — capnp codegen emits `crate::daemon_capnp::` paths
pub use thalamus::daemon_capnp; pub use thalamus::daemon_capnp;

255
src/mcp_server.rs Normal file
View file

@ -0,0 +1,255 @@
// mcp_server.rs — MCP server over Unix domain socket
//
// Exposes memory tools to external processes (consciousness-mcp, poc-memory)
// via JSON-RPC 2.0 over newline-delimited JSON on ~/.consciousness/mcp.sock.
//
// Also provides memory_rpc() for use by external callers.
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{UnixListener, UnixStream};
use crate::agent::tools::Tool;
pub fn socket_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/mcp.sock")
}
/// Forward a tool call to the daemon socket, or execute locally if daemon is down.
/// Used by external processes that don't have direct store access.
pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
use std::os::unix::net::UnixStream;
use std::io::{BufRead, BufReader, BufWriter, Write};
let path = socket_path();
let stream = match UnixStream::connect(&path) {
Ok(s) => s,
Err(_) => return rpc_local(tool_name, &args),
};
let mut reader = BufReader::new(stream.try_clone()?);
let mut writer = BufWriter::new(stream);
// Initialize
let init = json!({"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {"protocolVersion": "2024-11-05", "capabilities": {},
"clientInfo": {"name": "forward", "version": "0.1"}}});
writeln!(writer, "{}", init)?;
writer.flush()?;
let mut buf = String::new();
reader.read_line(&mut buf)?;
// Call tool
let call = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/call",
"params": {"name": tool_name, "arguments": args}});
writeln!(writer, "{}", call)?;
writer.flush()?;
buf.clear();
reader.read_line(&mut buf)?;
let resp: serde_json::Value = serde_json::from_str(&buf)?;
if let Some(err) = resp.get("error") {
anyhow::bail!("daemon error: {}", err);
}
let result = resp.get("result").cloned().unwrap_or(json!({}));
let text = result.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
Ok(text.to_string())
}
/// Execute a tool locally when daemon isn't running.
fn rpc_local(tool_name: &str, args: &serde_json::Value) -> Result<String> {
crate::agent::tools::memory::run_with_local_store(tool_name, args.clone())
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct JsonRpcRequest {
jsonrpc: String,
id: Option<serde_json::Value>,
method: String,
params: Option<serde_json::Value>,
}
#[derive(Debug, Serialize)]
struct JsonRpcResponse {
jsonrpc: &'static str,
id: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<JsonRpcError>,
}
#[derive(Debug, Serialize)]
struct JsonRpcError {
code: i64,
message: String,
}
impl JsonRpcResponse {
fn success(id: serde_json::Value, result: serde_json::Value) -> Self {
Self { jsonrpc: "2.0", id, result: Some(result), error: None }
}
fn error(id: serde_json::Value, code: i64, message: impl Into<String>) -> Self {
Self {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcError { code, message: message.into() }),
}
}
}
/// Start the MCP server. Call once at daemon startup.
pub async fn start(tools: Vec<Tool>) -> Result<()> {
let path = socket_path();
// Clean up stale socket
if path.exists() {
std::fs::remove_file(&path).ok();
}
// Ensure parent directory exists
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&path)
.with_context(|| format!("binding MCP socket at {:?}", path))?;
dbglog!("[mcp-server] listening on {:?}", path);
let tools = Arc::new(tools);
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let tools = tools.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, &tools).await {
dbglog!("[mcp-server] connection error: {:#}", e);
}
});
}
Err(e) => {
dbglog!("[mcp-server] accept error: {}", e);
}
}
}
});
Ok(())
}
async fn handle_connection(stream: UnixStream, tools: &[Tool]) -> Result<()> {
let (reader, writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let mut line = String::new();
loop {
line.clear();
let n = reader.read_line(&mut line).await?;
if n == 0 {
break; // EOF
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let response = match serde_json::from_str::<JsonRpcRequest>(trimmed) {
Ok(req) => handle_request(req, tools).await,
Err(e) => JsonRpcResponse::error(
serde_json::Value::Null,
-32700,
format!("Parse error: {}", e),
),
};
let mut out = serde_json::to_string(&response)?;
out.push('\n');
writer.write_all(out.as_bytes()).await?;
writer.flush().await?;
}
Ok(())
}
async fn handle_request(req: JsonRpcRequest, tools: &[Tool]) -> JsonRpcResponse {
let id = req.id.unwrap_or(serde_json::Value::Null);
match req.method.as_str() {
"initialize" => {
JsonRpcResponse::success(id, json!({
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "consciousness",
"version": env!("CARGO_PKG_VERSION")
}
}))
}
"notifications/initialized" => {
// Notification, no response needed but we return success anyway
JsonRpcResponse::success(id, json!({}))
}
"tools/list" => {
let tool_list: Vec<serde_json::Value> = tools.iter().map(|t| {
json!({
"name": t.name,
"description": t.description,
"inputSchema": serde_json::from_str::<serde_json::Value>(t.parameters_json)
.unwrap_or(json!({"type": "object"}))
})
}).collect();
JsonRpcResponse::success(id, json!({ "tools": tool_list }))
}
"tools/call" => {
let params = req.params.unwrap_or(json!({}));
let name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
let args = params.get("arguments").cloned().unwrap_or(json!({}));
match tools.iter().find(|t| t.name == name) {
Some(tool) => {
match (tool.handler)(None, args).await {
Ok(result) => JsonRpcResponse::success(id, json!({
"content": [{ "type": "text", "text": result }]
})),
Err(e) => JsonRpcResponse::error(id, -32000, format!("{:#}", e)),
}
}
None => JsonRpcResponse::error(id, -32601, format!("Unknown tool: {}", name)),
}
}
_ => JsonRpcResponse::error(id, -32601, format!("Method not found: {}", req.method)),
}
}
/// Remove the socket file on shutdown.
pub fn cleanup() {
let path = socket_path();
if path.exists() {
std::fs::remove_file(&path).ok();
}
}

View file

@ -212,10 +212,20 @@ async fn start(cli: crate::user::CliArgs) -> Result<()> {
}) })
.expect("spawn UI thread"); .expect("spawn UI thread");
// Start MCP server for external tool access
let mut tools: Vec<crate::agent::tools::Tool> = Vec::new();
tools.extend(crate::agent::tools::memory::memory_tools());
tools.extend(crate::agent::tools::memory::journal_tools());
if let Err(e) = crate::mcp_server::start(tools).await {
eprintln!("MCP server failed to start: {:#}", e);
}
// Mind event loop — runs on the main tokio runtime // Mind event loop — runs on the main tokio runtime
mind.init().await; mind.init().await;
mind.run(mind_rx, turn_rx).await; mind.run(mind_rx, turn_rx).await;
crate::mcp_server::cleanup();
ui_handle.join().unwrap_or_else(|_| Err(anyhow::anyhow!("UI thread panicked"))) ui_handle.join().unwrap_or_else(|_| Err(anyhow::anyhow!("UI thread panicked")))
} }