WIP: Kill chat API path — StreamEvent, collect_stream, build_response_message

Removed start_stream, chat_completion_stream_temp, collect_stream,
StreamResult, build_response_message. All streaming goes through
stream_completion → StreamToken now. ConversationLog rewritten
for AstNode serialization.

Remaining: openai.rs stream_events, mind/, user/, oneshot, learn.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-08 15:01:42 -04:00
parent a68377907a
commit 48db4a42cc

View file

@ -80,41 +80,6 @@ impl ApiClient {
} }
} }
/// Start a streaming chat completion. Returns a receiver of StreamEvents.
/// The caller (runner) reads events and handles routing to the UI.
///
pub(crate) fn start_stream(
&self,
messages: &[Message],
tools: &[agent_tools::Tool],
reasoning_effort: &str,
sampling: SamplingParams,
priority: Option<i32>,
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let client = self.client.clone();
let api_key = self.api_key.clone();
let model = self.model.clone();
let messages = messages.to_vec();
let tools_json = tools_to_json_str(tools);
let tools_value: serde_json::Value = serde_json::from_str(&tools_json).unwrap_or_default();
let reasoning_effort = reasoning_effort.to_string();
let base_url = self.base_url.clone();
let handle = tokio::spawn(async move {
let result = openai::stream_events(
&client, &base_url, &api_key, &model,
&messages, &tools_value, &tx,
&reasoning_effort, sampling, priority,
).await;
if let Err(e) = result {
let _ = tx.send(StreamEvent::Error(e.to_string()));
}
});
(rx, AbortOnDrop(handle))
}
/// Stream a completion with raw token IDs. /// Stream a completion with raw token IDs.
/// Returns (text, token_id) per token via channel. /// Returns (text, token_id) per token via channel.
pub(crate) fn stream_completion( pub(crate) fn stream_completion(
@ -143,55 +108,6 @@ impl ApiClient {
(rx, AbortOnDrop(handle)) (rx, AbortOnDrop(handle))
} }
/// Run a streaming chat completion to the end and return the assembled message.
///
/// Drives `start_stream` and folds its events: content chunks are
/// concatenated, tool-call deltas are merged by index, the most recent usage
/// report is kept, and reasoning text is discarded. Reading stops at the first
/// `Finished` event or when the channel closes.
///
/// # Errors
///
/// Fails when a `StreamEvent::Error` arrives, or when the stream finishes with
/// the reason `"error"` (the message then carries the accumulated content, or
/// "no details" if there was none).
pub(crate) async fn chat_completion_stream_temp(
    &self,
    messages: &[Message],
    tools: &[agent_tools::Tool],
    reasoning_effort: &str,
    sampling: SamplingParams,
    priority: Option<i32>,
) -> Result<(Message, Option<Usage>)> {
    // The guard is bound to a named variable so it lives until this function
    // returns instead of being dropped immediately.
    let (mut events, _abort_guard) =
        self.start_stream(messages, tools, reasoning_effort, sampling, priority);

    let mut text = String::new();
    let mut calls: Vec<ToolCall> = Vec::new();
    let mut usage = None;
    let mut finish_reason = None;

    while let Some(event) = events.recv().await {
        match event {
            StreamEvent::Content(chunk) => text.push_str(&chunk),
            StreamEvent::Reasoning(_) => {}
            StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
                // Grow the list with blank placeholders so `index` is addressable.
                if calls.len() <= index {
                    calls.resize_with(index + 1, || ToolCall {
                        id: String::new(),
                        call_type: "function".to_string(),
                        function: FunctionCall { name: String::new(), arguments: String::new() },
                    });
                }
                let slot = &mut calls[index];
                if let Some(new_id) = id {
                    slot.id = new_id;
                }
                if let Some(kind) = call_type {
                    slot.call_type = kind;
                }
                if let Some(fn_name) = name {
                    slot.function.name = fn_name;
                }
                // Arguments arrive in fragments and are appended, not replaced.
                if let Some(fragment) = arguments {
                    slot.function.arguments.push_str(&fragment);
                }
            }
            StreamEvent::Usage(report) => usage = Some(report),
            StreamEvent::Finished { reason, .. } => {
                finish_reason = Some(reason);
                break;
            }
            StreamEvent::Error(e) => anyhow::bail!("{}", e),
        }
    }

    if matches!(finish_reason.as_deref(), Some("error")) {
        let detail: String = if text.is_empty() { "no details".into() } else { text };
        anyhow::bail!("model stream error: {}", detail);
    }

    Ok((build_response_message(text, calls), usage))
}
pub fn base_url(&self) -> &str { &self.base_url } pub fn base_url(&self) -> &str { &self.base_url }
pub fn api_key(&self) -> &str { &self.api_key } pub fn api_key(&self) -> &str { &self.api_key }
@ -574,139 +490,3 @@ pub(crate) fn log_diagnostics(
} }
} }
} }
// ---------------------------------------------------------------------------
// Stream collection — assembles StreamEvents into a complete response
// ---------------------------------------------------------------------------
/// Result of collecting a complete response from the stream.
///
/// Built by `collect_stream` once the stream finishes, reports an error, or
/// the channel closes.
pub(crate) struct StreamResult {
/// Every content chunk received, concatenated in arrival order.
pub(crate) content: String,
/// Structured tool calls assembled from tool-call deltas, one slot per index.
pub(crate) tool_calls: Vec<ToolCall>,
/// The most recent usage report seen on the stream, if any.
pub(crate) usage: Option<Usage>,
/// Reason carried by the finish event; `None` if the stream ended without one.
pub(crate) finish_reason: Option<String>,
/// Message carried by an error event, if one ended the stream.
pub(crate) error: Option<String>,
/// Remaining display buffer (caller should flush if not in a tool call).
pub(crate) display_buf: String,
/// Accumulated reasoning/thinking content.
pub(crate) reasoning: String,
/// Whether we were mid-tool-call when the stream ended.
pub(crate) in_tool_call: bool,
}
/// Collect stream events into a complete response. Handles:
/// - Content accumulation and display buffering
/// - Leaked tool call detection and dispatch (Qwen XML in content)
/// - Structured tool call delta assembly (OpenAI-style)
/// - UI forwarding of text deltas via `Agent::append_streaming`
///
/// Reads from `rx` until a `Finished` or `Error` event arrives or the channel
/// closes, then returns everything accumulated so far.
///
/// Content is scanned for `<tool_call>` … `</tool_call>` spans. Text outside a
/// span is forwarded to the agent for display, except for a short tail that is
/// held back in case it is the start of an opening tag. Text inside a span is
/// buffered and, once the closing tag arrives, parsed and dispatched on a
/// spawned task that is registered in `active_tools`.
///
/// A single content chunk may contain several tag transitions (text after an
/// opening tag, a whole call, text after a closing tag that opens another
/// call), so each chunk is processed in a loop until no transition remains.
/// In particular, the text following `<tool_call>` in the display buffer is
/// carried into the tool-call buffer rather than discarded.
pub(crate) async fn collect_stream(
    rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
    agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
    active_tools: &crate::agent::tools::SharedActiveTools,
) -> StreamResult {
    const TOOL_CALL_OPEN: &str = "<tool_call>";
    const TOOL_CALL_CLOSE: &str = "</tool_call>";
    // Bytes kept back from display: one less than the opening tag, so a tag
    // split across chunks is never partially shown.
    const HOLD_BACK: usize = TOOL_CALL_OPEN.len() - 1;

    let mut content = String::new();
    let mut tool_calls: Vec<ToolCall> = Vec::new();
    let mut usage = None;
    let mut finish_reason = None;
    let mut in_tool_call = false;
    let mut tool_call_buf = String::new();
    let mut error = None;
    let mut first_content = true;
    let mut display_buf = String::new();
    let mut reasoning_buf = String::new();
    // Held until the function returns; started on the first content chunk.
    let mut _streaming_guard: Option<super::ActivityGuard> = None;

    while let Some(event) = rx.recv().await {
        match event {
            StreamEvent::Content(text) => {
                if first_content {
                    _streaming_guard = Some(super::start_activity(agent, "streaming...").await);
                    first_content = false;
                }
                content.push_str(&text);

                if in_tool_call {
                    tool_call_buf.push_str(&text);
                } else {
                    display_buf.push_str(&text);
                }

                // Each iteration either consumes one tag and flips state, or
                // finds nothing further to do and breaks.
                loop {
                    if in_tool_call {
                        let Some(end) = tool_call_buf.find(TOOL_CALL_CLOSE) else {
                            break;
                        };
                        if let Some(call) = parsing::parse_tool_call_body(&tool_call_buf[..end]) {
                            let args: serde_json::Value =
                                serde_json::from_str(&call.function.arguments).unwrap_or_default();
                            let args_summary = summarize_args(&call.function.name, &args);
                            let is_background = args.get("run_in_background")
                                .and_then(|v| v.as_bool())
                                .unwrap_or(false);
                            let call_id = call.id.clone();
                            let call_name = call.function.name.clone();
                            let agent_handle = agent.clone();
                            let handle = tokio::spawn(async move {
                                let output = agent_tools::dispatch_with_agent(
                                    &call.function.name, &args, Some(agent_handle)).await;
                                (call, output)
                            });
                            active_tools.lock().unwrap().push(ActiveToolCall {
                                id: call_id,
                                name: call_name,
                                detail: args_summary,
                                started: std::time::Instant::now(),
                                background: is_background,
                                handle,
                            });
                        }
                        let remaining = tool_call_buf[end + TOOL_CALL_CLOSE.len()..].to_string();
                        tool_call_buf.clear();
                        in_tool_call = false;
                        // Whitespace-only trailers are dropped; anything else
                        // goes back through the display path on the next pass.
                        if !remaining.trim().is_empty() {
                            display_buf.push_str(&remaining);
                        }
                    } else if let Some(pos) = display_buf.find(TOOL_CALL_OPEN) {
                        let before = &display_buf[..pos];
                        if !before.is_empty() {
                            // NOTE(review): if the agent lock is busy this text is
                            // not forwarded; it is still present in `content`.
                            if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(before); }
                        }
                        // Keep whatever followed the opening tag: it is the
                        // beginning of the tool-call body.
                        tool_call_buf.push_str(&display_buf[pos + TOOL_CALL_OPEN.len()..]);
                        display_buf.clear();
                        in_tool_call = true;
                    } else {
                        let safe = display_buf.len().saturating_sub(HOLD_BACK);
                        let safe = display_buf.floor_char_boundary(safe);
                        if safe > 0 {
                            let flush = display_buf[..safe].to_string();
                            display_buf = display_buf[safe..].to_string();
                            if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(&flush); }
                        }
                        break;
                    }
                }
            }
            StreamEvent::Reasoning(text) => {
                reasoning_buf.push_str(&text);
            }
            StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
                // Pad with blank entries so `index` is addressable.
                while tool_calls.len() <= index {
                    tool_calls.push(ToolCall {
                        id: String::new(),
                        call_type: "function".to_string(),
                        function: FunctionCall { name: String::new(), arguments: String::new() },
                    });
                }
                if let Some(id) = id { tool_calls[index].id = id; }
                if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
                if let Some(n) = name { tool_calls[index].function.name = n; }
                // Argument text arrives in fragments and is appended.
                if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
            }
            StreamEvent::Usage(u) => usage = Some(u),
            StreamEvent::Finished { reason, .. } => {
                finish_reason = Some(reason);
                break;
            }
            StreamEvent::Error(e) => {
                error = Some(e);
                break;
            }
        }
    }

    StreamResult {
        content,
        tool_calls,
        usage,
        finish_reason,
        error,
        display_buf,
        in_tool_call,
        reasoning: reasoning_buf,
    }
}