consciousness/channels/tmux/src/main.rs

466 lines
16 KiB
Rust
Raw Normal View History

// channel-tmux — Tmux pane channel daemon
//
// Uses tmux pipe-pane to stream pane output directly — no polling.
// Each configured pane gets a Unix socket pair; pipe-pane sends
// output to one end, the daemon reads from the other and pushes
// new lines into ChannelLogs.
//
// Config: ~/.consciousness/channels/tmux.json5
// Socket: ~/.consciousness/channels/tmux.sock
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio::io::AsyncBufReadExt;
use tokio::net::UnixListener;
use tokio_util::compat::TokioAsyncReadCompatExt;
use log::{info, warn, error};
2026-04-09 19:58:07 -04:00
use consciousness::channel_capnp::channel_server;
use consciousness::thalamus::channel_log::ChannelLog;
// ── Config ─────────────────────────────────────────────────────
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct PaneConfig {
/// Human-readable label: becomes the channel name "tmux.<label>",
/// and the tmux pane title / window name the live pane id is
/// resolved from. The pane id is deliberately not stored — it is
/// ephemeral (recycled across pane and tmux-server restarts), so it
/// is looked up fresh on every connect attempt.
label: String,
}
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct Config {
#[serde(default)]
panes: Vec<PaneConfig>,
}
fn config_path() -> std::path::PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels/tmux.json5")
}
fn load_config() -> Config {
match std::fs::read_to_string(config_path()) {
Ok(text) => json5::from_str(&text)
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", config_path().display())),
Err(_) => {
info!("no tmux.json5, starting with no pre-configured panes");
Config { panes: vec![] }
}
}
}
fn save_config(config: &Config) {
match serde_json::to_string_pretty(config) {
Ok(json) => {
if let Err(e) = std::fs::write(config_path(), json) {
error!("failed to write config: {}", e);
}
}
Err(e) => error!("failed to serialize config: {}", e),
}
}
// ── State ─────────────────────────────────────────────────────
struct State {
config: Config,
channel_logs: BTreeMap<String, ChannelLog>,
/// Tracks which panes are actually connected (pipe-pane active)
connected: BTreeMap<String, bool>,
}
type SharedState = Rc<RefCell<State>>;
impl State {
fn new(config: Config) -> Self {
Self {
config,
channel_logs: BTreeMap::new(),
connected: BTreeMap::new(),
}
}
/// Whether a pane with this label is registered.
fn has_pane(&self, label: &str) -> bool {
self.config.panes.iter().any(|p| p.label == label)
}
/// Check if a pane is connected
fn is_connected(&self, label: &str) -> bool {
self.connected.get(label).copied().unwrap_or(false)
}
/// Set connection state for a pane
fn set_connected(&mut self, label: &str, connected: bool) {
self.connected.insert(label.to_string(), connected);
}
/// Register a pane and persist.
fn add_pane(&mut self, label: String) {
if !self.config.panes.iter().any(|p| p.label == label) {
self.config.panes.push(PaneConfig { label });
save_config(&self.config);
}
}
/// Unregister a pane and persist. Returns whether it was registered.
fn remove_pane(&mut self, label: &str) -> bool {
if let Some(idx) = self.config.panes.iter().position(|p| p.label == label) {
self.config.panes.remove(idx);
self.connected.remove(label);
save_config(&self.config);
true
} else {
false
}
}
}
// ── Pipe-Pane Reader ──────────────────────────────────────────
/// Wait between connect attempts for a pane that is not yet reachable.
const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
/// Keep a pane streamed into its channel log for as long as it stays
/// registered. The pane id is resolved fresh by label on every connect
/// attempt — tmux pane ids are ephemeral, so the label (pane title /
/// window name) is the durable identity. Retries until the pane exists
/// and pipe-pane succeeds, and reconnects the same way if the pipe
/// later drops. Returns once close() unregisters the pane.
async fn pipe_pane_reader(state: SharedState, label: String) {
let pipe_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels/tmux-pipes");
std::fs::create_dir_all(&pipe_dir).ok();
let pipe_path = pipe_dir.join(format!("{}.pipe", label));
let channel_key = format!("tmux.{}", label);
loop {
if !state.borrow().has_pane(&label) {
return;
}
connect_and_stream(&state, &label, &pipe_path, &channel_key).await;
state.borrow_mut().set_connected(&label, false);
if !state.borrow().has_pane(&label) {
return;
}
tokio::time::sleep(RETRY_INTERVAL).await;
}
}
/// One connect attempt: resolve the pane's live id by label, point its
/// output at the FIFO with pipe-pane, and stream lines into the channel
/// log. Returns on the first failure, or when the stream ends.
async fn connect_and_stream(
state: &SharedState,
label: &str,
pipe_path: &std::path::Path,
channel_key: &str,
) {
let pane_id = match find_pane_by_name(label) {
Some(id) => id,
None => return,
};
// Fresh FIFO for this attempt.
let _ = std::fs::remove_file(pipe_path);
unsafe {
let c_path = std::ffi::CString::new(pipe_path.to_str().unwrap()).unwrap();
libc::mkfifo(c_path.as_ptr(), 0o644);
}
// Point the pane's output at our FIFO.
let pipe_cmd = format!("cat >> {}", pipe_path.to_string_lossy());
match std::process::Command::new("tmux")
.args(["pipe-pane", "-t", &pane_id, &pipe_cmd])
.output()
{
Ok(o) if o.status.success() => {}
Ok(o) => {
warn!("pipe-pane failed for {} ({}): {}", label, pane_id,
String::from_utf8_lossy(&o.stderr));
return;
}
Err(e) => {
error!("running tmux pipe-pane for {}: {}", label, e);
return;
}
}
let file = match tokio::fs::File::open(pipe_path).await {
Ok(f) => f,
Err(e) => {
warn!("opening pipe for {}: {}", label, e);
return;
}
};
info!("connected channel tmux.{} (pane {})", label, pane_id);
state.borrow_mut().set_connected(label, true);
let mut lines = tokio::io::BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
let mut s = state.borrow_mut();
s.channel_logs
.entry(channel_key.to_string())
.or_insert_with(ChannelLog::new)
.push(line);
}
warn!("pipe-pane stream ended for {}", label);
}
// ── ChannelServer Implementation ───────────────────────────────
struct ChannelServerImpl {
state: SharedState,
}
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl {
fn recv(
self: Rc<Self>,
params: channel_server::RecvParams,
mut results: channel_server::RecvResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new();
let min_count = params.get_min_count() as usize;
let mut s = self.state.borrow_mut();
let text = match s.channel_logs.get_mut(&channel) {
Some(log) => {
if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
}
None => String::new(),
};
results.get().set_text(&text);
std::future::ready(Ok(()))
}
fn send(
self: Rc<Self>,
params: channel_server::SendParams,
_results: channel_server::SendResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let message = pry!(pry!(params.get_message()).to_str()).to_string();
// Send to tmux pane via send-keys — resolve the live pane id by
// label (it is not stored).
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
if let Some(pane_id) = find_pane_by_name(label) {
let _ = std::process::Command::new("tmux")
.args(["send-keys", "-t", &pane_id, &message, "Enter"])
.output();
let channel_key = format!("tmux.{}", label);
let mut s = self.state.borrow_mut();
let log = s.channel_logs
.entry(channel_key)
.or_insert_with(ChannelLog::new);
log.push_own(format!("> {}", message));
}
std::future::ready(Ok(()))
}
fn list(
self: Rc<Self>,
_params: channel_server::ListParams,
mut results: channel_server::ListResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow();
let channels: Vec<_> = s.config.panes.iter().map(|p| {
let key = format!("tmux.{}", p.label);
let connected = s.is_connected(&p.label);
let unread = s.channel_logs.get(&key).map_or(0, |l| l.unread());
(key, connected, unread)
}).collect();
let mut list = results.get().init_channels(channels.len() as u32);
for (i, (name, connected, unread)) in channels.iter().enumerate() {
let mut entry = list.reborrow().get(i as u32);
entry.set_name(name);
entry.set_connected(*connected);
entry.set_unread(*unread as u32);
}
std::future::ready(Ok(()))
}
fn subscribe(
self: Rc<Self>,
_params: channel_server::SubscribeParams,
_results: channel_server::SubscribeResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
std::future::ready(Ok(()))
}
fn open(
self: Rc<Self>,
params: channel_server::OpenParams,
_results: channel_server::OpenResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let label = pry!(pry!(params.get_label()).to_str()).to_string();
// Already registered — nothing to do.
if self.state.borrow().has_pane(&label) {
return std::future::ready(Ok(()));
}
info!("opening channel tmux.{}", label);
// Register the label and persist. The pane id is not stored —
// the reader resolves it by label on every connect attempt, so
// this succeeds even if the pane does not exist yet; the reader
// connects once it appears.
self.state.borrow_mut().add_pane(label.clone());
let reader_state = self.state.clone();
tokio::task::spawn_local(async move {
pipe_pane_reader(reader_state, label).await;
});
std::future::ready(Ok(()))
}
fn close(
self: Rc<Self>,
params: channel_server::CloseParams,
_results: channel_server::CloseResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
let mut s = self.state.borrow_mut();
if s.remove_pane(&label) {
info!("closing channel tmux.{}", label);
s.channel_logs.remove(&format!("tmux.{}", label));
// Stop piping if the pane is still around (if it is gone the
// pipe is already dead). The reader then sees the pane
// unregistered and exits.
if let Some(pane_id) = find_pane_by_name(&label) {
let _ = std::process::Command::new("tmux")
.args(["pipe-pane", "-t", &pane_id])
.output();
}
}
std::future::ready(Ok(()))
}
}
// ── Pane lookup ──────────────────────────────────────────────
/// Find a tmux pane by its title/name. Returns the pane ID (e.g. "%5")
/// if found. Searches pane titles first, then window names.
fn find_pane_by_name(name: &str) -> Option<String> {
let output = std::process::Command::new("tmux")
.args(["list-panes", "-a", "-F", "#{pane_id}\t#{pane_title}\t#{window_name}"])
.output()
.ok()?;
if !output.status.success() { return None; }
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
let parts: Vec<&str> = line.splitn(3, '\t').collect();
if parts.len() < 3 { continue; }
let pane_id = parts[0];
let pane_title = parts[1];
let window_name = parts[2];
if pane_title == name || window_name == name {
return Some(pane_id.to_string());
}
}
None
}
// ── Main ───────────────────────────────────────────────────────
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let config = load_config();
let state = Rc::new(RefCell::new(State::new(config)));
let sock_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels");
std::fs::create_dir_all(&sock_dir)?;
let sock_path = sock_dir.join("tmux.sock");
let _ = std::fs::remove_file(&sock_path);
info!("tmux channel daemon starting on {}", sock_path.display());
tokio::task::LocalSet::new()
.run_until(async move {
// Start a pipe-pane reader for each configured pane; each
// resolves its live pane id by label and retries until
// connected.
for pane in state.borrow().config.panes.clone() {
let reader_state = state.clone();
tokio::task::spawn_local(async move {
pipe_pane_reader(reader_state, pane.label).await;
});
}
// Listen for channel protocol connections
let listener = UnixListener::bind(&sock_path)?;
loop {
let (stream, _) = listener.accept().await?;
let (reader, writer) = stream.compat().split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let server = ChannelServerImpl {
state: state.clone(),
};
let client: channel_server::Client =
capnp_rpc::new_client(server);
let rpc_system = RpcSystem::new(
Box::new(network),
Some(client.client),
);
tokio::task::spawn_local(rpc_system);
info!("channel client connected");
}
#[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
})
.await
}