diff --git a/channels/tmux/src/main.rs b/channels/tmux/src/main.rs index 56a48b6..b2f9dd0 100644 --- a/channels/tmux/src/main.rs +++ b/channels/tmux/src/main.rs @@ -42,17 +42,22 @@ fn load_config() -> Config { let path = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels/tmux.json5"); - let text = std::fs::read_to_string(&path) - .unwrap_or_else(|e| panic!("failed to read {}: {e}", path.display())); - json5::from_str(&text) - .unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())) + match std::fs::read_to_string(&path) { + Ok(text) => json5::from_str(&text) + .unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())), + Err(_) => { + info!("no tmux.json5, starting with no pre-configured panes"); + Config { panes: vec![] } + } + } } // ── State ───────────────────────────────────────────────────── struct State { channel_logs: BTreeMap, - pane_labels: Vec, + /// label → pane_id (e.g. "ktest" → "%0") + panes: BTreeMap, } type SharedState = Rc>; @@ -61,7 +66,9 @@ impl State { fn new(config: &Config) -> Self { Self { channel_logs: BTreeMap::new(), - pane_labels: config.panes.iter().map(|p| p.label.clone()).collect(), + panes: config.panes.iter() + .map(|p| (p.label.clone(), p.pane_id.clone())) + .collect(), } } } @@ -172,10 +179,10 @@ impl channel_server::Server for ChannelServerImpl { // Send to tmux pane via send-keys let label = channel.strip_prefix("tmux.").unwrap_or(&channel); - let has_pane = self.state.borrow().pane_labels.iter().any(|l| l == label); - if has_pane { + let pane_id = self.state.borrow().panes.get(label).cloned(); + if let Some(pane_id) = pane_id { let _ = std::process::Command::new("tmux") - .args(["send-keys", "-t", label, &message, "Enter"]) + .args(["send-keys", "-t", &pane_id, &message, "Enter"]) .output(); let channel_key = format!("tmux.{}", label); @@ -195,7 +202,7 @@ impl channel_server::Server for ChannelServerImpl { mut results: channel_server::ListResults, ) -> Promise<(), capnp::Error> { let s = self.state.borrow(); - let channels: Vec<_> = s.pane_labels.iter().map(|label| { + let channels: Vec<_> = s.panes.keys().map(|label| { let key = format!("tmux.{}", label); let unread = s.channel_logs.get(&key).map_or(0, |l| l.unread()); (key, true, unread) @@ -230,7 +237,7 @@ impl channel_server::Server for ChannelServerImpl { // Check if already open { let s = self.state.borrow(); - if s.pane_labels.contains(&label) { + if s.panes.contains_key(&label) { return Promise::ok(()); } } @@ -247,7 +254,7 @@ impl channel_server::Server for ChannelServerImpl { // Register in state { let mut s = self.state.borrow_mut(); - s.pane_labels.push(label.clone()); + s.panes.insert(label.clone(), pane_id.clone()); } // Start pipe-pane reader @@ -270,17 +277,14 @@ impl channel_server::Server for ChannelServerImpl { let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string(); let mut s = self.state.borrow_mut(); - if let Some(pos) = s.pane_labels.iter().position(|l| *l == label) { + if let Some(pane_id) = s.panes.remove(&label) { info!("closing channel tmux.{}", label); - s.pane_labels.remove(pos); s.channel_logs.remove(&format!("tmux.{}", label)); - // Disconnect pipe-pane — find the pane ID - if let Some(pane_id) = find_pane_by_name(&label) { - let _ = std::process::Command::new("tmux") - .args(["pipe-pane", "-t", &pane_id]) - .output(); - } + // Disconnect pipe-pane + let _ = std::process::Command::new("tmux") + .args(["pipe-pane", "-t", &pane_id]) + .output(); } Promise::ok(()) diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs index a143934..479bf91 100644 --- a/src/agent/tools/channels.rs +++ b/src/agent/tools/channels.rs @@ -67,7 +67,7 @@ fn default_min_count() -> u32 { 20 } async fn channel_recv(args: &serde_json::Value) -> Result { let a: RecvArgs = serde_json::from_value(args.clone()) .context("invalid channel_recv arguments")?; - let sock = daemon_sock(&a.channel)?; + let (sock, _) = find_daemon(&a.channel)?; let channel = a.channel; let all_new = a.all_new; let min_count = a.min_count; @@ -90,7 +90,7 @@ struct SendArgs { async fn channel_send(args: &serde_json::Value) -> Result { let a: SendArgs = serde_json::from_value(args.clone()) .context("invalid channel_send arguments")?; - let sock = daemon_sock(&a.channel)?; + let (sock, _) = find_daemon(&a.channel)?; let channel = a.channel; let message = a.message; tokio::task::spawn_blocking(move || { @@ -119,13 +119,12 @@ async fn channel_open(args: &serde_json::Value) -> Result { let label = args.get("label").and_then(|v| v.as_str()) .context("label is required")? .to_string(); - let prefix = label.split('.').next().unwrap_or("tmux"); - let sock = daemon_sock(prefix)?; + let (sock, sublabel) = find_daemon(&label)?; tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); let local = tokio::task::LocalSet::new(); - local.block_on(&rt, rpc_open(&sock, &label)) + local.block_on(&rt, rpc_open(&sock, &sublabel)) }).await? .map_err(|e| anyhow::anyhow!("{}", e)) } @@ -134,7 +133,7 @@ async fn channel_close(args: &serde_json::Value) -> Result { let channel = args.get("channel").and_then(|v| v.as_str()) .context("channel is required")? .to_string(); - let sock = daemon_sock(&channel)?; + let (sock, _) = find_daemon(&channel)?; tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all().build().unwrap(); @@ -144,7 +143,7 @@ async fn channel_close(args: &serde_json::Value) -> Result { .map_err(|e| anyhow::anyhow!("{}", e)) } -// ── Socket helpers ───────────────────────────────────────────── +// ── Daemon resolution ───────────────────────────────────────── fn channels_dir() -> std::path::PathBuf { dirs::home_dir() @@ -152,13 +151,64 @@ fn channels_dir() -> std::path::PathBuf { .join(".consciousness/channels") } -fn daemon_sock(channel: &str) -> Result { - let prefix = channel.split('.').next().unwrap_or(""); - let sock = channels_dir().join(format!("{}.sock", prefix)); - if !sock.exists() { - anyhow::bail!("no daemon for channel: {}", channel); +/// Resolve a channel path to a daemon socket. +/// +/// Walks the dot-delimited path from most-specific to least, +/// looking for a daemon socket at each level: +/// "tmux.ktest" → finds tmux.sock, returns ("tmux.sock", "ktest") +/// "irc.libera.#bcachefs" → finds irc.sock, returns ("irc.sock", "libera.#bcachefs") +/// +/// If no daemon is running, tries to start one via the supervisor. +fn find_daemon(path: &str) -> Result<(std::path::PathBuf, String)> { + let dir = channels_dir(); + + // Returns the sub-path after the matched prefix + let rest_after = |prefix: &str| -> String { + if prefix.len() < path.len() { + path[prefix.len() + 1..].to_string() + } else { + String::new() + } + }; + + // Walk from most-specific to least, looking for a socket + let mut prefix = path; + loop { + let sock = dir.join(format!("{}.sock", prefix)); + if sock.exists() { + return Ok((sock, rest_after(prefix))); + } + match prefix.rfind('.') { + Some(pos) => prefix = &prefix[..pos], + None => break, + } } - Ok(sock) + + // No running daemon found — register and start via supervisor + let top = path.split('.').next().unwrap_or(path); + let mut sup = crate::thalamus::supervisor::Supervisor::new(); + sup.load_config(); + + if !sup.has_daemon(top) { + sup.add_daemon(top, crate::thalamus::supervisor::ChannelEntry { + binary: format!("consciousness-channel-{}", top), + enabled: true, + autostart: true, + }); + } + + sup.ensure_running(); + + // Wait for socket (up to 3 seconds) + let sock = dir.join(format!("{}.sock", top)); + for _ in 0..30 { + if sock.exists() { + return Ok((sock, rest_after(top))); + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + anyhow::bail!("no daemon for channel path: {}", path) } // ── Channel RPC ──────────────────────────────────────────────── diff --git a/src/thalamus/supervisor.rs b/src/thalamus/supervisor.rs index fe15235..3a07c2e 100644 --- a/src/thalamus/supervisor.rs +++ b/src/thalamus/supervisor.rs @@ -95,6 +95,22 @@ impl Supervisor { } } + /// Check if a daemon is in the config. + pub fn has_daemon(&self, name: &str) -> bool { + self.config.contains_key(name) + } + + /// Add a daemon to the config and persist to channels.json5. + pub fn add_daemon(&mut self, name: &str, entry: ChannelEntry) { + self.config.insert(name.to_string(), entry); + let path = config_path(); + if let Ok(json) = serde_json::to_string_pretty(&self.config) { + if let Err(e) = std::fs::write(&path, &json) { + error!("failed to write {}: {}", path.display(), e); + } + } + } + /// Check if a daemon is alive by testing its socket. fn is_alive(name: &str) -> bool { let sock = channels_dir().join(format!("{}.sock", name));