diff --git a/src/agent/api/anthropic.rs b/src/agent/api/anthropic.rs index 2433943..dc42820 100644 --- a/src/agent/api/anthropic.rs +++ b/src/agent/api/anthropic.rs @@ -15,8 +15,11 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use std::time::Duration; +use tokio::sync::mpsc; + use crate::agent::types::*; use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender}; +use super::StreamEvent; // --- Anthropic wire types --- @@ -653,3 +656,58 @@ pub async fn stream( Ok((super::build_response_message(content, tool_calls), usage)) } + +/// Wrapper that calls the existing stream() and synthesizes StreamEvents. +/// TODO: refactor to emit events during streaming like the OpenAI backend. +pub async fn stream_events( + client: &Client, + api_key: &str, + model: &str, + messages: &[Message], + tools: Option<&[crate::agent::types::ToolDef]>, + tx: &mpsc::UnboundedSender, + ui_tx: &UiSender, + reasoning_effort: &str, +) -> Result<()> { + let (msg, usage) = stream( + client, api_key, model, messages, tools, + ui_tx, StreamTarget::Conversation, reasoning_effort, + ).await?; + + // Synthesize events from the completed message. + if let Some(text) = msg.content.as_ref().and_then(|c| match c { + MessageContent::Text(t) => Some(t.as_str()), + _ => None, + }) { + if !text.is_empty() { + let _ = tx.send(StreamEvent::Content(text.to_string())); + } + } + if let Some(ref tcs) = msg.tool_calls { + for (i, tc) in tcs.iter().enumerate() { + let _ = tx.send(StreamEvent::ToolCallDelta { + index: i, + id: Some(tc.id.clone()), + call_type: Some(tc.call_type.clone()), + name: Some(tc.function.name.clone()), + arguments: Some(tc.function.arguments.clone()), + }); + } + } + if let Some(u) = usage { + let _ = tx.send(StreamEvent::Usage(u.clone())); + let _ = tx.send(StreamEvent::Finished { + reason: "stop".into(), + prompt_tokens: u.prompt_tokens, + completion_tokens: u.completion_tokens, + }); + } else { + let _ = tx.send(StreamEvent::Finished { + reason: "stop".into(), + prompt_tokens: 0, + completion_tokens: 0, + }); + } + + Ok(()) +} diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 302ac4a..2cb133a 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -18,8 +18,41 @@ use anyhow::Result; use reqwest::Client; use std::time::{Duration, Instant}; +use tokio::sync::mpsc; + use crate::agent::types::*; -use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender}; +use crate::agent::ui_channel::{UiMessage, UiSender}; + +// ───────────────────────────────────────────────────────────── +// Stream events — yielded by backends, consumed by the runner +// ───────────────────────────────────────────────────────────── + +/// Events produced by the streaming API backends. +/// The runner reads these and decides what to display where. +pub enum StreamEvent { + /// Content token from the model's response. + Content(String), + /// Reasoning/thinking token (internal monologue). + Reasoning(String), + /// Incremental tool call delta (structured, from APIs that support it). + ToolCallDelta { + index: usize, + id: Option, + call_type: Option, + name: Option, + arguments: Option, + }, + /// Token usage stats. + Usage(Usage), + /// Stream finished. + Finished { + reason: String, + prompt_tokens: u32, + completion_tokens: u32, + }, + /// Error from the stream. + Error(String), +} enum Backend { OpenAi { @@ -58,20 +91,71 @@ impl ApiClient { } } + /// Start a streaming chat completion. Returns a receiver of StreamEvents. + /// The caller (runner) reads events and handles routing to the UI. + /// + /// The old `chat_completion_stream` method is kept for the subconscious + /// agents which don't need fine-grained stream control. + pub fn start_stream( + &self, + messages: &[Message], + tools: Option<&[ToolDef]>, + ui_tx: &UiSender, + reasoning_effort: &str, + temperature: Option, + ) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + let client = self.client.clone(); + let api_key = self.api_key.clone(); + let model = self.model.clone(); + let messages = messages.to_vec(); + let tools = tools.map(|t| t.to_vec()); + let ui_tx = ui_tx.clone(); + let reasoning_effort = reasoning_effort.to_string(); + let backend = match &self.backend { + Backend::OpenAi { base_url } => Backend::OpenAi { base_url: base_url.clone() }, + Backend::Anthropic => Backend::Anthropic, + }; + + tokio::spawn(async move { + let result = match &backend { + Backend::OpenAi { base_url } => { + openai::stream_events( + &client, base_url, &api_key, &model, + &messages, tools.as_deref(), &tx, &ui_tx, + &reasoning_effort, temperature, + ).await + } + Backend::Anthropic => { + // Anthropic backend still uses the old path for now — + // wrap it by calling the old stream() and synthesizing events. + anthropic::stream_events( + &client, &api_key, &model, + &messages, tools.as_deref(), &tx, &ui_tx, + &reasoning_effort, + ).await + } + }; + if let Err(e) = result { + let _ = tx.send(StreamEvent::Error(e.to_string())); + } + }); + + rx + } + /// Streaming chat completion. Returns the assembled response message /// plus optional usage stats. Text tokens stream through the UI channel. /// - /// Empty response handling is done at the agent level (agent.rs) - /// where the conversation can be modified between retries. + /// Used by subconscious agents that don't need per-token routing. pub async fn chat_completion_stream( &self, messages: &[Message], tools: Option<&[ToolDef]>, ui_tx: &UiSender, - target: StreamTarget, reasoning_effort: &str, ) -> Result<(Message, Option)> { - self.chat_completion_stream_temp(messages, tools, ui_tx, target, reasoning_effort, None).await + self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None).await } pub async fn chat_completion_stream_temp( @@ -79,24 +163,48 @@ impl ApiClient { messages: &[Message], tools: Option<&[ToolDef]>, ui_tx: &UiSender, - target: StreamTarget, reasoning_effort: &str, temperature: Option, ) -> Result<(Message, Option)> { - match &self.backend { - Backend::OpenAi { base_url } => { - openai::stream( - &self.client, base_url, &self.api_key, &self.model, - messages, tools, ui_tx, target, reasoning_effort, temperature, - ).await - } - Backend::Anthropic => { - anthropic::stream( - &self.client, &self.api_key, &self.model, - messages, tools, ui_tx, target, reasoning_effort, - ).await + // Use the event stream and accumulate into a message. + let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature); + let mut content = String::new(); + let mut tool_calls: Vec = Vec::new(); + let mut usage = None; + let mut finish_reason = None; + + while let Some(event) = rx.recv().await { + match event { + StreamEvent::Content(text) => content.push_str(&text), + StreamEvent::Reasoning(_) => {} + StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { + while tool_calls.len() <= index { + tool_calls.push(ToolCall { + id: String::new(), + call_type: "function".to_string(), + function: FunctionCall { name: String::new(), arguments: String::new() }, + }); + } + if let Some(id) = id { tool_calls[index].id = id; } + if let Some(ct) = call_type { tool_calls[index].call_type = ct; } + if let Some(n) = name { tool_calls[index].function.name = n; } + if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } + } + StreamEvent::Usage(u) => usage = Some(u), + StreamEvent::Finished { reason, .. } => { + finish_reason = Some(reason); + break; + } + StreamEvent::Error(e) => anyhow::bail!("{}", e), } } + + if finish_reason.as_deref() == Some("error") { + let detail = if content.is_empty() { "no details".into() } else { content }; + anyhow::bail!("model stream error: {}", detail); + } + + Ok((build_response_message(content, tool_calls), usage)) } /// Return a label for the active backend, used in startup info. @@ -325,7 +433,7 @@ impl SseReader { /// from models that emit tool calls as text), parse them out and /// promote them to structured tool_calls. This way all consumers /// see tool calls uniformly regardless of backend. -pub(crate) fn build_response_message( +pub fn build_response_message( content: String, tool_calls: Vec, ) -> Message { diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index bb25a50..814bec6 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -6,23 +6,27 @@ use anyhow::Result; use reqwest::Client; -use std::time::Duration; +use tokio::sync::mpsc; use crate::agent::types::*; -use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender}; +use crate::agent::ui_channel::{UiMessage, UiSender}; +use super::StreamEvent; -pub async fn stream( +/// Stream SSE events from an OpenAI-compatible endpoint, sending +/// parsed StreamEvents through the channel. The caller (runner) +/// handles routing to the UI. +pub async fn stream_events( client: &Client, base_url: &str, api_key: &str, model: &str, messages: &[Message], tools: Option<&[ToolDef]>, + tx: &mpsc::UnboundedSender, ui_tx: &UiSender, - target: StreamTarget, reasoning_effort: &str, temperature: Option, -) -> Result<(Message, Option)> { +) -> Result<()> { let request = ChatRequest { model: model.to_string(), messages: messages.to_vec(), @@ -59,23 +63,19 @@ pub async fn stream( let mut reader = super::SseReader::new(ui_tx); - let mut content = String::new(); - let mut tool_calls: Vec = Vec::new(); - let mut usage = None; - let mut finish_reason = None; + let mut content_len: usize = 0; let mut reasoning_chars: usize = 0; + let mut tool_call_count: usize = 0; let mut empty_deltas: u64 = 0; - let mut first_content_at: Option = None; - - let _reasoning_enabled = reasoning_effort != "none"; + let mut first_content_at = None; + let mut finish_reason = None; + let mut usage = None; while let Some(event) = reader.next_event(&mut response).await? { - // OpenRouter sometimes embeds error objects in the stream if let Some(err_msg) = event["error"]["message"].as_str() { let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or(""); let _ = ui_tx.send(UiMessage::Debug(format!( - "API error in stream: {}", - err_msg + "API error in stream: {}", err_msg ))); anyhow::bail!("API error in stream: {} {}", err_msg, raw); } @@ -83,7 +83,6 @@ pub async fn stream( let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) { Ok(c) => c, Err(e) => { - // Log unparseable events — they may contain error info let preview = event.to_string(); let _ = ui_tx.send(UiMessage::Debug(format!( "unparseable SSE event ({}): {}", @@ -93,7 +92,8 @@ pub async fn stream( } }; - if chunk.usage.is_some() { + if let Some(ref u) = chunk.usage { + let _ = tx.send(StreamEvent::Usage(u.clone())); usage = chunk.usage; } @@ -107,18 +107,14 @@ pub async fn stream( // Reasoning tokens — multiple field names across providers let mut has_reasoning = false; - if let Some(ref r) = choice.delta.reasoning_content { + for r in [ + choice.delta.reasoning_content.as_ref(), + choice.delta.reasoning.as_ref(), + ].into_iter().flatten() { reasoning_chars += r.len(); has_reasoning = true; if !r.is_empty() { - let _ = ui_tx.send(UiMessage::Reasoning(r.clone())); - } - } - if let Some(ref r) = choice.delta.reasoning { - reasoning_chars += r.len(); - has_reasoning = true; - if !r.is_empty() { - let _ = ui_tx.send(UiMessage::Reasoning(r.clone())); + let _ = tx.send(StreamEvent::Reasoning(r.clone())); } } if let Some(ref r) = choice.delta.reasoning_details { @@ -126,46 +122,28 @@ pub async fn stream( reasoning_chars += s.len(); has_reasoning = true; if !s.is_empty() && s != "null" { - let _ = ui_tx.send(UiMessage::Reasoning(s)); + let _ = tx.send(StreamEvent::Reasoning(s)); } } if let Some(ref text_delta) = choice.delta.content { if first_content_at.is_none() && !text_delta.is_empty() { first_content_at = Some(reader.stream_start.elapsed()); - let _ = ui_tx.send(UiMessage::Activity("streaming...".into())); } - content.push_str(text_delta); - let _ = ui_tx.send(UiMessage::TextDelta(text_delta.clone(), target)); + content_len += text_delta.len(); + let _ = tx.send(StreamEvent::Content(text_delta.clone())); } if let Some(ref tc_deltas) = choice.delta.tool_calls { for tc_delta in tc_deltas { - let idx = tc_delta.index; - while tool_calls.len() <= idx { - tool_calls.push(ToolCall { - id: String::new(), - call_type: "function".to_string(), - function: FunctionCall { - name: String::new(), - arguments: String::new(), - }, - }); - } - if let Some(ref id) = tc_delta.id { - tool_calls[idx].id = id.clone(); - } - if let Some(ref ct) = tc_delta.call_type { - tool_calls[idx].call_type = ct.clone(); - } - if let Some(ref func) = tc_delta.function { - if let Some(ref name) = func.name { - tool_calls[idx].function.name = name.clone(); - } - if let Some(ref args) = func.arguments { - tool_calls[idx].function.arguments.push_str(args); - } - } + tool_call_count = tool_call_count.max(tc_delta.index + 1); + let _ = tx.send(StreamEvent::ToolCallDelta { + index: tc_delta.index, + id: tc_delta.id.clone(), + call_type: tc_delta.call_type.clone(), + name: tc_delta.function.as_ref().and_then(|f| f.name.clone()), + arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()), + }); } } @@ -179,8 +157,8 @@ pub async fn stream( super::log_diagnostics( ui_tx, - content.len(), - tool_calls.len(), + content_len, + tool_call_count, reasoning_chars, reasoning_effort, &finish_reason, @@ -191,25 +169,18 @@ pub async fn stream( total_elapsed, first_content_at, &usage, - &tool_calls, + &[], // tool_calls not accumulated here anymore ); - // Model/provider error delivered inside the stream (HTTP 200 but - // finish_reason="error"). Surface whatever content came back as - // the error message so the caller can retry or display it. - // Don't append the trailing newline — this isn't real content. - if finish_reason.as_deref() == Some("error") { - let detail = if content.is_empty() { - "no details".to_string() - } else { - content - }; - anyhow::bail!("model stream error: {}", detail); - } + let reason = finish_reason.unwrap_or_default(); + let (pt, ct) = usage.as_ref() + .map(|u| (u.prompt_tokens, u.completion_tokens)) + .unwrap_or((0, 0)); + let _ = tx.send(StreamEvent::Finished { + reason, + prompt_tokens: pt, + completion_tokens: ct, + }); - if !content.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); - } - - Ok((super::build_response_message(content, tool_calls), usage)) + Ok(()) } diff --git a/src/agent/runner.rs b/src/agent/runner.rs index 0c022b1..b8a1e15 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -19,6 +19,7 @@ use tiktoken_rs::CoreBPE; use crate::agent::api::ApiClient; use crate::agent::journal; use crate::agent::log::ConversationLog; +use crate::agent::api::StreamEvent; use crate::agent::tools; use crate::agent::tools::ProcessTracker; use crate::agent::types::*; @@ -251,21 +252,94 @@ impl Agent { loop { let _ = ui_tx.send(UiMessage::Activity("thinking...".into())); - let api_result = self - .client - .chat_completion_stream( - &self.messages, - Some(&self.tool_defs), - ui_tx, - target, - &self.reasoning_effort, - ) - .await; - // Context overflow → compact and retry (max 2 attempts) - // Stream error → retry with backoff (max 2 attempts) - let (msg, usage) = match api_result { - Err(e) if crate::agent::context::is_context_overflow(&e) && overflow_retries < 2 => { + // Stream events from the API — we route each event to the + // appropriate UI pane rather than letting the API layer do it. + let mut rx = self.client.start_stream( + &self.messages, + Some(&self.tool_defs), + ui_tx, + &self.reasoning_effort, + None, + ); + + let mut content = String::new(); + let mut tool_calls: Vec = Vec::new(); + let mut usage = None; + let mut finish_reason = None; + let mut in_tool_call = false; + let mut stream_error = None; + let mut first_content = true; + // Buffer for content not yet sent to UI — holds a tail + // that might be a partial tag. + let mut display_buf = String::new(); + + while let Some(event) = rx.recv().await { + match event { + StreamEvent::Content(text) => { + if first_content { + let _ = ui_tx.send(UiMessage::Activity("streaming...".into())); + first_content = false; + } + content.push_str(&text); + + if in_tool_call { + // Already inside a tool call — suppress display. + } else { + display_buf.push_str(&text); + + if let Some(pos) = display_buf.find("") { + // Flush content before the tag, suppress the rest. + let before = &display_buf[..pos]; + if !before.is_empty() { + let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); + } + display_buf.clear(); + in_tool_call = true; + } else { + // Flush display_buf except a tail that could be + // a partial "" (10 chars). + let safe = display_buf.len().saturating_sub(10); + if safe > 0 { + let flush = display_buf[..safe].to_string(); + display_buf = display_buf[safe..].to_string(); + let _ = ui_tx.send(UiMessage::TextDelta(flush, target)); + } + } + } + } + StreamEvent::Reasoning(text) => { + let _ = ui_tx.send(UiMessage::Reasoning(text)); + } + StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { + while tool_calls.len() <= index { + tool_calls.push(ToolCall { + id: String::new(), + call_type: "function".to_string(), + function: FunctionCall { name: String::new(), arguments: String::new() }, + }); + } + if let Some(id) = id { tool_calls[index].id = id; } + if let Some(ct) = call_type { tool_calls[index].call_type = ct; } + if let Some(n) = name { tool_calls[index].function.name = n; } + if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } + } + StreamEvent::Usage(u) => usage = Some(u), + StreamEvent::Finished { reason, .. } => { + finish_reason = Some(reason); + break; + } + StreamEvent::Error(e) => { + stream_error = Some(e); + break; + } + } + } + + // Handle stream errors with retry logic + if let Some(e) = stream_error { + let err = anyhow::anyhow!("{}", e); + if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { overflow_retries += 1; let _ = ui_tx.send(UiMessage::Info(format!( "[context overflow — compacting and retrying ({}/2)]", @@ -274,7 +348,7 @@ impl Agent { self.emergency_compact(); continue; } - Err(e) if crate::agent::context::is_stream_error(&e) && empty_retries < 2 => { + if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { empty_retries += 1; let _ = ui_tx.send(UiMessage::Info(format!( "[stream error: {} — retrying ({}/2)]", @@ -283,8 +357,23 @@ impl Agent { tokio::time::sleep(std::time::Duration::from_secs(2)).await; continue; } - other => other?, - }; + return Err(err); + } + + if finish_reason.as_deref() == Some("error") { + let detail = if content.is_empty() { "no details".into() } else { content }; + return Err(anyhow::anyhow!("model stream error: {}", detail)); + } + + // Flush remaining display buffer (normal responses without tool calls). + if !in_tool_call && !display_buf.is_empty() { + let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); + } + if !content.is_empty() && !in_tool_call { + let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); + } + + let msg = crate::agent::api::build_response_message(content, tool_calls); // Strip ephemeral tool calls (journal) that the API has // now processed. They're persisted to disk; no need to keep diff --git a/src/agent/types.rs b/src/agent/types.rs index e28ec9d..be1c77e 100644 --- a/src/agent/types.rs +++ b/src/agent/types.rs @@ -157,7 +157,7 @@ pub struct Choice { pub finish_reason: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] #[allow(dead_code)] pub struct Usage { pub prompt_tokens: u32, diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs index cbb4117..4ec4d3b 100644 --- a/src/subconscious/api.rs +++ b/src/subconscious/api.rs @@ -10,7 +10,6 @@ use crate::agent::api::ApiClient; use crate::agent::types::*; use crate::thought::{self, ProcessTracker}; -use crate::agent::ui_channel::StreamTarget; use std::sync::OnceLock; @@ -72,7 +71,6 @@ pub async fn call_api_with_tools( &messages, Some(&tool_defs), &ui_tx, - StreamTarget::Autonomous, &reasoning, temperature, ).await {