From 48db4a42ccfab89290cb41897dea476c72ad6f43 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Wed, 8 Apr 2026 15:01:42 -0400 Subject: [PATCH] =?UTF-8?q?WIP:=20Kill=20chat=20API=20path=20=E2=80=94=20S?= =?UTF-8?q?treamEvent,=20collect=5Fstream,=20build=5Fresponse=5Fmessage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removed start_stream, chat_completion_stream_temp, collect_stream, StreamResult, build_response_message. All streaming goes through stream_completion → StreamToken now. ConversationLog rewritten for AstNode serialization. Remaining: openai.rs stream_events, mind/, user/, oneshot, learn. Co-Authored-By: Proof of Concept --- src/agent/api/mod.rs | 220 ------------------------------------------- 1 file changed, 220 deletions(-) diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 69c685a..0f24e7e 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -80,41 +80,6 @@ impl ApiClient { } } - /// Start a streaming chat completion. Returns a receiver of StreamEvents. - /// The caller (runner) reads events and handles routing to the UI. - /// - pub(crate) fn start_stream( - &self, - messages: &[Message], - tools: &[agent_tools::Tool], - reasoning_effort: &str, - sampling: SamplingParams, - priority: Option, - ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { - let (tx, rx) = mpsc::unbounded_channel(); - let client = self.client.clone(); - let api_key = self.api_key.clone(); - let model = self.model.clone(); - let messages = messages.to_vec(); - let tools_json = tools_to_json_str(tools); - let tools_value: serde_json::Value = serde_json::from_str(&tools_json).unwrap_or_default(); - let reasoning_effort = reasoning_effort.to_string(); - let base_url = self.base_url.clone(); - - let handle = tokio::spawn(async move { - let result = openai::stream_events( - &client, &base_url, &api_key, &model, - &messages, &tools_value, &tx, - &reasoning_effort, sampling, priority, - ).await; - if let Err(e) = result { - let _ = tx.send(StreamEvent::Error(e.to_string())); - } - }); - - (rx, AbortOnDrop(handle)) - } - /// Stream a completion with raw token IDs. /// Returns (text, token_id) per token via channel. pub(crate) fn stream_completion( @@ -143,55 +108,6 @@ impl ApiClient { (rx, AbortOnDrop(handle)) } - pub(crate) async fn chat_completion_stream_temp( - &self, - messages: &[Message], - tools: &[agent_tools::Tool], - reasoning_effort: &str, - sampling: SamplingParams, - priority: Option, - ) -> Result<(Message, Option)> { - // Use the event stream and accumulate into a message. - let (mut rx, _handle) = self.start_stream(messages, tools, reasoning_effort, sampling, priority); - let mut content = String::new(); - let mut tool_calls: Vec = Vec::new(); - let mut usage = None; - let mut finish_reason = None; - - while let Some(event) = rx.recv().await { - match event { - StreamEvent::Content(text) => content.push_str(&text), - StreamEvent::Reasoning(_) => {} - StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { - while tool_calls.len() <= index { - tool_calls.push(ToolCall { - id: String::new(), - call_type: "function".to_string(), - function: FunctionCall { name: String::new(), arguments: String::new() }, - }); - } - if let Some(id) = id { tool_calls[index].id = id; } - if let Some(ct) = call_type { tool_calls[index].call_type = ct; } - if let Some(n) = name { tool_calls[index].function.name = n; } - if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } - } - StreamEvent::Usage(u) => usage = Some(u), - StreamEvent::Finished { reason, .. } => { - finish_reason = Some(reason); - break; - } - StreamEvent::Error(e) => anyhow::bail!("{}", e), - } - } - - if finish_reason.as_deref() == Some("error") { - let detail = if content.is_empty() { "no details".into() } else { content }; - anyhow::bail!("model stream error: {}", detail); - } - - Ok((build_response_message(content, tool_calls), usage)) - } - pub fn base_url(&self) -> &str { &self.base_url } pub fn api_key(&self) -> &str { &self.api_key } @@ -574,139 +490,3 @@ pub(crate) fn log_diagnostics( } } } - -// --------------------------------------------------------------------------- -// Stream collection — assembles StreamEvents into a complete response -// --------------------------------------------------------------------------- - -/// Result of collecting a complete response from the stream. -pub(crate) struct StreamResult { - pub(crate) content: String, - pub(crate) tool_calls: Vec, - pub(crate) usage: Option, - pub(crate) finish_reason: Option, - pub(crate) error: Option, - /// Remaining display buffer (caller should flush if not in a tool call). - pub(crate) display_buf: String, - /// Accumulated reasoning/thinking content. - pub(crate) reasoning: String, - /// Whether we were mid-tool-call when the stream ended. - pub(crate) in_tool_call: bool, -} - -/// Collect stream events into a complete response. Handles: -/// - Content accumulation and display buffering -/// - Leaked tool call detection and dispatch (Qwen XML in content) -/// - Structured tool call delta assembly (OpenAI-style) -/// - UI forwarding (text deltas, reasoning, tool call notifications) -pub(crate) async fn collect_stream( - rx: &mut mpsc::UnboundedReceiver, - agent: &std::sync::Arc>, - active_tools: &crate::agent::tools::SharedActiveTools, -) -> StreamResult { - let mut content = String::new(); - let mut tool_calls: Vec = Vec::new(); - let mut usage = None; - let mut finish_reason = None; - let mut in_tool_call = false; - let mut tool_call_buf = String::new(); - let mut error = None; - let mut first_content = true; - let mut display_buf = String::new(); - let mut reasoning_buf = String::new(); - let mut _streaming_guard: Option = None; - - while let Some(event) = rx.recv().await { - match event { - StreamEvent::Content(text) => { - if first_content { - _streaming_guard = Some(super::start_activity(agent, "streaming...").await); - first_content = false; - } - content.push_str(&text); - - if in_tool_call { - tool_call_buf.push_str(&text); - if let Some(end) = tool_call_buf.find("") { - let body = &tool_call_buf[..end]; - if let Some(call) = parsing::parse_tool_call_body(body) { - let args: serde_json::Value = - serde_json::from_str(&call.function.arguments).unwrap_or_default(); - let args_summary = summarize_args(&call.function.name, &args); - let is_background = args.get("run_in_background") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - let call_id = call.id.clone(); - let call_name = call.function.name.clone(); - let agent_handle = agent.clone(); - let handle = tokio::spawn(async move { - let output = agent_tools::dispatch_with_agent( - &call.function.name, &args, Some(agent_handle)).await; - (call, output) - }); - active_tools.lock().unwrap().push(ActiveToolCall { - id: call_id, - name: call_name, - detail: args_summary, - started: std::time::Instant::now(), - background: is_background, - handle, - }); - } - let remaining = tool_call_buf[end + "".len()..].to_string(); - tool_call_buf.clear(); - in_tool_call = false; - if !remaining.trim().is_empty() { - display_buf.push_str(&remaining); - } - } - } else { - display_buf.push_str(&text); - if let Some(pos) = display_buf.find("") { - let before = &display_buf[..pos]; - if !before.is_empty() { - if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(before); } - } - display_buf.clear(); - in_tool_call = true; - } else { - let safe = display_buf.len().saturating_sub(10); - let safe = display_buf.floor_char_boundary(safe); - if safe > 0 { - let flush = display_buf[..safe].to_string(); - display_buf = display_buf[safe..].to_string(); - if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(&flush); } - } - } - } - } - StreamEvent::Reasoning(text) => { - reasoning_buf.push_str(&text); - } - StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { - while tool_calls.len() <= index { - tool_calls.push(ToolCall { - id: String::new(), - call_type: "function".to_string(), - function: FunctionCall { name: String::new(), arguments: String::new() }, - }); - } - if let Some(id) = id { tool_calls[index].id = id; } - if let Some(ct) = call_type { tool_calls[index].call_type = ct; } - if let Some(n) = name { tool_calls[index].function.name = n; } - if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } - } - StreamEvent::Usage(u) => usage = Some(u), - StreamEvent::Finished { reason, .. } => { - finish_reason = Some(reason); - break; - } - StreamEvent::Error(e) => { - error = Some(e); - break; - } - } - } - - StreamResult { content, tool_calls, usage, finish_reason, error, display_buf, in_tool_call, reasoning: reasoning_buf } -}