diff --git a/channels/irc/src/main.rs b/channels/irc/src/main.rs index 07eb4b1..8f41259 100644 --- a/channels/irc/src/main.rs +++ b/channels/irc/src/main.rs @@ -161,14 +161,12 @@ impl AsyncWriter for PlainWriter { // ── State ────────────────────────────────────────────────────── +use poc_memory::thalamus::channel_log::ChannelLog; + struct State { config: Config, - /// Ring buffer of formatted message lines (all channels interleaved) - messages: VecDeque, - /// Number of messages consumed (monotonic) - consumed: usize, - /// Total messages ever received (monotonic) - total: usize, + /// Per-channel message logs (keyed by channel path, e.g. "irc.#bcachefs") + channel_logs: std::collections::BTreeMap, /// Currently joined channels channels: Vec, connected: bool, @@ -185,9 +183,7 @@ impl State { let channels = config.channels.clone(); Self { config, - messages: VecDeque::with_capacity(MAX_HISTORY), - consumed: 0, - total: 0, + channel_logs: std::collections::BTreeMap::new(), channels, connected: false, writer: None, @@ -196,14 +192,11 @@ impl State { } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { - if self.messages.len() >= MAX_HISTORY { - self.messages.pop_front(); - if self.consumed > 0 { - self.consumed -= 1; - } - } - self.messages.push_back(line.clone()); - self.total += 1; + // Store in per-channel log + self.channel_logs + .entry(channel.to_string()) + .or_insert_with(ChannelLog::new) + .push(line.clone()); // Notify all subscribers let preview = line.chars().take(80).collect::(); @@ -221,42 +214,6 @@ impl State { } } - fn recv_new(&mut self, min_count: usize) -> String { - let buf_len = self.messages.len(); - let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); - - let new_msgs: Vec<&str> = self.messages.iter() - .skip(unconsumed_start) - .map(|s| s.as_str()) - .collect(); - - let need_extra = min_count.saturating_sub(new_msgs.len()); - let scroll_start = unconsumed_start.saturating_sub(need_extra); - let scrollback: Vec<&str> = self.messages.iter() - .skip(scroll_start) - .take(unconsumed_start - scroll_start) - .map(|s| s.as_str()) - .collect(); - - self.consumed = self.total; - - let mut result = scrollback; - result.extend(new_msgs); - result.join("\n") - } - - fn recv_history(&self, count: usize) -> String { - self.messages.iter() - .rev() - .take(count) - .collect::>() - .into_iter() - .rev() - .map(|s| s.as_str()) - .collect::>() - .join("\n") - } - async fn send_raw(&mut self, line: &str) -> io::Result<()> { if let Some(ref mut w) = self.writer { w.write_line(line).await @@ -536,14 +493,16 @@ impl channel_server::Server for ChannelServerImpl { mut results: channel_server::RecvResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); - let _channel = pry!(params.get_channel()); + let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; - let text = if all_new { - self.state.borrow_mut().recv_new(min_count) - } else { - self.state.borrow().recv_history(min_count) + let mut s = self.state.borrow_mut(); + let text = match s.channel_logs.get_mut(&channel) { + Some(log) => { + if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) } + } + None => String::new(), }; results.get().set_text(&text); @@ -603,17 +562,18 @@ impl channel_server::Server for ChannelServerImpl { mut results: channel_server::ListResults, ) -> Promise<(), capnp::Error> { let s = self.state.borrow(); - let channels = &s.channels; - let unread = (s.total - s.consumed) as u32; let connected = s.connected; - let mut list = results.get().init_channels(channels.len() as u32); - for (i, ch) in channels.iter().enumerate() { + // All channels with logs (joined + PMs) + let names: Vec = s.channel_logs.keys().cloned().collect(); + let mut list = results.get().init_channels(names.len() as u32); + for (i, name) in names.iter().enumerate() { let mut entry = list.reborrow().get(i as u32); - let name = format!("irc.{ch}"); - entry.set_name(&name); + entry.set_name(name); entry.set_connected(connected); - entry.set_unread(unread); + entry.set_unread( + s.channel_logs.get(name).map_or(0, |l| l.unread()) + ); } Promise::ok(()) diff --git a/src/thalamus/channel_log.rs b/src/thalamus/channel_log.rs new file mode 100644 index 0000000..2de1ab7 --- /dev/null +++ b/src/thalamus/channel_log.rs @@ -0,0 +1,76 @@ +// channel_log.rs — Per-channel message history +// +// Shared by all channel daemon implementations. Tracks messages +// with consumed/unread semantics for the recv protocol. + +use std::collections::VecDeque; + +const DEFAULT_CAPACITY: usize = 200; + +/// Per-channel message history with consumed/unread tracking. +pub struct ChannelLog { + messages: VecDeque, + consumed: usize, + total: usize, +} + +impl ChannelLog { + pub fn new() -> Self { + Self { + messages: VecDeque::with_capacity(DEFAULT_CAPACITY), + consumed: 0, + total: 0, + } + } + + pub fn push(&mut self, line: String) { + if self.messages.len() >= DEFAULT_CAPACITY { + self.messages.pop_front(); + if self.consumed > 0 { + self.consumed -= 1; + } + } + self.messages.push_back(line); + self.total += 1; + } + + pub fn unread(&self) -> u32 { + (self.total - self.consumed) as u32 + } + + /// Return all unconsumed messages (marks consumed), plus scrollback + /// to reach at least min_count total. + pub fn recv_new(&mut self, min_count: usize) -> String { + let buf_len = self.messages.len(); + let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); + + let new_msgs: Vec<&str> = self.messages.iter() + .skip(unconsumed_start) + .map(|s| s.as_str()) + .collect(); + + let need_extra = min_count.saturating_sub(new_msgs.len()); + let scroll_start = unconsumed_start.saturating_sub(need_extra); + let scrollback: Vec<&str> = self.messages.iter() + .skip(scroll_start) + .take(unconsumed_start - scroll_start) + .map(|s| s.as_str()) + .collect(); + + self.consumed = self.total; + + let mut result = scrollback; + result.extend(new_msgs); + result.join("\n") + } + + /// Return last N lines without consuming. + pub fn recv_history(&self, count: usize) -> String { + self.messages.iter() + .rev().take(count) + .collect::>().into_iter().rev() + .map(|s| s.as_str()) + .collect::>() + .join("\n") + } +} diff --git a/src/thalamus/mod.rs b/src/thalamus/mod.rs index 13c7e2c..025cdc4 100644 --- a/src/thalamus/mod.rs +++ b/src/thalamus/mod.rs @@ -5,6 +5,7 @@ // code (in claude/) and the future substrate-independent consciousness // binary. +pub mod channel_log; pub mod channels; pub mod idle; pub mod supervisor;