From 54df2713084df1e724ff3e458c15725875923a1b Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 15 Jun 2026 11:24:18 -0500 Subject: [PATCH] Split conversation transcript parsing --- src/conversation/claude.rs | 113 +++++++++++ src/conversation/codex.rs | 105 +++++++++++ src/conversation/jsonl.rs | 110 +++++++++++ src/conversation/mod.rs | 271 +++++++++++++++++++++++++++ src/hippocampus/mod.rs | 1 - src/hippocampus/transcript.rs | 340 ---------------------------------- src/lib.rs | 6 +- src/mind/log.rs | 4 +- src/subconscious/defs.rs | 12 +- 9 files changed, 614 insertions(+), 348 deletions(-) create mode 100644 src/conversation/claude.rs create mode 100644 src/conversation/codex.rs create mode 100644 src/conversation/jsonl.rs create mode 100644 src/conversation/mod.rs delete mode 100644 src/hippocampus/transcript.rs diff --git a/src/conversation/claude.rs b/src/conversation/claude.rs new file mode 100644 index 0000000..5eb264e --- /dev/null +++ b/src/conversation/claude.rs @@ -0,0 +1,113 @@ +use serde_json::Value; + +use super::{ConversationSource, TranscriptMessage, TranscriptRole}; + +pub struct ClaudeSource; + +impl ConversationSource for ClaudeSource { + fn parse_message(&self, obj: &Value, offset: u64) -> Option { + parse_message(obj, offset) + } + + fn is_compaction(&self, obj: &Value) -> bool { + is_compaction(obj) + } + + fn may_contain_compaction(&self, obj_bytes: &[u8]) -> bool { + contains_bytes(obj_bytes, b"This session is being continued") + } +} + +fn text_content(value: &Value) -> Option { + let text = match value { + Value::String(s) => s.clone(), + Value::Array(arr) => { + arr.iter() + .filter(|b| b.get("type").and_then(|v| v.as_str()) == Some("text")) + .filter_map(|b| b.get("text").and_then(|v| v.as_str())) + .collect::>() + .join(" ") + } + _ => return None, + }; + (!text.is_empty()).then_some(text) +} + +pub(crate) fn parse_message(obj: &Value, offset: u64) -> Option { + let role = match obj.get("type").and_then(|v| v.as_str()) { + Some("user") => TranscriptRole::User, + Some("assistant") => TranscriptRole::Assistant, + _ => return None, + }; + + let msg = obj.get("message").unwrap_or(obj); + let text = msg.get("content").and_then(text_content)?; + let timestamp = obj.get("timestamp") + .and_then(|v| v.as_str()) + .map(str::to_string); + + Some(TranscriptMessage { role, text, timestamp, offset }) +} + +pub(crate) fn is_compaction(obj: &Value) -> bool { + obj.get("type").and_then(|v| v.as_str()) == Some("user") + && obj.get("message") + .and_then(|m| m.get("content")) + .and_then(|c| c.as_str()) + .is_some_and(|content| content.starts_with("This session is being continued")) +} + +fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { + haystack.windows(needle.len()).any(|w| w == needle) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn parses_string_and_array_content() { + let user = json!({ + "timestamp": "2026-06-15T15:00:00.000Z", + "type": "user", + "message": { "content": "hello" } + }); + let assistant = json!({ + "timestamp": "2026-06-15T15:00:01.000Z", + "type": "assistant", + "message": { + "content": [ + { "type": "text", "text": "hi" }, + { "type": "tool_use", "name": "ignored" }, + { "type": "text", "text": "there" } + ] + } + }); + + assert_eq!( + parse_message(&user, 7).unwrap(), + TranscriptMessage { + role: TranscriptRole::User, + text: "hello".to_string(), + timestamp: Some("2026-06-15T15:00:00.000Z".to_string()), + offset: 7, + } + ); + + assert_eq!(parse_message(&assistant, 9).unwrap().text, "hi there"); + } + + #[test] + fn detects_compaction_marker() { + let obj = json!({ + "timestamp": "2026-06-15T15:00:01.000Z", + "type": "user", + "message": { + "content": "This session is being continued from a previous conversation." + } + }); + + assert!(is_compaction(&obj)); + } +} diff --git a/src/conversation/codex.rs b/src/conversation/codex.rs new file mode 100644 index 0000000..3b053d8 --- /dev/null +++ b/src/conversation/codex.rs @@ -0,0 +1,105 @@ +use serde_json::Value; + +use super::{ConversationSource, TranscriptMessage, TranscriptRole}; + +pub struct CodexSource; + +impl ConversationSource for CodexSource { + fn parse_message(&self, obj: &Value, offset: u64) -> Option { + parse_message(obj, offset) + } + + fn is_compaction(&self, obj: &Value) -> bool { + is_compaction(obj) + } + + fn may_contain_compaction(&self, obj_bytes: &[u8]) -> bool { + contains_bytes(obj_bytes, b"context_compacted") + } +} + +pub(crate) fn parse_message(obj: &Value, offset: u64) -> Option { + if obj.get("type").and_then(|v| v.as_str()) != Some("event_msg") { + return None; + } + + let payload = obj.get("payload")?; + let (role, text) = match payload.get("type").and_then(|v| v.as_str()) { + Some("user_message") => ( + TranscriptRole::User, + payload.get("message").and_then(|v| v.as_str())?.to_string(), + ), + Some("agent_message") => ( + TranscriptRole::Assistant, + payload.get("message").and_then(|v| v.as_str())?.to_string(), + ), + _ => return None, + }; + + if text.is_empty() { + return None; + } + + let timestamp = obj.get("timestamp") + .and_then(|v| v.as_str()) + .map(str::to_string); + + Some(TranscriptMessage { role, text, timestamp, offset }) +} + +pub(crate) fn is_compaction(obj: &Value) -> bool { + obj.get("type").and_then(|v| v.as_str()) == Some("event_msg") + && obj.get("payload") + .and_then(|p| p.get("type")) + .and_then(|v| v.as_str()) == Some("context_compacted") +} + +fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { + haystack.windows(needle.len()).any(|w| w == needle) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn parses_event_messages_and_skips_noise() { + let user = json!({ + "timestamp": "2026-06-15T15:00:00.000Z", + "type": "event_msg", + "payload": { "type": "user_message", "message": "start here" } + }); + let assistant = json!({ + "timestamp": "2026-06-15T15:00:01.000Z", + "type": "event_msg", + "payload": { "type": "agent_message", "message": "working" } + }); + let tool = json!({ + "timestamp": "2026-06-15T15:00:02.000Z", + "type": "event_msg", + "payload": { "type": "task_started" } + }); + let raw = json!({ + "timestamp": "2026-06-15T15:00:03.000Z", + "type": "response_item", + "payload": { "type": "message", "role": "user" } + }); + + assert_eq!(parse_message(&user, 1).unwrap().role, TranscriptRole::User); + assert_eq!(parse_message(&assistant, 2).unwrap().text, "working"); + assert!(parse_message(&tool, 3).is_none()); + assert!(parse_message(&raw, 4).is_none()); + } + + #[test] + fn detects_compaction_event() { + let obj = json!({ + "timestamp": "2026-06-15T15:00:01.000Z", + "type": "event_msg", + "payload": { "type": "context_compacted" } + }); + + assert!(is_compaction(&obj)); + } +} diff --git a/src/conversation/jsonl.rs b/src/conversation/jsonl.rs new file mode 100644 index 0000000..2675dbc --- /dev/null +++ b/src/conversation/jsonl.rs @@ -0,0 +1,110 @@ +use memchr::memrchr3; + +/// Scan backwards through mmap'd bytes, yielding byte slices of complete +/// top-level JSON objects (outermost { to matching }). +/// +/// Uses memrchr3 (SIMD) to jump between structurally significant bytes +/// ({, }, ") instead of scanning byte-by-byte. Tracks brace depth, +/// skipping braces inside JSON strings. Returns objects in reverse order +/// (newest first). +pub struct JsonlBackwardIter<'a> { + data: &'a [u8], + pos: usize, +} + +impl<'a> JsonlBackwardIter<'a> { + pub fn new(data: &'a [u8]) -> Self { + Self { data, pos: data.len() } + } +} + +impl<'a> Iterator for JsonlBackwardIter<'a> { + type Item = (usize, &'a [u8]); + + fn next(&mut self) -> Option { + next_json_object(self.data, &mut self.pos) + } +} + +fn is_unescaped_quote(data: &[u8], p: usize) -> bool { + let mut bs = 0; + while p > bs && data[p - 1 - bs] == b'\\' { + bs += 1; + } + bs % 2 == 0 +} + +fn next_json_object<'a>(data: &'a [u8], pos: &mut usize) -> Option<(usize, &'a [u8])> { + // Find the closing } of the next object, skipping } inside strings. + let close = { + let mut in_string = false; + loop { + let p = memrchr3(b'{', b'}', b'"', &data[..*pos])?; + *pos = p; + let ch = data[p]; + + if in_string { + if ch == b'"' && is_unescaped_quote(data, p) { + in_string = false; + } + continue; + } + + match ch { + b'}' => break p, + b'"' => in_string = true, + _ => {} + } + } + }; + + // Track brace depth to find matching {. + let mut depth: usize = 1; + let mut in_string = false; + + loop { + let p = memrchr3(b'{', b'}', b'"', &data[..*pos])?; + *pos = p; + let ch = data[p]; + + if in_string { + if ch == b'"' && is_unescaped_quote(data, p) { + in_string = false; + } + continue; + } + + match ch { + b'"' => { in_string = true; } + b'}' => { depth += 1; } + b'{' => { + depth -= 1; + if depth == 0 { + return Some((*pos, &data[*pos..=close])); + } + } + _ => {} + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handles_nested_json_and_quoted_braces() { + let data = br#"{"n":1,"s":"literal } brace"} +{"n":2,"nested":{"s":"escaped quote: \" and { brace"}} +trailing garbage +"#; + + let objs: Vec<_> = JsonlBackwardIter::new(data) + .map(|(_, bytes)| std::str::from_utf8(bytes).unwrap().to_string()) + .collect(); + + assert_eq!(objs.len(), 2); + assert!(objs[0].contains(r#""n":2"#)); + assert!(objs[1].contains(r#""n":1"#)); + } +} diff --git a/src/conversation/mod.rs b/src/conversation/mod.rs new file mode 100644 index 0000000..a6ac1f9 --- /dev/null +++ b/src/conversation/mod.rs @@ -0,0 +1,271 @@ +// Conversation transcript abstraction. +// +// Core code consumes normalized user/assistant messages through this module. +// Product-specific log formats live in the small compatibility sources below. + +use memmap2::Mmap; +use serde_json::Value; +use std::fs; +use std::path::Path; + +pub mod claude; +pub mod codex; +pub mod jsonl; + +pub use jsonl::JsonlBackwardIter; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TranscriptRole { + User, + Assistant, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TranscriptMessage { + pub role: TranscriptRole, + pub text: String, + pub timestamp: Option, + pub offset: u64, +} + +pub trait ConversationSource { + fn parse_message(&self, obj: &Value, offset: u64) -> Option; + fn is_compaction(&self, obj: &Value) -> bool; + + fn may_contain_compaction(&self, _obj_bytes: &[u8]) -> bool { + true + } +} + +pub struct AnyConversationSource; + +impl ConversationSource for AnyConversationSource { + fn parse_message(&self, obj: &Value, offset: u64) -> Option { + claude::ClaudeSource.parse_message(obj, offset) + .or_else(|| codex::CodexSource.parse_message(obj, offset)) + } + + fn is_compaction(&self, obj: &Value) -> bool { + claude::ClaudeSource.is_compaction(obj) || codex::CodexSource.is_compaction(obj) + } + + fn may_contain_compaction(&self, obj_bytes: &[u8]) -> bool { + claude::ClaudeSource.may_contain_compaction(obj_bytes) + || codex::CodexSource.may_contain_compaction(obj_bytes) + } +} + +/// Find the byte offset of the last compaction marker in mmap'd transcript data. +/// Returns the byte offset of the JSON object's opening brace. +pub(crate) fn find_last_compaction(data: &[u8]) -> Option { + find_last_compaction_with(data, &AnyConversationSource) +} + +pub(crate) fn find_last_compaction_with( + data: &[u8], + source: &impl ConversationSource, +) -> Option { + for (offset, obj_bytes) in JsonlBackwardIter::new(data) { + // Quick byte check before parsing large transcript entries. + if !source.may_contain_compaction(obj_bytes) { + continue; + } + + let obj: Value = match serde_json::from_slice(obj_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + + if source.is_compaction(&obj) { + return Some(offset); + } + } + + None +} + +/// Find the byte offset of the last compaction in a transcript file. +/// Returns None if the file can't be opened or has no compaction. +pub(crate) fn find_last_compaction_in_file(path: &str) -> Option { + if path.is_empty() { return None; } + + let file = fs::File::open(path).ok()?; + let meta = file.metadata().ok()?; + if meta.len() == 0 { return None; } + + let mmap = unsafe { Mmap::map(&file).ok()? }; + find_last_compaction(&mmap).map(|off| off as u64) +} + +/// Mmap a transcript file. Returns (Mmap, File) to keep both alive. +pub(crate) fn mmap_transcript(path: &str) -> Option<(Mmap, fs::File)> { + let file = fs::File::open(path).ok()?; + let meta = file.metadata().ok()?; + if meta.len() == 0 { return None; } + let mmap = unsafe { Mmap::map(&file).ok()? }; + Some((mmap, file)) +} + +/// Reverse iterator over user/assistant messages in a transcript file. +/// Yields normalized transcript messages newest-first. The caller decides +/// when to stop (byte budget, count, etc). +pub struct TailMessages { + _file: fs::File, + mmap: Mmap, + pos: usize, +} + +impl TailMessages { + pub fn open(path: &str) -> Option { + let (mmap, file) = mmap_transcript(path)?; + let pos = mmap.len(); + Some(Self { _file: file, mmap, pos }) + } +} + +impl Iterator for TailMessages { + type Item = TranscriptMessage; + + fn next(&mut self) -> Option { + loop { + let (offset, obj_bytes) = jsonl::JsonlBackwardIter::new(&self.mmap[..self.pos]).next()?; + self.pos = offset; + + let obj: Value = match serde_json::from_slice(obj_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + + if let Some(message) = AnyConversationSource.parse_message(&obj, offset as u64) { + return Some(message); + } + } + } +} + +/// Get the timestamp of the compaction message at a given byte offset. +/// Returns a human-readable datetime string, or None if unavailable. +pub fn compaction_timestamp(path: &str, offset: u64) -> Option { + let (mmap, _file) = mmap_transcript(path)?; + let start = offset as usize; + if start >= mmap.len() { return None; } + + // Find the end of this JSONL line + let end = mmap[start..].iter().position(|&b| b == b'\n') + .map(|p| start + p) + .unwrap_or(mmap.len()); + + let obj: Value = serde_json::from_slice(&mmap[start..end]).ok()?; + + if let Some(ts) = obj.get("timestamp").and_then(|v| v.as_str()) { + return Some(ts.to_string()); + } + + for field in &["createdAt", "created_at", "time"] { + if let Some(ts) = obj.get(*field).and_then(|v| v.as_str()) { + return Some(ts.to_string()); + } + } + + None +} + +/// Detect whether a compaction has occurred since the last check. +/// +/// Compares the current compaction offset against a saved value in +/// `state_dir/compaction-{session_id}`. Returns true if a new +/// compaction was found. Updates the saved offset. +pub fn detect_new_compaction( + state_dir: &Path, + session_id: &str, + transcript_path: &str, +) -> bool { + let offset = find_last_compaction_in_file(transcript_path); + + let save_path = state_dir.join(format!("compaction-{}", session_id)); + let saved: Option = fs::read_to_string(&save_path) + .ok() + .and_then(|s| s.trim().parse().ok()); + + let is_new = match (offset, saved) { + (Some(cur), Some(prev)) => cur != prev, + (Some(_), None) => true, + _ => false, + }; + + // Save current offset + if let Some(off) = offset { + fs::write(&save_path, off.to_string()).ok(); + } + + is_new +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + + fn write_temp_jsonl(content: &str) -> tempfile::NamedTempFile { + let mut file = tempfile::NamedTempFile::new().unwrap(); + file.write_all(content.as_bytes()).unwrap(); + file.flush().unwrap(); + file + } + + #[test] + fn tail_messages_yields_normalized_messages_newest_first() { + let file = write_temp_jsonl( + r#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"user","message":{"content":"claude user"}} +{"timestamp":"2026-06-15T15:00:01.000Z","type":"assistant","message":{"content":[{"type":"text","text":"claude assistant"}]}} +{"timestamp":"2026-06-15T15:00:02.000Z","type":"event_msg","payload":{"type":"user_message","message":"codex user"}} +{"timestamp":"2026-06-15T15:00:03.000Z","type":"event_msg","payload":{"type":"task_started"}} +{"timestamp":"2026-06-15T15:00:04.000Z","type":"event_msg","payload":{"type":"agent_message","message":"codex assistant"}} +"#, + ); + + let messages: Vec<_> = TailMessages::open(&file.path().to_string_lossy()) + .unwrap() + .collect(); + + assert_eq!(messages.len(), 4); + assert_eq!(messages[0].text, "codex assistant"); + assert_eq!(messages[1].text, "codex user"); + assert_eq!(messages[2].text, "claude assistant"); + assert_eq!(messages[3].text, "claude user"); + assert!(messages[0].offset > messages[1].offset); + } + + #[test] + fn detects_claude_and_codex_compactions() { + let claude = br#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"user","message":{"content":"normal"}} +{"timestamp":"2026-06-15T15:00:01.000Z","type":"user","message":{"content":"This session is being continued from a previous conversation."}} +"#; + let codex = br#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"event_msg","payload":{"type":"user_message","message":"normal"}} +{"timestamp":"2026-06-15T15:00:01.000Z","type":"event_msg","payload":{"type":"context_compacted"}} +"#; + + assert!(find_last_compaction(claude).is_some()); + assert!(find_last_compaction(codex).is_some()); + } + + #[test] + fn detect_new_compaction_tracks_offset_changes() { + let transcript = write_temp_jsonl( + r#"{"timestamp":"2026-06-15T15:00:00.000Z","type":"event_msg","payload":{"type":"context_compacted"}} +"#, + ); + let state = tempfile::tempdir().unwrap(); + + assert!(detect_new_compaction( + state.path(), + "session", + &transcript.path().to_string_lossy(), + )); + assert!(!detect_new_compaction( + state.path(), + "session", + &transcript.path().to_string_lossy(), + )); + } +} diff --git a/src/hippocampus/mod.rs b/src/hippocampus/mod.rs index d79640d..0193004 100644 --- a/src/hippocampus/mod.rs +++ b/src/hippocampus/mod.rs @@ -17,7 +17,6 @@ pub mod query; pub mod spectral; pub mod neuro; pub mod counters; -pub mod transcript; use std::cell::RefCell; use std::path::PathBuf; diff --git a/src/hippocampus/transcript.rs b/src/hippocampus/transcript.rs deleted file mode 100644 index 4fd1ef2..0000000 --- a/src/hippocampus/transcript.rs +++ /dev/null @@ -1,340 +0,0 @@ -// Transcript JSONL parsing utilities. -// -// Provides mmap-based backward scanning of Claude Code transcript files -// and compaction detection. Used by memory-search (hook mode) and -// parse-claude-conversation (debug tool). - -use memchr::memrchr3; -use memmap2::Mmap; -use serde_json::Value; -use std::fs; -use std::path::Path; - -/// Scan backwards through mmap'd bytes, yielding byte slices of complete -/// top-level JSON objects (outermost { to matching }). -/// -/// Uses memrchr3 (SIMD) to jump between structurally significant bytes -/// ({, }, ") instead of scanning byte-by-byte. Tracks brace depth, -/// skipping braces inside JSON strings. Returns objects in reverse order -/// (newest first). -pub struct JsonlBackwardIter<'a> { - data: &'a [u8], - pos: usize, -} - -impl<'a> JsonlBackwardIter<'a> { - pub fn new(data: &'a [u8]) -> Self { - Self { data, pos: data.len() } - } -} - -impl<'a> Iterator for JsonlBackwardIter<'a> { - type Item = &'a [u8]; - - fn next(&mut self) -> Option { - // Find the closing } of the next object, skipping } inside strings - let close = { - let mut in_string = false; - loop { - let p = memrchr3(b'{', b'}', b'"', &self.data[..self.pos])?; - self.pos = p; - let ch = self.data[p]; - - if in_string { - if ch == b'"' { - let mut bs = 0; - while p > bs + 1 && self.data[p - 1 - bs] == b'\\' { - bs += 1; - } - if bs % 2 == 0 { in_string = false; } - } - continue; - } - - match ch { - b'}' => break p, - b'"' => in_string = true, - _ => {} - } - } - }; - - // Track brace depth to find matching { - let mut depth: usize = 1; - let mut in_string = false; - - loop { - let p = memrchr3(b'{', b'}', b'"', &self.data[..self.pos])?; - self.pos = p; - let ch = self.data[p]; - - if in_string { - if ch == b'"' { - // Check for escaped quote (count preceding backslashes) - let mut bs = 0; - while p > bs + 1 && self.data[p - 1 - bs] == b'\\' { - bs += 1; - } - if bs % 2 == 0 { - in_string = false; - } - } - // { and } inside strings don't affect depth - continue; - } - - match ch { - b'"' => { in_string = true; } - b'}' => { depth += 1; } - b'{' => { - depth -= 1; - if depth == 0 { - return Some(&self.data[self.pos..=close]); - } - } - _ => {} - } - } - } -} - -/// Find the byte offset of the last compaction summary in mmap'd transcript data. -/// -/// Scans backward for a user-type message whose content starts with -/// "This session is being continued". Returns the byte offset of the -/// JSON object's opening brace. -pub(crate) fn find_last_compaction(data: &[u8]) -> Option { - let marker = b"This session is being continued"; - - for obj_bytes in JsonlBackwardIter::new(data) { - // Quick byte check before parsing - if !contains_bytes(obj_bytes, marker) { - continue; - } - - let obj: Value = match serde_json::from_slice(obj_bytes) { - Ok(v) => v, - Err(_) => continue, - }; - - if obj.get("type").and_then(|v| v.as_str()) != Some("user") { - continue; - } - - if let Some(content) = obj.get("message") - .and_then(|m| m.get("content")) - .and_then(|c| c.as_str()) - && content.starts_with("This session is being continued") { - let offset = obj_bytes.as_ptr() as usize - data.as_ptr() as usize; - return Some(offset); - } - } - - None -} - -/// Find the byte offset of the last compaction in a transcript file. -/// Returns None if the file can't be opened or has no compaction. -pub(crate) fn find_last_compaction_in_file(path: &str) -> Option { - if path.is_empty() { return None; } - - let file = fs::File::open(path).ok()?; - let meta = file.metadata().ok()?; - if meta.len() == 0 { return None; } - - let mmap = unsafe { Mmap::map(&file).ok()? }; - find_last_compaction(&mmap).map(|off| off as u64) -} - -/// Mmap a transcript file. Returns (Mmap, File) to keep both alive. -pub(crate) fn mmap_transcript(path: &str) -> Option<(Mmap, fs::File)> { - let file = fs::File::open(path).ok()?; - let meta = file.metadata().ok()?; - if meta.len() == 0 { return None; } - let mmap = unsafe { Mmap::map(&file).ok()? }; - Some((mmap, file)) -} - -fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { - haystack.windows(needle.len()).any(|w| w == needle) -} - -/// Reverse iterator over user/assistant messages in a transcript file. -/// Yields (role, text, timestamp) tuples newest-first. The caller decides -/// when to stop (byte budget, count, etc). -pub struct TailMessages { - _file: fs::File, - mmap: Mmap, - pos: usize, -} - -impl TailMessages { - pub fn open(path: &str) -> Option { - let (mmap, file) = mmap_transcript(path)?; - let pos = mmap.len(); - Some(Self { _file: file, mmap, pos }) - } -} - -impl Iterator for TailMessages { - type Item = (String, String, String); - - fn next(&mut self) -> Option { - loop { - // Find closing }, skipping } inside strings - let close = { - let mut in_string = false; - loop { - let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?; - self.pos = p; - let ch = self.mmap[p]; - - if in_string { - if ch == b'"' { - let mut bs = 0; - while p > bs + 1 && self.mmap[p - 1 - bs] == b'\\' { - bs += 1; - } - if bs % 2 == 0 { in_string = false; } - } - continue; - } - - match ch { - b'}' => break p, - b'"' => in_string = true, - _ => {} - } - } - }; - - // Track brace depth to find matching { - let mut depth: usize = 1; - let mut in_string = false; - let open = loop { - let p = memrchr3(b'{', b'}', b'"', &self.mmap[..self.pos])?; - self.pos = p; - let ch = self.mmap[p]; - - if in_string { - if ch == b'"' { - let mut bs = 0; - while p > bs + 1 && self.mmap[p - 1 - bs] == b'\\' { - bs += 1; - } - if bs % 2 == 0 { in_string = false; } - } - continue; - } - - match ch { - b'"' => { in_string = true; } - b'}' => { depth += 1; } - b'{' => { - depth -= 1; - if depth == 0 { break p; } - } - _ => {} - } - }; - - let obj_bytes = &self.mmap[open..=close]; - - // The "type" field is near the start of top-level objects. - // Only check the first 200 bytes to avoid scanning megabyte objects. - let prefix = &obj_bytes[..obj_bytes.len().min(200)]; - let is_user = memchr::memmem::find(prefix, b"\"type\":\"user\"").is_some(); - let is_assistant = !is_user - && memchr::memmem::find(prefix, b"\"type\":\"assistant\"").is_some(); - if !is_user && !is_assistant { continue; } - - let obj: Value = match serde_json::from_slice(obj_bytes) { - Ok(v) => v, - Err(_) => continue, - }; - - let msg_type = if is_user { "user" } else { "assistant" }; - - let msg = obj.get("message").unwrap_or(&obj); - let text = match msg.get("content") { - Some(Value::String(s)) => s.clone(), - Some(Value::Array(arr)) => { - arr.iter() - .filter(|b| b.get("type").and_then(|v| v.as_str()) == Some("text")) - .filter_map(|b| b.get("text").and_then(|v| v.as_str())) - .collect::>() - .join(" ") - } - _ => continue, - }; - if text.is_empty() { continue; } - - let timestamp = obj.get("timestamp") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - - return Some((msg_type.to_string(), text, timestamp)); - } - } -} - -/// Get the timestamp of the compaction message at a given byte offset. -/// Returns a human-readable datetime string, or None if unavailable. -pub fn compaction_timestamp(path: &str, offset: u64) -> Option { - let (mmap, _file) = mmap_transcript(path)?; - let start = offset as usize; - if start >= mmap.len() { return None; } - - // Find the end of this JSONL line - let end = mmap[start..].iter().position(|&b| b == b'\n') - .map(|p| start + p) - .unwrap_or(mmap.len()); - - let obj: Value = serde_json::from_slice(&mmap[start..end]).ok()?; - - // Claude Code transcript entries have a "timestamp" field (ISO 8601) - if let Some(ts) = obj.get("timestamp").and_then(|v| v.as_str()) { - return Some(ts.to_string()); - } - - // Fallback: try "createdAt" or similar fields - for field in &["createdAt", "created_at", "time"] { - if let Some(ts) = obj.get(*field).and_then(|v| v.as_str()) { - return Some(ts.to_string()); - } - } - - None -} - -/// Detect whether a compaction has occurred since the last check. -/// -/// Compares the current compaction offset against a saved value in -/// `state_dir/compaction-{session_id}`. Returns true if a new -/// compaction was found. Updates the saved offset. -pub fn detect_new_compaction( - state_dir: &Path, - session_id: &str, - transcript_path: &str, -) -> bool { - let offset = find_last_compaction_in_file(transcript_path); - - let save_path = state_dir.join(format!("compaction-{}", session_id)); - let saved: Option = fs::read_to_string(&save_path) - .ok() - .and_then(|s| s.trim().parse().ok()); - - let is_new = match (offset, saved) { - (Some(cur), Some(prev)) => cur != prev, - (Some(_), None) => true, - _ => false, - }; - - // Save current offset - if let Some(off) = offset { - fs::write(&save_path, off.to_string()).ok(); - } - - is_new -} diff --git a/src/lib.rs b/src/lib.rs index ae67616..2cebf42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,9 @@ pub mod hippocampus; // Autonomous agents pub mod subconscious; +// Conversation transcript abstraction and compatibility sources +pub mod conversation; + // Unified configuration pub mod config; pub mod config_writer; @@ -88,7 +91,8 @@ pub mod channel_capnp { pub use hippocampus::{ store, graph, lookups, query, spectral, neuro, counters, - transcript, memory, + memory, }; +pub use conversation as transcript; use hippocampus::query::engine as search; use hippocampus::query::parser as query_parser; diff --git a/src/mind/log.rs b/src/mind/log.rs index 7ac0d79..51f294e 100644 --- a/src/mind/log.rs +++ b/src/mind/log.rs @@ -3,7 +3,7 @@ use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; use crate::agent::context::AstNode; -use crate::hippocampus::transcript::JsonlBackwardIter; +use crate::conversation::JsonlBackwardIter; use memmap2::Mmap; pub struct ConversationLog { @@ -78,6 +78,6 @@ pub struct TailNodes { impl TailNodes { pub fn iter(&self) -> impl Iterator + '_ { JsonlBackwardIter::new(&self.mmap) - .filter_map(|bytes| serde_json::from_slice::(bytes).ok()) + .filter_map(|(_, bytes)| serde_json::from_slice::(bytes).ok()) } } diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index a862c8d..af98fe1 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -390,7 +390,7 @@ fn resolve_conversation(budget: Option) -> String { if !transcript.exists() { return String::new(); } - let Some(iter) = crate::transcript::TailMessages::open(&transcript.path) else { + let Some(iter) = crate::conversation::TailMessages::open(&transcript.path) else { return String::new(); }; @@ -401,10 +401,14 @@ fn resolve_conversation(budget: Option) -> String { let mut total_bytes = 0; let mut oldest_ts = String::new(); - for (role, content, ts) in iter { + for message in iter { if total_bytes >= max_bytes { break; } - let name = if role == "user" { &app.user_name } else { &app.assistant_name }; - let formatted = if !ts.is_empty() { + let content = message.text; + let name = match message.role { + crate::conversation::TranscriptRole::User => &app.user_name, + crate::conversation::TranscriptRole::Assistant => &app.assistant_name, + }; + let formatted = if let Some(ts) = message.timestamp { oldest_ts = ts[..ts.floor_char_boundary(ts.len().min(19))].to_string(); format!("**{}** {}: {}", name, &oldest_ts, content) } else {