From e104a16f614f0e38c5f249bfc96b738c0758f420 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 3 Apr 2026 20:30:07 -0400 Subject: [PATCH] consciousness-mcp: full MCP server in Rust Replaces the Python MCP bridge. Single binary speaks JSON-RPC over stdio, exposes 14 tools: - 10 memory tools (delegate to poc-memory CLI) - channel_list, channel_recv, channel_send, channel_notifications No external dependencies beyond serde_json. Channel tools use capnp RPC to talk to daemon sockets directly. Co-Developed-By: Kent Overstreet --- Cargo.toml | 4 + src/claude/mcp-server.rs | 504 +++++++++++++++++++++++++++++++++++++++ src/thalamus/channels.rs | 10 + 3 files changed, 518 insertions(+) create mode 100644 src/claude/mcp-server.rs diff --git a/Cargo.toml b/Cargo.toml index 833e41a..6fe7f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,3 +108,7 @@ path = "src/claude/memory-search.rs" [[bin]] name = "mcp-schema" path = "src/claude/mcp-schema.rs" + +[[bin]] +name = "consciousness-mcp" +path = "src/claude/mcp-server.rs" diff --git a/src/claude/mcp-server.rs b/src/claude/mcp-server.rs new file mode 100644 index 0000000..f8f245b --- /dev/null +++ b/src/claude/mcp-server.rs @@ -0,0 +1,504 @@ +// mcp-server — MCP server for Claude Code integration +// +// Speaks JSON-RPC over stdio. Exposes memory tools and channel +// operations. Replaces the Python MCP bridge entirely. +// +// Protocol: https://modelcontextprotocol.io/specification + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::io::{self, BufRead, Write}; +use std::process::Command; + +// ── JSON-RPC types ────────────────────────────────────────────── + +#[derive(Deserialize)] +struct Request { + jsonrpc: String, + method: String, + #[serde(default)] + params: Value, + id: Value, +} + +#[derive(Serialize)] +struct Response { + jsonrpc: String, + result: Value, + id: Value, +} + +#[derive(Serialize)] +struct ErrorResponse { + jsonrpc: String, + error: Value, + id: Value, +} + +fn respond(id: Value, result: Value) { + let resp = Response { jsonrpc: "2.0".into(), result, id }; + let json = serde_json::to_string(&resp).unwrap(); + let mut stdout = io::stdout().lock(); + let _ = writeln!(stdout, "{json}"); + let _ = stdout.flush(); +} + +fn respond_error(id: Value, code: i64, message: &str) { + let resp = ErrorResponse { + jsonrpc: "2.0".into(), + error: json!({ "code": code, "message": message }), + id, + }; + let json = serde_json::to_string(&resp).unwrap(); + let mut stdout = io::stdout().lock(); + let _ = writeln!(stdout, "{json}"); + let _ = stdout.flush(); +} + +fn notify(method: &str, params: Value) { + let json = serde_json::to_string(&json!({ + "jsonrpc": "2.0", + "method": method, + "params": params, + })).unwrap(); + let mut stdout = io::stdout().lock(); + let _ = writeln!(stdout, "{json}"); + let _ = stdout.flush(); +} + +// ── Tool definitions ──────────────────────────────────────────── + +fn tool_definitions() -> Vec { + // Memory tools — dispatch to poc-memory CLI + let memory_tools = vec![ + tool("memory_render", "Read a memory node's content and links.", + json!({"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}), + vec!["render"]), + tool("memory_write", "Create or update a memory node.", + json!({"type":"object","properties":{ + "key":{"type":"string","description":"Node key"}, + "content":{"type":"string","description":"Full content (markdown)"} + },"required":["key","content"]}), + vec!["write"]), + tool("memory_search", "Search the memory graph.", + json!({"type":"object","properties":{"terms":{"type":"array","items":{"type":"string"},"description":"Search terms"}},"required":["terms"]}), + vec!["graph", "spread"]), + tool("memory_links", "Show a node's links.", + json!({"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}), + vec!["graph", "link"]), + tool("memory_link_set", "Set link weights between nodes.", + json!({"type":"object","properties":{ + "source":{"type":"string"}, + "targets":{"type":"string","description":"target:weight pairs"} + },"required":["source","targets"]}), + vec!["graph", "link-set"]), + tool("memory_link_add", "Add a link between nodes.", + json!({"type":"object","properties":{ + "source":{"type":"string"}, + "target":{"type":"string"}, + "weight":{"type":"number"} + },"required":["source","target"]}), + vec!["graph", "link-add"]), + tool("memory_used", "Mark a memory node as useful.", + json!({"type":"object","properties":{"key":{"type":"string"}},"required":["key"]}), + vec!["used"]), + tool("memory_weight_set", "Set a node's weight directly.", + json!({"type":"object","properties":{ + "key":{"type":"string"}, + "weight":{"type":"number"} + },"required":["key","weight"]}), + vec!["weight-set"]), + tool("memory_rename", "Rename a memory node.", + json!({"type":"object","properties":{ + "old_key":{"type":"string"}, + "new_key":{"type":"string"} + },"required":["old_key","new_key"]}), + vec!["node", "rename"]), + tool("memory_query", "Query the memory graph.", + json!({"type":"object","properties":{"query":{"type":"string","description":"Query expression"}},"required":["query"]}), + vec!["query"]), + ]; + + // Channel tools + let channel_tools = vec![ + json!({ + "name": "channel_list", + "description": "List all available channels and their status (connected, unread count).", + "inputSchema": {"type": "object", "properties": {}} + }), + json!({ + "name": "channel_recv", + "description": "Read messages from a channel.", + "inputSchema": {"type": "object", "properties": { + "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs-ai, telegram.kent)"}, + "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, + "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} + }, "required": ["channel"]} + }), + json!({ + "name": "channel_send", + "description": "Send a message to a channel.", + "inputSchema": {"type": "object", "properties": { + "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs-ai, irc.pm.nick, telegram.kent)"}, + "message": {"type": "string", "description": "Message to send"} + }, "required": ["channel", "message"]} + }), + json!({ + "name": "channel_notifications", + "description": "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", + "inputSchema": {"type": "object", "properties": {}} + }), + ]; + + memory_tools.into_iter().chain(channel_tools).collect() +} + +fn tool(name: &str, desc: &str, schema: Value, _cli: Vec<&str>) -> Value { + json!({ + "name": name, + "description": desc, + "inputSchema": schema, + }) +} + +// ── Tool dispatch ─────────────────────────────────────────────── + +fn dispatch_tool(name: &str, args: &Value) -> Result { + match name { + // Memory tools — delegate to poc-memory CLI + n if n.starts_with("memory_") => dispatch_memory(n, args), + + // Channel tools + "channel_list" => dispatch_channel_list(), + "channel_recv" => dispatch_channel_recv(args), + "channel_send" => dispatch_channel_send(args), + "channel_notifications" => dispatch_channel_notifications(), + + _ => Err(format!("unknown tool: {name}")), + } +} + +fn dispatch_memory(name: &str, args: &Value) -> Result { + let cli_args = match name { + "memory_render" => vec!["render".into(), arg_str(args, "key")?], + "memory_write" => { + let key = arg_str(args, "key")?; + let content = arg_str(args, "content")?; + // Write content via stdin + return run_poc_memory_stdin(&["write", &key], &content); + } + "memory_search" => { + let mut cmd = vec!["graph".into(), "spread".into()]; + if let Some(terms) = args.get("terms").and_then(|t| t.as_array()) { + for t in terms { + if let Some(s) = t.as_str() { + cmd.push(s.to_string()); + } + } + } + cmd + } + "memory_links" => vec!["graph".into(), "link".into(), arg_str(args, "key")?], + "memory_link_set" => vec!["graph".into(), "link-set".into(), arg_str(args, "source")?, arg_str(args, "targets")?], + "memory_link_add" => { + let mut cmd = vec!["graph".into(), "link-add".into(), arg_str(args, "source")?, arg_str(args, "target")?]; + if let Some(w) = args.get("weight").and_then(|v| v.as_f64()) { + cmd.push(format!("{w:.2}")); + } + cmd + } + "memory_used" => vec!["used".into(), arg_str(args, "key")?], + "memory_weight_set" => vec!["weight-set".into(), arg_str(args, "key")?, format!("{:.2}", args.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0))], + "memory_rename" => vec!["node".into(), "rename".into(), arg_str(args, "old_key")?, arg_str(args, "new_key")?], + "memory_query" => vec!["query".into(), arg_str(args, "query")?], + _ => return Err(format!("unknown memory tool: {name}")), + }; + + let str_args: Vec<&str> = cli_args.iter().map(|s| s.as_str()).collect(); + run_poc_memory(&str_args) +} + +fn run_poc_memory(args: &[&str]) -> Result { + let result = Command::new("poc-memory") + .args(args) + .output() + .map_err(|e| format!("failed to run poc-memory: {e}"))?; + let out = String::from_utf8_lossy(&result.stdout); + let err = String::from_utf8_lossy(&result.stderr); + if result.status.success() { + Ok(format!("{}{}", out.trim(), if err.is_empty() { String::new() } else { format!("\n{}", err.trim()) })) + } else { + Err(format!("{}\n{}", out.trim(), err.trim())) + } +} + +fn run_poc_memory_stdin(args: &[&str], stdin: &str) -> Result { + use std::process::Stdio; + let mut child = Command::new("poc-memory") + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to run poc-memory: {e}"))?; + + if let Some(mut si) = child.stdin.take() { + let _ = si.write_all(stdin.as_bytes()); + } + + let result = child.wait_with_output() + .map_err(|e| format!("poc-memory failed: {e}"))?; + let out = String::from_utf8_lossy(&result.stdout); + let err = String::from_utf8_lossy(&result.stderr); + Ok(format!("{}{}", out.trim(), if err.is_empty() { String::new() } else { format!("\n{}", err.trim()) })) +} + +// ── Channel tool dispatch ─────────────────────────────────────── + +fn dispatch_channel_list() -> Result { + let result = poc_memory::thalamus::channels::fetch_all_channels_blocking(); + let mut lines = Vec::new(); + for (name, connected, unread) in &result { + let status = if *connected { "connected" } else { "disconnected" }; + let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; + lines.push(format!("{} — {}{}", name, status, unread_str)); + } + if lines.is_empty() { + Ok("No channels configured.".into()) + } else { + Ok(lines.join("\n")) + } +} + +fn dispatch_channel_recv(args: &Value) -> Result { + let channel = arg_str(args, "channel")?; + let all_new = args.get("all_new").and_then(|v| v.as_bool()).unwrap_or(true); + let min_count = args.get("min_count").and_then(|v| v.as_u64()).unwrap_or(20) as u32; + + // Find which daemon handles this channel + let prefix = channel.split('.').next().unwrap_or(""); + let sock = dirs::home_dir() + .unwrap_or_default() + .join(format!(".consciousness/channels/{}.sock", prefix)); + + if !sock.exists() { + return Err(format!("no daemon for channel: {}", channel)); + } + + // Use blocking one-shot RPC + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| e.to_string())?; + let local = tokio::task::LocalSet::new(); + + local.block_on(&rt, async { + channel_rpc_recv(&sock, &channel, all_new, min_count).await + }) +} + +fn dispatch_channel_send(args: &Value) -> Result { + let channel = arg_str(args, "channel")?; + let message = arg_str(args, "message")?; + + let prefix = channel.split('.').next().unwrap_or(""); + let sock = dirs::home_dir() + .unwrap_or_default() + .join(format!(".consciousness/channels/{}.sock", prefix)); + + if !sock.exists() { + return Err(format!("no daemon for channel: {}", channel)); + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| e.to_string())?; + let local = tokio::task::LocalSet::new(); + + local.block_on(&rt, async { + channel_rpc_send(&sock, &channel, &message).await + }) +} + +fn dispatch_channel_notifications() -> Result { + // Read pending notifications from the consciousness binary's state + // For now, just list channels with unread counts + let result = poc_memory::thalamus::channels::fetch_all_channels_blocking(); + let unread: Vec<_> = result.iter() + .filter(|(_, _, u)| *u > 0) + .collect(); + if unread.is_empty() { + Ok("No pending notifications.".into()) + } else { + let lines: Vec = unread.iter() + .map(|(name, _, count)| format!("{}: {} unread", name, count)) + .collect(); + Ok(lines.join("\n")) + } +} + +// ── Channel RPC helpers ───────────────────────────────────────── + +async fn channel_rpc_recv( + sock: &std::path::Path, + channel: &str, + all_new: bool, + min_count: u32, +) -> Result { + use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; + use futures::AsyncReadExt; + use tokio_util::compat::TokioAsyncReadCompatExt; + use poc_memory::channel_capnp::channel_server; + + let stream = tokio::net::UnixStream::connect(sock).await + .map_err(|e| format!("connect failed: {e}"))?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + tokio::task::spawn_local(rpc_system); + + let mut req = client.recv_request(); + req.get().set_channel(channel); + req.get().set_all_new(all_new); + req.get().set_min_count(min_count); + + let reply = req.send().promise.await + .map_err(|e| format!("recv failed: {e}"))?; + let text = reply.get() + .map_err(|e| format!("reply error: {e}"))? + .get_text() + .map_err(|e| format!("text error: {e}"))? + .to_str() + .map_err(|e| format!("utf8 error: {e}"))?; + + if text.is_empty() { + Ok("(no messages)".into()) + } else { + Ok(text.to_string()) + } +} + +async fn channel_rpc_send( + sock: &std::path::Path, + channel: &str, + message: &str, +) -> Result { + use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; + use futures::AsyncReadExt; + use tokio_util::compat::TokioAsyncReadCompatExt; + use poc_memory::channel_capnp::channel_server; + + let stream = tokio::net::UnixStream::connect(sock).await + .map_err(|e| format!("connect failed: {e}"))?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + tokio::task::spawn_local(rpc_system); + + let mut req = client.send_request(); + req.get().set_channel(channel); + req.get().set_message(message); + + req.send().promise.await + .map_err(|e| format!("send failed: {e}"))?; + + Ok(format!("sent to {}", channel)) +} + +// ── Helpers ───────────────────────────────────────────────────── + +fn arg_str(args: &Value, key: &str) -> Result { + args.get(key) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .ok_or_else(|| format!("missing required argument: {key}")) +} + +fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { + poc_memory::thalamus::channels::fetch_all_channels_blocking() +} + +// ── Main loop ─────────────────────────────────────────────────── + +fn main() { + let stdin = io::stdin(); + let reader = stdin.lock(); + + for line in reader.lines() { + let line = match line { + Ok(l) if !l.is_empty() => l, + _ => continue, + }; + + let req: Request = match serde_json::from_str(&line) { + Ok(r) => r, + Err(_) => continue, + }; + + match req.method.as_str() { + "initialize" => { + respond(req.id, json!({ + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "consciousness", + "version": "0.4.0" + } + })); + } + + "notifications/initialized" => { + // Client ack — no response needed + } + + "tools/list" => { + let tools = tool_definitions(); + respond(req.id, json!({ "tools": tools })); + } + + "tools/call" => { + let name = req.params.get("name") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let args = req.params.get("arguments") + .cloned() + .unwrap_or(json!({})); + + match dispatch_tool(name, &args) { + Ok(text) => { + respond(req.id, json!({ + "content": [{"type": "text", "text": text}] + })); + } + Err(e) => { + respond(req.id, json!({ + "content": [{"type": "text", "text": e}], + "isError": true + })); + } + } + } + + _ => { + respond_error(req.id, -32601, &format!("unknown method: {}", req.method)); + } + } + } +} diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 13c5919..45594ec 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -361,6 +361,16 @@ pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { .unwrap_or_default() } +/// Blocking version for use from synchronous contexts (MCP server, etc.). +pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, fetch_all_channels_inner()) +} + async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = dirs::home_dir() .unwrap_or_default()