// api/ — LLM API client (OpenAI-compatible) // // Works with any provider that implements the OpenAI chat completions // API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc. // // Diagnostics: anomalies always logged to debug panel. // Set POC_DEBUG=1 for verbose per-turn logging. mod openai; use anyhow::Result; use reqwest::Client; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use crate::agent::types::*; use crate::agent::ui_channel::{UiMessage, UiSender}; /// A JoinHandle that aborts its task when dropped. pub struct AbortOnDrop(tokio::task::JoinHandle<()>); impl Drop for AbortOnDrop { fn drop(&mut self) { self.0.abort(); } } // ───────────────────────────────────────────────────────────── // Stream events — yielded by backends, consumed by the runner // ───────────────────────────────────────────────────────────── /// Events produced by the streaming API backends. /// The runner reads these and decides what to display where. pub enum StreamEvent { /// Content token from the model's response. Content(String), /// Reasoning/thinking token (internal monologue). Reasoning(String), /// Incremental tool call delta (structured, from APIs that support it). ToolCallDelta { index: usize, id: Option, call_type: Option, name: Option, arguments: Option, }, /// Token usage stats. Usage(Usage), /// Stream finished. Finished { reason: String, prompt_tokens: u32, completion_tokens: u32, }, /// Error from the stream. Error(String), } pub struct ApiClient { client: Client, api_key: String, pub model: String, base_url: String, } impl ApiClient { pub fn new(base_url: &str, api_key: &str, model: &str) -> Self { let client = Client::builder() .connect_timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(600)) .build() .expect("failed to build HTTP client"); Self { client, api_key: api_key.to_string(), model: model.to_string(), base_url: base_url.trim_end_matches('/').to_string(), } } /// Start a streaming chat completion. 
Returns a receiver of StreamEvents. /// The caller (runner) reads events and handles routing to the UI. /// pub fn start_stream( &self, messages: &[Message], tools: Option<&[ToolDef]>, ui_tx: &UiSender, reasoning_effort: &str, temperature: Option, priority: Option, ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); let client = self.client.clone(); let api_key = self.api_key.clone(); let model = self.model.clone(); let messages = messages.to_vec(); let tools = tools.map(|t| t.to_vec()); let ui_tx = ui_tx.clone(); let reasoning_effort = reasoning_effort.to_string(); let base_url = self.base_url.clone(); let handle = tokio::spawn(async move { let result = openai::stream_events( &client, &base_url, &api_key, &model, &messages, tools.as_deref(), &tx, &ui_tx, &reasoning_effort, temperature, priority, ).await; if let Err(e) = result { let _ = tx.send(StreamEvent::Error(e.to_string())); } }); (rx, AbortOnDrop(handle)) } pub async fn chat_completion_stream_temp( &self, messages: &[Message], tools: Option<&[ToolDef]>, ui_tx: &UiSender, reasoning_effort: &str, temperature: Option, priority: Option, ) -> Result<(Message, Option)> { // Use the event stream and accumulate into a message. 
let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority); let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut usage = None; let mut finish_reason = None; while let Some(event) = rx.recv().await { match event { StreamEvent::Content(text) => content.push_str(&text), StreamEvent::Reasoning(_) => {} StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { while tool_calls.len() <= index { tool_calls.push(ToolCall { id: String::new(), call_type: "function".to_string(), function: FunctionCall { name: String::new(), arguments: String::new() }, }); } if let Some(id) = id { tool_calls[index].id = id; } if let Some(ct) = call_type { tool_calls[index].call_type = ct; } if let Some(n) = name { tool_calls[index].function.name = n; } if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } } StreamEvent::Usage(u) => usage = Some(u), StreamEvent::Finished { reason, .. } => { finish_reason = Some(reason); break; } StreamEvent::Error(e) => anyhow::bail!("{}", e), } } if finish_reason.as_deref() == Some("error") { let detail = if content.is_empty() { "no details".into() } else { content }; anyhow::bail!("model stream error: {}", detail); } Ok((build_response_message(content, tool_calls), usage)) } /// Return a label for the active backend, used in startup info. pub fn backend_label(&self) -> &str { if self.base_url.contains("openrouter") { "openrouter" } else { "openai-compat" } } } /// Send an HTTP request and check for errors. Shared by both backends. 
/// Sends the POST, classifies transport errors, and verifies the HTTP
/// status. On failure the response body (truncated) goes to the debug
/// panel and, when `request_json` is provided, the request payload is
/// saved to disk for replay debugging.
///
/// # Errors
/// Fails on transport errors (connect/timeout/request) or any
/// non-success HTTP status.
pub(crate) async fn send_and_check(
    client: &Client,
    url: &str,
    body: &impl serde::Serialize,
    auth_header: (&str, &str),
    extra_headers: &[(&str, &str)],
    ui_tx: &UiSender,
    debug_label: &str,
    request_json: Option<&str>,
) -> Result<reqwest::Response> {
    let debug = std::env::var("POC_DEBUG").is_ok();
    let start = Instant::now();

    if debug {
        // Serialize only to measure payload size; failures count as 0.
        let payload_size = serde_json::to_string(body).map(|s| s.len()).unwrap_or(0);
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "request: {}K payload, {}",
            payload_size / 1024,
            debug_label,
        )));
    }

    let mut req = client
        .post(url)
        .header(auth_header.0, auth_header.1)
        .header("Content-Type", "application/json");
    for (name, value) in extra_headers {
        req = req.header(*name, *value);
    }

    let response = req.json(body).send().await.map_err(|e| {
        // Classify the transport failure for a human-readable first word.
        let cause = if e.is_connect() {
            "connection refused"
        } else if e.is_timeout() {
            "request timed out"
        } else if e.is_request() {
            "request error"
        } else {
            "unknown"
        };
        anyhow::anyhow!("{} ({}): {:?}", cause, url, e.without_url())
    })?;

    let status = response.status();
    let elapsed = start.elapsed();

    if debug {
        // Log interesting response headers
        let headers = response.headers();
        for name in ["x-ratelimit-remaining", "x-ratelimit-limit", "x-request-id"] {
            if let Some(val) = headers.get(name) {
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "header {}: {}",
                    name,
                    val.to_str().unwrap_or("?")
                )));
            }
        }
    }

    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "HTTP {} after {:.1}s ({}): {}",
            status,
            elapsed.as_secs_f64(),
            url,
            truncate_utf8(&body, 500)
        )));
        if let Some(json) = request_json {
            save_failed_request(ui_tx, json, &format!("HTTP {}", status));
        }
        anyhow::bail!("HTTP {} ({}): {}", status, url, truncate_utf8(&body, 1000));
    }

    if debug {
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "connected in {:.1}s (HTTP {})",
            elapsed.as_secs_f64(),
            status.as_u16()
        )));
    }

    Ok(response)
}

