subscribe to channel daemon notifications

consciousness binary subscribes to all channel daemons on startup.
Notifications forwarded via NotifyForwarder callback through mpsc.
Pending notifications stored for thalamus agent consumption.
Channel list refreshed automatically when notifications arrive.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-04-03 20:14:22 -04:00
parent 7d1637a2f0
commit b24e8e87a2
2 changed files with 144 additions and 2 deletions

View file

@ -825,6 +825,11 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
});
}
// Subscribe to channel daemon notifications
let notify_rx = poc_memory::thalamus::channels::subscribe_all();
let mut pending_notifications: Vec<poc_memory::thalamus::channels::ChannelNotification> = Vec::new();
// Create UI channel
let (ui_tx, mut ui_rx) = ui_channel::channel();
@ -999,6 +1004,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
// Update idle state for F5 screen
idle_state.decay_ewma();
app.update_idle(&idle_state);
// Drain channel notifications into thalamus pending list
while let Ok(notif) = notify_rx.try_recv() {
pending_notifications.push(notif);
// Refresh channel list when notifications arrive
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = poc_memory::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
}
// DMN timer (only when no turn is running)

View file

@ -15,7 +15,8 @@ use tokio::net::UnixStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::{info, warn};
use crate::channel_capnp::channel_server;
use crate::channel_capnp::{channel_client, channel_server};
use capnp::capability::Promise;
/// A live connection to a channel daemon.
struct DaemonConnection {
@ -177,8 +178,133 @@ impl ChannelManager {
}
}
// ── Channel notifications ───────────────────────────────────────
/// A notification received from a channel daemon.
#[derive(Debug, Clone)]
pub struct ChannelNotification {
pub channel: String,
pub urgency: u8,
pub preview: String,
pub count: u32,
}
/// Callback implementation that forwards notifications via mpsc.
struct NotifyForwarder {
tx: std::sync::mpsc::Sender<ChannelNotification>,
}
impl channel_client::Server for NotifyForwarder {
fn notify(
&mut self,
params: channel_client::NotifyParams,
_results: channel_client::NotifyResults,
) -> Promise<(), capnp::Error> {
if let Ok(params) = params.get() {
if let Ok(notifications) = params.get_notifications() {
for n in notifications.iter() {
let _ = self.tx.send(ChannelNotification {
channel: n.get_channel()
.ok()
.and_then(|c| c.to_str().ok())
.unwrap_or("")
.to_string(),
urgency: n.get_urgency(),
preview: n.get_preview()
.ok()
.and_then(|p| p.to_str().ok())
.unwrap_or("")
.to_string(),
count: n.get_count(),
});
}
}
}
Promise::ok(())
}
}
/// Subscribe to all channel daemons. Runs on a background thread
/// with its own LocalSet (capnp uses Rc). Notifications forwarded
/// via the returned receiver.
pub fn subscribe_all() -> std::sync::mpsc::Receiver<ChannelNotification> {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move {
let channels_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels");
let mut sup = super::supervisor::Supervisor::new();
sup.load_config();
for (daemon_name, _enabled, alive) in sup.status() {
if !alive { continue; }
let sock = channels_dir.join(format!("{}.sock", daemon_name));
let tx = tx.clone();
tokio::task::spawn_local(async move {
if let Err(e) = subscribe_one_daemon(&sock, tx).await {
warn!("subscribe to {} failed: {}", sock.display(), e);
}
});
}
// Keep the LocalSet alive — daemon connections need it
loop {
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
});
});
rx
}
async fn subscribe_one_daemon(
sock: &std::path::Path,
tx: std::sync::mpsc::Sender<ChannelNotification>,
) -> Result<(), Box<dyn std::error::Error>> {
let stream = UnixStream::connect(sock).await?;
let (reader, writer) = stream.compat().split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let forwarder = NotifyForwarder { tx };
let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder);
let mut rpc_system = RpcSystem::new(rpc_network, None);
let server: channel_server::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
// Subscribe with our callback
let mut req = server.subscribe_request();
req.get().set_callback(callback_client);
req.send().promise.await?;
info!("subscribed to {}", sock.display());
// Keep connection alive
loop {
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
// ── One-shot queries ────────────────────────────────────────────
/// One-shot query: connect to a daemon socket, call list(), return results.
/// Safe to call from any async context (no LocalSet needed).
async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> {
let stream = match UnixStream::connect(sock).await {
Ok(s) => s,