fact-mine: progress callbacks, size-sorted queue, fix empty re-queue
Add an optional progress callback to mine_transcript/mine_and_store so the daemon can display per-chunk status. Sort the fact-mine queue by file size so small transcripts drain first. Write an empty marker for transcripts with no facts to avoid re-queuing them. Also harden the extraction prompt suffix, and disable the daily decay job (it caused version spam and premature demotion).
This commit is contained in:
parent
63910e987c
commit
2aabad4eda
2 changed files with 46 additions and 23 deletions
|
|
@ -114,7 +114,8 @@ fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> {
|
||||||
run_job(ctx, &format!("fact-mine {}", path), || {
|
run_job(ctx, &format!("fact-mine {}", path), || {
|
||||||
ctx.log_line("mining facts");
|
ctx.log_line("mining facts");
|
||||||
let p = std::path::Path::new(&path);
|
let p = std::path::Path::new(&path);
|
||||||
let count = crate::fact_mine::mine_and_store(p)?;
|
let progress = |msg: &str| { ctx.set_progress(msg); };
|
||||||
|
let count = crate::fact_mine::mine_and_store(p, Some(&progress))?;
|
||||||
ctx.log_line(&format!("{} facts stored", count));
|
ctx.log_line(&format!("{} facts stored", count));
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
|
@ -465,6 +466,10 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only queue fact-mine when experience backlog is clear
|
// Only queue fact-mine when experience backlog is clear
|
||||||
|
// Sort by file size so small transcripts drain first
|
||||||
|
needs_fact.sort_by_key(|(_, path_str)| {
|
||||||
|
fs::metadata(path_str).map(|m| m.len()).unwrap_or(u64::MAX)
|
||||||
|
});
|
||||||
let mut fact_queued = 0;
|
let mut fact_queued = 0;
|
||||||
if needs_extract.len() == extract_queued {
|
if needs_extract.len() == extract_queued {
|
||||||
let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued);
|
let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued);
|
||||||
|
|
@ -551,10 +556,10 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
if last.is_none_or(|d| d < today) {
|
if last.is_none_or(|d| d < today) {
|
||||||
log_event("scheduler", "daily-trigger", &today.to_string());
|
log_event("scheduler", "daily-trigger", &today.to_string());
|
||||||
|
|
||||||
// Decay (no API calls, fast)
|
// Decay disabled — version spam and premature demotion
|
||||||
choir_sched.spawn(format!("decay:{}", today)).init(|ctx| {
|
// choir_sched.spawn(format!("decay:{}", today)).init(|ctx| {
|
||||||
job_decay(ctx)
|
// job_decay(ctx)
|
||||||
});
|
// });
|
||||||
|
|
||||||
// Consolidation pipeline: consolidate → knowledge-loop → digest
|
// Consolidation pipeline: consolidate → knowledge-loop → digest
|
||||||
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
|
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
|
||||||
|
|
|
||||||
|
|
@ -214,22 +214,32 @@ fn parse_facts(response: &str) -> Vec<Fact> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mine a single transcript for atomic facts.
|
/// Mine a single transcript for atomic facts.
|
||||||
pub fn mine_transcript(path: &Path, dry_run: bool) -> Result<Vec<Fact>, String> {
|
/// The optional `progress` callback receives status strings (e.g. "chunk 3/47").
|
||||||
|
pub fn mine_transcript(
|
||||||
|
path: &Path,
|
||||||
|
dry_run: bool,
|
||||||
|
progress: Option<&dyn Fn(&str)>,
|
||||||
|
) -> Result<Vec<Fact>, String> {
|
||||||
let filename = path.file_name()
|
let filename = path.file_name()
|
||||||
.map(|n| n.to_string_lossy().to_string())
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
.unwrap_or_else(|| "unknown".into());
|
.unwrap_or_else(|| "unknown".into());
|
||||||
eprintln!("Mining: {}", filename);
|
let log = |msg: &str| {
|
||||||
|
eprintln!("{}", msg);
|
||||||
|
if let Some(cb) = progress { cb(msg); }
|
||||||
|
};
|
||||||
|
|
||||||
|
log(&format!("Mining: {}", filename));
|
||||||
|
|
||||||
let messages = extract_conversation(path);
|
let messages = extract_conversation(path);
|
||||||
if messages.is_empty() {
|
if messages.is_empty() {
|
||||||
eprintln!(" No messages found");
|
log("No messages found");
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
eprintln!(" {} messages extracted", messages.len());
|
log(&format!("{} messages extracted", messages.len()));
|
||||||
|
|
||||||
let text = format_for_extraction(&messages);
|
let text = format_for_extraction(&messages);
|
||||||
let chunks = chunk_text(&text);
|
let chunks = chunk_text(&text);
|
||||||
eprintln!(" {} chunks ({} chars)", chunks.len(), text.len());
|
log(&format!("{} chunks ({} chars)", chunks.len(), text.len()));
|
||||||
|
|
||||||
if dry_run {
|
if dry_run {
|
||||||
for (i, (offset, chunk)) in chunks.iter().enumerate() {
|
for (i, (offset, chunk)) in chunks.iter().enumerate() {
|
||||||
|
|
@ -246,9 +256,11 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result<Vec<Fact>, String>
|
||||||
let prompt_prefix = extraction_prompt();
|
let prompt_prefix = extraction_prompt();
|
||||||
let mut all_facts = Vec::new();
|
let mut all_facts = Vec::new();
|
||||||
for (i, (_offset, chunk)) in chunks.iter().enumerate() {
|
for (i, (_offset, chunk)) in chunks.iter().enumerate() {
|
||||||
eprint!(" Chunk {}/{} ({} chars)...", i + 1, chunks.len(), chunk.len());
|
let status = format!("chunk {}/{} ({} chars)", i + 1, chunks.len(), chunk.len());
|
||||||
|
eprint!(" {}...", status);
|
||||||
|
if let Some(cb) = progress { cb(&status); }
|
||||||
|
|
||||||
let prompt = format!("{}{}", prompt_prefix, chunk);
|
let prompt = format!("{}{}\n\n--- END OF EXCERPT ---\n\nReturn ONLY a JSON array of factual claims, or [] if none.", prompt_prefix, chunk);
|
||||||
let response = match llm::call_haiku("fact-mine", &prompt) {
|
let response = match llm::call_haiku("fact-mine", &prompt) {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -274,29 +286,35 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result<Vec<Fact>, String>
|
||||||
all_facts.retain(|f| seen.insert(f.claim.to_lowercase()));
|
all_facts.retain(|f| seen.insert(f.claim.to_lowercase()));
|
||||||
let dupes = before - all_facts.len();
|
let dupes = before - all_facts.len();
|
||||||
if dupes > 0 {
|
if dupes > 0 {
|
||||||
eprintln!(" {} duplicates removed", dupes);
|
log(&format!("{} duplicates removed", dupes));
|
||||||
}
|
}
|
||||||
|
|
||||||
eprintln!(" Total: {} unique facts", all_facts.len());
|
log(&format!("Total: {} unique facts", all_facts.len()));
|
||||||
Ok(all_facts)
|
Ok(all_facts)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mine a transcript and store facts in the capnp store.
|
/// Mine a transcript and store facts in the capnp store.
|
||||||
/// Returns the number of facts stored.
|
/// Returns the number of facts stored.
|
||||||
pub fn mine_and_store(path: &Path) -> Result<usize, String> {
|
/// The optional `progress` callback receives status strings for daemon display.
|
||||||
let facts = mine_transcript(path, false)?;
|
pub fn mine_and_store(
|
||||||
if facts.is_empty() {
|
path: &Path,
|
||||||
return Ok(0);
|
progress: Option<&dyn Fn(&str)>,
|
||||||
}
|
) -> Result<usize, String> {
|
||||||
|
let facts = mine_transcript(path, false, progress)?;
|
||||||
|
|
||||||
let filename = path.file_name()
|
let filename = path.file_name()
|
||||||
.map(|n| n.to_string_lossy().to_string())
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
.unwrap_or_else(|| "unknown".into());
|
.unwrap_or_else(|| "unknown".into());
|
||||||
|
|
||||||
// Store as a single node keyed by transcript filename
|
|
||||||
let key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
let key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
||||||
let json = serde_json::to_string_pretty(&facts)
|
|
||||||
.map_err(|e| format!("serialize facts: {}", e))?;
|
// Always write a marker so we don't re-queue empty transcripts
|
||||||
|
let json = if facts.is_empty() {
|
||||||
|
"[]".to_string()
|
||||||
|
} else {
|
||||||
|
serde_json::to_string_pretty(&facts)
|
||||||
|
.map_err(|e| format!("serialize facts: {}", e))?
|
||||||
|
};
|
||||||
|
|
||||||
let mut store = store::Store::load()?;
|
let mut store = store::Store::load()?;
|
||||||
store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?;
|
store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?;
|
||||||
|
|
@ -319,7 +337,7 @@ pub fn mine_batch(paths: &[&Path], min_messages: usize, dry_run: bool) -> Result
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let facts = mine_transcript(path, dry_run)?;
|
let facts = mine_transcript(path, dry_run, None)?;
|
||||||
all_facts.extend(facts);
|
all_facts.extend(facts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue