From 45335de2207a790b613b41752f11738ee8fbd274 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 7 Mar 2026 12:01:38 -0500 Subject: [PATCH] experience-mine: split oversized sessions at compaction boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude Code doesn't create new session files on context compaction — a single UUID can accumulate 170+ conversations, producing 400MB+ JSONL files that generate 1.3M token prompts. Split at compaction markers ("This session is being continued..."): - extract_conversation made pub, split_on_compaction splits messages - experience_mine takes optional segment index - daemon watcher parses files, spawns per-segment jobs (.0, .1, .2) - seg_cache memoizes segment counts across ticks - per-segment dedup keys; whole-file key when all segments complete - 150K token guard skips any remaining oversized segments - char-boundary-safe truncation in enrich.rs and fact_mine.rs Backwards compatible: unsegmented calls still write content-hash dedup keys, old whole-file mined keys still recognized. --- src/daemon.rs | 110 ++++++++++++++++++++++++++++++++++++----------- src/enrich.rs | 80 ++++++++++++++++++++++++++++------ src/fact_mine.rs | 2 +- src/main.rs | 2 +- 4 files changed, 155 insertions(+), 39 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 765bd7f..7fcc666 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -13,7 +13,7 @@ // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; @@ -97,13 +97,13 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St } } -fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { +fn job_experience_mine(ctx: &ExecutionContext, path: &str, segment: Option) -> Result<(), TaskError> { let path = path.to_string(); run_job(ctx, &format!("experience-mine {}", path), || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("mining"); - let count = crate::enrich::experience_mine(&mut store, &path)?; + let count = crate::enrich::experience_mine(&mut store, &path, segment)?; ctx.log_line(&format!("{} entries mined", count)); Ok(()) }) @@ -323,6 +323,8 @@ pub fn run_daemon() -> Result<(), String> { let last_daily_sw = Arc::clone(&last_daily); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); + // Cache segment counts so we don't re-parse large files every tick + let mut seg_cache: HashMap = HashMap::new(); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); @@ -366,10 +368,13 @@ pub fn run_daemon() -> Result<(), String> { let mut already_mined = 0; let mut still_open = 0; let total_stale = stale.len(); + // Multi-segment files where all segments are done — write whole-file key + let mut mark_transcript_done: Vec<(String, String, usize)> = Vec::new(); - // Classify each session - let mut needs_extract: Vec<(PathBuf, String, String)> = Vec::new(); - let mut needs_fact: Vec<(PathBuf, String, String)> = Vec::new(); + // Classify each session — now segment-aware + // Each entry: (filename, path_str, segment_index) + let mut needs_extract: Vec<(String, String, Option)> = Vec::new(); + let mut needs_fact: Vec<(String, String)> = Vec::new(); for session in stale { let filename = session.file_name() @@ -377,44 +382,84 @@ pub fn run_daemon() -> Result<(), String> { .unwrap_or_else(|| "unknown".into()); let path_str = session.to_string_lossy().to_string(); - // Skip if already in-flight - let extract_name = format!("extract:{}", filename); - let fact_name = format!("fact-mine:{}", filename); - if active.contains(&extract_name) || active.contains(&fact_name) { - continue; - } - + // Check for old-style whole-file mined key let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str); - let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); - let fact_done = fact_keys.contains(&fact_key); if !experience_done { if is_file_open(&session) { still_open += 1; - } else { - needs_extract.push((session, filename, path_str)); + continue; + } + + // Get segment count, using cache to avoid re-parsing large files + let seg_count = if let Some(&cached) = seg_cache.get(&path_str) { + cached + } else { + let messages = match crate::enrich::extract_conversation(&path_str) { + Ok(m) => m, + Err(_) => continue, + }; + let count = crate::enrich::split_on_compaction(messages).len(); + seg_cache.insert(path_str.clone(), count); + count + }; + + if seg_count <= 1 { + let task_name = format!("extract:{}", filename); + if !active.contains(&task_name) { + needs_extract.push((filename, path_str, None)); + } + } else { + // Multi-segment — find unmined segments + let fname_key = crate::enrich::transcript_filename_key(&path_str); + let mut unmined = 0; + for i in 0..seg_count { + let seg_key = format!("{}.{}", fname_key, i); + if mined.contains(&seg_key) { continue; } + unmined += 1; + let task_name = format!("extract:{}.{}", filename, i); + if active.contains(&task_name) { continue; } + needs_extract.push(( + format!("{}.{}", filename, i), + path_str.clone(), + Some(i), + )); + } + if unmined == 0 { + // All segments done — write whole-file key so we skip next tick + mark_transcript_done.push((fname_key, path_str.clone(), seg_count)); + already_mined += 1; + } } - } else if !fact_done { - needs_fact.push((session, filename, path_str)); } else { - already_mined += 1; + let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); + let fact_done = fact_keys.contains(&fact_key); + if !fact_done { + let task_name = format!("fact-mine:{}", filename); + if !active.contains(&task_name) { + needs_fact.push((filename, path_str)); + } + } else { + already_mined += 1; + } } } // Spawn experience-mine jobs (priority) - for (_, filename, path_str) in &needs_extract { + for (task_label, path_str, segment) in &needs_extract { if extract_queued >= MAX_NEW_PER_TICK { extract_remaining += 1; continue; } - let task_name = format!("extract:{}", filename); - log_event("extract", "queued", path_str); + let task_name = format!("extract:{}", task_label); + log_event("extract", "queued", &task_name); let path = path_str.clone(); + let seg = *segment; choir_sw.spawn(task_name) .resource(&llm_sw) .retries(2) .init(move |ctx| { - job_experience_mine(ctx, &path) + job_experience_mine(ctx, &path, seg) }); extract_queued += 1; } @@ -423,7 +468,7 @@ pub fn run_daemon() -> Result<(), String> { let mut fact_queued = 0; if needs_extract.len() == extract_queued { let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued); - for (_, filename, path_str) in &needs_fact { + for (filename, path_str) in &needs_fact { if fact_queued >= fact_budget { fact_remaining += 1; continue; @@ -443,6 +488,21 @@ pub fn run_daemon() -> Result<(), String> { fact_remaining = needs_fact.len(); } + // Write whole-file keys for fully-mined multi-segment files + if !mark_transcript_done.is_empty() { + if let Ok(mut store) = crate::store::Store::load() { + for (fname_key, path_str, seg_count) in &mark_transcript_done { + let content = format!("All {} segments mined for {}", seg_count, path_str); + let mut node = crate::store::new_node(fname_key, &content); + node.category = crate::store::Category::Task; + node.provenance = crate::store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(node); + seg_cache.remove(path_str); + } + let _ = store.save(); + } + } + let extract_pending = extract_queued + extract_remaining; let fact_pending = fact_queued + fact_remaining; if extract_pending > 0 || fact_pending > 0 || still_open > 0 { diff --git a/src/enrich.rs b/src/enrich.rs index 91def2d..abe13df 100644 --- a/src/enrich.rs +++ b/src/enrich.rs @@ -85,7 +85,7 @@ pub fn is_transcript_mined_with_keys(mined: &HashSet, path: &str) -> boo /// Extract user/assistant messages with line numbers from a JSONL transcript. /// (line_number, role, text, timestamp) -fn extract_conversation(jsonl_path: &str) -> Result, String> { +pub fn extract_conversation(jsonl_path: &str) -> Result, String> { let content = fs::read_to_string(jsonl_path) .map_err(|e| format!("read {}: {}", jsonl_path, e))?; @@ -135,6 +135,33 @@ fn extract_conversation(jsonl_path: &str) -> Result) -> Vec> { + let mut segments: Vec> = Vec::new(); + let mut current = Vec::new(); + + for msg in messages { + if msg.1 == "user" && msg.2.starts_with(COMPACTION_MARKER) { + if !current.is_empty() { + segments.push(current); + current = Vec::new(); + } + // The continuation message itself is part of the new segment + current.push(msg); + } else { + current.push(msg); + } + } + if !current.is_empty() { + segments.push(current); + } + + segments +} + /// Format conversation messages for the prompt (truncating long messages). fn format_conversation(messages: &[(usize, String, String, String)]) -> String { messages.iter() @@ -259,9 +286,11 @@ pub fn journal_enrich( } /// Mine a conversation transcript for experiential moments not yet journaled. +/// If `segment` is Some, only process that compaction segment of the file. pub fn experience_mine( store: &mut Store, jsonl_path: &str, + segment: Option, ) -> Result { println!("Experience mining: {}", jsonl_path); @@ -287,7 +316,18 @@ pub fn experience_mine( return Ok(0); } - let messages = extract_conversation(jsonl_path)?; + let all_messages = extract_conversation(jsonl_path)?; + + // If segment is specified, extract just that segment; otherwise process all messages + let messages = match segment { + Some(idx) => { + let segments = split_on_compaction(all_messages); + segments.into_iter().nth(idx) + .ok_or_else(|| format!("segment {} out of range", idx))? + } + None => all_messages, + }; + let conversation = format_conversation(&messages); println!(" {} messages, {} chars", messages.len(), conversation.len()); @@ -327,7 +367,13 @@ pub fn experience_mine( ("{{KEYS}}", &keys_text), ("{{CONVERSATION}}", &conversation), ])?; - println!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4); + let est_tokens = prompt.len() / 4; + println!(" Prompt: {} chars (~{} tokens)", prompt.len(), est_tokens); + + if est_tokens > 150_000 { + println!(" Skipping: prompt too large ({} tokens > 150k limit)", est_tokens); + return Ok(0); + } println!(" Calling Sonnet..."); let response = call_sonnet("experience-mine", &prompt)?; @@ -389,24 +435,34 @@ pub fn experience_mine( let _ = store.upsert_node(node); count += 1; - let preview = if content.len() > 80 { &content[..77] } else { content }; + let preview = if content.len() > 80 { + let end = content.floor_char_boundary(77); + &content[..end] + } else { + content + }; println!(" + [{}] {}...", ts, preview); } - // Record this transcript as mined (even if count == 0, to prevent re-runs) - // Two keys: content hash (exact dedup) and filename (fast daemon reconcile) + // Record this transcript/segment as mined (even if count == 0, to prevent re-runs) + let fname_key = match segment { + Some(idx) => format!("{}.{}", transcript_filename_key(jsonl_path), idx), + None => transcript_filename_key(jsonl_path), + }; let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count); - let mut dedup_node = new_node(&dedup_key, &dedup_content); - dedup_node.category = store::Category::Task; - dedup_node.provenance = store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(dedup_node); - - let fname_key = transcript_filename_key(jsonl_path); let mut fname_node = new_node(&fname_key, &dedup_content); fname_node.category = store::Category::Task; fname_node.provenance = store::Provenance::AgentExperienceMine; let _ = store.upsert_node(fname_node); + // For unsegmented calls, also write the content-hash key for backwards compat + if segment.is_none() { + let mut dedup_node = new_node(&dedup_key, &dedup_content); + dedup_node.category = store::Category::Task; + dedup_node.provenance = store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(dedup_node); + } + if count > 0 { println!(" Saved {} new journal entries.", count); } diff --git a/src/fact_mine.rs b/src/fact_mine.rs index 8cb4e5b..28118c4 100644 --- a/src/fact_mine.rs +++ b/src/fact_mine.rs @@ -234,7 +234,7 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result, String> if dry_run { for (i, (offset, chunk)) in chunks.iter().enumerate() { eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len()); - let preview = if chunk.len() > 500 { &chunk[..500] } else { chunk }; + let preview = if chunk.len() > 500 { &chunk[..chunk.floor_char_boundary(500)] } else { chunk }; eprintln!("{}", preview); if chunk.len() > 500 { eprintln!(" ... ({} more chars)", chunk.len() - 500); diff --git a/src/main.rs b/src/main.rs index 7ffb44a..ddf4a92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -952,7 +952,7 @@ fn cmd_experience_mine(args: &[String]) -> Result<(), String> { } let mut store = store::Store::load()?; - let count = enrich::experience_mine(&mut store, &jsonl_path)?; + let count = enrich::experience_mine(&mut store, &jsonl_path, None)?; println!("Done: {} new entries mined.", count); Ok(()) }