2026-04-02 14:13:23 -04:00
|
|
|
// api/ — LLM API client (OpenAI-compatible)
|
2026-03-25 00:52:41 -04:00
|
|
|
//
|
2026-04-02 14:13:23 -04:00
|
|
|
// Works with any provider that implements the OpenAI chat completions
|
|
|
|
|
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
|
2026-03-25 00:52:41 -04:00
|
|
|
//
|
|
|
|
|
// Diagnostics: anomalies always logged to debug panel.
|
|
|
|
|
// Set POC_DEBUG=1 for verbose per-turn logging.
|
|
|
|
|
|
2026-04-07 12:50:40 -04:00
|
|
|
pub mod http;
|
2026-04-07 13:43:25 -04:00
|
|
|
mod parsing;
|
Restrict API types visibility — types module is now private
Only Message, Role, MessageContent, ContentPart, ToolCall,
FunctionCall, Usage, ImageUrl are pub-exported from agent::api.
Internal types (ChatRequest, ChatCompletionChunk, ChunkChoice,
Delta, ReasoningConfig, ToolCallDelta, FunctionCallDelta) are
pub(crate) — invisible outside the crate.
All callers updated to import from agent::api:: instead of
agent::api::types::.
Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-07 13:39:20 -04:00
|
|
|
mod types;
|
2026-03-25 00:52:41 -04:00
|
|
|
mod openai;
|
|
|
|
|
|
Restrict API types visibility — types module is now private
Only Message, Role, MessageContent, ContentPart, ToolCall,
FunctionCall, Usage, ImageUrl are pub-exported from agent::api.
Internal types (ChatRequest, ChatCompletionChunk, ChunkChoice,
Delta, ReasoningConfig, ToolCallDelta, FunctionCallDelta) are
pub(crate) — invisible outside the crate.
All callers updated to import from agent::api:: instead of
agent::api::types::.
Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-07 13:39:20 -04:00
|
|
|
// Public API types — used outside agent::api
|
|
|
|
|
pub use types::{Message, MessageContent, ContentPart, ImageUrl, Role, ToolCall, FunctionCall, Usage};
|
2026-04-04 00:29:11 -04:00
|
|
|
|
2026-03-25 00:52:41 -04:00
|
|
|
use anyhow::Result;
|
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
2026-04-07 12:50:40 -04:00
|
|
|
use self::http::{HttpClient, HttpResponse};
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
use tokio::sync::mpsc;
|
|
|
|
|
|
2026-04-04 18:05:16 -04:00
|
|
|
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
|
2026-03-29 21:22:42 -04:00
|
|
|
|
2026-04-02 18:41:02 -04:00
|
|
|
/// A JoinHandle that aborts its task when dropped.
///
/// Returned alongside stream receivers so that when the caller drops the
/// handle (e.g. the UI abandons a turn), the background streaming task is
/// cancelled rather than leaked.
pub(crate) struct AbortOnDrop(tokio::task::JoinHandle<()>);

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // abort() is safe to call even if the task already finished.
        self.0.abort();
    }
}
|
|
|
|
|
|
2026-04-04 13:48:24 -04:00
|
|
|
/// Sampling parameters for model generation.
///
/// Copied by value into each request (cheap: three scalar fields).
#[derive(Clone, Copy)]
pub(crate) struct SamplingParams {
    /// Softmax temperature; higher values yield more random output.
    pub temperature: f32,
    /// Nucleus (top-p) sampling: cumulative probability cutoff.
    pub top_p: f32,
    /// Top-k sampling: number of candidate tokens considered.
    pub top_k: u32,
}
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
|
// Stream events — yielded by backends, consumed by the runner
|
|
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
2026-04-04 16:39:04 -04:00
|
|
|
/// Build the tools JSON string from a slice of Tools.
|
|
|
|
|
fn tools_to_json_str(tools: &[agent_tools::Tool]) -> String {
|
|
|
|
|
let inner: Vec<String> = tools.iter().map(|t| t.to_json()).collect();
|
|
|
|
|
format!("[{}]", inner.join(","))
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
/// Events produced by the streaming API backends.
/// The runner reads these and decides what to display where.
pub(crate) enum StreamEvent {
    /// Content token from the model's response.
    Content(String),
    /// Reasoning/thinking token (internal monologue).
    Reasoning(String),
    /// Incremental tool call delta (structured, from APIs that support it).
    ToolCallDelta {
        /// Slot in the tool-call list this delta applies to.
        index: usize,
        /// Call id, when present in this delta.
        id: Option<String>,
        /// Call type (e.g. "function"), when present.
        call_type: Option<String>,
        /// Function name fragment, when present.
        name: Option<String>,
        /// Argument-string fragment to append, when present.
        arguments: Option<String>,
    },
    /// Token usage stats.
    Usage(Usage),
    /// Stream finished.
    Finished {
        /// Finish reason as reported by the backend; the value "error"
        /// is treated as a failure by consumers.
        reason: String,
    },
    /// Error from the stream.
    Error(String),
}
|
2026-03-25 00:52:41 -04:00
|
|
|
|
2026-04-02 22:18:50 -04:00
|
|
|
/// Client for an OpenAI-compatible chat completions endpoint.
///
/// Cheap to clone; the inner HttpClient is cloned into spawned
/// streaming tasks.
#[derive(Clone)]
pub struct ApiClient {
    // Shared HTTP client with connect/request timeouts configured in `new`.
    client: HttpClient,
    // Bearer token sent with every request.
    api_key: String,
    /// Model identifier sent in the request body.
    pub model: String,
    // Provider base URL; trailing '/' stripped in `new`.
    base_url: String,
}
|
|
|
|
|
|
|
|
|
|
impl ApiClient {
|
|
|
|
|
pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
|
2026-04-07 12:50:40 -04:00
|
|
|
let client = HttpClient::builder()
|
2026-03-25 00:52:41 -04:00
|
|
|
.connect_timeout(Duration::from_secs(30))
|
|
|
|
|
.timeout(Duration::from_secs(600))
|
2026-04-07 12:50:40 -04:00
|
|
|
.build();
|
2026-03-25 00:52:41 -04:00
|
|
|
|
|
|
|
|
Self {
|
|
|
|
|
client,
|
|
|
|
|
api_key: api_key.to_string(),
|
|
|
|
|
model: model.to_string(),
|
2026-04-02 14:13:23 -04:00
|
|
|
base_url: base_url.trim_end_matches('/').to_string(),
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-29 21:22:42 -04:00
|
|
|
/// Start a streaming chat completion. Returns a receiver of StreamEvents.
|
|
|
|
|
/// The caller (runner) reads events and handles routing to the UI.
|
|
|
|
|
///
|
2026-04-07 13:55:30 -04:00
|
|
|
pub(crate) fn start_stream(
|
2026-03-29 21:22:42 -04:00
|
|
|
&self,
|
|
|
|
|
messages: &[Message],
|
2026-04-04 16:39:04 -04:00
|
|
|
tools: &[agent_tools::Tool],
|
2026-03-29 21:22:42 -04:00
|
|
|
reasoning_effort: &str,
|
2026-04-04 13:48:24 -04:00
|
|
|
sampling: SamplingParams,
|
2026-04-01 23:21:39 -04:00
|
|
|
priority: Option<i32>,
|
2026-04-02 18:41:02 -04:00
|
|
|
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
|
2026-03-29 21:22:42 -04:00
|
|
|
let (tx, rx) = mpsc::unbounded_channel();
|
|
|
|
|
let client = self.client.clone();
|
|
|
|
|
let api_key = self.api_key.clone();
|
|
|
|
|
let model = self.model.clone();
|
|
|
|
|
let messages = messages.to_vec();
|
2026-04-04 16:39:04 -04:00
|
|
|
let tools_json = tools_to_json_str(tools);
|
|
|
|
|
let tools_value: serde_json::Value = serde_json::from_str(&tools_json).unwrap_or_default();
|
2026-03-29 21:22:42 -04:00
|
|
|
let reasoning_effort = reasoning_effort.to_string();
|
2026-04-02 14:13:23 -04:00
|
|
|
let base_url = self.base_url.clone();
|
2026-03-29 21:22:42 -04:00
|
|
|
|
2026-04-02 18:41:02 -04:00
|
|
|
let handle = tokio::spawn(async move {
|
2026-04-02 14:13:23 -04:00
|
|
|
let result = openai::stream_events(
|
|
|
|
|
&client, &base_url, &api_key, &model,
|
2026-04-05 22:34:48 -04:00
|
|
|
&messages, &tools_value, &tx,
|
2026-04-04 13:48:24 -04:00
|
|
|
&reasoning_effort, sampling, priority,
|
2026-04-02 14:13:23 -04:00
|
|
|
).await;
|
2026-03-29 21:22:42 -04:00
|
|
|
if let Err(e) = result {
|
2026-04-05 22:18:07 -04:00
|
|
|
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
2026-03-29 21:22:42 -04:00
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
2026-04-02 18:41:02 -04:00
|
|
|
(rx, AbortOnDrop(handle))
|
2026-03-29 21:22:42 -04:00
|
|
|
}
|
|
|
|
|
|
2026-04-08 11:42:22 -04:00
|
|
|
/// Start a streaming completion with raw token IDs.
|
|
|
|
|
/// No message formatting — the caller provides the complete prompt as tokens.
|
|
|
|
|
pub(crate) fn start_stream_completions(
|
|
|
|
|
&self,
|
|
|
|
|
prompt_tokens: &[u32],
|
|
|
|
|
sampling: SamplingParams,
|
|
|
|
|
priority: Option<i32>,
|
|
|
|
|
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
|
|
|
|
|
let (tx, rx) = mpsc::unbounded_channel();
|
|
|
|
|
let client = self.client.clone();
|
|
|
|
|
let api_key = self.api_key.clone();
|
|
|
|
|
let model = self.model.clone();
|
|
|
|
|
let prompt_tokens = prompt_tokens.to_vec();
|
|
|
|
|
let base_url = self.base_url.clone();
|
|
|
|
|
|
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
|
|
let result = openai::stream_completions(
|
|
|
|
|
&client, &base_url, &api_key, &model,
|
|
|
|
|
&prompt_tokens, &tx, sampling, priority,
|
|
|
|
|
).await;
|
|
|
|
|
if let Err(e) = result {
|
|
|
|
|
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
(rx, AbortOnDrop(handle))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-07 13:55:30 -04:00
|
|
|
pub(crate) async fn chat_completion_stream_temp(
|
2026-03-25 00:52:41 -04:00
|
|
|
&self,
|
|
|
|
|
messages: &[Message],
|
2026-04-04 16:39:04 -04:00
|
|
|
tools: &[agent_tools::Tool],
|
2026-03-25 00:52:41 -04:00
|
|
|
reasoning_effort: &str,
|
2026-04-04 13:48:24 -04:00
|
|
|
sampling: SamplingParams,
|
2026-04-01 23:21:39 -04:00
|
|
|
priority: Option<i32>,
|
2026-03-25 00:52:41 -04:00
|
|
|
) -> Result<(Message, Option<Usage>)> {
|
2026-03-29 21:22:42 -04:00
|
|
|
// Use the event stream and accumulate into a message.
|
2026-04-05 22:34:48 -04:00
|
|
|
let (mut rx, _handle) = self.start_stream(messages, tools, reasoning_effort, sampling, priority);
|
2026-03-29 21:22:42 -04:00
|
|
|
let mut content = String::new();
|
|
|
|
|
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
|
|
|
|
let mut usage = None;
|
|
|
|
|
let mut finish_reason = None;
|
|
|
|
|
|
|
|
|
|
while let Some(event) = rx.recv().await {
|
|
|
|
|
match event {
|
|
|
|
|
StreamEvent::Content(text) => content.push_str(&text),
|
|
|
|
|
StreamEvent::Reasoning(_) => {}
|
|
|
|
|
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
|
|
|
|
|
while tool_calls.len() <= index {
|
|
|
|
|
tool_calls.push(ToolCall {
|
|
|
|
|
id: String::new(),
|
|
|
|
|
call_type: "function".to_string(),
|
|
|
|
|
function: FunctionCall { name: String::new(), arguments: String::new() },
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
if let Some(id) = id { tool_calls[index].id = id; }
|
|
|
|
|
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
|
|
|
|
|
if let Some(n) = name { tool_calls[index].function.name = n; }
|
|
|
|
|
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
|
|
|
|
|
}
|
|
|
|
|
StreamEvent::Usage(u) => usage = Some(u),
|
|
|
|
|
StreamEvent::Finished { reason, .. } => {
|
|
|
|
|
finish_reason = Some(reason);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
StreamEvent::Error(e) => anyhow::bail!("{}", e),
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
2026-03-29 21:22:42 -04:00
|
|
|
|
|
|
|
|
if finish_reason.as_deref() == Some("error") {
|
|
|
|
|
let detail = if content.is_empty() { "no details".into() } else { content };
|
|
|
|
|
anyhow::bail!("model stream error: {}", detail);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok((build_response_message(content, tool_calls), usage))
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
|
2026-04-02 22:13:55 -04:00
|
|
|
    /// Provider base URL (trailing '/' stripped at construction).
    pub fn base_url(&self) -> &str { &self.base_url }

    /// API key used for request authentication.
    pub fn api_key(&self) -> &str { &self.api_key }
|
|
|
|
|
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send an HTTP request and check for errors. Shared by both backends.
|
|
|
|
|
pub(crate) async fn send_and_check(
|
2026-04-07 12:50:40 -04:00
|
|
|
client: &HttpClient,
|
2026-03-25 00:52:41 -04:00
|
|
|
url: &str,
|
|
|
|
|
body: &impl serde::Serialize,
|
|
|
|
|
auth_header: (&str, &str),
|
|
|
|
|
extra_headers: &[(&str, &str)],
|
|
|
|
|
debug_label: &str,
|
2026-04-02 15:10:40 -04:00
|
|
|
request_json: Option<&str>,
|
2026-04-07 12:50:40 -04:00
|
|
|
) -> Result<HttpResponse> {
|
2026-03-25 00:52:41 -04:00
|
|
|
let debug = std::env::var("POC_DEBUG").is_ok();
|
|
|
|
|
let start = Instant::now();
|
|
|
|
|
|
|
|
|
|
if debug {
|
|
|
|
|
let payload_size = serde_json::to_string(body)
|
|
|
|
|
.map(|s| s.len())
|
|
|
|
|
.unwrap_or(0);
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-25 00:52:41 -04:00
|
|
|
"request: {}K payload, {}",
|
|
|
|
|
payload_size / 1024, debug_label,
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
|
2026-04-07 12:50:40 -04:00
|
|
|
let mut headers: Vec<(&str, &str)> = Vec::with_capacity(extra_headers.len() + 1);
|
|
|
|
|
headers.push(auth_header);
|
|
|
|
|
headers.extend_from_slice(extra_headers);
|
2026-03-25 00:52:41 -04:00
|
|
|
|
2026-04-07 12:50:40 -04:00
|
|
|
let response = client
|
|
|
|
|
.send_json("POST", url, &headers, body)
|
2026-03-25 00:52:41 -04:00
|
|
|
.await
|
|
|
|
|
.map_err(|e| {
|
2026-04-07 12:50:40 -04:00
|
|
|
let msg = e.to_string();
|
|
|
|
|
let cause = if msg.contains("connect timeout") || msg.contains("TCP connect") {
|
2026-03-25 00:52:41 -04:00
|
|
|
"connection refused"
|
2026-04-07 12:50:40 -04:00
|
|
|
} else if msg.contains("request timeout") {
|
2026-03-25 00:52:41 -04:00
|
|
|
"request timed out"
|
|
|
|
|
} else {
|
2026-04-07 12:50:40 -04:00
|
|
|
"request error"
|
2026-03-25 00:52:41 -04:00
|
|
|
};
|
2026-04-07 12:50:40 -04:00
|
|
|
anyhow::anyhow!("{} ({}): {}", cause, url, msg)
|
2026-03-25 00:52:41 -04:00
|
|
|
})?;
|
|
|
|
|
|
|
|
|
|
let status = response.status();
|
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
|
|
|
|
|
|
if debug {
|
|
|
|
|
for name in [
|
|
|
|
|
"x-ratelimit-remaining",
|
|
|
|
|
"x-ratelimit-limit",
|
|
|
|
|
"x-request-id",
|
|
|
|
|
] {
|
2026-04-07 12:50:40 -04:00
|
|
|
if let Some(val) = response.header(name) {
|
|
|
|
|
dbglog!("header {}: {}", name, val);
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !status.is_success() {
|
|
|
|
|
let body = response.text().await.unwrap_or_default();
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-25 00:52:41 -04:00
|
|
|
"HTTP {} after {:.1}s ({}): {}",
|
|
|
|
|
status,
|
|
|
|
|
elapsed.as_secs_f64(),
|
|
|
|
|
url,
|
|
|
|
|
&body[..body.len().min(500)]
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-04-02 15:10:40 -04:00
|
|
|
if let Some(json) = request_json {
|
|
|
|
|
let log_dir = dirs::home_dir()
|
|
|
|
|
.unwrap_or_default()
|
2026-04-02 19:28:56 -04:00
|
|
|
.join(".consciousness/logs/failed-requests");
|
|
|
|
|
let _ = std::fs::create_dir_all(&log_dir);
|
2026-04-02 15:10:40 -04:00
|
|
|
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
|
2026-04-02 19:28:56 -04:00
|
|
|
let path = log_dir.join(format!("{}.json", ts));
|
2026-04-02 15:10:40 -04:00
|
|
|
if std::fs::write(&path, json).is_ok() {
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-04-02 15:10:40 -04:00
|
|
|
"saved failed request to {} (HTTP {})", path.display(), status
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-04-02 15:10:40 -04:00
|
|
|
}
|
|
|
|
|
}
|
2026-03-25 00:52:41 -04:00
|
|
|
anyhow::bail!("HTTP {} ({}): {}", status, url, &body[..body.len().min(1000)]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if debug {
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-25 00:52:41 -04:00
|
|
|
"connected in {:.1}s (HTTP {})",
|
|
|
|
|
elapsed.as_secs_f64(),
|
|
|
|
|
status.as_u16()
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(response)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// SSE stream reader. Handles the generic SSE plumbing shared by both
/// backends: chunk reading with timeout, line buffering, `data:` prefix
/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics.
/// Yields parsed events as serde_json::Value — each backend handles its
/// own event types.
pub(crate) struct SseReader {
    // Partial-line carryover between chunks; drained at each '\n'.
    line_buf: String,
    // Max wait for the next chunk before the stream is considered stalled.
    chunk_timeout: Duration,
    /// When reading began; used for elapsed-time diagnostics.
    pub stream_start: Instant,
    /// Raw chunks read from the HTTP response.
    pub chunks_received: u64,
    /// `data:` lines handed to the JSON parser.
    pub sse_lines_parsed: u64,
    /// `data:` lines that failed to parse as JSON.
    pub sse_parse_errors: u64,
    // Verbose logging enabled via POC_DEBUG.
    debug: bool,
    // Set after "data: [DONE]"; further reads return Ok(None).
    done: bool,
    /// Serialized request payload — saved to disk on errors for replay debugging.
    pub(crate) request_json: Option<String>,
}
|
|
|
|
|
|
|
|
|
|
impl SseReader {
|
2026-04-05 22:34:48 -04:00
|
|
|
pub(crate) fn new() -> Self {
|
2026-03-25 00:52:41 -04:00
|
|
|
Self {
|
|
|
|
|
line_buf: String::new(),
|
2026-04-02 18:46:27 -04:00
|
|
|
chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
|
2026-03-25 00:52:41 -04:00
|
|
|
stream_start: Instant::now(),
|
|
|
|
|
chunks_received: 0,
|
|
|
|
|
sse_lines_parsed: 0,
|
|
|
|
|
sse_parse_errors: 0,
|
|
|
|
|
debug: std::env::var("POC_DEBUG").is_ok(),
|
|
|
|
|
done: false,
|
2026-04-02 14:13:23 -04:00
|
|
|
request_json: None,
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-02 14:13:23 -04:00
|
|
|
/// Attach the serialized request payload for error diagnostics.
|
2026-04-02 15:10:40 -04:00
|
|
|
/// Save the request payload to disk for replay debugging.
|
|
|
|
|
fn save_failed_request(&self, reason: &str) {
|
|
|
|
|
let Some(ref json) = self.request_json else { return };
|
|
|
|
|
let log_dir = dirs::home_dir()
|
|
|
|
|
.unwrap_or_default()
|
2026-04-02 19:28:56 -04:00
|
|
|
.join(".consciousness/logs/failed-requests");
|
|
|
|
|
let _ = std::fs::create_dir_all(&log_dir);
|
2026-04-02 15:10:40 -04:00
|
|
|
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
|
2026-04-02 19:28:56 -04:00
|
|
|
let path = log_dir.join(format!("{}.json", ts));
|
2026-04-02 15:10:40 -04:00
|
|
|
if std::fs::write(&path, json).is_ok() {
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-04-02 15:10:40 -04:00
|
|
|
"saved failed request to {} ({})", path.display(), reason
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-04-02 15:10:40 -04:00
|
|
|
}
|
2026-04-02 14:13:23 -04:00
|
|
|
}
|
|
|
|
|
|
2026-03-25 00:52:41 -04:00
|
|
|
/// Read the next SSE event from the response stream.
|
|
|
|
|
/// Returns Ok(Some(value)) for each parsed data line,
|
|
|
|
|
/// Ok(None) when the stream ends or [DONE] is received.
|
2026-04-02 16:15:32 -04:00
|
|
|
pub(crate) async fn next_event(
|
2026-03-25 00:52:41 -04:00
|
|
|
&mut self,
|
2026-04-07 12:50:40 -04:00
|
|
|
response: &mut HttpResponse,
|
2026-03-25 00:52:41 -04:00
|
|
|
) -> Result<Option<serde_json::Value>> {
|
|
|
|
|
loop {
|
|
|
|
|
// Drain complete lines from the buffer before reading more chunks
|
|
|
|
|
while let Some(newline_pos) = self.line_buf.find('\n') {
|
|
|
|
|
let line = self.line_buf[..newline_pos].trim().to_string();
|
|
|
|
|
self.line_buf = self.line_buf[newline_pos + 1..].to_string();
|
|
|
|
|
|
|
|
|
|
if line == "data: [DONE]" {
|
|
|
|
|
self.done = true;
|
|
|
|
|
return Ok(None);
|
|
|
|
|
}
|
|
|
|
|
if line.is_empty()
|
|
|
|
|
|| line.starts_with("event: ")
|
|
|
|
|
|| !line.starts_with("data: ")
|
|
|
|
|
{
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let json_str = &line[6..];
|
|
|
|
|
self.sse_lines_parsed += 1;
|
|
|
|
|
|
|
|
|
|
match serde_json::from_str(json_str) {
|
|
|
|
|
Ok(v) => return Ok(Some(v)),
|
|
|
|
|
Err(e) => {
|
|
|
|
|
self.sse_parse_errors += 1;
|
|
|
|
|
if self.sse_parse_errors == 1 || self.debug {
|
|
|
|
|
let preview = if json_str.len() > 200 {
|
|
|
|
|
format!("{}...", &json_str[..200])
|
|
|
|
|
} else {
|
|
|
|
|
json_str.to_string()
|
|
|
|
|
};
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!(
|
2026-03-25 00:52:41 -04:00
|
|
|
"SSE parse error (#{}) {}: {}",
|
|
|
|
|
self.sse_parse_errors, e, preview
|
2026-04-05 21:45:55 -04:00
|
|
|
);
|
2026-03-25 00:52:41 -04:00
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if self.done {
|
|
|
|
|
return Ok(None);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read more data from the response stream
|
|
|
|
|
match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
|
|
|
|
|
Ok(Ok(Some(chunk))) => {
|
|
|
|
|
self.chunks_received += 1;
|
|
|
|
|
self.line_buf.push_str(&String::from_utf8_lossy(&chunk));
|
|
|
|
|
}
|
|
|
|
|
Ok(Ok(None)) => return Ok(None),
|
2026-04-02 15:10:40 -04:00
|
|
|
Ok(Err(e)) => {
|
2026-04-02 18:49:33 -04:00
|
|
|
let buf_preview = if self.line_buf.is_empty() {
|
|
|
|
|
"(empty)".to_string()
|
|
|
|
|
} else {
|
|
|
|
|
let n = self.line_buf.len().min(500);
|
|
|
|
|
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
|
|
|
|
|
};
|
|
|
|
|
let msg = format!(
|
|
|
|
|
"stream error after {} chunks, {:.1}s, {} sse lines: {} | buf: {}",
|
|
|
|
|
self.chunks_received,
|
|
|
|
|
self.stream_start.elapsed().as_secs_f64(),
|
|
|
|
|
self.sse_lines_parsed,
|
|
|
|
|
e, buf_preview,
|
|
|
|
|
);
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!("{}", msg);
|
2026-04-02 18:49:33 -04:00
|
|
|
self.save_failed_request(&msg);
|
2026-04-02 15:10:40 -04:00
|
|
|
return Err(e.into());
|
|
|
|
|
}
|
2026-03-25 00:52:41 -04:00
|
|
|
Err(_) => {
|
2026-04-02 18:49:33 -04:00
|
|
|
let buf_preview = if self.line_buf.is_empty() {
|
|
|
|
|
"(empty)".to_string()
|
|
|
|
|
} else {
|
|
|
|
|
let n = self.line_buf.len().min(500);
|
|
|
|
|
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
|
|
|
|
|
};
|
2026-04-02 15:10:40 -04:00
|
|
|
let msg = format!(
|
2026-04-02 18:49:33 -04:00
|
|
|
"stream timeout: {}s, {} chunks, {} sse lines, {:.1}s elapsed | buf: {}",
|
2026-03-25 00:52:41 -04:00
|
|
|
self.chunk_timeout.as_secs(),
|
|
|
|
|
self.chunks_received,
|
2026-04-02 18:49:33 -04:00
|
|
|
self.sse_lines_parsed,
|
|
|
|
|
self.stream_start.elapsed().as_secs_f64(),
|
|
|
|
|
buf_preview,
|
2026-04-02 15:10:40 -04:00
|
|
|
);
|
2026-04-05 21:45:55 -04:00
|
|
|
dbglog!("{}", msg);
|
2026-04-02 15:10:40 -04:00
|
|
|
self.save_failed_request(&msg);
|
2026-03-25 00:52:41 -04:00
|
|
|
anyhow::bail!(
|
|
|
|
|
"stream timeout: no data for {}s ({} chunks received)",
|
|
|
|
|
self.chunk_timeout.as_secs(),
|
|
|
|
|
self.chunks_received
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build a response Message from accumulated content and tool calls.
|
|
|
|
|
/// Shared by both backends — the wire format differs but the internal
|
|
|
|
|
/// representation is the same.
|
2026-03-29 20:57:59 -04:00
|
|
|
///
|
|
|
|
|
/// If no structured tool calls came from the API but the content
|
|
|
|
|
/// contains leaked tool call XML (e.g. `<tool_call>...</tool_call>`
|
|
|
|
|
/// from models that emit tool calls as text), parse them out and
|
|
|
|
|
/// promote them to structured tool_calls. This way all consumers
|
|
|
|
|
/// see tool calls uniformly regardless of backend.
|
2026-04-07 13:43:25 -04:00
|
|
|
pub(crate) fn build_response_message(
|
2026-03-25 00:52:41 -04:00
|
|
|
content: String,
|
|
|
|
|
tool_calls: Vec<ToolCall>,
|
|
|
|
|
) -> Message {
|
2026-03-29 20:57:59 -04:00
|
|
|
// If the API returned structured tool calls, use them as-is.
|
|
|
|
|
if !tool_calls.is_empty() {
|
|
|
|
|
return Message {
|
|
|
|
|
role: Role::Assistant,
|
|
|
|
|
content: if content.is_empty() { None }
|
|
|
|
|
else { Some(MessageContent::Text(content)) },
|
|
|
|
|
tool_calls: Some(tool_calls),
|
|
|
|
|
tool_call_id: None,
|
|
|
|
|
name: None,
|
|
|
|
|
timestamp: None,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check for leaked tool calls in content text.
|
2026-04-04 17:58:49 -04:00
|
|
|
let leaked = parsing::parse_leaked_tool_calls(&content);
|
2026-03-29 20:57:59 -04:00
|
|
|
if !leaked.is_empty() {
|
2026-04-04 17:58:49 -04:00
|
|
|
let cleaned = parsing::strip_leaked_artifacts(&content);
|
2026-03-29 20:57:59 -04:00
|
|
|
return Message {
|
|
|
|
|
role: Role::Assistant,
|
|
|
|
|
content: if cleaned.trim().is_empty() { None }
|
|
|
|
|
else { Some(MessageContent::Text(cleaned)) },
|
|
|
|
|
tool_calls: Some(leaked),
|
|
|
|
|
tool_call_id: None,
|
|
|
|
|
name: None,
|
|
|
|
|
timestamp: None,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-25 00:52:41 -04:00
|
|
|
Message {
|
|
|
|
|
role: Role::Assistant,
|
2026-03-29 20:57:59 -04:00
|
|
|
content: if content.is_empty() { None }
|
|
|
|
|
else { Some(MessageContent::Text(content)) },
|
|
|
|
|
tool_calls: None,
|
2026-03-25 00:52:41 -04:00
|
|
|
tool_call_id: None,
|
|
|
|
|
name: None,
|
|
|
|
|
timestamp: None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Log stream diagnostics. Shared by both backends.
///
/// Anomalies (leaked reasoning, empty responses, missing finish_reason,
/// SSE parse errors) are always logged; the full per-stream stats block
/// is emitted only when POC_DEBUG is set.
pub(crate) fn log_diagnostics(
    content_len: usize,
    tool_count: usize,
    reasoning_chars: usize,
    reasoning_effort: &str,
    finish_reason: &Option<String>,
    chunks_received: u64,
    sse_lines_parsed: u64,
    sse_parse_errors: u64,
    empty_deltas: u64,
    total_elapsed: Duration,
    first_content_at: Option<Duration>,
    usage: &Option<Usage>,
    tools: &[ToolCall],
) {
    let debug = std::env::var("POC_DEBUG").is_ok();

    // Reasoning tokens arrived even though none were requested.
    if reasoning_chars > 0 && reasoning_effort == "none" {
        dbglog!(
            "note: {} chars leaked reasoning (suppressed from display)",
            reasoning_chars
        );
    }
    // Neither text nor tool calls — the turn produced nothing usable.
    if content_len == 0 && tool_count == 0 {
        dbglog!(
            "WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \
            parse_errors: {}, empty_deltas: {}, {:.1}s)",
            finish_reason, chunks_received, reasoning_chars,
            sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64()
        );
    }
    // Stream carried data but never reported why it stopped.
    if finish_reason.is_none() && chunks_received > 0 {
        dbglog!(
            "WARNING: stream ended without finish_reason ({} chunks, {} content chars)",
            chunks_received, content_len
        );
    }
    if sse_parse_errors > 0 {
        dbglog!(
            "WARNING: {} SSE parse errors out of {} lines",
            sse_parse_errors, sse_lines_parsed
        );
    }

    if debug {
        if let Some(u) = usage {
            dbglog!(
                "tokens: {} prompt + {} completion = {} total",
                u.prompt_tokens, u.completion_tokens, u.total_tokens
            );
        }
        // TTFT = time to first content token; "none" if no content arrived.
        let ttft = first_content_at
            .map(|d| format!("{:.1}s", d.as_secs_f64()))
            .unwrap_or_else(|| "none".to_string());
        dbglog!(
            "stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \
            {} content chars, {} reasoning chars, {} tools, \
            finish={:?}",
            total_elapsed.as_secs_f64(),
            ttft,
            chunks_received,
            sse_lines_parsed,
            content_len,
            reasoning_chars,
            tool_count,
            finish_reason,
        );
        if !tools.is_empty() {
            for (i, tc) in tools.iter().enumerate() {
                dbglog!(
                    "  tool[{}]: {} (id: {}, {} arg chars)",
                    i, tc.function.name, tc.id, tc.function.arguments.len()
                );
            }
        }
    }
}
|
2026-04-04 18:05:16 -04:00
|
|
|
|
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
|
// Stream collection — assembles StreamEvents into a complete response
|
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
/// Result of collecting a complete response from the stream.
pub(crate) struct StreamResult {
    /// Full accumulated content text (includes any tool-call XML that
    /// arrived as content tokens).
    pub(crate) content: String,
    /// Structured tool calls assembled from ToolCallDelta events.
    pub(crate) tool_calls: Vec<ToolCall>,
    /// Token usage, if the backend reported it.
    pub(crate) usage: Option<Usage>,
    /// Finish reason, if the stream ended with a Finished event.
    pub(crate) finish_reason: Option<String>,
    /// Error message, if the stream ended with an Error event.
    pub(crate) error: Option<String>,
    /// Remaining display buffer (caller should flush if not in a tool call).
    pub(crate) display_buf: String,
    /// Accumulated reasoning/thinking content.
    pub(crate) reasoning: String,
    /// Whether we were mid-tool-call when the stream ended.
    pub(crate) in_tool_call: bool,
}
|
|
|
|
|
|
|
|
|
|
/// Collect stream events into a complete response. Handles:
/// - Content accumulation and display buffering
/// - Leaked tool call detection and dispatch (Qwen XML in content)
/// - Structured tool call delta assembly (OpenAI-style)
/// - UI forwarding (text deltas, reasoning, tool call notifications)
pub(crate) async fn collect_stream(
    rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
    agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
    active_tools: &crate::agent::tools::SharedActiveTools,
) -> StreamResult {
    let mut content = String::new();
    let mut tool_calls: Vec<ToolCall> = Vec::new();
    let mut usage = None;
    let mut finish_reason = None;
    // True while we're between "<tool_call>" and "</tool_call>" in content.
    let mut in_tool_call = false;
    // Accumulates the XML body of the tool call currently being received.
    let mut tool_call_buf = String::new();
    let mut error = None;
    let mut first_content = true;
    // Text pending display; held back so a split "<tool_call>" tag is
    // never flushed to the UI.
    let mut display_buf = String::new();
    let mut reasoning_buf = String::new();
    // Activity indicator; started lazily on first content token, dropped
    // (ending the activity) when this function returns.
    let mut _streaming_guard: Option<super::ActivityGuard> = None;

    while let Some(event) = rx.recv().await {
        match event {
            StreamEvent::Content(text) => {
                if first_content {
                    _streaming_guard = Some(super::start_activity(agent, "streaming...").await);
                    first_content = false;
                }
                // `content` always gets the raw text, tool-call XML included.
                content.push_str(&text);

                if in_tool_call {
                    tool_call_buf.push_str(&text);
                    if let Some(end) = tool_call_buf.find("</tool_call>") {
                        let body = &tool_call_buf[..end];
                        if let Some(call) = parsing::parse_tool_call_body(body) {
                            // Dispatch the tool immediately in the background;
                            // the handle is tracked in active_tools.
                            let args: serde_json::Value =
                                serde_json::from_str(&call.function.arguments).unwrap_or_default();
                            let args_summary = summarize_args(&call.function.name, &args);
                            let is_background = args.get("run_in_background")
                                .and_then(|v| v.as_bool())
                                .unwrap_or(false);
                            let call_id = call.id.clone();
                            let call_name = call.function.name.clone();
                            let agent_handle = agent.clone();
                            let handle = tokio::spawn(async move {
                                let output = agent_tools::dispatch_with_agent(
                                    &call.function.name, &args, Some(agent_handle)).await;
                                (call, output)
                            });
                            active_tools.lock().unwrap().push(ActiveToolCall {
                                id: call_id,
                                name: call_name,
                                detail: args_summary,
                                started: std::time::Instant::now(),
                                background: is_background,
                                handle,
                            });
                        }
                        // Anything after the closing tag is normal text again.
                        let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
                        tool_call_buf.clear();
                        in_tool_call = false;
                        if !remaining.trim().is_empty() {
                            display_buf.push_str(&remaining);
                        }
                    }
                } else {
                    display_buf.push_str(&text);
                    if let Some(pos) = display_buf.find("<tool_call>") {
                        // Flush the text before the tag, then switch to
                        // tool-call accumulation mode.
                        let before = &display_buf[..pos];
                        if !before.is_empty() {
                            // try_lock: skip the UI update rather than block the stream.
                            if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(before); }
                        }
                        display_buf.clear();
                        in_tool_call = true;
                    } else {
                        // Hold back the last 10 bytes so a partial
                        // "<tool_call" prefix (up to 10 chars) split across
                        // chunks is never flushed mid-tag.
                        let safe = display_buf.len().saturating_sub(10);
                        let safe = display_buf.floor_char_boundary(safe);
                        if safe > 0 {
                            let flush = display_buf[..safe].to_string();
                            display_buf = display_buf[safe..].to_string();
                            if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(&flush); }
                        }
                    }
                }
            }
            StreamEvent::Reasoning(text) => {
                reasoning_buf.push_str(&text);
            }
            StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
                // Grow the list so `index` is addressable, then merge fields.
                while tool_calls.len() <= index {
                    tool_calls.push(ToolCall {
                        id: String::new(),
                        call_type: "function".to_string(),
                        function: FunctionCall { name: String::new(), arguments: String::new() },
                    });
                }
                if let Some(id) = id { tool_calls[index].id = id; }
                if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
                if let Some(n) = name { tool_calls[index].function.name = n; }
                if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
            }
            StreamEvent::Usage(u) => usage = Some(u),
            StreamEvent::Finished { reason, .. } => {
                finish_reason = Some(reason);
                break;
            }
            StreamEvent::Error(e) => {
                error = Some(e);
                break;
            }
        }
    }

    StreamResult { content, tool_calls, usage, finish_reason, error, display_buf, in_tool_call, reasoning: reasoning_buf }
}
|