From 8971e6841b8075133ef06b4f7ee1b9a73dcea910 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 6 Apr 2026 22:20:22 -0400 Subject: [PATCH] Fix streaming entry duplication and context state freshness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace pop+push of streaming entries with finalize_streaming() which finds the unstamped assistant entry and updates it in place. The streaming entry IS the assistant message — just stamp it when done. Also: set dirty flag on agent_changed/turn_watch so the TUI actually redraws when the agent state changes. Publish context state on F2 switch so the debug screen shows current data. Age out images during compact() so old screenshots don't bloat the request payload on startup. Co-Authored-By: Proof of Concept --- src/agent/mod.rs | 70 +++++++++++++++++++++++++++++++----------------- src/user/mod.rs | 17 +++++++++--- 2 files changed, 58 insertions(+), 29 deletions(-) diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 2975514..1ba2ad4 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -277,24 +277,50 @@ impl Agent { self.changed.notify_one(); } - /// Append streaming text to the last entry (creating a partial - /// assistant entry if needed). Called by collect_stream per token batch. - pub fn append_streaming(&mut self, text: &str) { - if let Some(entry) = self.context.entries.last_mut() { - let msg = entry.message_mut(); - if msg.role == Role::Assistant { - msg.append_content(text); - self.changed.notify_one(); - return; + fn streaming_entry(&mut self) -> Option<&mut Message> { + for entry in self.context.entries.iter_mut().rev() { + let m = entry.message_mut(); + if m.role == Role::Assistant { + return if m.timestamp.is_none() { Some(m) } else { None } } } - // No assistant entry yet — push a new partial one - self.context.entries.push(ConversationEntry::Message( - Message::assistant(text), - )); + + None + } + + /// Append streaming text to the last entry (creating a partial + /// assistant entry if needed). Called by collect_stream per token batch. + fn append_streaming(&mut self, text: &str) { + if let Some(m) = self.streaming_entry() { + m.append_content(text); + } else { + // No streaming entry — create without timestamp so finalize can find it + self.context.entries.push(ConversationEntry::Message(Message { + role: Role::Assistant, + content: Some(MessageContent::Text(text.to_string())), + tool_calls: None, + tool_call_id: None, + name: None, + timestamp: None, + })); + } + self.changed.notify_one(); } + /// Finalize the streaming entry with the complete response message. + /// Finds the unstamped assistant entry and updates it in place. + fn finalize_streaming(&mut self, msg: Message) { + if let Some(m) = self.streaming_entry() { + *m = msg; + m.stamp(); + } else { + // No streaming entry found — push as new + self.push_message(msg); + } + + self.changed.notify_one(); + } /// Send a user message and run the agent loop until the model /// produces a text response (no more tool calls). Streams text @@ -414,15 +440,6 @@ impl Agent { me.append_streaming(&display_buf); } - // Pop the streaming entry — the proper entry gets pushed below - // via build_response_message which handles tool calls, leaked - // tool calls, etc. sync_from_agent handles the swap. - if let Some(entry) = me.context.entries.last() { - if entry.message().role == Role::Assistant && entry.message().timestamp.is_none() { - me.context.entries.pop(); - } - } - let msg = api::build_response_message(content, tool_calls); if let Some(usage) = &usage { @@ -465,7 +482,7 @@ impl Agent { }; if !pending.is_empty() { - agent.lock().await.push_message(msg.clone()); + agent.lock().await.finalize_streaming(msg.clone()); // Drop lock before awaiting tool handles let mut results = Vec::new(); @@ -486,7 +503,7 @@ impl Agent { // Tool calls (structured API path) if let Some(ref tool_calls) = msg.tool_calls { if !tool_calls.is_empty() { - agent.lock().await.push_message(msg.clone()); + agent.lock().await.finalize_streaming(msg.clone()); let calls: Vec = tool_calls.clone(); // Drop lock before tool dispatch for call in &calls { @@ -501,7 +518,7 @@ impl Agent { // Genuinely text-only response let text = msg.content_text().to_string(); let mut me = agent.lock().await; - me.push_message(msg); + me.finalize_streaming(msg); // Drain pending control flags if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } @@ -928,6 +945,9 @@ impl Agent { let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); let before_conv = before - before_mem; + // Age out images before trimming — they're huge in the request payload + self.age_out_images(); + // Load journal BEFORE trimming so trim accounts for journal cost self.load_startup_journal(); diff --git a/src/user/mod.rs b/src/user/mod.rs index 5e3d70c..888a9d3 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -377,6 +377,7 @@ pub async fn run( // Replay conversation after Mind init completes (non-blocking check) let mut startup_done = false; + let mut dirty = true; // render on first loop loop { tokio::select! { @@ -389,9 +390,13 @@ pub async fn run( } } - _ = agent_changed.notified() => {} + _ = agent_changed.notified() => { + dirty = true; + } - _ = turn_watch.changed() => {} + _ = turn_watch.changed() => { + dirty = true; + } Some(channels) = channel_rx.recv() => { app.set_channel_status(channels); @@ -419,8 +424,6 @@ pub async fn run( if !pending.is_empty() { idle_state.user_activity(); } - let mut dirty = false; - while !pending.is_empty() || dirty { let global_pos = pending.iter().position(|e| is_global_event(e)) .unwrap_or(pending.len()); @@ -448,6 +451,12 @@ pub async fn run( let idx = n as usize; if idx >= 1 && idx <= screens.len() { active_screen = idx; + // Refresh context state when switching to the conscious screen + if idx == 2 { + if let Ok(mut ag) = agent.try_lock() { + ag.publish_context_state(); + } + } } } else if key.modifiers.contains(KeyModifiers::CONTROL) { match key.code {