Upgrade capnp 0.20 → 0.25, capnp-rpc 0.20 → 0.25

RPC trait methods changed from &mut self to self: Rc<Self> and
return types from Promise<(), Error> to impl Future<Output = Result<...>>.

Updated all Server impls across 6 files: DaemonImpl (rpc.rs),
NotifyForwarder (channels.rs), and ChannelServerImpl in all channel
crates (irc, telegram, tmux, socat). Local pry! macro replaces
capnp_rpc::pry to match the new impl Future return type.

Warning-clean workspace build.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-07 12:02:59 -04:00
parent 382ebc95aa
commit a421c3c9f3
12 changed files with 221 additions and 192 deletions

20
Cargo.lock generated
View file

@ -271,18 +271,18 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]] [[package]]
name = "capnp" name = "capnp"
version = "0.20.6" version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053b81915c2ce1629b8fb964f578b18cb39b23ef9d5b24120d0dfc959569a1d9" checksum = "3d1c82ec25a9501d60e22eef4be1b2c271769b5a96e224d0875baef28529cf30"
dependencies = [ dependencies = [
"embedded-io", "embedded-io",
] ]
[[package]] [[package]]
name = "capnp-futures" name = "capnp-futures"
version = "0.20.1" version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b70b0d44372d42654e3efac38c1643c7b0f9d3a9e9b72b635f942ff3f17e891" checksum = "73b69dfddccc57844f9a90f9d72b44b97c326914851ea94fb7da40ef9cad6e8d"
dependencies = [ dependencies = [
"capnp", "capnp",
"futures-channel", "futures-channel",
@ -291,9 +291,9 @@ dependencies = [
[[package]] [[package]]
name = "capnp-rpc" name = "capnp-rpc"
version = "0.20.3" version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5a945dd7eac211c30763aa1dbf86ed8e58129d01442d4d2d516facfdb859a1e" checksum = "07ccca6d26009f4d6c12b741994f33b421da613b5dcf461508e236b53ef862f1"
dependencies = [ dependencies = [
"capnp", "capnp",
"capnp-futures", "capnp-futures",
@ -302,9 +302,9 @@ dependencies = [
[[package]] [[package]]
name = "capnpc" name = "capnpc"
version = "0.20.1" version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aa3d5f01e69ed11656d2c7c47bf34327ea9bfb5c85c7de787fcd7b6c5e45b61" checksum = "fca02be865c8c5a78bfc24b9819006ab6b59bef238467203928e26459557af93"
dependencies = [ dependencies = [
"capnp", "capnp",
] ]
@ -770,9 +770,9 @@ checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]] [[package]]
name = "embedded-io" name = "embedded-io"
version = "0.6.1" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" checksum = "9eb1aa714776b75c7e67e1da744b81a129b3ff919c8712b5e1b32252c1f07cc7"
[[package]] [[package]]
name = "env_filter" name = "env_filter"

View file

@ -19,7 +19,7 @@ version.workspace = true
edition.workspace = true edition.workspace = true
[dependencies] [dependencies]
capnp = "0.20" capnp = "0.25"
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
@ -52,13 +52,13 @@ log = "0.4"
ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] } ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] }
crossterm = { version = "0.29", features = ["event-stream"] } crossterm = { version = "0.29", features = ["event-stream"] }
skillratings = "0.28" skillratings = "0.28"
capnp-rpc = "0.20" capnp-rpc = "0.25"
tokio-util = { version = "0.7", features = ["compat"] } tokio-util = { version = "0.7", features = ["compat"] }
env_logger = "0.11" env_logger = "0.11"
tokio-scoped = "0.2.0" tokio-scoped = "0.2.0"
[build-dependencies] [build-dependencies]
capnpc = "0.20" capnpc = "0.25"
[lib] [lib]
name = "poc_memory" name = "poc_memory"

View file

