refactor: runner owns stream routing, suppress tool call XML from display
Split the streaming pipeline: API backends yield StreamEvents through a channel, the runner reads them and routes to the appropriate UI pane. - Add StreamEvent enum (Content, Reasoning, ToolCallDelta, etc.) - API start_stream() spawns backend as a task, returns event receiver - Runner loops over events, sends content to conversation pane but suppresses <tool_call> XML with a buffered tail for partial tags - OpenAI backend refactored to stream_events() — no more UI coupling - Anthropic backend gets a wrapper that synthesizes events from the existing stream() (TODO: native event streaming) - chat_completion_stream() kept for subconscious agents, reimplemented on top of the event stream - Usage derives Clone Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
912626c5f0
commit
13453606ae
6 changed files with 338 additions and 114 deletions
|
|
@ -15,8 +15,11 @@ use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::agent::types::*;
|
use crate::agent::types::*;
|
||||||
use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender};
|
use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender};
|
||||||
|
use super::StreamEvent;
|
||||||
|
|
||||||
// --- Anthropic wire types ---
|
// --- Anthropic wire types ---
|
||||||
|
|
||||||
|
|
@ -653,3 +656,58 @@ pub async fn stream(
|
||||||
|
|
||||||
Ok((super::build_response_message(content, tool_calls), usage))
|
Ok((super::build_response_message(content, tool_calls), usage))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper that calls the existing stream() and synthesizes StreamEvents.
|
||||||
|
/// TODO: refactor to emit events during streaming like the OpenAI backend.
|
||||||
|
pub async fn stream_events(
|
||||||
|
client: &Client,
|
||||||
|
api_key: &str,
|
||||||
|
model: &str,
|
||||||
|
messages: &[Message],
|
||||||
|
tools: Option<&[crate::agent::types::ToolDef]>,
|
||||||
|
tx: &mpsc::UnboundedSender<StreamEvent>,
|
||||||
|
ui_tx: &UiSender,
|
||||||
|
reasoning_effort: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
let (msg, usage) = stream(
|
||||||
|
client, api_key, model, messages, tools,
|
||||||
|
ui_tx, StreamTarget::Conversation, reasoning_effort,
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
// Synthesize events from the completed message.
|
||||||
|
if let Some(text) = msg.content.as_ref().and_then(|c| match c {
|
||||||
|
MessageContent::Text(t) => Some(t.as_str()),
|
||||||
|
_ => None,
|
||||||
|
}) {
|
||||||
|
if !text.is_empty() {
|
||||||
|
let _ = tx.send(StreamEvent::Content(text.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(ref tcs) = msg.tool_calls {
|
||||||
|
for (i, tc) in tcs.iter().enumerate() {
|
||||||
|
let _ = tx.send(StreamEvent::ToolCallDelta {
|
||||||
|
index: i,
|
||||||
|
id: Some(tc.id.clone()),
|
||||||
|
call_type: Some(tc.call_type.clone()),
|
||||||
|
name: Some(tc.function.name.clone()),
|
||||||
|
arguments: Some(tc.function.arguments.clone()),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(u) = usage {
|
||||||
|
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
||||||
|
let _ = tx.send(StreamEvent::Finished {
|
||||||
|
reason: "stop".into(),
|
||||||
|
prompt_tokens: u.prompt_tokens,
|
||||||
|
completion_tokens: u.completion_tokens,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let _ = tx.send(StreamEvent::Finished {
|
||||||
|
reason: "stop".into(),
|
||||||
|
prompt_tokens: 0,
|
||||||
|
completion_tokens: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,41 @@ use anyhow::Result;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::agent::types::*;
|
use crate::agent::types::*;
|
||||||
use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender};
|
use crate::agent::ui_channel::{UiMessage, UiSender};
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────
|
||||||
|
// Stream events — yielded by backends, consumed by the runner
|
||||||
|
// ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Events produced by the streaming API backends.
|
||||||
|
/// The runner reads these and decides what to display where.
|
||||||
|
pub enum StreamEvent {
|
||||||
|
/// Content token from the model's response.
|
||||||
|
Content(String),
|
||||||
|
/// Reasoning/thinking token (internal monologue).
|
||||||
|
Reasoning(String),
|
||||||
|
/// Incremental tool call delta (structured, from APIs that support it).
|
||||||
|
ToolCallDelta {
|
||||||
|
index: usize,
|
||||||
|
id: Option<String>,
|
||||||
|
call_type: Option<String>,
|
||||||
|
name: Option<String>,
|
||||||
|
arguments: Option<String>,
|
||||||
|
},
|
||||||
|
/// Token usage stats.
|
||||||
|
Usage(Usage),
|
||||||
|
/// Stream finished.
|
||||||
|
Finished {
|
||||||
|
reason: String,
|
||||||
|
prompt_tokens: u32,
|
||||||
|
completion_tokens: u32,
|
||||||
|
},
|
||||||
|
/// Error from the stream.
|
||||||
|
Error(String),
|
||||||
|
}
|
||||||
|
|
||||||
enum Backend {
|
enum Backend {
|
||||||
OpenAi {
|
OpenAi {
|
||||||
|
|
@ -58,20 +91,71 @@ impl ApiClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start a streaming chat completion. Returns a receiver of StreamEvents.
|
||||||
|
/// The caller (runner) reads events and handles routing to the UI.
|
||||||
|
///
|
||||||
|
/// The old `chat_completion_stream` method is kept for the subconscious
|
||||||
|
/// agents which don't need fine-grained stream control.
|
||||||
|
pub fn start_stream(
|
||||||
|
&self,
|
||||||
|
messages: &[Message],
|
||||||
|
tools: Option<&[ToolDef]>,
|
||||||
|
ui_tx: &UiSender,
|
||||||
|
reasoning_effort: &str,
|
||||||
|
temperature: Option<f32>,
|
||||||
|
) -> mpsc::UnboundedReceiver<StreamEvent> {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
let client = self.client.clone();
|
||||||
|
let api_key = self.api_key.clone();
|
||||||
|
let model = self.model.clone();
|
||||||
|
let messages = messages.to_vec();
|
||||||
|
let tools = tools.map(|t| t.to_vec());
|
||||||
|
let ui_tx = ui_tx.clone();
|
||||||
|
let reasoning_effort = reasoning_effort.to_string();
|
||||||
|
let backend = match &self.backend {
|
||||||
|
Backend::OpenAi { base_url } => Backend::OpenAi { base_url: base_url.clone() },
|
||||||
|
Backend::Anthropic => Backend::Anthropic,
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let result = match &backend {
|
||||||
|
Backend::OpenAi { base_url } => {
|
||||||
|
openai::stream_events(
|
||||||
|
&client, base_url, &api_key, &model,
|
||||||
|
&messages, tools.as_deref(), &tx, &ui_tx,
|
||||||
|
&reasoning_effort, temperature,
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
Backend::Anthropic => {
|
||||||
|
// Anthropic backend still uses the old path for now —
|
||||||
|
// wrap it by calling the old stream() and synthesizing events.
|
||||||
|
anthropic::stream_events(
|
||||||
|
&client, &api_key, &model,
|
||||||
|
&messages, tools.as_deref(), &tx, &ui_tx,
|
||||||
|
&reasoning_effort,
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = result {
|
||||||
|
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
/// Streaming chat completion. Returns the assembled response message
|
/// Streaming chat completion. Returns the assembled response message
|
||||||
/// plus optional usage stats. Text tokens stream through the UI channel.
|
/// plus optional usage stats. Text tokens stream through the UI channel.
|
||||||
///
|
///
|
||||||
/// Empty response handling is done at the agent level (agent.rs)
|
/// Used by subconscious agents that don't need per-token routing.
|
||||||
/// where the conversation can be modified between retries.
|
|
||||||
pub async fn chat_completion_stream(
|
pub async fn chat_completion_stream(
|
||||||
&self,
|
&self,
|
||||||
messages: &[Message],
|
messages: &[Message],
|
||||||
tools: Option<&[ToolDef]>,
|
tools: Option<&[ToolDef]>,
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
target: StreamTarget,
|
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
) -> Result<(Message, Option<Usage>)> {
|
||||||
self.chat_completion_stream_temp(messages, tools, ui_tx, target, reasoning_effort, None).await
|
self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn chat_completion_stream_temp(
|
pub async fn chat_completion_stream_temp(
|
||||||
|
|
@ -79,24 +163,48 @@ impl ApiClient {
|
||||||
messages: &[Message],
|
messages: &[Message],
|
||||||
tools: Option<&[ToolDef]>,
|
tools: Option<&[ToolDef]>,
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
target: StreamTarget,
|
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
) -> Result<(Message, Option<Usage>)> {
|
||||||
match &self.backend {
|
// Use the event stream and accumulate into a message.
|
||||||
Backend::OpenAi { base_url } => {
|
let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature);
|
||||||
openai::stream(
|
let mut content = String::new();
|
||||||
&self.client, base_url, &self.api_key, &self.model,
|
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
||||||
messages, tools, ui_tx, target, reasoning_effort, temperature,
|
let mut usage = None;
|
||||||
).await
|
let mut finish_reason = None;
|
||||||
|
|
||||||
|
while let Some(event) = rx.recv().await {
|
||||||
|
match event {
|
||||||
|
StreamEvent::Content(text) => content.push_str(&text),
|
||||||
|
StreamEvent::Reasoning(_) => {}
|
||||||
|
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
|
||||||
|
while tool_calls.len() <= index {
|
||||||
|
tool_calls.push(ToolCall {
|
||||||
|
id: String::new(),
|
||||||
|
call_type: "function".to_string(),
|
||||||
|
function: FunctionCall { name: String::new(), arguments: String::new() },
|
||||||
|
});
|
||||||
}
|
}
|
||||||
Backend::Anthropic => {
|
if let Some(id) = id { tool_calls[index].id = id; }
|
||||||
anthropic::stream(
|
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
|
||||||
&self.client, &self.api_key, &self.model,
|
if let Some(n) = name { tool_calls[index].function.name = n; }
|
||||||
messages, tools, ui_tx, target, reasoning_effort,
|
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
|
||||||
).await
|
}
|
||||||
|
StreamEvent::Usage(u) => usage = Some(u),
|
||||||
|
StreamEvent::Finished { reason, .. } => {
|
||||||
|
finish_reason = Some(reason);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
StreamEvent::Error(e) => anyhow::bail!("{}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if finish_reason.as_deref() == Some("error") {
|
||||||
|
let detail = if content.is_empty() { "no details".into() } else { content };
|
||||||
|
anyhow::bail!("model stream error: {}", detail);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((build_response_message(content, tool_calls), usage))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a label for the active backend, used in startup info.
|
/// Return a label for the active backend, used in startup info.
|
||||||
|
|
@ -325,7 +433,7 @@ impl SseReader {
|
||||||
/// from models that emit tool calls as text), parse them out and
|
/// from models that emit tool calls as text), parse them out and
|
||||||
/// promote them to structured tool_calls. This way all consumers
|
/// promote them to structured tool_calls. This way all consumers
|
||||||
/// see tool calls uniformly regardless of backend.
|
/// see tool calls uniformly regardless of backend.
|
||||||
pub(crate) fn build_response_message(
|
pub fn build_response_message(
|
||||||
content: String,
|
content: String,
|
||||||
tool_calls: Vec<ToolCall>,
|
tool_calls: Vec<ToolCall>,
|
||||||
) -> Message {
|
) -> Message {
|
||||||
|
|
|
||||||
|
|
@ -6,23 +6,27 @@
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use std::time::Duration;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::agent::types::*;
|
use crate::agent::types::*;
|
||||||
use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender};
|
use crate::agent::ui_channel::{UiMessage, UiSender};
|
||||||
|
use super::StreamEvent;
|
||||||
|
|
||||||
pub async fn stream(
|
/// Stream SSE events from an OpenAI-compatible endpoint, sending
|
||||||
|
/// parsed StreamEvents through the channel. The caller (runner)
|
||||||
|
/// handles routing to the UI.
|
||||||
|
pub async fn stream_events(
|
||||||
client: &Client,
|
client: &Client,
|
||||||
base_url: &str,
|
base_url: &str,
|
||||||
api_key: &str,
|
api_key: &str,
|
||||||
model: &str,
|
model: &str,
|
||||||
messages: &[Message],
|
messages: &[Message],
|
||||||
tools: Option<&[ToolDef]>,
|
tools: Option<&[ToolDef]>,
|
||||||
|
tx: &mpsc::UnboundedSender<StreamEvent>,
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
target: StreamTarget,
|
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
) -> Result<()> {
|
||||||
let request = ChatRequest {
|
let request = ChatRequest {
|
||||||
model: model.to_string(),
|
model: model.to_string(),
|
||||||
messages: messages.to_vec(),
|
messages: messages.to_vec(),
|
||||||
|
|
@ -59,23 +63,19 @@ pub async fn stream(
|
||||||
|
|
||||||
let mut reader = super::SseReader::new(ui_tx);
|
let mut reader = super::SseReader::new(ui_tx);
|
||||||
|
|
||||||
let mut content = String::new();
|
let mut content_len: usize = 0;
|
||||||
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
|
||||||
let mut usage = None;
|
|
||||||
let mut finish_reason = None;
|
|
||||||
let mut reasoning_chars: usize = 0;
|
let mut reasoning_chars: usize = 0;
|
||||||
|
let mut tool_call_count: usize = 0;
|
||||||
let mut empty_deltas: u64 = 0;
|
let mut empty_deltas: u64 = 0;
|
||||||
let mut first_content_at: Option<Duration> = None;
|
let mut first_content_at = None;
|
||||||
|
let mut finish_reason = None;
|
||||||
let _reasoning_enabled = reasoning_effort != "none";
|
let mut usage = None;
|
||||||
|
|
||||||
while let Some(event) = reader.next_event(&mut response).await? {
|
while let Some(event) = reader.next_event(&mut response).await? {
|
||||||
// OpenRouter sometimes embeds error objects in the stream
|
|
||||||
if let Some(err_msg) = event["error"]["message"].as_str() {
|
if let Some(err_msg) = event["error"]["message"].as_str() {
|
||||||
let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
|
let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
let _ = ui_tx.send(UiMessage::Debug(format!(
|
||||||
"API error in stream: {}",
|
"API error in stream: {}", err_msg
|
||||||
err_msg
|
|
||||||
)));
|
)));
|
||||||
anyhow::bail!("API error in stream: {} {}", err_msg, raw);
|
anyhow::bail!("API error in stream: {} {}", err_msg, raw);
|
||||||
}
|
}
|
||||||
|
|
@ -83,7 +83,6 @@ pub async fn stream(
|
||||||
let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
|
let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log unparseable events — they may contain error info
|
|
||||||
let preview = event.to_string();
|
let preview = event.to_string();
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
let _ = ui_tx.send(UiMessage::Debug(format!(
|
||||||
"unparseable SSE event ({}): {}",
|
"unparseable SSE event ({}): {}",
|
||||||
|
|
@ -93,7 +92,8 @@ pub async fn stream(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if chunk.usage.is_some() {
|
if let Some(ref u) = chunk.usage {
|
||||||
|
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
||||||
usage = chunk.usage;
|
usage = chunk.usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -107,18 +107,14 @@ pub async fn stream(
|
||||||
|
|
||||||
// Reasoning tokens — multiple field names across providers
|
// Reasoning tokens — multiple field names across providers
|
||||||
let mut has_reasoning = false;
|
let mut has_reasoning = false;
|
||||||
if let Some(ref r) = choice.delta.reasoning_content {
|
for r in [
|
||||||
|
choice.delta.reasoning_content.as_ref(),
|
||||||
|
choice.delta.reasoning.as_ref(),
|
||||||
|
].into_iter().flatten() {
|
||||||
reasoning_chars += r.len();
|
reasoning_chars += r.len();
|
||||||
has_reasoning = true;
|
has_reasoning = true;
|
||||||
if !r.is_empty() {
|
if !r.is_empty() {
|
||||||
let _ = ui_tx.send(UiMessage::Reasoning(r.clone()));
|
let _ = tx.send(StreamEvent::Reasoning(r.clone()));
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(ref r) = choice.delta.reasoning {
|
|
||||||
reasoning_chars += r.len();
|
|
||||||
has_reasoning = true;
|
|
||||||
if !r.is_empty() {
|
|
||||||
let _ = ui_tx.send(UiMessage::Reasoning(r.clone()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(ref r) = choice.delta.reasoning_details {
|
if let Some(ref r) = choice.delta.reasoning_details {
|
||||||
|
|
@ -126,47 +122,29 @@ pub async fn stream(
|
||||||
reasoning_chars += s.len();
|
reasoning_chars += s.len();
|
||||||
has_reasoning = true;
|
has_reasoning = true;
|
||||||
if !s.is_empty() && s != "null" {
|
if !s.is_empty() && s != "null" {
|
||||||
let _ = ui_tx.send(UiMessage::Reasoning(s));
|
let _ = tx.send(StreamEvent::Reasoning(s));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref text_delta) = choice.delta.content {
|
if let Some(ref text_delta) = choice.delta.content {
|
||||||
if first_content_at.is_none() && !text_delta.is_empty() {
|
if first_content_at.is_none() && !text_delta.is_empty() {
|
||||||
first_content_at = Some(reader.stream_start.elapsed());
|
first_content_at = Some(reader.stream_start.elapsed());
|
||||||
let _ = ui_tx.send(UiMessage::Activity("streaming...".into()));
|
|
||||||
}
|
}
|
||||||
content.push_str(text_delta);
|
content_len += text_delta.len();
|
||||||
let _ = ui_tx.send(UiMessage::TextDelta(text_delta.clone(), target));
|
let _ = tx.send(StreamEvent::Content(text_delta.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref tc_deltas) = choice.delta.tool_calls {
|
if let Some(ref tc_deltas) = choice.delta.tool_calls {
|
||||||
for tc_delta in tc_deltas {
|
for tc_delta in tc_deltas {
|
||||||
let idx = tc_delta.index;
|
tool_call_count = tool_call_count.max(tc_delta.index + 1);
|
||||||
while tool_calls.len() <= idx {
|
let _ = tx.send(StreamEvent::ToolCallDelta {
|
||||||
tool_calls.push(ToolCall {
|
index: tc_delta.index,
|
||||||
id: String::new(),
|
id: tc_delta.id.clone(),
|
||||||
call_type: "function".to_string(),
|
call_type: tc_delta.call_type.clone(),
|
||||||
function: FunctionCall {
|
name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
|
||||||
name: String::new(),
|
arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
|
||||||
arguments: String::new(),
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if let Some(ref id) = tc_delta.id {
|
|
||||||
tool_calls[idx].id = id.clone();
|
|
||||||
}
|
|
||||||
if let Some(ref ct) = tc_delta.call_type {
|
|
||||||
tool_calls[idx].call_type = ct.clone();
|
|
||||||
}
|
|
||||||
if let Some(ref func) = tc_delta.function {
|
|
||||||
if let Some(ref name) = func.name {
|
|
||||||
tool_calls[idx].function.name = name.clone();
|
|
||||||
}
|
|
||||||
if let Some(ref args) = func.arguments {
|
|
||||||
tool_calls[idx].function.arguments.push_str(args);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
|
if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
|
||||||
|
|
@ -179,8 +157,8 @@ pub async fn stream(
|
||||||
|
|
||||||
super::log_diagnostics(
|
super::log_diagnostics(
|
||||||
ui_tx,
|
ui_tx,
|
||||||
content.len(),
|
content_len,
|
||||||
tool_calls.len(),
|
tool_call_count,
|
||||||
reasoning_chars,
|
reasoning_chars,
|
||||||
reasoning_effort,
|
reasoning_effort,
|
||||||
&finish_reason,
|
&finish_reason,
|
||||||
|
|
@ -191,25 +169,18 @@ pub async fn stream(
|
||||||
total_elapsed,
|
total_elapsed,
|
||||||
first_content_at,
|
first_content_at,
|
||||||
&usage,
|
&usage,
|
||||||
&tool_calls,
|
&[], // tool_calls not accumulated here anymore
|
||||||
);
|
);
|
||||||
|
|
||||||
// Model/provider error delivered inside the stream (HTTP 200 but
|
let reason = finish_reason.unwrap_or_default();
|
||||||
// finish_reason="error"). Surface whatever content came back as
|
let (pt, ct) = usage.as_ref()
|
||||||
// the error message so the caller can retry or display it.
|
.map(|u| (u.prompt_tokens, u.completion_tokens))
|
||||||
// Don't append the trailing newline — this isn't real content.
|
.unwrap_or((0, 0));
|
||||||
if finish_reason.as_deref() == Some("error") {
|
let _ = tx.send(StreamEvent::Finished {
|
||||||
let detail = if content.is_empty() {
|
reason,
|
||||||
"no details".to_string()
|
prompt_tokens: pt,
|
||||||
} else {
|
completion_tokens: ct,
|
||||||
content
|
});
|
||||||
};
|
|
||||||
anyhow::bail!("model stream error: {}", detail);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !content.is_empty() {
|
Ok(())
|
||||||
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((super::build_response_message(content, tool_calls), usage))
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ use tiktoken_rs::CoreBPE;
|
||||||
use crate::agent::api::ApiClient;
|
use crate::agent::api::ApiClient;
|
||||||
use crate::agent::journal;
|
use crate::agent::journal;
|
||||||
use crate::agent::log::ConversationLog;
|
use crate::agent::log::ConversationLog;
|
||||||
|
use crate::agent::api::StreamEvent;
|
||||||
use crate::agent::tools;
|
use crate::agent::tools;
|
||||||
use crate::agent::tools::ProcessTracker;
|
use crate::agent::tools::ProcessTracker;
|
||||||
use crate::agent::types::*;
|
use crate::agent::types::*;
|
||||||
|
|
@ -251,21 +252,94 @@ impl Agent {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let _ = ui_tx.send(UiMessage::Activity("thinking...".into()));
|
let _ = ui_tx.send(UiMessage::Activity("thinking...".into()));
|
||||||
let api_result = self
|
|
||||||
.client
|
// Stream events from the API — we route each event to the
|
||||||
.chat_completion_stream(
|
// appropriate UI pane rather than letting the API layer do it.
|
||||||
|
let mut rx = self.client.start_stream(
|
||||||
&self.messages,
|
&self.messages,
|
||||||
Some(&self.tool_defs),
|
Some(&self.tool_defs),
|
||||||
ui_tx,
|
ui_tx,
|
||||||
target,
|
|
||||||
&self.reasoning_effort,
|
&self.reasoning_effort,
|
||||||
)
|
None,
|
||||||
.await;
|
);
|
||||||
|
|
||||||
// Context overflow → compact and retry (max 2 attempts)
|
let mut content = String::new();
|
||||||
// Stream error → retry with backoff (max 2 attempts)
|
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
||||||
let (msg, usage) = match api_result {
|
let mut usage = None;
|
||||||
Err(e) if crate::agent::context::is_context_overflow(&e) && overflow_retries < 2 => {
|
let mut finish_reason = None;
|
||||||
|
let mut in_tool_call = false;
|
||||||
|
let mut stream_error = None;
|
||||||
|
let mut first_content = true;
|
||||||
|
// Buffer for content not yet sent to UI — holds a tail
|
||||||
|
// that might be a partial <tool_call> tag.
|
||||||
|
let mut display_buf = String::new();
|
||||||
|
|
||||||
|
while let Some(event) = rx.recv().await {
|
||||||
|
match event {
|
||||||
|
StreamEvent::Content(text) => {
|
||||||
|
if first_content {
|
||||||
|
let _ = ui_tx.send(UiMessage::Activity("streaming...".into()));
|
||||||
|
first_content = false;
|
||||||
|
}
|
||||||
|
content.push_str(&text);
|
||||||
|
|
||||||
|
if in_tool_call {
|
||||||
|
// Already inside a tool call — suppress display.
|
||||||
|
} else {
|
||||||
|
display_buf.push_str(&text);
|
||||||
|
|
||||||
|
if let Some(pos) = display_buf.find("<tool_call>") {
|
||||||
|
// Flush content before the tag, suppress the rest.
|
||||||
|
let before = &display_buf[..pos];
|
||||||
|
if !before.is_empty() {
|
||||||
|
let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target));
|
||||||
|
}
|
||||||
|
display_buf.clear();
|
||||||
|
in_tool_call = true;
|
||||||
|
} else {
|
||||||
|
// Flush display_buf except a tail that could be
|
||||||
|
// a partial "<tool_call>" (10 chars).
|
||||||
|
let safe = display_buf.len().saturating_sub(10);
|
||||||
|
if safe > 0 {
|
||||||
|
let flush = display_buf[..safe].to_string();
|
||||||
|
display_buf = display_buf[safe..].to_string();
|
||||||
|
let _ = ui_tx.send(UiMessage::TextDelta(flush, target));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
StreamEvent::Reasoning(text) => {
|
||||||
|
let _ = ui_tx.send(UiMessage::Reasoning(text));
|
||||||
|
}
|
||||||
|
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
|
||||||
|
while tool_calls.len() <= index {
|
||||||
|
tool_calls.push(ToolCall {
|
||||||
|
id: String::new(),
|
||||||
|
call_type: "function".to_string(),
|
||||||
|
function: FunctionCall { name: String::new(), arguments: String::new() },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if let Some(id) = id { tool_calls[index].id = id; }
|
||||||
|
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
|
||||||
|
if let Some(n) = name { tool_calls[index].function.name = n; }
|
||||||
|
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
|
||||||
|
}
|
||||||
|
StreamEvent::Usage(u) => usage = Some(u),
|
||||||
|
StreamEvent::Finished { reason, .. } => {
|
||||||
|
finish_reason = Some(reason);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
StreamEvent::Error(e) => {
|
||||||
|
stream_error = Some(e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle stream errors with retry logic
|
||||||
|
if let Some(e) = stream_error {
|
||||||
|
let err = anyhow::anyhow!("{}", e);
|
||||||
|
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
|
||||||
overflow_retries += 1;
|
overflow_retries += 1;
|
||||||
let _ = ui_tx.send(UiMessage::Info(format!(
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
||||||
"[context overflow — compacting and retrying ({}/2)]",
|
"[context overflow — compacting and retrying ({}/2)]",
|
||||||
|
|
@ -274,7 +348,7 @@ impl Agent {
|
||||||
self.emergency_compact();
|
self.emergency_compact();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Err(e) if crate::agent::context::is_stream_error(&e) && empty_retries < 2 => {
|
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
|
||||||
empty_retries += 1;
|
empty_retries += 1;
|
||||||
let _ = ui_tx.send(UiMessage::Info(format!(
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
||||||
"[stream error: {} — retrying ({}/2)]",
|
"[stream error: {} — retrying ({}/2)]",
|
||||||
|
|
@ -283,8 +357,23 @@ impl Agent {
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
other => other?,
|
return Err(err);
|
||||||
};
|
}
|
||||||
|
|
||||||
|
if finish_reason.as_deref() == Some("error") {
|
||||||
|
let detail = if content.is_empty() { "no details".into() } else { content };
|
||||||
|
return Err(anyhow::anyhow!("model stream error: {}", detail));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush remaining display buffer (normal responses without tool calls).
|
||||||
|
if !in_tool_call && !display_buf.is_empty() {
|
||||||
|
let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target));
|
||||||
|
}
|
||||||
|
if !content.is_empty() && !in_tool_call {
|
||||||
|
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
|
||||||
|
}
|
||||||
|
|
||||||
|
let msg = crate::agent::api::build_response_message(content, tool_calls);
|
||||||
|
|
||||||
// Strip ephemeral tool calls (journal) that the API has
|
// Strip ephemeral tool calls (journal) that the API has
|
||||||
// now processed. They're persisted to disk; no need to keep
|
// now processed. They're persisted to disk; no need to keep
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ pub struct Choice {
|
||||||
pub finish_reason: Option<String>,
|
pub finish_reason: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct Usage {
|
pub struct Usage {
|
||||||
pub prompt_tokens: u32,
|
pub prompt_tokens: u32,
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@
|
||||||
use crate::agent::api::ApiClient;
|
use crate::agent::api::ApiClient;
|
||||||
use crate::agent::types::*;
|
use crate::agent::types::*;
|
||||||
use crate::thought::{self, ProcessTracker};
|
use crate::thought::{self, ProcessTracker};
|
||||||
use crate::agent::ui_channel::StreamTarget;
|
|
||||||
|
|
||||||
use std::sync::OnceLock;
|
use std::sync::OnceLock;
|
||||||
|
|
||||||
|
|
@ -72,7 +71,6 @@ pub async fn call_api_with_tools(
|
||||||
&messages,
|
&messages,
|
||||||
Some(&tool_defs),
|
Some(&tool_defs),
|
||||||
&ui_tx,
|
&ui_tx,
|
||||||
StreamTarget::Autonomous,
|
|
||||||
&reasoning,
|
&reasoning,
|
||||||
temperature,
|
temperature,
|
||||||
).await {
|
).await {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue