2026-03-25 00:52:41 -04:00
|
|
|
// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.
|
|
|
|
|
|
|
|
|
|
use anyhow::Result;
|
2026-03-29 21:22:42 -04:00
|
|
|
use tokio::sync::mpsc;
|
2026-03-25 00:52:41 -04:00
|
|
|
|
2026-04-07 12:50:40 -04:00
|
|
|
use super::http::HttpClient;
|
2026-04-04 00:29:11 -04:00
|
|
|
use super::types::*;
|
2026-03-29 21:22:42 -04:00
|
|
|
use super::StreamEvent;
|
2026-03-25 00:52:41 -04:00
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
/// Stream SSE events from an OpenAI-compatible endpoint, sending
|
|
|
|
|
/// parsed StreamEvents through the channel. The caller (runner)
|
|
|
|
|
/// handles routing to the UI.
|
2026-04-02 16:15:32 -04:00
|
|
|
pub(super) async fn stream_events(
|
2026-04-07 12:50:40 -04:00
|
|
|
client: &HttpClient,
|
2026-03-25 00:52:41 -04:00
|
|
|
base_url: &str,
|
|
|
|
|
api_key: &str,
|
|
|
|
|
model: &str,
|
|
|
|
|
messages: &[Message],
|
2026-04-04 16:39:04 -04:00
|
|
|
tools_json: &serde_json::Value,
|
2026-03-29 21:22:42 -04:00
|
|
|
tx: &mpsc::UnboundedSender<StreamEvent>,
|
2026-03-25 00:52:41 -04:00
|
|
|
reasoning_effort: &str,
|
2026-04-04 13:48:24 -04:00
|
|
|
sampling: super::SamplingParams,
|
2026-04-01 23:21:39 -04:00
|
|
|
priority: Option<i32>,
|
2026-03-29 21:22:42 -04:00
|
|
|
) -> Result<()> {
|
2026-04-04 16:39:04 -04:00
|
|
|
let has_tools = tools_json.as_array().map_or(false, |a| !a.is_empty());
|
2026-03-25 00:52:41 -04:00
|
|
|
let request = ChatRequest {
|
|
|
|
|
model: model.to_string(),
|
|
|
|
|
messages: messages.to_vec(),
|
2026-04-04 16:39:04 -04:00
|
|
|
tool_choice: if has_tools { Some("auto".to_string()) } else { None },
|
|
|
|
|
tools: if has_tools { Some(tools_json.clone()) } else { None },
|
2026-03-25 00:52:41 -04:00
|
|
|
max_tokens: Some(16384),
|
2026-04-04 13:48:24 -04:00
|
|
|
temperature: Some(sampling.temperature),
|
|
|
|
|
top_p: Some(sampling.top_p),
|
|
|
|
|
top_k: Some(sampling.top_k),
|
2026-03-25 00:52:41 -04:00
|
|
|
stream: Some(true),
|
|
|
|
|
reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
|
|
|
|
|
Some(ReasoningConfig {
|
|
|
|
|
enabled: true,
|
|
|
|
|
effort: Some(reasoning_effort.to_string()),
|
|
|
|
|
})
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
},
|
|
|
|
|
chat_template_kwargs: None,
|
2026-04-01 23:21:39 -04:00
|
|
|
priority,
|
2026-03-25 00:52:41 -04:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let url = format!("{}/chat/completions", base_url);
|
|
|
|
|
let msg_count = request.messages.len();
|
2026-04-02 00:32:23 -04:00
|
|
|
let pri_label = match priority {
|
|
|
|
|
Some(p) => format!(", priority={}", p),
|
|
|
|
|
None => String::new(),
|
|
|
|
|
};
|
|
|
|
|
let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);
|
2026-04-02 15:10:40 -04:00
|
|
|
let request_json = serde_json::to_string_pretty(&request).ok();
|
2026-03-25 00:52:41 -04:00
|
|
|
|
|
|
|
|
let mut response = super::send_and_check(
|
|
|
|
|
client,
|
|
|
|
|
&url,
|
|
|
|
|
&request,
|
|
|
|
|
("Authorization", &format!("Bearer {}", api_key)),
|
|
|
|
|
&[],
|
|
|
|
|
&debug_label,
|
2026-04-02 15:10:40 -04:00
|
|
|
request_json.as_deref(),
|
2026-03-25 00:52:41 -04:00
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
2026-04-05 22:34:48 -04:00
|
|
|
let mut reader = super::SseReader::new();
|
2026-04-02 15:10:40 -04:00
|
|
|
reader.request_json = request_json;
|
2026-03-25 00:52:41 -04:00
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
let mut content_len: usize = 0;
|
2026-03-25 00:52:41 -04:00
|
|
|
let mut reasoning_chars: usize = 0;
|
2026-03-29 21:22:42 -04:00
|
|
|
let mut tool_call_count: usize = 0;
|
2026-03-25 00:52:41 -04:00
|
|
|
let mut empty_deltas: u64 = 0;
|
2026-03-29 21:22:42 -04:00
|
|
|
let mut first_content_at = None;
|
|
|
|
|
let mut finish_reason = None;
|
|
|
|
|
let mut usage = None;
|
2026-03-25 00:52:41 -04:00
|
|
|
|
|
|
|
|
while let Some(event) = reader.next_event(&mut response).await? {
|
|
|
|
|
if let Some(err_msg) = event["error"]["message"].as_str() {
|
|
|
|
|
let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-29 21:22:42 -04:00
|
|
|
"API error in stream: {}", err_msg
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-03-25 00:52:41 -04:00
|
|
|
anyhow::bail!("API error in stream: {} {}", err_msg, raw);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
|
|
|
|
|
Ok(c) => c,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let preview = event.to_string();
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-25 00:52:41 -04:00
|
|
|
"unparseable SSE event ({}): {}",
|
|
|
|
|
e, &preview[..preview.len().min(300)]
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-03-25 00:52:41 -04:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
if let Some(ref u) = chunk.usage {
|
2026-04-05 22:18:07 -04:00
|
|
|
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
2026-03-25 00:52:41 -04:00
|
|
|
usage = chunk.usage;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for choice in &chunk.choices {
|
|
|
|
|
if choice.finish_reason.is_some() {
|
|
|
|
|
finish_reason = choice.finish_reason.clone();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let has_content = choice.delta.content.is_some();
|
|
|
|
|
let has_tools = choice.delta.tool_calls.is_some();
|
|
|
|
|
|
|
|
|
|
// Reasoning tokens — multiple field names across providers
|
|
|
|
|
let mut has_reasoning = false;
|
2026-03-29 21:22:42 -04:00
|
|
|
for r in [
|
|
|
|
|
choice.delta.reasoning_content.as_ref(),
|
|
|
|
|
choice.delta.reasoning.as_ref(),
|
|
|
|
|
].into_iter().flatten() {
|
2026-03-25 00:52:41 -04:00
|
|
|
reasoning_chars += r.len();
|
|
|
|
|
has_reasoning = true;
|
|
|
|
|
if !r.is_empty() {
|
2026-04-05 22:18:07 -04:00
|
|
|
let _ = tx.send(StreamEvent::Reasoning(r.clone()));
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if let Some(ref r) = choice.delta.reasoning_details {
|
|
|
|
|
let s = r.to_string();
|
|
|
|
|
reasoning_chars += s.len();
|
|
|
|
|
has_reasoning = true;
|
|
|
|
|
if !s.is_empty() && s != "null" {
|
2026-03-29 21:22:42 -04:00
|
|
|
let _ = tx.send(StreamEvent::Reasoning(s));
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(ref text_delta) = choice.delta.content {
|
|
|
|
|
if first_content_at.is_none() && !text_delta.is_empty() {
|
|
|
|
|
first_content_at = Some(reader.stream_start.elapsed());
|
|
|
|
|
}
|
2026-03-29 21:22:42 -04:00
|
|
|
content_len += text_delta.len();
|
2026-04-05 22:18:07 -04:00
|
|
|
let _ = tx.send(StreamEvent::Content(text_delta.clone()));
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(ref tc_deltas) = choice.delta.tool_calls {
|
|
|
|
|
for tc_delta in tc_deltas {
|
2026-03-29 21:22:42 -04:00
|
|
|
tool_call_count = tool_call_count.max(tc_delta.index + 1);
|
|
|
|
|
let _ = tx.send(StreamEvent::ToolCallDelta {
|
|
|
|
|
index: tc_delta.index,
|
|
|
|
|
id: tc_delta.id.clone(),
|
|
|
|
|
call_type: tc_delta.call_type.clone(),
|
|
|
|
|
name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
|
|
|
|
|
arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
|
|
|
|
|
});
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
|
|
|
|
|
empty_deltas += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let total_elapsed = reader.stream_start.elapsed();
|
|
|
|
|
|
|
|
|
|
super::log_diagnostics(
|
2026-03-29 21:22:42 -04:00
|
|
|
content_len,
|
|
|
|
|
tool_call_count,
|
2026-03-25 00:52:41 -04:00
|
|
|
reasoning_chars,
|
|
|
|
|
reasoning_effort,
|
|
|
|
|
&finish_reason,
|
|
|
|
|
reader.chunks_received,
|
|
|
|
|
reader.sse_lines_parsed,
|
|
|
|
|
reader.sse_parse_errors,
|
|
|
|
|
empty_deltas,
|
|
|
|
|
total_elapsed,
|
|
|
|
|
first_content_at,
|
|
|
|
|
&usage,
|
2026-03-29 21:22:42 -04:00
|
|
|
&[], // tool_calls not accumulated here anymore
|
2026-03-25 00:52:41 -04:00
|
|
|
);
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
let reason = finish_reason.unwrap_or_default();
|
2026-04-07 13:55:30 -04:00
|
|
|
let _ = tx.send(StreamEvent::Finished { reason });
|
2026-03-29 21:22:42 -04:00
|
|
|
|
|
|
|
|
Ok(())
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
2026-04-08 11:42:22 -04:00
|
|
|
|
|
|
|
|
/// Stream from the /v1/completions endpoint using raw token IDs.
|
|
|
|
|
/// Tool calls come as text (<tool_call> tags) and are parsed by the caller.
|
|
|
|
|
/// Thinking content comes as <think> tags and is split into Reasoning events.
|
|
|
|
|
pub(super) async fn stream_completions(
|
|
|
|
|
client: &HttpClient,
|
|
|
|
|
base_url: &str,
|
|
|
|
|
api_key: &str,
|
|
|
|
|
model: &str,
|
|
|
|
|
prompt_tokens: &[u32],
|
|
|
|
|
tx: &mpsc::UnboundedSender<StreamEvent>,
|
|
|
|
|
sampling: super::SamplingParams,
|
|
|
|
|
priority: Option<i32>,
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let mut request = serde_json::json!({
|
|
|
|
|
"model": model,
|
|
|
|
|
"prompt": prompt_tokens,
|
|
|
|
|
"max_tokens": 16384,
|
|
|
|
|
"temperature": sampling.temperature,
|
|
|
|
|
"top_p": sampling.top_p,
|
|
|
|
|
"top_k": sampling.top_k,
|
|
|
|
|
"stream": true,
|
|
|
|
|
"stop_token_ids": [super::super::tokenizer::IM_END],
|
|
|
|
|
});
|
|
|
|
|
if let Some(p) = priority {
|
|
|
|
|
request["priority"] = serde_json::json!(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let url = format!("{}/completions", base_url);
|
|
|
|
|
let debug_label = format!("{} prompt tokens, model={}", prompt_tokens.len(), model);
|
|
|
|
|
|
|
|
|
|
let mut response = super::send_and_check(
|
|
|
|
|
client,
|
|
|
|
|
&url,
|
|
|
|
|
&request,
|
|
|
|
|
("Authorization", &format!("Bearer {}", api_key)),
|
|
|
|
|
&[],
|
|
|
|
|
&debug_label,
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
let mut reader = super::SseReader::new();
|
|
|
|
|
let mut content_len: usize = 0;
|
|
|
|
|
let mut first_content_at = None;
|
|
|
|
|
let mut finish_reason = None;
|
|
|
|
|
let mut usage = None;
|
|
|
|
|
let mut in_think = false;
|
|
|
|
|
|
|
|
|
|
while let Some(event) = reader.next_event(&mut response).await? {
|
|
|
|
|
if let Some(err_msg) = event["error"]["message"].as_str() {
|
|
|
|
|
anyhow::bail!("API error in stream: {}", err_msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Completions chunks have a simpler structure
|
|
|
|
|
if let Some(u) = event["usage"].as_object() {
|
|
|
|
|
if let Ok(u) = serde_json::from_value::<Usage>(serde_json::Value::Object(u.clone())) {
|
|
|
|
|
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
|
|
|
|
usage = Some(u);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let choices = match event["choices"].as_array() {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => continue,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for choice in choices {
|
|
|
|
|
if let Some(reason) = choice["finish_reason"].as_str() {
|
|
|
|
|
if reason != "null" {
|
|
|
|
|
finish_reason = Some(reason.to_string());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(text) = choice["text"].as_str() {
|
|
|
|
|
if text.is_empty() { continue; }
|
|
|
|
|
|
|
|
|
|
// Handle <think> tags — split into Reasoning vs Content
|
|
|
|
|
if text.contains("<think>") || in_think {
|
|
|
|
|
// Simple state machine for think tags
|
|
|
|
|
let mut remaining = text;
|
|
|
|
|
while !remaining.is_empty() {
|
|
|
|
|
if in_think {
|
|
|
|
|
if let Some(end) = remaining.find("</think>") {
|
|
|
|
|
let thinking = &remaining[..end];
|
|
|
|
|
if !thinking.is_empty() {
|
|
|
|
|
let _ = tx.send(StreamEvent::Reasoning(thinking.to_string()));
|
|
|
|
|
}
|
|
|
|
|
remaining = &remaining[end + 8..];
|
|
|
|
|
in_think = false;
|
|
|
|
|
} else {
|
|
|
|
|
let _ = tx.send(StreamEvent::Reasoning(remaining.to_string()));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if let Some(start) = remaining.find("<think>") {
|
|
|
|
|
let content = &remaining[..start];
|
|
|
|
|
if !content.is_empty() {
|
|
|
|
|
content_len += content.len();
|
|
|
|
|
if first_content_at.is_none() {
|
|
|
|
|
first_content_at = Some(reader.stream_start.elapsed());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx.send(StreamEvent::Content(content.to_string()));
|
|
|
|
|
}
|
|
|
|
|
remaining = &remaining[start + 7..];
|
|
|
|
|
in_think = true;
|
|
|
|
|
} else {
|
|
|
|
|
content_len += remaining.len();
|
|
|
|
|
if first_content_at.is_none() {
|
|
|
|
|
first_content_at = Some(reader.stream_start.elapsed());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx.send(StreamEvent::Content(remaining.to_string()));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
content_len += text.len();
|
|
|
|
|
if first_content_at.is_none() {
|
|
|
|
|
first_content_at = Some(reader.stream_start.elapsed());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx.send(StreamEvent::Content(text.to_string()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let total_elapsed = reader.stream_start.elapsed();
|
|
|
|
|
super::log_diagnostics(
|
|
|
|
|
content_len, 0, 0, "none",
|
|
|
|
|
&finish_reason,
|
|
|
|
|
reader.chunks_received,
|
|
|
|
|
reader.sse_lines_parsed,
|
|
|
|
|
reader.sse_parse_errors,
|
|
|
|
|
0, total_elapsed, first_content_at,
|
|
|
|
|
&usage, &[],
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let reason = finish_reason.unwrap_or_default();
|
|
|
|
|
let _ = tx.send(StreamEvent::Finished { reason });
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|