wire channel list RPC into consciousness F5 screen
fetch_all_channels() connects to each daemon socket and calls list() via capnp RPC. Runs on a dedicated thread (capnp uses Rc). Results sent back via mpsc channel, TUI reads cached state. Fetched at startup and when switching to F5 thalamus screen. Also calls ensure_running() to restart dead daemons. Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
parent
e7be2a3ba0
commit
8e66f0a66c
4 changed files with 198 additions and 50 deletions
80
src/bin/channel-test.rs
Normal file
80
src/bin/channel-test.rs
Normal file
|
|
@ -0,0 +1,80 @@
|
||||||
|
// channel-test — quick RPC test tool for channel daemons
|
||||||
|
//
|
||||||
|
// Usage: channel-test <socket-path> [list|recv|send <channel> <msg>]
|
||||||
|
|
||||||
|
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
|
||||||
|
use futures::AsyncReadExt;
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||||
|
|
||||||
|
use poc_memory::channel_capnp::channel_server;
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "current_thread")]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let args: Vec<String> = std::env::args().collect();
|
||||||
|
if args.len() < 2 {
|
||||||
|
eprintln!("usage: channel-test <socket-path> [list|recv <channel>|send <channel> <msg>]");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let sock_path = args[1].clone();
|
||||||
|
let cmd = args.get(2).cloned().unwrap_or_else(|| "list".to_string());
|
||||||
|
let arg3 = args.get(3).cloned().unwrap_or_default();
|
||||||
|
let arg_rest: String = args.iter().skip(4).cloned().collect::<Vec<_>>().join(" ");
|
||||||
|
|
||||||
|
tokio::task::LocalSet::new()
|
||||||
|
.run_until(async move {
|
||||||
|
let stream = UnixStream::connect(&sock_path).await?;
|
||||||
|
let (reader, writer) = stream.compat().split();
|
||||||
|
let rpc_network = Box::new(twoparty::VatNetwork::new(
|
||||||
|
futures::io::BufReader::new(reader),
|
||||||
|
futures::io::BufWriter::new(writer),
|
||||||
|
rpc_twoparty_capnp::Side::Client,
|
||||||
|
Default::default(),
|
||||||
|
));
|
||||||
|
let mut rpc_system = RpcSystem::new(rpc_network, None);
|
||||||
|
let client: channel_server::Client =
|
||||||
|
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
|
||||||
|
tokio::task::spawn_local(rpc_system);
|
||||||
|
|
||||||
|
match cmd.as_str() {
|
||||||
|
"list" => {
|
||||||
|
let reply = client.list_request().send().promise.await?;
|
||||||
|
let channels = reply.get()?.get_channels()?;
|
||||||
|
println!("{} channels:", channels.len());
|
||||||
|
for ch in channels.iter() {
|
||||||
|
let name = ch.get_name()?.to_str()?;
|
||||||
|
let connected = ch.get_connected();
|
||||||
|
let unread = ch.get_unread();
|
||||||
|
println!(" {} connected={} unread={}", name, connected, unread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"recv" => {
|
||||||
|
let mut req = client.recv_request();
|
||||||
|
req.get().set_channel(&arg3);
|
||||||
|
req.get().set_all_new(true);
|
||||||
|
req.get().set_min_count(20);
|
||||||
|
let reply = req.send().promise.await?;
|
||||||
|
let text = reply.get()?.get_text()?.to_str()?;
|
||||||
|
if text.is_empty() {
|
||||||
|
println!("(no messages)");
|
||||||
|
} else {
|
||||||
|
println!("{}", text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"send" => {
|
||||||
|
let mut req = client.send_request();
|
||||||
|
req.get().set_channel(&arg3);
|
||||||
|
req.get().set_message(&arg_rest);
|
||||||
|
req.send().promise.await?;
|
||||||
|
println!("sent to {}", arg3);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
eprintln!("unknown command: {}", cmd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<(), Box<dyn std::error::Error>>(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
@ -814,6 +814,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
|
||||||
let mut idle_state = poc_memory::thalamus::idle::State::new();
|
let mut idle_state = poc_memory::thalamus::idle::State::new();
|
||||||
idle_state.load();
|
idle_state.load();
|
||||||
|
|
||||||
|
// Channel status fetcher — async results sent back via mpsc
|
||||||
|
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
|
||||||
|
// Kick off initial fetch
|
||||||
|
{
|
||||||
|
let tx = channel_tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let result = poc_memory::thalamus::channels::fetch_all_channels().await;
|
||||||
|
let _ = tx.send(result).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Create UI channel
|
// Create UI channel
|
||||||
let (ui_tx, mut ui_rx) = ui_channel::channel();
|
let (ui_tx, mut ui_rx) = ui_channel::channel();
|
||||||
|
|
||||||
|
|
@ -940,6 +951,14 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
|
||||||
}
|
}
|
||||||
app.handle_key(key);
|
app.handle_key(key);
|
||||||
idle_state.user_activity();
|
idle_state.user_activity();
|
||||||
|
// Trigger async channel refresh on F5
|
||||||
|
if app.screen == tui::Screen::Thalamus {
|
||||||
|
let tx = channel_tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let result = poc_memory::thalamus::channels::fetch_all_channels().await;
|
||||||
|
let _ = tx.send(result).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
dirty = true;
|
dirty = true;
|
||||||
}
|
}
|
||||||
Some(Ok(Event::Mouse(mouse))) => {
|
Some(Ok(Event::Mouse(mouse))) => {
|
||||||
|
|
@ -988,6 +1007,12 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
|
||||||
dirty = true;
|
dirty = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Channel status arrived from async fetch
|
||||||
|
Some(channels) = channel_rx.recv() => {
|
||||||
|
app.set_channel_status(channels);
|
||||||
|
dirty = true;
|
||||||
|
}
|
||||||
|
|
||||||
// UI messages (lowest priority — processed in bulk during render)
|
// UI messages (lowest priority — processed in bulk during render)
|
||||||
Some(msg) = ui_rx.recv() => {
|
Some(msg) = ui_rx.recv() => {
|
||||||
app.handle_ui_message(msg);
|
app.handle_ui_message(msg);
|
||||||
|
|
|
||||||
|
|
@ -176,3 +176,87 @@ impl ChannelManager {
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// One-shot query: connect to a daemon socket, call list(), return results.
|
||||||
|
/// Safe to call from any async context (no LocalSet needed).
|
||||||
|
async fn query_one_daemon(sock: &std::path::Path) -> Vec<(String, bool, u32)> {
|
||||||
|
let stream = match UnixStream::connect(sock).await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(_) => return Vec::new(),
|
||||||
|
};
|
||||||
|
let (reader, writer) = stream.compat().split();
|
||||||
|
let rpc_network = Box::new(twoparty::VatNetwork::new(
|
||||||
|
futures::io::BufReader::new(reader),
|
||||||
|
futures::io::BufWriter::new(writer),
|
||||||
|
rpc_twoparty_capnp::Side::Client,
|
||||||
|
Default::default(),
|
||||||
|
));
|
||||||
|
let mut rpc_system = RpcSystem::new(rpc_network, None);
|
||||||
|
let client: channel_server::Client =
|
||||||
|
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
|
||||||
|
|
||||||
|
let rpc_handle = tokio::task::spawn_local(async move {
|
||||||
|
let _ = rpc_system.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut result = Vec::new();
|
||||||
|
if let Ok(reply) = client.list_request().send().promise.await {
|
||||||
|
if let Ok(r) = reply.get() {
|
||||||
|
if let Ok(channels) = r.get_channels() {
|
||||||
|
for ch in channels.iter() {
|
||||||
|
if let Ok(name) = ch.get_name() {
|
||||||
|
result.push((
|
||||||
|
name.to_str().unwrap_or("").to_string(),
|
||||||
|
ch.get_connected(),
|
||||||
|
ch.get_unread(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc_handle.abort();
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch channel status from all daemon sockets.
|
||||||
|
/// Runs on a dedicated thread because capnp-rpc uses Rc (not Send).
|
||||||
|
pub async fn fetch_all_channels() -> Vec<(String, bool, u32)> {
|
||||||
|
tokio::task::spawn_blocking(|| {
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
let local = tokio::task::LocalSet::new();
|
||||||
|
local.block_on(&rt, fetch_all_channels_inner())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> {
|
||||||
|
let channels_dir = dirs::home_dir()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.join(".consciousness/channels");
|
||||||
|
|
||||||
|
let mut sup = super::supervisor::Supervisor::new();
|
||||||
|
sup.load_config();
|
||||||
|
sup.ensure_running(); // restart any dead daemons
|
||||||
|
|
||||||
|
let mut result = Vec::new();
|
||||||
|
for (daemon_name, _enabled, alive) in sup.status() {
|
||||||
|
if !alive {
|
||||||
|
result.push((daemon_name, false, 0));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let sock = channels_dir.join(format!("{}.sock", daemon_name));
|
||||||
|
let channels = query_one_daemon(&sock).await;
|
||||||
|
if channels.is_empty() {
|
||||||
|
result.push((daemon_name, false, 0));
|
||||||
|
} else {
|
||||||
|
result.extend(channels);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -351,7 +351,7 @@ pub struct App {
|
||||||
/// Pane areas from last draw (for mouse click -> pane selection).
|
/// Pane areas from last draw (for mouse click -> pane selection).
|
||||||
pub(crate) pane_areas: [Rect; 3], // [autonomous, conversation, tools]
|
pub(crate) pane_areas: [Rect; 3], // [autonomous, conversation, tools]
|
||||||
/// Active screen (F1-F4).
|
/// Active screen (F1-F4).
|
||||||
pub(crate) screen: Screen,
|
pub screen: Screen,
|
||||||
/// Debug screen scroll offset.
|
/// Debug screen scroll offset.
|
||||||
pub(crate) debug_scroll: u16,
|
pub(crate) debug_scroll: u16,
|
||||||
/// Index of selected context section in debug view (for expand/collapse).
|
/// Index of selected context section in debug view (for expand/collapse).
|
||||||
|
|
@ -395,7 +395,7 @@ pub(crate) struct ChannelStatus {
|
||||||
|
|
||||||
/// Screens toggled by F-keys.
|
/// Screens toggled by F-keys.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||||
pub(crate) enum Screen {
|
pub enum Screen {
|
||||||
/// F1 — conversation
|
/// F1 — conversation
|
||||||
Interact,
|
Interact,
|
||||||
/// F2 — context window, model info, budget
|
/// F2 — context window, model info, budget
|
||||||
|
|
@ -851,52 +851,11 @@ impl App {
|
||||||
self.draw_main(frame, size);
|
self.draw_main(frame, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Refresh channel status by querying daemon sockets.
|
/// Update channel status from async fetch results.
|
||||||
/// Called from the status tick, not every render frame.
|
pub fn set_channel_status(&mut self, channels: Vec<(String, bool, u32)>) {
|
||||||
pub fn refresh_channels(&mut self) {
|
self.channel_status = channels.into_iter()
|
||||||
let channels_dir = dirs::home_dir()
|
.map(|(name, connected, unread)| ChannelStatus { name, connected, unread })
|
||||||
.unwrap_or_default()
|
.collect();
|
||||||
.join(".consciousness/channels");
|
|
||||||
|
|
||||||
let mut status = Vec::new();
|
|
||||||
|
|
||||||
// Read supervisor config to know which daemons exist
|
|
||||||
let mut sup = crate::thalamus::supervisor::Supervisor::new();
|
|
||||||
sup.load_config();
|
|
||||||
|
|
||||||
for (daemon_name, enabled, alive) in sup.status() {
|
|
||||||
if !alive {
|
|
||||||
status.push(ChannelStatus {
|
|
||||||
name: daemon_name,
|
|
||||||
connected: false,
|
|
||||||
unread: 0,
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to connect and call list()
|
|
||||||
let sock = channels_dir.join(format!("{}.sock", daemon_name));
|
|
||||||
match std::os::unix::net::UnixStream::connect(&sock) {
|
|
||||||
Ok(_stream) => {
|
|
||||||
// For now, just show the daemon as connected
|
|
||||||
// TODO: actual capnp list() call
|
|
||||||
status.push(ChannelStatus {
|
|
||||||
name: daemon_name,
|
|
||||||
connected: true,
|
|
||||||
unread: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
status.push(ChannelStatus {
|
|
||||||
name: daemon_name,
|
|
||||||
connected: false,
|
|
||||||
unread: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.channel_status = status;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Snapshot idle state for F5 display.
|
/// Snapshot idle state for F5 display.
|
||||||
|
|
@ -916,8 +875,8 @@ impl App {
|
||||||
self.debug_scroll = 0;
|
self.debug_scroll = 0;
|
||||||
// Refresh data for status screens on entry
|
// Refresh data for status screens on entry
|
||||||
match screen {
|
match screen {
|
||||||
Screen::Thalamus => self.refresh_channels(),
|
// Channel refresh triggered asynchronously from event loop
|
||||||
// idle_info is updated from the event loop, not here
|
Screen::Thalamus => {}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue