channels: add tmux pane channel daemon
Standalone daemon that streams tmux pane output via pipe-pane (no polling). Each configured pane becomes a channel "tmux.<label>" accessible through the standard channel.capnp protocol. - pipe-pane streams PTY output directly to FIFOs - Async readers push new lines into ChannelLogs - send works via tmux send-keys - Cleanup disconnects pipe-pane on daemon exit Config: ~/.consciousness/channels/tmux.json5 Socket: ~/.consciousness/channels/tmux.sock Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
c2c5530ecc
commit
1ef137fb3a
3 changed files with 325 additions and 1 deletions
305
channels/tmux/src/main.rs
Normal file
305
channels/tmux/src/main.rs
Normal file
|
|
@ -0,0 +1,305 @@
|
|||
// channel-tmux — Tmux pane channel daemon
|
||||
//
|
||||
// Uses tmux pipe-pane to stream pane output directly — no polling.
|
||||
// Each configured pane gets a Unix socket pair; pipe-pane sends
|
||||
// output to one end, the daemon reads from the other and pushes
|
||||
// new lines into ChannelLogs.
|
||||
//
|
||||
// Config: ~/.consciousness/channels/tmux.json5
|
||||
// Socket: ~/.consciousness/channels/tmux.sock
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::BTreeMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use capnp::capability::Promise;
|
||||
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
|
||||
use futures::AsyncReadExt;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::net::UnixListener;
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use poc_memory::channel_capnp::{channel_client, channel_server};
|
||||
use poc_memory::thalamus::channel_log::ChannelLog;
|
||||
|
||||
// ── Config ─────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Clone, serde::Deserialize)]
|
||||
struct PaneConfig {
|
||||
/// Tmux pane ID, e.g. "0:1.0"
|
||||
pane_id: String,
|
||||
/// Human-readable label, becomes the channel name "tmux.<label>"
|
||||
label: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, serde::Deserialize)]
|
||||
struct Config {
|
||||
panes: Vec<PaneConfig>,
|
||||
}
|
||||
|
||||
fn load_config() -> Config {
|
||||
let path = dirs::home_dir()
|
||||
.unwrap_or_default()
|
||||
.join(".consciousness/channels/tmux.json5");
|
||||
let text = std::fs::read_to_string(&path)
|
||||
.unwrap_or_else(|e| panic!("failed to read {}: {e}", path.display()));
|
||||
json5::from_str(&text)
|
||||
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display()))
|
||||
}
|
||||
|
||||
// ── State ─────────────────────────────────────────────────────
|
||||
|
||||
struct State {
|
||||
channel_logs: BTreeMap<String, ChannelLog>,
|
||||
pane_labels: Vec<String>,
|
||||
}
|
||||
|
||||
type SharedState = Rc<RefCell<State>>;
|
||||
|
||||
impl State {
|
||||
fn new(config: &Config) -> Self {
|
||||
Self {
|
||||
channel_logs: BTreeMap::new(),
|
||||
pane_labels: config.panes.iter().map(|p| p.label.clone()).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Pipe-Pane Reader ──────────────────────────────────────────
|
||||
|
||||
/// Set up pipe-pane for a single pane, reading output into the channel log.
|
||||
async fn pipe_pane_reader(state: SharedState, pane: PaneConfig) {
|
||||
let pipe_dir = dirs::home_dir()
|
||||
.unwrap_or_default()
|
||||
.join(".consciousness/channels/tmux-pipes");
|
||||
std::fs::create_dir_all(&pipe_dir).ok();
|
||||
|
||||
let pipe_path = pipe_dir.join(format!("{}.pipe", pane.label));
|
||||
let _ = std::fs::remove_file(&pipe_path);
|
||||
|
||||
// Create a named pipe (FIFO)
|
||||
unsafe {
|
||||
let c_path = std::ffi::CString::new(pipe_path.to_str().unwrap()).unwrap();
|
||||
libc::mkfifo(c_path.as_ptr(), 0o644);
|
||||
}
|
||||
|
||||
// Tell tmux to pipe this pane's output to our FIFO
|
||||
let pipe_path_str = pipe_path.to_string_lossy().to_string();
|
||||
let result = std::process::Command::new("tmux")
|
||||
.args(["pipe-pane", "-t", &pane.pane_id, &format!("cat >> {}", pipe_path_str)])
|
||||
.output();
|
||||
|
||||
match result {
|
||||
Ok(output) if output.status.success() => {
|
||||
info!("pipe-pane set up for {} ({})", pane.label, pane.pane_id);
|
||||
}
|
||||
Ok(output) => {
|
||||
error!("pipe-pane failed for {}: {}", pane.label,
|
||||
String::from_utf8_lossy(&output.stderr));
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to run tmux pipe-pane for {}: {}", pane.label, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Open the FIFO and read lines
|
||||
let file = match tokio::fs::File::open(&pipe_path).await {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
error!("failed to open pipe for {}: {}", pane.label, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let channel_key = format!("tmux.{}", pane.label);
|
||||
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let mut s = state.borrow_mut();
|
||||
let log = s.channel_logs
|
||||
.entry(channel_key.clone())
|
||||
.or_insert_with(ChannelLog::new);
|
||||
log.push(line);
|
||||
}
|
||||
|
||||
warn!("pipe-pane reader ended for {}", pane.label);
|
||||
}
|
||||
|
||||
// ── ChannelServer Implementation ───────────────────────────────
|
||||
|
||||
struct ChannelServerImpl {
|
||||
state: SharedState,
|
||||
}
|
||||
|
||||
impl channel_server::Server for ChannelServerImpl {
|
||||
fn recv(
|
||||
&mut self,
|
||||
params: channel_server::RecvParams,
|
||||
mut results: channel_server::RecvResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let params = pry!(params.get());
|
||||
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
|
||||
let all_new = params.get_all_new();
|
||||
let min_count = params.get_min_count() as usize;
|
||||
|
||||
let mut s = self.state.borrow_mut();
|
||||
let text = match s.channel_logs.get_mut(&channel) {
|
||||
Some(log) => {
|
||||
if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
|
||||
}
|
||||
None => String::new(),
|
||||
};
|
||||
|
||||
results.get().set_text(&text);
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
fn send(
|
||||
&mut self,
|
||||
params: channel_server::SendParams,
|
||||
_results: channel_server::SendResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let params = pry!(params.get());
|
||||
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
|
||||
let message = pry!(pry!(params.get_message()).to_str()).to_string();
|
||||
|
||||
// Send to tmux pane via send-keys
|
||||
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
|
||||
let has_pane = self.state.borrow().pane_labels.iter().any(|l| l == label);
|
||||
if has_pane {
|
||||
let _ = std::process::Command::new("tmux")
|
||||
.args(["send-keys", "-t", label, &message, "Enter"])
|
||||
.output();
|
||||
|
||||
let channel_key = format!("tmux.{}", label);
|
||||
let mut s = self.state.borrow_mut();
|
||||
let log = s.channel_logs
|
||||
.entry(channel_key)
|
||||
.or_insert_with(ChannelLog::new);
|
||||
log.push_own(format!("> {}", message));
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
fn list(
|
||||
&mut self,
|
||||
_params: channel_server::ListParams,
|
||||
mut results: channel_server::ListResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let s = self.state.borrow();
|
||||
let channels: Vec<_> = s.pane_labels.iter().map(|label| {
|
||||
let key = format!("tmux.{}", label);
|
||||
let unread = s.channel_logs.get(&key).map_or(0, |l| l.unread());
|
||||
(key, true, unread)
|
||||
}).collect();
|
||||
|
||||
let mut list = results.get().init_channels(channels.len() as u32);
|
||||
for (i, (name, connected, unread)) in channels.iter().enumerate() {
|
||||
let mut entry = list.reborrow().get(i as u32);
|
||||
entry.set_name(name);
|
||||
entry.set_connected(*connected);
|
||||
entry.set_unread(*unread as u32);
|
||||
}
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
fn subscribe(
|
||||
&mut self,
|
||||
_params: channel_server::SubscribeParams,
|
||||
_results: channel_server::SubscribeResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
Promise::ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ── Cleanup ───────────────────────────────────────────────────
|
||||
|
||||
/// Remove pipe-pane connections on exit.
|
||||
fn cleanup_pipes(config: &Config) {
|
||||
for pane in &config.panes {
|
||||
// Disconnect pipe-pane
|
||||
let _ = std::process::Command::new("tmux")
|
||||
.args(["pipe-pane", "-t", &pane.pane_id])
|
||||
.output();
|
||||
}
|
||||
// Clean up FIFO files
|
||||
let pipe_dir = dirs::home_dir()
|
||||
.unwrap_or_default()
|
||||
.join(".consciousness/channels/tmux-pipes");
|
||||
let _ = std::fs::remove_dir_all(&pipe_dir);
|
||||
}
|
||||
|
||||
// ── Main ───────────────────────────────────────────────────────
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let config = load_config();
|
||||
let state = Rc::new(RefCell::new(State::new(&config)));
|
||||
|
||||
let sock_dir = dirs::home_dir()
|
||||
.unwrap_or_default()
|
||||
.join(".consciousness/channels");
|
||||
std::fs::create_dir_all(&sock_dir)?;
|
||||
let sock_path = sock_dir.join("tmux.sock");
|
||||
let _ = std::fs::remove_file(&sock_path);
|
||||
|
||||
info!("tmux channel daemon starting on {}", sock_path.display());
|
||||
|
||||
// Set up cleanup on exit
|
||||
let cleanup_config = config.clone();
|
||||
let _cleanup = scopeguard::guard(cleanup_config, |c| cleanup_pipes(&c));
|
||||
|
||||
tokio::task::LocalSet::new()
|
||||
.run_until(async move {
|
||||
// Start a pipe-pane reader for each configured pane
|
||||
for pane in &config.panes {
|
||||
let reader_state = state.clone();
|
||||
let pane = pane.clone();
|
||||
tokio::task::spawn_local(async move {
|
||||
pipe_pane_reader(reader_state, pane).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Listen for channel protocol connections
|
||||
let listener = UnixListener::bind(&sock_path)?;
|
||||
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
let (reader, writer) = stream.compat().split();
|
||||
let network = twoparty::VatNetwork::new(
|
||||
futures::io::BufReader::new(reader),
|
||||
futures::io::BufWriter::new(writer),
|
||||
rpc_twoparty_capnp::Side::Server,
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
let server = ChannelServerImpl {
|
||||
state: state.clone(),
|
||||
};
|
||||
let client: channel_server::Client =
|
||||
capnp_rpc::new_client(server);
|
||||
|
||||
let rpc_system = RpcSystem::new(
|
||||
Box::new(network),
|
||||
Some(client.client),
|
||||
);
|
||||
|
||||
tokio::task::spawn_local(rpc_system);
|
||||
info!("channel client connected");
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
Ok::<(), Box<dyn std::error::Error>>(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue