observation extractor: per-segment dedup using shared transcript helpers

The observation agent was re-extracting the same conversations every
consolidation run because select_conversation_fragments had no tracking
of what had already been processed.

Extract shared helpers from the fact miner's dedup pattern:
  - transcript_key(prefix, path): namespaced key from prefix + filename
  - segment_key(base, idx): per-segment key
  - keys_with_prefix(prefix): bulk lookup from store
  - unmined_segments(path, prefix, known): find unprocessed segments
  - mark_segment(...): mark a segment as processed

Rewrite select_conversation_fragments to use these with
_observed-transcripts prefix. Each compaction segment within a
transcript is now tracked independently — new segments from ongoing
sessions get picked up, already-processed segments are skipped.
This commit is contained in:
ProofOfConcept 2026-03-12 18:03:26 -04:00 committed by Kent Overstreet
parent 9d1d690f17
commit 10499a98ea
2 changed files with 121 additions and 63 deletions

View file

@ -40,25 +40,78 @@ pub fn is_transcript_mined(store: &impl StoreView, path: &str) -> bool {
/// Dedup key for a transcript based on its filename (UUID). /// Dedup key for a transcript based on its filename (UUID).
/// Used by the daemon reconcile loop — no file reads needed. /// Used by the daemon reconcile loop — no file reads needed.
pub fn transcript_filename_key(path: &str) -> String { pub fn transcript_filename_key(path: &str) -> String {
transcript_key("_mined-transcripts", path)
}
/// Build a namespaced transcript key from a prefix and path.
///
/// The key is `{prefix}#f-{file_stem}`; when the path has no file stem
/// (e.g. an empty string), the raw path is used in its place.
pub fn transcript_key(prefix: &str, path: &str) -> String {
    let stem = std::path::Path::new(path).file_stem();
    let filename = match stem {
        Some(s) => s.to_string_lossy().into_owned(),
        None => path.to_string(),
    };
    format!("{}#f-{}", prefix, filename)
}
/// Per-segment key: `{base_key}.{segment_index}`
pub fn segment_key(base: &str, segment: usize) -> String {
    let mut key = String::with_capacity(base.len() + 4);
    key.push_str(base);
    key.push('.');
    key.push_str(&segment.to_string());
    key
}
/// Collect every node key in the store that starts with `prefix`.
/// Returns an empty set when the store view cannot be loaded (best-effort).
pub fn keys_with_prefix(prefix: &str) -> HashSet<String> {
    use crate::store::AnyView;
    let mut found = HashSet::new();
    if let Ok(view) = AnyView::load() {
        view.for_each_node(|key, _, _| {
            if key.starts_with(prefix) {
                found.insert(key.to_owned());
            }
        });
    }
    found
}
/// Find unmined segments for a transcript file against a set of known keys.
///
/// Splits the conversation on compaction boundaries and returns each
/// `(segment_index, messages)` pair whose per-segment key is absent from
/// `known`. A transcript that fails to parse yields no segments.
pub fn unmined_segments(
    path: &std::path::Path,
    prefix: &str,
    known: &HashSet<String>,
) -> Vec<(usize, Vec<(usize, String, String, String)>)> {
    let path_str = path.to_string_lossy();
    let base = transcript_key(prefix, &path_str);
    let Ok(messages) = extract_conversation(&path_str) else {
        return Vec::new();
    };
    let mut fresh = Vec::new();
    for (idx, segment) in split_on_compaction(messages).into_iter().enumerate() {
        if !known.contains(&segment_key(&base, idx)) {
            fresh.push((idx, segment));
        }
    }
    fresh
}
/// Mark a segment as processed in the store.
pub fn mark_segment(
store: &mut Store,
path: &str,
prefix: &str,
segment: usize,
provenance: &str,
content: &str,
) {
let base = transcript_key(prefix, path);
let key = segment_key(&base, segment);
let mut node = new_node(&key, content);
node.provenance = provenance.to_string();
let _ = store.upsert_node(node);
} }
/// Get the set of all mined transcript keys (both content-hash and filename) /// Get the set of all mined transcript keys (both content-hash and filename)
/// from the store. Load once per daemon tick, check many. /// from the store. Load once per daemon tick, check many.
pub fn mined_transcript_keys() -> HashSet<String> { pub fn mined_transcript_keys() -> HashSet<String> {
use crate::store::AnyView; keys_with_prefix("_mined-transcripts#")
let Ok(view) = AnyView::load() else { return HashSet::new() };
let mut keys = HashSet::new();
view.for_each_node(|key, _, _| {
if key.starts_with("_mined-transcripts#") {
keys.insert(key.to_string());
}
});
keys
} }

View file

@ -611,48 +611,16 @@ pub fn run_one_agent(
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Store-key prefix under which the observation agent records which
/// transcript segments it has already processed.
const OBSERVED_PREFIX: &str = "_observed-transcripts";
/// Select conversation fragments for the observation extractor
pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
let projects = crate::config::get().projects_dir.clone(); let projects = crate::config::get().projects_dir.clone();
if !projects.exists() { return Vec::new(); } if !projects.exists() { return Vec::new(); }
let observed = super::enrich::keys_with_prefix(&format!("{}#", OBSERVED_PREFIX));
let mut jsonl_files: Vec<PathBuf> = Vec::new(); let mut jsonl_files: Vec<PathBuf> = Vec::new();
if let Ok(dirs) = fs::read_dir(&projects) { if let Ok(dirs) = fs::read_dir(&projects) {
for dir in dirs.filter_map(|e| e.ok()) { for dir in dirs.filter_map(|e| e.ok()) {
@ -672,24 +640,61 @@ pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
} }
} }
let mut scored: Vec<(usize, PathBuf)> = jsonl_files.into_iter() // Collect unmined segments across all transcripts, keeping the text
.map(|f| (count_dialogue_turns(&f), f)) let mut candidates: Vec<(PathBuf, usize, String, String)> = Vec::new();
.filter(|(turns, _)| *turns >= 10) for path in &jsonl_files {
.collect(); for (seg_idx, messages) in super::enrich::unmined_segments(path, OBSERVED_PREFIX, &observed) {
scored.sort_by(|a, b| b.0.cmp(&a.0)); let text = format_segment(&messages, 8000);
if text.len() > 500 {
let mut fragments = Vec::new(); let session_id = path.file_stem()
for (_, f) in scored.iter().take(n * 2) { .map(|s| s.to_string_lossy().to_string())
let session_id = f.file_stem() .unwrap_or_else(|| "unknown".into());
.map(|s| s.to_string_lossy().to_string()) let id = format!("{}.{}", session_id, seg_idx);
.unwrap_or_else(|| "unknown".into()); candidates.push((path.clone(), seg_idx, id, text));
let text = extract_conversation_text(f, 8000); }
if text.len() > 500 {
fragments.push((session_id, text));
} }
if fragments.len() >= n { break; }
} }
fragments
// Take up to n, mark them, and return the text
let selected: Vec<_> = candidates.into_iter().take(n).collect();
if !selected.is_empty() {
if let Ok(mut store) = crate::store::Store::load() {
for (path, seg_idx, _, _) in &selected {
super::enrich::mark_segment(
&mut store,
&path.to_string_lossy(),
OBSERVED_PREFIX,
*seg_idx,
"agent:knowledge-observation",
"observed",
);
}
let _ = store.save();
}
}
selected.into_iter()
.map(|(_, _, id, text)| (id, text))
.collect()
}
/// Format a segment's messages into readable text for the observation agent.
fn format_segment(messages: &[(usize, String, String, String)], max_chars: usize) -> String {
let cfg = crate::config::get();
let mut fragments = Vec::new();
let mut total = 0;
for (_, role, text, _) in messages {
let min_len = if role == "user" { 5 } else { 10 };
if text.len() <= min_len { continue; }
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
fragments.push(format!("**{}:** {}", name, text));
total += text.len();
if total > max_chars { break; }
}
fragments.join("\n\n")
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------