learn: stream candidates to UI, update status during alternate gen

With the timestamp filter gone (previous commit), score_finetune_candidates
started returning the actual ~100+ candidates per scoring run. The
existing code generated alternates for all of them in a tight loop
before returning anything, leaving the status line stuck on
"finetune: scoring N responses..." for hundreds of seconds while the
B200 was pegged.

Two fixes:

1. score_finetune_candidates now takes an ActivityGuard and a callback.
   Candidates are emitted one-at-a-time as they complete (after their
   alternate if that's enabled, immediately otherwise). The activity
   status updates to "finetune: generating alternate N/M" during the
   alternate-gen phase so it's clear what's happening.

2. BgEvent::FinetuneCandidates(Vec<_>) → FinetuneCandidate(one). Each
   emitted candidate is pushed onto shared.finetune_candidates; the UI
   tick picks it up and renders it on the next frame. start_finetune_scoring
   clears the previous run's list at the top so each run is fresh.

Return type changes from (Vec, f64) → (usize, f64) — the count above
threshold is all the caller still needs since the candidates stream
through the callback.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-16 12:44:25 -04:00
parent d5a3398cc9
commit 343e43afab
2 changed files with 36 additions and 19 deletions

View file

@ -320,7 +320,7 @@ impl MindState {
/// Background task completion events. /// Background task completion events.
enum BgEvent { enum BgEvent {
ScoringDone, ScoringDone,
FinetuneCandidates(Vec<learn::FinetuneCandidate>), FinetuneCandidate(learn::FinetuneCandidate),
} }
// --- Mind: cognitive state machine --- // --- Mind: cognitive state machine ---
@ -656,7 +656,12 @@ impl Mind {
/// once this runs continuously, we'll just train whatever lands at full /// once this runs continuously, we'll just train whatever lands at full
/// context without filtering. /// context without filtering.
pub fn start_finetune_scoring(&self) { pub fn start_finetune_scoring(&self) {
let threshold = self.shared.lock().unwrap().learn_threshold; let threshold = {
let mut s = self.shared.lock().unwrap();
// Clear the previous run's candidates so this run starts fresh.
s.finetune_candidates.clear();
s.learn_threshold
};
let agent = self.agent.clone(); let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone(); let bg_tx = self.bg_tx.clone();
@ -678,12 +683,12 @@ impl Mind {
activity.update(format!("finetune: scoring {} responses...", responses_considered)).await; activity.update(format!("finetune: scoring {} responses...", responses_considered)).await;
let bg_tx_cb = bg_tx.clone();
let stats = match learn::score_finetune_candidates( let stats = match learn::score_finetune_candidates(
&context, score_count, &client, threshold, &context, score_count, &client, threshold, &activity,
|c| { let _ = bg_tx_cb.send(BgEvent::FinetuneCandidate(c)); },
).await { ).await {
Ok((candidates, max_div)) => { Ok((above_threshold, max_div)) => {
let above_threshold = candidates.len();
let _ = bg_tx.send(BgEvent::FinetuneCandidates(candidates));
FinetuneScoringStats { FinetuneScoringStats {
responses_considered, responses_considered,
above_threshold, above_threshold,
@ -801,8 +806,8 @@ impl Mind {
BgEvent::ScoringDone => { BgEvent::ScoringDone => {
self.shared.lock().unwrap().scoring_in_flight = false; self.shared.lock().unwrap().scoring_in_flight = false;
} }
BgEvent::FinetuneCandidates(candidates) => { BgEvent::FinetuneCandidate(c) => {
self.shared.lock().unwrap().finetune_candidates = candidates; self.shared.lock().unwrap().finetune_candidates.push(c);
} }
} }
} }

View file

@ -492,22 +492,28 @@ pub struct FinetuneCandidate {
/// Score and enrich finetune candidates with full context. /// Score and enrich finetune candidates with full context.
/// ///
/// Returns (candidates, max_divergence) - candidates ready for review with /// Candidates are delivered via `on_candidate` one-at-a-time as they become
/// context/continuation token IDs, and the highest divergence seen. /// ready: scoring happens once (one /score call), then for each candidate
/// that passes the threshold we optionally generate an alternate response
/// and then emit it. The activity status is updated during the alternate
/// phase so the UI doesn't look stuck.
///
/// Returns (count_above_threshold, max_divergence).
pub async fn score_finetune_candidates( pub async fn score_finetune_candidates(
context: &ContextState, context: &ContextState,
count: usize, count: usize,
client: &ApiClient, client: &ApiClient,
min_divergence: f64, min_divergence: f64,
) -> anyhow::Result<(Vec<FinetuneCandidate>, f64)> { activity: &crate::agent::ActivityGuard,
mut on_candidate: impl FnMut(FinetuneCandidate),
) -> anyhow::Result<(usize, f64)> {
let scores = score_finetune(context, count, client).await?; let scores = score_finetune(context, count, client).await?;
let max_divergence = scores.iter().map(|(_, d)| *d).fold(0.0f64, f64::max); let max_divergence = scores.iter().map(|(_, d)| *d).fold(0.0f64, f64::max);
let entries = context.conversation(); let entries = context.conversation();
let mut candidates = Vec::new();
let trained = load_trained(); let trained = load_trained();
let mut candidates: Vec<FinetuneCandidate> = Vec::new();
for (entry_idx, divergence) in scores { for (entry_idx, divergence) in scores {
if divergence < min_divergence { if divergence < min_divergence {
@ -522,7 +528,7 @@ pub async fn score_finetune_candidates(
continue; continue;
} }
// Extract response text // Extract response text.
let response_text = match node { let response_text = match node {
AstNode::Branch { children, .. } => { AstNode::Branch { children, .. } => {
children.iter() children.iter()
@ -536,7 +542,7 @@ pub async fn score_finetune_candidates(
_ => continue, _ => continue,
}; };
// Build token IDs: context = everything before response, continuation = response // Build token IDs: context = everything before response, continuation = response.
let (context_ids, _) = build_token_ids(context, 0..entry_idx, Filter::None); let (context_ids, _) = build_token_ids(context, 0..entry_idx, Filter::None);
let continuation_ids: Vec<u32> = node.token_ids().into_iter().collect(); let continuation_ids: Vec<u32> = node.token_ids().into_iter().collect();
@ -551,17 +557,23 @@ pub async fn score_finetune_candidates(
}); });
} }
// Generate alternates if enabled let total = candidates.len();
if alternates_enabled() && !candidates.is_empty() { let gen_alternates = alternates_enabled() && total > 0;
for candidate in &mut candidates {
for (i, mut candidate) in candidates.into_iter().enumerate() {
if gen_alternates {
activity.update(
format!("finetune: generating alternate {}/{}", i + 1, total)
).await;
match generate_alternate(context, candidate.entry_idx, client).await { match generate_alternate(context, candidate.entry_idx, client).await {
Ok(text) => candidate.alternate_text = Some(text), Ok(text) => candidate.alternate_text = Some(text),
Err(e) => dbglog!("[finetune] alternate generation failed: {:#}", e), Err(e) => dbglog!("[finetune] alternate generation failed: {:#}", e),
} }
} }
on_candidate(candidate);
} }
Ok((candidates, max_divergence)) Ok((total, max_divergence))
} }
/// Generate what the model would say without memories for a given entry. /// Generate what the model would say without memories for a given entry.