diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs index c26555c..10b0575 100644 --- a/src/agent/tools/channels.rs +++ b/src/agent/tools/channels.rs @@ -7,78 +7,58 @@ use anyhow::{Context, Result}; use serde::Deserialize; use serde_json::json; -use super::ToolDef; +use super::{Tool, ToolDef}; -// ── Definitions ──────────────────────────────────────────────── +// ── Tool registry ────────────────────────────────────────────── -pub fn definitions() -> Vec { +pub fn tools() -> Vec { vec![ - ToolDef::new( - "channel_list", - "List all available channels and their status (connected, unread count).", - json!({"type": "object", "properties": {}}), - ), - ToolDef::new( - "channel_recv", - "Read messages from a channel.", - json!({ - "type": "object", - "properties": { + Tool { + def: ToolDef::new("channel_list", + "List all available channels and their status (connected, unread count).", + json!({"type": "object", "properties": {}})), + handler: |_a, _v| Box::pin(async { channel_list().await }), + }, + Tool { + def: ToolDef::new("channel_recv", + "Read messages from a channel.", + json!({"type": "object", "properties": { "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"}, "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} - }, - "required": ["channel"] - }), - ), - ToolDef::new( - "channel_send", - "Send a message to a channel.", - json!({ - "type": "object", - "properties": { + }, "required": ["channel"]})), + handler: |_a, v| Box::pin(async move { channel_recv(&v).await }), + }, + Tool { + def: ToolDef::new("channel_send", + "Send a message to a channel.", + json!({"type": "object", "properties": { "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"}, "message": {"type": "string", "description": "Message to send"} - }, - "required": ["channel", "message"] - }), - ), - ToolDef::new( - "channel_notifications", - "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", - json!({"type": "object", "properties": {}}), - ), - ] -} - -pub fn tools() -> Vec { - use super::Tool; - let defs = definitions(); - vec![ - Tool { def: defs[0].clone(), handler: |_a, _v| Box::pin(async move { channel_list().await }) }, - Tool { def: defs[1].clone(), handler: |_a, v| Box::pin(async move { channel_recv(&v).await }) }, - Tool { def: defs[2].clone(), handler: |_a, v| Box::pin(async move { channel_send(&v).await }) }, - Tool { def: defs[3].clone(), handler: |_a, _v| Box::pin(async move { channel_notifications().await }) }, + }, "required": ["channel", "message"]})), + handler: |_a, v| Box::pin(async move { channel_send(&v).await }), + }, + Tool { + def: ToolDef::new("channel_notifications", + "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", + json!({"type": "object", "properties": {}})), + handler: |_a, _v| Box::pin(async { channel_notifications().await }), + }, ] } // ── Tool implementations ─────────────────────────────────────── -pub async fn channel_list() -> Result { +async fn channel_list() -> Result { let result = fetch_all_channels().await; - Ok(format_channel_list(&result)) -} - - -fn format_channel_list(channels: &[(String, bool, u32)]) -> String { - if channels.is_empty() { - return "No channels configured.".into(); + if result.is_empty() { + return Ok("No channels configured.".into()); } - channels.iter().map(|(name, connected, unread)| { + Ok(result.iter().map(|(name, connected, unread)| { let status = if *connected { "connected" } else { "disconnected" }; let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; format!("{} — {}{}", name, status, unread_str) - }).collect::>().join("\n") + }).collect::>().join("\n")) } #[derive(Deserialize)] @@ -92,17 +72,19 @@ struct RecvArgs { fn default_true() -> bool { true } fn default_min_count() -> u32 { 20 } -pub async fn channel_recv(args: &serde_json::Value) -> Result { +async fn channel_recv(args: &serde_json::Value) -> Result { let a: RecvArgs = serde_json::from_value(args.clone()) .context("invalid channel_recv arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let all_new = a.all_new; let min_count = a.min_count; + // capnp-rpc needs LocalSet — bridge to it via spawn_blocking tokio::task::spawn_blocking(move || { - rpc_blocking(|local, rt| { - local.block_on(rt, rpc_recv(&sock, &channel, all_new, min_count)) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, rpc_recv(&sock, &channel, all_new, min_count)) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } @@ -113,35 +95,31 @@ struct SendArgs { message: String, } -pub async fn channel_send(args: &serde_json::Value) -> Result { +async fn channel_send(args: &serde_json::Value) -> Result { let a: SendArgs = serde_json::from_value(args.clone()) .context("invalid channel_send arguments")?; let sock = daemon_sock(&a.channel)?; let channel = a.channel; let message = a.message; tokio::task::spawn_blocking(move || { - rpc_blocking(|local, rt| { - local.block_on(rt, rpc_send(&sock, &channel, &message)) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, rpc_send(&sock, &channel, &message)) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } -pub async fn channel_notifications() -> Result { +async fn channel_notifications() -> Result { let result = fetch_all_channels().await; - Ok(format_notifications(&result)) -} - - -fn format_notifications(channels: &[(String, bool, u32)]) -> String { - let unread: Vec<_> = channels.iter().filter(|(_, _, u)| *u > 0).collect(); + let unread: Vec<_> = result.iter().filter(|(_, _, u)| *u > 0).collect(); if unread.is_empty() { - "No pending notifications.".into() + Ok("No pending notifications.".into()) } else { - unread.iter() + Ok(unread.iter() .map(|(name, _, count)| format!("{}: {} unread", name, count)) .collect::>() - .join("\n") + .join("\n")) } } @@ -164,21 +142,6 @@ fn daemon_sock(channel: &str) -> Result { // ── Channel RPC ──────────────────────────────────────────────── -/// Create a tokio runtime + LocalSet for capnp RPC calls. -/// capnp-rpc uses Rc so futures aren't Send — this must run -/// on a dedicated thread via spawn_blocking. -fn rpc_blocking(f: F) -> T -where - F: FnOnce(&tokio::task::LocalSet, &tokio::runtime::Runtime) -> T, -{ - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let local = tokio::task::LocalSet::new(); - f(&local, &rt) -} - use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -202,48 +165,28 @@ async fn rpc_connect(sock: &std::path::Path) -> Result Result { let client = rpc_connect(sock).await?; - let mut req = client.recv_request(); req.get().set_channel(channel); req.get().set_all_new(all_new); req.get().set_min_count(min_count); - - let reply = req.send().promise.await - .map_err(|e| format!("recv failed: {e}"))?; - let text = reply.get() - .map_err(|e| format!("reply error: {e}"))? - .get_text() - .map_err(|e| format!("text error: {e}"))? - .to_str() - .map_err(|e| format!("utf8 error: {e}"))?; - - if text.is_empty() { - Ok("(no messages)".into()) - } else { - Ok(text.to_string()) - } + let reply = req.send().promise.await.map_err(|e| format!("recv failed: {e}"))?; + let text = reply.get().map_err(|e| format!("{e}"))? + .get_text().map_err(|e| format!("{e}"))? + .to_str().map_err(|e| format!("{e}"))?; + if text.is_empty() { Ok("(no messages)".into()) } else { Ok(text.to_string()) } } async fn rpc_send( - sock: &std::path::Path, - channel: &str, - message: &str, + sock: &std::path::Path, channel: &str, message: &str, ) -> Result { let client = rpc_connect(sock).await?; - let mut req = client.send_request(); req.get().set_channel(channel); req.get().set_message(message); - - req.send().promise.await - .map_err(|e| format!("send failed: {e}"))?; - + req.send().promise.await.map_err(|e| format!("send failed: {e}"))?; Ok(format!("sent to {}", channel)) } @@ -271,25 +214,20 @@ async fn rpc_list(sock: &std::path::Path) -> Option> { // ── Fetch all channels ───────────────────────────────────────── /// Fetch channel status from all daemon sockets. -/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). +/// Runs on a dedicated thread with LocalSet because capnp-rpc +/// uses Rc (not Send). This is the boundary between the +/// multi-thread runtime and the capnp world. pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { tokio::task::spawn_blocking(|| { - fetch_all_channels_blocking() + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, fetch_all_channels_inner()) }) .await .unwrap_or_default() } -/// Blocking version for synchronous contexts. -pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let local = tokio::task::LocalSet::new(); - local.block_on(&rt, fetch_all_channels_inner()) -} - async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { let channels_dir = channels_dir();