// channels.rs — Channel client for the thalamus // // Discovers channel daemon sockets in ~/.consciousness/channels/, // connects via capnp RPC, and provides send/recv operations. // // Each daemon socket speaks the channel.capnp protocol. The channel // manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. use std::collections::HashMap; use std::path::PathBuf; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info, warn}; use crate::channel_capnp::{channel_client, channel_server}; use capnp::capability::Promise; /// A live connection to a channel daemon. struct DaemonConnection { #[allow(dead_code)] prefix: String, client: channel_server::Client, // Hold the RPC system task so it doesn't get dropped _rpc_task: tokio::task::JoinHandle>, } /// Manages all channel daemon connections. pub struct ChannelManager { daemons: HashMap, channels_dir: PathBuf, } impl ChannelManager { pub fn new() -> Self { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); Self { daemons: HashMap::new(), channels_dir, } } /// Connect to a daemon socket, returning the capnp client. async fn connect(path: &std::path::Path) -> Result< (channel_server::Client, tokio::task::JoinHandle>), Box, > { let stream = UnixStream::connect(path).await?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let task = tokio::task::spawn_local(rpc_system); Ok((client, task)) } /// Scan the channels directory for daemon sockets and connect. pub async fn discover(&mut self) { let dir = match std::fs::read_dir(&self.channels_dir) { Ok(d) => d, Err(_) => return, // directory doesn't exist yet }; for entry in dir.flatten() { let path = entry.path(); if path.extension().map_or(false, |e| e == "sock") { let prefix = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(); if self.daemons.contains_key(&prefix) { continue; } match Self::connect(&path).await { Ok((client, task)) => { info!("connected to channel daemon: {}", prefix); self.daemons.insert( prefix.clone(), DaemonConnection { prefix, client, _rpc_task: task, }, ); } Err(e) => { warn!("failed to connect to {}: {}", path.display(), e); } } } } } /// Find the daemon client for a channel path. fn client_for(&self, channel: &str) -> Option<&channel_server::Client> { let prefix = channel.split('.').next()?; self.daemons.get(prefix).map(|d| &d.client) } /// Send a message to a channel. pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); req.send().promise.await .map_err(|e| format!("send failed: {}", e))?; Ok(()) } /// Read from a channel. pub async fn recv( &self, channel: &str, all_new: bool, min_count: u32, ) -> Result { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); let reply = req.send().promise.await .map_err(|e| format!("recv failed: {}", e))?; let text = reply.get() .map_err(|e| format!("recv reply error: {}", e))? .get_text() .map_err(|e| format!("recv text error: {}", e))?; Ok(text.to_str().unwrap_or("").to_string()) } /// List connected daemon prefixes. pub fn prefixes(&self) -> Vec<&str> { self.daemons.keys().map(|s| s.as_str()).collect() } /// List all channels from all connected daemons. pub async fn list_all(&self) -> Vec<(String, bool, u32)> { let mut result = Vec::new(); for daemon in self.daemons.values() { let req = daemon.client.list_request(); if let Ok(reply) = req.send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } } result } } // ── Channel notifications ─────────────────────────────────────── /// A notification received from a channel daemon. #[derive(Debug, Clone)] pub struct ChannelNotification { pub channel: String, pub urgency: u8, pub preview: String, pub count: u32, } /// Callback implementation that forwards notifications via mpsc. struct NotifyForwarder { tx: std::sync::mpsc::Sender, } impl channel_client::Server for NotifyForwarder { fn notify( &mut self, params: channel_client::NotifyParams, _results: channel_client::NotifyResults, ) -> Promise<(), capnp::Error> { if let Ok(params) = params.get() { if let Ok(notifications) = params.get_notifications() { for n in notifications.iter() { let _ = self.tx.send(ChannelNotification { channel: n.get_channel() .ok() .and_then(|c| c.to_str().ok()) .unwrap_or("") .to_string(), urgency: n.get_urgency(), preview: n.get_preview() .ok() .and_then(|p| p.to_str().ok()) .unwrap_or("") .to_string(), count: n.get_count(), }); } } } Promise::ok(()) } } /// Subscribe to all channel daemons. Runs on a background thread /// with its own LocalSet (capnp uses Rc). Notifications forwarded /// via the returned receiver. pub fn subscribe_all() -> std::sync::mpsc::Receiver { let (tx, rx) = std::sync::mpsc::channel(); std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); let mut sup = super::supervisor::Supervisor::new(); sup.load_config(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); let tx = tx.clone(); tokio::task::spawn_local(async move { if let Err(e) = subscribe_one_daemon(&sock, tx).await { warn!("subscribe to {} failed: {}", sock.display(), e); } }); } // Keep the LocalSet alive — daemon connections need it loop { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; } }); }); rx } async fn subscribe_one_daemon( sock: &std::path::Path, tx: std::sync::mpsc::Sender, ) -> Result<(), Box> { let stream = UnixStream::connect(sock).await?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let forwarder = NotifyForwarder { tx }; let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder); let mut rpc_system = RpcSystem::new(rpc_network, None); let server: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); // Subscribe with our callback let mut req = server.subscribe_request(); req.get().set_callback(callback_client); req.send().promise.await?; info!("subscribed to {}", sock.display()); // Keep connection alive loop { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; } } // ── One-shot queries ──────────────────────────────────────────── /// One-shot query: connect to a daemon socket, call list(), return results. async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> { let stream = match UnixStream::connect(sock).await { Ok(s) => s, Err(_) => return Vec::new(), }; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let rpc_handle = tokio::task::spawn_local(async move { let _ = rpc_system.await; }); let mut result = Vec::new(); if let Ok(reply) = client.list_request().send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } rpc_handle.abort(); result } /// Fetch channel status from all daemon sockets. /// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { tokio::task::spawn_blocking(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, fetch_all_channels_inner()) }) .await .unwrap_or_default() } /// Blocking version for use from synchronous contexts (MCP server, etc.). pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, fetch_all_channels_inner()) } async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); let mut sup = super::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); // restart any dead daemons let mut result = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { result.push((daemon_name, false, 0)); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); let channels = query_one_daemon(&sock).await; if channels.is_empty() { // Daemon is running but has no channels yet (e.g. telegram with no messages) result.push((daemon_name, true, 0)); } else { result.extend(channels); } } result }