channels: improve tmux state tracking and config persistence

tmux channel:
- Track connected state per-pane (shows true channel availability)
- Persist pane config on add/remove (survives restarts)
- Remove cleanup_pipes on exit (unnecessary with persisted config)
- Reorder PaneConfig fields for consistency

telegram channel:
- Use json5 crate for config parsing (matches tmux)

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-12 20:11:34 -04:00
parent 4556e16fd7
commit 33156d9ab3
4 changed files with 90 additions and 58 deletions

View file

@ -8,6 +8,7 @@ capnp = "0.25"
capnp-rpc = "0.25"
dirs = "6"
futures = "0.3"
json5 = "1.3"
consciousness = { path = "../.." }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -40,7 +40,7 @@ fn load_config() -> Config {
let config_path = dir.join("telegram.json5");
let text = std::fs::read_to_string(&config_path)
.unwrap_or_else(|_| panic!("failed to read {}", config_path.display()));
let mut config: Config = serde_json::from_str(&text)
let mut config: Config = json5::from_str(&text)
.unwrap_or_else(|e| panic!("failed to parse {}: {}", config_path.display(), e));
// Read token from secrets file

View file

@ -8,11 +8,11 @@ capnp = "0.25"
capnp-rpc = "0.25"
dirs = "6"
libc = "0.2"
scopeguard = "1"
futures = "0.3"
json5 = "1.3"
consciousness = { path = "../.." }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }
log = "0.4"

View file

