diff --git a/poc-memory/src/agents/enrich.rs b/poc-memory/src/agents/enrich.rs index b8f39ca..9aa3d1a 100644 --- a/poc-memory/src/agents/enrich.rs +++ b/poc-memory/src/agents/enrich.rs @@ -40,25 +40,78 @@ pub fn is_transcript_mined(store: &impl StoreView, path: &str) -> bool { /// Dedup key for a transcript based on its filename (UUID). /// Used by the daemon reconcile loop — no file reads needed. pub fn transcript_filename_key(path: &str) -> String { + transcript_key("_mined-transcripts", path) +} + +/// Build a namespaced transcript key from a prefix and path. +pub fn transcript_key(prefix: &str, path: &str) -> String { let filename = std::path::Path::new(path) .file_stem() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|| path.to_string()); - format!("_mined-transcripts#f-{}", filename) + format!("{}#f-{}", prefix, filename) +} + +/// Per-segment key: `{base_key}.{segment_index}` +pub fn segment_key(base: &str, segment: usize) -> String { + format!("{}.{}", base, segment) +} + +/// Load all keys with a given prefix from the store. +pub fn keys_with_prefix(prefix: &str) -> HashSet<String> { + use crate::store::AnyView; + let Ok(view) = AnyView::load() else { return HashSet::new() }; + let mut keys = HashSet::new(); + view.for_each_node(|key, _, _| { + if key.starts_with(prefix) { + keys.insert(key.to_string()); + } + }); + keys +} + +/// Find unmined segments for a transcript file against a set of known keys. +/// Returns segment indices that haven't been processed yet. 
+pub fn unmined_segments( + path: &std::path::Path, + prefix: &str, + known: &HashSet<String>, +) -> Vec<(usize, Vec<(usize, String, String, String)>)> { + let path_str = path.to_string_lossy(); + let base = transcript_key(prefix, &path_str); + + let messages = match extract_conversation(&path_str) { + Ok(m) => m, + Err(_) => return Vec::new(), + }; + let segments = split_on_compaction(messages); + + segments.into_iter() + .enumerate() + .filter(|(i, _)| !known.contains(&segment_key(&base, *i))) + .collect() +} + +/// Mark a segment as processed in the store. +pub fn mark_segment( + store: &mut Store, + path: &str, + prefix: &str, + segment: usize, + provenance: &str, + content: &str, +) { + let base = transcript_key(prefix, path); + let key = segment_key(&base, segment); + let mut node = new_node(&key, content); + node.provenance = provenance.to_string(); + let _ = store.upsert_node(node); } /// Get the set of all mined transcript keys (both content-hash and filename) /// from the store. Load once per daemon tick, check many. 
pub fn mined_transcript_keys() -> HashSet<String> { - use crate::store::AnyView; - let Ok(view) = AnyView::load() else { return HashSet::new() }; - let mut keys = HashSet::new(); - view.for_each_node(|key, _, _| { - if key.starts_with("_mined-transcripts#") { - keys.insert(key.to_string()); - } - }); - keys + keys_with_prefix("_mined-transcripts#") } diff --git a/poc-memory/src/agents/knowledge.rs b/poc-memory/src/agents/knowledge.rs index c13d081..f5f5012 100644 --- a/poc-memory/src/agents/knowledge.rs +++ b/poc-memory/src/agents/knowledge.rs @@ -611,48 +611,16 @@ pub fn run_one_agent( // --------------------------------------------------------------------------- /// Extract human-readable dialogue from a conversation JSONL -fn extract_conversation_text(path: &Path, max_chars: usize) -> String { - let cfg = crate::config::get(); - let messages = super::transcript::parse_transcript(path).unwrap_or_default(); - let mut fragments = Vec::new(); - let mut total = 0; +const OBSERVED_PREFIX: &str = "_observed-transcripts"; - for msg in &messages { - let min_len = if msg.role == "user" { 5 } else { 10 }; - if msg.text.len() <= min_len { continue; } - - // Only include external user messages - if msg.role == "user" { - if msg.user_type.as_deref() != Some("external") { continue; } - if msg.text.starts_with("[Request interrupted") { continue; } - } - - let role = if msg.role == "user" { &cfg.user_name } else { &cfg.assistant_name }; - fragments.push(format!("**{}:** {}", role, msg.text)); - total += msg.text.len(); - if total > max_chars { break; } - } - fragments.join("\n\n") -} - -/// Count short user messages (dialogue turns) in a JSONL -fn count_dialogue_turns(path: &Path) -> usize { - let messages = super::transcript::parse_transcript(path).unwrap_or_default(); - messages.iter() - .filter(|m| m.role == "user" - && m.user_type.as_deref() == Some("external") - && m.text.len() > 5 - && m.text.len() < 500 - && !m.text.starts_with("[Request interrupted") - && 
 !m.text.starts_with("Implement the following")) - .count() -} - -/// Select conversation fragments for the observation extractor +/// Select conversation fragments (per-segment) for the observation extractor. +/// Skips segments already processed, marks selected segments as observed. pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { let projects = crate::config::get().projects_dir.clone(); if !projects.exists() { return Vec::new(); } + let observed = super::enrich::keys_with_prefix(&format!("{}#", OBSERVED_PREFIX)); + let mut jsonl_files: Vec<PathBuf> = Vec::new(); if let Ok(dirs) = fs::read_dir(&projects) { for dir in dirs.filter_map(|e| e.ok()) { @@ -672,24 +640,61 @@ pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { } } - let mut scored: Vec<(usize, PathBuf)> = jsonl_files.into_iter() - .map(|f| (count_dialogue_turns(&f), f)) - .filter(|(turns, _)| *turns >= 10) - .collect(); - scored.sort_by(|a, b| b.0.cmp(&a.0)); - - let mut fragments = Vec::new(); - for (_, f) in scored.iter().take(n * 2) { - let session_id = f.file_stem() - .map(|s| s.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".into()); - let text = extract_conversation_text(f, 8000); - if text.len() > 500 { - fragments.push((session_id, text)); + // Collect unmined segments across all transcripts, keeping the text + let mut candidates: Vec<(PathBuf, usize, String, String)> = Vec::new(); + for path in &jsonl_files { + for (seg_idx, messages) in super::enrich::unmined_segments(path, OBSERVED_PREFIX, &observed) { + let text = format_segment(&messages, 8000); + if text.len() > 500 { + let session_id = path.file_stem() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| "unknown".into()); + let id = format!("{}.{}", session_id, seg_idx); + candidates.push((path.clone(), seg_idx, id, text)); + } } - if fragments.len() >= n { break; } } - fragments + + // Take up to n, mark them, and return the text + let selected: Vec<_> = 
candidates.into_iter().take(n).collect(); + + if !selected.is_empty() { + if let Ok(mut store) = crate::store::Store::load() { + for (path, seg_idx, _, _) in &selected { + super::enrich::mark_segment( + &mut store, + &path.to_string_lossy(), + OBSERVED_PREFIX, + *seg_idx, + "agent:knowledge-observation", + "observed", + ); + } + let _ = store.save(); + } + } + + selected.into_iter() + .map(|(_, _, id, text)| (id, text)) + .collect() +} + +/// Format a segment's messages into readable text for the observation agent. +fn format_segment(messages: &[(usize, String, String, String)], max_chars: usize) -> String { + let cfg = crate::config::get(); + let mut fragments = Vec::new(); + let mut total = 0; + + for (_, role, text, _) in messages { + let min_len = if role == "user" { 5 } else { 10 }; + if text.len() <= min_len { continue; } + + let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name }; + fragments.push(format!("**{}:** {}", name, text)); + total += text.len(); + if total > max_chars { break; } + } + fragments.join("\n\n") } // ---------------------------------------------------------------------------