From 23cd80a0c3c04ed40baa63bed01fb76607878b6b Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 16 Mar 2026 17:44:20 -0400 Subject: [PATCH] observation: use transcript progress log, mark after success MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire select_conversation_fragments to use store.is_segment_mined() instead of scanning _observed-transcripts stub nodes. Segments are now marked AFTER the agent succeeds (via mark_observation_done), not before — so failed runs don't lose segments. Fragment IDs flow through the Resolved.keys → AgentBatch.node_keys path so run_and_apply_with_log can mark them post-success. Co-Authored-By: Claude Opus 4.6 (1M context) --- poc-memory/src/agents/defs.rs | 5 +- poc-memory/src/agents/knowledge.rs | 85 +++++++++++++++++------------- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/poc-memory/src/agents/defs.rs b/poc-memory/src/agents/defs.rs index 6dc1316..6bcab3d 100644 --- a/poc-memory/src/agents/defs.rs +++ b/poc-memory/src/agents/defs.rs @@ -226,11 +226,14 @@ fn resolve( "conversations" => { let fragments = super::knowledge::select_conversation_fragments(count); + let fragment_ids: Vec<String> = fragments.iter() + .map(|(id, _)| id.clone()) + .collect(); let text = fragments.iter() .map(|(id, text)| format!("### Session {}\n\n{}", id, text)) .collect::<Vec<_>>() .join("\n\n---\n\n"); - Some(Resolved { text, keys: vec![] }) + Some(Resolved { text, keys: fragment_ids }) } "siblings" | "neighborhood" => { diff --git a/poc-memory/src/agents/knowledge.rs b/poc-memory/src/agents/knowledge.rs index baeb8b4..3ac389d 100644 --- a/poc-memory/src/agents/knowledge.rs +++ b/poc-memory/src/agents/knowledge.rs @@ -634,6 +634,11 @@ pub fn run_and_apply_with_log( log(&format!("skipped: {}", desc)); } } + // Mark conversation segments as mined after successful processing + if agent_name == "observation" { + mark_observation_done(&result.node_keys); + } + Ok((actions.len(), applied)) } @@ 
-699,16 +704,17 @@ pub fn run_one_agent( // Conversation fragment selection // --------------------------------------------------------------------------- -/// Extract human-readable dialogue from a conversation JSONL -const OBSERVED_PREFIX: &str = "_observed-transcripts"; - /// Select conversation fragments (per-segment) for the observation extractor. -/// Skips segments already processed, marks selected segments as observed. +/// Uses the transcript-progress.capnp log for dedup — no stub nodes. +/// Does NOT pre-mark segments; caller must call mark_observation_done() after success. pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { let projects = crate::config::get().projects_dir.clone(); if !projects.exists() { return Vec::new(); } - let observed = super::enrich::keys_with_prefix(&format!("{}-", OBSERVED_PREFIX)); + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return Vec::new(), + }; let mut jsonl_files: Vec<PathBuf> = Vec::new(); if let Ok(dirs) = fs::read_dir(&projects) { @@ -729,43 +735,50 @@ pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { } } - // Collect unmined segments across all transcripts, keeping the text - let mut candidates: Vec<(PathBuf, usize, String, String)> = Vec::new(); + // Collect unmined segments across all transcripts + let mut candidates: Vec<(String, String)> = Vec::new(); for path in &jsonl_files { - for (seg_idx, messages) in super::enrich::unmined_segments(path, OBSERVED_PREFIX, &observed) { - let text = format_segment(&messages, 8000); + let path_str = path.to_string_lossy(); + let messages = match super::enrich::extract_conversation(&path_str) { + Ok(m) => m, + Err(_) => continue, + }; + let session_id = path.file_stem() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| "unknown".into()); + + let segments = super::enrich::split_on_compaction(messages); + for (seg_idx, segment) in segments.into_iter().enumerate() { + if 
store.is_segment_mined(&session_id, seg_idx as u32, "observation") { + continue; + } + let text = format_segment(&segment, 8000); if text.len() > 500 { - let session_id = path.file_stem() - .map(|s| s.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".into()); let id = format!("{}.{}", session_id, seg_idx); - candidates.push((path.clone(), seg_idx, id, text)); + candidates.push((id, text)); + } + } + + if candidates.len() >= n { break; } + } + + candidates.truncate(n); + candidates +} + +/// Mark observation segments as successfully mined (call AFTER the agent succeeds). +pub fn mark_observation_done(fragment_ids: &[String]) { + let mut store = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + for id in fragment_ids { + if let Some((session_id, seg_str)) = id.rsplit_once('.') { + if let Ok(seg) = seg_str.parse::<u32>() { + let _ = store.mark_segment_mined(session_id, seg, "observation"); } } } - - // Take up to n, mark them, and return the text - let selected: Vec<_> = candidates.into_iter().take(n).collect(); - - if !selected.is_empty() { - if let Ok(mut store) = crate::store::Store::load() { - for (path, seg_idx, _, _) in &selected { - super::enrich::mark_segment( - &mut store, - &path.to_string_lossy(), - OBSERVED_PREFIX, - *seg_idx, - "agent:knowledge-observation", - "observed", - ); - } - let _ = store.save(); - } - } - - selected.into_iter() - .map(|(_, _, id, text)| (id, text)) - .collect() } /// Format a segment's messages into readable text for the observation agent.