From e39096b7872816015975632484c58b7409d4d5d7 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 22 Mar 2026 03:02:11 -0400 Subject: [PATCH] add tail_messages() for fast reverse transcript scanning Reverse-scans the mmap'd transcript using JsonlBackwardIter, collecting user/assistant messages up to a token budget, stopping at the compaction boundary. Returns messages in chronological order. resolve_conversation() now uses this instead of parsing the entire file through extract_conversation + split_on_compaction. Co-Authored-By: Claude Opus 4.6 (1M context) --- poc-memory/src/agents/defs.rs | 24 +++---------- poc-memory/src/transcript.rs | 67 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 20 deletions(-) diff --git a/poc-memory/src/agents/defs.rs b/poc-memory/src/agents/defs.rs index ab0dc82..d0f961d 100644 --- a/poc-memory/src/agents/defs.rs +++ b/poc-memory/src/agents/defs.rs @@ -455,19 +455,13 @@ fn resolve_conversation() -> String { let Some(path) = transcript else { return String::new() }; let path_str = path.to_string_lossy(); - let messages = match super::enrich::extract_conversation(&path_str) { - Ok(m) => m, - Err(_) => return String::new(), - }; - // Take the last segment (post-compaction) - let segments = super::enrich::split_on_compaction(messages); - let Some(segment) = segments.last() else { return String::new() }; + let messages = crate::transcript::tail_messages(&path_str, 25_000); + if messages.is_empty() { return String::new(); } - // Format and take the tail let cfg = crate::config::get(); let mut text = String::new(); - for (_, role, content, ts) in segment { + for (role, content, ts) in &messages { let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name }; if !ts.is_empty() { text.push_str(&format!("**{}** {}: {}\n\n", name, &ts[..ts.len().min(19)], content)); @@ -475,17 +469,7 @@ fn resolve_conversation() -> String { text.push_str(&format!("**{}:** {}\n\n", name, content)); } } - - // Tail: keep 
last ~100K chars - const MAX_CHARS: usize = 100_000; - if text.len() > MAX_CHARS { - // Find a clean line break near the cut point - let start = text.len() - MAX_CHARS; - let start = text[start..].find('\n').map(|i| start + i + 1).unwrap_or(start); - text[start..].to_string() - } else { - text - } + text } /// Get recently surfaced memory keys for the current session. diff --git a/poc-memory/src/transcript.rs b/poc-memory/src/transcript.rs index 202140c..3fe3679 100644 --- a/poc-memory/src/transcript.rs +++ b/poc-memory/src/transcript.rs @@ -142,6 +142,73 @@ fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { haystack.windows(needle.len()).any(|w| w == needle) } +/// Reverse-scan a transcript file, collecting user/assistant messages +/// until `max_tokens` tokens (~4 chars each) are accumulated. Stops at +/// the last compaction boundary. Returns messages in chronological order. +pub fn tail_messages(path: &str, max_tokens: usize) -> Vec<(String, String, String)> { + let (mmap, _file) = match mmap_transcript(path) { + Some(v) => v, + None => return Vec::new(), + }; + + let compaction_marker = b"This session is being continued"; + let mut messages: Vec<(String, String, String)> = Vec::new(); + let mut token_count = 0; + + for obj_bytes in JsonlBackwardIter::new(&mmap) { + if token_count >= max_tokens { break; } + + // Stop at compaction boundary + if contains_bytes(obj_bytes, compaction_marker) { + let obj: Value = match serde_json::from_slice(obj_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + if obj.get("type").and_then(|v| v.as_str()) == Some("user") { + if let Some(c) = obj.get("message") + .and_then(|m| m.get("content")) + .and_then(|c| c.as_str()) + && c.starts_with("This session is being continued") { + break; + } + } + } + + let obj: Value = match serde_json::from_slice(obj_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + + let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if msg_type != "user" && msg_type != "assistant" 
{ continue; } + + let msg = obj.get("message").unwrap_or(&obj); + let text = match msg.get("content") { + Some(Value::String(s)) => s.clone(), + Some(Value::Array(arr)) => { + arr.iter() + .filter(|b| b.get("type").and_then(|v| v.as_str()) == Some("text")) + .filter_map(|b| b.get("text").and_then(|v| v.as_str())) + .collect::<Vec<_>>() + .join(" ") + } + _ => continue, + }; + if text.is_empty() { continue; } + + let timestamp = obj.get("timestamp") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + token_count += text.len() / 4; + messages.push((msg_type.to_string(), text, timestamp)); + } + + messages.reverse(); + messages +} + /// Get the timestamp of the compaction message at a given byte offset. /// Returns a human-readable datetime string, or None if unavailable. pub fn compaction_timestamp(path: &str, offset: u64) -> Option<String> {