experience-mine: per-segment dedup keys, retry backoff
The whole-file dedup key (_mined-transcripts#f-{UUID}) prevented mining
new compaction segments when session files grew. Replace with per-segment
keys (_mined-transcripts#f-{UUID}.{N}) so each segment is tracked
independently.
Changes:
- daemon session-watcher: segment-aware dedup, migrate 272 existing
whole-file keys to per-segment on restart
- seg_cache with size-based invalidation (re-parse when file grows)
- exponential retry backoff (5min → 30min cap) for failed sessions
- experience_mine(): write per-segment key only, backfill on
content-hash early return
- fact-mining gated on all per-segment keys existing
Also adds documentation:
- docs/claude-code-transcript-format.md: JSONL transcript format
- docs/plan-experience-mine-dedup-fix.md: design document
This commit is contained in:
parent
1326a683a5
commit
8eb6308760
4 changed files with 367 additions and 95 deletions
|
|
@ -308,16 +308,52 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
let last_daily_sw = Arc::clone(&last_daily);
|
||||
choir.spawn("session-watcher").init(move |ctx| {
|
||||
ctx.set_progress("idle");
|
||||
// Cache segment counts so we don't re-parse large files every tick
|
||||
let mut seg_cache: HashMap<String, usize> = HashMap::new();
|
||||
// Cache: path → (file_size, segment_count). Invalidated when size changes.
|
||||
let mut seg_cache: HashMap<String, (u64, usize)> = HashMap::new();
|
||||
// Retry backoff: filename → (next_retry_after, current_backoff).
|
||||
// Exponential from 5min, cap 30min. Resets on daemon restart.
|
||||
let mut retry_backoff: HashMap<String, (std::time::Instant, Duration)> = HashMap::new();
|
||||
|
||||
const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min
|
||||
const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min
|
||||
|
||||
loop {
|
||||
if ctx.is_cancelled() {
|
||||
return Err(TaskError::Fatal("cancelled".into()));
|
||||
}
|
||||
|
||||
ctx.set_progress("scanning");
|
||||
|
||||
// Check for failed tasks and update backoff.
|
||||
// Task names are "extract:{filename}.{segment}" — extract the
|
||||
// filename (UUID.jsonl) by stripping the trailing .N segment suffix.
|
||||
let task_statuses = choir_sw.task_statuses();
|
||||
for t in &task_statuses {
|
||||
if let Some(label) = t.name.strip_prefix("extract:") {
|
||||
// label is "UUID.jsonl.N" — strip last ".N" to get filename
|
||||
let filename = match label.rfind('.') {
|
||||
Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => {
|
||||
&label[..pos]
|
||||
}
|
||||
_ => label,
|
||||
};
|
||||
match t.status {
|
||||
TaskStatus::Failed => {
|
||||
let entry = retry_backoff.entry(filename.to_string())
|
||||
.or_insert((std::time::Instant::now(), BACKOFF_INITIAL));
|
||||
entry.1 = (entry.1 * 2).min(BACKOFF_MAX);
|
||||
entry.0 = std::time::Instant::now() + entry.1;
|
||||
}
|
||||
TaskStatus::Completed => {
|
||||
retry_backoff.remove(filename);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// What's currently running/pending? (avoid spawning duplicates)
|
||||
let active: HashSet<String> = choir_sw.task_statuses().iter()
|
||||
let active: HashSet<String> = task_statuses.iter()
|
||||
.filter(|t| !t.status.is_finished())
|
||||
.map(|t| t.name.clone())
|
||||
.collect();
|
||||
|
|
@ -327,9 +363,6 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
// Load mined transcript keys once for this tick
|
||||
let mined = super::enrich::mined_transcript_keys();
|
||||
|
||||
// Limit new tasks per tick — the resource pool gates execution,
|
||||
// but we don't need thousands of task objects in the registry.
|
||||
// The watcher ticks every 60s so backlog drains steadily.
|
||||
const MAX_NEW_PER_TICK: usize = 10;
|
||||
|
||||
// Load fact-mined keys too
|
||||
|
|
@ -352,74 +385,91 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
let mut fact_remaining = 0;
|
||||
let mut already_mined = 0;
|
||||
let mut still_open = 0;
|
||||
let mut backed_off = 0;
|
||||
let total_stale = stale.len();
|
||||
// Multi-segment files where all segments are done — write whole-file key
|
||||
let mut mark_transcript_done: Vec<(String, String, usize)> = Vec::new();
|
||||
|
||||
// Classify each session — now segment-aware
|
||||
// Each entry: (filename, path_str, segment_index)
|
||||
// Sessions with old whole-file keys that need per-segment migration
|
||||
let mut migrate_keys: Vec<(String, String, usize)> = Vec::new();
|
||||
|
||||
let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new();
|
||||
let mut needs_fact: Vec<(String, String)> = Vec::new();
|
||||
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
for session in stale {
|
||||
let filename = session.file_name()
|
||||
.map(|n| n.to_string_lossy().to_string())
|
||||
.unwrap_or_else(|| "unknown".into());
|
||||
let path_str = session.to_string_lossy().to_string();
|
||||
|
||||
// Check for old-style whole-file mined key
|
||||
let experience_done = super::enrich::is_transcript_mined_with_keys(&mined, &path_str);
|
||||
|
||||
if !experience_done {
|
||||
if is_file_open(&session) {
|
||||
still_open += 1;
|
||||
// Check retry backoff before doing any work
|
||||
if let Some((next_retry, _)) = retry_backoff.get(&filename) {
|
||||
if now < *next_retry {
|
||||
backed_off += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Get segment count, using cache to avoid re-parsing large files
|
||||
let seg_count = if let Some(&cached) = seg_cache.get(&path_str) {
|
||||
cached
|
||||
if is_file_open(&session) {
|
||||
still_open += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get file size for cache invalidation
|
||||
let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
// Get segment count, using cache with size-based invalidation
|
||||
let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) {
|
||||
if cached_size == file_size {
|
||||
cached_count
|
||||
} else {
|
||||
// File changed — re-parse
|
||||
let messages = match super::enrich::extract_conversation(&path_str) {
|
||||
Ok(m) => m,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let count = super::enrich::split_on_compaction(messages).len();
|
||||
seg_cache.insert(path_str.clone(), count);
|
||||
seg_cache.insert(path_str.clone(), (file_size, count));
|
||||
count
|
||||
};
|
||||
|
||||
if seg_count <= 1 {
|
||||
let task_name = format!("extract:{}", filename);
|
||||
if !active.contains(&task_name) {
|
||||
needs_extract.push((filename, path_str, None));
|
||||
}
|
||||
} else {
|
||||
// Multi-segment — find unmined segments
|
||||
let fname_key = super::enrich::transcript_filename_key(&path_str);
|
||||
let mut unmined = 0;
|
||||
for i in 0..seg_count {
|
||||
let seg_key = format!("{}.{}", fname_key, i);
|
||||
if mined.contains(&seg_key) { continue; }
|
||||
unmined += 1;
|
||||
let task_name = format!("extract:{}.{}", filename, i);
|
||||
if active.contains(&task_name) { continue; }
|
||||
needs_extract.push((
|
||||
format!("{}.{}", filename, i),
|
||||
path_str.clone(),
|
||||
Some(i),
|
||||
));
|
||||
}
|
||||
if unmined == 0 {
|
||||
// All segments done — write whole-file key so we skip next tick
|
||||
mark_transcript_done.push((fname_key, path_str.clone(), seg_count));
|
||||
already_mined += 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let messages = match super::enrich::extract_conversation(&path_str) {
|
||||
Ok(m) => m,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let count = super::enrich::split_on_compaction(messages).len();
|
||||
seg_cache.insert(path_str.clone(), (file_size, count));
|
||||
count
|
||||
};
|
||||
|
||||
let fname_key = super::enrich::transcript_filename_key(&path_str);
|
||||
let has_whole_file_key = mined.contains(&fname_key);
|
||||
|
||||
// Check per-segment keys, find unmined segments
|
||||
let mut unmined_segs: Vec<usize> = Vec::new();
|
||||
let mut has_any_seg_key = false;
|
||||
for i in 0..seg_count {
|
||||
let seg_key = format!("{}.{}", fname_key, i);
|
||||
if mined.contains(&seg_key) {
|
||||
has_any_seg_key = true;
|
||||
} else {
|
||||
unmined_segs.push(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate old whole-file key: if it exists but no per-segment keys,
|
||||
// write per-segment keys for all current segments (they were mined
|
||||
// under the old scheme)
|
||||
if has_whole_file_key && !has_any_seg_key {
|
||||
migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count));
|
||||
// After migration, all current segments are covered
|
||||
unmined_segs.clear();
|
||||
}
|
||||
|
||||
if unmined_segs.is_empty() {
|
||||
// All segments mined — check fact-mining
|
||||
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
||||
let fact_done = fact_keys.contains(&fact_key);
|
||||
if !fact_done {
|
||||
if !fact_keys.contains(&fact_key) {
|
||||
let task_name = format!("fact-mine:{}", filename);
|
||||
if !active.contains(&task_name) {
|
||||
needs_fact.push((filename, path_str));
|
||||
|
|
@ -427,6 +477,35 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
} else {
|
||||
already_mined += 1;
|
||||
}
|
||||
} else {
|
||||
// Queue unmined segments
|
||||
for i in unmined_segs {
|
||||
let task_name = format!("extract:{}.{}", filename, i);
|
||||
if active.contains(&task_name) { continue; }
|
||||
needs_extract.push((
|
||||
format!("{}.{}", filename, i),
|
||||
path_str.clone(),
|
||||
Some(i),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate old whole-file keys to per-segment keys
|
||||
if !migrate_keys.is_empty() {
|
||||
if let Ok(mut store) = crate::store::Store::load() {
|
||||
for (fname_key, path_str, seg_count) in &migrate_keys {
|
||||
for i in 0..*seg_count {
|
||||
let seg_key = format!("{}.{}", fname_key, i);
|
||||
let content = format!("Migrated from whole-file key for {}", path_str);
|
||||
let mut node = crate::store::new_node(&seg_key, &content);
|
||||
node.provenance = crate::store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
}
|
||||
}
|
||||
let _ = store.save();
|
||||
log_event("session-watcher", "migrated",
|
||||
&format!("{} whole-file keys → per-segment", migrate_keys.len()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -450,7 +529,6 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
}
|
||||
|
||||
// Only queue fact-mine when experience backlog is clear
|
||||
// Sort by file size so small transcripts drain first
|
||||
needs_fact.sort_by_key(|(_, path_str)| {
|
||||
fs::metadata(path_str).map(|m| m.len()).unwrap_or(u64::MAX)
|
||||
});
|
||||
|
|
@ -477,30 +555,17 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
fact_remaining = needs_fact.len();
|
||||
}
|
||||
|
||||
// Write whole-file keys for fully-mined multi-segment files
|
||||
if !mark_transcript_done.is_empty() {
|
||||
if let Ok(mut store) = crate::store::Store::load() {
|
||||
for (fname_key, path_str, seg_count) in &mark_transcript_done {
|
||||
let content = format!("All {} segments mined for {}", seg_count, path_str);
|
||||
let mut node = crate::store::new_node(fname_key, &content);
|
||||
node.provenance = crate::store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
seg_cache.remove(path_str);
|
||||
}
|
||||
let _ = store.save();
|
||||
}
|
||||
}
|
||||
|
||||
let extract_pending = extract_queued + extract_remaining;
|
||||
let fact_pending = fact_queued + fact_remaining;
|
||||
if extract_pending > 0 || fact_pending > 0 || still_open > 0 {
|
||||
if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 {
|
||||
log_event("session-watcher", "tick",
|
||||
&format!("{} stale, {} mined, {} extract, {} fact, {} open",
|
||||
total_stale, already_mined, extract_pending, fact_pending, still_open));
|
||||
&format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff",
|
||||
total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off));
|
||||
let mut parts = Vec::new();
|
||||
if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); }
|
||||
if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); }
|
||||
if still_open > 0 { parts.push(format!("{} open", still_open)); }
|
||||
if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); }
|
||||
ctx.set_progress(parts.join(", "));
|
||||
} else {
|
||||
ctx.set_progress("idle");
|
||||
|
|
|
|||
|
|
@ -61,13 +61,6 @@ pub fn mined_transcript_keys() -> HashSet<String> {
|
|||
keys
|
||||
}
|
||||
|
||||
/// Check if a transcript has been mined, given a pre-loaded set of mined keys.
|
||||
/// Checks filename-based key only (no file read). Sessions mined before the
|
||||
/// filename key was added will pass through and short-circuit in experience_mine
|
||||
/// via the content hash check — a one-time cost on first restart after this change.
|
||||
pub fn is_transcript_mined_with_keys(mined: &HashSet<String>, path: &str) -> bool {
|
||||
mined.contains(&transcript_filename_key(path))
|
||||
}
|
||||
|
||||
/// Extract user/assistant messages with line numbers from a JSONL transcript.
|
||||
/// (line_number, role, text, timestamp)
|
||||
|
|
@ -243,13 +236,15 @@ pub fn experience_mine(
|
|||
let dedup_key = format!("_mined-transcripts#h-{:016x}", hash);
|
||||
|
||||
if store.nodes.contains_key(&dedup_key) {
|
||||
// Backfill filename key if missing (transcripts mined before this key existed)
|
||||
let fname_key = transcript_filename_key(jsonl_path);
|
||||
if !store.nodes.contains_key(&fname_key) {
|
||||
let mut node = new_node(&fname_key, &format!("Backfilled from {}", dedup_key));
|
||||
node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
store.save()?;
|
||||
// Backfill per-segment key if called with a specific segment
|
||||
if let Some(idx) = segment {
|
||||
let seg_key = format!("{}.{}", transcript_filename_key(jsonl_path), idx);
|
||||
if !store.nodes.contains_key(&seg_key) {
|
||||
let mut node = new_node(&seg_key, &format!("Backfilled from {}", dedup_key));
|
||||
node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
store.save()?;
|
||||
}
|
||||
}
|
||||
println!(" Already mined this transcript ({}), skipping.", &dedup_key[24..]);
|
||||
return Ok(0);
|
||||
|
|
@ -370,20 +365,23 @@ pub fn experience_mine(
|
|||
}
|
||||
|
||||
// Record this transcript/segment as mined (even if count == 0, to prevent re-runs)
|
||||
let fname_key = match segment {
|
||||
Some(idx) => format!("{}.{}", transcript_filename_key(jsonl_path), idx),
|
||||
None => transcript_filename_key(jsonl_path),
|
||||
};
|
||||
let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count);
|
||||
let mut fname_node = new_node(&fname_key, &dedup_content);
|
||||
fname_node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(fname_node);
|
||||
|
||||
// For unsegmented calls, also write the content-hash key for backwards compat
|
||||
if segment.is_none() {
|
||||
let mut dedup_node = new_node(&dedup_key, &dedup_content);
|
||||
dedup_node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(dedup_node);
|
||||
match segment {
|
||||
Some(idx) => {
|
||||
// Per-segment key: the daemon writes the whole-file key when all segments are done
|
||||
let seg_key = format!("{}.{}", transcript_filename_key(jsonl_path), idx);
|
||||
let mut node = new_node(&seg_key, &dedup_content);
|
||||
node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
}
|
||||
None => {
|
||||
// Unsegmented: only write content-hash key (not the filename key, since the
|
||||
// file may grow with new compaction segments later — the daemon handles
|
||||
// writing the whole-file filename key after verifying all segments are done)
|
||||
let mut node = new_node(&dedup_key, &dedup_content);
|
||||
node.provenance = store::Provenance::AgentExperienceMine;
|
||||
let _ = store.upsert_node(node);
|
||||
}
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue