diff --git a/Cargo.lock b/Cargo.lock index cb02751..8d47ece 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -623,6 +623,25 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "consciousness-channel-tmux" +version = "0.4.0" +dependencies = [ + "capnp", + "capnp-rpc", + "dirs", + "futures", + "json5", + "libc", + "poc-memory", + "scopeguard", + "serde", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "console-api" version = "0.8.1" diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs new file mode 100644 index 0000000..9e67347 --- /dev/null +++ b/src/agent/tools/channels.rs @@ -0,0 +1,345 @@ +// tools/channels.rs — Channel tools (list, recv, send, notifications) +// +// Shared by consciousness agent and the MCP server. +// One-shot capnp RPC calls to channel daemon sockets. + +use anyhow::{Context, Result}; +use serde::Deserialize; +use serde_json::json; + +use super::ToolDef; + +// ── Definitions ──────────────────────────────────────────────── + +pub fn definitions() -> Vec { + vec![ + ToolDef::new( + "channel_list", + "List all available channels and their status (connected, unread count).", + json!({"type": "object", "properties": {}}), + ), + ToolDef::new( + "channel_recv", + "Read messages from a channel.", + json!({ + "type": "object", + "properties": { + "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, telegram.kent)"}, + "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, + "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} + }, + "required": ["channel"] + }), + ), + ToolDef::new( + "channel_send", + "Send a message to a channel.", + json!({ + "type": "object", + "properties": { + "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs, irc.pm.nick, telegram.kent)"}, + "message": {"type": "string", "description": "Message to send"} + }, + "required": ["channel", "message"] + }), + ), + ToolDef::new( + "channel_notifications", + "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", + json!({"type": "object", "properties": {}}), + ), + ] +} + +// ── Dispatch ─────────────────────────────────────────────────── + +pub async fn dispatch(name: &str, args: &serde_json::Value) -> Result { + match name { + "channel_list" => channel_list().await, + "channel_recv" => channel_recv(args).await, + "channel_send" => channel_send(args).await, + "channel_notifications" => channel_notifications().await, + _ => anyhow::bail!("unknown channel tool: {}", name), + } +} + +/// Blocking dispatch for synchronous contexts (MCP server). +pub fn dispatch_blocking(name: &str, args: &serde_json::Value) -> Result { + match name { + "channel_list" => Ok(channel_list_blocking()), + "channel_notifications" => Ok(channel_notifications_blocking()), + "channel_recv" | "channel_send" => { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async { + match name { + "channel_recv" => channel_recv(args).await, + "channel_send" => channel_send(args).await, + _ => unreachable!(), + } + }) + } + _ => anyhow::bail!("unknown channel tool: {}", name), + } +} + +// ── Tool implementations ─────────────────────────────────────── + +async fn channel_list() -> Result { + let result = fetch_all_channels().await; + Ok(format_channel_list(&result)) +} + +fn channel_list_blocking() -> String { + let result = fetch_all_channels_blocking(); + format_channel_list(&result) +} + +fn format_channel_list(channels: &[(String, bool, u32)]) -> String { + if channels.is_empty() { + return "No channels configured.".into(); + } + channels.iter().map(|(name, connected, unread)| { + let status = if *connected { "connected" } else { "disconnected" }; + let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; + format!("{} — {}{}", name, status, unread_str) + }).collect::>().join("\n") +} + +#[derive(Deserialize)] +struct RecvArgs { + channel: String, + #[serde(default = "default_true")] + all_new: bool, + #[serde(default = "default_min_count")] + min_count: u32, +} +fn default_true() -> bool { true } +fn default_min_count() -> u32 { 20 } + +async fn channel_recv(args: &serde_json::Value) -> Result { + let a: RecvArgs = serde_json::from_value(args.clone()) + .context("invalid channel_recv arguments")?; + let sock = daemon_sock(&a.channel)?; + let channel = a.channel; + let all_new = a.all_new; + let min_count = a.min_count; + tokio::task::spawn_blocking(move || { + rpc_blocking(|local, rt| { + local.block_on(rt, rpc_recv(&sock, &channel, all_new, min_count)) + }) + }).await? + .map_err(|e| anyhow::anyhow!("{}", e)) +} + +#[derive(Deserialize)] +struct SendArgs { + channel: String, + message: String, +} + +async fn channel_send(args: &serde_json::Value) -> Result { + let a: SendArgs = serde_json::from_value(args.clone()) + .context("invalid channel_send arguments")?; + let sock = daemon_sock(&a.channel)?; + let channel = a.channel; + let message = a.message; + tokio::task::spawn_blocking(move || { + rpc_blocking(|local, rt| { + local.block_on(rt, rpc_send(&sock, &channel, &message)) + }) + }).await? + .map_err(|e| anyhow::anyhow!("{}", e)) +} + +async fn channel_notifications() -> Result { + let result = fetch_all_channels().await; + Ok(format_notifications(&result)) +} + +fn channel_notifications_blocking() -> String { + let result = fetch_all_channels_blocking(); + format_notifications(&result) +} + +fn format_notifications(channels: &[(String, bool, u32)]) -> String { + let unread: Vec<_> = channels.iter().filter(|(_, _, u)| *u > 0).collect(); + if unread.is_empty() { + "No pending notifications.".into() + } else { + unread.iter() + .map(|(name, _, count)| format!("{}: {} unread", name, count)) + .collect::>() + .join("\n") + } +} + +// ── Socket helpers ───────────────────────────────────────────── + +fn channels_dir() -> std::path::PathBuf { + dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels") +} + +fn daemon_sock(channel: &str) -> Result { + let prefix = channel.split('.').next().unwrap_or(""); + let sock = channels_dir().join(format!("{}.sock", prefix)); + if !sock.exists() { + anyhow::bail!("no daemon for channel: {}", channel); + } + Ok(sock) +} + +// ── Channel RPC ──────────────────────────────────────────────── + +/// Create a tokio runtime + LocalSet for capnp RPC calls. +/// capnp-rpc uses Rc so futures aren't Send — this must run +/// on a dedicated thread via spawn_blocking. +fn rpc_blocking(f: F) -> T +where + F: FnOnce(&tokio::task::LocalSet, &tokio::runtime::Runtime) -> T, +{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + f(&local, &rt) +} + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio_util::compat::TokioAsyncReadCompatExt; +use crate::channel_capnp::channel_server; + +async fn rpc_connect(sock: &std::path::Path) -> Result { + let stream = tokio::net::UnixStream::connect(sock).await + .map_err(|e| format!("connect failed: {e}"))?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + tokio::task::spawn_local(rpc_system); + Ok(client) +} + +async fn rpc_recv( + sock: &std::path::Path, + channel: &str, + all_new: bool, + min_count: u32, +) -> Result { + let client = rpc_connect(sock).await?; + + let mut req = client.recv_request(); + req.get().set_channel(channel); + req.get().set_all_new(all_new); + req.get().set_min_count(min_count); + + let reply = req.send().promise.await + .map_err(|e| format!("recv failed: {e}"))?; + let text = reply.get() + .map_err(|e| format!("reply error: {e}"))? + .get_text() + .map_err(|e| format!("text error: {e}"))? + .to_str() + .map_err(|e| format!("utf8 error: {e}"))?; + + if text.is_empty() { + Ok("(no messages)".into()) + } else { + Ok(text.to_string()) + } +} + +async fn rpc_send( + sock: &std::path::Path, + channel: &str, + message: &str, +) -> Result { + let client = rpc_connect(sock).await?; + + let mut req = client.send_request(); + req.get().set_channel(channel); + req.get().set_message(message); + + req.send().promise.await + .map_err(|e| format!("send failed: {e}"))?; + + Ok(format!("sent to {}", channel)) +} + +async fn rpc_list(sock: &std::path::Path) -> Option> { + let client = rpc_connect(sock).await.ok()?; + let mut result = Vec::new(); + if let Ok(reply) = client.list_request().send().promise.await { + if let Ok(r) = reply.get() { + if let Ok(channels) = r.get_channels() { + for ch in channels.iter() { + if let Ok(name) = ch.get_name() { + result.push(( + name.to_str().unwrap_or("").to_string(), + ch.get_connected(), + ch.get_unread(), + )); + } + } + } + } + } + Some(result) +} + +// ── Fetch all channels ───────────────────────────────────────── + +/// Fetch channel status from all daemon sockets. +/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). +pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { + tokio::task::spawn_blocking(|| { + fetch_all_channels_blocking() + }) + .await + .unwrap_or_default() +} + +/// Blocking version for synchronous contexts. +pub fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, fetch_all_channels_inner()) +} + +async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { + let channels_dir = channels_dir(); + + let mut sup = crate::thalamus::supervisor::Supervisor::new(); + sup.load_config(); + sup.ensure_running(); + + let mut result = Vec::new(); + for (daemon_name, _enabled, alive) in sup.status() { + if !alive { + result.push((daemon_name, false, 0)); + continue; + } + let sock = channels_dir.join(format!("{}.sock", daemon_name)); + match rpc_list(&sock).await { + None => result.push((daemon_name, false, 0)), + Some(channels) if channels.is_empty() => result.push((daemon_name, true, 0)), + Some(channels) => result.extend(channels), + } + } + result +} diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 81ba279..bf56bc3 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -6,10 +6,11 @@ // Core tools mod bash; +pub mod channels; mod edit; mod glob; mod grep; -mod memory; +pub mod memory; mod read; mod web; mod write; @@ -191,6 +192,15 @@ pub async fn dispatch_shared( }); } + // Channel tools + if name.starts_with("channel_") { + let result = channels::dispatch(name, args).await; + return Some(match result { + Ok(s) => ToolOutput::text(s), + Err(e) => ToolOutput::error(e), + }); + } + // File and execution tools let result = match name { "read_file" => read::read_file(args), @@ -225,6 +235,7 @@ pub fn definitions() -> Vec { glob::definition(), ]; defs.extend(control::definitions()); + defs.extend(channels::definitions()); defs.extend(memory::definitions()); defs } diff --git a/src/claude/mcp-server.rs b/src/claude/mcp-server.rs index f8f245b..c4e90a4 100644 --- a/src/claude/mcp-server.rs +++ b/src/claude/mcp-server.rs @@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::io::{self, BufRead, Write}; -use std::process::Command; // ── JSON-RPC types ────────────────────────────────────────────── @@ -69,368 +68,34 @@ fn notify(method: &str, params: Value) { // ── Tool definitions ──────────────────────────────────────────── fn tool_definitions() -> Vec { - // Memory tools — dispatch to poc-memory CLI - let memory_tools = vec![ - tool("memory_render", "Read a memory node's content and links.", - json!({"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}), - vec!["render"]), - tool("memory_write", "Create or update a memory node.", - json!({"type":"object","properties":{ - "key":{"type":"string","description":"Node key"}, - "content":{"type":"string","description":"Full content (markdown)"} - },"required":["key","content"]}), - vec!["write"]), - tool("memory_search", "Search the memory graph.", - json!({"type":"object","properties":{"terms":{"type":"array","items":{"type":"string"},"description":"Search terms"}},"required":["terms"]}), - vec!["graph", "spread"]), - tool("memory_links", "Show a node's links.", - json!({"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}), - vec!["graph", "link"]), - tool("memory_link_set", "Set link weights between nodes.", - json!({"type":"object","properties":{ - "source":{"type":"string"}, - "targets":{"type":"string","description":"target:weight pairs"} - },"required":["source","targets"]}), - vec!["graph", "link-set"]), - tool("memory_link_add", "Add a link between nodes.", - json!({"type":"object","properties":{ - "source":{"type":"string"}, - "target":{"type":"string"}, - "weight":{"type":"number"} - },"required":["source","target"]}), - vec!["graph", "link-add"]), - tool("memory_used", "Mark a memory node as useful.", - json!({"type":"object","properties":{"key":{"type":"string"}},"required":["key"]}), - vec!["used"]), - tool("memory_weight_set", "Set a node's weight directly.", - json!({"type":"object","properties":{ - "key":{"type":"string"}, - "weight":{"type":"number"} - },"required":["key","weight"]}), - vec!["weight-set"]), - tool("memory_rename", "Rename a memory node.", - json!({"type":"object","properties":{ - "old_key":{"type":"string"}, - "new_key":{"type":"string"} - },"required":["old_key","new_key"]}), - vec!["node", "rename"]), - tool("memory_query", "Query the memory graph.", - json!({"type":"object","properties":{"query":{"type":"string","description":"Query expression"}},"required":["query"]}), - vec!["query"]), - ]; + use poc_memory::agent::tools; - // Channel tools - let channel_tools = vec![ - json!({ - "name": "channel_list", - "description": "List all available channels and their status (connected, unread count).", - "inputSchema": {"type": "object", "properties": {}} - }), - json!({ - "name": "channel_recv", - "description": "Read messages from a channel.", - "inputSchema": {"type": "object", "properties": { - "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs-ai, telegram.kent)"}, - "all_new": {"type": "boolean", "description": "If true, return all unconsumed messages. If false, return scrollback.", "default": true}, - "min_count": {"type": "integer", "description": "Minimum number of lines to return", "default": 20} - }, "required": ["channel"]} - }), - json!({ - "name": "channel_send", - "description": "Send a message to a channel.", - "inputSchema": {"type": "object", "properties": { - "channel": {"type": "string", "description": "Channel path (e.g. irc.#bcachefs-ai, irc.pm.nick, telegram.kent)"}, - "message": {"type": "string", "description": "Message to send"} - }, "required": ["channel", "message"]} - }), - json!({ - "name": "channel_notifications", - "description": "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", - "inputSchema": {"type": "object", "properties": {}} - }), - ]; + let all_defs = tools::memory::definitions().into_iter() + .chain(tools::channels::definitions()); - memory_tools.into_iter().chain(channel_tools).collect() -} - -fn tool(name: &str, desc: &str, schema: Value, _cli: Vec<&str>) -> Value { - json!({ - "name": name, - "description": desc, - "inputSchema": schema, - }) + all_defs.map(|td| json!({ + "name": td.function.name, + "description": td.function.description, + "inputSchema": td.function.parameters, + })).collect() } // ── Tool dispatch ─────────────────────────────────────────────── fn dispatch_tool(name: &str, args: &Value) -> Result { - match name { - // Memory tools — delegate to poc-memory CLI - n if n.starts_with("memory_") => dispatch_memory(n, args), + use poc_memory::agent::tools; - // Channel tools - "channel_list" => dispatch_channel_list(), - "channel_recv" => dispatch_channel_recv(args), - "channel_send" => dispatch_channel_send(args), - "channel_notifications" => dispatch_channel_notifications(), - - _ => Err(format!("unknown tool: {name}")), - } -} - -fn dispatch_memory(name: &str, args: &Value) -> Result { - let cli_args = match name { - "memory_render" => vec!["render".into(), arg_str(args, "key")?], - "memory_write" => { - let key = arg_str(args, "key")?; - let content = arg_str(args, "content")?; - // Write content via stdin - return run_poc_memory_stdin(&["write", &key], &content); - } - "memory_search" => { - let mut cmd = vec!["graph".into(), "spread".into()]; - if let Some(terms) = args.get("terms").and_then(|t| t.as_array()) { - for t in terms { - if let Some(s) = t.as_str() { - cmd.push(s.to_string()); - } - } - } - cmd - } - "memory_links" => vec!["graph".into(), "link".into(), arg_str(args, "key")?], - "memory_link_set" => vec!["graph".into(), "link-set".into(), arg_str(args, "source")?, arg_str(args, "targets")?], - "memory_link_add" => { - let mut cmd = vec!["graph".into(), "link-add".into(), arg_str(args, "source")?, arg_str(args, "target")?]; - if let Some(w) = args.get("weight").and_then(|v| v.as_f64()) { - cmd.push(format!("{w:.2}")); - } - cmd - } - "memory_used" => vec!["used".into(), arg_str(args, "key")?], - "memory_weight_set" => vec!["weight-set".into(), arg_str(args, "key")?, format!("{:.2}", args.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0))], - "memory_rename" => vec!["node".into(), "rename".into(), arg_str(args, "old_key")?, arg_str(args, "new_key")?], - "memory_query" => vec!["query".into(), arg_str(args, "query")?], - _ => return Err(format!("unknown memory tool: {name}")), - }; - - let str_args: Vec<&str> = cli_args.iter().map(|s| s.as_str()).collect(); - run_poc_memory(&str_args) -} - -fn run_poc_memory(args: &[&str]) -> Result { - let result = Command::new("poc-memory") - .args(args) - .output() - .map_err(|e| format!("failed to run poc-memory: {e}"))?; - let out = String::from_utf8_lossy(&result.stdout); - let err = String::from_utf8_lossy(&result.stderr); - if result.status.success() { - Ok(format!("{}{}", out.trim(), if err.is_empty() { String::new() } else { format!("\n{}", err.trim()) })) - } else { - Err(format!("{}\n{}", out.trim(), err.trim())) - } -} - -fn run_poc_memory_stdin(args: &[&str], stdin: &str) -> Result { - use std::process::Stdio; - let mut child = Command::new("poc-memory") - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|e| format!("failed to run poc-memory: {e}"))?; - - if let Some(mut si) = child.stdin.take() { - let _ = si.write_all(stdin.as_bytes()); + if name.starts_with("memory_") || name.starts_with("journal_") || name == "output" { + return tools::memory::dispatch(name, args, None) + .map_err(|e| e.to_string()); } - let result = child.wait_with_output() - .map_err(|e| format!("poc-memory failed: {e}"))?; - let out = String::from_utf8_lossy(&result.stdout); - let err = String::from_utf8_lossy(&result.stderr); - Ok(format!("{}{}", out.trim(), if err.is_empty() { String::new() } else { format!("\n{}", err.trim()) })) -} - -// ── Channel tool dispatch ─────────────────────────────────────── - -fn dispatch_channel_list() -> Result { - let result = poc_memory::thalamus::channels::fetch_all_channels_blocking(); - let mut lines = Vec::new(); - for (name, connected, unread) in &result { - let status = if *connected { "connected" } else { "disconnected" }; - let unread_str = if *unread > 0 { format!(" ({} unread)", unread) } else { String::new() }; - lines.push(format!("{} — {}{}", name, status, unread_str)); - } - if lines.is_empty() { - Ok("No channels configured.".into()) - } else { - Ok(lines.join("\n")) - } -} - -fn dispatch_channel_recv(args: &Value) -> Result { - let channel = arg_str(args, "channel")?; - let all_new = args.get("all_new").and_then(|v| v.as_bool()).unwrap_or(true); - let min_count = args.get("min_count").and_then(|v| v.as_u64()).unwrap_or(20) as u32; - - // Find which daemon handles this channel - let prefix = channel.split('.').next().unwrap_or(""); - let sock = dirs::home_dir() - .unwrap_or_default() - .join(format!(".consciousness/channels/{}.sock", prefix)); - - if !sock.exists() { - return Err(format!("no daemon for channel: {}", channel)); + if name.starts_with("channel_") { + return tools::channels::dispatch_blocking(name, args) + .map_err(|e| e.to_string()); } - // Use blocking one-shot RPC - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| e.to_string())?; - let local = tokio::task::LocalSet::new(); - - local.block_on(&rt, async { - channel_rpc_recv(&sock, &channel, all_new, min_count).await - }) -} - -fn dispatch_channel_send(args: &Value) -> Result { - let channel = arg_str(args, "channel")?; - let message = arg_str(args, "message")?; - - let prefix = channel.split('.').next().unwrap_or(""); - let sock = dirs::home_dir() - .unwrap_or_default() - .join(format!(".consciousness/channels/{}.sock", prefix)); - - if !sock.exists() { - return Err(format!("no daemon for channel: {}", channel)); - } - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| e.to_string())?; - let local = tokio::task::LocalSet::new(); - - local.block_on(&rt, async { - channel_rpc_send(&sock, &channel, &message).await - }) -} - -fn dispatch_channel_notifications() -> Result { - // Read pending notifications from the consciousness binary's state - // For now, just list channels with unread counts - let result = poc_memory::thalamus::channels::fetch_all_channels_blocking(); - let unread: Vec<_> = result.iter() - .filter(|(_, _, u)| *u > 0) - .collect(); - if unread.is_empty() { - Ok("No pending notifications.".into()) - } else { - let lines: Vec = unread.iter() - .map(|(name, _, count)| format!("{}: {} unread", name, count)) - .collect(); - Ok(lines.join("\n")) - } -} - -// ── Channel RPC helpers ───────────────────────────────────────── - -async fn channel_rpc_recv( - sock: &std::path::Path, - channel: &str, - all_new: bool, - min_count: u32, -) -> Result { - use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; - use futures::AsyncReadExt; - use tokio_util::compat::TokioAsyncReadCompatExt; - use poc_memory::channel_capnp::channel_server; - - let stream = tokio::net::UnixStream::connect(sock).await - .map_err(|e| format!("connect failed: {e}"))?; - let (reader, writer) = stream.compat().split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - let mut rpc_system = RpcSystem::new(rpc_network, None); - let client: channel_server::Client = - rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - tokio::task::spawn_local(rpc_system); - - let mut req = client.recv_request(); - req.get().set_channel(channel); - req.get().set_all_new(all_new); - req.get().set_min_count(min_count); - - let reply = req.send().promise.await - .map_err(|e| format!("recv failed: {e}"))?; - let text = reply.get() - .map_err(|e| format!("reply error: {e}"))? - .get_text() - .map_err(|e| format!("text error: {e}"))? - .to_str() - .map_err(|e| format!("utf8 error: {e}"))?; - - if text.is_empty() { - Ok("(no messages)".into()) - } else { - Ok(text.to_string()) - } -} - -async fn channel_rpc_send( - sock: &std::path::Path, - channel: &str, - message: &str, -) -> Result { - use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; - use futures::AsyncReadExt; - use tokio_util::compat::TokioAsyncReadCompatExt; - use poc_memory::channel_capnp::channel_server; - - let stream = tokio::net::UnixStream::connect(sock).await - .map_err(|e| format!("connect failed: {e}"))?; - let (reader, writer) = stream.compat().split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - let mut rpc_system = RpcSystem::new(rpc_network, None); - let client: channel_server::Client = - rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - tokio::task::spawn_local(rpc_system); - - let mut req = client.send_request(); - req.get().set_channel(channel); - req.get().set_message(message); - - req.send().promise.await - .map_err(|e| format!("send failed: {e}"))?; - - Ok(format!("sent to {}", channel)) -} - -// ── Helpers ───────────────────────────────────────────────────── - -fn arg_str(args: &Value, key: &str) -> Result { - args.get(key) - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .ok_or_else(|| format!("missing required argument: {key}")) -} - -fn fetch_all_channels_blocking() -> Vec<(String, bool, u32)> { - poc_memory::thalamus::channels::fetch_all_channels_blocking() + Err(format!("unknown tool: {name}")) } // ── Main loop ───────────────────────────────────────────────────