consciousness/src/agent/api/openai.rs
Kent Overstreet 1e5cd0dd3f Kill dead API code: stream_events, parsing.rs, build_response_message, log_diagnostics
Deleted: api/parsing.rs entirely (parsing now in context_new.rs),
stream_events (chat completions path), collect_stream, build_response_message,
log_diagnostics, tools_to_json_str, start_stream, chat_completion_stream_temp.

API layer is now just: stream_completions (token IDs in/out), SseReader,
send_and_check, and types. Zero errors in api/.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-08 15:06:33 -04:00

97 lines
3.1 KiB
Rust

// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.
use anyhow::Result;
use tokio::sync::mpsc;
use super::http::HttpClient;
use super::types::*;
use super::StreamToken;
/// Stream from /v1/completions with raw token IDs in and out.
/// Each SSE chunk yields one token (text + id). All parsing (think tags,
/// tool calls) is handled by the ResponseParser, not here.
pub(super) async fn stream_completions(
client: &HttpClient,
base_url: &str,
api_key: &str,
model: &str,
prompt_tokens: &[u32],
tx: &mpsc::UnboundedSender<StreamToken>,
sampling: super::SamplingParams,
priority: Option<i32>,
) -> Result<()> {
let mut request = serde_json::json!({
"model": model,
"prompt": prompt_tokens,
"max_tokens": 16384,
"temperature": sampling.temperature,
"top_p": sampling.top_p,
"top_k": sampling.top_k,
"stream": true,
"return_token_ids": true,
"skip_special_tokens": false,
"stop_token_ids": [super::super::tokenizer::IM_END],
});
if let Some(p) = priority {
request["priority"] = serde_json::json!(p);
}
let url = format!("{}/completions", base_url);
let debug_label = format!("{} prompt tokens, model={}", prompt_tokens.len(), model);
let mut response = super::send_and_check(
client,
&url,
&request,
("Authorization", &format!("Bearer {}", api_key)),
&[],
&debug_label,
None,
)
.await?;
let mut reader = super::SseReader::new();
let mut usage = None;
while let Some(event) = reader.next_event(&mut response).await? {
if let Some(err_msg) = event["error"]["message"].as_str() {
anyhow::bail!("API error in stream: {}", err_msg);
}
if let Some(u) = event["usage"].as_object() {
if let Ok(u) = serde_json::from_value::<Usage>(serde_json::Value::Object(u.clone())) {
usage = Some(u);
}
}
let choices = match event["choices"].as_array() {
Some(c) => c,
None => continue,
};
for choice in choices {
let text = choice["text"].as_str().unwrap_or("");
let token_ids = choice["token_ids"].as_array();
if let Some(ids) = token_ids {
for (i, id_val) in ids.iter().enumerate() {
if let Some(id) = id_val.as_u64() {
let _ = tx.send(StreamToken::Token {
text: if i == 0 { text.to_string() } else { String::new() },
id: id as u32,
});
}
}
} else if !text.is_empty() {
let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 });
}
}
}
let _ = tx.send(StreamToken::Done { usage });
Ok(())
}