@ -4,8 +4,8 @@ version.workspace = true
edition.workspace = true edition.workspace = true
[dependencies] [dependencies]
capnp = "0.20" capnp = "0.25"
capnp-rpc = "0.20" capnp-rpc = "0.25"
dirs = "6" dirs = "6"
futures = "0.3" futures = "0.3"
json5 = "1.3" json5 = "1.3"

View file

@ -12,14 +12,12 @@
// Socket: ~/.consciousness/channels/irc.sock // Socket: ~/.consciousness/channels/irc.sock
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque;
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use capnp::capability::Promise; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -481,12 +479,21 @@ struct ChannelServerImpl {
state: SharedState, state: SharedState,
} }
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl { impl channel_server::Server for ChannelServerImpl {
fn recv( fn recv(
&mut self, self: Rc<Self>,
params: channel_server::RecvParams, params: channel_server::RecvParams,
mut results: channel_server::RecvResults, mut results: channel_server::RecvResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new(); let all_new = params.get_all_new();
@ -501,25 +508,25 @@ impl channel_server::Server for ChannelServerImpl {
}; };
results.get().set_text(&text); results.get().set_text(&text);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn send( fn send(
&mut self, self: Rc<Self>,
params: channel_server::SendParams, params: channel_server::SendParams,
_results: channel_server::SendResults, _results: channel_server::SendResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let state = self.state.clone();
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); async move {
let message = pry!(pry!(params.get_message()).to_str()).to_string(); let params = params.get()?;
let channel = params.get_channel()?.to_str()?.to_string();
let message = params.get_message()?.to_str()?.to_string();
// Parse channel path to IRC target: // Parse channel path to IRC target:
// irc.#bcachefs -> #bcachefs // irc.#bcachefs -> #bcachefs
// irc.pm.nick -> nick (PRIVMSG) // irc.pm.nick -> nick (PRIVMSG)
let target = channel_to_target(&channel); let target = channel_to_target(&channel);
let state = self.state.clone();
Promise::from_future(async move {
{ {
let mut s = state.borrow_mut(); let mut s = state.borrow_mut();
s.send_privmsg(&target, &message).await s.send_privmsg(&target, &message).await
@ -540,25 +547,25 @@ impl channel_server::Server for ChannelServerImpl {
.push_own(log_line); .push_own(log_line);
Ok(()) Ok(())
}) }
} }
fn subscribe( fn subscribe(
&mut self, self: Rc<Self>,
params: channel_server::SubscribeParams, params: channel_server::SubscribeParams,
_results: channel_server::SubscribeResults, _results: channel_server::SubscribeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let callback = pry!(pry!(params.get()).get_callback()); let callback = pry!(pry!(params.get()).get_callback());
self.state.borrow_mut().subscribers.push(callback); self.state.borrow_mut().subscribers.push(callback);
info!("client subscribed for notifications"); info!("client subscribed for notifications");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn list( fn list(
&mut self, self: Rc<Self>,
_params: channel_server::ListParams, _params: channel_server::ListParams,
mut results: channel_server::ListResults, mut results: channel_server::ListResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let connected = s.connected; let connected = s.connected;
@ -574,7 +581,7 @@ impl channel_server::Server for ChannelServerImpl {
); );
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
} }

View file

@ -4,8 +4,8 @@ version.workspace = true
edition.workspace = true edition.workspace = true
[dependencies] [dependencies]
capnp = "0.20" capnp = "0.25"
capnp-rpc = "0.20" capnp-rpc = "0.25"
dirs = "6" dirs = "6"
futures = "0.3" futures = "0.3"
poc-memory = { path = "../.." } poc-memory = { path = "../.." }

View file

@ -11,8 +11,7 @@ use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::rc::Rc; use std::rc::Rc;
use capnp::capability::Promise; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::net::{TcpStream, UnixListener, UnixStream}; use tokio::net::{TcpStream, UnixListener, UnixStream};
@ -163,10 +162,19 @@ async fn connect_outbound(state: SharedState, label: String, addr: String) -> Re
struct ChannelServerImpl { state: SharedState } struct ChannelServerImpl { state: SharedState }
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl { impl channel_server::Server for ChannelServerImpl {
fn recv( fn recv(
&mut self, params: channel_server::RecvParams, mut results: channel_server::RecvResults, self: Rc<Self>, params: channel_server::RecvParams, mut results: channel_server::RecvResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new(); let all_new = params.get_all_new();
@ -178,12 +186,12 @@ impl channel_server::Server for ChannelServerImpl {
.unwrap_or_default(); .unwrap_or_default();
results.get().set_text(&text); results.get().set_text(&text);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn send( fn send(
&mut self, params: channel_server::SendParams, _results: channel_server::SendResults, self: Rc<Self>, params: channel_server::SendParams, _results: channel_server::SendResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let message = pry!(pry!(params.get_message()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string();
@ -195,12 +203,12 @@ impl channel_server::Server for ChannelServerImpl {
} }
ch.log.push_own(format!("> {}", message)); ch.log.push_own(format!("> {}", message));
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn list( fn list(
&mut self, _params: channel_server::ListParams, mut results: channel_server::ListResults, self: Rc<Self>, _params: channel_server::ListParams, mut results: channel_server::ListResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let channels: Vec<_> = s.channels.iter() let channels: Vec<_> = s.channels.iter()
.map(|(name, ch)| (name.clone(), ch.writer.is_some(), ch.log.unread())) .map(|(name, ch)| (name.clone(), ch.writer.is_some(), ch.log.unread()))
@ -213,33 +221,33 @@ impl channel_server::Server for ChannelServerImpl {
entry.set_connected(*connected); entry.set_connected(*connected);
entry.set_unread(*unread as u32); entry.set_unread(*unread as u32);
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn subscribe( fn subscribe(
&mut self, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, self: Rc<Self>, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let callback = pry!(pry!(params.get()).get_callback()); let callback = pry!(pry!(params.get()).get_callback());
self.state.borrow_mut().subscribers.push(callback); self.state.borrow_mut().subscribers.push(callback);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn open( fn open(
&mut self, params: channel_server::OpenParams, _results: channel_server::OpenResults, self: Rc<Self>, params: channel_server::OpenParams, _results: channel_server::OpenResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let label = pry!(pry!(params.get_label()).to_str()).to_string();
let state = self.state.clone(); let state = self.state.clone();
Promise::from_future(async move { async move {
let params = params.get()?;
let label = params.get_label()?.to_str()?.to_string();
connect_outbound(state, label.clone(), label).await connect_outbound(state, label.clone(), label).await
.map_err(|e| capnp::Error::failed(e)) .map_err(|e| capnp::Error::failed(e))
}) }
} }
fn close( fn close(
&mut self, params: channel_server::CloseParams, _results: channel_server::CloseResults, self: Rc<Self>, params: channel_server::CloseParams, _results: channel_server::CloseResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
@ -248,7 +256,7 @@ impl channel_server::Server for ChannelServerImpl {
info!("closing {}", channel); info!("closing {}", channel);
ch.writer = None; ch.writer = None;
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
} }

View file

@ -4,8 +4,8 @@ version.workspace = true
edition.workspace = true edition.workspace = true
[dependencies] [dependencies]
capnp = "0.20" capnp = "0.25"
capnp-rpc = "0.20" capnp-rpc = "0.25"
dirs = "6" dirs = "6"
futures = "0.3" futures = "0.3"
poc-memory = { path = "../.." } poc-memory = { path = "../.." }

View file

@ -8,16 +8,14 @@
// don't kill the Telegram connection. // don't kill the Telegram connection.
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque;
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc; use std::rc::Rc;
use capnp::capability::Promise; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio_util::compat::TokioAsyncReadCompatExt; use tokio_util::compat::TokioAsyncReadCompatExt;
use log::{info, warn, error}; use log::{info, error};
use poc_memory::channel_capnp::{channel_client, channel_server}; use poc_memory::channel_capnp::{channel_client, channel_server};
@ -228,12 +226,21 @@ struct ChannelServerImpl {
state: SharedState, state: SharedState,
} }
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl { impl channel_server::Server for ChannelServerImpl {
fn recv( fn recv(
&mut self, self: Rc<Self>,
params: channel_server::RecvParams, params: channel_server::RecvParams,
mut results: channel_server::RecvResults, mut results: channel_server::RecvResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new(); let all_new = params.get_all_new();
@ -248,20 +255,20 @@ impl channel_server::Server for ChannelServerImpl {
}; };
results.get().set_text(&text); results.get().set_text(&text);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn send( fn send(
&mut self, self: Rc<Self>,
params: channel_server::SendParams, params: channel_server::SendParams,
_results: channel_server::SendResults, _results: channel_server::SendResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let message = pry!(pry!(params.get_message()).to_str()).to_string();
let state = self.state.clone(); let state = self.state.clone();
Promise::from_future(async move { async move {
let params = params.get()?;
let _channel = params.get_channel()?.to_str()?.to_string();
let message = params.get_message()?.to_str()?.to_string();
let (url, client, chat_id) = { let (url, client, chat_id) = {
let s = state.borrow(); let s = state.borrow();
(s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id)
@ -280,25 +287,25 @@ impl channel_server::Server for ChannelServerImpl {
.push_own(format!("[agent] {}", message)); .push_own(format!("[agent] {}", message));
} }
Ok(()) Ok(())
}) }
} }
fn subscribe( fn subscribe(
&mut self, self: Rc<Self>,
params: channel_server::SubscribeParams, params: channel_server::SubscribeParams,
_results: channel_server::SubscribeResults, _results: channel_server::SubscribeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let callback = pry!(pry!(params.get()).get_callback()); let callback = pry!(pry!(params.get()).get_callback());
self.state.borrow_mut().subscribers.push(callback); self.state.borrow_mut().subscribers.push(callback);
info!("client subscribed for notifications"); info!("client subscribed for notifications");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn list( fn list(
&mut self, self: Rc<Self>,
_params: channel_server::ListParams, _params: channel_server::ListParams,
mut results: channel_server::ListResults, mut results: channel_server::ListResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let connected = s.connected; let connected = s.connected;
@ -313,7 +320,7 @@ impl channel_server::Server for ChannelServerImpl {
); );
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
} }

View file

@ -4,8 +4,8 @@ version.workspace = true
edition.workspace = true edition.workspace = true
[dependencies] [dependencies]
capnp = "0.20" capnp = "0.25"
capnp-rpc = "0.20" capnp-rpc = "0.25"
dirs = "6" dirs = "6"
libc = "0.2" libc = "0.2"
scopeguard = "1" scopeguard = "1"

View file

@ -12,8 +12,7 @@ use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::rc::Rc; use std::rc::Rc;
use capnp::capability::Promise; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -145,12 +144,21 @@ struct ChannelServerImpl {
state: SharedState, state: SharedState,
} }
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl { impl channel_server::Server for ChannelServerImpl {
fn recv( fn recv(
&mut self, self: Rc<Self>,
params: channel_server::RecvParams, params: channel_server::RecvParams,
mut results: channel_server::RecvResults, mut results: channel_server::RecvResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new(); let all_new = params.get_all_new();
@ -165,14 +173,14 @@ impl channel_server::Server for ChannelServerImpl {
}; };
results.get().set_text(&text); results.get().set_text(&text);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn send( fn send(
&mut self, self: Rc<Self>,
params: channel_server::SendParams, params: channel_server::SendParams,
_results: channel_server::SendResults, _results: channel_server::SendResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let message = pry!(pry!(params.get_message()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string();
@ -193,14 +201,14 @@ impl channel_server::Server for ChannelServerImpl {
log.push_own(format!("> {}", message)); log.push_own(format!("> {}", message));
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn list( fn list(
&mut self, self: Rc<Self>,
_params: channel_server::ListParams, _params: channel_server::ListParams,
mut results: channel_server::ListResults, mut results: channel_server::ListResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let channels: Vec<_> = s.panes.keys().map(|label| { let channels: Vec<_> = s.panes.keys().map(|label| {
let key = format!("tmux.{}", label); let key = format!("tmux.{}", label);
@ -215,22 +223,22 @@ impl channel_server::Server for ChannelServerImpl {
entry.set_connected(*connected); entry.set_connected(*connected);
entry.set_unread(*unread as u32); entry.set_unread(*unread as u32);
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn subscribe( fn subscribe(
&mut self, self: Rc<Self>,
_params: channel_server::SubscribeParams, _params: channel_server::SubscribeParams,
_results: channel_server::SubscribeResults, _results: channel_server::SubscribeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
Promise::ok(()) std::future::ready(Ok(()))
} }
fn open( fn open(
&mut self, self: Rc<Self>,
params: channel_server::OpenParams, params: channel_server::OpenParams,
_results: channel_server::OpenResults, _results: channel_server::OpenResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let label = pry!(pry!(params.get_label()).to_str()).to_string(); let label = pry!(pry!(params.get_label()).to_str()).to_string();
@ -238,15 +246,15 @@ impl channel_server::Server for ChannelServerImpl {
{ {
let s = self.state.borrow(); let s = self.state.borrow();
if s.panes.contains_key(&label) { if s.panes.contains_key(&label) {
return Promise::ok(()); return std::future::ready(Ok(()));
} }
} }
// Find the tmux pane by name (window or pane title) // Find the tmux pane by name (window or pane title)
let pane_id = match find_pane_by_name(&label) { let pane_id = match find_pane_by_name(&label) {
Some(id) => id, Some(id) => id,
None => return Promise::err(capnp::Error::failed( None => return std::future::ready(Err(capnp::Error::failed(
format!("no tmux pane named '{}'", label))), format!("no tmux pane named '{}'", label)))),
}; };
info!("opening channel tmux.{} (pane {})", label, pane_id); info!("opening channel tmux.{} (pane {})", label, pane_id);
@ -264,14 +272,14 @@ impl channel_server::Server for ChannelServerImpl {
pipe_pane_reader(reader_state, pane).await; pipe_pane_reader(reader_state, pane).await;
}); });
Promise::ok(()) std::future::ready(Ok(()))
} }
fn close( fn close(
&mut self, self: Rc<Self>,
params: channel_server::CloseParams, params: channel_server::CloseParams,
_results: channel_server::CloseResults, _results: channel_server::CloseResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string(); let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
@ -287,7 +295,7 @@ impl channel_server::Server for ChannelServerImpl {
.output(); .output();
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
} }

View file

@ -7,7 +7,6 @@
use super::idle; use super::idle;
use crate::thalamus::{daemon_capnp, notify}; use crate::thalamus::{daemon_capnp, notify};
use daemon_capnp::daemon; use daemon_capnp::daemon;
use capnp::capability::Promise;
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
use log::info; use log::info;
@ -24,112 +23,112 @@ impl DaemonImpl {
impl daemon::Server for DaemonImpl { impl daemon::Server for DaemonImpl {
fn user( fn user(
&mut self, self: Rc<Self>,
params: daemon::UserParams, params: daemon::UserParams,
_results: daemon::UserResults, _results: daemon::UserResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string();
self.state.borrow_mut().handle_user(&pane); self.state.borrow_mut().handle_user(&pane);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn response( fn response(
&mut self, self: Rc<Self>,
params: daemon::ResponseParams, params: daemon::ResponseParams,
_results: daemon::ResponseResults, _results: daemon::ResponseResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string();
self.state.borrow_mut().handle_response(&pane); self.state.borrow_mut().handle_response(&pane);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn sleep( fn sleep(
&mut self, self: Rc<Self>,
params: daemon::SleepParams, params: daemon::SleepParams,
_results: daemon::SleepResults, _results: daemon::SleepResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let until = pry!(params.get()).get_until(); let until = pry!(params.get()).get_until();
self.state.borrow_mut().handle_sleep(until); self.state.borrow_mut().handle_sleep(until);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn wake( fn wake(
&mut self, self: Rc<Self>,
_params: daemon::WakeParams, _params: daemon::WakeParams,
_results: daemon::WakeResults, _results: daemon::WakeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow_mut().handle_wake(); self.state.borrow_mut().handle_wake();
Promise::ok(()) std::future::ready(Ok(()))
} }
fn quiet( fn quiet(
&mut self, self: Rc<Self>,
params: daemon::QuietParams, params: daemon::QuietParams,
_results: daemon::QuietResults, _results: daemon::QuietResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let secs = pry!(params.get()).get_seconds(); let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_quiet(secs); self.state.borrow_mut().handle_quiet(secs);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn consolidating( fn consolidating(
&mut self, self: Rc<Self>,
_params: daemon::ConsolidatingParams, _params: daemon::ConsolidatingParams,
_results: daemon::ConsolidatingResults, _results: daemon::ConsolidatingResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow_mut().consolidating = true; self.state.borrow_mut().consolidating = true;
info!("consolidation started"); info!("consolidation started");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn consolidated( fn consolidated(
&mut self, self: Rc<Self>,
_params: daemon::ConsolidatedParams, _params: daemon::ConsolidatedParams,
_results: daemon::ConsolidatedResults, _results: daemon::ConsolidatedResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow_mut().consolidating = false; self.state.borrow_mut().consolidating = false;
info!("consolidation ended"); info!("consolidation ended");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn dream_start( fn dream_start(
&mut self, self: Rc<Self>,
_params: daemon::DreamStartParams, _params: daemon::DreamStartParams,
_results: daemon::DreamStartResults, _results: daemon::DreamStartResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let mut s = self.state.borrow_mut(); let mut s = self.state.borrow_mut();
s.dreaming = true; s.dreaming = true;
s.dream_start = crate::thalamus::now(); s.dream_start = crate::thalamus::now();
info!("dream started"); info!("dream started");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn dream_end( fn dream_end(
&mut self, self: Rc<Self>,
_params: daemon::DreamEndParams, _params: daemon::DreamEndParams,
_results: daemon::DreamEndResults, _results: daemon::DreamEndResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let mut s = self.state.borrow_mut(); let mut s = self.state.borrow_mut();
s.dreaming = false; s.dreaming = false;
s.dream_start = 0.0; s.dream_start = 0.0;
info!("dream ended"); info!("dream ended");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn afk( fn afk(
&mut self, self: Rc<Self>,
_params: daemon::AfkParams, _params: daemon::AfkParams,
_results: daemon::AfkResults, _results: daemon::AfkResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow_mut().handle_afk(); self.state.borrow_mut().handle_afk();
Promise::ok(()) std::future::ready(Ok(()))
} }
fn test_nudge( fn test_nudge(
&mut self, self: Rc<Self>,
_params: daemon::TestNudgeParams, _params: daemon::TestNudgeParams,
mut results: daemon::TestNudgeResults, mut results: daemon::TestNudgeResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let mut state = self.state.borrow_mut(); let mut state = self.state.borrow_mut();
let ctx = state.build_context(true); let ctx = state.build_context(true);
let extra = if ctx.is_empty() { let extra = if ctx.is_empty() {
@ -144,85 +143,85 @@ impl daemon::Server for DaemonImpl {
let ok = state.send(&msg); let ok = state.send(&msg);
results.get().set_sent(ok); results.get().set_sent(ok);
results.get().set_message(&msg); results.get().set_message(&msg);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn session_timeout( fn session_timeout(
&mut self, self: Rc<Self>,
params: daemon::SessionTimeoutParams, params: daemon::SessionTimeoutParams,
_results: daemon::SessionTimeoutResults, _results: daemon::SessionTimeoutResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let secs = pry!(params.get()).get_seconds(); let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_session_timeout(secs); self.state.borrow_mut().handle_session_timeout(secs);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn idle_timeout( fn idle_timeout(
&mut self, self: Rc<Self>,
params: daemon::IdleTimeoutParams, params: daemon::IdleTimeoutParams,
_results: daemon::IdleTimeoutResults, _results: daemon::IdleTimeoutResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let secs = pry!(params.get()).get_seconds(); let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_idle_timeout(secs); self.state.borrow_mut().handle_idle_timeout(secs);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn notify_timeout( fn notify_timeout(
&mut self, self: Rc<Self>,
params: daemon::NotifyTimeoutParams, params: daemon::NotifyTimeoutParams,
_results: daemon::NotifyTimeoutResults, _results: daemon::NotifyTimeoutResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let secs = pry!(params.get()).get_seconds(); let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_notify_timeout(secs); self.state.borrow_mut().handle_notify_timeout(secs);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn save( fn save(
&mut self, self: Rc<Self>,
_params: daemon::SaveParams, _params: daemon::SaveParams,
_results: daemon::SaveResults, _results: daemon::SaveResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow().save(); self.state.borrow().save();
info!("state saved"); info!("state saved");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn debug( fn debug(
&mut self, self: Rc<Self>,
_params: daemon::DebugParams, _params: daemon::DebugParams,
mut results: daemon::DebugResults, mut results: daemon::DebugResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let json = self.state.borrow().debug_json(); let json = self.state.borrow().debug_json();
results.get().set_json(&json); results.get().set_json(&json);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn ewma( fn ewma(
&mut self, self: Rc<Self>,
params: daemon::EwmaParams, params: daemon::EwmaParams,
mut results: daemon::EwmaResults, mut results: daemon::EwmaResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let value = pry!(params.get()).get_value(); let value = pry!(params.get()).get_value();
let current = self.state.borrow_mut().handle_ewma(value); let current = self.state.borrow_mut().handle_ewma(value);
results.get().set_current(current); results.get().set_current(current);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn stop( fn stop(
&mut self, self: Rc<Self>,
_params: daemon::StopParams, _params: daemon::StopParams,
_results: daemon::StopResults, _results: daemon::StopResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
self.state.borrow_mut().running = false; self.state.borrow_mut().running = false;
info!("stopping"); info!("stopping");
Promise::ok(()) std::future::ready(Ok(()))
} }
fn status( fn status(
&mut self, self: Rc<Self>,
_params: daemon::StatusParams, _params: daemon::StatusParams,
mut results: daemon::StatusResults, mut results: daemon::StatusResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let mut status = results.get().init_status(); let mut status = results.get().init_status();
@ -255,14 +254,14 @@ impl daemon::Server for DaemonImpl {
status.set_block_reason(s.block_reason()); status.set_block_reason(s.block_reason());
status.set_activity_ewma(s.activity_ewma); status.set_activity_ewma(s.activity_ewma);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn notify( fn notify(
&mut self, self: Rc<Self>,
params: daemon::NotifyParams, params: daemon::NotifyParams,
mut results: daemon::NotifyResults, mut results: daemon::NotifyResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let notif = pry!(params.get_notification()); let notif = pry!(params.get_notification());
let ntype = pry!(pry!(notif.get_type()).to_str()).to_string(); let ntype = pry!(pry!(notif.get_type()).to_str()).to_string();
@ -275,14 +274,14 @@ impl daemon::Server for DaemonImpl {
.notifications .notifications
.submit(ntype, urgency, message); .submit(ntype, urgency, message);
results.get().set_interrupt(interrupt); results.get().set_interrupt(interrupt);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn get_notifications( fn get_notifications(
&mut self, self: Rc<Self>,
params: daemon::GetNotificationsParams, params: daemon::GetNotificationsParams,
mut results: daemon::GetNotificationsResults, mut results: daemon::GetNotificationsResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let min_urgency = pry!(params.get()).get_min_urgency(); let min_urgency = pry!(params.get()).get_min_urgency();
let mut s = self.state.borrow_mut(); let mut s = self.state.borrow_mut();
@ -304,14 +303,14 @@ impl daemon::Server for DaemonImpl {
entry.set_timestamp(n.timestamp); entry.set_timestamp(n.timestamp);
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn get_types( fn get_types(
&mut self, self: Rc<Self>,
_params: daemon::GetTypesParams, _params: daemon::GetTypesParams,
mut results: daemon::GetTypesResults, mut results: daemon::GetTypesResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow(); let s = self.state.borrow();
let types = &s.notifications.types; let types = &s.notifications.types;
@ -325,14 +324,14 @@ impl daemon::Server for DaemonImpl {
entry.set_threshold(info.threshold.map_or(-1, |t| t as i8)); entry.set_threshold(info.threshold.map_or(-1, |t| t as i8));
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
fn set_threshold( fn set_threshold(
&mut self, self: Rc<Self>,
params: daemon::SetThresholdParams, params: daemon::SetThresholdParams,
_results: daemon::SetThresholdResults, _results: daemon::SetThresholdResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let ntype = pry!(pry!(params.get_type()).to_str()).to_string(); let ntype = pry!(pry!(params.get_type()).to_str()).to_string();
let level = params.get_level(); let level = params.get_level();
@ -341,14 +340,14 @@ impl daemon::Server for DaemonImpl {
.borrow_mut() .borrow_mut()
.notifications .notifications
.set_threshold(&ntype, level); .set_threshold(&ntype, level);
Promise::ok(()) std::future::ready(Ok(()))
} }
fn module_command( fn module_command(
&mut self, self: Rc<Self>,
params: daemon::ModuleCommandParams, params: daemon::ModuleCommandParams,
mut results: daemon::ModuleCommandResults, mut results: daemon::ModuleCommandResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get()); let params = pry!(params.get());
let module = pry!(pry!(params.get_module()).to_str()).to_string(); let module = pry!(pry!(params.get_module()).to_str()).to_string();
let _command = pry!(pry!(params.get_command()).to_str()).to_string(); let _command = pry!(pry!(params.get_command()).to_str()).to_string();
@ -364,7 +363,7 @@ impl daemon::Server for DaemonImpl {
results results
.get() .get()
.set_result(&format!("unknown module: {module}")); .set_result(&format!("unknown module: {module}"));
Promise::ok(()) std::future::ready(Ok(()))
} }
} }
} }
@ -375,7 +374,7 @@ macro_rules! pry {
($e:expr) => { ($e:expr) => {
match $e { match $e {
Ok(v) => v, Ok(v) => v,
Err(e) => return Promise::err(e.into()), Err(e) => return std::future::ready(Err(e.into())),
} }
}; };
} }

View file

@ -12,8 +12,8 @@ use tokio::net::UnixStream;
use tokio_util::compat::TokioAsyncReadCompatExt; use tokio_util::compat::TokioAsyncReadCompatExt;
use log::{info, warn}; use log::{info, warn};
use std::rc::Rc;
use crate::channel_capnp::{channel_client, channel_server}; use crate::channel_capnp::{channel_client, channel_server};
use capnp::capability::Promise;
// ── Channel notifications ─────────────────────────────────────── // ── Channel notifications ───────────────────────────────────────
@ -33,10 +33,10 @@ struct NotifyForwarder {
impl channel_client::Server for NotifyForwarder { impl channel_client::Server for NotifyForwarder {
fn notify( fn notify(
&mut self, self: Rc<Self>,
params: channel_client::NotifyParams, params: channel_client::NotifyParams,
_results: channel_client::NotifyResults, _results: channel_client::NotifyResults,
) -> Promise<(), capnp::Error> { ) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
if let Ok(params) = params.get() { if let Ok(params) = params.get() {
if let Ok(notifications) = params.get_notifications() { if let Ok(notifications) = params.get_notifications() {
for n in notifications.iter() { for n in notifications.iter() {
@ -57,7 +57,7 @@ impl channel_client::Server for NotifyForwarder {
} }
} }
} }
Promise::ok(()) std::future::ready(Ok(()))
} }
} }