// api/openai.rs — OpenAI-compatible backend // // Works with any provider that implements the OpenAI chat completions // API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc. // Also used for local models (Qwen, llama) via compatible servers. use anyhow::Result; use reqwest::Client; use std::time::Duration; use crate::agent::types::*; use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender}; pub async fn stream( client: &Client, base_url: &str, api_key: &str, model: &str, messages: &[Message], tools: Option<&[ToolDef]>, ui_tx: &UiSender, target: StreamTarget, reasoning_effort: &str, temperature: Option, ) -> Result<(Message, Option)> { let request = ChatRequest { model: model.to_string(), messages: messages.to_vec(), tool_choice: tools.map(|_| "auto".to_string()), tools: tools.map(|t| t.to_vec()), max_tokens: Some(16384), temperature: Some(temperature.unwrap_or(0.6)), stream: Some(true), reasoning: if reasoning_effort != "none" && reasoning_effort != "default" { Some(ReasoningConfig { enabled: true, effort: Some(reasoning_effort.to_string()), }) } else { None }, chat_template_kwargs: None, }; let url = format!("{}/chat/completions", base_url); let msg_count = request.messages.len(); let debug_label = format!("{} messages, model={}", msg_count, model); let mut response = super::send_and_check( client, &url, &request, ("Authorization", &format!("Bearer {}", api_key)), &[], ui_tx, &debug_label, ) .await?; let mut reader = super::SseReader::new(ui_tx); let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut usage = None; let mut finish_reason = None; let mut reasoning_chars: usize = 0; let mut empty_deltas: u64 = 0; let mut first_content_at: Option = None; let _reasoning_enabled = reasoning_effort != "none"; while let Some(event) = reader.next_event(&mut response).await? { // OpenRouter sometimes embeds error objects in the stream if let Some(err_msg) = event["error"]["message"].as_str() { let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or(""); let _ = ui_tx.send(UiMessage::Debug(format!( "API error in stream: {}", err_msg ))); anyhow::bail!("API error in stream: {} {}", err_msg, raw); } let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) { Ok(c) => c, Err(e) => { // Log unparseable events — they may contain error info let preview = event.to_string(); let _ = ui_tx.send(UiMessage::Debug(format!( "unparseable SSE event ({}): {}", e, &preview[..preview.len().min(300)] ))); continue; } }; if chunk.usage.is_some() { usage = chunk.usage; } for choice in &chunk.choices { if choice.finish_reason.is_some() { finish_reason = choice.finish_reason.clone(); } let has_content = choice.delta.content.is_some(); let has_tools = choice.delta.tool_calls.is_some(); // Reasoning tokens — multiple field names across providers let mut has_reasoning = false; if let Some(ref r) = choice.delta.reasoning_content { reasoning_chars += r.len(); has_reasoning = true; if !r.is_empty() { let _ = ui_tx.send(UiMessage::Reasoning(r.clone())); } } if let Some(ref r) = choice.delta.reasoning { reasoning_chars += r.len(); has_reasoning = true; if !r.is_empty() { let _ = ui_tx.send(UiMessage::Reasoning(r.clone())); } } if let Some(ref r) = choice.delta.reasoning_details { let s = r.to_string(); reasoning_chars += s.len(); has_reasoning = true; if !s.is_empty() && s != "null" { let _ = ui_tx.send(UiMessage::Reasoning(s)); } } if let Some(ref text_delta) = choice.delta.content { if first_content_at.is_none() && !text_delta.is_empty() { first_content_at = Some(reader.stream_start.elapsed()); let _ = ui_tx.send(UiMessage::Activity("streaming...".into())); } content.push_str(text_delta); let _ = ui_tx.send(UiMessage::TextDelta(text_delta.clone(), target)); } if let Some(ref tc_deltas) = choice.delta.tool_calls { for tc_delta in tc_deltas { let idx = tc_delta.index; while tool_calls.len() <= idx { tool_calls.push(ToolCall { id: String::new(), call_type: "function".to_string(), function: FunctionCall { name: String::new(), arguments: String::new(), }, }); } if let Some(ref id) = tc_delta.id { tool_calls[idx].id = id.clone(); } if let Some(ref ct) = tc_delta.call_type { tool_calls[idx].call_type = ct.clone(); } if let Some(ref func) = tc_delta.function { if let Some(ref name) = func.name { tool_calls[idx].function.name = name.clone(); } if let Some(ref args) = func.arguments { tool_calls[idx].function.arguments.push_str(args); } } } } if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() { empty_deltas += 1; } } } let total_elapsed = reader.stream_start.elapsed(); super::log_diagnostics( ui_tx, content.len(), tool_calls.len(), reasoning_chars, reasoning_effort, &finish_reason, reader.chunks_received, reader.sse_lines_parsed, reader.sse_parse_errors, empty_deltas, total_elapsed, first_content_at, &usage, &tool_calls, ); // Model/provider error delivered inside the stream (HTTP 200 but // finish_reason="error"). Surface whatever content came back as // the error message so the caller can retry or display it. // Don't append the trailing newline — this isn't real content. if finish_reason.as_deref() == Some("error") { let detail = if content.is_empty() { "no details".to_string() } else { content }; anyhow::bail!("model stream error: {}", detail); } if !content.is_empty() { let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); } Ok((super::build_response_message(content, tool_calls), usage)) }