diff --git a/poc-agent/src/observe.rs b/poc-agent/src/observe.rs index 719595c..e6e651e 100644 --- a/poc-agent/src/observe.rs +++ b/poc-agent/src/observe.rs @@ -172,23 +172,76 @@ pub fn start( let (line_tx, _) = broadcast::channel::(256); let line_tx2 = line_tx.clone(); - // Receive UiMessages → write to logfile + broadcast to socket clients + // Receive UiMessages → write to logfile + broadcast to socket clients. + // TextDelta and Reasoning tokens are buffered and flushed on turn + // boundaries so the log reads as complete messages, not token fragments. tokio::spawn(async move { + let mut text_buf = String::new(); + let mut reasoning_buf = String::new(); + loop { match ui_rx.recv().await { Ok(msg) => { - if let Some(line) = format_message(&msg) { - { - use std::io::Write; - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", line); - let _ = f.flush(); + // Buffer streaming tokens + match &msg { + UiMessage::TextDelta(text, _) => { + text_buf.push_str(text); + continue; } + UiMessage::Reasoning(text) => { + reasoning_buf.push_str(text); + continue; + } + _ => {} + } + + // Flush reasoning buffer as one line + if !reasoning_buf.is_empty() { + let thinking = format!("(thinking: {})", reasoning_buf.trim()); + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", thinking); + let _ = f.flush(); + let _ = line_tx2.send(thinking); + reasoning_buf.clear(); + } + + // Flush text buffer + if !text_buf.is_empty() { + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", text_buf); + let _ = f.flush(); + let _ = line_tx2.send(std::mem::take(&mut text_buf)); + } + + // Write the non-streaming message + if let Some(line) = format_message(&msg) { + use std::io::Write; + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", line); + let _ = f.flush(); let _ = line_tx2.send(line); } } Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Closed) => { + use std::io::Write; + if !reasoning_buf.is_empty() { + let thinking = format!("(thinking: {})", reasoning_buf.trim()); + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", thinking); + let _ = f.flush(); + let _ = line_tx2.send(thinking); + } + if !text_buf.is_empty() { + let mut f = logfile.lock().await; + let _ = writeln!(f, "{}", text_buf); + let _ = f.flush(); + let _ = line_tx2.send(text_buf); + } + break; + } } } });