telegram daemon: per-channel logs with shared ChannelLog

Same treatment as IRC daemon — replace single ring buffer with
BTreeMap<String, ChannelLog>. list() returns all channels with
per-channel unread counts. Sent messages tracked too.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-04-03 20:04:08 -04:00
parent e604659e3a
commit c19f26f4fa

View file

@ -45,17 +45,12 @@ fn load_config() -> Config {
// ── State ─────────────────────────────────────────────────────── // ── State ───────────────────────────────────────────────────────
const MAX_HISTORY: usize = 1000; use poc_memory::thalamus::channel_log::ChannelLog;
struct State { struct State {
config: Config, config: Config,
/// Ring buffer of formatted message lines /// Per-channel message logs (keyed by channel path, e.g. "telegram.kent")
messages: VecDeque<String>, channel_logs: std::collections::BTreeMap<String, ChannelLog>,
/// Index of first unconsumed message per client
/// (simplified: single consumer for now)
consumed: usize,
/// Total messages ever received (monotonic counter)
total: usize,
/// Telegram API offset /// Telegram API offset
last_offset: i64, last_offset: i64,
connected: bool, connected: bool,
@ -71,9 +66,7 @@ impl State {
let last_offset = load_offset(); let last_offset = load_offset();
Self { Self {
config, config,
messages: VecDeque::with_capacity(MAX_HISTORY), channel_logs: std::collections::BTreeMap::new(),
consumed: 0,
total: 0,
last_offset, last_offset,
connected: false, connected: false,
client: reqwest::Client::new(), client: reqwest::Client::new(),
@ -82,15 +75,10 @@ impl State {
} }
fn push_message(&mut self, line: String, urgency: u8, channel: &str) { fn push_message(&mut self, line: String, urgency: u8, channel: &str) {
if self.messages.len() >= MAX_HISTORY { self.channel_logs
self.messages.pop_front(); .entry(channel.to_string())
// Adjust consumed so it doesn't point past the buffer .or_insert_with(ChannelLog::new)
if self.consumed > 0 { .push(line.clone());
self.consumed -= 1;
}
}
self.messages.push_back(line.clone());
self.total += 1;
// Notify all subscribers // Notify all subscribers
let preview = line.chars().take(80).collect::<String>(); let preview = line.chars().take(80).collect::<String>();
@ -113,52 +101,6 @@ impl State {
format!("https://api.telegram.org/bot{}/{}", self.config.token, method) format!("https://api.telegram.org/bot{}/{}", self.config.token, method)
} }
/// Get new unconsumed messages, mark as consumed.
/// Returns at least min_count lines (from history if needed).
fn recv_new(&mut self, min_count: usize) -> String {
let buf_len = self.messages.len();
let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed);
let _unconsumed = &self.messages.as_slices();
// Collect unconsumed
let new_msgs: Vec<&str> = self.messages.iter()
.skip(unconsumed_start)
.map(|s| s.as_str())
.collect();
let need_extra = if new_msgs.len() < min_count {
min_count - new_msgs.len()
} else {
0
};
// Scrollback for extra lines
let scroll_start = unconsumed_start.saturating_sub(need_extra);
let scrollback: Vec<&str> = self.messages.iter()
.skip(scroll_start)
.take(unconsumed_start - scroll_start)
.map(|s| s.as_str())
.collect();
self.consumed = self.total;
let mut result = scrollback;
result.extend(new_msgs);
result.join("\n")
}
/// Get last N lines without consuming.
fn recv_history(&self, count: usize) -> String {
self.messages.iter()
.rev()
.take(count)
.collect::<Vec<_>>()
.into_iter()
.rev()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join("\n")
}
} }
// ── Persistence ───────────────────────────────────────────────── // ── Persistence ─────────────────────────────────────────────────
@ -281,14 +223,16 @@ impl channel_server::Server for ChannelServerImpl {
mut results: channel_server::RecvResults, mut results: channel_server::RecvResults,
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
let params = pry!(params.get()); let params = pry!(params.get());
let _channel = pry!(params.get_channel()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new(); let all_new = params.get_all_new();
let min_count = params.get_min_count() as usize; let min_count = params.get_min_count() as usize;
let text = if all_new { let mut s = self.state.borrow_mut();
self.state.borrow_mut().recv_new(min_count) let text = match s.channel_logs.get_mut(&channel) {
} else { Some(log) => {
self.state.borrow().recv_history(min_count) if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
}
None => String::new(),
}; };
results.get().set_text(&text); results.get().set_text(&text);
@ -301,7 +245,7 @@ impl channel_server::Server for ChannelServerImpl {
_results: channel_server::SendResults, _results: channel_server::SendResults,
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
let params = pry!(params.get()); let params = pry!(params.get());
let _channel = pry!(params.get_channel()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let message = pry!(pry!(params.get_message()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string();
let state = self.state.clone(); let state = self.state.clone();
@ -340,14 +284,18 @@ impl channel_server::Server for ChannelServerImpl {
mut results: channel_server::ListResults, mut results: channel_server::ListResults,
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
let s = self.state.borrow(); let s = self.state.borrow();
let unread = (s.total - s.consumed) as u32; let connected = s.connected;
// Report a single "telegram" channel let names: Vec<String> = s.channel_logs.keys().cloned().collect();
let mut list = results.get().init_channels(1); let mut list = results.get().init_channels(names.len() as u32);
let mut ch = list.reborrow().get(0); for (i, name) in names.iter().enumerate() {
ch.set_name("telegram"); let mut entry = list.reborrow().get(i as u32);
ch.set_connected(s.connected); entry.set_name(name);
ch.set_unread(unread); entry.set_connected(connected);
entry.set_unread(
s.channel_logs.get(name).map_or(0, |l| l.unread())
);
}
Promise::ok(()) Promise::ok(())
} }