From 8eb6308760ca83e2b1e4e1bb2f654c263ce4a039 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Mon, 9 Mar 2026 02:27:51 -0400 Subject: [PATCH] experience-mine: per-segment dedup keys, retry backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The whole-file dedup key (_mined-transcripts#f-{UUID}) prevented mining new compaction segments when session files grew. Replace with per-segment keys (_mined-transcripts#f-{UUID}.{N}) so each segment is tracked independently. Changes: - daemon session-watcher: segment-aware dedup, migrate 272 existing whole-file keys to per-segment on restart - seg_cache with size-based invalidation (re-parse when file grows) - exponential retry backoff (5min → 30min cap) for failed sessions - experience_mine(): write per-segment key only, backfill on content-hash early return - fact-mining gated on all per-segment keys existing Also adds documentation: - docs/claude-code-transcript-format.md: JSONL transcript format - docs/plan-experience-mine-dedup-fix.md: design document --- docs/claude-code-transcript-format.md | 97 ++++++++++++ docs/plan-experience-mine-dedup-fix.md | 112 ++++++++++++++ poc-memory/src/agents/daemon.rs | 201 ++++++++++++++++--------- poc-memory/src/agents/enrich.rs | 52 +++---- 4 files changed, 367 insertions(+), 95 deletions(-) create mode 100644 docs/claude-code-transcript-format.md create mode 100644 docs/plan-experience-mine-dedup-fix.md diff --git a/docs/claude-code-transcript-format.md b/docs/claude-code-transcript-format.md new file mode 100644 index 0000000..47dbdeb --- /dev/null +++ b/docs/claude-code-transcript-format.md @@ -0,0 +1,97 @@ +# Claude Code Transcript Format + +Claude Code stores session transcripts as JSONL files (one JSON object per +line) in `~/.claude/projects//.jsonl`. + +## Common fields + +Every line has: +- `type` — message type (see below) +- `uuid` — unique ID for this message +- `parentUuid` — links to preceding message (forms a chain) +- `sessionId` — session UUID (matches the filename stem) +- `timestamp` — ISO 8601 +- `cwd`, `version`, `gitBranch` — session context + +## Message types + +### `user` + +User input or tool results. `message.content` is either: +- A string (plain user text) +- An array of content blocks, each with `type`: + - `"tool_result"` — result of a tool call, with `tool_use_id`, `content` + (string or array of text/image blocks), `is_error` + +User messages that start a compaction segment begin with: +``` +This session is being continued from a previous conversation that ran out of context. +``` +These are injected by Claude Code when context is compacted. + +Additional fields on user messages: +- `userType` — `"external"` for human input, may differ for system-injected +- `todos` — task list state +- `permissionMode` — permission level for the session + +### `assistant` + +Model responses. `message` contains the full API response: +- `model` — model ID (e.g. `"claude-opus-4-6"`) +- `role` — `"assistant"` +- `content` — array of content blocks: + - `{"type": "text", "text": "..."}` — text output + - `{"type": "tool_use", "id": "...", "name": "Bash", "input": {...}}` — tool call +- `stop_reason` — why generation stopped +- `usage` — token counts (input, output, cache hits) + +Additional fields: +- `requestId` — API request ID + +### `system` + +System events. Has `subtype` field: +- `"stop_hook_summary"` — hook execution results at end of turn + - `hookCount`, `hookInfos` (command + duration), `hookErrors` + - `preventedContinuation`, `stopReason` + +### `progress` + +Hook execution progress. `data` contains: +- `type` — e.g. `"hook_progress"` +- `hookEvent` — trigger event (e.g. `"PostToolUse"`) +- `hookName` — specific hook (e.g. `"PostToolUse:Bash"`) +- `command` — hook command path + +### `queue-operation` + +User input queued while assistant is working: +- `operation` — `"enqueue"` +- `content` — the queued text + +### `file-history-snapshot` + +File state snapshots for undo/redo: +- `snapshot.trackedFileBackups` — map of file paths to backup state + +## Compaction segments + +Long-running sessions hit context limits and get compacted. Each compaction +injects a user message starting with the marker text (see above), containing +a summary of the preceding conversation. This splits the transcript into +segments: + +- Segment 0: original conversation start through first compaction +- Segment 1: first compaction summary through second compaction +- Segment N: Nth compaction through next (or end of file) + +Segments are append-only — new compactions add higher-indexed segments. +Existing segment indices are stable and never shift. + +## File lifecycle + +- Created when a session starts +- Grows as messages are exchanged +- Grows further when compaction happens (summary injected, conversation continues) +- Never truncated or rewritten +- Becomes stale when the session ends (no process has the file open) diff --git a/docs/plan-experience-mine-dedup-fix.md b/docs/plan-experience-mine-dedup-fix.md new file mode 100644 index 0000000..f64ec8e --- /dev/null +++ b/docs/plan-experience-mine-dedup-fix.md @@ -0,0 +1,112 @@ +# Fix: experience-mine dedup and retry handling + +## Problem + +1. **Whole-file dedup key prevents mining new segments.** When a session + is mined, `experience_mine()` writes `_mined-transcripts#f-{UUID}` (a + whole-file key). If the session later grows (compaction adds segments), + the daemon sees the whole-file key and skips it forever. New segments + never get mined. + +2. **No retry backoff.** When `claude` CLI fails (exit status 1), the + session-watcher re-queues the same session every 60s tick. This + produces a wall of failures in the log and wastes resources. + +## Design + +### Dedup keys: per-segment only + +Going forward, dedup keys are per-segment: `_mined-transcripts#f-{UUID}.{N}` +where N is the segment index. No more whole-file keys. + +Segment indices are stable — compaction appends new segments, never +reorders existing ones. See `docs/claude-code-transcript-format.md`. + +### Migration of existing whole-file keys + +~276 sessions have whole-file keys (`_mined-transcripts#f-{UUID}` with +no segment suffix) and no per-segment keys. These were mined correctly +at the time. + +When the session-watcher encounters a whole-file key: +- Count current segments in the file +- Write per-segment keys for all current segments (they were covered + by the old whole-file key) +- If the file has grown since (new segments beyond the migrated set), + those won't have per-segment keys and will be mined normally + +This is a one-time migration per file. After migration, the whole-file +key is harmless dead weight — nothing creates new ones. + +### Retry backoff + +The session-watcher tracks failed sessions in a local +`HashMap` mapping path to +(next_retry_after, current_backoff). + +- Initial backoff: 5 minutes +- Each failure: double the backoff +- Cap: 30 minutes +- Resets on daemon restart (map is thread-local, not persisted) + +## Changes + +### `poc-memory/src/agents/enrich.rs` + +`experience_mine()`: stop writing the bare filename key for unsegmented +calls. Only write the content-hash key (for the legacy dedup check at +the top of the function) and per-segment keys. + +**Already done** — edited earlier in this session. + +### `poc-memory/src/agents/daemon.rs` + +Session-watcher changes: + +1. **Remove whole-file fast path.** Delete the `is_transcript_mined_with_keys` + check that short-circuits before segment counting. + +2. **Always go through segment-aware path.** Every stale session gets + segment counting (cached) and per-segment key checks. + +3. **Migrate whole-file keys.** When we find a whole-file key exists but + no per-segment keys: write per-segment keys for all current segments + into the store. One-time cost per file, batched into a single + store load/save per tick. + +4. **seg_cache with size invalidation.** Change from `HashMap` + to `HashMap` — `(file_size, seg_count)`. When + stat shows a different size, evict and re-parse. + +5. **Remove `mark_transcript_done`.** Stop writing whole-file keys for + fully-mined multi-segment files. + +6. **Add retry backoff.** `HashMap` for + tracking failed sessions. Skip sessions whose backoff hasn't expired. + On failure (task finishes with error), update the backoff. Exponential + from 5min, cap at 30min. + +7. **Fact-mining check.** Currently fact-mining is gated behind + `experience_done` (the whole-file key). After removing the whole-file + fast path, fact-mining should be gated on "all segments mined" — + i.e., all per-segment keys exist for the current segment count. + +### Manual cleanup after deploy + +Delete the dedup keys for sessions that failed repeatedly (like +`8cebfc0a-bd33-49f1-85a4-1489bdf7050c`) so they get re-processed: + +``` +poc-memory delete-node '_mined-transcripts#f-8cebfc0a-bd33-49f1-85a4-1489bdf7050c' +# also any content-hash key for the same file +``` + +## Verification + +After deploying: +- `tail -f ~/.claude/memory/daemon.log | grep session-watcher` should + show ticks with migration activity, then settle to idle +- Failed sessions should show increasing backoff intervals, not + per-second retries +- After fixing the `claude` CLI issue, backed-off sessions should + retry and succeed on the next daemon restart diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 8092b51..b49fb67 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -308,16 +308,52 @@ pub fn run_daemon() -> Result<(), String> { let last_daily_sw = Arc::clone(&last_daily); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); - // Cache segment counts so we don't re-parse large files every tick - let mut seg_cache: HashMap = HashMap::new(); + // Cache: path → (file_size, segment_count). Invalidated when size changes. + let mut seg_cache: HashMap = HashMap::new(); + // Retry backoff: filename → (next_retry_after, current_backoff). + // Exponential from 5min, cap 30min. Resets on daemon restart. + let mut retry_backoff: HashMap = HashMap::new(); + + const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min + const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min + loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } ctx.set_progress("scanning"); + + // Check for failed tasks and update backoff. + // Task names are "extract:{filename}.{segment}" — extract the + // filename (UUID.jsonl) by stripping the trailing .N segment suffix. + let task_statuses = choir_sw.task_statuses(); + for t in &task_statuses { + if let Some(label) = t.name.strip_prefix("extract:") { + // label is "UUID.jsonl.N" — strip last ".N" to get filename + let filename = match label.rfind('.') { + Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => { + &label[..pos] + } + _ => label, + }; + match t.status { + TaskStatus::Failed => { + let entry = retry_backoff.entry(filename.to_string()) + .or_insert((std::time::Instant::now(), BACKOFF_INITIAL)); + entry.1 = (entry.1 * 2).min(BACKOFF_MAX); + entry.0 = std::time::Instant::now() + entry.1; + } + TaskStatus::Completed => { + retry_backoff.remove(filename); + } + _ => {} + } + } + } + // What's currently running/pending? (avoid spawning duplicates) - let active: HashSet = choir_sw.task_statuses().iter() + let active: HashSet = task_statuses.iter() .filter(|t| !t.status.is_finished()) .map(|t| t.name.clone()) .collect(); @@ -327,9 +363,6 @@ pub fn run_daemon() -> Result<(), String> { // Load mined transcript keys once for this tick let mined = super::enrich::mined_transcript_keys(); - // Limit new tasks per tick — the resource pool gates execution, - // but we don't need thousands of task objects in the registry. - // The watcher ticks every 60s so backlog drains steadily. const MAX_NEW_PER_TICK: usize = 10; // Load fact-mined keys too @@ -352,74 +385,91 @@ pub fn run_daemon() -> Result<(), String> { let mut fact_remaining = 0; let mut already_mined = 0; let mut still_open = 0; + let mut backed_off = 0; let total_stale = stale.len(); - // Multi-segment files where all segments are done — write whole-file key - let mut mark_transcript_done: Vec<(String, String, usize)> = Vec::new(); - // Classify each session — now segment-aware - // Each entry: (filename, path_str, segment_index) + // Sessions with old whole-file keys that need per-segment migration + let mut migrate_keys: Vec<(String, String, usize)> = Vec::new(); + let mut needs_extract: Vec<(String, String, Option)> = Vec::new(); let mut needs_fact: Vec<(String, String)> = Vec::new(); + let now = std::time::Instant::now(); + for session in stale { let filename = session.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); let path_str = session.to_string_lossy().to_string(); - // Check for old-style whole-file mined key - let experience_done = super::enrich::is_transcript_mined_with_keys(&mined, &path_str); - - if !experience_done { - if is_file_open(&session) { - still_open += 1; + // Check retry backoff before doing any work + if let Some((next_retry, _)) = retry_backoff.get(&filename) { + if now < *next_retry { + backed_off += 1; continue; } + } - // Get segment count, using cache to avoid re-parsing large files - let seg_count = if let Some(&cached) = seg_cache.get(&path_str) { - cached + if is_file_open(&session) { + still_open += 1; + continue; + } + + // Get file size for cache invalidation + let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0); + + // Get segment count, using cache with size-based invalidation + let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) { + if cached_size == file_size { + cached_count } else { + // File changed — re-parse let messages = match super::enrich::extract_conversation(&path_str) { Ok(m) => m, Err(_) => continue, }; let count = super::enrich::split_on_compaction(messages).len(); - seg_cache.insert(path_str.clone(), count); + seg_cache.insert(path_str.clone(), (file_size, count)); count - }; - - if seg_count <= 1 { - let task_name = format!("extract:{}", filename); - if !active.contains(&task_name) { - needs_extract.push((filename, path_str, None)); - } - } else { - // Multi-segment — find unmined segments - let fname_key = super::enrich::transcript_filename_key(&path_str); - let mut unmined = 0; - for i in 0..seg_count { - let seg_key = format!("{}.{}", fname_key, i); - if mined.contains(&seg_key) { continue; } - unmined += 1; - let task_name = format!("extract:{}.{}", filename, i); - if active.contains(&task_name) { continue; } - needs_extract.push(( - format!("{}.{}", filename, i), - path_str.clone(), - Some(i), - )); - } - if unmined == 0 { - // All segments done — write whole-file key so we skip next tick - mark_transcript_done.push((fname_key, path_str.clone(), seg_count)); - already_mined += 1; - } } } else { + let messages = match super::enrich::extract_conversation(&path_str) { + Ok(m) => m, + Err(_) => continue, + }; + let count = super::enrich::split_on_compaction(messages).len(); + seg_cache.insert(path_str.clone(), (file_size, count)); + count + }; + + let fname_key = super::enrich::transcript_filename_key(&path_str); + let has_whole_file_key = mined.contains(&fname_key); + + // Check per-segment keys, find unmined segments + let mut unmined_segs: Vec = Vec::new(); + let mut has_any_seg_key = false; + for i in 0..seg_count { + let seg_key = format!("{}.{}", fname_key, i); + if mined.contains(&seg_key) { + has_any_seg_key = true; + } else { + unmined_segs.push(i); + } + } + + // Migrate old whole-file key: if it exists but no per-segment keys, + // write per-segment keys for all current segments (they were mined + // under the old scheme) + if has_whole_file_key && !has_any_seg_key { + migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count)); + // After migration, all current segments are covered + unmined_segs.clear(); + } + + if unmined_segs.is_empty() { + // All segments mined — check fact-mining let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); - let fact_done = fact_keys.contains(&fact_key); - if !fact_done { + if !fact_keys.contains(&fact_key) { let task_name = format!("fact-mine:{}", filename); if !active.contains(&task_name) { needs_fact.push((filename, path_str)); @@ -427,6 +477,35 @@ pub fn run_daemon() -> Result<(), String> { } else { already_mined += 1; } + } else { + // Queue unmined segments + for i in unmined_segs { + let task_name = format!("extract:{}.{}", filename, i); + if active.contains(&task_name) { continue; } + needs_extract.push(( + format!("{}.{}", filename, i), + path_str.clone(), + Some(i), + )); + } + } + } + + // Migrate old whole-file keys to per-segment keys + if !migrate_keys.is_empty() { + if let Ok(mut store) = crate::store::Store::load() { + for (fname_key, path_str, seg_count) in &migrate_keys { + for i in 0..*seg_count { + let seg_key = format!("{}.{}", fname_key, i); + let content = format!("Migrated from whole-file key for {}", path_str); + let mut node = crate::store::new_node(&seg_key, &content); + node.provenance = crate::store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(node); + } + } + let _ = store.save(); + log_event("session-watcher", "migrated", + &format!("{} whole-file keys → per-segment", migrate_keys.len())); } } @@ -450,7 +529,6 @@ pub fn run_daemon() -> Result<(), String> { } // Only queue fact-mine when experience backlog is clear - // Sort by file size so small transcripts drain first needs_fact.sort_by_key(|(_, path_str)| { fs::metadata(path_str).map(|m| m.len()).unwrap_or(u64::MAX) }); @@ -477,30 +555,17 @@ pub fn run_daemon() -> Result<(), String> { fact_remaining = needs_fact.len(); } - // Write whole-file keys for fully-mined multi-segment files - if !mark_transcript_done.is_empty() { - if let Ok(mut store) = crate::store::Store::load() { - for (fname_key, path_str, seg_count) in &mark_transcript_done { - let content = format!("All {} segments mined for {}", seg_count, path_str); - let mut node = crate::store::new_node(fname_key, &content); - node.provenance = crate::store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(node); - seg_cache.remove(path_str); - } - let _ = store.save(); - } - } - let extract_pending = extract_queued + extract_remaining; let fact_pending = fact_queued + fact_remaining; - if extract_pending > 0 || fact_pending > 0 || still_open > 0 { + if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 { log_event("session-watcher", "tick", - &format!("{} stale, {} mined, {} extract, {} fact, {} open", - total_stale, already_mined, extract_pending, fact_pending, still_open)); + &format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff", + total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off)); let mut parts = Vec::new(); if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); } if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); } if still_open > 0 { parts.push(format!("{} open", still_open)); } + if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); } ctx.set_progress(parts.join(", ")); } else { ctx.set_progress("idle"); diff --git a/poc-memory/src/agents/enrich.rs b/poc-memory/src/agents/enrich.rs index f6a2911..b178dfa 100644 --- a/poc-memory/src/agents/enrich.rs +++ b/poc-memory/src/agents/enrich.rs @@ -61,13 +61,6 @@ pub fn mined_transcript_keys() -> HashSet { keys } -/// Check if a transcript has been mined, given a pre-loaded set of mined keys. -/// Checks filename-based key only (no file read). Sessions mined before the -/// filename key was added will pass through and short-circuit in experience_mine -/// via the content hash check — a one-time cost on first restart after this change. -pub fn is_transcript_mined_with_keys(mined: &HashSet, path: &str) -> bool { - mined.contains(&transcript_filename_key(path)) -} /// Extract user/assistant messages with line numbers from a JSONL transcript. /// (line_number, role, text, timestamp) @@ -243,13 +236,15 @@ pub fn experience_mine( let dedup_key = format!("_mined-transcripts#h-{:016x}", hash); if store.nodes.contains_key(&dedup_key) { - // Backfill filename key if missing (transcripts mined before this key existed) - let fname_key = transcript_filename_key(jsonl_path); - if !store.nodes.contains_key(&fname_key) { - let mut node = new_node(&fname_key, &format!("Backfilled from {}", dedup_key)); - node.provenance = store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(node); - store.save()?; + // Backfill per-segment key if called with a specific segment + if let Some(idx) = segment { + let seg_key = format!("{}.{}", transcript_filename_key(jsonl_path), idx); + if !store.nodes.contains_key(&seg_key) { + let mut node = new_node(&seg_key, &format!("Backfilled from {}", dedup_key)); + node.provenance = store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(node); + store.save()?; + } } println!(" Already mined this transcript ({}), skipping.", &dedup_key[24..]); return Ok(0); @@ -370,20 +365,23 @@ pub fn experience_mine( } // Record this transcript/segment as mined (even if count == 0, to prevent re-runs) - let fname_key = match segment { - Some(idx) => format!("{}.{}", transcript_filename_key(jsonl_path), idx), - None => transcript_filename_key(jsonl_path), - }; let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count); - let mut fname_node = new_node(&fname_key, &dedup_content); - fname_node.provenance = store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(fname_node); - - // For unsegmented calls, also write the content-hash key for backwards compat - if segment.is_none() { - let mut dedup_node = new_node(&dedup_key, &dedup_content); - dedup_node.provenance = store::Provenance::AgentExperienceMine; - let _ = store.upsert_node(dedup_node); + match segment { + Some(idx) => { + // Per-segment key: the daemon writes the whole-file key when all segments are done + let seg_key = format!("{}.{}", transcript_filename_key(jsonl_path), idx); + let mut node = new_node(&seg_key, &dedup_content); + node.provenance = store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(node); + } + None => { + // Unsegmented: only write content-hash key (not the filename key, since the + // file may grow with new compaction segments later — the daemon handles + // writing the whole-file filename key after verifying all segments are done) + let mut node = new_node(&dedup_key, &dedup_content); + node.provenance = store::Provenance::AgentExperienceMine; + let _ = store.upsert_node(node); + } } if count > 0 {