diff --git a/Cargo.lock b/Cargo.lock index 0d12bd2..f7a2fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2848,6 +2848,7 @@ dependencies = [ "json5", "libc", "log", + "memchr", "memmap2", "paste", "peg", diff --git a/poc-memory/src/agents/defs.rs b/poc-memory/src/agents/defs.rs index 6745fc7..875c6c5 100644 --- a/poc-memory/src/agents/defs.rs +++ b/poc-memory/src/agents/defs.rs @@ -456,20 +456,30 @@ fn resolve_conversation() -> String { let Some(path) = transcript else { return String::new() }; let path_str = path.to_string_lossy(); - let messages = crate::transcript::tail_messages(&path_str, 200_000); - if messages.is_empty() { return String::new(); } + let Some(iter) = crate::transcript::TailMessages::open(&path_str) else { + return String::new(); + }; let cfg = crate::config::get(); - let mut text = String::new(); - for (role, content, ts) in &messages { + let mut fragments: Vec = Vec::new(); + let mut total_bytes = 0; + const MAX_BYTES: usize = 200_000; + + for (role, content, ts) in iter { + if total_bytes >= MAX_BYTES { break; } let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name }; - if !ts.is_empty() { - text.push_str(&format!("**{}** {}: {}\n\n", name, &ts[..ts.len().min(19)], content)); + let formatted = if !ts.is_empty() { + format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], content) } else { - text.push_str(&format!("**{}:** {}\n\n", name, content)); - } + format!("**{}:** {}", name, content) + }; + total_bytes += content.len(); + fragments.push(formatted); } - text + + // Reverse back to chronological order + fragments.reverse(); + fragments.join("\n\n") } /// Get recently surfaced memory keys for the current session. diff --git a/poc-memory/src/transcript.rs b/poc-memory/src/transcript.rs index aee1015..01a1fc6 100644 --- a/poc-memory/src/transcript.rs +++ b/poc-memory/src/transcript.rs @@ -142,66 +142,97 @@ fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { haystack.windows(needle.len()).any(|w| w == needle) } -/// Reverse-scan a transcript file, collecting user/assistant messages -/// until `max_tokens` tokens (~4 chars each) are accumulated. Stops at -/// the last compaction boundary. Returns messages in chronological order. -pub fn tail_messages(path: &str, max_bytes: usize) -> Vec<(String, String, String)> { - let (mmap, _file) = match mmap_transcript(path) { - Some(v) => v, - None => return Vec::new(), - }; +/// Reverse iterator over user/assistant messages in a transcript file. +/// Yields (role, text, timestamp) tuples newest-first. The caller decides +/// when to stop (byte budget, count, etc). +pub struct TailMessages { + _file: fs::File, + mmap: Mmap, + pos: usize, +} - let mut messages: Vec<(String, String, String)> = Vec::new(); - let mut total_bytes = 0; - - for obj_bytes in JsonlBackwardIter::new(&mmap) { - if total_bytes >= max_bytes { break; } - - // Quick byte check: skip objects that aren't user/assistant - // (avoids parsing large tool_result / system objects) - if !contains_bytes(obj_bytes, b"\"user\"") - && !contains_bytes(obj_bytes, b"\"assistant\"") { - continue; - } - - let obj: Value = match serde_json::from_slice(obj_bytes) { - Ok(v) => v, - Err(_) => continue, - }; - - let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or(""); - if msg_type != "user" && msg_type != "assistant" { continue; } - - let msg = obj.get("message").unwrap_or(&obj); - let text = match msg.get("content") { - Some(Value::String(s)) => s.clone(), - Some(Value::Array(arr)) => { - arr.iter() - .filter(|b| b.get("type").and_then(|v| v.as_str()) == Some("text")) - .filter_map(|b| b.get("text").and_then(|v| v.as_str())) - .collect::>() - .join(" ") - } - _ => continue, - }; - if text.is_empty() { continue; } - - // Stop at compaction boundary - if msg_type == "user" && text.starts_with("This session is being continued") { - break; - } - - let timestamp = obj.get("timestamp") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - - total_bytes += text.len(); - messages.push((msg_type.to_string(), text, timestamp)); +impl TailMessages { + pub fn open(path: &str) -> Option { + let (mmap, file) = mmap_transcript(path)?; + let pos = mmap.len(); + Some(Self { _file: file, mmap, pos }) } +} - messages.reverse(); - messages +impl Iterator for TailMessages { + type Item = (String, String, String); + + fn next(&mut self) -> Option { + loop { + // Find closing } + let close = loop { + let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?; + self.pos = p; + if self.mmap[p] == b'}' { break p; } + }; + + // Track brace depth to find matching { + let mut depth: usize = 1; + let mut in_string = false; + let open = loop { + let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?; + self.pos = p; + let ch = self.mmap[p]; + + if in_string { + if ch == b'"' { + let mut bs = 0; + while p > bs + 1 && self.mmap[p - 1 - bs] == b'\\' { + bs += 1; + } + if bs % 2 == 0 { in_string = false; } + } + continue; + } + + match ch { + b'"' => { in_string = true; } + b'}' => { depth += 1; } + b'{' => { + depth -= 1; + if depth == 0 { break p; } + } + _ => {} + } + }; + + let obj_bytes = &self.mmap[open..=close]; + + let obj: Value = match serde_json::from_slice(obj_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + + let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if msg_type != "user" && msg_type != "assistant" { continue; } + + let msg = obj.get("message").unwrap_or(&obj); + let text = match msg.get("content") { + Some(Value::String(s)) => s.clone(), + Some(Value::Array(arr)) => { + arr.iter() + .filter(|b| b.get("type").and_then(|v| v.as_str()) == Some("text")) + .filter_map(|b| b.get("text").and_then(|v| v.as_str())) + .collect::>() + .join(" ") + } + _ => continue, + }; + if text.is_empty() { continue; } + + let timestamp = obj.get("timestamp") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + return Some((msg_type.to_string(), text, timestamp)); + } + } } /// Get the timestamp of the compaction message at a given byte offset.