// tools/channels.rs — Channel tools (list, recv, send, notifications) // // Shared by consciousness agent and the MCP server. // One-shot capnp RPC calls to channel daemon sockets. use anyhow::{Context, Result}; use serde::Deserialize; use serde_json::json; use super::ToolDef; // ── Definitions ──────────────────────────────────────────────── pub fn definitions() -> Vec { vec![ ToolDef::new( "channel_list", "List all available channels and their status (connected, unread count).", json!({"type": "object", "properties": {}}), ), ToolDef::new( "channel_recv", "Read messages from a channel.", json!({ "type": "object", "properties": { "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"}, "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} }, "required": ["channel"] }), ), ToolDef::new( "channel_send", "Send a message to a channel.", json!({ "type": "object", "properties": { "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"}, "message": {"type": "string", "description": "Message to send"} }, "required": ["channel", "message"] }), ), ToolDef::new( "channel_notifications", "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", json!({"type": "object", "properties": {}}), ), ] } // ── Dispatch ─────────────────────────────────────────────────── pub async fn dispatch(name: &str, args: &serde_json::Value) -> Result { match name { "channel_list" => channel_list().await, "channel_recv" => channel_recv(args).await, "channel_send" => channel_send(args).await, "channel_notifications" => channel_notifications().await, _ => anyhow::bail!("unknown channel tool: {}", name), } } /// Blocking dispatch for synchronous contexts (MCP server). pub fn dispatch_blocking(name: &str, args: &serde_json::Value) -> Result { match name { "channel_list" => Ok(channel_list_blocking()), "channel_notifications" => Ok(channel_notifications_blocking()), "channel_recv" | "channel_send" => { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; let local = tokio::task::LocalSet::new(); local.block_on(&rt, async { match name { "channel_recv" => channel_recv(args).await, "channel_send" => channel_send(args).await, _ => unreachable!(), } }) } _ => anyhow::bail!("unknown channel tool: {}", name), } } // ── Tool implementations ─────────────────────────────────────── async fn channel_list() -> Result { let result = fetch_all_channels().await; Ok(format_channel_list(&result)) } fn channel_list_blocking() -> String { let result = fetch_all_channels_blocking(); format_channel_list(&result) } fn format_channel_list(channels: &[(String, bool, u32)]) -> String { if channels.is_empty() { return "No channels configured.".into(); } channels.iter().map(|(name, connected, unread)| { let status = if *connected { "connected" } else { "disconnected" }; let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; format!("{} — {}{}", name, status, unread_str) }).collect::>().join("\n") } #[derive(Deserialize)] struct RecvArgs { channel: String, #[serde(default = "default_true")] all_new: bool, #[serde(default = "default_min_count")] min_count: u32, } fn default_true() -> bool { true } fn default_min_count() -> u32 { 20 } async fn channel_recv(args: &serde_json::Value) -> Result { let a: RecvArgs = serde_json::from_value(args.clone()) .context("invalid channel_recv arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let all_new = a.all_new; let min_count = a.min_count; tokio::task::spawn_blocking(move || { rpc_blocking(|local, rt| { local.block_on(rt, rpc_recv(&sock, &channel, all_new, min_count)) }) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } #[derive(Deserialize)] struct SendArgs { channel: String, message: String, } async fn channel_send(args: &serde_json::Value) -> Result { let a: SendArgs = serde_json::from_value(args.clone()) .context("invalid channel_send arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let message = a.message; tokio::task::spawn_blocking(move || { rpc_blocking(|local, rt| { local.block_on(rt, rpc_send(&sock, &channel, &message)) }) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } async fn channel_notifications() -> Result { let result = fetch_all_channels().await; Ok(format_notifications(&result)) } fn channel_notifications_blocking() -> String { let result = fetch_all_channels_blocking(); format_notifications(&result) } fn format_notifications(channels: &[(String, bool, u32)]) -> String { let unread: Vec<_> = channels.iter().filter(|(_, _, u)| *u > 0).collect(); if unread.is_empty() { "No pending notifications.".into() } else { unread.iter() .map(|(name, _, count)| format!("{}: {} unread", name, count)) .collect::>() .join("\n") } } // ── Socket helpers ───────────────────────────────────────────── fn channels_dir() -> std::path::PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels") } fn daemon_sock(channel: &str) -> Result { let prefix = channel.split('.').next().unwrap_or(""); let sock = channels_dir().join(format!("{}.sock", prefix)); if !sock.exists() { anyhow::bail!("no daemon for channel: {}", channel); } Ok(sock) } // ── Channel RPC ──────────────────────────────────────────────── /// Create a tokio runtime + LocalSet for capnp RPC calls. /// capnp-rpc uses Rc so futures aren't Send — this must run /// on a dedicated thread via spawn_blocking. fn rpc_blocking(f: F) -> T where F: FnOnce(&tokio::task::LocalSet, &tokio::runtime::Runtime) -> T, { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); f(&local, &rt) } use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio_util::compat::TokioAsyncReadCompatExt; use crate::channel_capnp::channel_server; async fn rpc_connect(sock: &std::path::Path) -> Result { let stream = tokio::net::UnixStream::connect(sock).await .map_err(|e| format!("connect failed: {e}"))?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); Ok(client) } async fn rpc_recv( sock: &std::path::Path, channel: &str, all_new: bool, min_count: u32, ) -> Result { let client = rpc_connect(sock).await?; let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); let reply = req.send().promise.await .map_err(|e| format!("recv failed: {e}"))?; let text = reply.get() .map_err(|e| format!("reply error: {e}"))? .get_text() .map_err(|e| format!("text error: {e}"))? .to_str() .map_err(|e| format!("utf8 error: {e}"))?; if text.is_empty() { Ok("(no messages)".into()) } else { Ok(text.to_string()) } } async fn rpc_send( sock: &std::path::Path, channel: &str, message: &str, ) -> Result { let client = rpc_connect(sock).await?; let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); req.send().promise.await .map_err(|e| format!("send failed: {e}"))?; Ok(format!("sent to {}", channel)) } async fn rpc_list(sock: &std::path::Path) -> Option> { let client = rpc_connect(sock).await.ok()?; let mut result = Vec::new(); if let Ok(reply) = client.list_request().send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } Some(result) } // ── Fetch all channels ───────────────────────────────────────── /// Fetch channel status from all daemon sockets. /// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { tokio::task::spawn_blocking(|| { fetch_all_channels_blocking() }) .await .unwrap_or_default() } /// Blocking version for synchronous contexts. pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, fetch_all_channels_inner()) } async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = channels_dir(); let mut sup = crate::thalamus::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); let mut result = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { result.push((daemon_name, false, 0)); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); match rpc_list(&sock).await { None => result.push((daemon_name, false, 0)), Some(channels) if channels.is_empty() => result.push((daemon_name, true, 0)), Some(channels) => result.extend(channels), } } result }