From b24e8e87a2c39228c0c3a7c8f1835c9026ad9f3f Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 3 Apr 2026 20:14:22 -0400 Subject: [PATCH] subscribe to channel daemon notifications consciousness binary subscribes to all channel daemons on startup. Notifications forwarded via NotifyForwarder callback through mpsc. Pending notifications stored for thalamus agent consumption. Channel list refreshed automatically when notifications arrive. Co-Developed-By: Kent Overstreet --- src/bin/consciousness.rs | 16 +++++ src/thalamus/channels.rs | 130 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/bin/consciousness.rs b/src/bin/consciousness.rs index 2fde2c7..aa89115 100644 --- a/src/bin/consciousness.rs +++ b/src/bin/consciousness.rs @@ -825,6 +825,11 @@ async fn run(cli: cli::CliArgs) -> Result<()> { }); } + // Subscribe to channel daemon notifications + let notify_rx = poc_memory::thalamus::channels::subscribe_all(); + let mut pending_notifications: Vec = Vec::new(); + + // Create UI channel let (ui_tx, mut ui_rx) = ui_channel::channel(); @@ -999,6 +1004,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> { // Update idle state for F5 screen idle_state.decay_ewma(); app.update_idle(&idle_state); + + // Drain channel notifications into thalamus pending list + while let Ok(notif) = notify_rx.try_recv() { + pending_notifications.push(notif); + // Refresh channel list when notifications arrive + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = poc_memory::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } } // DMN timer (only when no turn is running) diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 97e001c..13c5919 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -15,7 +15,8 @@ use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info, warn}; -use crate::channel_capnp::channel_server; +use crate::channel_capnp::{channel_client, channel_server}; +use capnp::capability::Promise; /// A live connection to a channel daemon. struct DaemonConnection { @@ -177,8 +178,133 @@ impl ChannelManager { } } +// ── Channel notifications ─────────────────────────────────────── + +/// A notification received from a channel daemon. +#[derive(Debug, Clone)] +pub struct ChannelNotification { + pub channel: String, + pub urgency: u8, + pub preview: String, + pub count: u32, +} + +/// Callback implementation that forwards notifications via mpsc. +struct NotifyForwarder { + tx: std::sync::mpsc::Sender, +} + +impl channel_client::Server for NotifyForwarder { + fn notify( + &mut self, + params: channel_client::NotifyParams, + _results: channel_client::NotifyResults, + ) -> Promise<(), capnp::Error> { + if let Ok(params) = params.get() { + if let Ok(notifications) = params.get_notifications() { + for n in notifications.iter() { + let _ = self.tx.send(ChannelNotification { + channel: n.get_channel() + .ok() + .and_then(|c| c.to_str().ok()) + .unwrap_or("") + .to_string(), + urgency: n.get_urgency(), + preview: n.get_preview() + .ok() + .and_then(|p| p.to_str().ok()) + .unwrap_or("") + .to_string(), + count: n.get_count(), + }); + } + } + } + Promise::ok(()) + } +} + +/// Subscribe to all channel daemons. Runs on a background thread +/// with its own LocalSet (capnp uses Rc). Notifications forwarded +/// via the returned receiver. +pub fn subscribe_all() -> std::sync::mpsc::Receiver { + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let channels_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + + let mut sup = super::supervisor::Supervisor::new(); + sup.load_config(); + + for (daemon_name, _enabled, alive) in sup.status() { + if !alive { continue; } + + let sock = channels_dir.join(format!("{}.sock", daemon_name)); + let tx = tx.clone(); + + tokio::task::spawn_local(async move { + if let Err(e) = subscribe_one_daemon(&sock, tx).await { + warn!("subscribe to {} failed: {}", sock.display(), e); + } + }); + } + + // Keep the LocalSet alive — daemon connections need it + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } + }); + }); + + rx +} + +async fn subscribe_one_daemon( + sock: &std::path::Path, + tx: std::sync::mpsc::Sender, +) -> Result<(), Box> { + let stream = UnixStream::connect(sock).await?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + + let forwarder = NotifyForwarder { tx }; + let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder); + + let mut rpc_system = RpcSystem::new(rpc_network, None); + let server: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + tokio::task::spawn_local(rpc_system); + + // Subscribe with our callback + let mut req = server.subscribe_request(); + req.get().set_callback(callback_client); + req.send().promise.await?; + + info!("subscribed to {}", sock.display()); + + // Keep connection alive + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } +} + +// ── One-shot queries ──────────────────────────────────────────── + /// One-shot query: connect to a daemon socket, call list(), return results. -/// Safe to call from any async context (no LocalSet needed). async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> { let stream = match UnixStream::connect(sock).await { Ok(s) => s,