consciousness/src/agent/tools/channels.rs
ProofOfConcept 4fc9676545 channels: parallel queries with timeout per daemon
One misbehaving channel daemon (accepting connections but not
responding to capnp RPCs) would block channel_list indefinitely.

Spawn each daemon query as a separate task with a 3-second timeout.
A hung daemon now shows as disconnected instead of hanging the
entire tool call.

Co-Authored-By: Kent Overstreet <kent.overstreet@gmail.com>
2026-04-11 00:45:01 -04:00

358 lines
14 KiB
Rust

use std::sync::Arc;
// tools/channels.rs — Channel tools (list, recv, send, notifications)
//
// Shared by consciousness agent and the MCP server.
// One-shot capnp RPC calls to channel daemon sockets.
use anyhow::{Context, Result};
use serde::Deserialize;
use super::Tool;
// ── Tool registry ──────────────────────────────────────────────
pub fn tools() -> [Tool; 6] {
[
Tool { name: "channel_list",
description: "List all available channels and their status (connected, unread count).",
parameters_json: r#"{"type":"object","properties":{}}"#,
handler: Arc::new(|_a, _v| Box::pin(async { channel_list().await })) },
Tool { name: "channel_recv",
description: "Read messages from a channel.",
parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. irc.#bcachefs, telegram.kent)"},"all_new":{"type":"boolean","description":"If true, return all unconsumed messages","default":true},"min_count":{"type":"integer","description":"Minimum number of lines to return","default":20}},"required":["channel"]}"#,
handler: Arc::new(|_a, v| Box::pin(async move { channel_recv(&v).await })) },
Tool { name: "channel_send",
description: "Send a message to a channel.",
parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"},"message":{"type":"string","description":"Message to send"}},"required":["channel","message"]}"#,
handler: Arc::new(|_a, v| Box::pin(async move { channel_send(&v).await })) },
Tool { name: "channel_notifications",
description: "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.",
parameters_json: r#"{"type":"object","properties":{}}"#,
handler: Arc::new(|_a, _v| Box::pin(async { channel_notifications().await })) },
Tool { name: "channel_open",
description: "Open a channel — start monitoring. For tmux: finds the pane by name and attaches pipe-pane.",
parameters_json: r#"{"type":"object","properties":{"label":{"type":"string","description":"Channel label / tmux pane name"}},"required":["label"]}"#,
handler: Arc::new(|_a, v| Box::pin(async move { channel_open(&v).await })) },
Tool { name: "channel_close",
description: "Close a channel — stop monitoring and clean up.",
parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. tmux.ktest)"}},"required":["channel"]}"#,
handler: Arc::new(|_a, v| Box::pin(async move { channel_close(&v).await })) },
]
}
// ── Tool implementations ───────────────────────────────────────
async fn channel_list() -> Result<String> {
let result = fetch_all_channels().await;
if result.is_empty() {
return Ok("No channels configured.".into());
}
Ok(result.iter().map(|(name, connected, unread)| {
let status = if *connected { "connected" } else { "disconnected" };
let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() };
format!("{}{}{}", name, status, unread_str)
}).collect::<Vec<_>>().join("\n"))
}
#[derive(Deserialize)]
struct RecvArgs {
channel: String,
#[serde(default = "default_true")]
all_new: bool,
#[serde(default = "default_min_count")]
min_count: u32,
}
fn default_true() -> bool { true }
fn default_min_count() -> u32 { 20 }
async fn channel_recv(args: &serde_json::Value) -> Result<String> {
let a: RecvArgs = serde_json::from_value(args.clone())
.context("invalid channel_recv arguments")?;
let (sock, _) = find_daemon(&a.channel)?;
let channel = a.channel;
let all_new = a.all_new;
let min_count = a.min_count;
// capnp-rpc needs LocalSet — bridge to it via spawn_blocking
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_recv(&sock, &channel, all_new, min_count))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
#[derive(Deserialize)]
struct SendArgs {
channel: String,
message: String,
}
async fn channel_send(args: &serde_json::Value) -> Result<String> {
let a: SendArgs = serde_json::from_value(args.clone())
.context("invalid channel_send arguments")?;
let (sock, _) = find_daemon(&a.channel)?;
let channel = a.channel;
let message = a.message;
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_send(&sock, &channel, &message))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
async fn channel_notifications() -> Result<String> {
let result = fetch_all_channels().await;
let unread: Vec<_> = result.iter().filter(|(_, _, u)| *u > 0).collect();
if unread.is_empty() {
Ok("No pending notifications.".into())
} else {
Ok(unread.iter()
.map(|(name, _, count)| format!("{}: {} unread", name, count))
.collect::<Vec<_>>()
.join("\n"))
}
}
async fn channel_open(args: &serde_json::Value) -> Result<String> {
let label = args.get("label").and_then(|v| v.as_str())
.context("label is required")?
.to_string();
let (sock, sublabel) = find_daemon(&label)?;
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_open(&sock, &sublabel))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
async fn channel_close(args: &serde_json::Value) -> Result<String> {
let channel = args.get("channel").and_then(|v| v.as_str())
.context("channel is required")?
.to_string();
let (sock, _) = find_daemon(&channel)?;
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_close(&sock, &channel))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
// ── Daemon resolution ─────────────────────────────────────────
fn channels_dir() -> std::path::PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels")
}
/// Resolve a channel path to a daemon socket.
///
/// Walks the dot-delimited path from most-specific to least,
/// looking for a daemon socket at each level:
/// "tmux.ktest" → finds tmux.sock, returns ("tmux.sock", "ktest")
/// "irc.libera.#bcachefs" → finds irc.sock, returns ("irc.sock", "libera.#bcachefs")
///
/// If no daemon is running, tries to start one via the supervisor.
fn find_daemon(path: &str) -> Result<(std::path::PathBuf, String)> {
let dir = channels_dir();
// Returns the sub-path after the matched prefix
let rest_after = |prefix: &str| -> String {
if prefix.len() < path.len() {
path[prefix.len() + 1..].to_string()
} else {
String::new()
}
};
// Walk from most-specific to least, looking for a socket
let mut prefix = path;
loop {
let sock = dir.join(format!("{}.sock", prefix));
if sock.exists() {
return Ok((sock, rest_after(prefix)));
}
match prefix.rfind('.') {
Some(pos) => prefix = &prefix[..pos],
None => break,
}
}
// No running daemon found — register and start via supervisor
let top = path.split('.').next().unwrap_or(path);
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
if !sup.has_daemon(top) {
sup.add_daemon(top, crate::thalamus::supervisor::ChannelEntry {
binary: format!("consciousness-channel-{}", top),
enabled: true,
autostart: true,
});
}
sup.ensure_running();
// Wait for socket (up to 3 seconds)
let sock = dir.join(format!("{}.sock", top));
for _ in 0..30 {
if sock.exists() {
return Ok((sock, rest_after(top)));
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
anyhow::bail!("no daemon for channel path: {}", path)
}
// ── Channel RPC ────────────────────────────────────────────────
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::channel_capnp::channel_server;
async fn rpc_connect(sock: &std::path::Path) -> Result<channel_server::Client, String> {
let stream = tokio::net::UnixStream::connect(sock).await
.map_err(|e| format!("connect failed: {e}"))?;
let (reader, writer) = stream.compat().split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let client: channel_server::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
Ok(client)
}
async fn rpc_recv(
sock: &std::path::Path, channel: &str, all_new: bool, min_count: u32,
) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.recv_request();
req.get().set_channel(channel);
req.get().set_all_new(all_new);
req.get().set_min_count(min_count);
let reply = req.send().promise.await.map_err(|e| format!("recv failed: {e}"))?;
let text = reply.get().map_err(|e| format!("{e}"))?
.get_text().map_err(|e| format!("{e}"))?
.to_str().map_err(|e| format!("{e}"))?;
if text.is_empty() { Ok("(no messages)".into()) } else { Ok(text.to_string()) }
}
async fn rpc_send(
sock: &std::path::Path, channel: &str, message: &str,
) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.send_request();
req.get().set_channel(channel);
req.get().set_message(message);
req.send().promise.await.map_err(|e| format!("send failed: {e}"))?;
Ok(format!("sent to {}", channel))
}
async fn rpc_list(sock: &std::path::Path) -> Option<Vec<(String, bool, u32)>> {
let client = rpc_connect(sock).await.ok()?;
let mut result = Vec::new();
if let Ok(reply) = client.list_request().send().promise.await {
if let Ok(r) = reply.get() {
if let Ok(channels) = r.get_channels() {
for ch in channels.iter() {
if let Ok(name) = ch.get_name() {
result.push((
name.to_str().unwrap_or("").to_string(),
ch.get_connected(),
ch.get_unread(),
));
}
}
}
}
}
Some(result)
}
async fn rpc_open(sock: &std::path::Path, label: &str) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.open_request();
req.get().set_label(label);
req.send().promise.await.map_err(|e| format!("open failed: {e}"))?;
Ok(format!("opened channel tmux.{}", label))
}
async fn rpc_close(sock: &std::path::Path, channel: &str) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.close_request();
req.get().set_channel(channel);
req.send().promise.await.map_err(|e| format!("close failed: {e}"))?;
Ok(format!("closed channel {}", channel))
}
// ── Fetch all channels ─────────────────────────────────────────
/// Fetch channel status from all daemon sockets.
/// Runs on a dedicated thread with LocalSet because capnp-rpc
/// uses Rc (not Send). This is the boundary between the
/// multi-thread runtime and the capnp world.
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
tokio::task::spawn_blocking(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, fetch_all_channels_inner())
})
.await
.unwrap_or_default()
}
async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
let channels_dir = channels_dir();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
sup.ensure_running();
let mut futs = Vec::new();
for (daemon_name, _enabled, alive) in sup.status() {
if !alive {
futs.push(tokio::task::spawn_local({
let name = daemon_name.clone();
async move { vec![(name, false, 0u32)] }
}));
continue;
}
let sock = channels_dir.join(format!("{}.sock", daemon_name));
futs.push(tokio::task::spawn_local({
let name = daemon_name.clone();
async move {
match tokio::time::timeout(
std::time::Duration::from_secs(3),
rpc_list(&sock),
).await {
Ok(Some(channels)) if !channels.is_empty() => channels,
Ok(Some(_)) => vec![(name, true, 0)],
_ => vec![(name, false, 0)],
}
}
}));
}
let mut result = Vec::new();
for fut in futs {
if let Ok(entries) = fut.await {
result.extend(entries);
}
}
result
}