// channels.rs — Channel client for the thalamus // // Discovers channel daemon sockets in ~/.consciousness/channels/, // connects via capnp RPC, and provides send/recv operations. // // Each daemon socket speaks the channel.capnp protocol. The channel // manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use log::{info, warn}; use crate::channel_capnp::{channel_client, channel_server}; use capnp::capability::Promise; // ── Channel notifications ─────────────────────────────────────── /// A notification received from a channel daemon. #[derive(Debug, Clone)] pub struct ChannelNotification { pub channel: String, pub urgency: u8, pub preview: String, pub count: u32, } /// Callback implementation that forwards notifications via mpsc. struct NotifyForwarder { tx: std::sync::mpsc::Sender, } impl channel_client::Server for NotifyForwarder { fn notify( &mut self, params: channel_client::NotifyParams, _results: channel_client::NotifyResults, ) -> Promise<(), capnp::Error> { if let Ok(params) = params.get() { if let Ok(notifications) = params.get_notifications() { for n in notifications.iter() { let _ = self.tx.send(ChannelNotification { channel: n.get_channel() .ok() .and_then(|c| c.to_str().ok()) .unwrap_or("") .to_string(), urgency: n.get_urgency(), preview: n.get_preview() .ok() .and_then(|p| p.to_str().ok()) .unwrap_or("") .to_string(), count: n.get_count(), }); } } } Promise::ok(()) } } /// Subscribe to all channel daemons. Runs on a background thread /// with its own LocalSet (capnp uses Rc). Notifications forwarded /// via the returned receiver. pub fn subscribe_all() -> std::sync::mpsc::Receiver { let (tx, rx) = std::sync::mpsc::channel(); std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); let mut sup = super::supervisor::Supervisor::new(); sup.load_config(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); let tx = tx.clone(); tokio::task::spawn_local(async move { if let Err(e) = subscribe_one_daemon(&sock, tx).await { warn!("subscribe to {} failed: {}", sock.display(), e); } }); } // Keep the LocalSet alive — daemon connections need it loop { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; } }); }); rx } async fn subscribe_one_daemon( sock: &std::path::Path, tx: std::sync::mpsc::Sender, ) -> Result<(), Box> { let stream = UnixStream::connect(sock).await?; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let forwarder = NotifyForwarder { tx }; let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder); let mut rpc_system = RpcSystem::new(rpc_network, None); let server: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); // Subscribe with our callback let mut req = server.subscribe_request(); req.get().set_callback(callback_client); req.send().promise.await?; info!("subscribed to {}", sock.display()); // Keep connection alive loop { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; } } // ── One-shot queries ──────────────────────────────────────────── /// One-shot query: connect to a daemon socket, call list(), return results. /// Returns None if connection failed, Some(vec) if connected (possibly empty). async fn query_one_daemon(sock: &std::path::Path) -> Option> { let stream = match UnixStream::connect(sock).await { Ok(s) => s, Err(_) => return None, }; let (reader, writer) = stream.compat().split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let client: channel_server::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); let rpc_handle = tokio::task::spawn_local(async move { let _ = rpc_system.await; }); let mut result = Vec::new(); if let Ok(reply) = client.list_request().send().promise.await { if let Ok(r) = reply.get() { if let Ok(channels) = r.get_channels() { for ch in channels.iter() { if let Ok(name) = ch.get_name() { result.push(( name.to_str().unwrap_or("").to_string(), ch.get_connected(), ch.get_unread(), )); } } } } } rpc_handle.abort(); Some(result) } /// Fetch channel status from all daemon sockets. /// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { tokio::task::spawn_blocking(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, fetch_all_channels_inner()) }) .await .unwrap_or_default() } async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); let mut sup = super::supervisor::Supervisor::new(); sup.load_config(); sup.ensure_running(); // restart any dead daemons let mut result = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { result.push((daemon_name, false, 0)); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); match query_one_daemon(&sock).await { None => { // Connection failed despite socket existing result.push((daemon_name, false, 0)); } Some(channels) if channels.is_empty() => { // Connected but no channels yet result.push((daemon_name, true, 0)); } Some(channels) => { result.extend(channels); } } } result }