@ -24,26 +24,30 @@ use consciousness::thalamus::channel_log::ChannelLog;
// ── Config ─────────────────────────────────────────────────────
#[derive(Clone, serde::Deserialize)]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct PaneConfig {
/// Tmux pane ID, e.g. "0:1.0"
pane_id: String,
/// Human-readable label, becomes the channel name "tmux.<label>"
label: String,
/// Tmux pane ID, e.g. "%5"
pane_id: String,
}
#[derive(Clone, serde::Deserialize)]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct Config {
#[serde(default)]
panes: Vec<PaneConfig>,
}
fn load_config() -> Config {
let path = dirs::home_dir()
fn config_path() -> std::path::PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels/tmux.json5");
match std::fs::read_to_string(&path) {
.join(".consciousness/channels/tmux.json5")
}
fn load_config() -> Config {
match std::fs::read_to_string(config_path()) {
Ok(text) => json5::from_str(&text)
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())),
.unwrap_or_else(|e| panic!("failed to parse {}: {e}", config_path().display())),
Err(_) => {
info!("no tmux.json5, starting with no pre-configured panes");
Config { panes: vec![] }
@ -51,23 +55,71 @@ fn load_config() -> Config {
}
}
fn save_config(config: &Config) {
match serde_json::to_string_pretty(config) {
Ok(json) => {
if let Err(e) = std::fs::write(config_path(), json) {
error!("failed to write config: {}", e);
}
}
Err(e) => error!("failed to serialize config: {}", e),
}
}
// ── State ─────────────────────────────────────────────────────
struct State {
config: Config,
channel_logs: BTreeMap<String, ChannelLog>,
/// label → pane_id (e.g. "ktest" → "%0")
panes: BTreeMap<String, String>,
/// Tracks which panes are actually connected (pipe-pane active)
connected: BTreeMap<String, bool>,
}
type SharedState = Rc<RefCell<State>>;
impl State {
fn new(config: &Config) -> Self {
fn new(config: Config) -> Self {
Self {
config,
channel_logs: BTreeMap::new(),
panes: config.panes.iter()
.map(|p| (p.label.clone(), p.pane_id.clone()))
.collect(),
connected: BTreeMap::new(),
}
}
/// Get pane_id for a label
fn get_pane(&self, label: &str) -> Option<&str> {
self.config.panes.iter()
.find(|p| p.label == label)
.map(|p| p.pane_id.as_str())
}
/// Check if a pane is connected
fn is_connected(&self, label: &str) -> bool {
self.connected.get(label).copied().unwrap_or(false)
}
/// Set connection state for a pane
fn set_connected(&mut self, label: &str, connected: bool) {
self.connected.insert(label.to_string(), connected);
}
/// Add a pane and persist
fn add_pane(&mut self, label: String, pane_id: String) {
if !self.config.panes.iter().any(|p| p.label == label) {
self.config.panes.push(PaneConfig { label, pane_id });
save_config(&self.config);
}
}
/// Remove a pane and persist
fn remove_pane(&mut self, label: &str) -> Option<String> {
if let Some(idx) = self.config.panes.iter().position(|p| p.label == label) {
let pane = self.config.panes.remove(idx);
self.connected.remove(label);
save_config(&self.config);
Some(pane.pane_id)
} else {
None
}
}
}
@ -103,10 +155,12 @@ async fn pipe_pane_reader(state: SharedState, pane: PaneConfig) {
Ok(output) => {
error!("pipe-pane failed for {}: {}", pane.label,
String::from_utf8_lossy(&output.stderr));
state.borrow_mut().set_connected(&pane.label, false);
return;
}
Err(e) => {
error!("failed to run tmux pipe-pane for {}: {}", pane.label, e);
state.borrow_mut().set_connected(&pane.label, false);
return;
}
}
@ -116,10 +170,14 @@ async fn pipe_pane_reader(state: SharedState, pane: PaneConfig) {
Ok(f) => f,
Err(e) => {
error!("failed to open pipe for {}: {}", pane.label, e);
state.borrow_mut().set_connected(&pane.label, false);
return;
}
};
// Mark as connected once pipe is open
state.borrow_mut().set_connected(&pane.label, true);
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let channel_key = format!("tmux.{}", pane.label);
@ -136,6 +194,7 @@ async fn pipe_pane_reader(state: SharedState, pane: PaneConfig) {
}
warn!("pipe-pane reader ended for {}", pane.label);
state.borrow_mut().set_connected(&pane.label, false);
}
// ── ChannelServer Implementation ───────────────────────────────
@ -187,7 +246,7 @@ impl channel_server::Server for ChannelServerImpl {
// Send to tmux pane via send-keys
let label = channel.strip_prefix("tmux.").unwrap_or(&channel);
let pane_id = self.state.borrow().panes.get(label).cloned();
let pane_id = self.state.borrow().get_pane(label).map(String::from);
if let Some(pane_id) = pane_id {
let _ = std::process::Command::new("tmux")
.args(["send-keys", "-t", &pane_id, &message, "Enter"])
@ -210,10 +269,11 @@ impl channel_server::Server for ChannelServerImpl {
mut results: channel_server::ListResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow();
let channels: Vec<_> = s.panes.keys().map(|label| {
let key = format!("tmux.{}", label);
let channels: Vec<_> = s.config.panes.iter().map(|p| {
let key = format!("tmux.{}", p.label);
let connected = s.is_connected(&p.label);
let unread = s.channel_logs.get(&key).map_or(0, |l| l.unread());
(key, true, unread)
(key, connected, unread)
}).collect();
let mut list = results.get().init_channels(channels.len() as u32);
@ -243,11 +303,8 @@ impl channel_server::Server for ChannelServerImpl {
let label = pry!(pry!(params.get_label()).to_str()).to_string();
// Check if already open
{
let s = self.state.borrow();
if s.panes.contains_key(&label) {
return std::future::ready(Ok(()));
}
if self.state.borrow().get_pane(&label).is_some() {
return std::future::ready(Ok(()));
}
// Find the tmux pane by name (window or pane title)
@ -259,14 +316,11 @@ impl channel_server::Server for ChannelServerImpl {
info!("opening channel tmux.{} (pane {})", label, pane_id);
// Register in state
{
let mut s = self.state.borrow_mut();
s.panes.insert(label.clone(), pane_id.clone());
}
// Register in state and persist
self.state.borrow_mut().add_pane(label.clone(), pane_id.clone());
// Start pipe-pane reader
let pane = PaneConfig { pane_id, label };
let pane = PaneConfig { label, pane_id };
let reader_state = self.state.clone();
tokio::task::spawn_local(async move {
pipe_pane_reader(reader_state, pane).await;
@ -285,7 +339,7 @@ impl channel_server::Server for ChannelServerImpl {
let label = channel.strip_prefix("tmux.").unwrap_or(&channel).to_string();
let mut s = self.state.borrow_mut();
if let Some(pane_id) = s.panes.remove(&label) {
if let Some(pane_id) = s.remove_pane(&label) {
info!("closing channel tmux.{}", label);
s.channel_logs.remove(&format!("tmux.{}", label));
@ -323,24 +377,6 @@ fn find_pane_by_name(name: &str) -> Option<String> {
}
None
}
// ── Cleanup ───────────────────────────────────────────────────
/// Remove pipe-pane connections on exit.
fn cleanup_pipes(config: &Config) {
for pane in &config.panes {
// Disconnect pipe-pane
let _ = std::process::Command::new("tmux")
.args(["pipe-pane", "-t", &pane.pane_id])
.output();
}
// Clean up FIFO files
let pipe_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels/tmux-pipes");
let _ = std::fs::remove_dir_all(&pipe_dir);
}
// ── Main ───────────────────────────────────────────────────────
#[tokio::main]
@ -348,7 +384,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let config = load_config();
let state = Rc::new(RefCell::new(State::new(&config)));
let state = Rc::new(RefCell::new(State::new(config)));
let sock_dir = dirs::home_dir()
.unwrap_or_default()
@ -359,16 +395,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("tmux channel daemon starting on {}", sock_path.display());
// Set up cleanup on exit
let cleanup_config = config.clone();
let _cleanup = scopeguard::guard(cleanup_config, |c| cleanup_pipes(&c));
tokio::task::LocalSet::new()
.run_until(async move {
// Start a pipe-pane reader for each configured pane
for pane in &config.panes {
for pane in state.borrow().config.panes.clone() {
let reader_state = state.clone();
let pane = pane.clone();
tokio::task::spawn_local(async move {
pipe_pane_reader(reader_state, pane).await;
});