Split conversation transcript parsing

This commit is contained in:
Kent Overstreet 2026-06-15 11:24:18 -05:00
commit 54df271308
9 changed files with 614 additions and 348 deletions

271
src/conversation/mod.rs Normal file
View file

@ -0,0 +1,271 @@
// Conversation transcript abstraction.
//
// Core code consumes normalized user/assistant messages through this module.
// Product-specific log formats live in the small compatibility sources below.
use memmap2::Mmap;
use serde_json::Value;
use std::fs;
use std::path::Path;
pub mod claude;
pub mod codex;
pub mod jsonl;
pub use jsonl::JsonlBackwardIter;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TranscriptRole {
User,
Assistant,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranscriptMessage {
pub role: TranscriptRole,
pub text: String,
pub timestamp: Option<String>,
pub offset: u64,
}
pub trait ConversationSource {
fn parse_message(&self, obj: &Value, offset: u64) -> Option<TranscriptMessage>;
fn is_compaction(&self, obj: &Value) -> bool;
fn may_contain_compaction(&self, _obj_bytes: &[u8]) -> bool {
true
}
}
pub struct AnyConversationSource;
impl ConversationSource for AnyConversationSource {
fn parse_message(&self, obj: &Value, offset: u64) -> Option<TranscriptMessage> {
claude::ClaudeSource.parse_message(obj, offset)
.or_else(|| codex::CodexSource.parse_message(obj, offset))
}
fn is_compaction(&self, obj: &Value) -> bool {
claude::ClaudeSource.is_compaction(obj) || codex::CodexSource.is_compaction(obj)
}
fn may_contain_compaction(&self, obj_bytes: &[u8]) -> bool {
claude::ClaudeSource.may_contain_compaction(obj_bytes)
|| codex::CodexSource.may_contain_compaction(obj_bytes)
}
}
/// Find the byte offset of the last compaction marker in mmap'd transcript data.
/// Returns the byte offset of the JSON object's opening brace.
pub(crate) fn find_last_compaction(data: &[u8]) -> Option<usize> {
find_last_compaction_with(data, &AnyConversationSource)
}
pub(crate) fn find_last_compaction_with(
data: &[u8],
source: &impl ConversationSource,
) -> Option<usize> {
for (offset, obj_bytes) in JsonlBackwardIter::new(data) {
// Quick byte check before parsing large transcript entries.
if !source.may_contain_compaction(obj_bytes) {
continue;
}
let obj: Value = match serde_json::from_slice(obj_bytes) {
Ok(v) => v,
Err(_) => continue,
};
if source.is_compaction(&obj) {
return Some(offset);
}
}
None
}
/// Find the byte offset of the last compaction in a transcript file.
/// Returns None if the file can't be opened or has no compaction.
pub(crate) fn find_last_compaction_in_file(path: &str) -> Option<u64> {
if path.is_empty() { return None; }
let file = fs::File::open(path).ok()?;
let meta = file.metadata().ok()?;
if meta.len() == 0 { return None; }
let mmap = unsafe { Mmap::map(&file).ok()? };
find_last_compaction(&mmap).map(|off| off as u64)
}
/// Mmap a transcript file. Returns (Mmap, File) to keep both alive.
pub(crate) fn mmap_transcript(path: &str) -> Option<(Mmap, fs::File)> {
let file = fs::File::open(path).ok()?;
let meta = file.metadata().ok()?;
if meta.len() == 0 { return None; }
let mmap = unsafe { Mmap::map(&file).ok()? };
Some((mmap, file))
}
/// Reverse iterator over user/assistant messages in a transcript file.
/// Yields normalized transcript messages newest-first. The caller decides
/// when to stop (byte budget, count, etc).
pub struct TailMessages {
_file: fs::File,
mmap: Mmap,
pos: usize,
}
impl TailMessages {
pub fn open(path: &str) -> Option<Self> {
let (mmap, file) = mmap_transcript(path)?;
let pos = mmap.len();
Some(Self { _file: file, mmap, pos })
}
}
impl Iterator for TailMessages {
type Item = TranscriptMessage;
fn next(&mut self) -> Option<Self::Item> {
loop {
let (offset, obj_bytes) = jsonl::JsonlBackwardIter::new(&self.mmap[..self.pos]).next()?;
self.pos = offset;
let obj: Value = match serde_json::from_slice(obj_bytes) {
Ok(v) => v,
Err(_) => continue,
};
if let Some(message) = AnyConversationSource.parse_message(&obj, offset as u64) {
return Some(message);
}
}
}
}
/// Get the timestamp of the compaction message at a given byte offset.
/// Returns a human-readable datetime string, or None if unavailable.
pub fn compaction_timestamp(path: &str, offset: u64) -> Option<String> {
let (mmap, _file) = mmap_transcript(path)?;
let start = offset as usize;
if start >= mmap.len() { return None; }
// Find the end of this JSONL line
let end = mmap[start..].iter().position(|&b| b == b'\n')
.map(|p| start + p)
.unwrap_or(mmap.len());
let obj: Value = serde_json::from_slice(&mmap[start..end]).ok()?;
if let Some(ts) = obj.get("timestamp").and_then(|v| v.as_str()) {
return Some(ts.to_string());
}
for field in &["createdAt", "created_at", "time"] {
if let Some(ts) = obj.get(*field).and_then(|v| v.as_str()) {
return Some(ts.to_string());
}
}
None
}
/// Detect whether a compaction has occurred since the last check.
///
/// Compares the current compaction offset against a saved value in
/// `state_dir/compaction-{session_id}`. Returns true if a new
/// compaction was found. Updates the saved offset.
pub fn detect_new_compaction(
state_dir: &Path,
session_id: &str,
transcript_path: &str,
) -> bool {
let offset = find_last_compaction_in_file(transcript_path);
let save_path = state_dir.join(format!("compaction-{}", session_id));
let saved: Option<u64> = fs::read_to_string(&save_path)
.ok()
.and_then(|s| s.trim().parse().ok());
let is_new = match (offset, saved) {
(Some(cur), Some(prev)) => cur != prev,
(Some(_), None) => true,
_ => false,
};
// Save current offset
if let Some(off) = offset {
fs::write(&save_path, off.to_string()).ok();
}
is_new
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
fn write_temp_jsonl(content: &str) -> tempfile::NamedTempFile {
let mut file = tempfile::NamedTempFile::new().unwrap();
file.write_all(content.as_bytes()).unwrap();
file.flush().unwrap();
file
}
#[test]
fn tail_messages_yields_normalized_messages_newest_first() {
let file = write_temp_jsonl(
r#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"user","message":{"content":"claude user"}}
{"timestamp":"2026-06-15T15:00:01.000Z","type":"assistant","message":{"content":[{"type":"text","text":"claude assistant"}]}}
{"timestamp":"2026-06-15T15:00:02.000Z","type":"event_msg","payload":{"type":"user_message","message":"codex user"}}
{"timestamp":"2026-06-15T15:00:03.000Z","type":"event_msg","payload":{"type":"task_started"}}
{"timestamp":"2026-06-15T15:00:04.000Z","type":"event_msg","payload":{"type":"agent_message","message":"codex assistant"}}
"#,
);
let messages: Vec<_> = TailMessages::open(&file.path().to_string_lossy())
.unwrap()
.collect();
assert_eq!(messages.len(), 4);
assert_eq!(messages[0].text, "codex assistant");
assert_eq!(messages[1].text, "codex user");
assert_eq!(messages[2].text, "claude assistant");
assert_eq!(messages[3].text, "claude user");
assert!(messages[0].offset > messages[1].offset);
}
#[test]
fn detects_claude_and_codex_compactions() {
let claude = br#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"user","message":{"content":"normal"}}
{"timestamp":"2026-06-15T15:00:01.000Z","type":"user","message":{"content":"This session is being continued from a previous conversation."}}
"#;
let codex = br#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"event_msg","payload":{"type":"user_message","message":"normal"}}
{"timestamp":"2026-06-15T15:00:01.000Z","type":"event_msg","payload":{"type":"context_compacted"}}
"#;
assert!(find_last_compaction(claude).is_some());
assert!(find_last_compaction(codex).is_some());
}
#[test]
fn detect_new_compaction_tracks_offset_changes() {
let transcript = write_temp_jsonl(
r#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"event_msg","payload":{"type":"context_compacted"}}
"#,
);
let state = tempfile::tempdir().unwrap();
assert!(detect_new_compaction(
state.path(),
"session",
&transcript.path().to_string_lossy(),
));
assert!(!detect_new_compaction(
state.path(),
"session",
&transcript.path().to_string_lossy(),
));
}
}