From 78b22d6caef94299725c9e3718ea16507391510e Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sat, 21 Mar 2026 16:40:55 -0400 Subject: [PATCH] fix: buffer streaming tokens in observe log for readable transcripts The observe log was writing each TextDelta SSE token as a separate line, making poc-agent read show word-by-word fragments and causing the read cursor to advance past partial responses. Now TextDelta and Reasoning tokens are buffered and flushed as complete messages on turn boundaries (tool calls, user input, etc). The socket path (read -f) still streams live. Also fixed a potential deadlock: replaced blocking_lock() with .lock().await on the shared logfile mutex. Co-Authored-By: Claude Opus 4.6 (1M context) Co-Authored-By: Qwen 3.5 27B --- poc-agent/src/observe.rs | 69 +++++++++++++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 8 deletions(-) diff --git a/poc-agent/src/observe.rs b/poc-agent/src/observe.rs index 719595c..e6e651e 100644 --- a/poc-agent/src/observe.rs +++ b/poc-agent/src/observe.rs @@ -172,23 +172,76 @@ pub fn start( let (line_tx, _) = broadcast::channel::(256); let line_tx2 = line_tx.clone(); - // Receive UiMessages → write to logfile + broadcast to socket clients + // Receive UiMessages → write to logfile + broadcast to socket clients. + // TextDelta and Reasoning tokens are buffered and flushed on turn + // boundaries so the log reads as complete messages, not token fragments. tokio::spawn(async move { + let mut text_buf = String::new(); + let mut reasoning_buf = String::new(); + loop { match ui_rx.recv().await { Ok(msg) => { - if let Some(line) = format_message(&msg) { - { - use std::io::Write; - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", line); - let _ = f.flush(); + // Buffer streaming tokens + match &msg { + UiMessage::TextDelta(text, _) => { + text_buf.push_str(text); + continue; } + UiMessage::Reasoning(text) => { + reasoning_buf.push_str(text); + continue; + } + _ => {} + } + + // Flush reasoning buffer as one line + if !reasoning_buf.is_empty() { + let thinking = format!("(thinking: {})", reasoning_buf.trim()); + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", thinking); + let _ = f.flush(); + let _ = line_tx2.send(thinking); + reasoning_buf.clear(); + } + + // Flush text buffer + if !text_buf.is_empty() { + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", text_buf); + let _ = f.flush(); + let _ = line_tx2.send(std::mem::take(&mut text_buf)); + } + + // Write the non-streaming message + if let Some(line) = format_message(&msg) { + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", line); + let _ = f.flush(); let _ = line_tx2.send(line); } } Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Closed) => { + use std::io::Write; + if !reasoning_buf.is_empty() { + let thinking = format!("(thinking: {})", reasoning_buf.trim()); + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", thinking); + let _ = f.flush(); + let _ = line_tx2.send(thinking); + } + if !text_buf.is_empty() { + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", text_buf); + let _ = f.flush(); + let _ = line_tx2.send(text_buf); + } + break; + } } } });