consciousness/src/thalamus/channels.rs
ProofOfConcept 4fc9676545 channels: parallel queries with timeout per daemon
One misbehaving channel daemon (accepting connections but not
responding to capnp RPCs) would block channel_list indefinitely.

Spawn each daemon query as a separate task with a 3-second timeout.
A hung daemon now shows as disconnected instead of hanging the
entire tool call.
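
The pattern, as it appears in fetch_all_channels_inner below:

    tokio::time::timeout(std::time::Duration::from_secs(3),
                         query_one_daemon(&sock)).await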

Co-Authored-By: Kent Overstreet <kent.overstreet@gmail.com>
2026-04-11 00:45:01 -04:00


// channels.rs — Channel client for the thalamus
//
// Discovers channel daemon sockets in ~/.consciousness/channels/,
// connects via capnp RPC, and provides send/recv operations.
//
// Each daemon socket speaks the channel.capnp protocol. The channel
// manager routes by prefix: "irc.#bcachefs" → connects to irc.sock.
use std::rc::Rc;

use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use log::{info, warn};
use tokio::net::UnixStream;
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::channel_capnp::{channel_client, channel_server};
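
// Illustrative sketch of the prefix routing described in the header comment.
// This helper is hypothetical (nothing in this file calls it) and assumes the
// daemon name is always the first dot-separated component of the channel name.
#[allow(dead_code)]
fn socket_for_channel(channels_dir: &std::path::Path, channel: &str) -> std::path::PathBuf {
    // "irc.#bcachefs" → daemon "irc" → <channels_dir>/irc.sock
    let daemon = channel.split('.').next().unwrap_or(channel);
    channels_dir.join(format!("{}.sock", daemon))
}
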
// ── Channel notifications ───────────────────────────────────────
/// A notification received from a channel daemon.
#[derive(Debug, Clone)]
pub struct ChannelNotification {
    pub channel: String,
    pub urgency: u8,
    pub preview: String,
    pub count: u32,
}

/// Callback implementation that forwards notifications via mpsc.
struct NotifyForwarder {
    tx: std::sync::mpsc::Sender<ChannelNotification>,
}

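// Direction note: we connect as the transport-level client, but the daemon
// holds a capability to this object and invokes notify() on it, so for the
// callback we implement the server side of the channel.capnp client interface.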
impl channel_client::Server for NotifyForwarder {
    fn notify(
        self: Rc<Self>,
        params: channel_client::NotifyParams,
        _results: channel_client::NotifyResults,
    ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
        if let Ok(params) = params.get() {
            if let Ok(notifications) = params.get_notifications() {
                for n in notifications.iter() {
                    let _ = self.tx.send(ChannelNotification {
                        channel: n.get_channel()
                            .ok()
                            .and_then(|c| c.to_str().ok())
                            .unwrap_or("")
                            .to_string(),
                        urgency: n.get_urgency(),
                        preview: n.get_preview()
                            .ok()
                            .and_then(|p| p.to_str().ok())
                            .unwrap_or("")
                            .to_string(),
                        count: n.get_count(),
                    });
                }
            }
        }
        std::future::ready(Ok(()))
    }
}

/// Subscribe to all channel daemons. Runs on a background thread
/// with its own LocalSet (capnp uses Rc). Notifications forwarded
/// via the returned receiver.
pub fn subscribe_all() -> std::sync::mpsc::Receiver<ChannelNotification> {
    let (tx, rx) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, async move {
            let channels_dir = dirs::home_dir()
                .unwrap_or_default()
                .join(".consciousness/channels");
            let mut sup = super::supervisor::Supervisor::new();
            sup.load_config();
            for (daemon_name, _enabled, alive) in sup.status() {
                if !alive { continue; }
                let sock = channels_dir.join(format!("{}.sock", daemon_name));
                let tx = tx.clone();
                tokio::task::spawn_local(async move {
                    if let Err(e) = subscribe_one_daemon(&sock, tx).await {
                        warn!("subscribe to {} failed: {}", sock.display(), e);
                    }
                });
            }
            // Keep the LocalSet alive — daemon connections need it
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
            }
        });
    });
    rx
}
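
// Minimal usage sketch (hypothetical caller; an ordinary thread that is free
// to block on the std mpsc receiver):
//
//     let rx = subscribe_all();
//     while let Ok(n) = rx.recv() {
//         println!("{} ({}): {}", n.channel, n.count, n.preview);
//     }
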
async fn subscribe_one_daemon(
    sock: &std::path::Path,
    tx: std::sync::mpsc::Sender<ChannelNotification>,
) -> Result<(), Box<dyn std::error::Error>> {
    let stream = UnixStream::connect(sock).await?;
    let (reader, writer) = stream.compat().split();
    let rpc_network = Box::new(twoparty::VatNetwork::new(
        futures::io::BufReader::new(reader),
        futures::io::BufWriter::new(writer),
        rpc_twoparty_capnp::Side::Client,
        Default::default(),
    ));
    let forwarder = NotifyForwarder { tx };
    let callback_client: channel_client::Client = capnp_rpc::new_client(forwarder);
    let mut rpc_system = RpcSystem::new(rpc_network, None);
    let server: channel_server::Client =
        rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
    // The RpcSystem future drives all I/O on this connection; it must keep
    // running on the LocalSet for calls in either direction to make progress.
    tokio::task::spawn_local(rpc_system);
    // Subscribe with our callback
    let mut req = server.subscribe_request();
    req.get().set_callback(callback_client);
    req.send().promise.await?;
    info!("subscribed to {}", sock.display());
    // Keep connection alive
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
    }
}
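
// Note: there is no reconnect logic above; if a daemon restarts, its
// RpcSystem task completes and notifications from that daemon stop until
// subscribe_all() is called again.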
// ── One-shot queries ────────────────────────────────────────────
/// One-shot query: connect to a daemon socket, call list(), return results.
/// Returns None if connection failed, Some(vec) if connected (possibly empty).
async fn query_one_daemon(sock: &std::path::Path) -> Option<Vec<(String, bool, u32)>> {
    let stream = match UnixStream::connect(sock).await {
        Ok(s) => s,
        Err(_) => return None,
    };
    let (reader, writer) = stream.compat().split();
    let rpc_network = Box::new(twoparty::VatNetwork::new(
        futures::io::BufReader::new(reader),
        futures::io::BufWriter::new(writer),
        rpc_twoparty_capnp::Side::Client,
        Default::default(),
    ));
    let mut rpc_system = RpcSystem::new(rpc_network, None);
    let client: channel_server::Client =
        rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
    let rpc_handle = tokio::task::spawn_local(async move {
        let _ = rpc_system.await;
    });
    let mut result = Vec::new();
    if let Ok(reply) = client.list_request().send().promise.await {
        if let Ok(r) = reply.get() {
            if let Ok(channels) = r.get_channels() {
                for ch in channels.iter() {
                    if let Ok(name) = ch.get_name() {
                        result.push((
                            name.to_str().unwrap_or("").to_string(),
                            ch.get_connected(),
                            ch.get_unread(),
                        ));
                    }
                }
            }
        }
    }
    // The list() reply is already in hand, so tearing down the connection by
    // aborting the RpcSystem task is safe; it just drops the socket.
    rpc_handle.abort();
    Some(result)
}

/// Fetch channel status from all daemon sockets.
/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send).
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
    tokio::task::spawn_blocking(|| {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, fetch_all_channels_inner())
    })
    .await
    .unwrap_or_default()
}

async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
    let channels_dir = dirs::home_dir()
        .unwrap_or_default()
        .join(".consciousness/channels");
    let mut sup = super::supervisor::Supervisor::new();
    sup.load_config();
    sup.ensure_running(); // restart any dead daemons
    let mut futs = Vec::new();
    for (daemon_name, _enabled, alive) in sup.status() {
        if !alive {
            // A daemon still down after ensure_running() is reported as a
            // single disconnected entry rather than silently omitted.
            futs.push(tokio::task::spawn_local({
                let name = daemon_name.clone();
                async move { vec![(name, false, 0u32)] }
            }));
            continue;
        }
        let sock = channels_dir.join(format!("{}.sock", daemon_name));
        futs.push(tokio::task::spawn_local({
            let name = daemon_name.clone();
            async move {
                // Per-daemon timeout: a hung daemon resolves to a single
                // disconnected entry instead of stalling every other query.
                match tokio::time::timeout(
                    std::time::Duration::from_secs(3),
                    query_one_daemon(&sock),
                ).await {
                    Ok(Some(channels)) if !channels.is_empty() => channels,
                    Ok(Some(_)) => vec![(name, true, 0)],
                    _ => vec![(name, false, 0)],
                }
            }
        }));
    }
    let mut result = Vec::new();
    for fut in futs {
        if let Ok(entries) = fut.await {
            result.extend(entries);
        }
    }
    result
}
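
// Usage sketch (hypothetical caller on any Tokio runtime; fetch_all_channels
// does its capnp work on a dedicated thread, so the caller need not be on a
// current-thread runtime):
//
//     for (name, connected, unread) in fetch_all_channels().await {
//         println!("{:<24} {} ({} unread)", name,
//                  if connected { "up" } else { "down" }, unread);
//     }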