diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 436dda3..703c65c 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -172,7 +172,6 @@ pub struct AgentState { pub pending_dmn_pause: bool, pub provenance: String, pub generation: u64, - pub memory_scoring_in_flight: bool, pub active_tools: tools::ActiveTools, /// vLLM scheduling priority (lower = higher priority). /// 0 = interactive, 1 = surface agent, 2 = other subconscious, 10 = unconscious. @@ -237,7 +236,6 @@ impl Agent { pending_dmn_pause: false, provenance: "manual".to_string(), generation: 0, - memory_scoring_in_flight: false, active_tools, priority: Some(0), no_compact: false, @@ -275,7 +273,6 @@ impl Agent { pending_dmn_pause: false, provenance: st.provenance.clone(), generation: 0, - memory_scoring_in_flight: false, active_tools: tools::ActiveTools::new(), priority: None, no_compact: true, diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 474e2c2..4ca97ea 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -9,6 +9,44 @@ pub mod unconscious; pub mod identity; pub mod log; +/// A background operation wired off Mind. Each flow (memory scoring, +/// finetune scoring, compare) is a struct holding its dependencies and +/// a TaskHandle; `trigger()` picks the flow's own "start a fresh run" +/// semantics (abort-restart vs no-op-if-running). +pub trait MindTriggered { + fn trigger(&self); +} + +/// Owns a JoinHandle for a background task with two trigger semantics. +/// Uses a sync Mutex for interior mutability so callers can `trigger()` +/// off `&self` (Mind is shared via Arc). +#[derive(Default)] +pub struct TaskHandle(std::sync::Mutex>>); + +impl TaskHandle { + pub fn new() -> Self { Self::default() } + + /// Abort any running task and start a fresh one. + pub fn trigger(&self, fut: F) + where F: std::future::Future + Send + 'static + { + let mut h = self.0.lock().unwrap(); + if let Some(old) = h.take() { old.abort(); } + *h = Some(tokio::spawn(fut)); + } + + /// No-op if a task is still running; otherwise start a fresh one. + pub fn trigger_if_idle(&self, fut: F) + where F: std::future::Future + Send + 'static + { + let mut h = self.0.lock().unwrap(); + if let Some(old) = &*h { + if !old.is_finished() { return; } + } + *h = Some(tokio::spawn(fut)); + } +} + // consciousness.rs — Mind state machine and event loop // // The core runtime for the consciousness binary. Mind manages turns, @@ -48,7 +86,7 @@ fn match_scores( }).collect() } -fn find_memory_by_key(ctx: &ContextState, key: &str) -> Option<(Section, usize)> { +pub(crate) fn find_memory_by_key(ctx: &ContextState, key: &str) -> Option<(Section, usize)> { [(Section::Identity, ctx.identity()), (Section::Conversation, ctx.conversation())] .into_iter() .find_map(|(section, nodes)| { @@ -87,7 +125,7 @@ fn load_memory_scores(ctx: &mut ContextState, path: &std::path::Path) { } /// Collect scored memory keys from identity and conversation entries. -fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap { +pub(crate) fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap { ctx.identity().iter() .chain(ctx.conversation().iter()) .filter_map(|node| { @@ -102,7 +140,7 @@ fn collect_memory_scores(ctx: &ContextState) -> std::collections::BTreeMap, path: &std::path::Path) { +pub(crate) fn save_memory_scores(scores: &std::collections::BTreeMap, path: &std::path::Path) { match serde_json::to_string_pretty(scores) { Ok(json) => match std::fs::write(path, &json) { Ok(()) => dbglog!("[scoring] saved {} scores to {} ({} bytes)", @@ -154,22 +192,7 @@ pub struct MindState { /// Fine-tuning candidates identified by scoring. pub finetune_candidates: Vec, /// Last scoring run stats for UI display. - pub finetune_last_run: Option, -} - -/// Stats from the last finetune scoring run. -#[derive(Clone, Debug)] -pub struct FinetuneScoringStats { - /// Count of assistant responses we considered (recent half of context). - pub responses_considered: usize, - /// How many exceeded the divergence threshold. - pub above_threshold: usize, - /// Threshold used for this run. - pub threshold: f64, - /// Highest divergence observed. - pub max_divergence: f64, - /// Error message if the run failed. - pub error: Option, + pub finetune_last_run: Option, } impl Clone for MindState { @@ -318,11 +341,6 @@ impl MindState { } } -/// Background task completion events. -enum BgEvent { - ScoringDone, - FinetuneCandidate(learn::FinetuneCandidate), -} // --- Mind: cognitive state machine --- @@ -339,8 +357,8 @@ pub struct Mind { /// Signals conscious activity to the unconscious loop. /// true = active, false = idle opportunity. conscious_active: tokio::sync::watch::Sender, - bg_tx: mpsc::UnboundedSender, - bg_rx: std::sync::Mutex>>, + memory_scoring: learn::MemoryScoring, + finetune_scoring: learn::FinetuneScoring, _supervisor: crate::thalamus::supervisor::Supervisor, } @@ -380,7 +398,6 @@ impl Mind { ))); let (turn_watch, _) = tokio::sync::watch::channel(false); let (conscious_active, _) = tokio::sync::watch::channel(false); - let (bg_tx, bg_rx) = mpsc::unbounded_channel(); let mut sup = crate::thalamus::supervisor::Supervisor::new(); sup.load_config(); @@ -465,10 +482,17 @@ impl Mind { }); } + let scores_path = config.session_dir.join("memory-scores.json"); + let memory_scoring = learn::MemoryScoring::new( + agent.clone(), shared.clone(), scores_path); + let finetune_scoring = learn::FinetuneScoring::new(agent.clone(), shared.clone()); + Self { agent, shared, config, subconscious, unconscious, - turn_tx, turn_watch, conscious_active, bg_tx, - bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } + turn_tx, turn_watch, conscious_active, + memory_scoring, + finetune_scoring, + _supervisor: sup } } /// Initialize — restore log, start daemons and background agents. @@ -513,14 +537,7 @@ impl Mind { // Kick off an incremental scoring pass on startup so memories due // for re-scoring get evaluated without requiring a user message. - { - let mut s = self.shared.lock().unwrap(); - if !s.scoring_in_flight { - s.scoring_in_flight = true; - drop(s); - self.start_memory_scoring(); - } - } + self.memory_scoring.trigger(); } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { @@ -540,24 +557,10 @@ impl Mind { } } MindCommand::Score => { - let mut s = self.shared.lock().unwrap(); - if !s.scoring_in_flight { - s.scoring_in_flight = true; - drop(s); - self.start_memory_scoring(); - } else { - dbglog!("[scoring] skipped: scoring_in_flight=true"); - } + self.memory_scoring.trigger(); } MindCommand::ScoreFull => { - let mut s = self.shared.lock().unwrap(); - if !s.scoring_in_flight { - s.scoring_in_flight = true; - drop(s); - self.start_full_scoring(); - } else { - dbglog!("[scoring-full] skipped: scoring_in_flight=true"); - } + self.memory_scoring.trigger_full(); } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); @@ -588,7 +591,7 @@ impl Mind { self.agent.compact().await; } MindCommand::ScoreFinetune => { - self.start_finetune_scoring(); + self.finetune_scoring.trigger(); } MindCommand::SetLearnThreshold(value) => { if let Err(e) = crate::config_writer::set_learn_threshold(value) { @@ -605,167 +608,6 @@ impl Mind { } } - pub fn start_memory_scoring(&self) { - let agent = self.agent.clone(); - let bg_tx = self.bg_tx.clone(); - let scores_path = self.config.session_dir.join("memory-scores.json"); - let cfg = crate::config::get(); - let max_age = cfg.scoring_interval_secs; - let response_window = cfg.scoring_response_window; - tokio::spawn(async move { - let (context, client) = { - let mut st = agent.state.lock().await; - if st.memory_scoring_in_flight { - dbglog!("[scoring] skipped: memory_scoring_in_flight=true"); - return; - } - st.memory_scoring_in_flight = true; - drop(st); - let ctx = agent.context.lock().await.clone(); - (ctx, agent.client.clone()) - }; - let _result = learn::score_memories_incremental( - &context, max_age as i64, response_window, &client, &agent, - |key: String, score: f64| { - let agent = agent.clone(); - let path = scores_path.clone(); - async move { - let scores_snapshot = { - let mut ctx = agent.context.lock().await; - // Find memory by key in identity or conversation - let found = find_memory_by_key(&ctx, &key); - match found { - Some((section, i)) => { - ctx.set_score(section, i, Some(score)); - let nodes: &[crate::agent::context::AstNode] = match section { - Section::Identity => ctx.identity(), - Section::Conversation => ctx.conversation(), - _ => &[], - }; - let read_back = match nodes.get(i) { - Some(crate::agent::context::AstNode::Leaf(l)) => match l.body() { - crate::agent::context::NodeBody::Memory { score, .. } => format!("{:?}", score), - _ => "not-memory".to_string(), - }, - _ => "out-of-bounds".to_string(), - }; - dbglog!("[scoring] persisted {} → {:.3} ({:?}[{}]) read_back={}", - key, score, section, i, read_back); - } - None => { - dbglog!( - "[scoring] DROP {}: find_memory_by_key None (id={}, cv={})", - key, ctx.identity().len(), ctx.conversation().len() - ); - } - } - let snapshot = collect_memory_scores(&ctx); - let in_snapshot = snapshot.contains_key(&key); - dbglog!("[scoring] snapshot size={} contains({})={}", - snapshot.len(), key, in_snapshot); - drop(ctx); - agent.state.lock().await.changed.notify_one(); - snapshot - }; - dbglog!("[scoring] about to save {} entries", scores_snapshot.len()); - save_memory_scores(&scores_snapshot, &path); - } - }, - ).await; - { - agent.state.lock().await.memory_scoring_in_flight = false; - } - let _ = bg_tx.send(BgEvent::ScoringDone); - }); - } - - /// Run full N×M scoring matrix — scores every memory against every response. - pub fn start_full_scoring(&self) { - let agent = self.agent.clone(); - let bg_tx = self.bg_tx.clone(); - tokio::spawn(async move { - { - let mut st = agent.state.lock().await; - if st.memory_scoring_in_flight { - dbglog!("[scoring-full] skipped: memory_scoring_in_flight=true"); - return; - } - st.memory_scoring_in_flight = true; - } - let client = agent.client.clone(); - match learn::score_memories(&client, &agent).await { - Ok(()) => { let _ = bg_tx.send(BgEvent::ScoringDone); } - Err(e) => { dbglog!("[scoring-full] FAILED: {:#}", e); } - } - agent.state.lock().await.memory_scoring_in_flight = false; - }); - } - - /// Score responses for fine-tuning candidates. - /// - /// Scores the most recent half of the context — responses near the end - /// of the context window were generated with the most context available, - /// which is what we want to train on. The threshold is a temporary knob; - /// once this runs continuously, we'll just train whatever lands at full - /// context without filtering. - pub fn start_finetune_scoring(&self) { - // Snapshot the config values we need before spawning — the scoring - // task shouldn't hold the config read lock across async work. - let (threshold, gen_alternates) = { - let app = crate::config::app(); - (app.learn.threshold, app.learn.generate_alternates) - }; - // Clear the previous run's candidates so this run's stream is fresh. - self.shared.lock().unwrap().finetune_candidates.clear(); - - let agent = self.agent.clone(); - let bg_tx = self.bg_tx.clone(); - let shared = self.shared.clone(); - tokio::spawn(async move { - let activity = crate::agent::start_activity(&agent, "finetune: scoring...").await; - - let (context, client) = { - let ctx = agent.context.lock().await; - (ctx.clone(), agent.client.clone()) - }; - - let entries = context.conversation(); - let score_count = entries.len() / 2; - let range_start = entries.len() - score_count; - let responses_considered: usize = entries[range_start..].iter() - .filter(|n| matches!(n, crate::agent::context::AstNode::Branch { role: crate::agent::context::Role::Assistant, .. })) - .count(); - - activity.update(format!("finetune: scoring {} responses...", responses_considered)).await; - - let bg_tx_cb = bg_tx.clone(); - let stats = match learn::score_finetune_candidates( - &context, score_count, &client, threshold, - gen_alternates, &activity, - |c| { let _ = bg_tx_cb.send(BgEvent::FinetuneCandidate(c)); }, - ).await { - Ok((above_threshold, max_div)) => { - FinetuneScoringStats { - responses_considered, - above_threshold, - threshold, - max_divergence: max_div, - error: None, - } - } - Err(e) => FinetuneScoringStats { - responses_considered, - above_threshold: 0, - threshold, - max_divergence: 0.0, - error: Some(format!("{}", e)), - }, - }; - - shared.lock().unwrap().finetune_last_run = Some(stats); - // activity drops here, marking completion and notifying observers - }); - } async fn start_turn(&self, text: &str, target: StreamTarget) { { @@ -828,13 +670,11 @@ impl Mind { } }); - let mut bg_rx = self.bg_rx.lock().unwrap().take() - .expect("Mind::run() called twice"); let mut sub_handle: Option> = None; // Start finetune scoring at startup (scores existing conversation) if !self.config.no_agents { - self.start_finetune_scoring(); + self.finetune_scoring.trigger(); } loop { @@ -857,17 +697,6 @@ impl Mind { } } - Some(bg) = bg_rx.recv() => { - match bg { - BgEvent::ScoringDone => { - self.shared.lock().unwrap().scoring_in_flight = false; - } - BgEvent::FinetuneCandidate(c) => { - self.shared.lock().unwrap().finetune_candidates.push(c); - } - } - } - Some((result, target)) = turn_rx.recv() => { let _ = self.conscious_active.send(false); let model_switch = { diff --git a/src/subconscious/learn.rs b/src/subconscious/learn.rs index b7656bf..3021fc3 100644 --- a/src/subconscious/learn.rs +++ b/src/subconscious/learn.rs @@ -14,11 +14,14 @@ // with high divergence depend on memories the model // hasn't internalized. 2 API calls. +use std::sync::Arc; + use crate::agent::api::ApiClient; use crate::agent::context::{ Ast, AstNode, ContextState, Role, WireImage, is_assistant, is_memory_node, memory_key, render_branch_text, render_prior_context, }; +use crate::mind::{MindState, MindTriggered, TaskHandle}; use crate::subconscious::generate::gen_continuation; const SCORE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); @@ -376,6 +379,108 @@ where Ok(scored) } +/// Memory scoring — two modes sharing an in-flight handle (only one +/// runs at a time): `trigger()` for incremental, `trigger_full()` for +/// the N×M debug matrix. +pub struct MemoryScoring { + agent: Arc, + shared: Arc>, + scores_path: std::path::PathBuf, + task: TaskHandle, +} + +impl MemoryScoring { + pub fn new( + agent: Arc, + shared: Arc>, + scores_path: std::path::PathBuf, + ) -> Self { + Self { agent, shared, scores_path, task: TaskHandle::new() } + } + + pub fn trigger_full(&self) { + self.task.trigger_if_idle(run_full(self.agent.clone(), self.shared.clone())); + } +} + +impl MindTriggered for MemoryScoring { + fn trigger(&self) { + self.task.trigger_if_idle(run_incremental( + self.agent.clone(), self.shared.clone(), self.scores_path.clone(), + )); + } +} + +async fn run_incremental( + agent: Arc, + shared: Arc>, + scores_path: std::path::PathBuf, +) { + shared.lock().unwrap().scoring_in_flight = true; + agent.state.lock().await.changed.notify_one(); + + let cfg = crate::config::get(); + let max_age = cfg.scoring_interval_secs; + let response_window = cfg.scoring_response_window; + + let (context, client) = { + let ctx = agent.context.lock().await.clone(); + (ctx, agent.client.clone()) + }; + + let _result = score_memories_incremental( + &context, max_age as i64, response_window, &client, &agent, + |key: String, score: f64| { + let agent = agent.clone(); + let path = scores_path.clone(); + async move { + let scores_snapshot = { + let mut ctx = agent.context.lock().await; + let found = crate::mind::find_memory_by_key(&ctx, &key); + match found { + Some((section, i)) => { + ctx.set_score(section, i, Some(score)); + dbglog!("[scoring] persisted {} → {:.3} ({:?}[{}])", + key, score, section, i); + } + None => { + dbglog!( + "[scoring] DROP {}: find_memory_by_key None (id={}, cv={})", + key, ctx.identity().len(), ctx.conversation().len() + ); + } + } + let snapshot = crate::mind::collect_memory_scores(&ctx); + drop(ctx); + agent.state.lock().await.changed.notify_one(); + snapshot + }; + crate::mind::save_memory_scores(&scores_snapshot, &path); + } + }, + ).await; + + shared.lock().unwrap().scoring_in_flight = false; + agent.state.lock().await.changed.notify_one(); +} + +async fn run_full( + agent: Arc, + shared: Arc>, +) { + shared.lock().unwrap().scoring_in_flight = true; + agent.state.lock().await.changed.notify_one(); + + let client = agent.client.clone(); + match score_memories(&client, &agent).await { + Ok(()) => {}, + Err(e) => { dbglog!("[scoring-full] FAILED: {:#}", e); } + } + + shared.lock().unwrap().scoring_in_flight = false; + agent.state.lock().await.changed.notify_one(); +} + // ── Fine-tuning scoring ───────────────────────────────────────── /// Score which recent responses are candidates for fine-tuning. @@ -520,6 +625,100 @@ pub async fn score_finetune_candidates( Ok((total, max_divergence)) } +/// Stats from a finetune scoring run. Stored on MindState for UI display. +#[derive(Clone, Debug)] +pub struct FinetuneScoringStats { + pub responses_considered: usize, + pub above_threshold: usize, + pub threshold: f64, + pub max_divergence: f64, + pub error: Option, +} + +/// Finetune scoring — `trigger()` aborts any in-flight run and starts +/// a fresh one, clearing the previous candidates. +pub struct FinetuneScoring { + agent: Arc, + shared: Arc>, + task: TaskHandle, +} + +impl FinetuneScoring { + pub fn new( + agent: Arc, + shared: Arc>, + ) -> Self { + Self { agent, shared, task: TaskHandle::new() } + } +} + +impl MindTriggered for FinetuneScoring { + fn trigger(&self) { + self.task.trigger(run_finetune(self.agent.clone(), self.shared.clone())); + } +} + +async fn run_finetune( + agent: Arc, + shared: Arc>, +) { + let (threshold, gen_alternates) = { + let app = crate::config::app(); + (app.learn.threshold, app.learn.generate_alternates) + }; + + // Fresh run — clear previous candidates. + shared.lock().unwrap().finetune_candidates.clear(); + agent.state.lock().await.changed.notify_one(); + + let activity = crate::agent::start_activity(&agent, "finetune: scoring...").await; + + let (context, client) = { + let ctx = agent.context.lock().await; + (ctx.clone(), agent.client.clone()) + }; + + let entries = context.conversation(); + let score_count = entries.len() / 2; + let range_start = entries.len() - score_count; + let responses_considered: usize = entries[range_start..].iter() + .filter(|n| matches!(n, AstNode::Branch { role: Role::Assistant, .. })) + .count(); + + activity.update(format!("finetune: scoring {} responses...", responses_considered)).await; + + let stats = { + let shared = shared.clone(); + let agent = agent.clone(); + match score_finetune_candidates( + &context, score_count, &client, threshold, + gen_alternates, &activity, + move |c| { + shared.lock().unwrap().finetune_candidates.push(c); + if let Ok(st) = agent.state.try_lock() { st.changed.notify_one(); } + }, + ).await { + Ok((above_threshold, max_div)) => FinetuneScoringStats { + responses_considered, + above_threshold, + threshold, + max_divergence: max_div, + error: None, + }, + Err(e) => FinetuneScoringStats { + responses_considered, + above_threshold: 0, + threshold, + max_divergence: 0.0, + error: Some(format!("{}", e)), + }, + } + }; + + shared.lock().unwrap().finetune_last_run = Some(stats); + agent.state.lock().await.changed.notify_one(); +} + // ── Finetune config and persistence ───────────────────────────── use std::path::PathBuf; diff --git a/src/user/mod.rs b/src/user/mod.rs index 93da72c..e077167 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -504,6 +504,7 @@ async fn run( keep }); } + app.mind_state = Some(ms.clone()); } app.walked_count = mind.subconscious_walked().await.len();