tools: unify channel and memory tools, clean up mcp-server

Move all tool definitions and dispatch out of mcp-server.rs:
- Channel tools: new tools/channels.rs with definitions, async
  dispatch, blocking dispatch, and capnp RPC helpers
- Memory tools: make tools/memory.rs pub so mcp-server can use it

mcp-server.rs is now pure JSON-RPC protocol plumbing (482 → 169 lines).
No tool-specific code remains in that file.

Also removes duplicated channel RPC helpers and fetch_all_channels
that were in both mcp-server.rs and thalamus/channels.rs.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-04 14:45:22 -04:00 committed by Kent Overstreet
parent 1ef137fb3a
commit 943f42d876
4 changed files with 392 additions and 352 deletions

345
src/agent/tools/channels.rs Normal file
View file

@ -0,0 +1,345 @@
// tools/channels.rs — Channel tools (list, recv, send, notifications)
//
// Shared by consciousness agent and the MCP server.
// One-shot capnp RPC calls to channel daemon sockets.
use anyhow::{Context, Result};
use serde::Deserialize;
use serde_json::json;
use super::ToolDef;
// ── Definitions ────────────────────────────────────────────────
pub fn definitions() -> Vec<ToolDef> {
vec![
ToolDef::new(
"channel_list",
"List all available channels and their status (connected, unread count).",
json!({"type": "object", "properties": {}}),
),
ToolDef::new(
"channel_recv",
"Read messages from a channel.",
json!({
"type": "object",
"properties": {
"channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"},
"all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true},
"min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20}
},
"required": ["channel"]
}),
),
ToolDef::new(
"channel_send",
"Send a message to a channel.",
json!({
"type": "object",
"properties": {
"channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"},
"message": {"type": "string", "description": "Message to send"}
},
"required": ["channel", "message"]
}),
),
ToolDef::new(
"channel_notifications",
"Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.",
json!({"type": "object", "properties": {}}),
),
]
}
// ── Dispatch ───────────────────────────────────────────────────
pub async fn dispatch(name: &str, args: &serde_json::Value) -> Result<String> {
match name {
"channel_list" => channel_list().await,
"channel_recv" => channel_recv(args).await,
"channel_send" => channel_send(args).await,
"channel_notifications" => channel_notifications().await,
_ => anyhow::bail!("unknown channel tool: {}", name),
}
}
/// Blocking dispatch for synchronous contexts (MCP server).
pub fn dispatch_blocking(name: &str, args: &serde_json::Value) -> Result<String> {
match name {
"channel_list" => Ok(channel_list_blocking()),
"channel_notifications" => Ok(channel_notifications_blocking()),
"channel_recv" | "channel_send" => {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async {
match name {
"channel_recv" => channel_recv(args).await,
"channel_send" => channel_send(args).await,
_ => unreachable!(),
}
})
}
_ => anyhow::bail!("unknown channel tool: {}", name),
}
}
// ── Tool implementations ───────────────────────────────────────
async fn channel_list() -> Result<String> {
let result = fetch_all_channels().await;
Ok(format_channel_list(&result))
}
fn channel_list_blocking() -> String {
let result = fetch_all_channels_blocking();
format_channel_list(&result)
}
fn format_channel_list(channels: &[(String, bool, u32)]) -> String {
if channels.is_empty() {
return "No channels configured.".into();
}
channels.iter().map(|(name, connected, unread)| {
let status = if *connected { "connected" } else { "disconnected" };
let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() };
format!("{}{}{}", name, status, unread_str)
}).collect::<Vec<_>>().join("\n")
}
#[derive(Deserialize)]
struct RecvArgs {
channel: String,
#[serde(default = "default_true")]
all_new: bool,
#[serde(default = "default_min_count")]
min_count: u32,
}
fn default_true() -> bool { true }
fn default_min_count() -> u32 { 20 }
async fn channel_recv(args: &serde_json::Value) -> Result<String> {
let a: RecvArgs = serde_json::from_value(args.clone())
.context("invalid channel_recv arguments")?;
let sock = daemon_sock(&a.channel)?;
let channel = a.channel;
let all_new = a.all_new;
let min_count = a.min_count;
tokio::task::spawn_blocking(move || {
rpc_blocking(|local, rt| {
local.block_on(rt, rpc_recv(&sock, &channel, all_new, min_count))
})
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
#[derive(Deserialize)]
struct SendArgs {
channel: String,
message: String,
}
async fn channel_send(args: &serde_json::Value) -> Result<String> {
let a: SendArgs = serde_json::from_value(args.clone())
.context("invalid channel_send arguments")?;
let sock = daemon_sock(&a.channel)?;
let channel = a.channel;
let message = a.message;
tokio::task::spawn_blocking(move || {
rpc_blocking(|local, rt| {
local.block_on(rt, rpc_send(&sock, &channel, &message))
})
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
async fn channel_notifications() -> Result<String> {
let result = fetch_all_channels().await;
Ok(format_notifications(&result))
}
fn channel_notifications_blocking() -> String {
let result = fetch_all_channels_blocking();
format_notifications(&result)
}
fn format_notifications(channels: &[(String, bool, u32)]) -> String {
let unread: Vec<_> = channels.iter().filter(|(_, _, u)| *u > 0).collect();
if unread.is_empty() {
"No pending notifications.".into()
} else {
unread.iter()
.map(|(name, _, count)| format!("{}: {} unread", name, count))
.collect::<Vec<_>>()
.join("\n")
}
}
// ── Socket helpers ─────────────────────────────────────────────
fn channels_dir() -> std::path::PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels")
}
fn daemon_sock(channel: &str) -> Result<std::path::PathBuf> {
let prefix = channel.split('.').next().unwrap_or("");
let sock = channels_dir().join(format!("{}.sock", prefix));
if !sock.exists() {
anyhow::bail!("no daemon for channel: {}", channel);
}
Ok(sock)
}
// ── Channel RPC ────────────────────────────────────────────────
/// Create a tokio runtime + LocalSet for capnp RPC calls.
/// capnp-rpc uses Rc so futures aren't Send — this must run
/// on a dedicated thread via spawn_blocking.
fn rpc_blocking<F, T>(f: F) -> T
where
F: FnOnce(&tokio::task::LocalSet, &tokio::runtime::Runtime) -> T,
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
f(&local, &rt)
}
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::channel_capnp::channel_server;
async fn rpc_connect(sock: &std::path::Path) -> Result<channel_server::Client, String> {
let stream = tokio::net::UnixStream::connect(sock).await
.map_err(|e| format!("connect failed: {e}"))?;
let (reader, writer) = stream.compat().split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let client: channel_server::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
Ok(client)
}
async fn rpc_recv(
sock: &std::path::Path,
channel: &str,
all_new: bool,
min_count: u32,
) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.recv_request();
req.get().set_channel(channel);
req.get().set_all_new(all_new);
req.get().set_min_count(min_count);
let reply = req.send().promise.await
.map_err(|e| format!("recv failed: {e}"))?;
let text = reply.get()
.map_err(|e| format!("reply error: {e}"))?
.get_text()
.map_err(|e| format!("text error: {e}"))?
.to_str()
.map_err(|e| format!("utf8 error: {e}"))?;
if text.is_empty() {
Ok("(no messages)".into())
} else {
Ok(text.to_string())
}
}
async fn rpc_send(
sock: &std::path::Path,
channel: &str,
message: &str,
) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.send_request();
req.get().set_channel(channel);
req.get().set_message(message);
req.send().promise.await
.map_err(|e| format!("send failed: {e}"))?;
Ok(format!("sent to {}", channel))
}
async fn rpc_list(sock: &std::path::Path) -> Option<Vec<(String, bool, u32)>> {
let client = rpc_connect(sock).await.ok()?;
let mut result = Vec::new();
if let Ok(reply) = client.list_request().send().promise.await {
if let Ok(r) = reply.get() {
if let Ok(channels) = r.get_channels() {
for ch in channels.iter() {
if let Ok(name) = ch.get_name() {
result.push((
name.to_str().unwrap_or("").to_string(),
ch.get_connected(),
ch.get_unread(),
));
}
}
}
}
}
Some(result)
}
// ── Fetch all channels ─────────────────────────────────────────
/// Fetch channel status from all daemon sockets.
/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send).
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
tokio::task::spawn_blocking(|| {
fetch_all_channels_blocking()
})
.await
.unwrap_or_default()
}
/// Blocking version for synchronous contexts.
pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, fetch_all_channels_inner())
}
async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
let channels_dir = channels_dir();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
sup.ensure_running();
let mut result = Vec::new();
for (daemon_name, _enabled, alive) in sup.status() {
if !alive {
result.push((daemon_name, false, 0));
continue;
}
let sock = channels_dir.join(format!("{}.sock", daemon_name));
match rpc_list(&sock).await {
None => result.push((daemon_name, false, 0)),
Some(channels) if channels.is_empty() => result.push((daemon_name, true, 0)),
Some(channels) => result.extend(channels),
}
}
result
}

View file

@ -6,10 +6,11 @@
// Core tools
mod bash;
pub mod channels;
mod edit;
mod glob;
mod grep;
mod memory;
pub mod memory;
mod read;
mod web;
mod write;
@ -191,6 +192,15 @@ pub async fn dispatch_shared(
});
}
// Channel tools
if name.starts_with("channel_") {
let result = channels::dispatch(name, args).await;
return Some(match result {
Ok(s) => ToolOutput::text(s),
Err(e) => ToolOutput::error(e),
});
}
// File and execution tools
let result = match name {
"read_file" => read::read_file(args),
@ -225,6 +235,7 @@ pub fn definitions() -> Vec<ToolDef> {
glob::definition(),
];
defs.extend(control::definitions());
defs.extend(channels::definitions());
defs.extend(memory::definitions());
defs
}