/// Truncate `s` to at most `max_bytes`, backing off to a char boundary.
///
/// Plain byte slicing (`&s[..n]`) panics when `n` lands inside a
/// multi-byte UTF-8 sequence — error bodies and SSE payloads are not
/// guaranteed to be ASCII-only, so all truncations go through here.
fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

/// Best-effort: persist a failed request payload to
/// `~/.consciousness/logs/failed-request-<timestamp>.json` for replay
/// debugging, and announce the path on the debug panel. Shared by
/// `send_and_check` and `SseReader` so the format stays in one place.
pub(crate) fn save_failed_request(ui_tx: &UiSender, json: &str, reason: &str) {
    let log_dir = dirs::home_dir()
        .unwrap_or_default()
        .join(".consciousness/logs");
    // Best-effort: without this, the write below silently fails on a
    // fresh machine where the logs directory doesn't exist yet.
    let _ = std::fs::create_dir_all(&log_dir);
    let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
    let path = log_dir.join(format!("failed-request-{}.json", ts));
    if std::fs::write(&path, json).is_ok() {
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "saved failed request to {} ({})",
            path.display(),
            reason
        )));
    }
}

/// SSE stream reader. Handles the generic SSE plumbing shared by both
/// backends: chunk reading with timeout, line buffering, `data:` prefix
/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics.
/// Yields parsed events as serde_json::Value — each backend handles its
/// own event types.
pub(crate) struct SseReader {
    // Partial-line carryover between chunks.
    line_buf: String,
    // Max wait for the next chunk before declaring the stream stalled.
    chunk_timeout: Duration,
    pub stream_start: Instant,
    pub chunks_received: u64,
    pub sse_lines_parsed: u64,
    pub sse_parse_errors: u64,
    debug: bool,
    ui_tx: UiSender,
    // Set once `[DONE]` is seen; later polls return Ok(None).
    done: bool,
    /// Serialized request payload — saved to disk on errors for replay
    /// debugging. Assigned directly by the backend after construction.
    pub(crate) request_json: Option<String>,
}

