diff --git a/src/bin/channel-test.rs b/src/bin/channel-test.rs new file mode 100644 index 0000000..0b26af3 --- /dev/null +++ b/src/bin/channel-test.rs @@ -0,0 +1,80 @@ +// channel-test — quick RPC test tool for channel daemons +// +// Usage: channel-test [list|recv|send ] + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio::net::UnixStream; +use tokio_util::compat::TokioAsyncReadCompatExt; + +use poc_memory::channel_capnp::channel_server; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!("usage: channel-test [list|recv |send ]"); + std::process::exit(1); + } + + let sock_path = args[1].clone(); + let cmd = args.get(2).cloned().unwrap_or_else(|| "list".to_string()); + let arg3 = args.get(3).cloned().unwrap_or_default(); + let arg_rest: String = args.iter().skip(4).cloned().collect::>().join(" "); + + tokio::task::LocalSet::new() + .run_until(async move { + let stream = UnixStream::connect(&sock_path).await?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + tokio::task::spawn_local(rpc_system); + + match cmd.as_str() { + "list" => { + let reply = client.list_request().send().promise.await?; + let channels = reply.get()?.get_channels()?; + println!("{} channels:", channels.len()); + for ch in channels.iter() { + let name = ch.get_name()?.to_str()?; + let connected = ch.get_connected(); + let unread = ch.get_unread(); + println!(" {} connected={} unread={}", name, connected, unread); + } + } + "recv" => { + let mut req = client.recv_request(); + req.get().set_channel(&arg3); + req.get().set_all_new(true); + req.get().set_min_count(20); + let reply = req.send().promise.await?; + let text = reply.get()?.get_text()?.to_str()?; + if text.is_empty() { + println!("(no messages)"); + } else { + println!("{}", text); + } + } + "send" => { + let mut req = client.send_request(); + req.get().set_channel(&arg3); + req.get().set_message(&arg_rest); + req.send().promise.await?; + println!("sent to {}", arg3); + } + _ => { + eprintln!("unknown command: {}", cmd); + } + } + + Ok::<(), Box>(()) + }) + .await +} diff --git a/src/bin/consciousness.rs b/src/bin/consciousness.rs index d014b1c..2fde2c7 100644 --- a/src/bin/consciousness.rs +++ b/src/bin/consciousness.rs @@ -814,6 +814,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> { let mut idle_state = poc_memory::thalamus::idle::State::new(); idle_state.load(); + // Channel status fetcher — async results sent back via mpsc + let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::>(4); + // Kick off initial fetch + { + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = poc_memory::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } + // Create UI channel let (ui_tx, mut ui_rx) = ui_channel::channel(); @@ -940,6 +951,14 @@ async fn run(cli: cli::CliArgs) -> Result<()> { } app.handle_key(key); idle_state.user_activity(); + // Trigger async channel refresh on F5 + if app.screen == tui::Screen::Thalamus { + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = poc_memory::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } dirty = true; } Some(Ok(Event::Mouse(mouse))) => { @@ -988,6 +1007,12 @@ async fn run(cli: cli::CliArgs) -> Result<()> { dirty = true; } + // Channel status arrived from async fetch + Some(channels) = channel_rx.recv() => { + app.set_channel_status(channels); + dirty = true; + } + // UI messages (lowest priority — processed in bulk during render) Some(msg) = ui_rx.recv() => { app.handle_ui_message(msg); diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 5fb55b0..97e001c 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -176,3 +176,87 @@ impl ChannelManager { result } } + +/// One-shot query: connect to a daemon socket, call list(), return results. +/// Safe to call from any async context (no LocalSet needed). +async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> { + let stream = match UnixStream::connect(sock).await { + Ok(s) => s, + Err(_) => return Vec::new(), + }; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + let rpc_handle = tokio::task::spawn_local(async move { + let _ = rpc_system.await; + }); + + let mut result = Vec::new(); + if let Ok(reply) = client.list_request().send().promise.await { + if let Ok(r) = reply.get() { + if let Ok(channels) = r.get_channels() { + for ch in channels.iter() { + if let Ok(name) = ch.get_name() { + result.push(( + name.to_str().unwrap_or("").to_string(), + ch.get_connected(), + ch.get_unread(), + )); + } + } + } + } + } + + rpc_handle.abort(); + result +} + +/// Fetch channel status from all daemon sockets. +/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send). +pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> { + tokio::task::spawn_blocking(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, fetch_all_channels_inner()) + }) + .await + .unwrap_or_default() +} + +async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { + let channels_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + + let mut sup = super::supervisor::Supervisor::new(); + sup.load_config(); + sup.ensure_running(); // restart any dead daemons + + let mut result = Vec::new(); + for (daemon_name, _enabled, alive) in sup.status() { + if !alive { + result.push((daemon_name, false, 0)); + continue; + } + let sock = channels_dir.join(format!("{}.sock", daemon_name)); + let channels = query_one_daemon(&sock).await; + if channels.is_empty() { + result.push((daemon_name, false, 0)); + } else { + result.extend(channels); + } + } + result +} diff --git a/src/user/tui/mod.rs b/src/user/tui/mod.rs index 458ae51..0c134c9 100644 --- a/src/user/tui/mod.rs +++ b/src/user/tui/mod.rs @@ -351,7 +351,7 @@ pub struct App { /// Pane areas from last draw (for mouse click -> pane selection). pub(crate) pane_areas: [Rect; 3], // [autonomous, conversation, tools] /// Active screen (F1-F4). - pub(crate) screen: Screen, + pub screen: Screen, /// Debug screen scroll offset. pub(crate) debug_scroll: u16, /// Index of selected context section in debug view (for expand/collapse). @@ -395,7 +395,7 @@ pub(crate) struct ChannelStatus { /// Screens toggled by F-keys. #[derive(Debug, Clone, Copy, PartialEq)] -pub(crate) enum Screen { +pub enum Screen { /// F1 — conversation Interact, /// F2 — context window, model info, budget @@ -851,52 +851,11 @@ impl App { self.draw_main(frame, size); } - /// Refresh channel status by querying daemon sockets. - /// Called from the status tick, not every render frame. - pub fn refresh_channels(&mut self) { - let channels_dir = dirs::home_dir() - .unwrap_or_default() - .join(".consciousness/channels"); - - let mut status = Vec::new(); - - // Read supervisor config to know which daemons exist - let mut sup = crate::thalamus::supervisor::Supervisor::new(); - sup.load_config(); - - for (daemon_name, enabled, alive) in sup.status() { - if !alive { - status.push(ChannelStatus { - name: daemon_name, - connected: false, - unread: 0, - }); - continue; - } - - // Try to connect and call list() - let sock = channels_dir.join(format!("{}.sock", daemon_name)); - match std::os::unix::net::UnixStream::connect(&sock) { - Ok(_stream) => { - // For now, just show the daemon as connected - // TODO: actual capnp list() call - status.push(ChannelStatus { - name: daemon_name, - connected: true, - unread: 0, - }); - } - Err(_) => { - status.push(ChannelStatus { - name: daemon_name, - connected: false, - unread: 0, - }); - } - } - } - - self.channel_status = status; + /// Update channel status from async fetch results. + pub fn set_channel_status(&mut self, channels: Vec<(String, bool, u32)>) { + self.channel_status = channels.into_iter() + .map(|(name, connected, unread)| ChannelStatus { name, connected, unread }) + .collect(); } /// Snapshot idle state for F5 display. @@ -916,8 +875,8 @@ impl App { self.debug_scroll = 0; // Refresh data for status screens on entry match screen { - Screen::Thalamus => self.refresh_channels(), - // idle_info is updated from the event loop, not here + // Channel refresh triggered asynchronously from event loop + Screen::Thalamus => {} _ => {} } }