bin: add ch — minimal channel CLI (send/recv)

Speaks the channel.capnp protocol over the per-daemon Unix socket at
~/.consciousness/channels/<top>.sock. Useful for ad-hoc sends from
shell, tests, and out-of-process tools that don't want to embed a
capnp client.

  ch send <channel> <message...>
  ch recv <channel> [--all-new] [--min-count N]

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-05-01 18:16:21 -04:00
commit 713bb07729

112
src/bin/ch.rs Normal file
View file

@ -0,0 +1,112 @@
// `ch` — minimal channel CLI.
//
// ch send <channel-path> <message>
// ch recv <channel-path> [--all-new] [--min-count N]
//
// Connects to ~/.consciousness/channels/<top>.sock and speaks the
// channel.capnp protocol to the appropriate daemon.
use std::path::PathBuf;
use std::process::ExitCode;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use consciousness::channel_capnp::channel_server;
fn channels_dir() -> PathBuf {
dirs::home_dir().unwrap_or_default().join(".consciousness/channels")
}
fn sock_for(channel: &str) -> PathBuf {
let top = channel.split('.').next().unwrap_or(channel);
channels_dir().join(format!("{top}.sock"))
}
async fn connect(sock: &std::path::Path) -> Result<channel_server::Client, String> {
let stream = tokio::net::UnixStream::connect(sock).await
.map_err(|e| format!("connect {}: {e}", sock.display()))?;
let (reader, writer) = stream.compat().split();
let network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc = RpcSystem::new(network, None);
let client: channel_server::Client = rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc);
Ok(client)
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("usage: {} <send|recv> <channel> [args...]", args[0]);
return ExitCode::from(2);
}
let cmd = args[1].clone();
let local = tokio::task::LocalSet::new();
let result: Result<(), String> = local.run_until(async move {
match cmd.as_str() {
"send" => {
if args.len() < 4 {
return Err("usage: ch send <channel> <message...>".into());
}
let channel = &args[2];
let message = args[3..].join(" ");
let sock = sock_for(channel);
let client = connect(&sock).await?;
let mut req = client.send_request();
req.get().set_channel(channel);
req.get().set_message(&message);
req.send().promise.await.map_err(|e| format!("send: {e}"))?;
println!("sent to {channel}");
Ok(())
}
"recv" => {
if args.len() < 3 {
return Err("usage: ch recv <channel> [--all-new] [--min-count N]".into());
}
let channel = &args[2];
let mut all_new = false;
let mut min_count: u32 = 20;
let mut i = 3;
while i < args.len() {
match args[i].as_str() {
"--all-new" => { all_new = true; i += 1; }
"--min-count" => {
min_count = args.get(i+1)
.ok_or("--min-count needs an argument")?
.parse().map_err(|e| format!("--min-count: {e}"))?;
i += 2;
}
other => return Err(format!("unknown arg: {other}")),
}
}
let sock = sock_for(channel);
let client = connect(&sock).await?;
let mut req = client.recv_request();
req.get().set_channel(channel);
req.get().set_all_new(all_new);
req.get().set_min_count(min_count);
let reply = req.send().promise.await.map_err(|e| format!("recv: {e}"))?;
let text = reply.get().map_err(|e| e.to_string())?
.get_text().map_err(|e| e.to_string())?
.to_str().map_err(|e| e.to_string())?;
print!("{text}");
if !text.ends_with('\n') { println!(); }
Ok(())
}
other => Err(format!("unknown command: {other} (use send|recv)")),
}
}).await;
match result {
Ok(()) => ExitCode::SUCCESS,
Err(e) => { eprintln!("error: {e}"); ExitCode::from(1) }
}
}