From c22b8c3a6f0202d8a08a29de5e9d36c30321d713 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 6 Apr 2026 20:34:51 -0400 Subject: [PATCH] =?UTF-8?q?Unify=20budget=20and=20context=20state=20?= =?UTF-8?q?=E2=80=94=20single=20source=20of=20truth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kill ContextBudget and recompute_budget entirely. Budget percentages, used token counts, and compaction threshold checks now all derive from the ContextSection tree built by context_state_summary(). This eliminates the stale-budget bug where the cached budget diverged from actual context contents. Also: remove MindCommand::Turn — user input flows through shared_mind.input exclusively. Mind::start_turn() atomically moves text from pending input into the agent's context and spawns the turn. Kill /retry. Make Agent::turn() take no input parameter. Co-Authored-By: Proof of Concept --- src/agent/context.rs | 74 ++++++++++++++++---------------------------- src/agent/mod.rs | 30 +++++++----------- src/mind/mod.rs | 67 +++++++++++++++++++++++---------------- src/user/chat.rs | 29 +++-------------- 4 files changed, 83 insertions(+), 117 deletions(-) diff --git a/src/agent/context.rs b/src/agent/context.rs index 6042fea..5ce49ec 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -230,34 +230,9 @@ pub struct ContextState { /// Conversation entries — messages and memory, interleaved in order. /// Does NOT include system prompt, personality, or journal. pub entries: Vec, - /// Cached token budget — recomputed when entries change, not every frame. - pub budget: ContextBudget, } impl ContextState { - /// Compute the context budget from typed sources and cache the result. - pub fn recompute_budget(&mut self, count_str: &dyn Fn(&str) -> usize, - count_msg: &dyn Fn(&Message) -> usize, - window_tokens: usize) -> &ContextBudget { - let id = count_str(&self.system_prompt) - + self.personality.iter().map(|(_, c)| count_str(c)).sum::(); - let jnl: usize = self.journal.iter().map(|e| count_str(&e.content)).sum(); - let mut mem = 0; - let mut conv = 0; - for entry in &self.entries { - let tokens = count_msg(entry.api_message()); - if entry.is_memory() { mem += tokens } else { conv += tokens } - } - self.budget = ContextBudget { - identity_tokens: id, - memory_tokens: mem, - journal_tokens: jnl, - conversation_tokens: conv, - window_tokens, - }; - &self.budget - } - pub fn render_context_message(&self) -> String { let mut parts: Vec = self.personality.iter() .map(|(name, content)| format!("## {}\n\n{}", name, content)) @@ -281,28 +256,33 @@ impl ContextState { } } -#[derive(Debug, Clone, Default)] -pub struct ContextBudget { - pub identity_tokens: usize, - pub memory_tokens: usize, - pub journal_tokens: usize, - pub conversation_tokens: usize, - pub window_tokens: usize, +/// Total tokens used across all context sections. +pub fn sections_used(sections: &[ContextSection]) -> usize { + sections.iter().map(|s| s.tokens).sum() } -impl ContextBudget { - pub fn used(&self) -> usize { - self.identity_tokens + self.memory_tokens + self.journal_tokens + self.conversation_tokens - } - pub fn free(&self) -> usize { - self.window_tokens.saturating_sub(self.used()) - } - pub fn status_string(&self) -> String { - let total = self.window_tokens; - if total == 0 { return String::new(); } - let pct = |n: usize| if n == 0 { 0 } else { ((n * 100) / total).max(1) }; - format!("id:{}% mem:{}% jnl:{}% conv:{}% free:{}%", - pct(self.identity_tokens), pct(self.memory_tokens), - pct(self.journal_tokens), pct(self.conversation_tokens), pct(self.free())) - } +/// Budget status string derived from context sections. +pub fn sections_budget_string(sections: &[ContextSection]) -> String { + let window = context_window(); + if window == 0 { return String::new(); } + let used: usize = sections.iter().map(|s| s.tokens).sum(); + let free = window.saturating_sub(used); + let pct = |n: usize| if n == 0 { 0 } else { ((n * 100) / window).max(1) }; + let parts: Vec = sections.iter() + .map(|s| { + // Short label from section name + let label = match s.name.as_str() { + n if n.starts_with("System") => "sys", + n if n.starts_with("Personality") => "id", + n if n.starts_with("Journal") => "jnl", + n if n.starts_with("Working") => "stack", + n if n.starts_with("Memory") => "mem", + n if n.starts_with("Conversation") => "conv", + _ => return String::new(), + }; + format!("{}:{}%", label, pct(s.tokens)) + }) + .filter(|s| !s.is_empty()) + .collect(); + format!("{} free:{}%", parts.join(" "), pct(free)) } diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 2fc9363..463b07b 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -24,7 +24,7 @@ use tiktoken_rs::CoreBPE; use api::{ApiClient, ToolCall}; use api::types::{ContentPart, Message, MessageContent, Role}; -use context::{ConversationEntry, ContextState, ContextBudget}; +use context::{ConversationEntry, ContextState}; use tools::{summarize_args, working_stack}; use crate::mind::log::ConversationLog; @@ -214,7 +214,6 @@ impl Agent { journal: Vec::new(), working_stack: Vec::new(), entries: Vec::new(), - budget: ContextBudget::default(), }; let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&session_id); @@ -284,7 +283,7 @@ impl Agent { } /// Push a conversation message — stamped and logged. - fn push_message(&mut self, mut msg: Message) { + pub fn push_message(&mut self, mut msg: Message) { msg.stamp(); let entry = ConversationEntry::Message(msg); self.push_entry(entry); @@ -297,7 +296,7 @@ impl Agent { } } self.context.entries.push(entry); - self.recompute_budget(); + self.changed.notify_one(); } @@ -319,12 +318,6 @@ impl Agent { self.changed.notify_one(); } - pub fn recompute_budget(&mut self) -> &ContextBudget { - let count_str = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); - let count_msg = |m: &Message| crate::agent::context::msg_token_count(&self.tokenizer, m); - let window = crate::agent::context::context_window(); - self.context.recompute_budget(&count_str, &count_msg, window) - } /// Send a user message and run the agent loop until the model /// produces a text response (no more tool calls). Streams text @@ -334,7 +327,6 @@ impl Agent { /// lock is never held across I/O (API streaming, tool dispatch). pub async fn turn( agent: Arc>, - user_input: &str, ) -> Result { // --- Pre-loop setup (lock 1): agent cycle, memories, user input --- let active_tools = { @@ -385,14 +377,13 @@ impl Agent { } } - // Re-acquire to apply results and push user input - { + // Re-acquire to apply background tool results + if !bg_results.is_empty() { let mut me = agent.lock().await; let mut bg_ds = DispatchState::new(); for (call, output) in bg_results { me.apply_tool_result(&call, output, &mut bg_ds); } - me.push_message(Message::user(user_input)); } tools @@ -1041,13 +1032,14 @@ impl Agent { dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})", before, after, before_mem, after_mem, before_conv, after_conv); - self.recompute_budget(); - dbglog!("[compact] budget: {}", self.context.budget.status_string()); self.load_startup_journal(); self.generation += 1; self.last_prompt_tokens = 0; self.publish_context_state(); + + let sections = self.shared_context.read().map(|s| s.clone()).unwrap_or_default(); + dbglog!("[compact] budget: {}", context::sections_budget_string(§ions)); } /// Restore from the conversation log. Builds the context window @@ -1073,9 +1065,9 @@ impl Agent { all.len(), mem_count, conv_count); self.context.entries = all; self.compact(); - // Estimate prompt tokens from budget so status bar isn't 0 on startup - self.recompute_budget(); - self.last_prompt_tokens = self.context.budget.used() as u32; + // Estimate prompt tokens from sections so status bar isn't 0 on startup + let sections = self.shared_context.read().map(|s| s.clone()).unwrap_or_default(); + self.last_prompt_tokens = context::sections_used(§ions) as u32; true } diff --git a/src/mind/mod.rs b/src/mind/mod.rs index ffe1060..625bc13 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -81,8 +81,6 @@ impl Clone for MindState { /// What should happen after a state transition. pub enum MindCommand { - /// Start a turn with this input - Turn(String, StreamTarget), /// Run compaction check Compact, /// Run memory scoring @@ -113,10 +111,12 @@ impl MindState { } } - /// Consume pending input, return a Turn command if ready. - fn take_pending_input(&mut self) -> MindCommand { + /// Consume pending user input if no turn is active. + /// Returns the text to send; caller is responsible for pushing it + /// into the Agent's context and starting the turn. + fn take_pending_input(&mut self) -> Option { if self.turn_active || self.input.is_empty() { - return MindCommand::None; + return None; } let text = self.input.join("\n"); self.input.clear(); @@ -124,7 +124,7 @@ impl MindState { self.consecutive_errors = 0; self.last_user_input = Instant::now(); self.dmn = dmn::State::Engaged; - MindCommand::Turn(text, StreamTarget::Conversation) + Some(text) } /// Process turn completion, return model switch name if requested. @@ -158,17 +158,17 @@ impl MindState { } } - /// DMN tick — returns a Turn action with the DMN prompt, or None. - fn dmn_tick(&mut self) -> MindCommand { + /// DMN tick — returns a prompt and target if we should run a turn. + fn dmn_tick(&mut self) -> Option<(String, StreamTarget)> { if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) { - return MindCommand::None; + return None; } self.dmn_turns += 1; if self.dmn_turns > self.max_dmn_turns { self.dmn = dmn::State::Resting { since: Instant::now() }; self.dmn_turns = 0; - return MindCommand::None; + return None; } let dmn_ctx = dmn::DmnContext { @@ -177,7 +177,7 @@ impl MindState { last_turn_had_tools: self.last_turn_had_tools, }; let prompt = self.dmn.prompt(&dmn_ctx); - MindCommand::Turn(prompt, StreamTarget::Autonomous) + Some((prompt, StreamTarget::Autonomous)) } fn interrupt(&mut self) { @@ -261,9 +261,10 @@ impl Mind { match cmd { MindCommand::None => {} MindCommand::Compact => { - let threshold = compaction_threshold(&self.config.app); + let threshold = compaction_threshold(&self.config.app) as usize; let mut ag = self.agent.lock().await; - if ag.last_prompt_tokens() > threshold { + let sections = ag.shared_context.read().map(|s| s.clone()).unwrap_or_default(); + if crate::agent::context::sections_used(§ions) > threshold { ag.compact(); ag.notify("compacted"); } @@ -305,16 +306,6 @@ impl Mind { new_log, shared_ctx, shared_tools, ); } - MindCommand::Turn(input, target) => { - self.shared.lock().unwrap().turn_active = true; - let _ = self.turn_watch.send(true); - let agent = self.agent.clone(); - let result_tx = self.turn_tx.clone(); - self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move { - let result = Agent::turn(agent, &input).await; - let _ = result_tx.send((result, target)).await; - })); - } } } } @@ -344,6 +335,25 @@ impl Mind { }); } + /// Push user/DMN message into the agent's context and spawn a turn. + /// The text moves from pending_input to ContextState atomically — + /// by the time this returns, the message is in context and the turn + /// is running. + async fn start_turn(&self, text: &str, target: StreamTarget) { + { + let mut ag = self.agent.lock().await; + ag.push_message(crate::agent::api::types::Message::user(text)); + } + self.shared.lock().unwrap().turn_active = true; + let _ = self.turn_watch.send(true); + let agent = self.agent.clone(); + let result_tx = self.turn_tx.clone(); + self.shared.lock().unwrap().turn_handle = Some(tokio::spawn(async move { + let result = Agent::turn(agent).await; + let _ = result_tx.send((result, target)).await; + })); + } + pub async fn shutdown(&self) { if let Some(handle) = self.shared.lock().unwrap().turn_handle.take() { handle.abort(); } } @@ -404,12 +414,17 @@ impl Mind { _ = tokio::time::sleep(timeout), if !turn_active => { let tick = self.shared.lock().unwrap().dmn_tick(); - cmds.push(tick); + if let Some((prompt, target)) = tick { + self.start_turn(&prompt, target).await; + } } } - // Always check for pending input - cmds.push(self.shared.lock().unwrap().take_pending_input()); + // Check for pending user input → push to agent context and start turn + let pending = self.shared.lock().unwrap().take_pending_input(); + if let Some(text) = pending { + self.start_turn(&text, StreamTarget::Conversation).await; + } self.run_commands(cmds).await; } diff --git a/src/user/chat.rs b/src/user/chat.rs index 67e8560..af5d4bb 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -13,7 +13,6 @@ use ratatui::{ }; use super::{App, ScreenView, screen_legend}; -use crate::mind::StreamTarget; use crate::mind::MindCommand; // --- Slash command table --- @@ -35,28 +34,6 @@ fn commands() -> Vec { vec![ handler: |s, _| { if let Ok(mut ag) = s.agent.try_lock() { ag.notify("saved"); } } }, - SlashCommand { name: "/retry", help: "Re-run last turn", - handler: |s, _| { - let agent = s.agent.clone(); - let mind_tx = s.mind_tx.clone(); - tokio::spawn(async move { - let _act = crate::agent::start_activity(&agent, "retrying...").await; - let mut ag = agent.lock().await; - let entries = ag.entries_mut(); - let mut last_user_text = None; - while let Some(entry) = entries.last() { - if entry.message().role == crate::agent::api::types::Role::User { - last_user_text = Some(entries.pop().unwrap().message().content_text().to_string()); - break; - } - entries.pop(); - } - drop(ag); - if let Some(text) = last_user_text { - let _ = mind_tx.send(MindCommand::Turn(text, StreamTarget::Conversation)); - } - }); - } }, SlashCommand { name: "/model", help: "Show/switch model (/model )", handler: |s, arg| { if arg.is_empty() { @@ -555,8 +532,9 @@ impl InteractScreen { return; } - // Regular input → queue to Mind + // Regular input → queue to Mind, then wake it self.shared_mind.lock().unwrap().input.push(input.to_string()); + let _ = self.mind_tx.send(MindCommand::None); } @@ -848,7 +826,8 @@ impl ScreenView for InteractScreen { agent.expire_activities(); app.status.prompt_tokens = agent.last_prompt_tokens(); app.status.model = agent.model().to_string(); - app.status.context_budget = agent.context.budget.status_string(); + let sections = agent.shared_context.read().map(|s| s.clone()).unwrap_or_default(); + app.status.context_budget = crate::agent::context::sections_budget_string(§ions); app.activity = agent.activities.last() .map(|a| a.label.clone()) .unwrap_or_default();