From fcd77fb79e933c6ccc44e6d89826163214b90a38 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 4 Apr 2026 05:01:49 -0400 Subject: [PATCH] training: per-node scoring with graph weight updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Memory scoring now uses the graph as source of truth: - last_scored timestamp on each node (new capnp field @22) - Nodes scored when older than scoring_interval_secs (default 1hr) - Oldest-scored-first ordering - Window: scoring_response_window assistant responses (default 100) - First-quarter memories scored even without full window - Per-memory score: max single-response divergence in the window (per-count normalization still TBD) - Asymmetric weight update: alpha=0.5 up, alpha=0.1 down (responds fast to importance, decays slowly — memories stay surfaced even if only useful 1/4 of the time) Graph writes disabled pending normalization calibration. Also: configurable scoring_interval_secs and scoring_response_window. Co-Authored-By: Proof of Concept --- schema/memory.capnp | 3 ++ src/agent/mod.rs | 14 ++++-- src/agent/training.rs | 85 ++++++++++++++++++++------------ src/config.rs | 10 ++++ src/hippocampus/store/ops.rs | 16 ++++++ src/hippocampus/store/types.rs | 7 ++- src/mind/mod.rs | 28 ++++------- src/subconscious/subconscious.rs | 10 +--- 8 files changed, 109 insertions(+), 64 deletions(-) diff --git a/schema/memory.capnp b/schema/memory.capnp index 87be4c2..9175205 100644 --- a/schema/memory.capnp +++ b/schema/memory.capnp @@ -42,6 +42,9 @@ struct ContentNode { # Freeform provenance string: "extractor:write", "rename:tombstone", etc. 
provenance @21 :Text; + # Memory importance scoring + lastScored @22 :Int64; # unix epoch seconds, 0 = never scored + } enum NodeType { diff --git a/src/agent/mod.rs b/src/agent/mod.rs index f2f8376..900f1cf 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -662,13 +662,19 @@ impl Agent { _ => unreachable!(), }; let text = entry.message().content_text(); - let score = memory_scores + // Show node weight from graph (updated by incremental scorer) + let graph_weight = crate::hippocampus::store::Store::load().ok() + .and_then(|s| s.nodes.get(key).map(|n| n.weight)); + // Show full matrix score if available + let matrix_score = memory_scores .and_then(|s| s.memory_weights.iter() .find(|(k, _)| k == key) .map(|(_, v)| *v)); - let label = match score { - Some(v) => format!("{} (importance: {:.1})", key, v), - None => key.to_string(), + let label = match (graph_weight, matrix_score) { + (Some(w), Some(s)) => format!("{} (w:{:.2} score:{:.1})", key, w, s), + (Some(w), None) => format!("{} (w:{:.2})", key, w), + (None, Some(s)) => format!("{} (score:{:.1})", key, s), + (None, None) => key.to_string(), }; ContextSection { name: label, diff --git a/src/agent/training.rs b/src/agent/training.rs index 920d409..a2203b8 100644 --- a/src/agent/training.rs +++ b/src/agent/training.rs @@ -253,6 +253,19 @@ pub async fn score_memories( }) } +/// Find the entry index after `start` that contains the Nth assistant response. +/// Returns (end_index, true) if N responses were found, (entries.len(), false) if not. +fn nth_response_end(entries: &[ConversationEntry], start: usize, n: usize) -> (usize, bool) { + let mut count = 0; + for i in start..entries.len() { + if entries[i].message().role == Role::Assistant { + count += 1; + if count >= n { return (i + 1, true); } + } + } + (entries.len(), false) +} + // ── Single memory scoring ─────────────────────────────────────── /// Score how important a single memory is to the conversation. 
@@ -266,7 +279,7 @@ pub async fn score_memory( client: &ApiClient, ui_tx: &UiSender, ) -> anyhow::Result<f64> { - const WINDOW: usize = 50; + const RESPONSE_WINDOW: usize = 50; let first_pos = match context.entries.iter().position(|e| { matches!(e, ConversationEntry::Memory { key: k, .. } if k == key) @@ -275,7 +288,8 @@ None => return Ok(0.0), }; - let range = first_pos..(first_pos + WINDOW).min(context.entries.len()); + let (end, _) = nth_response_end(&context.entries, first_pos, RESPONSE_WINDOW); + let range = first_pos..end; if !context.entries[range.clone()].iter().any(|e| e.message().role == Role::Assistant) { return Ok(0.0); } @@ -290,63 +304,71 @@ // ── Background memory scoring ─────────────────────────────────── -/// Incrementally score memories through the conversation. +/// Score memories in the conversation that are due for re-scoring. /// -/// Walks memory entries in conversation order starting from `cursor`. -/// For each memory with a full WINDOW after it, calls score_memory() -/// and yields the result. Stops at the first memory that doesn't have -/// enough messages yet — the conversation needs to grow before we can -/// score it. +/// Checks the graph for each memory's last_scored timestamp. Scores +/// nodes that haven't been scored within `max_age_secs`, oldest first. +/// (Graph weight updates are disabled for now — see the TODO in the body.) /// -/// Returns the updated cursor (entry index to resume from next time) -/// and the scores for each memory that was scored this round. 
pub async fn score_memories_incremental( context: &ContextState, - cursor: usize, + max_age_secs: i64, + response_window: usize, client: &ApiClient, ui_tx: &UiSender, -) -> anyhow::Result<(usize, Vec<(String, f64)>)> { - const WINDOW: usize = 50; +) -> anyhow::Result<Vec<(String, f64)>> { + let now = chrono::Utc::now().timestamp(); - // Collect unique memory keys with their first position, starting from cursor + // Collect unique memory keys with their first position let mut seen = std::collections::HashSet::new(); - let mut to_score: Vec<(usize, String)> = Vec::new(); + let mut candidates: Vec<(usize, String, i64)> = Vec::new(); // (pos, key, last_scored) - for (i, entry) in context.entries.iter().enumerate().skip(cursor) { + let store = crate::hippocampus::store::Store::load().unwrap_or_default(); + + for (i, entry) in context.entries.iter().enumerate() { if let ConversationEntry::Memory { key, .. } = entry { - if seen.insert(key.clone()) { - to_score.push((i, key.clone())); + if !seen.insert(key.clone()) { continue; } + let last_scored = store.nodes.get(key.as_str()) + .map(|n| n.last_scored) + .unwrap_or(0); + if now - last_scored >= max_age_secs { + candidates.push((i, key.clone(), last_scored)); } } } + // Score oldest-first + candidates.sort_by_key(|&(_, _, last)| last); + let http = http_client(); - let mut new_cursor = cursor; let mut results = Vec::new(); - for (pos, key) in &to_score { - let end = pos + WINDOW; + let total_entries = context.entries.len(); + let first_quarter = total_entries / 4; - // Not enough conversation after this memory yet — stop here - if end > context.entries.len() { - break; + for (pos, key, _) in &candidates { + let (end, full_window) = nth_response_end(&context.entries, *pos, response_window); + // Skip memories without a full window, unless they're in the + // first quarter of the conversation (always score those). 
+ if !full_window && *pos >= first_quarter { + continue; } - - // Need at least one assistant response in the window let range = *pos..end; if !context.entries[range.clone()].iter().any(|e| e.message().role == Role::Assistant) { - new_cursor = end; continue; } let _ = ui_tx.send(UiMessage::Activity(format!("scoring memory: {}...", key))); match score_divergence(&http, client, context, range, Filter::SkipKey(key)).await { Ok((divs, _)) => { - let importance: f64 = divs.iter().sum(); + let n_responses = divs.len(); + let max_div = divs.iter().cloned().fold(0.0f64, f64::max); let _ = ui_tx.send(UiMessage::Debug(format!( - "[scoring] {} → {:.2}", key, importance, + "[scoring] {} max:{:.3} ({} responses)", key, max_div, n_responses, ))); - results.push((key.clone(), importance)); + // TODO: update graph weight once normalization is figured out + results.push((key.clone(), max_div)); } Err(e) => { let _ = ui_tx.send(UiMessage::Debug(format!( @@ -354,11 +376,10 @@ pub async fn score_memories_incremental( ))); } } - new_cursor = end; } let _ = ui_tx.send(UiMessage::Activity(String::new())); - Ok((new_cursor, results)) + Ok(results) } // ── Fine-tuning scoring ───────────────────────────────────────── diff --git a/src/config.rs b/src/config.rs index cba0152..9a12f1f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -56,6 +56,8 @@ fn default_true() -> bool { true } fn default_context_window() -> usize { 128_000 } fn default_stream_timeout() -> u64 { 60 } fn default_scoring_chunk_tokens() -> usize { 50_000 } +fn default_scoring_interval_secs() -> u64 { 3600 } // 1 hour +fn default_scoring_response_window() -> usize { 100 } fn default_identity_dir() -> PathBuf { dirs::home_dir().unwrap_or_default().join(".consciousness/identity") } @@ -97,6 +99,12 @@ pub struct Config { /// Max tokens per chunk for memory scoring logprobs calls. #[serde(default = "default_scoring_chunk_tokens")] pub scoring_chunk_tokens: usize, + /// How often to re-score memory nodes (seconds). 
Default: 3600 (1 hour). + #[serde(default = "default_scoring_interval_secs")] + pub scoring_interval_secs: u64, + /// Number of assistant responses to score per memory. Default: 100. + #[serde(default = "default_scoring_response_window")] + pub scoring_response_window: usize, pub api_reasoning: String, pub agent_types: Vec, /// Surface agent timeout in seconds. @@ -145,6 +153,8 @@ impl Default for Config { api_context_window: default_context_window(), api_stream_timeout_secs: default_stream_timeout(), scoring_chunk_tokens: default_scoring_chunk_tokens(), + scoring_interval_secs: default_scoring_interval_secs(), + scoring_response_window: default_scoring_response_window(), agent_model: None, api_reasoning: "high".to_string(), agent_types: vec![ diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index 9771cc6..96e9d6a 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -296,6 +296,22 @@ impl Store { Ok((old, weight)) } + /// Update a node's weight with a new score and record the scoring + /// timestamp. Uses asymmetric smoothing: responds quickly to high + /// scores (alpha=0.5) but decays slowly on low scores (alpha=0.1). + /// This keeps memories surfaced even if they're only useful 1 in 4 times. + /// Returns (old_weight, new_weight). + pub fn score_weight(&mut self, key: &str, score: f64) -> Result<(f32, f32), String> { + let node = self.nodes.get_mut(key) + .ok_or_else(|| format!("node not found: {}", key))?; + let old = node.weight; + let alpha = if score > old as f64 { 0.5 } else { 0.1 }; + let new = (alpha * score + (1.0 - alpha) * old as f64) as f32; + node.weight = new.clamp(0.01, 1.0); + node.last_scored = chrono::Utc::now().timestamp(); + Ok((old, node.weight)) + } + /// Set the strength of a link between two nodes. Deduplicates if /// multiple links exist. Returns the old strength, or error if no link. 
pub fn set_link_strength(&mut self, source: &str, target: &str, strength: f32) -> Result { diff --git a/src/hippocampus/store/types.rs b/src/hippocampus/store/types.rs index 2c46437..e4e2464 100644 --- a/src/hippocampus/store/types.rs +++ b/src/hippocampus/store/types.rs @@ -214,6 +214,10 @@ pub struct Node { #[serde(default)] pub created_at: i64, + // Memory importance scoring — unix epoch seconds, 0 = never scored. + #[serde(default)] + pub last_scored: i64, + // Derived fields (not in capnp, computed from graph) #[serde(default)] pub community_id: Option, @@ -342,7 +346,7 @@ capnp_message!(Node, uuid: [uuid], prim: [version, timestamp, weight, emotion, deleted, retrievals, uses, wrongs, last_replayed, - spaced_repetition_interval, position, created_at], + spaced_repetition_interval, position, created_at, last_scored], enm: [node_type: NodeType], skip: [community_id, clustering_coefficient, degree], ); @@ -531,6 +535,7 @@ pub fn new_node(key: &str, content: &str) -> Node { spaced_repetition_interval: 1, position: 0, created_at: now_epoch(), + last_scored: 0, community_id: None, clustering_coefficient: None, degree: None, diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 5c78a49..bc8dfe9 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -219,41 +219,31 @@ impl Session { fn start_memory_scoring(&self) { let agent = self.agent.clone(); let ui_tx = self.ui_tx.clone(); + let cfg = crate::config::get(); + let max_age = cfg.scoring_interval_secs; + let response_window = cfg.scoring_response_window; tokio::spawn(async move { - // Check + snapshot under one brief lock - let (context, client, cursor) = { + let (context, client) = { let mut agent = agent.lock().await; if agent.agent_cycles.memory_scoring_in_flight { return; } - let cursor = agent.agent_cycles.memory_score_cursor; agent.agent_cycles.memory_scoring_in_flight = true; - // Count total unique memories - let mut seen = std::collections::HashSet::new(); - for entry in &agent.context.entries { - if let 
crate::agent::context::ConversationEntry::Memory { key, .. } = entry { - seen.insert(key.clone()); - } - } - agent.agent_cycles.memory_total = seen.len(); let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); - (agent.context.clone(), agent.client_clone(), cursor) + (agent.context.clone(), agent.client_clone()) }; - // Lock released — event loop is free + let result = crate::agent::training::score_memories_incremental( - &context, cursor, &client, &ui_tx, + &context, max_age as i64, response_window, &client, &ui_tx, ).await; - // Brief lock — just update fields, no heavy work { let mut agent = agent.lock().await; agent.agent_cycles.memory_scoring_in_flight = false; - if let Ok((new_cursor, ref scores)) = result { - agent.agent_cycles.memory_score_cursor = new_cursor; - agent.agent_cycles.memory_scores.extend(scores.clone()); + if let Ok(ref scores) = result { + agent.agent_cycles.memory_scores = scores.clone(); } } - // Snapshot and log outside the lock match result { Ok(_) => { let agent = agent.lock().await; diff --git a/src/subconscious/subconscious.rs b/src/subconscious/subconscious.rs index c1a1ad8..9bb6927 100644 --- a/src/subconscious/subconscious.rs +++ b/src/subconscious/subconscious.rs @@ -104,14 +104,10 @@ pub struct AgentCycleState { log_file: Option, pub agents: Vec, pub last_output: AgentCycleOutput, - /// Incremental memory scoring — entry index to resume from. - pub memory_score_cursor: usize, /// Whether incremental memory scoring is currently running. pub memory_scoring_in_flight: bool, /// Latest per-memory scores from incremental scoring. pub memory_scores: Vec<(String, f64)>, - /// Total unique memories in the context (updated when scoring starts). 
- pub memory_total: usize, } const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; @@ -138,10 +134,8 @@ impl AgentCycleState { reflection: None, sleep_secs: None, }, - memory_score_cursor: 0, memory_scoring_in_flight: false, memory_scores: Vec::new(), - memory_total: 0, } } @@ -192,11 +186,11 @@ impl AgentCycleState { name: "memory-scoring".to_string(), pid: None, phase: if self.memory_scoring_in_flight { - Some(format!("scoring {}/{}", self.memory_scores.len(), self.memory_total)) + Some("scoring...".into()) } else if self.memory_scores.is_empty() { None } else { - Some(format!("{}/{} scored", self.memory_scores.len(), self.memory_total)) + Some(format!("{} scored", self.memory_scores.len())) }, log_path: None, });