From d5e6f55da9a59c3306fb6c645bdf5e96e5558f41 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 6 Apr 2026 21:48:12 -0400 Subject: [PATCH] Fix context budgeting and compaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Budget now counts exact message tokens matching what assemble_api_messages sends, not raw string content. Eliminates undercounting from formatting overhead (journal headers, personality separators, working stack). - Load journal before trimming so trim accounts for journal cost. - Compact before every turn, not just after turn completion. Prevents agent_cycle surfaced memories from pushing context over budget. - Move agent_cycle orchestration from Agent::turn to Mind::start_turn — surfaced memories and reflections now precede the user message. - Move AgentCycleState from Agent to Mind — it's orchestration, not per-agent state. memory_scoring_in_flight and memory_scores stay on Agent where they belong. - Tag DMN entries as ConversationEntry::Dmn — compaction evicts them first since they're ephemeral. Compaction also prefers evicting memories over conversation when memories exceed 50% of entry tokens. - Kill /retry slash command. 
Co-Authored-By: Proof of Concept --- src/agent/context.rs | 104 +++++++++++++++---- src/agent/mod.rs | 167 ++++++++----------------------- src/mind/mod.rs | 76 ++++++++++++-- src/subconscious/subconscious.rs | 16 +-- src/user/mod.rs | 1 - 5 files changed, 194 insertions(+), 170 deletions(-) diff --git a/src/agent/context.rs b/src/agent/context.rs index 5ce49ec..eb5692b 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -59,8 +59,6 @@ pub fn trim_entries( entries: &[ConversationEntry], tokenizer: &CoreBPE, ) -> Vec { - let count = |s: &str| tokenizer.encode_with_special_tokens(s).len(); - // --- Phase 1: dedup memory entries by key (keep last) --- let mut seen_keys: std::collections::HashMap<&str, usize> = std::collections::HashMap::new(); let mut drop_indices: std::collections::HashSet = std::collections::HashSet::new(); @@ -79,31 +77,77 @@ pub fn trim_entries( .collect(); // --- Phase 2: trim to fit context budget --- + // Everything in the context window is a message. Count them all, + // trim entries until the total fits. 
let max_tokens = context_budget_tokens(); - let identity_cost = count(&context.system_prompt) - + context.personality.iter().map(|(_, c)| count(c)).sum::(); - let journal_cost: usize = context.journal.iter().map(|e| count(&e.content)).sum(); - let available = max_tokens - .saturating_sub(identity_cost) - .saturating_sub(journal_cost); + let count_msg = |m: &Message| msg_token_count(tokenizer, m); + + let fixed_cost = count_msg(&Message::system(&context.system_prompt)) + + count_msg(&Message::user(context.render_context_message())) + + count_msg(&Message::user(render_journal(&context.journal))); let msg_costs: Vec = deduped.iter() - .map(|e| msg_token_count(tokenizer, e.message())).collect(); - let total: usize = msg_costs.iter().sum(); + .map(|e| count_msg(e.api_message())).collect(); + let entry_total: usize = msg_costs.iter().sum(); + let total: usize = fixed_cost + entry_total; - let mut skip = 0; + let mem_tokens: usize = deduped.iter().zip(&msg_costs) + .filter(|(e, _)| e.is_memory()) + .map(|(_, &c)| c).sum(); + let conv_tokens: usize = entry_total - mem_tokens; + + dbglog!("[trim] max_tokens={} fixed={} mem={} conv={} total={} entries={}", + max_tokens, fixed_cost, mem_tokens, conv_tokens, total, deduped.len()); + + // Phase 2a: evict all DMN entries first — they're ephemeral + let mut drop = vec![false; deduped.len()]; let mut trimmed = total; - while trimmed > available && skip < deduped.len() { - trimmed -= msg_costs[skip]; - skip += 1; + let mut cur_mem = mem_tokens; + + for i in 0..deduped.len() { + if deduped[i].is_dmn() { + drop[i] = true; + trimmed -= msg_costs[i]; + } } - // Walk forward to user message boundary - while skip < deduped.len() && deduped[skip].message().role != Role::User { - skip += 1; + // Phase 2b: if memories > 50% of entries, evict oldest memories + if cur_mem > conv_tokens && trimmed > max_tokens { + for i in 0..deduped.len() { + if drop[i] { continue; } + if !deduped[i].is_memory() { continue; } + if cur_mem <= conv_tokens { 
break; } + if trimmed <= max_tokens { break; } + drop[i] = true; + trimmed -= msg_costs[i]; + cur_mem -= msg_costs[i]; + } } - deduped[skip..].to_vec() + // Phase 2c: drop oldest entries until under budget + for i in 0..deduped.len() { + if trimmed <= max_tokens { break; } + if drop[i] { continue; } + drop[i] = true; + trimmed -= msg_costs[i]; + } + + // Walk forward to include complete conversation boundaries + let mut result: Vec = Vec::new(); + let mut skipping = true; + for (i, entry) in deduped.into_iter().enumerate() { + if skipping { + if drop[i] { continue; } + // Snap to user message boundary + if entry.message().role != Role::User { continue; } + skipping = false; + } + result.push(entry); + } + + dbglog!("[trim] result={} trimmed_total={}", result.len(), trimmed); + + result } /// Count the token footprint of a message using BPE tokenization. @@ -153,6 +197,8 @@ pub fn is_stream_error(err: &anyhow::Error) -> bool { pub enum ConversationEntry { Message(Message), Memory { key: String, message: Message }, + /// DMN heartbeat/autonomous prompt — evicted aggressively during compaction. + Dmn(Message), } // Custom serde: serialize Memory with a "memory_key" field added to the message, @@ -161,7 +207,7 @@ impl Serialize for ConversationEntry { fn serialize(&self, s: S) -> Result { use serde::ser::SerializeMap; match self { - Self::Message(m) => m.serialize(s), + Self::Message(m) | Self::Dmn(m) => m.serialize(s), Self::Memory { key, message } => { let json = serde_json::to_value(message).map_err(serde::ser::Error::custom)?; let mut map = s.serialize_map(None)?; @@ -195,7 +241,7 @@ impl ConversationEntry { /// Get the API message for sending to the model. pub fn api_message(&self) -> &Message { match self { - Self::Message(m) => m, + Self::Message(m) | Self::Dmn(m) => m, Self::Memory { message, .. } => message, } } @@ -204,10 +250,14 @@ impl ConversationEntry { matches!(self, Self::Memory { ..
}) } + pub fn is_dmn(&self) -> bool { + matches!(self, Self::Dmn(_)) + } + /// Get a reference to the inner message. pub fn message(&self) -> &Message { match self { - Self::Message(m) => m, + Self::Message(m) | Self::Dmn(m) => m, Self::Memory { message, .. } => message, } } @@ -215,7 +265,7 @@ impl ConversationEntry { /// Get a mutable reference to the inner message. pub fn message_mut(&mut self) -> &mut Message { match self { - Self::Message(m) => m, + Self::Message(m) | Self::Dmn(m) => m, Self::Memory { message, .. } => message, } } @@ -232,6 +282,16 @@ pub struct ContextState { pub entries: Vec, } +pub fn render_journal(entries: &[JournalEntry]) -> String { + if entries.is_empty() { return String::new(); } + let mut text = String::from("[Earlier — from your journal]\n\n"); + for entry in entries { + use std::fmt::Write; + writeln!(text, "## {}\n{}\n", entry.timestamp.format("%Y-%m-%dT%H:%M"), entry.content).ok(); + } + text +} + impl ContextState { pub fn render_context_message(&self) -> String { let mut parts: Vec = self.personality.iter() diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 463b07b..2975514 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -160,7 +160,7 @@ pub struct Agent { pub pending_model_switch: Option, pub pending_dmn_pause: bool, /// Persistent conversation log — append-only record of all messages. - conversation_log: Option, + pub conversation_log: Option, /// BPE tokenizer for token counting (cl100k_base — close enough /// for Claude and Qwen budget allocation, ~85-90% count accuracy). tokenizer: CoreBPE, @@ -175,24 +175,16 @@ pub struct Agent { pub session_id: String, /// Incremented on compaction — UI uses this to detect resets. pub generation: u64, - /// Agent orchestration state (surface-observe, journal, reflect). - /// TODO: move to Session — it's session-level, not agent-level. - pub agent_cycles: crate::subconscious::subconscious::AgentCycleState, + /// Whether incremental memory scoring is currently running. 
+ pub memory_scoring_in_flight: bool, + /// Latest per-memory scores from incremental scoring. + pub memory_scores: Vec<(String, f64)>, /// Shared active tools — Agent writes, TUI reads. pub active_tools: tools::SharedActiveTools, /// Fires when agent state changes — UI wakes on this instead of polling. pub changed: Arc, } -fn render_journal(entries: &[context::JournalEntry]) -> String { - if entries.is_empty() { return String::new(); } - let mut text = String::from("[Earlier — from your journal]\n\n"); - for entry in entries { - use std::fmt::Write; - writeln!(text, "## {}\n{}\n", entry.timestamp.format("%Y-%m-%dT%H:%M"), entry.content).ok(); - } - text -} impl Agent { pub fn new( @@ -216,7 +208,6 @@ impl Agent { entries: Vec::new(), }; let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); - let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&session_id); let mut agent = Self { client, tools: tools::tools(), @@ -238,7 +229,8 @@ impl Agent { prompt_file, session_id, generation: 0, - agent_cycles, + memory_scoring_in_flight: false, + memory_scores: Vec::new(), active_tools, changed: Arc::new(tokio::sync::Notify::new()), }; @@ -258,7 +250,7 @@ impl Agent { if !ctx.is_empty() { msgs.push(Message::user(ctx)); } - let jnl = render_journal(&self.context.journal); + let jnl = context::render_journal(&self.context.journal); if !jnl.is_empty() { msgs.push(Message::user(jnl)); } @@ -267,21 +259,6 @@ impl Agent { } /// Run agent orchestration cycle, returning structured output. 
- fn run_agent_cycle(&mut self) -> crate::subconscious::subconscious::AgentCycleOutput { - let transcript_path = self.conversation_log.as_ref() - .map(|l| l.path().to_string_lossy().to_string()) - .unwrap_or_default(); - - let session = crate::session::HookSession::from_fields( - self.session_id.clone(), - transcript_path, - "UserPromptSubmit".into(), - ); - - self.agent_cycles.trigger(&session); - std::mem::take(&mut self.agent_cycles.last_output) - } - /// Push a conversation message — stamped and logged. pub fn push_message(&mut self, mut msg: Message) { msg.stamp(); @@ -289,7 +266,7 @@ impl Agent { self.push_entry(entry); } - fn push_entry(&mut self, entry: ConversationEntry) { + pub fn push_entry(&mut self, entry: ConversationEntry) { if let Some(ref log) = self.conversation_log { if let Err(e) = log.append(&entry) { eprintln!("warning: failed to log entry: {:#}", e); @@ -328,31 +305,11 @@ impl Agent { pub async fn turn( agent: Arc>, ) -> Result { - // --- Pre-loop setup (lock 1): agent cycle, memories, user input --- + // --- Pre-loop setup (lock 1): collect finished tools --- let active_tools = { let mut finished = Vec::new(); let tools = { - let mut me = agent.lock().await; - - let cycle = me.run_agent_cycle(); - for key in &cycle.surfaced_keys { - if let Some(rendered) = crate::cli::node::render_node( - &crate::store::Store::load().unwrap_or_default(), key, - ) { - let mut msg = Message::user(format!( - "\n--- {} (surfaced) ---\n{}\n", - key, rendered, - )); - msg.stamp(); - me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); - } - } - if let Some(ref reflection) = cycle.reflection { - me.push_message(Message::user(format!( - "\n--- subconscious reflection ---\n{}\n", - reflection.trim(), - ))); - } + let me = agent.lock().await; // Collect completed background tool handles — remove from active list // but don't await yet (MutexGuard isn't Send). 
@@ -647,83 +604,38 @@ impl Agent { /// Build context state summary for the debug screen. pub fn context_state_summary(&self, memory_scores: Option<&learn::MemoryScore>) -> Vec { - let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); + let count_msg = |m: &Message| context::msg_token_count(&self.tokenizer, m); let mut sections = Vec::new(); - // System prompt + // System prompt — counted as the actual message sent + let system_msg = Message::system(&self.context.system_prompt); sections.push(ContextSection { name: "System prompt".into(), - tokens: count(&self.context.system_prompt), + tokens: count_msg(&system_msg), content: self.context.system_prompt.clone(), children: Vec::new(), }); - // Personality — parent with file children - let personality_children: Vec = self.context.personality.iter() - .map(|(name, content)| ContextSection { - name: name.clone(), - tokens: count(content), - content: content.clone(), - children: Vec::new(), - }) - .collect(); - let personality_tokens: usize = personality_children.iter().map(|c| c.tokens).sum(); + // Context message (personality + working stack) — counted as the + // single user message that assemble_api_messages sends + let context_rendered = self.context.render_context_message(); + let context_msg = Message::user(&context_rendered); sections.push(ContextSection { - name: format!("Personality ({} files)", personality_children.len()), - tokens: personality_tokens, - content: String::new(), - children: personality_children, + name: format!("Identity ({} files + stack)", self.context.personality.len()), + tokens: count_msg(&context_msg), + content: context_rendered, + children: Vec::new(), }); - // Journal - { - let journal_children: Vec = self.context.journal.iter() - .map(|entry| { - let preview: String = entry.content.lines() - .find(|l| !l.trim().is_empty()) - .unwrap_or("").chars().take(60).collect(); - ContextSection { - name: format!("{}: {}", entry.timestamp.format("%Y-%m-%dT%H:%M"), preview), - 
tokens: count(&entry.content), - content: entry.content.clone(), - children: Vec::new(), - } - }) - .collect(); - let journal_tokens: usize = journal_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Journal ({} entries)", journal_children.len()), - tokens: journal_tokens, - content: String::new(), - children: journal_children, - }); - } - - // Working stack — instructions + items as children - let instructions = std::fs::read_to_string(working_stack::instructions_path()) - .unwrap_or_default(); - let mut stack_children = vec![ContextSection { - name: "Instructions".into(), - tokens: count(&instructions), - content: instructions, - children: Vec::new(), - }]; - for (i, item) in self.context.working_stack.iter().enumerate() { - let marker = if i == self.context.working_stack.len() - 1 { "→" } else { " " }; - stack_children.push(ContextSection { - name: format!("{} [{}] {}", marker, i, item), - tokens: count(item), - content: String::new(), - children: Vec::new(), - }); - } - let stack_tokens: usize = stack_children.iter().map(|c| c.tokens).sum(); + // Journal — counted as the single rendered message sent + let journal_rendered = context::render_journal(&self.context.journal); + let journal_msg = Message::user(&journal_rendered); sections.push(ContextSection { - name: format!("Working stack ({} items)", self.context.working_stack.len()), - tokens: stack_tokens, - content: String::new(), - children: stack_children, + name: format!("Journal ({} entries)", self.context.journal.len()), + tokens: if journal_rendered.is_empty() { 0 } else { count_msg(&journal_msg) }, + content: journal_rendered, + children: Vec::new(), }); // Memory nodes — extracted from Memory entries in the conversation @@ -737,7 +649,6 @@ impl Agent { ConversationEntry::Memory { key, .. 
} => key.as_str(), _ => unreachable!(), }; - let text = entry.message().content_text(); // Show node weight from graph (updated by incremental scorer) let graph_weight = crate::hippocampus::store::Store::load().ok() .and_then(|s| s.nodes.get(key).map(|n| n.weight)); @@ -754,7 +665,7 @@ impl Agent { }; ContextSection { name: label, - tokens: count(text), + tokens: count_msg(entry.message()), content: String::new(), children: Vec::new(), } @@ -769,9 +680,9 @@ impl Agent { }); } - // Conversation — each message as a child - let conv_messages = &self.context.entries; - let conv_children: Vec = conv_messages.iter().enumerate() + // Conversation — non-memory entries only (memories counted above) + let conv_children: Vec = self.context.entries.iter().enumerate() + .filter(|(_, e)| !e.is_memory()) .map(|(i, entry)| { let m = entry.message(); let text = m.content.as_ref() @@ -797,7 +708,7 @@ impl Agent { } } }; - let tokens = count(&text); + let tokens = count_msg(entry.api_message()); let cfg = crate::config::get(); let role_name = if entry.is_memory() { "mem".to_string() } else { match m.role { @@ -1017,7 +928,10 @@ impl Agent { let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); let before_conv = before - before_mem; - // Dedup memory, trim to budget, reload journal + // Load journal BEFORE trimming so trim accounts for journal cost + self.load_startup_journal(); + + // Dedup memory, trim to budget let entries = self.context.entries.clone(); self.context.entries = crate::agent::context::trim_entries( &self.context, @@ -1031,9 +945,6 @@ impl Agent { dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})", before, after, before_mem, after_mem, before_conv, after_conv); - - - self.load_startup_journal(); self.generation += 1; self.last_prompt_tokens = 0; self.publish_context_state(); diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 625bc13..4c9871f 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -199,6 +199,7 @@ pub struct 
Mind { pub agent: Arc>, pub shared: Arc, pub config: SessionConfig, + agent_cycles: tokio::sync::Mutex, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, bg_tx: mpsc::UnboundedSender, @@ -219,7 +220,7 @@ impl Mind { config.session_dir.join("conversation.jsonl"), ).ok(); - let agent = Arc::new(tokio::sync::Mutex::new(Agent::new( + let ag = Agent::new( client, config.system_prompt.clone(), config.context_parts.clone(), @@ -228,7 +229,9 @@ impl Mind { conversation_log, shared_context, shared_active_tools, - ))); + ); + let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&ag.session_id); + let agent = Arc::new(tokio::sync::Mutex::new(ag)); let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); let (turn_watch, _) = tokio::sync::watch::channel(false); @@ -238,7 +241,8 @@ impl Mind { sup.load_config(); sup.ensure_running(); - Self { agent, shared, config, turn_tx, turn_watch, bg_tx, + Self { agent, shared, config, agent_cycles: tokio::sync::Mutex::new(agent_cycles), + turn_tx, turn_watch, bg_tx, bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup } } @@ -319,8 +323,8 @@ impl Mind { tokio::spawn(async move { let (context, client) = { let mut ag = agent.lock().await; - if ag.agent_cycles.memory_scoring_in_flight { return; } - ag.agent_cycles.memory_scoring_in_flight = true; + if ag.memory_scoring_in_flight { return; } + ag.memory_scoring_in_flight = true; (ag.context.clone(), ag.client_clone()) }; let result = learn::score_memories_incremental( @@ -328,8 +332,8 @@ impl Mind { ).await; { let mut ag = agent.lock().await; - ag.agent_cycles.memory_scoring_in_flight = false; - if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); } + ag.memory_scoring_in_flight = false; + if let Ok(ref scores) = result { ag.memory_scores = scores.clone(); } } let _ = bg_tx.send(BgEvent::ScoringDone); }); @@ -342,7 +346,63 @@ impl Mind { async fn start_turn(&self, text: 
&str, target: StreamTarget) { { let mut ag = self.agent.lock().await; - ag.push_message(crate::agent::api::types::Message::user(text)); + + // Run agent cycle — surface memories and reflection before the user message + let transcript_path = ag.conversation_log.as_ref() + .map(|l| l.path().to_string_lossy().to_string()) + .unwrap_or_default(); + let session = crate::session::HookSession::from_fields( + ag.session_id.clone(), + transcript_path, + "UserPromptSubmit".into(), + ); + let mut cycles = self.agent_cycles.lock().await; + cycles.trigger(&session); + let cycle = std::mem::take(&mut cycles.last_output); + drop(cycles); + for key in &cycle.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node( + &crate::store::Store::load().unwrap_or_default(), &key, + ) { + let mut msg = crate::agent::api::types::Message::user(format!( + "\n--- {} (surfaced) ---\n{}\n", + key, rendered, + )); + msg.stamp(); + ag.push_entry(crate::agent::context::ConversationEntry::Memory { + key: key.clone(), message: msg, + }); + } + } + if let Some(ref reflection) = cycle.reflection { + ag.push_message(crate::agent::api::types::Message::user(format!( + "\n--- subconscious reflection ---\n{}\n", + reflection.trim(), + ))); + } + + match target { + StreamTarget::Conversation => { + ag.push_message(crate::agent::api::types::Message::user(text)); + } + StreamTarget::Autonomous => { + let mut msg = crate::agent::api::types::Message::user(text); + msg.stamp(); + ag.push_entry(crate::agent::context::ConversationEntry::Dmn(msg)); + } + } + + // Compact if over budget before sending + let threshold = compaction_threshold(&self.config.app) as usize; + ag.publish_context_state(); + let used = { + let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default(); + crate::agent::context::sections_used(§ions) + }; + if used > threshold { + ag.compact(); + ag.notify("compacted"); + } } self.shared.lock().unwrap().turn_active = true; let _ = self.turn_watch.send(true); diff 
--git a/src/subconscious/subconscious.rs b/src/subconscious/subconscious.rs index 9bb6927..5088806 100644 --- a/src/subconscious/subconscious.rs +++ b/src/subconscious/subconscious.rs @@ -104,10 +104,6 @@ pub struct AgentCycleState { log_file: Option, pub agents: Vec, pub last_output: AgentCycleOutput, - /// Whether incremental memory scoring is currently running. - pub memory_scoring_in_flight: bool, - /// Latest per-memory scores from incremental scoring. - pub memory_scores: Vec<(String, f64)>, } const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; @@ -134,8 +130,6 @@ impl AgentCycleState { reflection: None, sleep_secs: None, }, - memory_scoring_in_flight: false, - memory_scores: Vec::new(), } } @@ -180,17 +174,17 @@ impl AgentCycleState { } } - pub fn snapshots(&self) -> Vec { + pub fn snapshots(&self, scoring_in_flight: bool, scored_count: usize) -> Vec { let mut snaps: Vec = self.agents.iter().map(|a| a.snapshot()).collect(); snaps.push(AgentSnapshot { name: "memory-scoring".to_string(), pid: None, - phase: if self.memory_scoring_in_flight { + phase: if scoring_in_flight { Some("scoring...".into()) - } else if self.memory_scores.is_empty() { + } else if scored_count == 0 { None } else { - Some(format!("{} scored", self.memory_scores.len())) + Some(format!("{} scored", scored_count)) }, log_path: None, }); @@ -210,7 +204,7 @@ impl AgentCycleState { /// Save current state for the Claude Code hook path. 
pub fn save(&self, session_id: &str) { - let state = SavedAgentState { agents: self.snapshots() }; + let state = SavedAgentState { agents: self.snapshots(false, 0) }; state.save(session_id); } diff --git a/src/user/mod.rs b/src/user/mod.rs index 05bd960..5e3d70c 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -316,7 +316,6 @@ pub async fn run( mind_tx: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { let agent = &mind.agent; - let shared_mind = &mind.shared; // UI-owned state let mut idle_state = crate::thalamus::idle::State::new(); idle_state.load();