diff --git a/channels/tmux/src/main.rs b/channels/tmux/src/main.rs index b3e3711..56a48b6 100644 --- a/channels/tmux/src/main.rs +++ b/channels/tmux/src/main.rs @@ -218,6 +218,98 @@ impl channel_server::Server for ChannelServerImpl { ) -> Promise<(), capnp::Error> { Promise::ok(()) } + + fn open( + &mut self, + params: channel_server::OpenParams, + _results: channel_server::OpenResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let label = pry!(pry!(params.get_label()).to_str()).to_string(); + + // Check if already open + { + let s = self.state.borrow(); + if s.pane_labels.contains(&label) { + return Promise::ok(()); + } + } + + // Find the tmux pane by name (window or pane title) + let pane_id = match find_pane_by_name(&label) { + Some(id) => id, + None => return Promise::err(capnp::Error::failed( + format!("no tmux pane named '{}'", label))), + }; + + info!("opening channel tmux.{} (pane {})", label, pane_id); + + // Register in state + { + let mut s = self.state.borrow_mut(); + s.pane_labels.push(label.clone()); + } + + // Start pipe-pane reader + let pane = PaneConfig { pane_id, label }; + let reader_state = self.state.clone(); + tokio::task::spawn_local(async move { + pipe_pane_reader(reader_state, pane).await; + }); + + Promise::ok(()) + } + + fn close( + &mut self, + params: channel_server::CloseParams, + _results: channel_server::CloseResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); + let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string(); + + let mut s = self.state.borrow_mut(); + if let Some(pos) = s.pane_labels.iter().position(|l| *l == label) { + info!("closing channel tmux.{}", label); + s.pane_labels.remove(pos); + s.channel_logs.remove(&format!("tmux.{}", label)); + + // Disconnect pipe-pane — find the pane ID + if let Some(pane_id) = find_pane_by_name(&label) { + let _ = std::process::Command::new("tmux") + .args(["pipe-pane", "-t", &pane_id]) + .output(); + } + } + + Promise::ok(()) + } +} + +// ── Pane lookup ────────────────────────────────────────────── + +/// Find a tmux pane by its title/name. Returns the pane ID (e.g. "%5") +/// if found. Searches pane titles first, then window names. +fn find_pane_by_name(name: &str) -> Option { + let output = std::process::Command::new("tmux") + .args(["list-panes", "-a", "-F", "#{pane_id}\t#{pane_title}\t#{window_name}"]) + .output() + .ok()?; + if !output.status.success() { return None; } + + let stdout = String::from_utf8_lossy(&output.stdout); + for line in stdout.lines() { + let parts: Vec<&str> = line.splitn(3, '\t').collect(); + if parts.len() < 3 { continue; } + let pane_id = parts[0]; + let pane_title = parts[1]; + let window_name = parts[2]; + if pane_title == name || window_name == name { + return Some(pane_id.to_string()); + } + } + None } // ── Cleanup ─────────────────────────────────────────────────── diff --git a/schema/channel.capnp b/schema/channel.capnp index 4d70e76..1a61b17 100644 --- a/schema/channel.capnp +++ b/schema/channel.capnp @@ -56,4 +56,11 @@ interface ChannelServer { # List available channels and their status. list @3 () -> (channels :List(ChannelInfo)); + + # Open a channel — start monitoring. Daemon-specific semantics: + # tmux: find pane by label name, attach pipe-pane. + open @4 (label :Text) -> (); + + # Close a channel — stop monitoring and clean up. + close @5 (channel :Text) -> (); } diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs index c74eccd..a143934 100644 --- a/src/agent/tools/channels.rs +++ b/src/agent/tools/channels.rs @@ -10,7 +10,7 @@ use super::Tool; // ── Tool registry ────────────────────────────────────────────── -pub fn tools() -> [Tool; 4] { +pub fn tools() -> [Tool; 6] { [ Tool { name: "channel_list", description: "List all available channels and their status (connected, unread count).", @@ -28,6 +28,14 @@ pub fn tools() -> [Tool; 4] { description: "Get pending channel notifications (unread signals). Does not consume messages — use channel_recv for that.", parameters_json: r#"{"type":"object","properties":{}}"#, handler: |_a, _v| Box::pin(async { channel_notifications().await }) }, + Tool { name: "channel_open", + description: "Open a channel — start monitoring. For tmux: finds the pane by name and attaches pipe-pane.", + parameters_json: r#"{"type":"object","properties":{"label":{"type":"string","description":"Channel label / tmux pane name"}},"required":["label"]}"#, + handler: |_a, v| Box::pin(async move { channel_open(&v).await }) }, + Tool { name: "channel_close", + description: "Close a channel — stop monitoring and clean up.", + parameters_json: r#"{"type":"object","properties":{"channel":{"type":"string","description":"Channel path (e.g. tmux.ktest)"}},"required":["channel"]}"#, + handler: |_a, v| Box::pin(async move { channel_close(&v).await }) }, ] } @@ -107,6 +115,35 @@ async fn channel_notifications() -> Result { } } +async fn channel_open(args: &serde_json::Value) -> Result { + let label = args.get("label").and_then(|v| v.as_str()) + .context("label is required")? + .to_string(); + let prefix = label.split('.').next().unwrap_or("tmux"); + let sock = daemon_sock(prefix)?; + tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, rpc_open(&sock, &label)) + }).await? + .map_err(|e| anyhow::anyhow!("{}", e)) +} + +async fn channel_close(args: &serde_json::Value) -> Result { + let channel = args.get("channel").and_then(|v| v.as_str()) + .context("channel is required")? + .to_string(); + let sock = daemon_sock(&channel)?; + tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, rpc_close(&sock, &channel)) + }).await? + .map_err(|e| anyhow::anyhow!("{}", e)) +} + // ── Socket helpers ───────────────────────────────────────────── fn channels_dir() -> std::path::PathBuf { @@ -195,6 +232,22 @@ async fn rpc_list(sock: &std::path::Path) -> Option> { Some(result) } +async fn rpc_open(sock: &std::path::Path, label: &str) -> Result { + let client = rpc_connect(sock).await?; + let mut req = client.open_request(); + req.get().set_label(label); + req.send().promise.await.map_err(|e| format!("open failed: {e}"))?; + Ok(format!("opened channel tmux.{}", label)) +} + +async fn rpc_close(sock: &std::path::Path, channel: &str) -> Result { + let client = rpc_connect(sock).await?; + let mut req = client.close_request(); + req.get().set_channel(channel); + req.send().promise.await.map_err(|e| format!("close failed: {e}"))?; + Ok(format!("closed channel {}", channel)) +} + // ── Fetch all channels ───────────────────────────────────────── /// Fetch channel status from all daemon sockets.