Fix streaming entry duplication and context state freshness

Replace pop+push of streaming entries with finalize_streaming() which
finds the unstamped assistant entry and updates it in place. The
streaming entry IS the assistant message — just stamp it when done.

Also: set dirty flag on agent_changed/turn_watch so the TUI actually
redraws when the agent state changes. Publish context state on F2
switch so the debug screen shows current data.

Age out images during compact() so old screenshots don't bloat the
request payload on startup.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-06 22:20:22 -04:00
parent d5e6f55da9
commit 8971e6841b
2 changed files with 58 additions and 29 deletions

View file

@ -277,24 +277,50 @@ impl Agent {
self.changed.notify_one(); self.changed.notify_one();
} }
fn streaming_entry(&mut self) -> Option<&mut Message> {
for entry in self.context.entries.iter_mut().rev() {
let m = entry.message_mut();
if m.role == Role::Assistant {
return if m.timestamp.is_none() { Some(m) } else { None }
}
}
None
}
/// Append streaming text to the last entry (creating a partial /// Append streaming text to the last entry (creating a partial
/// assistant entry if needed). Called by collect_stream per token batch. /// assistant entry if needed). Called by collect_stream per token batch.
pub fn append_streaming(&mut self, text: &str) { fn append_streaming(&mut self, text: &str) {
if let Some(entry) = self.context.entries.last_mut() { if let Some(m) = self.streaming_entry() {
let msg = entry.message_mut(); m.append_content(text);
if msg.role == Role::Assistant { } else {
msg.append_content(text); // No streaming entry — create without timestamp so finalize can find it
self.changed.notify_one(); self.context.entries.push(ConversationEntry::Message(Message {
return; role: Role::Assistant,
content: Some(MessageContent::Text(text.to_string())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: None,
}));
} }
}
// No assistant entry yet — push a new partial one
self.context.entries.push(ConversationEntry::Message(
Message::assistant(text),
));
self.changed.notify_one(); self.changed.notify_one();
} }
/// Finalize the streaming entry with the complete response message.
/// Finds the unstamped assistant entry and updates it in place.
fn finalize_streaming(&mut self, msg: Message) {
if let Some(m) = self.streaming_entry() {
*m = msg;
m.stamp();
} else {
// No streaming entry found — push as new
self.push_message(msg);
}
self.changed.notify_one();
}
/// Send a user message and run the agent loop until the model /// Send a user message and run the agent loop until the model
/// produces a text response (no more tool calls). Streams text /// produces a text response (no more tool calls). Streams text
@ -414,15 +440,6 @@ impl Agent {
me.append_streaming(&display_buf); me.append_streaming(&display_buf);
} }
// Pop the streaming entry — the proper entry gets pushed below
// via build_response_message which handles tool calls, leaked
// tool calls, etc. sync_from_agent handles the swap.
if let Some(entry) = me.context.entries.last() {
if entry.message().role == Role::Assistant && entry.message().timestamp.is_none() {
me.context.entries.pop();
}
}
let msg = api::build_response_message(content, tool_calls); let msg = api::build_response_message(content, tool_calls);
if let Some(usage) = &usage { if let Some(usage) = &usage {
@ -465,7 +482,7 @@ impl Agent {
}; };
if !pending.is_empty() { if !pending.is_empty() {
agent.lock().await.push_message(msg.clone()); agent.lock().await.finalize_streaming(msg.clone());
// Drop lock before awaiting tool handles // Drop lock before awaiting tool handles
let mut results = Vec::new(); let mut results = Vec::new();
@ -486,7 +503,7 @@ impl Agent {
// Tool calls (structured API path) // Tool calls (structured API path)
if let Some(ref tool_calls) = msg.tool_calls { if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() { if !tool_calls.is_empty() {
agent.lock().await.push_message(msg.clone()); agent.lock().await.finalize_streaming(msg.clone());
let calls: Vec<ToolCall> = tool_calls.clone(); let calls: Vec<ToolCall> = tool_calls.clone();
// Drop lock before tool dispatch // Drop lock before tool dispatch
for call in &calls { for call in &calls {
@ -501,7 +518,7 @@ impl Agent {
// Genuinely text-only response // Genuinely text-only response
let text = msg.content_text().to_string(); let text = msg.content_text().to_string();
let mut me = agent.lock().await; let mut me = agent.lock().await;
me.push_message(msg); me.finalize_streaming(msg);
// Drain pending control flags // Drain pending control flags
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
@ -928,6 +945,9 @@ impl Agent {
let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count();
let before_conv = before - before_mem; let before_conv = before - before_mem;
// Age out images before trimming — they're huge in the request payload
self.age_out_images();
// Load journal BEFORE trimming so trim accounts for journal cost // Load journal BEFORE trimming so trim accounts for journal cost
self.load_startup_journal(); self.load_startup_journal();

View file

@ -377,6 +377,7 @@ pub async fn run(
// Replay conversation after Mind init completes (non-blocking check) // Replay conversation after Mind init completes (non-blocking check)
let mut startup_done = false; let mut startup_done = false;
let mut dirty = true; // render on first loop
loop { loop {
tokio::select! { tokio::select! {
@ -389,9 +390,13 @@ pub async fn run(
} }
} }
_ = agent_changed.notified() => {} _ = agent_changed.notified() => {
dirty = true;
}
_ = turn_watch.changed() => {} _ = turn_watch.changed() => {
dirty = true;
}
Some(channels) = channel_rx.recv() => { Some(channels) = channel_rx.recv() => {
app.set_channel_status(channels); app.set_channel_status(channels);
@ -419,8 +424,6 @@ pub async fn run(
if !pending.is_empty() { idle_state.user_activity(); } if !pending.is_empty() { idle_state.user_activity(); }
let mut dirty = false;
while !pending.is_empty() || dirty { while !pending.is_empty() || dirty {
let global_pos = pending.iter().position(|e| is_global_event(e)) let global_pos = pending.iter().position(|e| is_global_event(e))
.unwrap_or(pending.len()); .unwrap_or(pending.len());
@ -448,6 +451,12 @@ pub async fn run(
let idx = n as usize; let idx = n as usize;
if idx >= 1 && idx <= screens.len() { if idx >= 1 && idx <= screens.len() {
active_screen = idx; active_screen = idx;
// Refresh context state when switching to the conscious screen
if idx == 2 {
if let Ok(mut ag) = agent.try_lock() {
ag.publish_context_state();
}
}
} }
} else if key.modifiers.contains(KeyModifiers::CONTROL) { } else if key.modifiers.contains(KeyModifiers::CONTROL) {
match key.code { match key.code {