impl SseReader {
    pub(crate) fn new(ui_tx: &UiSender) -> Self {
        Self {
            line_buf: String::new(),
            chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
            stream_start: Instant::now(),
            chunks_received: 0,
            sse_lines_parsed: 0,
            sse_parse_errors: 0,
            debug: std::env::var("POC_DEBUG").is_ok(),
            ui_tx: ui_tx.clone(),
            done: false,
            request_json: None,
        }
    }

    /// Save the request payload (if attached) to disk for replay debugging.
    fn save_failed_request(&self, reason: &str) {
        let Some(ref json) = self.request_json else { return };
        save_failed_request(&self.ui_tx, json, reason);
    }

    /// Read the next SSE event from the response stream.
    /// Returns Ok(Some(value)) for each parsed data line,
    /// Ok(None) when the stream ends or [DONE] is received.
    ///
    /// # Errors
    /// Fails on a transport error or when no chunk arrives within the
    /// configured stream timeout; both cases save the request payload.
    pub(crate) async fn next_event(
        &mut self,
        response: &mut reqwest::Response,
    ) -> Result<Option<serde_json::Value>> {
        loop {
            // Drain complete lines from the buffer before reading more chunks
            while let Some(newline_pos) = self.line_buf.find('\n') {
                let line = self.line_buf[..newline_pos].trim().to_string();
                self.line_buf = self.line_buf[newline_pos + 1..].to_string();

                if line == "data: [DONE]" {
                    self.done = true;
                    return Ok(None);
                }
                // Skips blanks, `event:` lines, and anything else that
                // isn't a data line.
                let Some(json_str) = line.strip_prefix("data: ") else { continue };
                self.sse_lines_parsed += 1;
                match serde_json::from_str(json_str) {
                    Ok(v) => return Ok(Some(v)),
                    Err(e) => {
                        self.sse_parse_errors += 1;
                        // Always report the first error; later ones only in
                        // debug mode so a malformed stream can't spam the UI.
                        if self.sse_parse_errors == 1 || self.debug {
                            let preview = if json_str.len() > 200 {
                                format!("{}...", truncate_utf8(json_str, 200))
                            } else {
                                json_str.to_string()
                            };
                            let _ = self.ui_tx.send(UiMessage::Debug(format!(
                                "SSE parse error (#{}) {}: {}",
                                self.sse_parse_errors, e, preview
                            )));
                        }
                        continue;
                    }
                }
            }
            if self.done {
                return Ok(None);
            }
            // Read more data from the response stream
            match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
                Ok(Ok(Some(chunk))) => {
                    self.chunks_received += 1;
                    // Lossy conversion: a chunk boundary can split a UTF-8
                    // sequence; worst case is a replacement char in a line.
                    self.line_buf.push_str(&String::from_utf8_lossy(&chunk));
                }
                Ok(Ok(None)) => return Ok(None),
                Ok(Err(e)) => {
                    self.save_failed_request(&format!("stream error: {}", e));
                    return Err(e.into());
                }
                Err(_) => {
                    let msg = format!(
                        "stream timeout: no data for {}s ({} chunks, {:.1}s elapsed)",
                        self.chunk_timeout.as_secs(),
                        self.chunks_received,
                        self.stream_start.elapsed().as_secs_f64()
                    );
                    let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
                    self.save_failed_request(&msg);
                    anyhow::bail!(
                        "stream timeout: no data for {}s ({} chunks received)",
                        self.chunk_timeout.as_secs(),
                        self.chunks_received
                    );
                }
            }
        }
    }
}

