channels: add open/close RPCs for dynamic pane management

Add open/close to the channel capnp schema. The tmux daemon implements
open by finding a pane by name (pane title or window name) and
attaching pipe-pane; close detaches and removes from state.

Tool handlers channel_open and channel_close added to the tool
registry.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-04 18:25:20 -04:00 committed by Kent Overstreet
parent a14e85afe1
commit e8e9386856
3 changed files with 153 additions and 1 deletions

View file

@ -218,6 +218,98 @@ impl channel_server::Server for ChannelServerImpl {
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
Promise::ok(()) Promise::ok(())
} }
fn open(
&mut self,
params: channel_server::OpenParams,
_results: channel_server::OpenResults,
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let label = pry!(pry!(params.get_label()).to_str()).to_string();
// Check if already open
{
let s = self.state.borrow();
if s.pane_labels.contains(&label) {
return Promise::ok(());
}
}
// Find the tmux pane by name (window or pane title)
let pane_id = match find_pane_by_name(&label) {
Some(id) => id,
None => return Promise::err(capnp::Error::failed(
format!("no tmux pane named '{}'", label))),
};
info!("opening channel tmux.{} (pane {})", label, pane_id);
// Register in state
{
let mut s = self.state.borrow_mut();
s.pane_labels.push(label.clone());
}
// Start pipe-pane reader
let pane = PaneConfig { pane_id, label };
let reader_state = self.state.clone();
tokio::task::spawn_local(async move {
pipe_pane_reader(reader_state, pane).await;
});
Promise::ok(())
}
fn close(
&mut self,
params: channel_server::CloseParams,
_results: channel_server::CloseResults,
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
let mut s = self.state.borrow_mut();
if let Some(pos) = s.pane_labels.iter().position(|l| *l == label) {
info!("closing channel tmux.{}", label);
s.pane_labels.remove(pos);
s.channel_logs.remove(&format!("tmux.{}", label));
// Disconnect pipe-pane — find the pane ID
if let Some(pane_id) = find_pane_by_name(&label) {
let _ = std::process::Command::new("tmux")
.args(["pipe-pane", "-t", &pane_id])
.output();
}
}
Promise::ok(())
}
}
// ── Pane lookup ──────────────────────────────────────────────
/// Find a tmux pane by its title/name. Returns the pane ID (e.g. "%5")
/// if found. Searches pane titles first, then window names.
fn find_pane_by_name(name: &str) -> Option<String> {
let output = std::process::Command::new("tmux")
.args(["list-panes", "-a", "-F", "#{pane_id}\t#{pane_title}\t#{window_name}"])
.output()
.ok()?;
if !output.status.success() { return None; }
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
let parts: Vec<&str> = line.splitn(3, '\t').collect();
if parts.len() < 3 { continue; }
let pane_id = parts[0];
let pane_title = parts[1];
let window_name = parts[2];
if pane_title == name || window_name == name {
return Some(pane_id.to_string());
}
}
None
} }
// ── Cleanup ─────────────────────────────────────────────────── // ── Cleanup ───────────────────────────────────────────────────

View file

@ -56,4 +56,11 @@ interface ChannelServer {
# List available channels and their status. # List available channels and their status.
list @3 () -> (channels :List(ChannelInfo)); list @3 () -> (channels :List(ChannelInfo));
# Open a channel — start monitoring. Daemon-specific semantics:
# tmux: find pane by label name, attach pipe-pane.
open @4 (label :Text) -> ();
# Close a channel — stop monitoring and clean up.
close @5 (channel :Text) -> ();
} }

View file

@ -10,7 +10,7 @@ use super::Tool;
// ── Tool registry ────────────────────────────────────────────── // ── Tool registry ──────────────────────────────────────────────
pub fn tools() -> [Tool; 4] { pub fn tools() -> [Tool; 6] {
[ [
Tool { name: "channel_list", Tool { name: "channel_list",
description: "List all available channels and their status (connected, unread count).", description: "List all available channels and their status (connected, unread count).",
@ -28,6 +28,14 @@ pub fn tools() -> [Tool; 4] {
description: "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", description: "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.",
parameters_json: r#"{"type":"object","properties":{}}"#, parameters_json: r#"{"type":"object","properties":{}}"#,
handler: |_a, _v| Box::pin(async { channel_notifications().await }) }, handler: |_a, _v| Box::pin(async { channel_notifications().await }) },
Tool { name: "channel_open",
description: "Open a channel — start monitoring. For tmux: finds the pane by name and attaches pipe-pane.",
parameters_json: r#"{"type":"object","properties":{"label":{"type":"string","description":"Channel label / tmux pane name"}},"required":["label"]}"#,
handler: |_a, v| Box::pin(async move { channel_open(&v).await }) },
Tool { name: "channel_close",
description: "Close a channel — stop monitoring and clean up.",
parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. tmux.ktest)"}},"required":["channel"]}"#,
handler: |_a, v| Box::pin(async move { channel_close(&v).await }) },
] ]
} }
@ -107,6 +115,35 @@ async fn channel_notifications() -> Result<String> {
} }
} }
async fn channel_open(args: &serde_json::Value) -> Result<String> {
let label = args.get("label").and_then(|v| v.as_str())
.context("label is required")?
.to_string();
let prefix = label.split('.').next().unwrap_or("tmux");
let sock = daemon_sock(prefix)?;
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_open(&sock, &label))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
async fn channel_close(args: &serde_json::Value) -> Result<String> {
let channel = args.get("channel").and_then(|v| v.as_str())
.context("channel is required")?
.to_string();
let sock = daemon_sock(&channel)?;
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, rpc_close(&sock, &channel))
}).await?
.map_err(|e| anyhow::anyhow!("{}", e))
}
// ── Socket helpers ───────────────────────────────────────────── // ── Socket helpers ─────────────────────────────────────────────
fn channels_dir() -> std::path::PathBuf { fn channels_dir() -> std::path::PathBuf {
@ -195,6 +232,22 @@ async fn rpc_list(sock: &std::path::Path) -> Option<Vec<(String, bool, u32)>> {
Some(result) Some(result)
} }
async fn rpc_open(sock: &std::path::Path, label: &str) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.open_request();
req.get().set_label(label);
req.send().promise.await.map_err(|e| format!("open failed: {e}"))?;
Ok(format!("opened channel tmux.{}", label))
}
async fn rpc_close(sock: &std::path::Path, channel: &str) -> Result<String, String> {
let client = rpc_connect(sock).await?;
let mut req = client.close_request();
req.get().set_channel(channel);
req.send().promise.await.map_err(|e| format!("close failed: {e}"))?;
Ok(format!("closed channel {}", channel))
}
// ── Fetch all channels ───────────────────────────────────────── // ── Fetch all channels ─────────────────────────────────────────
/// Fetch channel status from all daemon sockets. /// Fetch channel status from all daemon sockets.