// channels.rs — Channel client for the thalamus // // Discovers channel daemon sockets in ~/.consciousness/channels/, // connects via capnp RPC, and provides send/recv operations. // // Each daemon socket speaks the channel.capnp protocol. The channel // manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. use std::collections::HashMap; use std::path::PathBuf; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info, warn}; use crate::channel_capnp::channel_server; /// A live connection to a channel daemon. struct DaemonConnection { #[allow(dead_code)] prefix: String, client: channel_server::Client, // Hold the RPC system task so it doesn't get dropped _rpc_task: tokio::task::JoinHandle>, } /// Manages all channel daemon connections. pub struct ChannelManager { daemons: HashMap, channels_dir: PathBuf, } impl ChannelManager { pub fn new() -> Self { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); Self { daemons: HashMap::new(), channels_dir, } } /// Connect to a daemon socket, returning the capnp client. async fn connect(path: &std::path::Path) -> Result< (channel_server::Client, tokio::task::JoinHandle>), Box, > { let stream = UnixStream::connect(path).await?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let task = tokio::task::spawn_local(rpc_system); Ok((client, task)) } /// Scan the channels directory for daemon sockets and connect. pub async fn discover(&mut self) { let dir = match std::fs::read_dir(&self.channels_dir) { Ok(d) => d, Err(_) => return, // directory doesn't exist yet }; for entry in dir.flatten() { let path = entry.path(); if path.extension().map_or(false, |e| e == "sock") { let prefix = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(); if self.daemons.contains_key(&prefix) { continue; } match Self::connect(&path).await { Ok((client, task)) => { info!("connected to channel daemon: {}", prefix); self.daemons.insert( prefix.clone(), DaemonConnection { prefix, client, _rpc_task: task, }, ); } Err(e) => { warn!("failed to connect to {}: {}", path.display(), e); } } } } } /// Find the daemon client for a channel path. fn client_for(&self, channel: &str) -> Option<&channel_server::Client> { let prefix = channel.split('.').next()?; self.daemons.get(prefix).map(|d| &d.client) } /// Send a message to a channel. pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); req.send().promise.await .map_err(|e| format!("send failed: {}", e))?; Ok(()) } /// Read from a channel. pub async fn recv( &self, channel: &str, all_new: bool, min_count: u32, ) -> Result { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); let reply = req.send().promise.await .map_err(|e| format!("recv failed: {}", e))?; let text = reply.get() .map_err(|e| format!("recv reply error: {}", e))? .get_text() .map_err(|e| format!("recv text error: {}", e))?; Ok(text.to_str().unwrap_or("").to_string()) } /// List connected daemon prefixes. pub fn prefixes(&self) -> Vec<&str> { self.daemons.keys().map(|s| s.as_str()).collect() } /// List all channels from all connected daemons. pub async fn list_all(&self) -> Vec<(String, bool, u32)> { let mut result = Vec::new(); for daemon in self.daemons.values() { let req = daemon.client.list_request(); if let Ok(reply) = req.send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } } result } } /// One-shot query: connect to a daemon socket, call list(), return results. /// Safe to call from any async context (no LocalSet needed). async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> { let stream = match UnixStream::connect(sock).await { Ok(s) => s, Err(_) => return Vec::new(), }; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let rpc_handle = tokio::task::spawn_local(async move { let _ = rpc_system.await; }); let mut result = Vec::new(); if let Ok(reply) = client.list_request().send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } rpc_handle.abort(); result } /// Fetch channel status from all daemon sockets. /// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { tokio::task::spawn_blocking(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, fetch_all_channels_inner()) }) .await .unwrap_or_default() } async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); let mut sup = super::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); // restart any dead daemons let mut result = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { result.push((daemon_name, false, 0)); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); let channels = query_one_daemon(&sock).await; if channels.is_empty() { result.push((daemon_name, false, 0)); } else { result.extend(channels); } } result }