/// Build a response Message from accumulated content and tool calls.
/// Shared by both backends — the wire format differs but the internal /// representation is the same. /// /// If no structured tool calls came from the API but the content /// contains leaked tool call XML (e.g. `...` /// from models that emit tool calls as text), parse them out and /// promote them to structured tool_calls. This way all consumers /// see tool calls uniformly regardless of backend. pub fn build_response_message( content: String, tool_calls: Vec, ) -> Message { // If the API returned structured tool calls, use them as-is. if !tool_calls.is_empty() { return Message { role: Role::Assistant, content: if content.is_empty() { None } else { Some(MessageContent::Text(content)) }, tool_calls: Some(tool_calls), tool_call_id: None, name: None, timestamp: None, }; } // Check for leaked tool calls in content text. let leaked = crate::agent::parsing::parse_leaked_tool_calls(&content); if !leaked.is_empty() { let cleaned = crate::agent::parsing::strip_leaked_artifacts(&content); return Message { role: Role::Assistant, content: if cleaned.trim().is_empty() { None } else { Some(MessageContent::Text(cleaned)) }, tool_calls: Some(leaked), tool_call_id: None, name: None, timestamp: None, }; } Message { role: Role::Assistant, content: if content.is_empty() { None } else { Some(MessageContent::Text(content)) }, tool_calls: None, tool_call_id: None, name: None, timestamp: None, } } /// Log stream diagnostics. Shared by both backends. 
pub(crate) fn log_diagnostics( ui_tx: &UiSender, content_len: usize, tool_count: usize, reasoning_chars: usize, reasoning_effort: &str, finish_reason: &Option, chunks_received: u64, sse_lines_parsed: u64, sse_parse_errors: u64, empty_deltas: u64, total_elapsed: Duration, first_content_at: Option, usage: &Option, tools: &[ToolCall], ) { let debug = std::env::var("POC_DEBUG").is_ok(); if reasoning_chars > 0 && reasoning_effort == "none" { let _ = ui_tx.send(UiMessage::Debug(format!( "note: {} chars leaked reasoning (suppressed from display)", reasoning_chars ))); } if content_len == 0 && tool_count == 0 { let _ = ui_tx.send(UiMessage::Debug(format!( "WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \ parse_errors: {}, empty_deltas: {}, {:.1}s)", finish_reason, chunks_received, reasoning_chars, sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64() ))); } if finish_reason.is_none() && chunks_received > 0 { let _ = ui_tx.send(UiMessage::Debug(format!( "WARNING: stream ended without finish_reason ({} chunks, {} content chars)", chunks_received, content_len ))); } if sse_parse_errors > 0 { let _ = ui_tx.send(UiMessage::Debug(format!( "WARNING: {} SSE parse errors out of {} lines", sse_parse_errors, sse_lines_parsed ))); } if debug { if let Some(u) = usage { let _ = ui_tx.send(UiMessage::Debug(format!( "tokens: {} prompt + {} completion = {} total", u.prompt_tokens, u.completion_tokens, u.total_tokens ))); } let ttft = first_content_at .map(|d| format!("{:.1}s", d.as_secs_f64())) .unwrap_or_else(|| "none".to_string()); let _ = ui_tx.send(UiMessage::Debug(format!( "stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \ {} content chars, {} reasoning chars, {} tools, \ finish={:?}", total_elapsed.as_secs_f64(), ttft, chunks_received, sse_lines_parsed, content_len, reasoning_chars, tool_count, finish_reason, ))); if !tools.is_empty() { for (i, tc) in tools.iter().enumerate() { let _ = ui_tx.send(UiMessage::Debug(format!( " tool[{}]: {} (id: {}, {} 
arg chars)", i, tc.function.name, tc.id, tc.function.arguments.len() ))); } } } }