diff --git a/src/bin/consciousness.rs b/src/bin/consciousness.rs index 2fde2c7..aa89115 100644 --- a/src/bin/consciousness.rs +++ b/src/bin/consciousness.rs @@ -825,6 +825,11 @@ async fn run(cli: cli::CliArgs) -> Result<()> { }); } + // Subscribe to channel daemon notifications + let notify_rx = poc_memory::thalamus::channels::subscribe_all(); + let mut pending_notifications: Vec = Vec::new(); + + // Create UI channel let (ui_tx, mut ui_rx) = ui_channel::channel(); @@ -999,6 +1004,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> { // Update idle state for F5 screen idle_state.decay_ewma(); app.update_idle(&idle_state); + + // Drain channel notifications into thalamus pending list + while let Ok(notif) = notify_rx.try_recv() { + pending_notifications.push(notif); + // Refresh channel list when notifications arrive + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = poc_memory::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } } // DMN timer (only when no turn is running) diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 97e001c..13c5919 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -15,7 +15,8 @@ use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info, warn}; -use crate::channel_capnp::channel_server; +use crate::channel_capnp::{channel_client, channel_server}; +use capnp::capability::Promise; /// A live connection to a channel daemon. struct DaemonConnection { @@ -177,8 +178,133 @@ impl ChannelManager { } } +// ── Channel notifications ─────────────────────────────────────── + +/// A notification received from a channel daemon. +#[derive(Debug, Clone)] +pub struct ChannelNotification { + pub channel: String, + pub urgency: u8, + pub preview: String, + pub count: u32, +} + +/// Callback implementation that forwards notifications via mpsc. +struct NotifyForwarder { + tx: std::sync::mpsc::Sender, +} + +impl channel_client::Server for NotifyForwarder { + fn notify( + &mut self, + params: channel_client::NotifyParams, + _results: channel_client::NotifyResults, + ) -> Promise<(), capnp::Error> { + if let Ok(params) = params.get() { + if let Ok(notifications) = params.get_notifications() { + for n in notifications.iter() { + let _ = self.tx.send(ChannelNotification { + channel: n.get_channel() + .ok() + .and_then(|c| c.to_str().ok()) + .unwrap_or("") + .to_string(), + urgency: n.get_urgency(), + preview: n.get_preview() + .ok() + .and_then(|p| p.to_str().ok()) + .unwrap_or("") + .to_string(), + count: n.get_count(), + }); + } + } + } + Promise::ok(()) + } +} + +/// Subscribe to all channel daemons. Runs on a background thread +/// with its own LocalSet (capnp uses Rc). Notifications forwarded +/// via the returned receiver. +pub fn subscribe_all() -> std::sync::mpsc::Receiver { + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let channels_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + + let mut sup = super::supervisor::Supervisor::new(); + sup.load_config(); + + for (daemon_name, _enabled, alive) in sup.status() { + if !alive { continue; } + + let sock = channels_dir.join(format!("{}.sock", daemon_name)); + let tx = tx.clone(); + + tokio::task::spawn_local(async move { + if let Err(e) = subscribe_one_daemon(&sock, tx).await { + warn!("subscribe to {} failed: {}", sock.display(), e); + } + }); + } + + // Keep the LocalSet alive — daemon connections need it + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } + }); + }); + + rx +} + +async fn subscribe_one_daemon( + sock: &std::path::Path, + tx: std::sync::mpsc::Sender, +) -> Result<(), Box> { + let stream = UnixStream::connect(sock).await?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + + let forwarder = NotifyForwarder { tx }; + let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder); + + let mut rpc_system = RpcSystem::new(rpc_network, None); + let server: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + tokio::task::spawn_local(rpc_system); + + // Subscribe with our callback + let mut req = server.subscribe_request(); + req.get().set_callback(callback_client); + req.send().promise.await?; + + info!("subscribed to {}", sock.display()); + + // Keep connection alive + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } +} + +// ── One-shot queries ──────────────────────────────────────────── + /// One-shot query: connect to a daemon socket, call list(), return results. -/// Safe to call from any async context (no LocalSet needed). async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> { let stream = match UnixStream::connect(sock).await { Ok(s) => s,