experience-mine: split oversized sessions at compaction boundaries
Claude Code doesn't create new session files on context compaction —
a single UUID can accumulate 170+ conversations, producing 400MB+
JSONL files that generate 1.3M token prompts.
Split at compaction markers ("This session is being continued..."):
- extract_conversation made pub, split_on_compaction splits messages
- experience_mine takes optional segment index
- daemon watcher parses files, spawns per-segment jobs (.0, .1, .2)
- seg_cache memoizes segment counts across ticks
- per-segment dedup keys; a whole-file key is written once all segments complete
- 150K token guard skips any remaining oversized segments
- char-boundary-safe truncation in enrich.rs and fact_mine.rs
Backwards compatible: unsegmented calls still write content-hash
dedup keys, and old whole-file mined keys are still recognized.
This commit is contained in:
parent
22a9fdabdb
commit
45335de220
4 changed files with 155 additions and 39 deletions
110
src/daemon.rs
110
src/daemon.rs
|
|
@ -13,7 +13,7 @@
|
||||||
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
|
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
|
||||||
|
|
||||||
use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus};
|
use jobkit::{Choir, ExecutionContext, ResourcePool, TaskError, TaskInfo, TaskStatus};
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
@ -97,13 +97,13 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn job_experience_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
|
fn job_experience_mine(ctx: &ExecutionContext, path: &str, segment: Option<usize>) -> Result<(), TaskError> {
|
||||||
let path = path.to_string();
|
let path = path.to_string();
|
||||||
run_job(ctx, &format!("experience-mine {}", path), || {
|
run_job(ctx, &format!("experience-mine {}", path), || {
|
||||||
ctx.log_line("loading store");
|
ctx.log_line("loading store");
|
||||||
let mut store = crate::store::Store::load()?;
|
let mut store = crate::store::Store::load()?;
|
||||||
ctx.log_line("mining");
|
ctx.log_line("mining");
|
||||||
let count = crate::enrich::experience_mine(&mut store, &path)?;
|
let count = crate::enrich::experience_mine(&mut store, &path, segment)?;
|
||||||
ctx.log_line(&format!("{} entries mined", count));
|
ctx.log_line(&format!("{} entries mined", count));
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
|
@ -323,6 +323,8 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
let last_daily_sw = Arc::clone(&last_daily);
|
let last_daily_sw = Arc::clone(&last_daily);
|
||||||
choir.spawn("session-watcher").init(move |ctx| {
|
choir.spawn("session-watcher").init(move |ctx| {
|
||||||
ctx.set_progress("idle");
|
ctx.set_progress("idle");
|
||||||
|
// Cache segment counts so we don't re-parse large files every tick
|
||||||
|
let mut seg_cache: HashMap<String, usize> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
if ctx.is_cancelled() {
|
if ctx.is_cancelled() {
|
||||||
return Err(TaskError::Fatal("cancelled".into()));
|
return Err(TaskError::Fatal("cancelled".into()));
|
||||||
|
|
@ -366,10 +368,13 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
let mut already_mined = 0;
|
let mut already_mined = 0;
|
||||||
let mut still_open = 0;
|
let mut still_open = 0;
|
||||||
let total_stale = stale.len();
|
let total_stale = stale.len();
|
||||||
|
// Multi-segment files where all segments are done — write whole-file key
|
||||||
|
let mut mark_transcript_done: Vec<(String, String, usize)> = Vec::new();
|
||||||
|
|
||||||
// Classify each session
|
// Classify each session — now segment-aware
|
||||||
let mut needs_extract: Vec<(PathBuf, String, String)> = Vec::new();
|
// Each entry: (filename, path_str, segment_index)
|
||||||
let mut needs_fact: Vec<(PathBuf, String, String)> = Vec::new();
|
let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new();
|
||||||
|
let mut needs_fact: Vec<(String, String)> = Vec::new();
|
||||||
|
|
||||||
for session in stale {
|
for session in stale {
|
||||||
let filename = session.file_name()
|
let filename = session.file_name()
|
||||||
|
|
@ -377,44 +382,84 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
.unwrap_or_else(|| "unknown".into());
|
.unwrap_or_else(|| "unknown".into());
|
||||||
let path_str = session.to_string_lossy().to_string();
|
let path_str = session.to_string_lossy().to_string();
|
||||||
|
|
||||||
// Skip if already in-flight
|
// Check for old-style whole-file mined key
|
||||||
let extract_name = format!("extract:{}", filename);
|
|
||||||
let fact_name = format!("fact-mine:{}", filename);
|
|
||||||
if active.contains(&extract_name) || active.contains(&fact_name) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str);
|
let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str);
|
||||||
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
|
||||||
let fact_done = fact_keys.contains(&fact_key);
|
|
||||||
|
|
||||||
if !experience_done {
|
if !experience_done {
|
||||||
if is_file_open(&session) {
|
if is_file_open(&session) {
|
||||||
still_open += 1;
|
still_open += 1;
|
||||||
} else {
|
continue;
|
||||||
needs_extract.push((session, filename, path_str));
|
}
|
||||||
|
|
||||||
|
// Get segment count, using cache to avoid re-parsing large files
|
||||||
|
let seg_count = if let Some(&cached) = seg_cache.get(&path_str) {
|
||||||
|
cached
|
||||||
|
} else {
|
||||||
|
let messages = match crate::enrich::extract_conversation(&path_str) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
let count = crate::enrich::split_on_compaction(messages).len();
|
||||||
|
seg_cache.insert(path_str.clone(), count);
|
||||||
|
count
|
||||||
|
};
|
||||||
|
|
||||||
|
if seg_count <= 1 {
|
||||||
|
let task_name = format!("extract:{}", filename);
|
||||||
|
if !active.contains(&task_name) {
|
||||||
|
needs_extract.push((filename, path_str, None));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Multi-segment — find unmined segments
|
||||||
|
let fname_key = crate::enrich::transcript_filename_key(&path_str);
|
||||||
|
let mut unmined = 0;
|
||||||
|
for i in 0..seg_count {
|
||||||
|
let seg_key = format!("{}.{}", fname_key, i);
|
||||||
|
if mined.contains(&seg_key) { continue; }
|
||||||
|
unmined += 1;
|
||||||
|
let task_name = format!("extract:{}.{}", filename, i);
|
||||||
|
if active.contains(&task_name) { continue; }
|
||||||
|
needs_extract.push((
|
||||||
|
format!("{}.{}", filename, i),
|
||||||
|
path_str.clone(),
|
||||||
|
Some(i),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if unmined == 0 {
|
||||||
|
// All segments done — write whole-file key so we skip next tick
|
||||||
|
mark_transcript_done.push((fname_key, path_str.clone(), seg_count));
|
||||||
|
already_mined += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if !fact_done {
|
|
||||||
needs_fact.push((session, filename, path_str));
|
|
||||||
} else {
|
} else {
|
||||||
already_mined += 1;
|
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
||||||
|
let fact_done = fact_keys.contains(&fact_key);
|
||||||
|
if !fact_done {
|
||||||
|
let task_name = format!("fact-mine:{}", filename);
|
||||||
|
if !active.contains(&task_name) {
|
||||||
|
needs_fact.push((filename, path_str));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
already_mined += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn experience-mine jobs (priority)
|
// Spawn experience-mine jobs (priority)
|
||||||
for (_, filename, path_str) in &needs_extract {
|
for (task_label, path_str, segment) in &needs_extract {
|
||||||
if extract_queued >= MAX_NEW_PER_TICK {
|
if extract_queued >= MAX_NEW_PER_TICK {
|
||||||
extract_remaining += 1;
|
extract_remaining += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let task_name = format!("extract:{}", filename);
|
let task_name = format!("extract:{}", task_label);
|
||||||
log_event("extract", "queued", path_str);
|
log_event("extract", "queued", &task_name);
|
||||||
let path = path_str.clone();
|
let path = path_str.clone();
|
||||||
|
let seg = *segment;
|
||||||
choir_sw.spawn(task_name)
|
choir_sw.spawn(task_name)
|
||||||
.resource(&llm_sw)
|
.resource(&llm_sw)
|
||||||
.retries(2)
|
.retries(2)
|
||||||
.init(move |ctx| {
|
.init(move |ctx| {
|
||||||
job_experience_mine(ctx, &path)
|
job_experience_mine(ctx, &path, seg)
|
||||||
});
|
});
|
||||||
extract_queued += 1;
|
extract_queued += 1;
|
||||||
}
|
}
|
||||||
|
|
@ -423,7 +468,7 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
let mut fact_queued = 0;
|
let mut fact_queued = 0;
|
||||||
if needs_extract.len() == extract_queued {
|
if needs_extract.len() == extract_queued {
|
||||||
let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued);
|
let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued);
|
||||||
for (_, filename, path_str) in &needs_fact {
|
for (filename, path_str) in &needs_fact {
|
||||||
if fact_queued >= fact_budget {
|
if fact_queued >= fact_budget {
|
||||||
fact_remaining += 1;
|
fact_remaining += 1;
|
||||||
continue;
|
continue;
|
||||||
|
|
@ -443,6 +488,21 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
fact_remaining = needs_fact.len();
|
fact_remaining = needs_fact.len();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write whole-file keys for fully-mined multi-segment files
|
||||||
|
if !mark_transcript_done.is_empty() {
|
||||||
|
if let Ok(mut store) = crate::store::Store::load() {
|
||||||
|
for (fname_key, path_str, seg_count) in &mark_transcript_done {
|
||||||
|
let content = format!("All {} segments mined for {}", seg_count, path_str);
|
||||||
|
let mut node = crate::store::new_node(fname_key, &content);
|
||||||
|
node.category = crate::store::Category::Task;
|
||||||
|
node.provenance = crate::store::Provenance::AgentExperienceMine;
|
||||||
|
let _ = store.upsert_node(node);
|
||||||
|
seg_cache.remove(path_str);
|
||||||
|
}
|
||||||
|
let _ = store.save();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let extract_pending = extract_queued + extract_remaining;
|
let extract_pending = extract_queued + extract_remaining;
|
||||||
let fact_pending = fact_queued + fact_remaining;
|
let fact_pending = fact_queued + fact_remaining;
|
||||||
if extract_pending > 0 || fact_pending > 0 || still_open > 0 {
|
if extract_pending > 0 || fact_pending > 0 || still_open > 0 {
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ pub fn is_transcript_mined_with_keys(mined: &HashSet<String>, path: &str) -> boo
|
||||||
|
|
||||||
/// Extract user/assistant messages with line numbers from a JSONL transcript.
|
/// Extract user/assistant messages with line numbers from a JSONL transcript.
|
||||||
/// (line_number, role, text, timestamp)
|
/// (line_number, role, text, timestamp)
|
||||||
fn extract_conversation(jsonl_path: &str) -> Result<Vec<(usize, String, String, String)>, String> {
|
pub fn extract_conversation(jsonl_path: &str) -> Result<Vec<(usize, String, String, String)>, String> {
|
||||||
let content = fs::read_to_string(jsonl_path)
|
let content = fs::read_to_string(jsonl_path)
|
||||||
.map_err(|e| format!("read {}: {}", jsonl_path, e))?;
|
.map_err(|e| format!("read {}: {}", jsonl_path, e))?;
|
||||||
|
|
||||||
|
|
@ -135,6 +135,33 @@ fn extract_conversation(jsonl_path: &str) -> Result<Vec<(usize, String, String,
|
||||||
Ok(messages)
|
Ok(messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const COMPACTION_MARKER: &str = "This session is being continued from a previous conversation that ran out of context";
|
||||||
|
|
||||||
|
/// Split extracted messages into segments at compaction boundaries.
|
||||||
|
/// Each segment represents one continuous conversation before context was compacted.
|
||||||
|
pub fn split_on_compaction(messages: Vec<(usize, String, String, String)>) -> Vec<Vec<(usize, String, String, String)>> {
|
||||||
|
let mut segments: Vec<Vec<(usize, String, String, String)>> = Vec::new();
|
||||||
|
let mut current = Vec::new();
|
||||||
|
|
||||||
|
for msg in messages {
|
||||||
|
if msg.1 == "user" && msg.2.starts_with(COMPACTION_MARKER) {
|
||||||
|
if !current.is_empty() {
|
||||||
|
segments.push(current);
|
||||||
|
current = Vec::new();
|
||||||
|
}
|
||||||
|
// The continuation message itself is part of the new segment
|
||||||
|
current.push(msg);
|
||||||
|
} else {
|
||||||
|
current.push(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !current.is_empty() {
|
||||||
|
segments.push(current);
|
||||||
|
}
|
||||||
|
|
||||||
|
segments
|
||||||
|
}
|
||||||
|
|
||||||
/// Format conversation messages for the prompt (truncating long messages).
|
/// Format conversation messages for the prompt (truncating long messages).
|
||||||
fn format_conversation(messages: &[(usize, String, String, String)]) -> String {
|
fn format_conversation(messages: &[(usize, String, String, String)]) -> String {
|
||||||
messages.iter()
|
messages.iter()
|
||||||
|
|
@ -259,9 +286,11 @@ pub fn journal_enrich(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mine a conversation transcript for experiential moments not yet journaled.
|
/// Mine a conversation transcript for experiential moments not yet journaled.
|
||||||
|
/// If `segment` is Some, only process that compaction segment of the file.
|
||||||
pub fn experience_mine(
|
pub fn experience_mine(
|
||||||
store: &mut Store,
|
store: &mut Store,
|
||||||
jsonl_path: &str,
|
jsonl_path: &str,
|
||||||
|
segment: Option<usize>,
|
||||||
) -> Result<usize, String> {
|
) -> Result<usize, String> {
|
||||||
println!("Experience mining: {}", jsonl_path);
|
println!("Experience mining: {}", jsonl_path);
|
||||||
|
|
||||||
|
|
@ -287,7 +316,18 @@ pub fn experience_mine(
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let messages = extract_conversation(jsonl_path)?;
|
let all_messages = extract_conversation(jsonl_path)?;
|
||||||
|
|
||||||
|
// If segment is specified, extract just that segment; otherwise process all messages
|
||||||
|
let messages = match segment {
|
||||||
|
Some(idx) => {
|
||||||
|
let segments = split_on_compaction(all_messages);
|
||||||
|
segments.into_iter().nth(idx)
|
||||||
|
.ok_or_else(|| format!("segment {} out of range", idx))?
|
||||||
|
}
|
||||||
|
None => all_messages,
|
||||||
|
};
|
||||||
|
|
||||||
let conversation = format_conversation(&messages);
|
let conversation = format_conversation(&messages);
|
||||||
println!(" {} messages, {} chars", messages.len(), conversation.len());
|
println!(" {} messages, {} chars", messages.len(), conversation.len());
|
||||||
|
|
||||||
|
|
@ -327,7 +367,13 @@ pub fn experience_mine(
|
||||||
("{{KEYS}}", &keys_text),
|
("{{KEYS}}", &keys_text),
|
||||||
("{{CONVERSATION}}", &conversation),
|
("{{CONVERSATION}}", &conversation),
|
||||||
])?;
|
])?;
|
||||||
println!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4);
|
let est_tokens = prompt.len() / 4;
|
||||||
|
println!(" Prompt: {} chars (~{} tokens)", prompt.len(), est_tokens);
|
||||||
|
|
||||||
|
if est_tokens > 150_000 {
|
||||||
|
println!(" Skipping: prompt too large ({} tokens > 150k limit)", est_tokens);
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
println!(" Calling Sonnet...");
|
println!(" Calling Sonnet...");
|
||||||
let response = call_sonnet("experience-mine", &prompt)?;
|
let response = call_sonnet("experience-mine", &prompt)?;
|
||||||
|
|
@ -389,24 +435,34 @@ pub fn experience_mine(
|
||||||
let _ = store.upsert_node(node);
|
let _ = store.upsert_node(node);
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
let preview = if content.len() > 80 { &content[..77] } else { content };
|
let preview = if content.len() > 80 {
|
||||||
|
let end = content.floor_char_boundary(77);
|
||||||
|
&content[..end]
|
||||||
|
} else {
|
||||||
|
content
|
||||||
|
};
|
||||||
println!(" + [{}] {}...", ts, preview);
|
println!(" + [{}] {}...", ts, preview);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record this transcript as mined (even if count == 0, to prevent re-runs)
|
// Record this transcript/segment as mined (even if count == 0, to prevent re-runs)
|
||||||
// Two keys: content hash (exact dedup) and filename (fast daemon reconcile)
|
let fname_key = match segment {
|
||||||
|
Some(idx) => format!("{}.{}", transcript_filename_key(jsonl_path), idx),
|
||||||
|
None => transcript_filename_key(jsonl_path),
|
||||||
|
};
|
||||||
let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count);
|
let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count);
|
||||||
let mut dedup_node = new_node(&dedup_key, &dedup_content);
|
|
||||||
dedup_node.category = store::Category::Task;
|
|
||||||
dedup_node.provenance = store::Provenance::AgentExperienceMine;
|
|
||||||
let _ = store.upsert_node(dedup_node);
|
|
||||||
|
|
||||||
let fname_key = transcript_filename_key(jsonl_path);
|
|
||||||
let mut fname_node = new_node(&fname_key, &dedup_content);
|
let mut fname_node = new_node(&fname_key, &dedup_content);
|
||||||
fname_node.category = store::Category::Task;
|
fname_node.category = store::Category::Task;
|
||||||
fname_node.provenance = store::Provenance::AgentExperienceMine;
|
fname_node.provenance = store::Provenance::AgentExperienceMine;
|
||||||
let _ = store.upsert_node(fname_node);
|
let _ = store.upsert_node(fname_node);
|
||||||
|
|
||||||
|
// For unsegmented calls, also write the content-hash key for backwards compat
|
||||||
|
if segment.is_none() {
|
||||||
|
let mut dedup_node = new_node(&dedup_key, &dedup_content);
|
||||||
|
dedup_node.category = store::Category::Task;
|
||||||
|
dedup_node.provenance = store::Provenance::AgentExperienceMine;
|
||||||
|
let _ = store.upsert_node(dedup_node);
|
||||||
|
}
|
||||||
|
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
println!(" Saved {} new journal entries.", count);
|
println!(" Saved {} new journal entries.", count);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -234,7 +234,7 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result<Vec<Fact>, String>
|
||||||
if dry_run {
|
if dry_run {
|
||||||
for (i, (offset, chunk)) in chunks.iter().enumerate() {
|
for (i, (offset, chunk)) in chunks.iter().enumerate() {
|
||||||
eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len());
|
eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len());
|
||||||
let preview = if chunk.len() > 500 { &chunk[..500] } else { chunk };
|
let preview = if chunk.len() > 500 { &chunk[..chunk.floor_char_boundary(500)] } else { chunk };
|
||||||
eprintln!("{}", preview);
|
eprintln!("{}", preview);
|
||||||
if chunk.len() > 500 {
|
if chunk.len() > 500 {
|
||||||
eprintln!(" ... ({} more chars)", chunk.len() - 500);
|
eprintln!(" ... ({} more chars)", chunk.len() - 500);
|
||||||
|
|
|
||||||
|
|
@ -952,7 +952,7 @@ fn cmd_experience_mine(args: &[String]) -> Result<(), String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut store = store::Store::load()?;
|
let mut store = store::Store::load()?;
|
||||||
let count = enrich::experience_mine(&mut store, &jsonl_path)?;
|
let count = enrich::experience_mine(&mut store, &jsonl_path, None)?;
|
||||||
println!("Done: {} new entries mined.", count);
|
println!("Done: {} new entries mined.", count);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue