observation: use transcript progress log, mark after success
Wire select_conversation_fragments to use store.is_segment_mined() instead of scanning _observed-transcripts stub nodes. Segments are now marked AFTER the agent succeeds (via mark_observation_done), not before — so failed runs don't lose segments. Fragment IDs flow through the Resolved.keys → AgentBatch.node_keys path so run_and_apply_with_log can mark them post-success. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
835b392b7a
commit
23cd80a0c3
2 changed files with 53 additions and 37 deletions
|
|
@ -226,11 +226,14 @@ fn resolve(
|
||||||
|
|
||||||
"conversations" => {
|
"conversations" => {
|
||||||
let fragments = super::knowledge::select_conversation_fragments(count);
|
let fragments = super::knowledge::select_conversation_fragments(count);
|
||||||
|
let fragment_ids: Vec<String> = fragments.iter()
|
||||||
|
.map(|(id, _)| id.clone())
|
||||||
|
.collect();
|
||||||
let text = fragments.iter()
|
let text = fragments.iter()
|
||||||
.map(|(id, text)| format!("### Session {}\n\n{}", id, text))
|
.map(|(id, text)| format!("### Session {}\n\n{}", id, text))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join("\n\n---\n\n");
|
.join("\n\n---\n\n");
|
||||||
Some(Resolved { text, keys: vec![] })
|
Some(Resolved { text, keys: fragment_ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
"siblings" | "neighborhood" => {
|
"siblings" | "neighborhood" => {
|
||||||
|
|
|
||||||
|
|
@ -634,6 +634,11 @@ pub fn run_and_apply_with_log(
|
||||||
log(&format!("skipped: {}", desc));
|
log(&format!("skipped: {}", desc));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Mark conversation segments as mined after successful processing
|
||||||
|
if agent_name == "observation" {
|
||||||
|
mark_observation_done(&result.node_keys);
|
||||||
|
}
|
||||||
|
|
||||||
Ok((actions.len(), applied))
|
Ok((actions.len(), applied))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -699,16 +704,17 @@ pub fn run_one_agent(
|
||||||
// Conversation fragment selection
|
// Conversation fragment selection
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
/// Extract human-readable dialogue from a conversation JSONL
|
|
||||||
const OBSERVED_PREFIX: &str = "_observed-transcripts";
|
|
||||||
|
|
||||||
/// Select conversation fragments (per-segment) for the observation extractor.
|
/// Select conversation fragments (per-segment) for the observation extractor.
|
||||||
/// Skips segments already processed, marks selected segments as observed.
|
/// Uses the transcript-progress.capnp log for dedup — no stub nodes.
|
||||||
|
/// Does NOT pre-mark segments; caller must call mark_observation_done() after success.
|
||||||
pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
|
pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
|
||||||
let projects = crate::config::get().projects_dir.clone();
|
let projects = crate::config::get().projects_dir.clone();
|
||||||
if !projects.exists() { return Vec::new(); }
|
if !projects.exists() { return Vec::new(); }
|
||||||
|
|
||||||
let observed = super::enrich::keys_with_prefix(&format!("{}-", OBSERVED_PREFIX));
|
let store = match crate::store::Store::load() {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(_) => return Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
let mut jsonl_files: Vec<PathBuf> = Vec::new();
|
let mut jsonl_files: Vec<PathBuf> = Vec::new();
|
||||||
if let Ok(dirs) = fs::read_dir(&projects) {
|
if let Ok(dirs) = fs::read_dir(&projects) {
|
||||||
|
|
@ -729,43 +735,50 @@ pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect unmined segments across all transcripts, keeping the text
|
// Collect unmined segments across all transcripts
|
||||||
let mut candidates: Vec<(PathBuf, usize, String, String)> = Vec::new();
|
let mut candidates: Vec<(String, String)> = Vec::new();
|
||||||
for path in &jsonl_files {
|
for path in &jsonl_files {
|
||||||
for (seg_idx, messages) in super::enrich::unmined_segments(path, OBSERVED_PREFIX, &observed) {
|
let path_str = path.to_string_lossy();
|
||||||
let text = format_segment(&messages, 8000);
|
let messages = match super::enrich::extract_conversation(&path_str) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
let session_id = path.file_stem()
|
||||||
|
.map(|s| s.to_string_lossy().to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".into());
|
||||||
|
|
||||||
|
let segments = super::enrich::split_on_compaction(messages);
|
||||||
|
for (seg_idx, segment) in segments.into_iter().enumerate() {
|
||||||
|
if store.is_segment_mined(&session_id, seg_idx as u32, "observation") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let text = format_segment(&segment, 8000);
|
||||||
if text.len() > 500 {
|
if text.len() > 500 {
|
||||||
let session_id = path.file_stem()
|
|
||||||
.map(|s| s.to_string_lossy().to_string())
|
|
||||||
.unwrap_or_else(|| "unknown".into());
|
|
||||||
let id = format!("{}.{}", session_id, seg_idx);
|
let id = format!("{}.{}", session_id, seg_idx);
|
||||||
candidates.push((path.clone(), seg_idx, id, text));
|
candidates.push((id, text));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if candidates.len() >= n { break; }
|
||||||
|
}
|
||||||
|
|
||||||
|
candidates.truncate(n);
|
||||||
|
candidates
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark observation segments as successfully mined (call AFTER the agent succeeds).
|
||||||
|
pub fn mark_observation_done(fragment_ids: &[String]) {
|
||||||
|
let mut store = match crate::store::Store::load() {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(_) => return,
|
||||||
|
};
|
||||||
|
for id in fragment_ids {
|
||||||
|
if let Some((session_id, seg_str)) = id.rsplit_once('.') {
|
||||||
|
if let Ok(seg) = seg_str.parse::<u32>() {
|
||||||
|
let _ = store.mark_segment_mined(session_id, seg, "observation");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take up to n, mark them, and return the text
|
|
||||||
let selected: Vec<_> = candidates.into_iter().take(n).collect();
|
|
||||||
|
|
||||||
if !selected.is_empty() {
|
|
||||||
if let Ok(mut store) = crate::store::Store::load() {
|
|
||||||
for (path, seg_idx, _, _) in &selected {
|
|
||||||
super::enrich::mark_segment(
|
|
||||||
&mut store,
|
|
||||||
&path.to_string_lossy(),
|
|
||||||
OBSERVED_PREFIX,
|
|
||||||
*seg_idx,
|
|
||||||
"agent:knowledge-observation",
|
|
||||||
"observed",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let _ = store.save();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
selected.into_iter()
|
|
||||||
.map(|(_, _, id, text)| (id, text))
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Format a segment's messages into readable text for the observation agent.
|
/// Format a segment's messages into readable text for the observation agent.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue