forked from kent/consciousness
channel-tmux: resolve pane ids by label, don't persist them
tmux pane ids (%6 etc.) are ephemeral — recycled across pane and tmux-server restarts. The daemon persisted the id in tmux.json5 and kept reusing it, so after a restart a channel would attach to whatever unrelated pane had since inherited that id. (Live: ktest's stored %6 had become a claude pane; the real ktest pane was %10.) Persist only the label — the pane title / window name, which is stable. pipe_pane_reader() is now a connect-retry loop: each attempt, connect_and_stream() resolves the live id with find_pane_by_name(); the loop retries until the pane exists and pipe-pane succeeds, and reconnects the same way if the pipe later drops. send() resolves the id at send time; open() just registers the label and lets the reader find it. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
190eb50ed9
commit
6e3bacb182
1 changed files with 104 additions and 78 deletions
|
|
@ -26,10 +26,12 @@ use consciousness::thalamus::channel_log::ChannelLog;
|
||||||
|
|
||||||
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
||||||
struct PaneConfig {
|
struct PaneConfig {
|
||||||
/// Human-readable label, becomes the channel name "tmux.<label>"
|
/// Human-readable label: becomes the channel name "tmux.<label>",
|
||||||
|
/// and the tmux pane title / window name the live pane id is
|
||||||
|
/// resolved from. The pane id is deliberately not stored — it is
|
||||||
|
/// ephemeral (recycled across pane and tmux-server restarts), so it
|
||||||
|
/// is looked up fresh on every connect attempt.
|
||||||
label: String,
|
label: String,
|
||||||
/// Tmux pane ID, e.g. "%5"
|
|
||||||
pane_id: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
|
@ -86,11 +88,9 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get pane_id for a label
|
/// Whether a pane with this label is registered.
|
||||||
fn get_pane(&self, label: &str) -> Option<&str> {
|
fn has_pane(&self, label: &str) -> bool {
|
||||||
self.config.panes.iter()
|
self.config.panes.iter().any(|p| p.label == label)
|
||||||
.find(|p| p.label == label)
|
|
||||||
.map(|p| p.pane_id.as_str())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if a pane is connected
|
/// Check if a pane is connected
|
||||||
|
|
@ -103,98 +103,124 @@ impl State {
|
||||||
self.connected.insert(label.to_string(), connected);
|
self.connected.insert(label.to_string(), connected);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a pane and persist
|
/// Register a pane and persist.
|
||||||
fn add_pane(&mut self, label: String, pane_id: String) {
|
fn add_pane(&mut self, label: String) {
|
||||||
if !self.config.panes.iter().any(|p| p.label == label) {
|
if !self.config.panes.iter().any(|p| p.label == label) {
|
||||||
self.config.panes.push(PaneConfig { label, pane_id });
|
self.config.panes.push(PaneConfig { label });
|
||||||
save_config(&self.config);
|
save_config(&self.config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a pane and persist
|
/// Unregister a pane and persist. Returns whether it was registered.
|
||||||
fn remove_pane(&mut self, label: &str) -> Option<String> {
|
fn remove_pane(&mut self, label: &str) -> bool {
|
||||||
if let Some(idx) = self.config.panes.iter().position(|p| p.label == label) {
|
if let Some(idx) = self.config.panes.iter().position(|p| p.label == label) {
|
||||||
let pane = self.config.panes.remove(idx);
|
self.config.panes.remove(idx);
|
||||||
self.connected.remove(label);
|
self.connected.remove(label);
|
||||||
save_config(&self.config);
|
save_config(&self.config);
|
||||||
Some(pane.pane_id)
|
true
|
||||||
} else {
|
} else {
|
||||||
None
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Pipe-Pane Reader ──────────────────────────────────────────
|
// ── Pipe-Pane Reader ──────────────────────────────────────────
|
||||||
|
|
||||||
/// Set up pipe-pane for a single pane, reading output into the channel log.
|
/// Wait between connect attempts for a pane that is not yet reachable.
|
||||||
async fn pipe_pane_reader(state: SharedState, pane: PaneConfig) {
|
const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
|
||||||
|
|
||||||
|
/// Keep a pane streamed into its channel log for as long as it stays
|
||||||
|
/// registered. The pane id is resolved fresh by label on every connect
|
||||||
|
/// attempt — tmux pane ids are ephemeral, so the label (pane title /
|
||||||
|
/// window name) is the durable identity. Retries until the pane exists
|
||||||
|
/// and pipe-pane succeeds, and reconnects the same way if the pipe
|
||||||
|
/// later drops. Returns once close() unregisters the pane.
|
||||||
|
async fn pipe_pane_reader(state: SharedState, label: String) {
|
||||||
let pipe_dir = dirs::home_dir()
|
let pipe_dir = dirs::home_dir()
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.join(".consciousness/channels/tmux-pipes");
|
.join(".consciousness/channels/tmux-pipes");
|
||||||
std::fs::create_dir_all(&pipe_dir).ok();
|
std::fs::create_dir_all(&pipe_dir).ok();
|
||||||
|
let pipe_path = pipe_dir.join(format!("{}.pipe", label));
|
||||||
|
let channel_key = format!("tmux.{}", label);
|
||||||
|
|
||||||
let pipe_path = pipe_dir.join(format!("{}.pipe", pane.label));
|
loop {
|
||||||
let _ = std::fs::remove_file(&pipe_path);
|
if !state.borrow().has_pane(&label) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Create a named pipe (FIFO)
|
connect_and_stream(&state, &label, &pipe_path, &channel_key).await;
|
||||||
|
state.borrow_mut().set_connected(&label, false);
|
||||||
|
|
||||||
|
if !state.borrow().has_pane(&label) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(RETRY_INTERVAL).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One connect attempt: resolve the pane's live id by label, point its
|
||||||
|
/// output at the FIFO with pipe-pane, and stream lines into the channel
|
||||||
|
/// log. Returns on the first failure, or when the stream ends.
|
||||||
|
async fn connect_and_stream(
|
||||||
|
state: &SharedState,
|
||||||
|
label: &str,
|
||||||
|
pipe_path: &std::path::Path,
|
||||||
|
channel_key: &str,
|
||||||
|
) {
|
||||||
|
let pane_id = match find_pane_by_name(label) {
|
||||||
|
Some(id) => id,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Fresh FIFO for this attempt.
|
||||||
|
let _ = std::fs::remove_file(pipe_path);
|
||||||
unsafe {
|
unsafe {
|
||||||
let c_path = std::ffi::CString::new(pipe_path.to_str().unwrap()).unwrap();
|
let c_path = std::ffi::CString::new(pipe_path.to_str().unwrap()).unwrap();
|
||||||
libc::mkfifo(c_path.as_ptr(), 0o644);
|
libc::mkfifo(c_path.as_ptr(), 0o644);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tell tmux to pipe this pane's output to our FIFO
|
// Point the pane's output at our FIFO.
|
||||||
let pipe_path_str = pipe_path.to_string_lossy().to_string();
|
let pipe_cmd = format!("cat >> {}", pipe_path.to_string_lossy());
|
||||||
let result = std::process::Command::new("tmux")
|
match std::process::Command::new("tmux")
|
||||||
.args(["pipe-pane", "-t", &pane.pane_id, &format!("cat >> {}", pipe_path_str)])
|
.args(["pipe-pane", "-t", &pane_id, &pipe_cmd])
|
||||||
.output();
|
.output()
|
||||||
|
{
|
||||||
match result {
|
Ok(o) if o.status.success() => {}
|
||||||
Ok(output) if output.status.success() => {
|
Ok(o) => {
|
||||||
info!("pipe-pane set up for {} ({})", pane.label, pane.pane_id);
|
warn!("pipe-pane failed for {} ({}): {}", label, pane_id,
|
||||||
}
|
String::from_utf8_lossy(&o.stderr));
|
||||||
Ok(output) => {
|
|
||||||
error!("pipe-pane failed for {}: {}", pane.label,
|
|
||||||
String::from_utf8_lossy(&output.stderr));
|
|
||||||
state.borrow_mut().set_connected(&pane.label, false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("failed to run tmux pipe-pane for {}: {}", pane.label, e);
|
error!("running tmux pipe-pane for {}: {}", label, e);
|
||||||
state.borrow_mut().set_connected(&pane.label, false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the FIFO and read lines
|
let file = match tokio::fs::File::open(pipe_path).await {
|
||||||
let file = match tokio::fs::File::open(&pipe_path).await {
|
|
||||||
Ok(f) => f,
|
Ok(f) => f,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("failed to open pipe for {}: {}", pane.label, e);
|
warn!("opening pipe for {}: {}", label, e);
|
||||||
state.borrow_mut().set_connected(&pane.label, false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Mark as connected once pipe is open
|
info!("connected channel tmux.{} (pane {})", label, pane_id);
|
||||||
state.borrow_mut().set_connected(&pane.label, true);
|
state.borrow_mut().set_connected(label, true);
|
||||||
|
|
||||||
let reader = tokio::io::BufReader::new(file);
|
|
||||||
let mut lines = reader.lines();
|
|
||||||
let channel_key = format!("tmux.{}", pane.label);
|
|
||||||
|
|
||||||
|
let mut lines = tokio::io::BufReader::new(file).lines();
|
||||||
while let Ok(Some(line)) = lines.next_line().await {
|
while let Ok(Some(line)) = lines.next_line().await {
|
||||||
if line.trim().is_empty() {
|
if line.trim().is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
let log = s.channel_logs
|
s.channel_logs
|
||||||
.entry(channel_key.clone())
|
.entry(channel_key.to_string())
|
||||||
.or_insert_with(ChannelLog::new);
|
.or_insert_with(ChannelLog::new)
|
||||||
log.push(line);
|
.push(line);
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!("pipe-pane reader ended for {}", pane.label);
|
warn!("pipe-pane stream ended for {}", label);
|
||||||
state.borrow_mut().set_connected(&pane.label, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── ChannelServer Implementation ───────────────────────────────
|
// ── ChannelServer Implementation ───────────────────────────────
|
||||||
|
|
@ -244,10 +270,10 @@ impl channel_server::Server for ChannelServerImpl {
|
||||||
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
|
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
|
||||||
let message = pry!(pry!(params.get_message()).to_str()).to_string();
|
let message = pry!(pry!(params.get_message()).to_str()).to_string();
|
||||||
|
|
||||||
// Send to tmux pane via send-keys
|
// Send to tmux pane via send-keys — resolve the live pane id by
|
||||||
|
// label (it is not stored).
|
||||||
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
|
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
|
||||||
let pane_id = self.state.borrow().get_pane(label).map(String::from);
|
if let Some(pane_id) = find_pane_by_name(label) {
|
||||||
if let Some(pane_id) = pane_id {
|
|
||||||
let _ = std::process::Command::new("tmux")
|
let _ = std::process::Command::new("tmux")
|
||||||
.args(["send-keys", "-t", &pane_id, &message, "Enter"])
|
.args(["send-keys", "-t", &pane_id, &message, "Enter"])
|
||||||
.output();
|
.output();
|
||||||
|
|
@ -302,28 +328,22 @@ impl channel_server::Server for ChannelServerImpl {
|
||||||
let params = pry!(params.get());
|
let params = pry!(params.get());
|
||||||
let label = pry!(pry!(params.get_label()).to_str()).to_string();
|
let label = pry!(pry!(params.get_label()).to_str()).to_string();
|
||||||
|
|
||||||
// Check if already open
|
// Already registered — nothing to do.
|
||||||
if self.state.borrow().get_pane(&label).is_some() {
|
if self.state.borrow().has_pane(&label) {
|
||||||
return std::future::ready(Ok(()));
|
return std::future::ready(Ok(()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find the tmux pane by name (window or pane title)
|
info!("opening channel tmux.{}", label);
|
||||||
let pane_id = match find_pane_by_name(&label) {
|
|
||||||
Some(id) => id,
|
|
||||||
None => return std::future::ready(Err(capnp::Error::failed(
|
|
||||||
format!("no tmux pane named '{}'", label)))),
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("opening channel tmux.{} (pane {})", label, pane_id);
|
// Register the label and persist. The pane id is not stored —
|
||||||
|
// the reader resolves it by label on every connect attempt, so
|
||||||
|
// this succeeds even if the pane does not exist yet; the reader
|
||||||
|
// connects once it appears.
|
||||||
|
self.state.borrow_mut().add_pane(label.clone());
|
||||||
|
|
||||||
// Register in state and persist
|
|
||||||
self.state.borrow_mut().add_pane(label.clone(), pane_id.clone());
|
|
||||||
|
|
||||||
// Start pipe-pane reader
|
|
||||||
let pane = PaneConfig { label, pane_id };
|
|
||||||
let reader_state = self.state.clone();
|
let reader_state = self.state.clone();
|
||||||
tokio::task::spawn_local(async move {
|
tokio::task::spawn_local(async move {
|
||||||
pipe_pane_reader(reader_state, pane).await;
|
pipe_pane_reader(reader_state, label).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
std::future::ready(Ok(()))
|
std::future::ready(Ok(()))
|
||||||
|
|
@ -339,15 +359,19 @@ impl channel_server::Server for ChannelServerImpl {
|
||||||
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
|
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
|
||||||
|
|
||||||
let mut s = self.state.borrow_mut();
|
let mut s = self.state.borrow_mut();
|
||||||
if let Some(pane_id) = s.remove_pane(&label) {
|
if s.remove_pane(&label) {
|
||||||
info!("closing channel tmux.{}", label);
|
info!("closing channel tmux.{}", label);
|
||||||
s.channel_logs.remove(&format!("tmux.{}", label));
|
s.channel_logs.remove(&format!("tmux.{}", label));
|
||||||
|
|
||||||
// Disconnect pipe-pane
|
// Stop piping if the pane is still around (if it is gone the
|
||||||
|
// pipe is already dead). The reader then sees the pane
|
||||||
|
// unregistered and exits.
|
||||||
|
if let Some(pane_id) = find_pane_by_name(&label) {
|
||||||
let _ = std::process::Command::new("tmux")
|
let _ = std::process::Command::new("tmux")
|
||||||
.args(["pipe-pane", "-t", &pane_id])
|
.args(["pipe-pane", "-t", &pane_id])
|
||||||
.output();
|
.output();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::future::ready(Ok(()))
|
std::future::ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
@ -397,11 +421,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
tokio::task::LocalSet::new()
|
tokio::task::LocalSet::new()
|
||||||
.run_until(async move {
|
.run_until(async move {
|
||||||
// Start a pipe-pane reader for each configured pane
|
// Start a pipe-pane reader for each configured pane; each
|
||||||
|
// resolves its live pane id by label and retries until
|
||||||
|
// connected.
|
||||||
for pane in state.borrow().config.panes.clone() {
|
for pane in state.borrow().config.panes.clone() {
|
||||||
let reader_state = state.clone();
|
let reader_state = state.clone();
|
||||||
tokio::task::spawn_local(async move {
|
tokio::task::spawn_local(async move {
|
||||||
pipe_pane_reader(reader_state, pane).await;
|
pipe_pane_reader(reader_state, pane.label).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue