// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.

use anyhow::Result;
use reqwest::Client;
use tokio::sync::mpsc;

use super::types::*;
use super::StreamEvent;
use crate::user::ui_channel::{UiMessage, UiSender};

/// Stream SSE events from an OpenAI-compatible endpoint, sending
/// parsed StreamEvents through the channel. The caller (runner)
/// handles routing to the UI.
pub(super) async fn stream_events(
    client: &Client,
    base_url: &str,
    api_key: &str,
    model: &str,
    messages: &[Message],
    tools_json: &serde_json::Value,
    tx: &mpsc::UnboundedSender<StreamEvent>,
    ui_tx: &UiSender,
    reasoning_effort: &str,
    sampling: super::SamplingParams,
    priority: Option<i32>,
) -> Result<()> {
    let has_tools = tools_json.as_array().map_or(false, |a| !a.is_empty());
    let request = ChatRequest {
        model: model.to_string(),
        messages: messages.to_vec(),
        tool_choice: if has_tools { Some("auto".to_string()) } else { None },
        tools: if has_tools { Some(tools_json.clone()) } else { None },
        max_tokens: Some(16384),
        temperature: Some(sampling.temperature),
        top_p: Some(sampling.top_p),
        top_k: Some(sampling.top_k),
        stream: Some(true),
        reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
            Some(ReasoningConfig {
                enabled: true,
                effort: Some(reasoning_effort.to_string()),
            })
        } else {
            None
        },
        chat_template_kwargs: None,
        priority,
    };

    let url = format!("{}/chat/completions", base_url);
    let msg_count = request.messages.len();
    let pri_label = match priority {
        Some(p) => format!(", priority={}", p),
        None => String::new(),
    };
    let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);
    let request_json = serde_json::to_string_pretty(&request).ok();

    let mut response = super::send_and_check(
        client,
        &url,
        &request,
        ("Authorization", &format!("Bearer {}", api_key)),
        &[],
        ui_tx,
        &debug_label,
        request_json.as_deref(),
    )
    .await?;

    let mut reader = super::SseReader::new(ui_tx);
    reader.request_json = request_json;

    let mut content_len: usize = 0;
    let mut reasoning_chars: usize = 0;
    let mut tool_call_count: usize = 0;
    let mut empty_deltas: u64 = 0;
    let mut first_content_at = None;
    let mut finish_reason = None;
    let mut usage = None;
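    // Event loop: each SSE `data:` payload parsed by SseReader is inspected
    // as raw JSON before deserialization, because some providers report
    // errors as an object with an `error` key inside an otherwise healthy
    // stream. Deltas carrying neither content, reasoning, nor tool calls
    // are counted so the diagnostics summary can flag empty streams.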
    while let Some(event) = reader.next_event(&mut response).await? {
        if let Some(err_msg) = event["error"]["message"].as_str() {
            let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
            let _ = ui_tx.send(UiMessage::Debug(format!(
                "API error in stream: {}",
                err_msg
            )));
            anyhow::bail!("API error in stream: {} {}", err_msg, raw);
        }

        let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
            Ok(c) => c,
            Err(e) => {
                // Truncate on char boundaries: byte-slicing the serialized
                // JSON can panic on multi-byte UTF-8.
                let preview: String = event.to_string().chars().take(300).collect();
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "unparseable SSE event ({}): {}",
                    e, preview
                )));
                continue;
            }
        };

        if let Some(ref u) = chunk.usage {
            let _ = tx.send(StreamEvent::Usage(u.clone()));
            usage = chunk.usage;
        }

        for choice in &chunk.choices {
            if choice.finish_reason.is_some() {
                finish_reason = choice.finish_reason.clone();
            }

            let has_content = choice.delta.content.is_some();
            let has_tools = choice.delta.tool_calls.is_some();

            // Reasoning tokens — multiple field names across providers
            let mut has_reasoning = false;
            for r in [
                choice.delta.reasoning_content.as_ref(),
                choice.delta.reasoning.as_ref(),
            ]
            .into_iter()
            .flatten()
            {
                reasoning_chars += r.len();
                has_reasoning = true;
                if !r.is_empty() {
                    let _ = tx.send(StreamEvent::Reasoning(r.clone()));
                }
            }
            if let Some(ref r) = choice.delta.reasoning_details {
                let s = r.to_string();
                reasoning_chars += s.len();
                has_reasoning = true;
                if !s.is_empty() && s != "null" {
                    let _ = tx.send(StreamEvent::Reasoning(s));
                }
            }

            if let Some(ref text_delta) = choice.delta.content {
                if first_content_at.is_none() && !text_delta.is_empty() {
                    first_content_at = Some(reader.stream_start.elapsed());
                }
                content_len += text_delta.len();
                let _ = tx.send(StreamEvent::Content(text_delta.clone()));
            }

            if let Some(ref tc_deltas) = choice.delta.tool_calls {
                for tc_delta in tc_deltas {
                    tool_call_count = tool_call_count.max(tc_delta.index + 1);
                    let _ = tx.send(StreamEvent::ToolCallDelta {
                        index: tc_delta.index,
                        id: tc_delta.id.clone(),
                        call_type: tc_delta.call_type.clone(),
                        name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
                        arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
                    });
                }
            }

            if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
                empty_deltas += 1;
            }
        }
    }

    let total_elapsed = reader.stream_start.elapsed();
    super::log_diagnostics(
        ui_tx,
        content_len,
        tool_call_count,
        reasoning_chars,
        reasoning_effort,
        &finish_reason,
        reader.chunks_received,
        reader.sse_lines_parsed,
        reader.sse_parse_errors,
        empty_deltas,
        total_elapsed,
        first_content_at,
        &usage,
        &[], // tool_calls not accumulated here anymore
    );

    let reason = finish_reason.unwrap_or_default();
    let (pt, ct) = usage
        .as_ref()
        .map(|u| (u.prompt_tokens, u.completion_tokens))
        .unwrap_or((0, 0));
    let _ = tx.send(StreamEvent::Finished {
        reason,
        prompt_tokens: pt,
        completion_tokens: ct,
    });

    Ok(())
}
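
// --- Usage sketch (illustrative, not part of this module) ---
// A minimal sketch of how the runner side might drain the StreamEvent
// channel this function feeds. The function name `drain_stream` is an
// assumption; the real runner also routes Reasoning and ToolCallDelta
// events onward rather than dropping them as the catch-all arm does here.
//
//     async fn drain_stream(mut rx: mpsc::UnboundedReceiver<StreamEvent>) -> String {
//         let mut text = String::new();
//         while let Some(ev) = rx.recv().await {
//             match ev {
//                 StreamEvent::Content(delta) => text.push_str(&delta),
//                 StreamEvent::Finished { reason, .. } => {
//                     eprintln!("stream finished: {}", reason);
//                     break;
//                 }
//                 _ => {} // Reasoning / ToolCallDelta / Usage handled elsewhere
//             }
//         }
//         text
//     }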