channels: find_daemon path walking, consistent pane_id, auto-start
find_daemon() replaces daemon_sock() — walks the dot-delimited channel path from most-specific to least looking for a daemon socket, and auto-starts via the supervisor if none is found. All channel tools (recv, send, open, close) use the same resolution path. Fix tmux daemon to use pane_id consistently for both pipe-pane and send-keys (send-keys -t <label> doesn't work, needs the %N pane id). Store label→pane_id mapping in State instead of bare label vec. Gracefully handle missing tmux.json5 — start with empty pane list since panes are added dynamically via the open RPC. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
c9b19dc3d7
commit
2a84fb325d
3 changed files with 103 additions and 33 deletions
|
|
@ -42,17 +42,22 @@ fn load_config() -> Config {
|
|||
let path = dirs::home_dir()
|
||||
.unwrap_or_default()
|
||||
.join(".consciousness/channels/tmux.json5");
|
||||
let text = std::fs::read_to_string(&path)
|
||||
.unwrap_or_else(|e| panic!("failed to read {}: {e}", path.display()));
|
||||
json5::from_str(&text)
|
||||
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display()))
|
||||
match std::fs::read_to_string(&path) {
|
||||
Ok(text) => json5::from_str(&text)
|
||||
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())),
|
||||
Err(_) => {
|
||||
info!("no tmux.json5, starting with no pre-configured panes");
|
||||
Config { panes: vec![] }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── State ─────────────────────────────────────────────────────
|
||||
|
||||
struct State {
|
||||
channel_logs: BTreeMap<String, ChannelLog>,
|
||||
pane_labels: Vec<String>,
|
||||
/// label → pane_id (e.g. "ktest" → "%0")
|
||||
panes: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
type SharedState = Rc<RefCell<State>>;
|
||||
|
|
@ -61,7 +66,9 @@ impl State {
|
|||
fn new(config: &Config) -> Self {
|
||||
Self {
|
||||
channel_logs: BTreeMap::new(),
|
||||
pane_labels: config.panes.iter().map(|p| p.label.clone()).collect(),
|
||||
panes: config.panes.iter()
|
||||
.map(|p| (p.label.clone(), p.pane_id.clone()))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -172,10 +179,10 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
|
||||
// Send to tmux pane via send-keys
|
||||
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
|
||||
let has_pane = self.state.borrow().pane_labels.iter().any(|l| l == label);
|
||||
if has_pane {
|
||||
let pane_id = self.state.borrow().panes.get(label).cloned();
|
||||
if let Some(pane_id) = pane_id {
|
||||
let _ = std::process::Command::new("tmux")
|
||||
.args(["send-keys", "-t", label, &message, "Enter"])
|
||||
.args(["send-keys", "-t", &pane_id, &message, "Enter"])
|
||||
.output();
|
||||
|
||||
let channel_key = format!("tmux.{}", label);
|
||||
|
|
@ -195,7 +202,7 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
mut results: channel_server::ListResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let s = self.state.borrow();
|
||||
let channels: Vec<_> = s.pane_labels.iter().map(|label| {
|
||||
let channels: Vec<_> = s.panes.keys().map(|label| {
|
||||
let key = format!("tmux.{}", label);
|
||||
let unread = s.channel_logs.get(&key).map_or(0, |l| l.unread());
|
||||
(key, true, unread)
|
||||
|
|
@ -230,7 +237,7 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
// Check if already open
|
||||
{
|
||||
let s = self.state.borrow();
|
||||
if s.pane_labels.contains(&label) {
|
||||
if s.panes.contains_key(&label) {
|
||||
return Promise::ok(());
|
||||
}
|
||||
}
|
||||
|
|
@ -247,7 +254,7 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
// Register in state
|
||||
{
|
||||
let mut s = self.state.borrow_mut();
|
||||
s.pane_labels.push(label.clone());
|
||||
s.panes.insert(label.clone(), pane_id.clone());
|
||||
}
|
||||
|
||||
// Start pipe-pane reader
|
||||
|
|
@ -270,18 +277,15 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
|
||||
|
||||
let mut s = self.state.borrow_mut();
|
||||
if let Some(pos) = s.pane_labels.iter().position(|l| *l == label) {
|
||||
if let Some(pane_id) = s.panes.remove(&label) {
|
||||
info!("closing channel tmux.{}", label);
|
||||
s.pane_labels.remove(pos);
|
||||
s.channel_logs.remove(&format!("tmux.{}", label));
|
||||
|
||||
// Disconnect pipe-pane — find the pane ID
|
||||
if let Some(pane_id) = find_pane_by_name(&label) {
|
||||
// Disconnect pipe-pane
|
||||
let _ = std::process::Command::new("tmux")
|
||||
.args(["pipe-pane", "-t", &pane_id])
|
||||
.output();
|
||||
}
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ fn default_min_count() -> u32 { 20 }
|
|||
async fn channel_recv(args: &serde_json::Value) -> Result<String> {
|
||||
let a: RecvArgs = serde_json::from_value(args.clone())
|
||||
.context("invalid channel_recv arguments")?;
|
||||
let sock = daemon_sock(&a.channel)?;
|
||||
let (sock, _) = find_daemon(&a.channel)?;
|
||||
let channel = a.channel;
|
||||
let all_new = a.all_new;
|
||||
let min_count = a.min_count;
|
||||
|
|
@ -90,7 +90,7 @@ struct SendArgs {
|
|||
async fn channel_send(args: &serde_json::Value) -> Result<String> {
|
||||
let a: SendArgs = serde_json::from_value(args.clone())
|
||||
.context("invalid channel_send arguments")?;
|
||||
let sock = daemon_sock(&a.channel)?;
|
||||
let (sock, _) = find_daemon(&a.channel)?;
|
||||
let channel = a.channel;
|
||||
let message = a.message;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
|
|
@ -119,13 +119,12 @@ async fn channel_open(args: &serde_json::Value) -> Result<String> {
|
|||
let label = args.get("label").and_then(|v| v.as_str())
|
||||
.context("label is required")?
|
||||
.to_string();
|
||||
let prefix = label.split('.').next().unwrap_or("tmux");
|
||||
let sock = daemon_sock(prefix)?;
|
||||
let (sock, sublabel) = find_daemon(&label)?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all().build().unwrap();
|
||||
let local = tokio::task::LocalSet::new();
|
||||
local.block_on(&rt, rpc_open(&sock, &label))
|
||||
local.block_on(&rt, rpc_open(&sock, &sublabel))
|
||||
}).await?
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
}
|
||||
|
|
@ -134,7 +133,7 @@ async fn channel_close(args: &serde_json::Value) -> Result<String> {
|
|||
let channel = args.get("channel").and_then(|v| v.as_str())
|
||||
.context("channel is required")?
|
||||
.to_string();
|
||||
let sock = daemon_sock(&channel)?;
|
||||
let (sock, _) = find_daemon(&channel)?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all().build().unwrap();
|
||||
|
|
@ -144,7 +143,7 @@ async fn channel_close(args: &serde_json::Value) -> Result<String> {
|
|||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
}
|
||||
|
||||
// ── Socket helpers ─────────────────────────────────────────────
|
||||
// ── Daemon resolution ─────────────────────────────────────────
|
||||
|
||||
fn channels_dir() -> std::path::PathBuf {
|
||||
dirs::home_dir()
|
||||
|
|
@ -152,13 +151,64 @@ fn channels_dir() -> std::path::PathBuf {
|
|||
.join(".consciousness/channels")
|
||||
}
|
||||
|
||||
fn daemon_sock(channel: &str) -> Result<std::path::PathBuf> {
|
||||
let prefix = channel.split('.').next().unwrap_or("");
|
||||
let sock = channels_dir().join(format!("{}.sock", prefix));
|
||||
if !sock.exists() {
|
||||
anyhow::bail!("no daemon for channel: {}", channel);
|
||||
/// Resolve a channel path to a daemon socket.
|
||||
///
|
||||
/// Walks the dot-delimited path from most-specific to least,
|
||||
/// looking for a daemon socket at each level:
|
||||
/// "tmux.ktest" → finds tmux.sock, returns ("tmux.sock", "ktest")
|
||||
/// "irc.libera.#bcachefs" → finds irc.sock, returns ("irc.sock", "libera.#bcachefs")
|
||||
///
|
||||
/// If no daemon is running, tries to start one via the supervisor.
|
||||
fn find_daemon(path: &str) -> Result<(std::path::PathBuf, String)> {
|
||||
let dir = channels_dir();
|
||||
|
||||
// Returns the sub-path after the matched prefix
|
||||
let rest_after = |prefix: &str| -> String {
|
||||
if prefix.len() < path.len() {
|
||||
path[prefix.len() + 1..].to_string()
|
||||
} else {
|
||||
String::new()
|
||||
}
|
||||
Ok(sock)
|
||||
};
|
||||
|
||||
// Walk from most-specific to least, looking for a socket
|
||||
let mut prefix = path;
|
||||
loop {
|
||||
let sock = dir.join(format!("{}.sock", prefix));
|
||||
if sock.exists() {
|
||||
return Ok((sock, rest_after(prefix)));
|
||||
}
|
||||
match prefix.rfind('.') {
|
||||
Some(pos) => prefix = &prefix[..pos],
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// No running daemon found — register and start via supervisor
|
||||
let top = path.split('.').next().unwrap_or(path);
|
||||
let mut sup = crate::thalamus::supervisor::Supervisor::new();
|
||||
sup.load_config();
|
||||
|
||||
if !sup.has_daemon(top) {
|
||||
sup.add_daemon(top, crate::thalamus::supervisor::ChannelEntry {
|
||||
binary: format!("consciousness-channel-{}", top),
|
||||
enabled: true,
|
||||
autostart: true,
|
||||
});
|
||||
}
|
||||
|
||||
sup.ensure_running();
|
||||
|
||||
// Wait for socket (up to 3 seconds)
|
||||
let sock = dir.join(format!("{}.sock", top));
|
||||
for _ in 0..30 {
|
||||
if sock.exists() {
|
||||
return Ok((sock, rest_after(top)));
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
anyhow::bail!("no daemon for channel path: {}", path)
|
||||
}
|
||||
|
||||
// ── Channel RPC ────────────────────────────────────────────────
|
||||
|
|
|
|||
|
|
@ -95,6 +95,22 @@ impl Supervisor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Check if a daemon is in the config.
|
||||
pub fn has_daemon(&self, name: &str) -> bool {
|
||||
self.config.contains_key(name)
|
||||
}
|
||||
|
||||
/// Add a daemon to the config and persist to channels.json5.
|
||||
pub fn add_daemon(&mut self, name: &str, entry: ChannelEntry) {
|
||||
self.config.insert(name.to_string(), entry);
|
||||
let path = config_path();
|
||||
if let Ok(json) = serde_json::to_string_pretty(&self.config) {
|
||||
if let Err(e) = std::fs::write(&path, &json) {
|
||||
error!("failed to write {}: {}", path.display(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a daemon is alive by testing its socket.
|
||||
fn is_alive(name: &str) -> bool {
|
||||
let sock = channels_dir().join(format!("{}.sock", name));
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue