mind: MindTriggered trait for background scoring flows

Mind's impl had accumulated ~50 lines of setup glue per scoring flow
(memory, memory-full, finetune): snapshot config, clone handles,
resolve context, spawn task, route results back through BgEvent,
write stats. The shape was identical; only the middle changed.

Introduce the MindTriggered trait:

    pub trait MindTriggered {
        fn trigger(&self);
    }

Each flow becomes a struct next to its scoring code that owns its
dependencies and a JoinHandle (behind a sync Mutex for interior
mutability):

    subconscious::learn::MemoryScoring    (Score, ScoreFull)
    subconscious::learn::FinetuneScoring  (ScoreFinetune)

Mind holds one of each and dispatches in one line:

    MindCommand::Score         => self.memory_scoring.trigger(),
    MindCommand::ScoreFull     => self.memory_scoring.trigger_full(),
    MindCommand::ScoreFinetune => self.finetune_scoring.trigger(),

Each struct picks its own trigger semantics — memory scoring is
no-op-if-running (!handle.is_finished()); finetune is abort-restart.

Falls out:

 - BgEvent / bg_tx / bg_rx disappear entirely. Tasks write directly
   to their slice of MindState and call agent.state.changed.notify_one()
   to wake the UI. The bg_rx arm in Mind's select loop is gone.

 - agent.state.memory_scoring_in_flight was duplicating
   shared.scoring_in_flight via BgEvent routing; now the JoinHandle
   alone tells us, and shared.scoring_in_flight is written directly
   by the task for the UI.

 - start_memory_scoring / start_full_scoring / start_finetune_scoring
   methods on Mind are deleted; Mind no longer knows the setup shape
   of any scoring flow.

 - FinetuneScoringStats moves from mind/ to subconscious/learn.rs
   next to the function that produces it.

No behavior change — same flows, same trigger points, same semantics.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-17 15:57:23 -04:00
parent c5745e38e2
commit 575325e855
4 changed files with 258 additions and 232 deletions

View file

