From 2aabad4eda37011645da7a928c4c00d551ffd1f1 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sun, 8 Mar 2026 18:31:31 -0400 Subject: [PATCH] fact-mine: progress callbacks, size-sorted queue, fix empty re-queue Add optional progress callback to mine_transcript/mine_and_store so the daemon can display per-chunk status. Sort fact-mine queue by file size so small transcripts drain first. Write empty marker for transcripts with no facts to avoid re-queuing them. Also hardens the extraction prompt suffix. --- src/daemon.rs | 15 +++++++++----- src/fact_mine.rs | 54 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 7fcc666..2374baa 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -114,7 +114,8 @@ fn job_fact_mine(ctx: &ExecutionContext, path: &str) -> Result<(), TaskError> { run_job(ctx, &format!("fact-mine {}", path), || { ctx.log_line("mining facts"); let p = std::path::Path::new(&path); - let count = crate::fact_mine::mine_and_store(p)?; + let progress = |msg: &str| { ctx.set_progress(msg); }; + let count = crate::fact_mine::mine_and_store(p, Some(&progress))?; ctx.log_line(&format!("{} facts stored", count)); Ok(()) }) @@ -465,6 +466,10 @@ pub fn run_daemon() -> Result<(), String> { } // Only queue fact-mine when experience backlog is clear + // Sort by file size so small transcripts drain first + needs_fact.sort_by_key(|(_, path_str)| { + fs::metadata(path_str).map(|m| m.len()).unwrap_or(u64::MAX) + }); let mut fact_queued = 0; if needs_extract.len() == extract_queued { let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued); @@ -551,10 +556,10 @@ pub fn run_daemon() -> Result<(), String> { if last.is_none_or(|d| d < today) { log_event("scheduler", "daily-trigger", &today.to_string()); - // Decay (no API calls, fast) - choir_sched.spawn(format!("decay:{}", today)).init(|ctx| { - job_decay(ctx) - }); + // Decay disabled — version spam and premature demotion + // choir_sched.spawn(format!("decay:{}", today)).init(|ctx| { + // job_decay(ctx) + // }); // Consolidation pipeline: consolidate → knowledge-loop → digest let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) diff --git a/src/fact_mine.rs b/src/fact_mine.rs index 28118c4..6cad86f 100644 --- a/src/fact_mine.rs +++ b/src/fact_mine.rs @@ -214,22 +214,32 @@ fn parse_facts(response: &str) -> Vec { } /// Mine a single transcript for atomic facts. -pub fn mine_transcript(path: &Path, dry_run: bool) -> Result, String> { +/// The optional `progress` callback receives status strings (e.g. "chunk 3/47"). +pub fn mine_transcript( + path: &Path, + dry_run: bool, + progress: Option<&dyn Fn(&str)>, +) -> Result, String> { let filename = path.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); - eprintln!("Mining: {}", filename); + let log = |msg: &str| { + eprintln!("{}", msg); + if let Some(cb) = progress { cb(msg); } + }; + + log(&format!("Mining: {}", filename)); let messages = extract_conversation(path); if messages.is_empty() { - eprintln!(" No messages found"); + log("No messages found"); return Ok(Vec::new()); } - eprintln!(" {} messages extracted", messages.len()); + log(&format!("{} messages extracted", messages.len())); let text = format_for_extraction(&messages); let chunks = chunk_text(&text); - eprintln!(" {} chunks ({} chars)", chunks.len(), text.len()); + log(&format!("{} chunks ({} chars)", chunks.len(), text.len())); if dry_run { for (i, (offset, chunk)) in chunks.iter().enumerate() { @@ -246,9 +256,11 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result, String> let prompt_prefix = extraction_prompt(); let mut all_facts = Vec::new(); for (i, (_offset, chunk)) in chunks.iter().enumerate() { - eprint!(" Chunk {}/{} ({} chars)...", i + 1, chunks.len(), chunk.len()); + let status = format!("chunk {}/{} ({} chars)", i + 1, chunks.len(), chunk.len()); + eprint!(" {}...", status); + if let Some(cb) = progress { cb(&status); } - let prompt = format!("{}{}", prompt_prefix, chunk); + let prompt = format!("{}{}\n\n--- END OF EXCERPT ---\n\nReturn ONLY a JSON array of factual claims, or [] if none.", prompt_prefix, chunk); let response = match llm::call_haiku("fact-mine", &prompt) { Ok(r) => r, Err(e) => { @@ -274,29 +286,35 @@ pub fn mine_transcript(path: &Path, dry_run: bool) -> Result, String> all_facts.retain(|f| seen.insert(f.claim.to_lowercase())); let dupes = before - all_facts.len(); if dupes > 0 { - eprintln!(" {} duplicates removed", dupes); + log(&format!("{} duplicates removed", dupes)); } - eprintln!(" Total: {} unique facts", all_facts.len()); + log(&format!("Total: {} unique facts", all_facts.len())); Ok(all_facts) } /// Mine a transcript and store facts in the capnp store. /// Returns the number of facts stored. -pub fn mine_and_store(path: &Path) -> Result { - let facts = mine_transcript(path, false)?; - if facts.is_empty() { - return Ok(0); - } +/// The optional `progress` callback receives status strings for daemon display. +pub fn mine_and_store( + path: &Path, + progress: Option<&dyn Fn(&str)>, +) -> Result { + let facts = mine_transcript(path, false, progress)?; let filename = path.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); - // Store as a single node keyed by transcript filename let key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); - let json = serde_json::to_string_pretty(&facts) - .map_err(|e| format!("serialize facts: {}", e))?; + + // Always write a marker so we don't re-queue empty transcripts + let json = if facts.is_empty() { + "[]".to_string() + } else { + serde_json::to_string_pretty(&facts) + .map_err(|e| format!("serialize facts: {}", e))? + }; let mut store = store::Store::load()?; store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?; @@ -319,7 +337,7 @@ pub fn mine_batch(paths: &[&Path], min_messages: usize, dry_run: bool) -> Result continue; } - let facts = mine_transcript(path, dry_run)?; + let facts = mine_transcript(path, dry_run, None)?; all_facts.extend(facts); }