From 713bb0772974d2e08889080cb78af33fdd8c3771 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Fri, 1 May 2026 18:16:21 -0400 Subject: [PATCH] =?UTF-8?q?bin:=20add=20ch=20=E2=80=94=20minimal=20channel?= =?UTF-8?q?=20CLI=20(send/recv)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Speaks the channel.capnp protocol over the per-daemon Unix socket at ~/.consciousness/channels/.sock. Useful for ad-hoc sends from shell, tests, and out-of-process tools that don't want to embed a capnp client. ch send ch recv [--all-new] [--min-count N] Co-Authored-By: Proof of Concept --- src/bin/ch.rs | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 src/bin/ch.rs diff --git a/src/bin/ch.rs b/src/bin/ch.rs new file mode 100644 index 0000000..025fe3c --- /dev/null +++ b/src/bin/ch.rs @@ -0,0 +1,112 @@ +// `ch` — minimal channel CLI. +// +// ch send +// ch recv [--all-new] [--min-count N] +// +// Connects to ~/.consciousness/channels/.sock and speaks the +// channel.capnp protocol to the appropriate daemon. + +use std::path::PathBuf; +use std::process::ExitCode; + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio_util::compat::TokioAsyncReadCompatExt; + +use consciousness::channel_capnp::channel_server; + +fn channels_dir() -> PathBuf { + dirs::home_dir().unwrap_or_default().join(".consciousness/channels") +} + +fn sock_for(channel: &str) -> PathBuf { + let top = channel.split('.').next().unwrap_or(channel); + channels_dir().join(format!("{top}.sock")) +} + +async fn connect(sock: &std::path::Path) -> Result { + let stream = tokio::net::UnixStream::connect(sock).await + .map_err(|e| format!("connect {}: {e}", sock.display()))?; + let (reader, writer) = stream.compat().split(); + let network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc = RpcSystem::new(network, None); + let client: channel_server::Client = rpc.bootstrap(rpc_twoparty_capnp::Side::Server); + tokio::task::spawn_local(rpc); + Ok(client) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> ExitCode { + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!("usage: {} [args...]", args[0]); + return ExitCode::from(2); + } + + let cmd = args[1].clone(); + let local = tokio::task::LocalSet::new(); + let result: Result<(), String> = local.run_until(async move { + match cmd.as_str() { + "send" => { + if args.len() < 4 { + return Err("usage: ch send ".into()); + } + let channel = &args[2]; + let message = args[3..].join(" "); + let sock = sock_for(channel); + let client = connect(&sock).await?; + let mut req = client.send_request(); + req.get().set_channel(channel); + req.get().set_message(&message); + req.send().promise.await.map_err(|e| format!("send: {e}"))?; + println!("sent to {channel}"); + Ok(()) + } + "recv" => { + if args.len() < 3 { + return Err("usage: ch recv [--all-new] [--min-count N]".into()); + } + let channel = &args[2]; + let mut all_new = false; + let mut min_count: u32 = 20; + let mut i = 3; + while i < args.len() { + match args[i].as_str() { + "--all-new" => { all_new = true; i += 1; } + "--min-count" => { + min_count = args.get(i+1) + .ok_or("--min-count needs an argument")? + .parse().map_err(|e| format!("--min-count: {e}"))?; + i += 2; + } + other => return Err(format!("unknown arg: {other}")), + } + } + let sock = sock_for(channel); + let client = connect(&sock).await?; + let mut req = client.recv_request(); + req.get().set_channel(channel); + req.get().set_all_new(all_new); + req.get().set_min_count(min_count); + let reply = req.send().promise.await.map_err(|e| format!("recv: {e}"))?; + let text = reply.get().map_err(|e| e.to_string())? + .get_text().map_err(|e| e.to_string())? + .to_str().map_err(|e| e.to_string())?; + print!("{text}"); + if !text.ends_with('\n') { println!(); } + Ok(()) + } + other => Err(format!("unknown command: {other} (use send|recv)")), + } + }).await; + + match result { + Ok(()) => ExitCode::SUCCESS, + Err(e) => { eprintln!("error: {e}"); ExitCode::from(1) } + } +}