From c19f26f4fa1d3441ba6b66fd5dad54bf0301c21c Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 3 Apr 2026 20:04:08 -0400 Subject: [PATCH] telegram daemon: per-channel logs with shared ChannelLog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same treatment as IRC daemon — replace single ring buffer with BTreeMap. list() returns all channels with per-channel unread counts. Sent messages tracked too. Co-Developed-By: Kent Overstreet --- channels/telegram/src/main.rs | 106 +++++++++------------------------- 1 file changed, 27 insertions(+), 79 deletions(-) diff --git a/channels/telegram/src/main.rs b/channels/telegram/src/main.rs index 384a296..34fb3f8 100644 --- a/channels/telegram/src/main.rs +++ b/channels/telegram/src/main.rs @@ -45,17 +45,12 @@ fn load_config() -> Config { // ── State ─────────────────────────────────────────────────────── -const MAX_HISTORY: usize = 1000; +use poc_memory::thalamus::channel_log::ChannelLog; struct State { config: Config, - /// Ring buffer of formatted message lines - messages: VecDeque, - /// Index of first unconsumed message per client - /// (simplified: single consumer for now) - consumed: usize, - /// Total messages ever received (monotonic counter) - total: usize, + /// Per-channel message logs (keyed by channel path, e.g. "telegram.kent") + channel_logs: std::collections::BTreeMap, /// Telegram API offset last_offset: i64, connected: bool, @@ -71,9 +66,7 @@ impl State { let last_offset = load_offset(); Self { config, - messages: VecDeque::with_capacity(MAX_HISTORY), - consumed: 0, - total: 0, + channel_logs: std::collections::BTreeMap::new(), last_offset, connected: false, client: reqwest::Client::new(), @@ -82,15 +75,10 @@ impl State { } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { - if self.messages.len() >= MAX_HISTORY { - self.messages.pop_front(); - // Adjust consumed so it doesn't point past the buffer - if self.consumed > 0 { - self.consumed -= 1; - } - } - self.messages.push_back(line.clone()); - self.total += 1; + self.channel_logs + .entry(channel.to_string()) + .or_insert_with(ChannelLog::new) + .push(line.clone()); // Notify all subscribers let preview = line.chars().take(80).collect::(); @@ -113,52 +101,6 @@ impl State { format!("https://api.telegram.org/bot{}/{}", self.config.token, method) } - /// Get new unconsumed messages, mark as consumed. - /// Returns at least min_count lines (from history if needed). - fn recv_new(&mut self, min_count: usize) -> String { - let buf_len = self.messages.len(); - let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); - let _unconsumed = &self.messages.as_slices(); - - // Collect unconsumed - let new_msgs: Vec<&str> = self.messages.iter() - .skip(unconsumed_start) - .map(|s| s.as_str()) - .collect(); - - let need_extra = if new_msgs.len() < min_count { - min_count - new_msgs.len() - } else { - 0 - }; - - // Scrollback for extra lines - let scroll_start = unconsumed_start.saturating_sub(need_extra); - let scrollback: Vec<&str> = self.messages.iter() - .skip(scroll_start) - .take(unconsumed_start - scroll_start) - .map(|s| s.as_str()) - .collect(); - - self.consumed = self.total; - - let mut result = scrollback; - result.extend(new_msgs); - result.join("\n") - } - - /// Get last N lines without consuming. - fn recv_history(&self, count: usize) -> String { - self.messages.iter() - .rev() - .take(count) - .collect::>() - .into_iter() - .rev() - .map(|s| s.as_str()) - .collect::>() - .join("\n") - } } // ── Persistence ───────────────────────────────────────────────── @@ -281,14 +223,16 @@ impl channel_server::Server for ChannelServerImpl { mut results: channel_server::RecvResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); - let _channel = pry!(params.get_channel()); + let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; - let text = if all_new { - self.state.borrow_mut().recv_new(min_count) - } else { - self.state.borrow().recv_history(min_count) + let mut s = self.state.borrow_mut(); + let text = match s.channel_logs.get_mut(&channel) { + Some(log) => { + if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) } + } + None => String::new(), }; results.get().set_text(&text); @@ -301,7 +245,7 @@ impl channel_server::Server for ChannelServerImpl { _results: channel_server::SendResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); - let _channel = pry!(params.get_channel()); + let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string(); let state = self.state.clone(); @@ -340,14 +284,18 @@ impl channel_server::Server for ChannelServerImpl { mut results: channel_server::ListResults, ) -> Promise<(), capnp::Error> { let s = self.state.borrow(); - let unread = (s.total - s.consumed) as u32; + let connected = s.connected; - // Report a single "telegram" channel - let mut list = results.get().init_channels(1); - let mut ch = list.reborrow().get(0); - ch.set_name("telegram"); - ch.set_connected(s.connected); - ch.set_unread(unread); + let names: Vec = s.channel_logs.keys().cloned().collect(); + let mut list = results.get().init_channels(names.len() as u32); + for (i, name) in names.iter().enumerate() { + let mut entry = list.reborrow().get(i as u32); + entry.set_name(name); + entry.set_connected(connected); + entry.set_unread( + s.channel_logs.get(name).map_or(0, |l| l.unread()) + ); + } Promise::ok(()) }