// channels.rs — Channel client for the thalamus // // Discovers channel daemon sockets in ~/.consciousness/channels/, // connects via capnp RPC, and provides send/recv operations. // // Each daemon socket speaks the channel.capnp protocol. The channel // manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. use std::collections::HashMap; use std::path::PathBuf; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info, warn}; use crate::channel_capnp::channel_server; /// A live connection to a channel daemon. struct DaemonConnection { #[allow(dead_code)] prefix: String, client: channel_server::Client, // Hold the RPC system task so it doesn't get dropped _rpc_task: tokio::task::JoinHandle>, } /// Manages all channel daemon connections. pub struct ChannelManager { daemons: HashMap, channels_dir: PathBuf, } impl ChannelManager { pub fn new() -> Self { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); Self { daemons: HashMap::new(), channels_dir, } } /// Connect to a daemon socket, returning the capnp client. async fn connect(path: &std::path::Path) -> Result< (channel_server::Client, tokio::task::JoinHandle>), Box, > { let stream = UnixStream::connect(path).await?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let task = tokio::task::spawn_local(rpc_system); Ok((client, task)) } /// Scan the channels directory for daemon sockets and connect. pub async fn discover(&mut self) { let dir = match std::fs::read_dir(&self.channels_dir) { Ok(d) => d, Err(_) => return, // directory doesn't exist yet }; for entry in dir.flatten() { let path = entry.path(); if path.extension().map_or(false, |e| e == "sock") { let prefix = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(); if self.daemons.contains_key(&prefix) { continue; } match Self::connect(&path).await { Ok((client, task)) => { info!("connected to channel daemon: {}", prefix); self.daemons.insert( prefix.clone(), DaemonConnection { prefix, client, _rpc_task: task, }, ); } Err(e) => { warn!("failed to connect to {}: {}", path.display(), e); } } } } } /// Find the daemon client for a channel path. fn client_for(&self, channel: &str) -> Option<&channel_server::Client> { let prefix = channel.split('.').next()?; self.daemons.get(prefix).map(|d| &d.client) } /// Send a message to a channel. pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); req.send().promise.await .map_err(|e| format!("send failed: {}", e))?; Ok(()) } /// Read from a channel. pub async fn recv( &self, channel: &str, all_new: bool, min_count: u32, ) -> Result { let client = self.client_for(channel) .ok_or_else(|| format!("no daemon for channel: {}", channel))?; let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); let reply = req.send().promise.await .map_err(|e| format!("recv failed: {}", e))?; let text = reply.get() .map_err(|e| format!("recv reply error: {}", e))? .get_text() .map_err(|e| format!("recv text error: {}", e))?; Ok(text.to_str().unwrap_or("").to_string()) } /// List connected daemon prefixes. pub fn prefixes(&self) -> Vec<&str> { self.daemons.keys().map(|s| s.as_str()).collect() } /// List all channels from all connected daemons. pub async fn list_all(&self) -> Vec<(String, bool, u32)> { let mut result = Vec::new(); for daemon in self.daemons.values() { let req = daemon.client.list_request(); if let Ok(reply) = req.send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } } result } }