replace tail_messages with TailMessages iterator
TailMessages is a proper iterator that yields (role, text, timestamp) newest-first. Owns the mmap internally. Caller decides when to stop. resolve_conversation collects up to 200KB, then reverses to chronological order. No compaction check needed — the byte budget naturally limits how far back we scan. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6c41b50e04
commit
ecc2cb7b20
3 changed files with 108 additions and 66 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -2848,6 +2848,7 @@ dependencies = [
|
||||||
"json5",
|
"json5",
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
"memchr",
|
||||||
"memmap2",
|
"memmap2",
|
||||||
"paste",
|
"paste",
|
||||||
"peg",
|
"peg",
|
||||||
|
|
|
||||||
|
|
@ -456,20 +456,30 @@ fn resolve_conversation() -> String {
|
||||||
let Some(path) = transcript else { return String::new() };
|
let Some(path) = transcript else { return String::new() };
|
||||||
let path_str = path.to_string_lossy();
|
let path_str = path.to_string_lossy();
|
||||||
|
|
||||||
let messages = crate::transcript::tail_messages(&path_str, 200_000);
|
let Some(iter) = crate::transcript::TailMessages::open(&path_str) else {
|
||||||
if messages.is_empty() { return String::new(); }
|
return String::new();
|
||||||
|
};
|
||||||
|
|
||||||
let cfg = crate::config::get();
|
let cfg = crate::config::get();
|
||||||
let mut text = String::new();
|
let mut fragments: Vec<String> = Vec::new();
|
||||||
for (role, content, ts) in &messages {
|
let mut total_bytes = 0;
|
||||||
|
const MAX_BYTES: usize = 200_000;
|
||||||
|
|
||||||
|
for (role, content, ts) in iter {
|
||||||
|
if total_bytes >= MAX_BYTES { break; }
|
||||||
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
|
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
|
||||||
if !ts.is_empty() {
|
let formatted = if !ts.is_empty() {
|
||||||
text.push_str(&format!("**{}** {}: {}\n\n", name, &ts[..ts.len().min(19)], content));
|
format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], content)
|
||||||
} else {
|
} else {
|
||||||
text.push_str(&format!("**{}:** {}\n\n", name, content));
|
format!("**{}:** {}", name, content)
|
||||||
|
};
|
||||||
|
total_bytes += content.len();
|
||||||
|
fragments.push(formatted);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
text
|
// Reverse back to chronological order
|
||||||
|
fragments.reverse();
|
||||||
|
fragments.join("\n\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get recently surfaced memory keys for the current session.
|
/// Get recently surfaced memory keys for the current session.
|
||||||
|
|
|
||||||
|
|
@ -142,28 +142,67 @@ fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
|
||||||
haystack.windows(needle.len()).any(|w| w == needle)
|
haystack.windows(needle.len()).any(|w| w == needle)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reverse-scan a transcript file, collecting user/assistant messages
|
/// Reverse iterator over user/assistant messages in a transcript file.
|
||||||
/// until `max_tokens` tokens (~4 chars each) are accumulated. Stops at
|
/// Yields (role, text, timestamp) tuples newest-first. The caller decides
|
||||||
/// the last compaction boundary. Returns messages in chronological order.
|
/// when to stop (byte budget, count, etc).
|
||||||
pub fn tail_messages(path: &str, max_bytes: usize) -> Vec<(String, String, String)> {
|
pub struct TailMessages {
|
||||||
let (mmap, _file) = match mmap_transcript(path) {
|
_file: fs::File,
|
||||||
Some(v) => v,
|
mmap: Mmap,
|
||||||
None => return Vec::new(),
|
pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TailMessages {
|
||||||
|
pub fn open(path: &str) -> Option<Self> {
|
||||||
|
let (mmap, file) = mmap_transcript(path)?;
|
||||||
|
let pos = mmap.len();
|
||||||
|
Some(Self { _file: file, mmap, pos })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for TailMessages {
|
||||||
|
type Item = (String, String, String);
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
loop {
|
||||||
|
// Find closing }
|
||||||
|
let close = loop {
|
||||||
|
let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?;
|
||||||
|
self.pos = p;
|
||||||
|
if self.mmap[p] == b'}' { break p; }
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut messages: Vec<(String, String, String)> = Vec::new();
|
// Track brace depth to find matching {
|
||||||
let mut total_bytes = 0;
|
let mut depth: usize = 1;
|
||||||
|
let mut in_string = false;
|
||||||
|
let open = loop {
|
||||||
|
let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?;
|
||||||
|
self.pos = p;
|
||||||
|
let ch = self.mmap[p];
|
||||||
|
|
||||||
for obj_bytes in JsonlBackwardIter::new(&mmap) {
|
if in_string {
|
||||||
if total_bytes >= max_bytes { break; }
|
if ch == b'"' {
|
||||||
|
let mut bs = 0;
|
||||||
// Quick byte check: skip objects that aren't user/assistant
|
while p > bs + 1 && self.mmap[p - 1 - bs] == b'\\' {
|
||||||
// (avoids parsing large tool_result / system objects)
|
bs += 1;
|
||||||
if !contains_bytes(obj_bytes, b"\"user\"")
|
}
|
||||||
&& !contains_bytes(obj_bytes, b"\"assistant\"") {
|
if bs % 2 == 0 { in_string = false; }
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match ch {
|
||||||
|
b'"' => { in_string = true; }
|
||||||
|
b'}' => { depth += 1; }
|
||||||
|
b'{' => {
|
||||||
|
depth -= 1;
|
||||||
|
if depth == 0 { break p; }
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let obj_bytes = &self.mmap[open..=close];
|
||||||
|
|
||||||
let obj: Value = match serde_json::from_slice(obj_bytes) {
|
let obj: Value = match serde_json::from_slice(obj_bytes) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
|
|
@ -186,22 +225,14 @@ pub fn tail_messages(path: &str, max_bytes: usize) -> Vec<(String, String, Strin
|
||||||
};
|
};
|
||||||
if text.is_empty() { continue; }
|
if text.is_empty() { continue; }
|
||||||
|
|
||||||
// Stop at compaction boundary
|
|
||||||
if msg_type == "user" && text.starts_with("This session is being continued") {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let timestamp = obj.get("timestamp")
|
let timestamp = obj.get("timestamp")
|
||||||
.and_then(|v| v.as_str())
|
.and_then(|v| v.as_str())
|
||||||
.unwrap_or("")
|
.unwrap_or("")
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
total_bytes += text.len();
|
return Some((msg_type.to_string(), text, timestamp));
|
||||||
messages.push((msg_type.to_string(), text, timestamp));
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
messages.reverse();
|
|
||||||
messages
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the timestamp of the compaction message at a given byte offset.
|
/// Get the timestamp of the compaction message at a given byte offset.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue