diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 4eaebfa..14cc489 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -18,166 +18,6 @@ use tracing::{info, warn}; use crate::channel_capnp::{channel_client, channel_server}; use capnp::capability::Promise; -/// A live connection to a channel daemon. -struct DaemonConnection { - #[allow(dead_code)] - prefix: String, - client: channel_server::Client, - // Hold the RPC system task so it doesn't get dropped - _rpc_task: tokio::task::JoinHandle>, -} - -/// Manages all channel daemon connections. -pub struct ChannelManager { - daemons: HashMap, - channels_dir: PathBuf, -} - -impl ChannelManager { - pub fn new() -> Self { - let channels_dir = dirs::home_dir() - .unwrap_or_default() - .join(".consciousness/channels"); - Self { - daemons: HashMap::new(), - channels_dir, - } - } - - /// Connect to a daemon socket, returning the capnp client. - async fn connect(path: &std::path::Path) -> Result< - (channel_server::Client, tokio::task::JoinHandle>), - Box, - > { - let stream = UnixStream::connect(path).await?; - let (reader, writer) = stream.compat().split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - let mut rpc_system = RpcSystem::new(rpc_network, None); - let client: channel_server::Client = - rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - - let task = tokio::task::spawn_local(rpc_system); - Ok((client, task)) - } - - /// Scan the channels directory for daemon sockets and connect. - pub async fn discover(&mut self) { - let dir = match std::fs::read_dir(&self.channels_dir) { - Ok(d) => d, - Err(_) => return, // directory doesn't exist yet - }; - - for entry in dir.flatten() { - let path = entry.path(); - if path.extension().map_or(false, |e| e == "sock") { - let prefix = path - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("") - .to_string(); - - if self.daemons.contains_key(&prefix) { - continue; - } - - match Self::connect(&path).await { - Ok((client, task)) => { - info!("connected to channel daemon: {}", prefix); - self.daemons.insert( - prefix.clone(), - DaemonConnection { - prefix, - client, - _rpc_task: task, - }, - ); - } - Err(e) => { - warn!("failed to connect to {}: {}", path.display(), e); - } - } - } - } - } - - /// Find the daemon client for a channel path. - fn client_for(&self, channel: &str) -> Option<&channel_server::Client> { - let prefix = channel.split('.').next()?; - self.daemons.get(prefix).map(|d| &d.client) - } - - /// Send a message to a channel. - pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> { - let client = self.client_for(channel) - .ok_or_else(|| format!("no daemon for channel: {}", channel))?; - - let mut req = client.send_request(); - req.get().set_channel(channel); - req.get().set_message(message); - req.send().promise.await - .map_err(|e| format!("send failed: {}", e))?; - Ok(()) - } - - /// Read from a channel. - pub async fn recv( - &self, - channel: &str, - all_new: bool, - min_count: u32, - ) -> Result { - let client = self.client_for(channel) - .ok_or_else(|| format!("no daemon for channel: {}", channel))?; - - let mut req = client.recv_request(); - req.get().set_channel(channel); - req.get().set_all_new(all_new); - req.get().set_min_count(min_count); - - let reply = req.send().promise.await - .map_err(|e| format!("recv failed: {}", e))?; - let text = reply.get() - .map_err(|e| format!("recv reply error: {}", e))? - .get_text() - .map_err(|e| format!("recv text error: {}", e))?; - Ok(text.to_str().unwrap_or("").to_string()) - } - - /// List connected daemon prefixes. - pub fn prefixes(&self) -> Vec<&str> { - self.daemons.keys().map(|s| s.as_str()).collect() - } - - /// List all channels from all connected daemons. - pub async fn list_all(&self) -> Vec<(String, bool, u32)> { - let mut result = Vec::new(); - for daemon in self.daemons.values() { - let req = daemon.client.list_request(); - if let Ok(reply) = req.send().promise.await { - if let Ok(r) = reply.get() { - if let Ok(channels) = r.get_channels() { - for ch in channels.iter() { - if let Ok(name) = ch.get_name() { - result.push(( - name.to_str().unwrap_or("").to_string(), - ch.get_connected(), - ch.get_unread(), - )); - } - } - } - } - } - } - result - } -} - // ── Channel notifications ─────────────────────────────────────── /// A notification received from a channel daemon. @@ -362,16 +202,6 @@ pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { .unwrap_or_default() } -/// Blocking version for use from synchronous contexts (MCP server, etc.). -pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let local = tokio::task::LocalSet::new(); - local.block_on(&rt, fetch_all_channels_inner()) -} - async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = dirs::home_dir() .unwrap_or_default()