transcript: extract JSONL backward scanner and compaction detection into library
Move JsonlBackwardIter and find_last_compaction() from parse-claude-conversation into a shared transcript module. Both memory-search and parse-claude-conversation now use the same robust compaction detection: mmap-based backward scan, JSON parsing to verify user-type message, content prefix check. Replaces memory-search's old detect_compaction() which did a forward scan with raw string matching on "continued from a previous conversation" — that could false-positive on the string appearing in assistant output or tool results. Add parse-claude-conversation as a new binary for debugging what's in the context window post-compaction. Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
This commit is contained in:
parent
0e17ab00b0
commit
c2f245740c
4 changed files with 548 additions and 9 deletions
|
|
@ -31,6 +31,10 @@ struct Args {
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
seen: bool,
|
seen: bool,
|
||||||
|
|
||||||
|
/// Show full seen set (list all keys)
|
||||||
|
#[arg(long)]
|
||||||
|
seen_full: bool,
|
||||||
|
|
||||||
/// Max results to return
|
/// Max results to return
|
||||||
#[arg(long, default_value = "5")]
|
#[arg(long, default_value = "5")]
|
||||||
max_results: usize,
|
max_results: usize,
|
||||||
|
|
@ -50,7 +54,7 @@ fn main() {
|
||||||
|
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
if args.seen {
|
if args.seen || args.seen_full {
|
||||||
show_seen();
|
show_seen();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -87,8 +91,11 @@ fn main() {
|
||||||
let state_dir = PathBuf::from("/tmp/claude-memory-search");
|
let state_dir = PathBuf::from("/tmp/claude-memory-search");
|
||||||
fs::create_dir_all(&state_dir).ok();
|
fs::create_dir_all(&state_dir).ok();
|
||||||
|
|
||||||
// Detect post-compaction reload
|
// Detect post-compaction reload via mmap backward scan
|
||||||
let is_compaction = prompt.contains("continued from a previous conversation");
|
let transcript_path = json["transcript_path"].as_str().unwrap_or("");
|
||||||
|
let is_compaction = poc_memory::transcript::detect_new_compaction(
|
||||||
|
&state_dir, session_id, transcript_path,
|
||||||
|
);
|
||||||
|
|
||||||
// First prompt or post-compaction: load full context
|
// First prompt or post-compaction: load full context
|
||||||
let cookie_path = state_dir.join(format!("cookie-{}", session_id));
|
let cookie_path = state_dir.join(format!("cookie-{}", session_id));
|
||||||
|
|
@ -155,7 +162,6 @@ fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Search for node keys in last ~150k tokens of transcript
|
// Search for node keys in last ~150k tokens of transcript
|
||||||
let transcript_path = json["transcript_path"].as_str().unwrap_or("");
|
|
||||||
if debug { println!("[memory-search] transcript: {}", transcript_path); }
|
if debug { println!("[memory-search] transcript: {}", transcript_path); }
|
||||||
let terms = extract_weighted_terms(transcript_path, 150_000, &store);
|
let terms = extract_weighted_terms(transcript_path, 150_000, &store);
|
||||||
|
|
||||||
|
|
@ -363,7 +369,7 @@ fn extract_key_from_line(line: &str) -> Option<String> {
|
||||||
let rest = &line[after_bracket + 2..];
|
let rest = &line[after_bracket + 2..];
|
||||||
let key_end = rest.find(" (c").unwrap_or(rest.len());
|
let key_end = rest.find(" (c").unwrap_or(rest.len());
|
||||||
let key = rest[..key_end].trim();
|
let key = rest[..key_end].trim();
|
||||||
if key.is_empty() || !key.contains('.') {
|
if key.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(key.to_string())
|
Some(key.to_string())
|
||||||
|
|
@ -374,13 +380,19 @@ fn generate_cookie() -> String {
|
||||||
uuid::Uuid::new_v4().as_simple().to_string()[..12].to_string()
|
uuid::Uuid::new_v4().as_simple().to_string()[..12].to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse one line of a seen file. New-format lines are "TIMESTAMP\tKEY";
/// legacy lines are just "KEY". Returns the key portion either way.
fn parse_seen_line(line: &str) -> &str {
    match line.split_once('\t') {
        Some((_timestamp, key)) => key,
        None => line,
    }
}
|
||||||
|
|
||||||
fn load_seen(dir: &Path, session_id: &str) -> HashSet<String> {
|
fn load_seen(dir: &Path, session_id: &str) -> HashSet<String> {
|
||||||
let path = dir.join(format!("seen-{}", session_id));
|
let path = dir.join(format!("seen-{}", session_id));
|
||||||
if path.exists() {
|
if path.exists() {
|
||||||
fs::read_to_string(path)
|
fs::read_to_string(path)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.lines()
|
.lines()
|
||||||
.map(|s| s.to_string())
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(|s| parse_seen_line(s).to_string())
|
||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
HashSet::new()
|
HashSet::new()
|
||||||
|
|
@ -390,7 +402,8 @@ fn load_seen(dir: &Path, session_id: &str) -> HashSet<String> {
|
||||||
fn mark_seen(dir: &Path, session_id: &str, key: &str) {
|
fn mark_seen(dir: &Path, session_id: &str, key: &str) {
|
||||||
let path = dir.join(format!("seen-{}", session_id));
|
let path = dir.join(format!("seen-{}", session_id));
|
||||||
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(path) {
|
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(path) {
|
||||||
writeln!(f, "{}", key).ok();
|
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
|
||||||
|
writeln!(f, "{}\t{}", ts, key).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -454,8 +467,29 @@ fn show_seen() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let seen = load_seen(&state_dir, session_id);
|
// Read seen file in insertion order (append-only file)
|
||||||
println!("\nSeen set ({} total, {} pre-seeded):", seen.len(), seen.len() - returned.len());
|
let seen_path = state_dir.join(format!("seen-{}", session_id));
|
||||||
|
let seen_lines: Vec<String> = fs::read_to_string(&seen_path)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.lines()
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.collect();
|
||||||
|
let returned_set: HashSet<_> = returned.iter().cloned().collect();
|
||||||
|
println!("\nSeen set ({} total, {} pre-seeded):", seen_lines.len(), seen_lines.len() - returned.len());
|
||||||
|
|
||||||
|
if Args::parse().seen_full {
|
||||||
|
for line in &seen_lines {
|
||||||
|
let key = parse_seen_line(line);
|
||||||
|
let marker = if returned_set.contains(key) { "→ " } else { " " };
|
||||||
|
// Show timestamp if present, otherwise just key
|
||||||
|
if let Some((ts, k)) = line.split_once('\t') {
|
||||||
|
println!(" {} {}{}", ts, marker, k);
|
||||||
|
} else {
|
||||||
|
println!(" (no ts) {}{}", marker, line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cleanup_stale_files(dir: &Path, max_age: Duration) {
|
fn cleanup_stale_files(dir: &Path, max_age: Duration) {
|
||||||
|
|
|
||||||
328
poc-memory/src/bin/parse-claude-conversation.rs
Normal file
328
poc-memory/src/bin/parse-claude-conversation.rs
Normal file
|
|
@ -0,0 +1,328 @@
|
||||||
|
// parse-claude-conversation: debug tool for inspecting what's in the context window
|
||||||
|
//
|
||||||
|
// Two-layer design:
|
||||||
|
// 1. extract_context_items() — walks JSONL from last compaction, yields
|
||||||
|
// structured records representing what's in the context window
|
||||||
|
// 2. format_as_context() — renders those records as they appear to Claude
|
||||||
|
//
|
||||||
|
// The transcript is mmap'd and scanned backwards from EOF using brace-depth
|
||||||
|
// tracking to find complete JSON objects, avoiding a full forward scan of
|
||||||
|
// what can be a 500MB+ file.
|
||||||
|
//
|
||||||
|
// Usage:
|
||||||
|
// parse-claude-conversation [TRANSCRIPT_PATH]
|
||||||
|
// parse-claude-conversation --last # use the last stashed session
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
use memmap2::Mmap;
|
||||||
|
use poc_memory::transcript::{JsonlBackwardIter, find_last_compaction};
|
||||||
|
use serde_json::Value;
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
/// CLI arguments for the parse-claude-conversation debug tool.
#[derive(Parser)]
#[command(name = "parse-claude-conversation")]
struct Args {
    /// Transcript JSONL path (or --last to use stashed session)
    path: Option<String>,

    /// Use the last stashed session from memory-search
    #[arg(long)]
    last: bool,

    /// Dump raw JSONL objects. Optional integer: number of extra objects
    /// to include before the compaction boundary.
    // num_args = 0..=1 lets `--raw` appear bare (treated as 0 via
    // default_missing_value) or with an explicit count.
    #[arg(long, num_args = 0..=1, default_missing_value = "0")]
    raw: Option<usize>,
}
|
||||||
|
|
||||||
|
// --- Context extraction ---
|
||||||
|
|
||||||
|
/// A single item in the context window, as Claude sees it.
enum ContextItem {
    /// Plain user-authored text (system-reminder spans split out).
    UserText(String),
    /// Body of a <system-reminder>…</system-reminder> span.
    SystemReminder(String),
    /// Assistant text output.
    AssistantText(String),
    /// Marker for a thinking block; the thinking text itself is not retained.
    AssistantThinking,
    /// A tool invocation: tool name plus its JSON-serialized input.
    ToolUse { name: String, input: String },
    /// Text extracted from a tool_result block.
    ToolResult(String),
}
|
||||||
|
|
||||||
|
/// Extract context items from the transcript, starting from the last compaction.
|
||||||
|
fn extract_context_items(data: &[u8]) -> Vec<ContextItem> {
|
||||||
|
let start = find_last_compaction(data).unwrap_or(0);
|
||||||
|
let region = &data[start..];
|
||||||
|
|
||||||
|
let mut items = Vec::new();
|
||||||
|
|
||||||
|
// Forward scan through JSONL lines from compaction onward
|
||||||
|
for line in region.split(|&b| b == b'\n') {
|
||||||
|
if line.is_empty() { continue; }
|
||||||
|
|
||||||
|
let obj: Value = match serde_json::from_slice(line) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
|
||||||
|
match msg_type {
|
||||||
|
"user" => {
|
||||||
|
if let Some(content) = obj.get("message").and_then(|m| m.get("content")) {
|
||||||
|
extract_user_content(content, &mut items);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"assistant" => {
|
||||||
|
if let Some(content) = obj.get("message").and_then(|m| m.get("content")) {
|
||||||
|
extract_assistant_content(content, &mut items);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
items
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse user message content into context items.
|
||||||
|
fn extract_user_content(content: &Value, items: &mut Vec<ContextItem>) {
|
||||||
|
match content {
|
||||||
|
Value::String(s) => {
|
||||||
|
split_system_reminders(s, items, false);
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
for block in arr {
|
||||||
|
let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
match btype {
|
||||||
|
"text" => {
|
||||||
|
if let Some(t) = block.get("text").and_then(|v| v.as_str()) {
|
||||||
|
split_system_reminders(t, items, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"tool_result" => {
|
||||||
|
let result_text = extract_tool_result_text(block);
|
||||||
|
if !result_text.is_empty() {
|
||||||
|
split_system_reminders(&result_text, items, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract text from a tool_result block (content can be string or array).
|
||||||
|
fn extract_tool_result_text(block: &Value) -> String {
|
||||||
|
match block.get("content") {
|
||||||
|
Some(Value::String(s)) => s.clone(),
|
||||||
|
Some(Value::Array(arr)) => {
|
||||||
|
arr.iter()
|
||||||
|
.filter_map(|b| b.get("text").and_then(|v| v.as_str()))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n")
|
||||||
|
}
|
||||||
|
_ => String::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Split text on <system-reminder> tags. Non-reminder text emits UserText
|
||||||
|
/// or ToolResult depending on `is_tool_result`.
|
||||||
|
fn split_system_reminders(text: &str, items: &mut Vec<ContextItem>, is_tool_result: bool) {
|
||||||
|
let mut remaining = text;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Some(start) = remaining.find("<system-reminder>") {
|
||||||
|
let before = remaining[..start].trim();
|
||||||
|
if !before.is_empty() {
|
||||||
|
if is_tool_result {
|
||||||
|
items.push(ContextItem::ToolResult(before.to_string()));
|
||||||
|
} else {
|
||||||
|
items.push(ContextItem::UserText(before.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let after_open = &remaining[start + "<system-reminder>".len()..];
|
||||||
|
if let Some(end) = after_open.find("</system-reminder>") {
|
||||||
|
let reminder = after_open[..end].trim();
|
||||||
|
if !reminder.is_empty() {
|
||||||
|
items.push(ContextItem::SystemReminder(reminder.to_string()));
|
||||||
|
}
|
||||||
|
remaining = &after_open[end + "</system-reminder>".len()..];
|
||||||
|
} else {
|
||||||
|
let reminder = after_open.trim();
|
||||||
|
if !reminder.is_empty() {
|
||||||
|
items.push(ContextItem::SystemReminder(reminder.to_string()));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let trimmed = remaining.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
if is_tool_result {
|
||||||
|
items.push(ContextItem::ToolResult(trimmed.to_string()));
|
||||||
|
} else {
|
||||||
|
items.push(ContextItem::UserText(trimmed.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse assistant message content into context items.
|
||||||
|
fn extract_assistant_content(content: &Value, items: &mut Vec<ContextItem>) {
|
||||||
|
match content {
|
||||||
|
Value::String(s) => {
|
||||||
|
let trimmed = s.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
items.push(ContextItem::AssistantText(trimmed.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
for block in arr {
|
||||||
|
let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
match btype {
|
||||||
|
"text" => {
|
||||||
|
if let Some(t) = block.get("text").and_then(|v| v.as_str()) {
|
||||||
|
let trimmed = t.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
items.push(ContextItem::AssistantText(trimmed.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"tool_use" => {
|
||||||
|
let name = block.get("name")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("?")
|
||||||
|
.to_string();
|
||||||
|
let input = block.get("input")
|
||||||
|
.map(|v| v.to_string())
|
||||||
|
.unwrap_or_default();
|
||||||
|
items.push(ContextItem::ToolUse { name, input });
|
||||||
|
}
|
||||||
|
"thinking" => {
|
||||||
|
items.push(ContextItem::AssistantThinking);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Formatting layer ---
|
||||||
|
|
||||||
|
/// Truncate `s` to at most `max` bytes for display, appending the original
/// byte length when anything was cut.
///
/// Slicing with a raw `&s[..max]` panics when `max` lands inside a
/// multi-byte UTF-8 character; since transcripts can contain arbitrary
/// non-ASCII text, back `max` off to the nearest char boundary instead.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Walk back to a valid boundary — at most 3 steps for UTF-8, and
    // cut can never go below 0 because index 0 is always a boundary.
    let mut cut = max;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...({} total)", &s[..cut], s.len())
}
|
||||||
|
|
||||||
|
fn format_as_context(items: &[ContextItem]) {
|
||||||
|
for item in items {
|
||||||
|
match item {
|
||||||
|
ContextItem::UserText(text) => {
|
||||||
|
println!("USER: {}", truncate(text, 300));
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
ContextItem::SystemReminder(text) => {
|
||||||
|
println!("<system-reminder>");
|
||||||
|
println!("{}", truncate(text, 500));
|
||||||
|
println!("</system-reminder>");
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
ContextItem::AssistantText(text) => {
|
||||||
|
println!("ASSISTANT: {}", truncate(text, 300));
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
ContextItem::AssistantThinking => {
|
||||||
|
println!("[thinking]");
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
ContextItem::ToolUse { name, input } => {
|
||||||
|
println!("TOOL_USE: {} {}", name, truncate(input, 200));
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
ContextItem::ToolResult(text) => {
|
||||||
|
println!("TOOL_RESULT: {}", truncate(text, 300));
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Entry point: resolve the transcript path, mmap it, then either dump raw
/// JSONL (--raw) or render the parsed context window.
fn main() {
    let args = Args::parse();

    // Resolve transcript path: --last reads the path memory-search stashed,
    // otherwise use the positional argument; neither is a usage error.
    let path = if args.last {
        let stash = fs::read_to_string("/tmp/claude-memory-search/last-input.json")
            .expect("No stashed input");
        let json: Value = serde_json::from_str(&stash).expect("Bad JSON");
        json["transcript_path"]
            .as_str()
            .expect("No transcript_path")
            .to_string()
    } else if let Some(p) = args.path {
        p
    } else {
        eprintln!("error: provide a transcript path or --last");
        std::process::exit(1);
    };

    // Mmap the whole file; transcripts can be very large, and downstream
    // code scans backward from EOF rather than reading it all forward.
    let file = fs::File::open(&path).expect("Can't open transcript");
    let mmap = unsafe { Mmap::map(&file).expect("Failed to mmap") };

    eprintln!(
        "Transcript: {} ({:.1} MB)",
        &path,
        mmap.len() as f64 / 1_000_000.0
    );

    // Offset 0 (no compaction found) means "parse the whole transcript".
    let compaction_offset = find_last_compaction(&mmap).unwrap_or(0);
    eprintln!("Compaction at byte offset: {}", compaction_offset);

    if let Some(extra) = args.raw {
        use std::io::Write;

        // Collect `extra` JSON objects before the compaction boundary
        // (newest-first from the backward iterator, reversed afterwards
        // so output is in chronological order).
        let mut before = Vec::new();
        if extra > 0 && compaction_offset > 0 {
            for obj_bytes in JsonlBackwardIter::new(&mmap[..compaction_offset]) {
                if let Ok(obj) = serde_json::from_slice::<Value>(obj_bytes) {
                    let t = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
                    // Snapshot records are bookkeeping noise, not conversation.
                    if t == "file-history-snapshot" { continue; }
                }
                // NOTE(review): objects that fail to parse are still pushed
                // here, while the forward dump below skips unparseable lines
                // — confirm this asymmetry is intentional.
                before.push(obj_bytes.to_vec());
                if before.len() >= extra {
                    break;
                }
            }
            before.reverse();
        }

        for obj in &before {
            std::io::stdout().write_all(obj).ok();
            println!();
        }

        // Then dump everything from compaction onward
        let region = &mmap[compaction_offset..];
        for line in region.split(|&b| b == b'\n') {
            if line.is_empty() { continue; }
            if let Ok(obj) = serde_json::from_slice::<Value>(line) {
                let t = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
                if t == "file-history-snapshot" { continue; }
                std::io::stdout().write_all(line).ok();
                println!();
            }
        }
    } else {
        // Default mode: structured extraction + human-readable rendering.
        let items = extract_context_items(&mmap);
        eprintln!("Context items: {}", items.len());
        format_as_context(&items);
    }
}
|
||||||
|
|
@ -14,6 +14,7 @@ pub mod spectral;
|
||||||
pub mod lookups;
|
pub mod lookups;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod migrate;
|
pub mod migrate;
|
||||||
|
pub mod transcript;
|
||||||
pub mod neuro;
|
pub mod neuro;
|
||||||
|
|
||||||
// Agent layer (LLM-powered operations)
|
// Agent layer (LLM-powered operations)
|
||||||
|
|
|
||||||
176
poc-memory/src/transcript.rs
Normal file
176
poc-memory/src/transcript.rs
Normal file
|
|
@ -0,0 +1,176 @@
|
||||||
|
// Transcript JSONL parsing utilities.
|
||||||
|
//
|
||||||
|
// Provides mmap-based backward scanning of Claude Code transcript files
|
||||||
|
// and compaction detection. Used by memory-search (hook mode) and
|
||||||
|
// parse-claude-conversation (debug tool).
|
||||||
|
|
||||||
|
use memmap2::Mmap;
|
||||||
|
use serde_json::Value;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
/// Scan backwards through mmap'd bytes, yielding byte slices of complete
/// top-level JSON objects (outermost { to matching }).
///
/// Tracks brace depth, skipping braces inside JSON strings. Returns
/// objects in reverse order (newest first).
pub struct JsonlBackwardIter<'a> {
    // Full byte region being scanned (typically an mmap'd transcript).
    data: &'a [u8],
    // Scan cursor: bytes at or after `pos` have already been consumed;
    // reaching 0 ends iteration.
    pos: usize,
}
|
||||||
|
|
||||||
|
impl<'a> JsonlBackwardIter<'a> {
    /// Create an iterator positioned at the end of `data`, so the last
    /// complete JSON object in the buffer is yielded first.
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, pos: data.len() }
    }
}
|
||||||
|
|
||||||
|
impl<'a> Iterator for JsonlBackwardIter<'a> {
    type Item = &'a [u8];

    /// Yield the next complete JSON object, scanning backward from `pos`.
    ///
    /// Strategy: find a closing `}`, then walk backward tracking brace
    /// depth (ignoring braces inside JSON strings) until the matching `{`.
    /// Assumes well-formed JSON objects; a truncated or unbalanced tail
    /// simply ends iteration (returns None) rather than erroring.
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos == 0 {
            return None;
        }

        // Find the closing } of the next object (scanning backward)
        let close = loop {
            if self.pos == 0 { return None; }
            self.pos -= 1;
            if self.data[self.pos] == b'}' {
                break self.pos;
            }
        };

        // Track brace depth to find matching {
        let mut depth: usize = 1;
        let mut in_string = false;

        loop {
            if self.pos == 0 {
                return None;
            }
            self.pos -= 1;
            let ch = self.data[self.pos];

            if in_string {
                if ch == b'"' {
                    // Count the backslashes immediately preceding this quote.
                    // An even count means the quote itself is unescaped and
                    // really does terminate (when read forward: start) the
                    // string; an odd count means it's an escaped \" inside it.
                    let mut bs = 0;
                    while self.pos > bs && self.data[self.pos - 1 - bs] == b'\\' {
                        bs += 1;
                    }
                    if bs % 2 == 0 {
                        in_string = false;
                    }
                }
                continue;
            }

            match ch {
                // Outside a string, any quote begins one (from this backward
                // view): valid JSON has no bare escaped quotes outside strings.
                b'"' => { in_string = true; }
                b'}' => { depth += 1; }
                b'{' => {
                    depth -= 1;
                    if depth == 0 {
                        // Inclusive slice: from this opening { to the closing }.
                        return Some(&self.data[self.pos..=close]);
                    }
                }
                _ => {}
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// Find the byte offset of the last compaction summary in mmap'd transcript data.
|
||||||
|
///
|
||||||
|
/// Scans backward for a user-type message whose content starts with
|
||||||
|
/// "This session is being continued". Returns the byte offset of the
|
||||||
|
/// JSON object's opening brace.
|
||||||
|
pub fn find_last_compaction(data: &[u8]) -> Option<usize> {
|
||||||
|
let marker = b"This session is being continued";
|
||||||
|
|
||||||
|
for obj_bytes in JsonlBackwardIter::new(data) {
|
||||||
|
// Quick byte check before parsing
|
||||||
|
if !contains_bytes(obj_bytes, marker) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let obj: Value = match serde_json::from_slice(obj_bytes) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
if obj.get("type").and_then(|v| v.as_str()) != Some("user") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(content) = obj.get("message")
|
||||||
|
.and_then(|m| m.get("content"))
|
||||||
|
.and_then(|c| c.as_str())
|
||||||
|
{
|
||||||
|
if content.starts_with("This session is being continued") {
|
||||||
|
let offset = obj_bytes.as_ptr() as usize - data.as_ptr() as usize;
|
||||||
|
return Some(offset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find the byte offset of the last compaction in a transcript file.
|
||||||
|
/// Returns None if the file can't be opened or has no compaction.
|
||||||
|
pub fn find_last_compaction_in_file(path: &str) -> Option<u64> {
|
||||||
|
if path.is_empty() { return None; }
|
||||||
|
|
||||||
|
let file = fs::File::open(path).ok()?;
|
||||||
|
let meta = file.metadata().ok()?;
|
||||||
|
if meta.len() == 0 { return None; }
|
||||||
|
|
||||||
|
let mmap = unsafe { Mmap::map(&file).ok()? };
|
||||||
|
find_last_compaction(&mmap).map(|off| off as u64)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mmap a transcript file. Returns (Mmap, File) to keep both alive.
|
||||||
|
pub fn mmap_transcript(path: &str) -> Option<(Mmap, fs::File)> {
|
||||||
|
let file = fs::File::open(path).ok()?;
|
||||||
|
let meta = file.metadata().ok()?;
|
||||||
|
if meta.len() == 0 { return None; }
|
||||||
|
let mmap = unsafe { Mmap::map(&file).ok()? };
|
||||||
|
Some((mmap, file))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// True if `needle` occurs anywhere in `haystack`.
///
/// The empty-needle guard matters: `slice::windows(0)` panics, and by
/// convention an empty needle is trivially contained. Naive O(n·m) scan —
/// fine for the short fixed markers used here.
fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
    if needle.is_empty() {
        return true;
    }
    haystack.windows(needle.len()).any(|w| w == needle)
}
|
||||||
|
|
||||||
|
/// Detect whether a compaction has occurred since the last check.
|
||||||
|
///
|
||||||
|
/// Compares the current compaction offset against a saved value in
|
||||||
|
/// `state_dir/compaction-{session_id}`. Returns true if a new
|
||||||
|
/// compaction was found. Updates the saved offset.
|
||||||
|
pub fn detect_new_compaction(
|
||||||
|
state_dir: &Path,
|
||||||
|
session_id: &str,
|
||||||
|
transcript_path: &str,
|
||||||
|
) -> bool {
|
||||||
|
let offset = find_last_compaction_in_file(transcript_path);
|
||||||
|
|
||||||
|
let save_path = state_dir.join(format!("compaction-{}", session_id));
|
||||||
|
let saved: Option<u64> = fs::read_to_string(&save_path)
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.trim().parse().ok());
|
||||||
|
|
||||||
|
let is_new = match (offset, saved) {
|
||||||
|
(Some(cur), Some(prev)) => cur != prev,
|
||||||
|
(Some(_), None) => true,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Save current offset
|
||||||
|
if let Some(off) = offset {
|
||||||
|
fs::write(&save_path, off.to_string()).ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
is_new
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue