consciousness/src/agent/api/openai.rs

193 lines
6.3 KiB
Rust
Raw Normal View History

// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.
use anyhow::Result;
use reqwest::Client;
use tokio::sync::mpsc;
use crate::agent::types::*;
use crate::agent::ui_channel::{UiMessage, UiSender};
use super::StreamEvent;
/// Stream SSE events from an OpenAI-compatible endpoint, sending
/// parsed StreamEvents through the channel. The caller (runner)
/// handles routing to the UI.
///
/// A streaming `ChatRequest` is posted to `{base_url}/chat/completions`
/// with a `Bearer` authorization header. Every event produced by the SSE
/// reader is then translated into zero or more [`StreamEvent`]s on `tx`:
///
/// * `Usage` whenever a chunk carries usage data,
/// * `Reasoning` for non-empty `reasoning_content` / `reasoning` /
///   `reasoning_details` deltas,
/// * `Content` for every content delta (including empty strings),
/// * `ToolCallDelta` for every tool-call fragment,
/// * a single `Finished` once the stream ends, carrying the last finish
///   reason seen (or its default value when none was reported) and the
///   token counts from the last usage record (or zeros when none arrived).
///
/// `reasoning_effort` values `"none"` and `"default"` leave the request's
/// `reasoning` field unset; any other value enables reasoning with that
/// effort. When `temperature` is `None`, 0.6 is sent.
///
/// Failures to send on `tx` or `ui_tx` are deliberately ignored.
///
/// # Errors
///
/// Returns an error when `super::send_and_check` fails, when the SSE
/// reader fails to produce the next event, or when an event contains an
/// `error.message` field (reported as `API error in stream: ...`).
/// Events that do not deserialize into a `ChatCompletionChunk` are not
/// errors: they are reported on `ui_tx` as a debug message and skipped.
pub async fn stream_events(
    client: &Client,
    base_url: &str,
    api_key: &str,
    model: &str,
    messages: &[Message],
    tools: Option<&[ToolDef]>,
    tx: &mpsc::UnboundedSender<StreamEvent>,
    ui_tx: &UiSender,
    reasoning_effort: &str,
    temperature: Option<f32>,
    priority: Option<i32>,
) -> Result<()> {
    /// Sampling temperature used when the caller does not supply one.
    const DEFAULT_TEMPERATURE: f32 = 0.6;
    /// Upper bound, in bytes, on the event preview included in the
    /// "unparseable SSE event" debug message.
    const MAX_EVENT_PREVIEW_BYTES: usize = 300;

    let request = ChatRequest {
        model: model.to_string(),
        messages: messages.to_vec(),
        tool_choice: tools.map(|_| "auto".to_string()),
        tools: tools.map(|t| t.to_vec()),
        max_tokens: Some(16384),
        temperature: Some(temperature.unwrap_or(DEFAULT_TEMPERATURE)),
        stream: Some(true),
        // "none" and "default" both mean: do not send a reasoning config.
        reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
            Some(ReasoningConfig {
                enabled: true,
                effort: Some(reasoning_effort.to_string()),
            })
        } else {
            None
        },
        chat_template_kwargs: None,
        priority,
    };

    let url = format!("{}/chat/completions", base_url);
    let msg_count = request.messages.len();
    let pri_label = match priority {
        Some(p) => format!(", priority={}", p),
        None => String::new(),
    };
    let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);

    let mut response = super::send_and_check(
        client,
        &url,
        &request,
        ("Authorization", &format!("Bearer {}", api_key)),
        &[],
        ui_tx,
        &debug_label,
    )
    .await?;

    let mut reader = super::SseReader::new(ui_tx);

    // Running statistics, reported through `log_diagnostics` after the stream ends.
    let mut content_len: usize = 0;
    let mut reasoning_chars: usize = 0;
    let mut tool_call_count: usize = 0;
    let mut empty_deltas: u64 = 0;
    let mut first_content_at = None;
    let mut finish_reason = None;
    let mut usage = None;

    while let Some(event) = reader.next_event(&mut response).await? {
        // An in-stream error object aborts the whole request.
        if let Some(err_msg) = event["error"]["message"].as_str() {
            let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
            let _ = ui_tx.send(UiMessage::Debug(format!(
                "API error in stream: {}", err_msg
            )));
            anyhow::bail!("API error in stream: {} {}", err_msg, raw);
        }

        let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
            Ok(c) => c,
            Err(e) => {
                let preview = event.to_string();
                // Truncate on a character boundary: slicing at a raw byte
                // offset panics when it lands inside a multi-byte UTF-8
                // sequence, and the serialized event may contain any text.
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "unparseable SSE event ({}): {}",
                    e,
                    truncate_at_char_boundary(&preview, MAX_EVENT_PREVIEW_BYTES)
                )));
                continue;
            }
        };

        if let Some(ref u) = chunk.usage {
            let _ = tx.send(StreamEvent::Usage(u.clone()));
            // Keep the most recent usage record for the final summary.
            usage = chunk.usage;
        }

        for choice in &chunk.choices {
            if choice.finish_reason.is_some() {
                finish_reason = choice.finish_reason.clone();
            }

            let has_content = choice.delta.content.is_some();
            let has_tools = choice.delta.tool_calls.is_some();

            // Reasoning tokens — multiple field names across providers
            let mut has_reasoning = false;
            for r in [
                choice.delta.reasoning_content.as_ref(),
                choice.delta.reasoning.as_ref(),
            ]
            .into_iter()
            .flatten()
            {
                reasoning_chars += r.len();
                has_reasoning = true;
                if !r.is_empty() {
                    let _ = tx.send(StreamEvent::Reasoning(r.clone()));
                }
            }
            if let Some(ref r) = choice.delta.reasoning_details {
                // Forwarded in its stringified form.
                let s = r.to_string();
                reasoning_chars += s.len();
                has_reasoning = true;
                if !s.is_empty() && s != "null" {
                    let _ = tx.send(StreamEvent::Reasoning(s));
                }
            }

            if let Some(ref text_delta) = choice.delta.content {
                // Time-to-first-content is measured at the first non-empty delta.
                if first_content_at.is_none() && !text_delta.is_empty() {
                    first_content_at = Some(reader.stream_start.elapsed());
                }
                content_len += text_delta.len();
                let _ = tx.send(StreamEvent::Content(text_delta.clone()));
            }

            if let Some(ref tc_deltas) = choice.delta.tool_calls {
                for tc_delta in tc_deltas {
                    // Highest index seen + 1 gives the number of tool calls.
                    tool_call_count = tool_call_count.max(tc_delta.index + 1);
                    let _ = tx.send(StreamEvent::ToolCallDelta {
                        index: tc_delta.index,
                        id: tc_delta.id.clone(),
                        call_type: tc_delta.call_type.clone(),
                        name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
                        arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
                    });
                }
            }

            // A delta carrying nothing at all is counted for diagnostics.
            if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
                empty_deltas += 1;
            }
        }
    }

    let total_elapsed = reader.stream_start.elapsed();
    super::log_diagnostics(
        ui_tx,
        content_len,
        tool_call_count,
        reasoning_chars,
        reasoning_effort,
        &finish_reason,
        reader.chunks_received,
        reader.sse_lines_parsed,
        reader.sse_parse_errors,
        empty_deltas,
        total_elapsed,
        first_content_at,
        &usage,
        &[], // tool_calls not accumulated here anymore
    );

    let reason = finish_reason.unwrap_or_default();
    let (pt, ct) = usage
        .as_ref()
        .map(|u| (u.prompt_tokens, u.completion_tokens))
        .unwrap_or((0, 0));
    let _ = tx.send(StreamEvent::Finished {
        reason,
        prompt_tokens: pt,
        completion_tokens: ct,
    });

    Ok(())
}

/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary, so the slice can never panic.
fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> &str {
    let mut end = s.len().min(max_bytes);
    // Offset 0 is always a character boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}