@ -172,7 +172,6 @@ pub struct AgentState {
pub pending_dmn_pause: bool,
pub provenance: String,
pub generation: u64,
pub memory_scoring_in_flight: bool,
pub active_tools: tools::ActiveTools,
/// vLLM scheduling priority (lower = higher priority).
/// 0 = interactive, 1 = surface agent, 2 = other subconscious, 10 = unconscious.
@ -237,7 +236,6 @@ impl Agent {
pending_dmn_pause: false,
provenance: "manual".to_string(),
generation: 0,
memory_scoring_in_flight: false,
active_tools,
priority: Some(0),
no_compact: false,
@ -275,7 +273,6 @@ impl Agent {
pending_dmn_pause: false,
provenance: st.provenance.clone(),
generation: 0,
memory_scoring_in_flight: false,
active_tools: tools::ActiveTools::new(),
priority: None,
no_compact: true,

View file

@ -9,6 +9,44 @@ pub mod unconscious;
pub mod identity;
pub mod log;
/// A background operation wired off Mind. Each flow (memory scoring,
/// finetune scoring, compare) is a struct holding its dependencies and
/// a TaskHandle; `trigger()` picks the flow's own "start a fresh run"
/// semantics (abort-restart vs no-op-if-running).
pub trait MindTriggered {
    /// Start a fresh background run of this flow, using the flow's own
    /// semantics for what happens if a run is already in flight.
    fn trigger(&self);
}
/// Owns a JoinHandle for a background task with two trigger semantics.
/// Uses a sync Mutex for interior mutability so callers can `trigger()`
/// off `&self` (Mind is shared via Arc).
#[derive(Default)]
pub struct TaskHandle(std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>);

impl TaskHandle {
    pub fn new() -> Self {
        Self::default()
    }

    /// Abort any running task and start a fresh one.
    pub fn trigger<F>(&self, fut: F)
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        let mut slot = self.0.lock().unwrap();
        if let Some(prev) = slot.take() {
            prev.abort();
        }
        *slot = Some(tokio::spawn(fut));
    }

    /// No-op if a task is still running; otherwise start a fresh one.
    pub fn trigger_if_idle<F>(&self, fut: F)
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        let mut slot = self.0.lock().unwrap();
        // A finished handle counts as idle: replace it with a new task.
        let running = slot.as_ref().map_or(false, |h| !h.is_finished());
        if !running {
            *slot = Some(tokio::spawn(fut));
        }
    }
}
// consciousness.rs — Mind state machine and event loop
//
// The core runtime for the consciousness binary. Mind manages turns,
@ -48,7 +86,7 @@ fn match_scores(
}).collect()
}
fn find_memory_by_key(ctx: &ContextState, key: &str) -> Option<(Section, usize)> {
pub(crate) fn find_memory_by_key(ctx: &ContextState, key: &str) -> Option<(Section, usize)> {
[(Section::Identity, ctx.identity()), (Section::Conversation, ctx.conversation())]
.into_iter()
.find_map(|(section, nodes)| {
@ -87,7 +125,7 @@ fn load_memory_scores(ctx: &mut ContextState, path: &std::path::Path) {
}
/// Collect scored memory keys from identity and conversation entries.
fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap<String, f64> {
pub(crate) fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap<String, f64> {
ctx.identity().iter()
.chain(ctx.conversation().iter())
.filter_map(|node| {
@ -102,7 +140,7 @@ fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap<Strin
}
/// Save memory scores to disk.
fn save_memory_scores(scores: &std::collections::BTreeMap<String, f64>, path: &std::path::Path) {
pub(crate) fn save_memory_scores(scores: &std::collections::BTreeMap<String, f64>, path: &std::path::Path) {
match serde_json::to_string_pretty(scores) {
Ok(json) => match std::fs::write(path, &json) {
Ok(()) => dbglog!("[scoring] saved {} scores to {} ({} bytes)",
@ -154,22 +192,7 @@ pub struct MindState {
/// Fine-tuning candidates identified by scoring.
pub finetune_candidates: Vec<learn::FinetuneCandidate>,
/// Last scoring run stats for UI display.
pub finetune_last_run: Option<FinetuneScoringStats>,
}
/// Stats from the last finetune scoring run.
#[derive(Clone, Debug)]
pub struct FinetuneScoringStats {
/// Count of assistant responses we considered (recent half of context).
pub responses_considered: usize,
/// How many exceeded the divergence threshold.
pub above_threshold: usize,
/// Threshold used for this run.
pub threshold: f64,
/// Highest divergence observed.
pub max_divergence: f64,
/// Error message if the run failed.
pub error: Option<String>,
pub finetune_last_run: Option<learn::FinetuneScoringStats>,
}
impl Clone for MindState {
@ -318,11 +341,6 @@ impl MindState {
}
}
/// Background task completion events.
enum BgEvent {
ScoringDone,
FinetuneCandidate(learn::FinetuneCandidate),
}
// --- Mind: cognitive state machine ---
@ -339,8 +357,8 @@ pub struct Mind {
/// Signals conscious activity to the unconscious loop.
/// true = active, false = idle opportunity.
conscious_active: tokio::sync::watch::Sender<bool>,
bg_tx: mpsc::UnboundedSender<BgEvent>,
bg_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<BgEvent>>>,
memory_scoring: learn::MemoryScoring,
finetune_scoring: learn::FinetuneScoring,
_supervisor: crate::thalamus::supervisor::Supervisor,
}
@ -380,7 +398,6 @@ impl Mind {
)));
let (turn_watch, _) = tokio::sync::watch::channel(false);
let (conscious_active, _) = tokio::sync::watch::channel(false);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let mut sup = crate::thalamus::supervisor::Supervisor::new();
sup.load_config();
@ -465,10 +482,17 @@ impl Mind {
});
}
let scores_path = config.session_dir.join("memory-scores.json");
let memory_scoring = learn::MemoryScoring::new(
agent.clone(), shared.clone(), scores_path);
let finetune_scoring = learn::FinetuneScoring::new(agent.clone(), shared.clone());
Self { agent, shared, config,
subconscious, unconscious,
turn_tx, turn_watch, conscious_active, bg_tx,
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
turn_tx, turn_watch, conscious_active,
memory_scoring,
finetune_scoring,
_supervisor: sup }
}
/// Initialize — restore log, start daemons and background agents.
@ -513,14 +537,7 @@ impl Mind {
// Kick off an incremental scoring pass on startup so memories due
// for re-scoring get evaluated without requiring a user message.
{
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
}
}
self.memory_scoring.trigger();
}
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
@ -540,24 +557,10 @@ impl Mind {
}
}
MindCommand::Score => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_memory_scoring();
} else {
dbglog!("[scoring] skipped: scoring_in_flight=true");
}
self.memory_scoring.trigger();
}
MindCommand::ScoreFull => {
let mut s = self.shared.lock().unwrap();
if !s.scoring_in_flight {
s.scoring_in_flight = true;
drop(s);
self.start_full_scoring();
} else {
dbglog!("[scoring-full] skipped: scoring_in_flight=true");
}
self.memory_scoring.trigger_full();
}
MindCommand::Interrupt => {
self.shared.lock().unwrap().interrupt();
@ -588,7 +591,7 @@ impl Mind {
self.agent.compact().await;
}
MindCommand::ScoreFinetune => {
self.start_finetune_scoring();
self.finetune_scoring.trigger();
}
MindCommand::SetLearnThreshold(value) => {
if let Err(e) = crate::config_writer::set_learn_threshold(value) {
@ -605,167 +608,6 @@ impl Mind {
}
}
pub fn start_memory_scoring(&self) {
let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone();
let scores_path = self.config.session_dir.join("memory-scores.json");
let cfg = crate::config::get();
let max_age = cfg.scoring_interval_secs;
let response_window = cfg.scoring_response_window;
tokio::spawn(async move {
let (context, client) = {
let mut st = agent.state.lock().await;
if st.memory_scoring_in_flight {
dbglog!("[scoring] skipped: memory_scoring_in_flight=true");
return;
}
st.memory_scoring_in_flight = true;
drop(st);
let ctx = agent.context.lock().await.clone();
(ctx, agent.client.clone())
};
let _result = learn::score_memories_incremental(
&context, max_age as i64, response_window, &client, &agent,
|key: String, score: f64| {
let agent = agent.clone();
let path = scores_path.clone();
async move {
let scores_snapshot = {
let mut ctx = agent.context.lock().await;
// Find memory by key in identity or conversation
let found = find_memory_by_key(&ctx, &key);
match found {
Some((section, i)) => {
ctx.set_score(section, i, Some(score));
let nodes: &[crate::agent::context::AstNode] = match section {
Section::Identity => ctx.identity(),
Section::Conversation => ctx.conversation(),
_ => &[],
};
let read_back = match nodes.get(i) {
Some(crate::agent::context::AstNode::Leaf(l)) => match l.body() {
crate::agent::context::NodeBody::Memory { score, .. } => format!("{:?}", score),
_ => "not-memory".to_string(),
},
_ => "out-of-bounds".to_string(),
};
dbglog!("[scoring] persisted {} → {:.3} ({:?}[{}]) read_back={}",
key, score, section, i, read_back);
}
None => {
dbglog!(
"[scoring] DROP {}: find_memory_by_key None (id={}, cv={})",
key, ctx.identity().len(), ctx.conversation().len()
);
}
}
let snapshot = collect_memory_scores(&ctx);
let in_snapshot = snapshot.contains_key(&key);
dbglog!("[scoring] snapshot size={} contains({})={}",
snapshot.len(), key, in_snapshot);
drop(ctx);
agent.state.lock().await.changed.notify_one();
snapshot
};
dbglog!("[scoring] about to save {} entries", scores_snapshot.len());
save_memory_scores(&scores_snapshot, &path);
}
},
).await;
{
agent.state.lock().await.memory_scoring_in_flight = false;
}
let _ = bg_tx.send(BgEvent::ScoringDone);
});
}
/// Run full N×M scoring matrix — scores every memory against every response.
pub fn start_full_scoring(&self) {
let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone();
tokio::spawn(async move {
{
let mut st = agent.state.lock().await;
if st.memory_scoring_in_flight {
dbglog!("[scoring-full] skipped: memory_scoring_in_flight=true");
return;
}
st.memory_scoring_in_flight = true;
}
let client = agent.client.clone();
match learn::score_memories(&client, &agent).await {
Ok(()) => { let _ = bg_tx.send(BgEvent::ScoringDone); }
Err(e) => { dbglog!("[scoring-full] FAILED: {:#}", e); }
}
agent.state.lock().await.memory_scoring_in_flight = false;
});
}
/// Score responses for fine-tuning candidates.
///
/// Scores the most recent half of the context — responses near the end
/// of the context window were generated with the most context available,
/// which is what we want to train on. The threshold is a temporary knob;
/// once this runs continuously, we'll just train whatever lands at full
/// context without filtering.
pub fn start_finetune_scoring(&self) {
// Snapshot the config values we need before spawning — the scoring
// task shouldn't hold the config read lock across async work.
let (threshold, gen_alternates) = {
let app = crate::config::app();
(app.learn.threshold, app.learn.generate_alternates)
};
// Clear the previous run's candidates so this run's stream is fresh.
self.shared.lock().unwrap().finetune_candidates.clear();
let agent = self.agent.clone();
let bg_tx = self.bg_tx.clone();
let shared = self.shared.clone();
tokio::spawn(async move {
let activity = crate::agent::start_activity(&agent, "finetune: scoring...").await;
let (context, client) = {
let ctx = agent.context.lock().await;
(ctx.clone(), agent.client.clone())
};
let entries = context.conversation();
let score_count = entries.len() / 2;
let range_start = entries.len() - score_count;
let responses_considered: usize = entries[range_start..].iter()
.filter(|n| matches!(n, crate::agent::context::AstNode::Branch { role: crate::agent::context::Role::Assistant, .. }))
.count();
activity.update(format!("finetune: scoring {} responses...", responses_considered)).await;
let bg_tx_cb = bg_tx.clone();
let stats = match learn::score_finetune_candidates(
&context, score_count, &client, threshold,
gen_alternates, &activity,
|c| { let _ = bg_tx_cb.send(BgEvent::FinetuneCandidate(c)); },
).await {
Ok((above_threshold, max_div)) => {
FinetuneScoringStats {
responses_considered,
above_threshold,
threshold,
max_divergence: max_div,
error: None,
}
}
Err(e) => FinetuneScoringStats {
responses_considered,
above_threshold: 0,
threshold,
max_divergence: 0.0,
error: Some(format!("{}", e)),
},
};
shared.lock().unwrap().finetune_last_run = Some(stats);
// activity drops here, marking completion and notifying observers
});
}
async fn start_turn(&self, text: &str, target: StreamTarget) {
{
@ -828,13 +670,11 @@ impl Mind {
}
});
let mut bg_rx = self.bg_rx.lock().unwrap().take()
.expect("Mind::run() called twice");
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
// Start finetune scoring at startup (scores existing conversation)
if !self.config.no_agents {
self.start_finetune_scoring();
self.finetune_scoring.trigger();
}
loop {
@ -857,17 +697,6 @@ impl Mind {
}
}
Some(bg) = bg_rx.recv() => {
match bg {
BgEvent::ScoringDone => {
self.shared.lock().unwrap().scoring_in_flight = false;
}
BgEvent::FinetuneCandidate(c) => {
self.shared.lock().unwrap().finetune_candidates.push(c);
}
}
}
Some((result, target)) = turn_rx.recv() => {
let _ = self.conscious_active.send(false);
let model_switch = {

View file

@ -14,11 +14,14 @@
// with high divergence depend on memories the model
// hasn't internalized. 2 API calls.
use std::sync::Arc;
use crate::agent::api::ApiClient;
use crate::agent::context::{
Ast, AstNode, ContextState, Role, WireImage,
is_assistant, is_memory_node, memory_key, render_branch_text, render_prior_context,
};
use crate::mind::{MindState, MindTriggered, TaskHandle};
use crate::subconscious::generate::gen_continuation;
const SCORE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300);
@ -376,6 +379,108 @@ where
Ok(scored)
}
/// Memory scoring — two modes sharing an in-flight handle (only one
/// runs at a time): `trigger()` for incremental, `trigger_full()` for
/// the N×M debug matrix.
pub struct MemoryScoring {
    agent: Arc<crate::agent::Agent>,
    shared: Arc<std::sync::Mutex<MindState>>,
    scores_path: std::path::PathBuf,
    task: TaskHandle,
}

impl MemoryScoring {
    pub fn new(
        agent: Arc<crate::agent::Agent>,
        shared: Arc<std::sync::Mutex<MindState>>,
        scores_path: std::path::PathBuf,
    ) -> Self {
        Self { agent, shared, scores_path, task: TaskHandle::new() }
    }

    /// Full N×M matrix. Shares the incremental run's handle, so it is
    /// a no-op while any memory-scoring task is still in flight.
    pub fn trigger_full(&self) {
        let fut = run_full(self.agent.clone(), self.shared.clone());
        self.task.trigger_if_idle(fut);
    }
}

impl MindTriggered for MemoryScoring {
    /// Incremental pass; no-op while any memory-scoring task is in flight.
    fn trigger(&self) {
        let fut = run_incremental(
            self.agent.clone(),
            self.shared.clone(),
            self.scores_path.clone(),
        );
        self.task.trigger_if_idle(fut);
    }
}
/// One incremental memory-scoring pass. Runs as a task owned by
/// MemoryScoring's TaskHandle; writes directly to MindState and wakes
/// the UI through `agent.state.changed.notify_one()` (no BgEvent hop).
async fn run_incremental(
    agent: Arc<crate::agent::Agent>,
    shared: Arc<std::sync::Mutex<MindState>>,
    scores_path: std::path::PathBuf,
) {
    // Flag in-flight for the UI before doing any work, then wake it.
    shared.lock().unwrap().scoring_in_flight = true;
    agent.state.lock().await.changed.notify_one();
    let cfg = crate::config::get();
    let max_age = cfg.scoring_interval_secs;
    let response_window = cfg.scoring_response_window;
    // Snapshot the context so scoring works off a stable copy; keep the
    // API client handle for the scoring calls.
    let (context, client) = {
        let ctx = agent.context.lock().await.clone();
        (ctx, agent.client.clone())
    };
    let _result = score_memories_incremental(
        &context, max_age as i64, response_window, &client, &agent,
        // Per-score callback: write one (key, score) into the *live*
        // context, snapshot all scores, and persist them to disk.
        |key: String, score: f64| {
            let agent = agent.clone();
            let path = scores_path.clone();
            async move {
                let scores_snapshot = {
                    let mut ctx = agent.context.lock().await;
                    let found = crate::mind::find_memory_by_key(&ctx, &key);
                    match found {
                        Some((section, i)) => {
                            ctx.set_score(section, i, Some(score));
                            dbglog!("[scoring] persisted {} → {:.3} ({:?}[{}])",
                                key, score, section, i);
                        }
                        None => {
                            // Key not found in the live context —
                            // presumably it was removed since the
                            // snapshot was taken; drop the score.
                            dbglog!(
                                "[scoring] DROP {}: find_memory_by_key None (id={}, cv={})",
                                key, ctx.identity().len(), ctx.conversation().len()
                            );
                        }
                    }
                    let snapshot = crate::mind::collect_memory_scores(&ctx);
                    // Release the context lock before taking the state
                    // lock; wake the UI so the new score renders.
                    drop(ctx);
                    agent.state.lock().await.changed.notify_one();
                    snapshot
                };
                crate::mind::save_memory_scores(&scores_snapshot, &path);
            }
        },
    ).await;
    // Clear the in-flight flag and wake the UI one last time.
    shared.lock().unwrap().scoring_in_flight = false;
    agent.state.lock().await.changed.notify_one();
}
/// Full N×M scoring pass: every memory against every response. Flags
/// `scoring_in_flight` for the UI around the run and wakes the UI via
/// `changed.notify_one()` on entry and exit.
async fn run_full(
    agent: Arc<crate::agent::Agent>,
    shared: Arc<std::sync::Mutex<MindState>>,
) {
    shared.lock().unwrap().scoring_in_flight = true;
    agent.state.lock().await.changed.notify_one();

    let client = agent.client.clone();
    // Failures are logged only; the flag below is cleared either way.
    if let Err(e) = score_memories(&client, &agent).await {
        dbglog!("[scoring-full] FAILED: {:#}", e);
    }

    shared.lock().unwrap().scoring_in_flight = false;
    agent.state.lock().await.changed.notify_one();
}
// ── Fine-tuning scoring ─────────────────────────────────────────
/// Score which recent responses are candidates for fine-tuning.
@ -520,6 +625,100 @@ pub async fn score_finetune_candidates(
Ok((total, max_divergence))
}
/// Stats from a finetune scoring run. Stored on MindState for UI display.
#[derive(Clone, Debug)]
pub struct FinetuneScoringStats {
    /// Count of assistant responses we considered (recent half of context).
    pub responses_considered: usize,
    /// How many exceeded the divergence threshold.
    pub above_threshold: usize,
    /// Threshold used for this run.
    pub threshold: f64,
    /// Highest divergence observed.
    pub max_divergence: f64,
    /// Error message if the run failed.
    pub error: Option<String>,
}
/// Finetune scoring — `trigger()` aborts any in-flight run and starts
/// a fresh one, clearing the previous candidates.
pub struct FinetuneScoring {
    agent: Arc<crate::agent::Agent>,
    shared: Arc<std::sync::Mutex<MindState>>,
    task: TaskHandle,
}

impl FinetuneScoring {
    pub fn new(
        agent: Arc<crate::agent::Agent>,
        shared: Arc<std::sync::Mutex<MindState>>,
    ) -> Self {
        Self { agent, shared, task: TaskHandle::new() }
    }
}

impl MindTriggered for FinetuneScoring {
    /// Abort-restart semantics: a new trigger always supersedes the
    /// running task.
    fn trigger(&self) {
        let fut = run_finetune(self.agent.clone(), self.shared.clone());
        self.task.trigger(fut);
    }
}
/// One finetune scoring run: clears the previous run's candidates,
/// scores the most recent half of the conversation, streams candidates
/// into MindState as they are found, and records stats for the UI.
async fn run_finetune(
    agent: Arc<crate::agent::Agent>,
    shared: Arc<std::sync::Mutex<MindState>>,
) {
    // Snapshot config values up front so the task doesn't hold the
    // config read lock across async work.
    let (threshold, gen_alternates) = {
        let app = crate::config::app();
        (app.learn.threshold, app.learn.generate_alternates)
    };
    // Fresh run — clear previous candidates.
    shared.lock().unwrap().finetune_candidates.clear();
    agent.state.lock().await.changed.notify_one();
    let activity = crate::agent::start_activity(&agent, "finetune: scoring...").await;
    // Stable context snapshot plus the API client for scoring calls.
    let (context, client) = {
        let ctx = agent.context.lock().await;
        (ctx.clone(), agent.client.clone())
    };
    // Score only the most recent half — responses near the end of the
    // context window were generated with the most context available.
    let entries = context.conversation();
    let score_count = entries.len() / 2;
    let range_start = entries.len() - score_count;
    let responses_considered: usize = entries[range_start..].iter()
        .filter(|n| matches!(n, AstNode::Branch { role: Role::Assistant, .. }))
        .count();
    activity.update(format!("finetune: scoring {} responses...", responses_considered)).await;
    let stats = {
        let shared = shared.clone();
        let agent = agent.clone();
        match score_finetune_candidates(
            &context, score_count, &client, threshold,
            gen_alternates, &activity,
            // Streaming callback: push each candidate into MindState as
            // it is found. try_lock keeps this callback non-blocking; a
            // skipped wakeup is tolerated since the final notify below
            // fires unconditionally.
            move |c| {
                shared.lock().unwrap().finetune_candidates.push(c);
                if let Ok(st) = agent.state.try_lock() { st.changed.notify_one(); }
            },
        ).await {
            Ok((above_threshold, max_div)) => FinetuneScoringStats {
                responses_considered,
                above_threshold,
                threshold,
                max_divergence: max_div,
                error: None,
            },
            // On failure, still record a stats entry so the UI shows
            // the error rather than stale results.
            Err(e) => FinetuneScoringStats {
                responses_considered,
                above_threshold: 0,
                threshold,
                max_divergence: 0.0,
                error: Some(format!("{}", e)),
            },
        }
    };
    shared.lock().unwrap().finetune_last_run = Some(stats);
    agent.state.lock().await.changed.notify_one();
    // `activity` drops here, marking completion and notifying observers.
}
// ── Finetune config and persistence ─────────────────────────────
use std::path::PathBuf;

View file

@ -504,6 +504,7 @@ async fn run(
keep
});
}
app.mind_state = Some(ms.clone());
}
app.walked_count = mind.subconscious_walked().await.len();