// tools/channels.rs — Channel tools (list, recv, send, notifications) // // Shared by consciousness agent and the MCP server. // One-shot capnp RPC calls to channel daemon sockets. use anyhow::{Context, Result}; use serde::Deserialize; use super::Tool; // ── Tool registry ────────────────────────────────────────────── pub fn tools() -> [Tool; 6] { [ Tool { name: "channel_list", description: "List all available channels and their status (connected, unread count).", parameters_json: r#"{"type":"object","properties":{}}"#, handler: |_a, _v| Box::pin(async { channel_list().await }) }, Tool { name: "channel_recv", description: "Read messages from a channel.", parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. irc.#bcachefs, telegram.kent)"},"all_new":{"type":"boolean","description":"If true, return all unconsumed messages","default":true},"min_count":{"type":"integer","description":"Minimum number of lines to return","default":20}},"required":["channel"]}"#, handler: |_a, v| Box::pin(async move { channel_recv(&v).await }) }, Tool { name: "channel_send", description: "Send a message to a channel.", parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"},"message":{"type":"string","description":"Message to send"}},"required":["channel","message"]}"#, handler: |_a, v| Box::pin(async move { channel_send(&v).await }) }, Tool { name: "channel_notifications", description: "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", parameters_json: r#"{"type":"object","properties":{}}"#, handler: |_a, _v| Box::pin(async { channel_notifications().await }) }, Tool { name: "channel_open", description: "Open a channel — start monitoring. 
For tmux: finds the pane by name and attaches pipe-pane.", parameters_json: r#"{"type":"object","properties":{"label":{"type":"string","description":"Channel label / tmux pane name"}},"required":["label"]}"#, handler: |_a, v| Box::pin(async move { channel_open(&v).await }) }, Tool { name: "channel_close", description: "Close a channel — stop monitoring and clean up.", parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. tmux.ktest)"}},"required":["channel"]}"#, handler: |_a, v| Box::pin(async move { channel_close(&v).await }) }, ] } // ── Tool implementations ─────────────────────────────────────── async fn channel_list() -> Result { let result = fetch_all_channels().await; if result.is_empty() { return Ok("No channels configured.".into()); } Ok(result.iter().map(|(name, connected, unread)| { let status = if *connected { "connected" } else { "disconnected" }; let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; format!("{} — {}{}", name, status, unread_str) }).collect::>().join("\n")) } #[derive(Deserialize)] struct RecvArgs { channel: String, #[serde(default = "default_true")] all_new: bool, #[serde(default = "default_min_count")] min_count: u32, } fn default_true() -> bool { true } fn default_min_count() -> u32 { 20 } async fn channel_recv(args: &serde_json::Value) -> Result { let a: RecvArgs = serde_json::from_value(args.clone()) .context("invalid channel_recv arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let all_new = a.all_new; let min_count = a.min_count; // capnp-rpc needs LocalSet — bridge to it via spawn_blocking tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, rpc_recv(&sock, &channel, all_new, min_count)) }).await? 
.map_err(|e| anyhow::anyhow!("{}", e)) } #[derive(Deserialize)] struct SendArgs { channel: String, message: String, } async fn channel_send(args: &serde_json::Value) -> Result { let a: SendArgs = serde_json::from_value(args.clone()) .context("invalid channel_send arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let message = a.message; tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, rpc_send(&sock, &channel, &message)) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } async fn channel_notifications() -> Result { let result = fetch_all_channels().await; let unread: Vec<_> = result.iter().filter(|(_, _, u)| *u > 0).collect(); if unread.is_empty() { Ok("No pending notifications.".into()) } else { Ok(unread.iter() .map(|(name, _, count)| format!("{}: {} unread", name, count)) .collect::>() .join("\n")) } } async fn channel_open(args: &serde_json::Value) -> Result { let label = args.get("label").and_then(|v| v.as_str()) .context("label is required")? .to_string(); let prefix = label.split('.').next().unwrap_or("tmux"); let sock = daemon_sock(prefix)?; tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, rpc_open(&sock, &label)) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } async fn channel_close(args: &serde_json::Value) -> Result { let channel = args.get("channel").and_then(|v| v.as_str()) .context("channel is required")? .to_string(); let sock = daemon_sock(&channel)?; tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, rpc_close(&sock, &channel)) }).await? 
.map_err(|e| anyhow::anyhow!("{}", e)) } // ── Socket helpers ───────────────────────────────────────────── fn channels_dir() -> std::path::PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels") } fn daemon_sock(channel: &str) -> Result { let prefix = channel.split('.').next().unwrap_or(""); let sock = channels_dir().join(format!("{}.sock", prefix)); if !sock.exists() { anyhow::bail!("no daemon for channel: {}", channel); } Ok(sock) } // ── Channel RPC ──────────────────────────────────────────────── use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio_util::compat::TokioAsyncReadCompatExt; use crate::channel_capnp::channel_server; async fn rpc_connect(sock: &std::path::Path) -> Result { let stream = tokio::net::UnixStream::connect(sock).await .map_err(|e| format!("connect failed: {e}"))?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); Ok(client) } async fn rpc_recv( sock: &std::path::Path, channel: &str, all_new: bool, min_count: u32, ) -> Result { let client = rpc_connect(sock).await?; let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); let reply = req.send().promise.await.map_err(|e| format!("recv failed: {e}"))?; let text = reply.get().map_err(|e| format!("{e}"))? .get_text().map_err(|e| format!("{e}"))? 
.to_str().map_err(|e| format!("{e}"))?; if text.is_empty() { Ok("(no messages)".into()) } else { Ok(text.to_string()) } } async fn rpc_send( sock: &std::path::Path, channel: &str, message: &str, ) -> Result { let client = rpc_connect(sock).await?; let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); req.send().promise.await.map_err(|e| format!("send failed: {e}"))?; Ok(format!("sent to {}", channel)) } async fn rpc_list(sock: &std::path::Path) -> Option> { let client = rpc_connect(sock).await.ok()?; let mut result = Vec::new(); if let Ok(reply) = client.list_request().send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } Some(result) } async fn rpc_open(sock: &std::path::Path, label: &str) -> Result { let client = rpc_connect(sock).await?; let mut req = client.open_request(); req.get().set_label(label); req.send().promise.await.map_err(|e| format!("open failed: {e}"))?; Ok(format!("opened channel tmux.{}", label)) } async fn rpc_close(sock: &std::path::Path, channel: &str) -> Result { let client = rpc_connect(sock).await?; let mut req = client.close_request(); req.get().set_channel(channel); req.send().promise.await.map_err(|e| format!("close failed: {e}"))?; Ok(format!("closed channel {}", channel)) } // ── Fetch all channels ───────────────────────────────────────── /// Fetch channel status from all daemon sockets. /// Runs on a dedicated thread with LocalSet because capnp-rpc /// uses Rc (not Send). This is the boundary between the /// multi-thread runtime and the capnp world. 
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
    // Hop to a blocking thread and drive the capnp work on a fresh
    // current-thread runtime + LocalSet. A failed join yields an empty list.
    let handle = tokio::task::spawn_blocking(|| {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        tokio::task::LocalSet::new().block_on(&rt, fetch_all_channels_inner())
    });
    handle.await.unwrap_or_default()
}

/// Query every daemon the supervisor knows about and collect
/// (channel name, connected, unread count) triples.
async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
    let dir = channels_dir();
    let mut supervisor = crate::thalamus::supervisor::Supervisor::new();
    supervisor.load_config();
    supervisor.ensure_running();

    let mut out: Vec<(String, bool, u32)> = Vec::new();
    for (daemon, _enabled, alive) in supervisor.status() {
        if !alive {
            // Daemon process is down — report it as a disconnected entry.
            out.push((daemon, false, 0));
            continue;
        }
        let sock = dir.join(format!("{}.sock", daemon));
        match rpc_list(&sock).await {
            // Daemon is up and reporting channels: use them verbatim.
            Some(channels) if !channels.is_empty() => out.extend(channels),
            // Daemon answered but has no channels yet.
            Some(_) => out.push((daemon, true, 0)),
            // Socket exists but the RPC failed — treat as disconnected.
            None => out.push((daemon, false, 0)),
        }
    }
    out
}