diff --git a/Cargo.lock b/Cargo.lock index 1bd4a39..3a67aa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -271,18 +271,18 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "capnp" -version = "0.20.6" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053b81915c2ce1629b8fb964f578b18cb39b23ef9d5b24120d0dfc959569a1d9" +checksum = "3d1c82ec25a9501d60e22eef4be1b2c271769b5a96e224d0875baef28529cf30" dependencies = [ "embedded-io", ] [[package]] name = "capnp-futures" -version = "0.20.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b70b0d44372d42654e3efac38c1643c7b0f9d3a9e9b72b635f942ff3f17e891" +checksum = "73b69dfddccc57844f9a90f9d72b44b97c326914851ea94fb7da40ef9cad6e8d" dependencies = [ "capnp", "futures-channel", @@ -291,9 +291,9 @@ dependencies = [ [[package]] name = "capnp-rpc" -version = "0.20.3" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5a945dd7eac211c30763aa1dbf86ed8e58129d01442d4d2d516facfdb859a1e" +checksum = "07ccca6d26009f4d6c12b741994f33b421da613b5dcf461508e236b53ef862f1" dependencies = [ "capnp", "capnp-futures", @@ -302,9 +302,9 @@ dependencies = [ [[package]] name = "capnpc" -version = "0.20.1" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aa3d5f01e69ed11656d2c7c47bf34327ea9bfb5c85c7de787fcd7b6c5e45b61" +checksum = "fca02be865c8c5a78bfc24b9819006ab6b59bef238467203928e26459557af93" dependencies = [ "capnp", ] @@ -770,9 +770,9 @@ checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" [[package]] name = "embedded-io" -version = "0.6.1" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" +checksum = "9eb1aa714776b75c7e67e1da744b81a129b3ff919c8712b5e1b32252c1f07cc7" [[package]] name = "env_filter" diff --git a/Cargo.toml b/Cargo.toml index e4aaca4..c831e17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ version.workspace = true edition.workspace = true [dependencies] -capnp = "0.20" +capnp = "0.25" uuid = { version = "1", features = ["v4"] } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -52,13 +52,13 @@ log = "0.4" ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] } crossterm = { version = "0.29", features = ["event-stream"] } skillratings = "0.28" -capnp-rpc = "0.20" +capnp-rpc = "0.25" tokio-util = { version = "0.7", features = ["compat"] } env_logger = "0.11" tokio-scoped = "0.2.0" [build-dependencies] -capnpc = "0.20" +capnpc = "0.25" [lib] name = "poc_memory" diff --git a/channels/irc/Cargo.toml b/channels/irc/Cargo.toml index 55cec1d..b59ab06 100644 --- a/channels/irc/Cargo.toml +++ b/channels/irc/Cargo.toml @@ -4,8 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] -capnp = "0.20" -capnp-rpc = "0.20" +capnp = "0.25" +capnp-rpc = "0.25" dirs = "6" futures = "0.3" json5 = "1.3" diff --git a/channels/irc/src/main.rs b/channels/irc/src/main.rs index 739113a..5ba44a4 100644 --- a/channels/irc/src/main.rs +++ b/channels/irc/src/main.rs @@ -12,14 +12,12 @@ // Socket: ~/.consciousness/channels/irc.sock use std::cell::RefCell; -use std::collections::VecDeque; use std::io; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; @@ -481,12 +479,21 @@ struct ChannelServerImpl { state: SharedState, } +macro_rules! pry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return std::future::ready(Err(e.into())), + } + }; +} + impl channel_server::Server for ChannelServerImpl { fn recv( - &mut self, + self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); @@ -501,25 +508,25 @@ impl channel_server::Server for ChannelServerImpl { }; results.get().set_text(&text); - Promise::ok(()) + std::future::ready(Ok(())) } fn send( - &mut self, + self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); - let message = pry!(pry!(params.get_message()).to_str()).to_string(); - - // Parse channel path to IRC target: - // irc.#bcachefs -> #bcachefs - // irc.pm.nick -> nick (PRIVMSG) - let target = channel_to_target(&channel); - + ) -> impl std::future::Future> { let state = self.state.clone(); - Promise::from_future(async move { + async move { + let params = params.get()?; + let channel = params.get_channel()?.to_str()?.to_string(); + let message = params.get_message()?.to_str()?.to_string(); + + // Parse channel path to IRC target: + // irc.#bcachefs -> #bcachefs + // irc.pm.nick -> nick (PRIVMSG) + let target = channel_to_target(&channel); + { let mut s = state.borrow_mut(); s.send_privmsg(&target, &message).await @@ -540,25 +547,25 @@ impl channel_server::Server for ChannelServerImpl { .push_own(log_line); Ok(()) - }) + } } fn subscribe( - &mut self, + self: Rc, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); - Promise::ok(()) + std::future::ready(Ok(())) } fn list( - &mut self, + self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let s = self.state.borrow(); let connected = s.connected; @@ -574,7 +581,7 @@ impl channel_server::Server for ChannelServerImpl { ); } - Promise::ok(()) + std::future::ready(Ok(())) } } diff --git a/channels/socat/Cargo.toml b/channels/socat/Cargo.toml index 0f57cb3..8c67129 100644 --- a/channels/socat/Cargo.toml +++ b/channels/socat/Cargo.toml @@ -4,8 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] -capnp = "0.20" -capnp-rpc = "0.20" +capnp = "0.25" +capnp-rpc = "0.25" dirs = "6" futures = "0.3" poc-memory = { path = "../.." } diff --git a/channels/socat/src/main.rs b/channels/socat/src/main.rs index 890f013..edb16eb 100644 --- a/channels/socat/src/main.rs +++ b/channels/socat/src/main.rs @@ -11,8 +11,7 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use tokio::net::{TcpStream, UnixListener, UnixStream}; @@ -163,10 +162,19 @@ async fn connect_outbound(state: SharedState, label: String, addr: String) -> Re struct ChannelServerImpl { state: SharedState } +macro_rules! pry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return std::future::ready(Err(e.into())), + } + }; +} + impl channel_server::Server for ChannelServerImpl { fn recv( - &mut self, params: channel_server::RecvParams, mut results: channel_server::RecvResults, - ) -> Promise<(), capnp::Error> { + self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); @@ -178,12 +186,12 @@ impl channel_server::Server for ChannelServerImpl { .unwrap_or_default(); results.get().set_text(&text); - Promise::ok(()) + std::future::ready(Ok(())) } fn send( - &mut self, params: channel_server::SendParams, _results: channel_server::SendResults, - ) -> Promise<(), capnp::Error> { + self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string(); @@ -195,12 +203,12 @@ impl channel_server::Server for ChannelServerImpl { } ch.log.push_own(format!("> {}", message)); } - Promise::ok(()) + std::future::ready(Ok(())) } fn list( - &mut self, _params: channel_server::ListParams, mut results: channel_server::ListResults, - ) -> Promise<(), capnp::Error> { + self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, + ) -> impl std::future::Future> { let s = self.state.borrow(); let channels: Vec<_> = s.channels.iter() .map(|(name, ch)| (name.clone(), ch.writer.is_some(), ch.log.unread())) @@ -213,33 +221,33 @@ impl channel_server::Server for ChannelServerImpl { entry.set_connected(*connected); entry.set_unread(*unread as u32); } - Promise::ok(()) + std::future::ready(Ok(())) } fn subscribe( - &mut self, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, - ) -> Promise<(), capnp::Error> { + self: Rc, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, + ) -> impl std::future::Future> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); - Promise::ok(()) + std::future::ready(Ok(())) } fn open( - &mut self, params: channel_server::OpenParams, _results: channel_server::OpenResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let label = pry!(pry!(params.get_label()).to_str()).to_string(); - + self: Rc, params: channel_server::OpenParams, _results: channel_server::OpenResults, + ) -> impl std::future::Future> { let state = self.state.clone(); - Promise::from_future(async move { + async move { + let params = params.get()?; + let label = params.get_label()?.to_str()?.to_string(); + connect_outbound(state, label.clone(), label).await .map_err(|e| capnp::Error::failed(e)) - }) + } } fn close( - &mut self, params: channel_server::CloseParams, _results: channel_server::CloseResults, - ) -> Promise<(), capnp::Error> { + self: Rc, params: channel_server::CloseParams, _results: channel_server::CloseResults, + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); @@ -248,7 +256,7 @@ impl channel_server::Server for ChannelServerImpl { info!("closing {}", channel); ch.writer = None; } - Promise::ok(()) + std::future::ready(Ok(())) } } diff --git a/channels/telegram/Cargo.toml b/channels/telegram/Cargo.toml index 6705da0..afc4bb0 100644 --- a/channels/telegram/Cargo.toml +++ b/channels/telegram/Cargo.toml @@ -4,8 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] -capnp = "0.20" -capnp-rpc = "0.20" +capnp = "0.25" +capnp-rpc = "0.25" dirs = "6" futures = "0.3" poc-memory = { path = "../.." } diff --git a/channels/telegram/src/main.rs b/channels/telegram/src/main.rs index 16c75e5..abe5598 100644 --- a/channels/telegram/src/main.rs +++ b/channels/telegram/src/main.rs @@ -8,16 +8,14 @@ // don't kill the Telegram connection. use std::cell::RefCell; -use std::collections::VecDeque; use std::path::PathBuf; use std::rc::Rc; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tokio_util::compat::TokioAsyncReadCompatExt; -use log::{info, warn, error}; +use log::{info, error}; use poc_memory::channel_capnp::{channel_client, channel_server}; @@ -228,12 +226,21 @@ struct ChannelServerImpl { state: SharedState, } +macro_rules! pry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return std::future::ready(Err(e.into())), + } + }; +} + impl channel_server::Server for ChannelServerImpl { fn recv( - &mut self, + self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); @@ -248,20 +255,20 @@ impl channel_server::Server for ChannelServerImpl { }; results.get().set_text(&text); - Promise::ok(()) + std::future::ready(Ok(())) } fn send( - &mut self, + self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); - let message = pry!(pry!(params.get_message()).to_str()).to_string(); - + ) -> impl std::future::Future> { let state = self.state.clone(); - Promise::from_future(async move { + async move { + let params = params.get()?; + let _channel = params.get_channel()?.to_str()?.to_string(); + let message = params.get_message()?.to_str()?.to_string(); + let (url, client, chat_id) = { let s = state.borrow(); (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) @@ -280,25 +287,25 @@ impl channel_server::Server for ChannelServerImpl { .push_own(format!("[agent] {}", message)); } Ok(()) - }) + } } fn subscribe( - &mut self, + self: Rc, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); - Promise::ok(()) + std::future::ready(Ok(())) } fn list( - &mut self, + self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let s = self.state.borrow(); let connected = s.connected; @@ -313,7 +320,7 @@ impl channel_server::Server for ChannelServerImpl { ); } - Promise::ok(()) + std::future::ready(Ok(())) } } diff --git a/channels/tmux/Cargo.toml b/channels/tmux/Cargo.toml index 7e7324b..da1b499 100644 --- a/channels/tmux/Cargo.toml +++ b/channels/tmux/Cargo.toml @@ -4,8 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] -capnp = "0.20" -capnp-rpc = "0.20" +capnp = "0.25" +capnp-rpc = "0.25" dirs = "6" libc = "0.2" scopeguard = "1" diff --git a/channels/tmux/src/main.rs b/channels/tmux/src/main.rs index 54facf1..7ff0ce4 100644 --- a/channels/tmux/src/main.rs +++ b/channels/tmux/src/main.rs @@ -12,8 +12,7 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::io::AsyncBufReadExt; use tokio::net::UnixListener; @@ -145,12 +144,21 @@ struct ChannelServerImpl { state: SharedState, } +macro_rules! pry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return std::future::ready(Err(e.into())), + } + }; +} + impl channel_server::Server for ChannelServerImpl { fn recv( - &mut self, + self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); @@ -165,14 +173,14 @@ impl channel_server::Server for ChannelServerImpl { }; results.get().set_text(&text); - Promise::ok(()) + std::future::ready(Ok(())) } fn send( - &mut self, + self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string(); @@ -193,14 +201,14 @@ impl channel_server::Server for ChannelServerImpl { log.push_own(format!("> {}", message)); } - Promise::ok(()) + std::future::ready(Ok(())) } fn list( - &mut self, + self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let s = self.state.borrow(); let channels: Vec<_> = s.panes.keys().map(|label| { let key = format!("tmux.{}", label); @@ -215,22 +223,22 @@ impl channel_server::Server for ChannelServerImpl { entry.set_connected(*connected); entry.set_unread(*unread as u32); } - Promise::ok(()) + std::future::ready(Ok(())) } fn subscribe( - &mut self, + self: Rc, _params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, - ) -> Promise<(), capnp::Error> { - Promise::ok(()) + ) -> impl std::future::Future> { + std::future::ready(Ok(())) } fn open( - &mut self, + self: Rc, params: channel_server::OpenParams, _results: channel_server::OpenResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let label = pry!(pry!(params.get_label()).to_str()).to_string(); @@ -238,15 +246,15 @@ impl channel_server::Server for ChannelServerImpl { { let s = self.state.borrow(); if s.panes.contains_key(&label) { - return Promise::ok(()); + return std::future::ready(Ok(())); } } // Find the tmux pane by name (window or pane title) let pane_id = match find_pane_by_name(&label) { Some(id) => id, - None => return Promise::err(capnp::Error::failed( - format!("no tmux pane named '{}'", label))), + None => return std::future::ready(Err(capnp::Error::failed( + format!("no tmux pane named '{}'", label)))), }; info!("opening channel tmux.{} (pane {})", label, pane_id); @@ -264,14 +272,14 @@ impl channel_server::Server for ChannelServerImpl { pipe_pane_reader(reader_state, pane).await; }); - Promise::ok(()) + std::future::ready(Ok(())) } fn close( - &mut self, + self: Rc, params: channel_server::CloseParams, _results: channel_server::CloseResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string(); @@ -287,7 +295,7 @@ impl channel_server::Server for ChannelServerImpl { .output(); } - Promise::ok(()) + std::future::ready(Ok(())) } } diff --git a/src/claude/rpc.rs b/src/claude/rpc.rs index 0264596..451a5ea 100644 --- a/src/claude/rpc.rs +++ b/src/claude/rpc.rs @@ -7,7 +7,6 @@ use super::idle; use crate::thalamus::{daemon_capnp, notify}; use daemon_capnp::daemon; -use capnp::capability::Promise; use std::cell::RefCell; use std::rc::Rc; use log::info; @@ -24,112 +23,112 @@ impl DaemonImpl { impl daemon::Server for DaemonImpl { fn user( - &mut self, + self: Rc, params: daemon::UserParams, _results: daemon::UserResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); self.state.borrow_mut().handle_user(&pane); - Promise::ok(()) + std::future::ready(Ok(())) } fn response( - &mut self, + self: Rc, params: daemon::ResponseParams, _results: daemon::ResponseResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); self.state.borrow_mut().handle_response(&pane); - Promise::ok(()) + std::future::ready(Ok(())) } fn sleep( - &mut self, + self: Rc, params: daemon::SleepParams, _results: daemon::SleepResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let until = pry!(params.get()).get_until(); self.state.borrow_mut().handle_sleep(until); - Promise::ok(()) + std::future::ready(Ok(())) } fn wake( - &mut self, + self: Rc, _params: daemon::WakeParams, _results: daemon::WakeResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow_mut().handle_wake(); - Promise::ok(()) + std::future::ready(Ok(())) } fn quiet( - &mut self, + self: Rc, params: daemon::QuietParams, _results: daemon::QuietResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let secs = pry!(params.get()).get_seconds(); self.state.borrow_mut().handle_quiet(secs); - Promise::ok(()) + std::future::ready(Ok(())) } fn consolidating( - &mut self, + self: Rc, _params: daemon::ConsolidatingParams, _results: daemon::ConsolidatingResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow_mut().consolidating = true; info!("consolidation started"); - Promise::ok(()) + std::future::ready(Ok(())) } fn consolidated( - &mut self, + self: Rc, _params: daemon::ConsolidatedParams, _results: daemon::ConsolidatedResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow_mut().consolidating = false; info!("consolidation ended"); - Promise::ok(()) + std::future::ready(Ok(())) } fn dream_start( - &mut self, + self: Rc, _params: daemon::DreamStartParams, _results: daemon::DreamStartResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let mut s = self.state.borrow_mut(); s.dreaming = true; s.dream_start = crate::thalamus::now(); info!("dream started"); - Promise::ok(()) + std::future::ready(Ok(())) } fn dream_end( - &mut self, + self: Rc, _params: daemon::DreamEndParams, _results: daemon::DreamEndResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let mut s = self.state.borrow_mut(); s.dreaming = false; s.dream_start = 0.0; info!("dream ended"); - Promise::ok(()) + std::future::ready(Ok(())) } fn afk( - &mut self, + self: Rc, _params: daemon::AfkParams, _results: daemon::AfkResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow_mut().handle_afk(); - Promise::ok(()) + std::future::ready(Ok(())) } fn test_nudge( - &mut self, + self: Rc, _params: daemon::TestNudgeParams, mut results: daemon::TestNudgeResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let mut state = self.state.borrow_mut(); let ctx = state.build_context(true); let extra = if ctx.is_empty() { @@ -144,85 +143,85 @@ impl daemon::Server for DaemonImpl { let ok = state.send(&msg); results.get().set_sent(ok); results.get().set_message(&msg); - Promise::ok(()) + std::future::ready(Ok(())) } fn session_timeout( - &mut self, + self: Rc, params: daemon::SessionTimeoutParams, _results: daemon::SessionTimeoutResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let secs = pry!(params.get()).get_seconds(); self.state.borrow_mut().handle_session_timeout(secs); - Promise::ok(()) + std::future::ready(Ok(())) } fn idle_timeout( - &mut self, + self: Rc, params: daemon::IdleTimeoutParams, _results: daemon::IdleTimeoutResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let secs = pry!(params.get()).get_seconds(); self.state.borrow_mut().handle_idle_timeout(secs); - Promise::ok(()) + std::future::ready(Ok(())) } fn notify_timeout( - &mut self, + self: Rc, params: daemon::NotifyTimeoutParams, _results: daemon::NotifyTimeoutResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let secs = pry!(params.get()).get_seconds(); self.state.borrow_mut().handle_notify_timeout(secs); - Promise::ok(()) + std::future::ready(Ok(())) } fn save( - &mut self, + self: Rc, _params: daemon::SaveParams, _results: daemon::SaveResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow().save(); info!("state saved"); - Promise::ok(()) + std::future::ready(Ok(())) } fn debug( - &mut self, + self: Rc, _params: daemon::DebugParams, mut results: daemon::DebugResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let json = self.state.borrow().debug_json(); results.get().set_json(&json); - Promise::ok(()) + std::future::ready(Ok(())) } fn ewma( - &mut self, + self: Rc, params: daemon::EwmaParams, mut results: daemon::EwmaResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let value = pry!(params.get()).get_value(); let current = self.state.borrow_mut().handle_ewma(value); results.get().set_current(current); - Promise::ok(()) + std::future::ready(Ok(())) } fn stop( - &mut self, + self: Rc, _params: daemon::StopParams, _results: daemon::StopResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { self.state.borrow_mut().running = false; info!("stopping"); - Promise::ok(()) + std::future::ready(Ok(())) } fn status( - &mut self, + self: Rc, _params: daemon::StatusParams, mut results: daemon::StatusResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let s = self.state.borrow(); let mut status = results.get().init_status(); @@ -255,14 +254,14 @@ impl daemon::Server for DaemonImpl { status.set_block_reason(s.block_reason()); status.set_activity_ewma(s.activity_ewma); - Promise::ok(()) + std::future::ready(Ok(())) } fn notify( - &mut self, + self: Rc, params: daemon::NotifyParams, mut results: daemon::NotifyResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let notif = pry!(params.get_notification()); let ntype = pry!(pry!(notif.get_type()).to_str()).to_string(); @@ -275,14 +274,14 @@ impl daemon::Server for DaemonImpl { .notifications .submit(ntype, urgency, message); results.get().set_interrupt(interrupt); - Promise::ok(()) + std::future::ready(Ok(())) } fn get_notifications( - &mut self, + self: Rc, params: daemon::GetNotificationsParams, mut results: daemon::GetNotificationsResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let min_urgency = pry!(params.get()).get_min_urgency(); let mut s = self.state.borrow_mut(); @@ -304,14 +303,14 @@ impl daemon::Server for DaemonImpl { entry.set_timestamp(n.timestamp); } - Promise::ok(()) + std::future::ready(Ok(())) } fn get_types( - &mut self, + self: Rc, _params: daemon::GetTypesParams, mut results: daemon::GetTypesResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let s = self.state.borrow(); let types = &s.notifications.types; @@ -325,14 +324,14 @@ impl daemon::Server for DaemonImpl { entry.set_threshold(info.threshold.map_or(-1, |t| t as i8)); } - Promise::ok(()) + std::future::ready(Ok(())) } fn set_threshold( - &mut self, + self: Rc, params: daemon::SetThresholdParams, _results: daemon::SetThresholdResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let ntype = pry!(pry!(params.get_type()).to_str()).to_string(); let level = params.get_level(); @@ -341,14 +340,14 @@ impl daemon::Server for DaemonImpl { .borrow_mut() .notifications .set_threshold(&ntype, level); - Promise::ok(()) + std::future::ready(Ok(())) } fn module_command( - &mut self, + self: Rc, params: daemon::ModuleCommandParams, mut results: daemon::ModuleCommandResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { let params = pry!(params.get()); let module = pry!(pry!(params.get_module()).to_str()).to_string(); let _command = pry!(pry!(params.get_command()).to_str()).to_string(); @@ -364,7 +363,7 @@ impl daemon::Server for DaemonImpl { results .get() .set_result(&format!("unknown module: {module}")); - Promise::ok(()) + std::future::ready(Ok(())) } } } @@ -375,7 +374,7 @@ macro_rules! pry { ($e:expr) => { match $e { Ok(v) => v, - Err(e) => return Promise::err(e.into()), + Err(e) => return std::future::ready(Err(e.into())), } }; } diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 22d299a..3bdbac6 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -12,8 +12,8 @@ use tokio::net::UnixStream; use tokio_util::compat::TokioAsyncReadCompatExt; use log::{info, warn}; +use std::rc::Rc; use crate::channel_capnp::{channel_client, channel_server}; -use capnp::capability::Promise; // ── Channel notifications ─────────────────────────────────────── @@ -33,10 +33,10 @@ struct NotifyForwarder { impl channel_client::Server for NotifyForwarder { fn notify( - &mut self, + self: Rc, params: channel_client::NotifyParams, _results: channel_client::NotifyResults, - ) -> Promise<(), capnp::Error> { + ) -> impl std::future::Future> { if let Ok(params) = params.get() { if let Ok(notifications) = params.get_notifications() { for n in notifications.iter() { @@ -57,7 +57,7 @@ impl channel_client::Server for NotifyForwarder { } } } - Promise::ok(()) + std::future::ready(Ok(())) } }