tools/channels: clean up, inline defs, remove blocking wrappers

- Inline tool definitions into tools() — no separate definitions()
- Remove dispatch() and dispatch_blocking()
- Remove rpc_blocking helper
- channel_recv/send use spawn_blocking for capnp LocalSet bridge
  (same pattern as fetch_all_channels)
- All tool functions private — only tools() is exported
- fetch_all_channels remains pub (used by thalamus screen)

TODO: mind/mod.rs still references thalamus::channels::fetch_all_channels,
should switch to tools::channels::fetch_all_channels.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-04 15:26:45 -04:00 committed by Kent Overstreet
parent 6d6da07f91
commit fdb8c989f5

View file

@ -7,78 +7,58 @@ use anyhow::{Context, Result};
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
use super::ToolDef; use super::{Tool, ToolDef};
// ── Definitions ──────────────────────────────────────────────── // ── Tool registry ──────────────────────────────────────────────
pub fn definitions() -> Vec<ToolDef> { pub fn tools() -> Vec<Tool> {
vec![ vec![
ToolDef::new( Tool {
"channel_list", def: ToolDef::new("channel_list",
"List all available channels and their status (connected, unread count).", "List all available channels and their status (connected, unread count).",
json!({"type": "object", "properties": {}}), json!({"type": "object", "properties": {}})),
), handler: |_a, _v| Box::pin(async { channel_list().await }),
ToolDef::new( },
"channel_recv", Tool {
def: ToolDef::new("channel_recv",
"Read messages from a channel.", "Read messages from a channel.",
json!({ json!({"type": "object", "properties": {
"type": "object",
"properties": {
"channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"}, "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"},
"all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true},
"min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20}
}, "required": ["channel"]})),
handler: |_a, v| Box::pin(async move { channel_recv(&v).await }),
}, },
"required": ["channel"] Tool {
}), def: ToolDef::new("channel_send",
),
ToolDef::new(
"channel_send",
"Send a message to a channel.", "Send a message to a channel.",
json!({ json!({"type": "object", "properties": {
"type": "object",
"properties": {
"channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"}, "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"},
"message": {"type": "string", "description": "Message to send"} "message": {"type": "string", "description": "Message to send"}
}, "required": ["channel", "message"]})),
handler: |_a, v| Box::pin(async move { channel_send(&v).await }),
}, },
"required": ["channel", "message"] Tool {
}), def: ToolDef::new("channel_notifications",
),
ToolDef::new(
"channel_notifications",
"Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.",
json!({"type": "object", "properties": {}}), json!({"type": "object", "properties": {}})),
), handler: |_a, _v| Box::pin(async { channel_notifications().await }),
] },
}
pub fn tools() -> Vec<super::Tool> {
use super::Tool;
let defs = definitions();
vec![
Tool { def: defs[0].clone(), handler: |_a, _v| Box::pin(async move { channel_list().await }) },
Tool { def: defs[1].clone(), handler: |_a, v| Box::pin(async move { channel_recv(&v).await }) },
Tool { def: defs[2].clone(), handler: |_a, v| Box::pin(async move { channel_send(&v).await }) },
Tool { def: defs[3].clone(), handler: |_a, _v| Box::pin(async move { channel_notifications().await }) },
] ]
} }
// ── Tool implementations ─────────────────────────────────────── // ── Tool implementations ───────────────────────────────────────
pub async fn channel_list() -> Result<String> { async fn channel_list() -> Result<String> {
let result = fetch_all_channels().await; let result = fetch_all_channels().await;
Ok(format_channel_list(&result)) if result.is_empty() {
return Ok("No channels configured.".into());
} }
Ok(result.iter().map(|(name, connected, unread)| {
fn format_channel_list(channels: &[(String, bool, u32)]) -> String {
if channels.is_empty() {
return "No channels configured.".into();
}
channels.iter().map(|(name, connected, unread)| {
let status = if *connected { "connected" } else { "disconnected" }; let status = if *connected { "connected" } else { "disconnected" };
let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() };
format!("{}{}{}", name, status, unread_str) format!("{}{}{}", name, status, unread_str)
}).collect::<Vec<_>>().join("\n") }).collect::<Vec<_>>().join("\n"))
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -92,17 +72,19 @@ struct RecvArgs {
fn default_true() -> bool { true } fn default_true() -> bool { true }
fn default_min_count() -> u32 { 20 } fn default_min_count() -> u32 { 20 }
pub async fn channel_recv(args: &serde_json::Value) -> Result<String> { async fn channel_recv(args: &serde_json::Value) -> Result<String> {
let a: RecvArgs = serde_json::from_value(args.clone()) let a: RecvArgs = serde_json::from_value(args.clone())
.context("invalid channel_recv arguments")?; .context("invalid channel_recv arguments")?;
let sock = daemon_sock(&a.channel)?; let sock = daemon_sock(&a.channel)?;
let channel = a.channel; let channel = a.channel;
let all_new = a.all_new; let all_new = a.all_new;
let min_count = a.min_count; let min_count = a.min_count;
// capnp-rpc needs LocalSet — bridge to it via spawn_blocking
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
rpc_blocking(|local, rt| { let rt = tokio::runtime::Builder::new_current_thread()
local.block_on(rt, rpc_recv(&sock, &channel, all_new, min_count)) .enable_all().build().unwrap();
}) let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_recv(&sock, &channel, all_new, min_count))
}).await? }).await?
.map_err(|e| anyhow::anyhow!("{}", e)) .map_err(|e| anyhow::anyhow!("{}", e))
} }
@ -113,35 +95,31 @@ struct SendArgs {
message: String, message: String,
} }
pub async fn channel_send(args: &serde_json::Value) -> Result<String> { async fn channel_send(args: &serde_json::Value) -> Result<String> {
let a: SendArgs = serde_json::from_value(args.clone()) let a: SendArgs = serde_json::from_value(args.clone())
.context("invalid channel_send arguments")?; .context("invalid channel_send arguments")?;
let sock = daemon_sock(&a.channel)?; let sock = daemon_sock(&a.channel)?;
let channel = a.channel; let channel = a.channel;
let message = a.message; let message = a.message;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
rpc_blocking(|local, rt| { let rt = tokio::runtime::Builder::new_current_thread()
local.block_on(rt, rpc_send(&sock, &channel, &message)) .enable_all().build().unwrap();
}) let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_send(&sock, &channel, &message))
}).await? }).await?
.map_err(|e| anyhow::anyhow!("{}", e)) .map_err(|e| anyhow::anyhow!("{}", e))
} }
pub async fn channel_notifications() -> Result<String> { async fn channel_notifications() -> Result<String> {
let result = fetch_all_channels().await; let result = fetch_all_channels().await;
Ok(format_notifications(&result)) let unread: Vec<_> = result.iter().filter(|(_, _, u)| *u > 0).collect();
}
fn format_notifications(channels: &[(String, bool, u32)]) -> String {
let unread: Vec<_> = channels.iter().filter(|(_, _, u)| *u > 0).collect();
if unread.is_empty() { if unread.is_empty() {
"No pending notifications.".into() Ok("No pending notifications.".into())
} else { } else {
unread.iter() Ok(unread.iter()
.map(|(name, _, count)| format!("{}: {} unread", name, count)) .map(|(name, _, count)| format!("{}: {} unread", name, count))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join("\n") .join("\n"))
} }
} }
@ -164,21 +142,6 @@ fn daemon_sock(channel: &str) -> Result<std::path::PathBuf> {
// ── Channel RPC ──────────────────────────────────────────────── // ── Channel RPC ────────────────────────────────────────────────
/// Create a tokio runtime + LocalSet for capnp RPC calls.
/// capnp-rpc uses Rc so futures aren't Send — this must run
/// on a dedicated thread via spawn_blocking.
fn rpc_blocking<F, T>(f: F) -> T
where
F: FnOnce(&tokio::task::LocalSet, &tokio::runtime::Runtime) -> T,
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
f(&local, &rt)
}
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt; use tokio_util::compat::TokioAsyncReadCompatExt;
@ -202,48 +165,28 @@ async fn rpc_connect(sock: &std::path::Path) -> Result<channel_server::Client, S
} }
async fn rpc_recv( async fn rpc_recv(
sock: &std::path::Path, sock: &std::path::Path, channel: &str, all_new: bool, min_count: u32,
channel: &str,
all_new: bool,
min_count: u32,
) -> Result<String, String> { ) -> Result<String, String> {
let client = rpc_connect(sock).await?; let client = rpc_connect(sock).await?;
let mut req = client.recv_request(); let mut req = client.recv_request();
req.get().set_channel(channel); req.get().set_channel(channel);
req.get().set_all_new(all_new); req.get().set_all_new(all_new);
req.get().set_min_count(min_count); req.get().set_min_count(min_count);
let reply = req.send().promise.await.map_err(|e| format!("recv failed: {e}"))?;
let reply = req.send().promise.await let text = reply.get().map_err(|e| format!("{e}"))?
.map_err(|e| format!("recv failed: {e}"))?; .get_text().map_err(|e| format!("{e}"))?
let text = reply.get() .to_str().map_err(|e| format!("{e}"))?;
.map_err(|e| format!("reply error: {e}"))? if text.is_empty() { Ok("(no messages)".into()) } else { Ok(text.to_string()) }
.get_text()
.map_err(|e| format!("text error: {e}"))?
.to_str()
.map_err(|e| format!("utf8 error: {e}"))?;
if text.is_empty() {
Ok("(no messages)".into())
} else {
Ok(text.to_string())
}
} }
async fn rpc_send( async fn rpc_send(
sock: &std::path::Path, sock: &std::path::Path, channel: &str, message: &str,
channel: &str,
message: &str,
) -> Result<String, String> { ) -> Result<String, String> {
let client = rpc_connect(sock).await?; let client = rpc_connect(sock).await?;
let mut req = client.send_request(); let mut req = client.send_request();
req.get().set_channel(channel); req.get().set_channel(channel);
req.get().set_message(message); req.get().set_message(message);
req.send().promise.await.map_err(|e| format!("send failed: {e}"))?;
req.send().promise.await
.map_err(|e| format!("send failed: {e}"))?;
Ok(format!("sent to {}", channel)) Ok(format!("sent to {}", channel))
} }
@ -271,25 +214,20 @@ async fn rpc_list(sock: &std::path::Path) -> Option<Vec<(String, bool, u32)>> {
// ── Fetch all channels ───────────────────────────────────────── // ── Fetch all channels ─────────────────────────────────────────
/// Fetch channel status from all daemon sockets. /// Fetch channel status from all daemon sockets.
/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). /// Runs on a dedicated thread with LocalSet because capnp-rpc
/// uses Rc (not Send). This is the boundary between the
/// multi-thread runtime and the capnp world.
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
tokio::task::spawn_blocking(|| { tokio::task::spawn_blocking(|| {
fetch_all_channels_blocking() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, fetch_all_channels_inner())
}) })
.await .await
.unwrap_or_default() .unwrap_or_default()
} }
/// Blocking version for synchronous contexts.
pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, fetch_all_channels_inner())
}
async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
let channels_dir = channels_dir(); let channels_dir = channels_dir();