consciousness/src/agent/api/mod.rs

731 lines
27 KiB
Rust
Raw Normal View History

// api/ — LLM API client (OpenAI-compatible)
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
//
// Diagnostics: anomalies always logged to debug panel.
// Set POC_DEBUG=1 for verbose per-turn logging.
pub mod parsing;
pub mod types;
mod openai;
pub use types::*;
use anyhow::Result;
use reqwest::Client;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
pub use types::ToolCall;
use crate::user::ui_channel::{UiMessage, UiSender, StreamTarget};
/// A JoinHandle that aborts its task when dropped.
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
/// Sampling parameters for model generation.
#[derive(Clone, Copy)]
pub struct SamplingParams {
pub temperature: f32,
pub top_p: f32,
pub top_k: u32,
}
// ─────────────────────────────────────────────────────────────
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
/// Build the tools JSON string from a slice of Tools.
fn tools_to_json_str(tools: &[agent_tools::Tool]) -> String {
let inner: Vec<String> = tools.iter().map(|t| t.to_json()).collect();
format!("[{}]", inner.join(","))
}
/// Events produced by the streaming API backends.
/// The runner reads these and decides what to display where.
pub enum StreamEvent {
/// Content token from the model's response.
Content(String),
/// Reasoning/thinking token (internal monologue).
Reasoning(String),
/// Incremental tool call delta (structured, from APIs that support it).
ToolCallDelta {
index: usize,
id: Option<String>,
call_type: Option<String>,
name: Option<String>,
arguments: Option<String>,
},
/// Token usage stats.
Usage(Usage),
/// Stream finished.
Finished {
reason: String,
prompt_tokens: u32,
completion_tokens: u32,
},
/// Error from the stream.
Error(String),
}
#[derive(Clone)]
pub struct ApiClient {
client: Client,
api_key: String,
pub model: String,
base_url: String,
}
impl ApiClient {
pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
let client = Client::builder()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(600))
.build()
.expect("failed to build HTTP client");
Self {
client,
api_key: api_key.to_string(),
model: model.to_string(),
base_url: base_url.trim_end_matches('/').to_string(),
}
}
/// Start a streaming chat completion. Returns a receiver of StreamEvents.
/// The caller (runner) reads events and handles routing to the UI.
///
pub fn start_stream(
&self,
messages: &[Message],
tools: &[agent_tools::Tool],
ui_tx: &UiSender,
reasoning_effort: &str,
sampling: SamplingParams,
priority: Option<i32>,
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let client = self.client.clone();
let api_key = self.api_key.clone();
let model = self.model.clone();
let messages = messages.to_vec();
let tools_json = tools_to_json_str(tools);
let tools_value: serde_json::Value = serde_json::from_str(&tools_json).unwrap_or_default();
let ui_tx = ui_tx.clone();
let reasoning_effort = reasoning_effort.to_string();
let base_url = self.base_url.clone();
let handle = tokio::spawn(async move {
let result = openai::stream_events(
&client, &base_url, &api_key, &model,
&messages, &tools_value, &tx, &ui_tx,
&reasoning_effort, sampling, priority,
).await;
if let Err(e) = result {
let _ = tx.send(StreamEvent::Error(e.to_string()));
}
});
(rx, AbortOnDrop(handle))
}
pub async fn chat_completion_stream_temp(
&self,
messages: &[Message],
tools: &[agent_tools::Tool],
ui_tx: &UiSender,
reasoning_effort: &str,
sampling: SamplingParams,
priority: Option<i32>,
) -> Result<(Message, Option<Usage>)> {
// Use the event stream and accumulate into a message.
let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, sampling, priority);
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
let mut finish_reason = None;
while let Some(event) = rx.recv().await {
match event {
StreamEvent::Content(text) => content.push_str(&text),
StreamEvent::Reasoning(_) => {}
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
while tool_calls.len() <= index {
tool_calls.push(ToolCall {
id: String::new(),
call_type: "function".to_string(),
function: FunctionCall { name: String::new(), arguments: String::new() },
});
}
if let Some(id) = id { tool_calls[index].id = id; }
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
if let Some(n) = name { tool_calls[index].function.name = n; }
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
}
StreamEvent::Usage(u) => usage = Some(u),
StreamEvent::Finished { reason, .. } => {
finish_reason = Some(reason);
break;
}
StreamEvent::Error(e) => anyhow::bail!("{}", e),
}
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
anyhow::bail!("model stream error: {}", detail);
}
Ok((build_response_message(content, tool_calls), usage))
}
pub fn base_url(&self) -> &str { &self.base_url }
pub fn api_key(&self) -> &str { &self.api_key }
/// Return a label for the active backend, used in startup info.
pub fn backend_label(&self) -> &str {
if self.base_url.contains("openrouter") {
"openrouter"
} else {
"openai-compat"
}
}
}
/// Send an HTTP request and check for errors. Shared by both backends.
pub(crate) async fn send_and_check(
client: &Client,
url: &str,
body: &impl serde::Serialize,
auth_header: (&str, &str),
extra_headers: &[(&str, &str)],
ui_tx: &UiSender,
debug_label: &str,
request_json: Option<&str>,
) -> Result<reqwest::Response> {
let debug = std::env::var("POC_DEBUG").is_ok();
let start = Instant::now();
if debug {
let payload_size = serde_json::to_string(body)
.map(|s| s.len())
.unwrap_or(0);
let _ = ui_tx.send(UiMessage::Debug(format!(
"request: {}K payload, {}",
payload_size / 1024, debug_label,
)));
}
let mut req = client
.post(url)
.header(auth_header.0, auth_header.1)
.header("Content-Type", "application/json");
for (name, value) in extra_headers {
req = req.header(*name, *value);
}
let response = req
.json(body)
.send()
.await
.map_err(|e| {
let cause = if e.is_connect() {
"connection refused"
} else if e.is_timeout() {
"request timed out"
} else if e.is_request() {
"request error"
} else {
"unknown"
};
anyhow::anyhow!("{} ({}): {:?}", cause, url, e.without_url())
})?;
let status = response.status();
let elapsed = start.elapsed();
if debug {
// Log interesting response headers
let headers = response.headers();
for name in [
"x-ratelimit-remaining",
"x-ratelimit-limit",
"x-request-id",
] {
if let Some(val) = headers.get(name) {
let _ = ui_tx.send(UiMessage::Debug(format!(
"header {}: {}",
name,
val.to_str().unwrap_or("?")
)));
}
}
}
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
let _ = ui_tx.send(UiMessage::Debug(format!(
"HTTP {} after {:.1}s ({}): {}",
status,
elapsed.as_secs_f64(),
url,
&body[..body.len().min(500)]
)));
if let Some(json) = request_json {
let log_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/logs/failed-requests");
let _ = std::fs::create_dir_all(&log_dir);
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
let path = log_dir.join(format!("{}.json", ts));
if std::fs::write(&path, json).is_ok() {
let _ = ui_tx.send(UiMessage::Debug(format!(
"saved failed request to {} (HTTP {})", path.display(), status
)));
}
}
anyhow::bail!("HTTP {} ({}): {}", status, url, &body[..body.len().min(1000)]);
}
if debug {
let _ = ui_tx.send(UiMessage::Debug(format!(
"connected in {:.1}s (HTTP {})",
elapsed.as_secs_f64(),
status.as_u16()
)));
}
Ok(response)
}
/// SSE stream reader. Handles the generic SSE plumbing shared by both
/// backends: chunk reading with timeout, line buffering, `data:` prefix
/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics.
/// Yields parsed events as serde_json::Value — each backend handles its
/// own event types.
pub(crate) struct SseReader {
line_buf: String,
chunk_timeout: Duration,
pub stream_start: Instant,
pub chunks_received: u64,
pub sse_lines_parsed: u64,
pub sse_parse_errors: u64,
debug: bool,
ui_tx: UiSender,
done: bool,
/// Serialized request payload — saved to disk on errors for replay debugging.
pub(crate) request_json: Option<String>,
}
impl SseReader {
pub(crate) fn new(ui_tx: &UiSender) -> Self {
Self {
line_buf: String::new(),
chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
stream_start: Instant::now(),
chunks_received: 0,
sse_lines_parsed: 0,
sse_parse_errors: 0,
debug: std::env::var("POC_DEBUG").is_ok(),
ui_tx: ui_tx.clone(),
done: false,
request_json: None,
}
}
/// Attach the serialized request payload for error diagnostics.
/// Save the request payload to disk for replay debugging.
fn save_failed_request(&self, reason: &str) {
let Some(ref json) = self.request_json else { return };
let log_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/logs/failed-requests");
let _ = std::fs::create_dir_all(&log_dir);
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
let path = log_dir.join(format!("{}.json", ts));
if std::fs::write(&path, json).is_ok() {
let _ = self.ui_tx.send(UiMessage::Debug(format!(
"saved failed request to {} ({})", path.display(), reason
)));
}
}
/// Read the next SSE event from the response stream.
/// Returns Ok(Some(value)) for each parsed data line,
/// Ok(None) when the stream ends or [DONE] is received.
pub(crate) async fn next_event(
&mut self,
response: &mut reqwest::Response,
) -> Result<Option<serde_json::Value>> {
loop {
// Drain complete lines from the buffer before reading more chunks
while let Some(newline_pos) = self.line_buf.find('\n') {
let line = self.line_buf[..newline_pos].trim().to_string();
self.line_buf = self.line_buf[newline_pos + 1..].to_string();
if line == "data: [DONE]" {
self.done = true;
return Ok(None);
}
if line.is_empty()
|| line.starts_with("event: ")
|| !line.starts_with("data: ")
{
continue;
}
let json_str = &line[6..];
self.sse_lines_parsed += 1;
match serde_json::from_str(json_str) {
Ok(v) => return Ok(Some(v)),
Err(e) => {
self.sse_parse_errors += 1;
if self.sse_parse_errors == 1 || self.debug {
let preview = if json_str.len() > 200 {
format!("{}...", &json_str[..200])
} else {
json_str.to_string()
};
let _ = self.ui_tx.send(UiMessage::Debug(format!(
"SSE parse error (#{}) {}: {}",
self.sse_parse_errors, e, preview
)));
}
continue;
}
}
}
if self.done {
return Ok(None);
}
// Read more data from the response stream
match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
Ok(Ok(Some(chunk))) => {
self.chunks_received += 1;
self.line_buf.push_str(&String::from_utf8_lossy(&chunk));
}
Ok(Ok(None)) => return Ok(None),
Ok(Err(e)) => {
let buf_preview = if self.line_buf.is_empty() {
"(empty)".to_string()
} else {
let n = self.line_buf.len().min(500);
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
};
let msg = format!(
"stream error after {} chunks, {:.1}s, {} sse lines: {} | buf: {}",
self.chunks_received,
self.stream_start.elapsed().as_secs_f64(),
self.sse_lines_parsed,
e, buf_preview,
);
let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
self.save_failed_request(&msg);
return Err(e.into());
}
Err(_) => {
let buf_preview = if self.line_buf.is_empty() {
"(empty)".to_string()
} else {
let n = self.line_buf.len().min(500);
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
};
let msg = format!(
"stream timeout: {}s, {} chunks, {} sse lines, {:.1}s elapsed | buf: {}",
self.chunk_timeout.as_secs(),
self.chunks_received,
self.sse_lines_parsed,
self.stream_start.elapsed().as_secs_f64(),
buf_preview,
);
let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
self.save_failed_request(&msg);
anyhow::bail!(
"stream timeout: no data for {}s ({} chunks received)",
self.chunk_timeout.as_secs(),
self.chunks_received
);
}
}
}
}
}
/// Build a response Message from accumulated content and tool calls.
/// Shared by both backends — the wire format differs but the internal
/// representation is the same.
///
/// If no structured tool calls came from the API but the content
/// contains leaked tool call XML (e.g. `<tool_call>...</tool_call>`
/// from models that emit tool calls as text), parse them out and
/// promote them to structured tool_calls. This way all consumers
/// see tool calls uniformly regardless of backend.
pub fn build_response_message(
content: String,
tool_calls: Vec<ToolCall>,
) -> Message {
// If the API returned structured tool calls, use them as-is.
if !tool_calls.is_empty() {
return Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: Some(tool_calls),
tool_call_id: None,
name: None,
timestamp: None,
};
}
// Check for leaked tool calls in content text.
let leaked = parsing::parse_leaked_tool_calls(&content);
if !leaked.is_empty() {
let cleaned = parsing::strip_leaked_artifacts(&content);
return Message {
role: Role::Assistant,
content: if cleaned.trim().is_empty() { None }
else { Some(MessageContent::Text(cleaned)) },
tool_calls: Some(leaked),
tool_call_id: None,
name: None,
timestamp: None,
};
}
Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: None,
}
}
/// Log stream diagnostics. Shared by both backends.
pub(crate) fn log_diagnostics(
ui_tx: &UiSender,
content_len: usize,
tool_count: usize,
reasoning_chars: usize,
reasoning_effort: &str,
finish_reason: &Option<String>,
chunks_received: u64,
sse_lines_parsed: u64,
sse_parse_errors: u64,
empty_deltas: u64,
total_elapsed: Duration,
first_content_at: Option<Duration>,
usage: &Option<Usage>,
tools: &[ToolCall],
) {
let debug = std::env::var("POC_DEBUG").is_ok();
if reasoning_chars > 0 && reasoning_effort == "none" {
let _ = ui_tx.send(UiMessage::Debug(format!(
"note: {} chars leaked reasoning (suppressed from display)",
reasoning_chars
)));
}
if content_len == 0 && tool_count == 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \
parse_errors: {}, empty_deltas: {}, {:.1}s)",
finish_reason, chunks_received, reasoning_chars,
sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64()
)));
}
if finish_reason.is_none() && chunks_received > 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: stream ended without finish_reason ({} chunks, {} content chars)",
chunks_received, content_len
)));
}
if sse_parse_errors > 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: {} SSE parse errors out of {} lines",
sse_parse_errors, sse_lines_parsed
)));
}
if debug {
if let Some(u) = usage {
let _ = ui_tx.send(UiMessage::Debug(format!(
"tokens: {} prompt + {} completion = {} total",
u.prompt_tokens, u.completion_tokens, u.total_tokens
)));
}
let ttft = first_content_at
.map(|d| format!("{:.1}s", d.as_secs_f64()))
.unwrap_or_else(|| "none".to_string());
let _ = ui_tx.send(UiMessage::Debug(format!(
"stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \
{} content chars, {} reasoning chars, {} tools, \
finish={:?}",
total_elapsed.as_secs_f64(),
ttft,
chunks_received,
sse_lines_parsed,
content_len,
reasoning_chars,
tool_count,
finish_reason,
)));
if !tools.is_empty() {
for (i, tc) in tools.iter().enumerate() {
let _ = ui_tx.send(UiMessage::Debug(format!(
" tool[{}]: {} (id: {}, {} arg chars)",
i, tc.function.name, tc.id, tc.function.arguments.len()
)));
}
}
}
}
// ---------------------------------------------------------------------------
// Stream collection — assembles StreamEvents into a complete response
// ---------------------------------------------------------------------------
/// Result of collecting a complete response from the stream.
pub struct StreamResult {
pub content: String,
pub tool_calls: Vec<ToolCall>,
pub usage: Option<Usage>,
pub finish_reason: Option<String>,
pub error: Option<String>,
/// Remaining display buffer (caller should flush if not in a tool call).
pub display_buf: String,
/// Whether we were mid-tool-call when the stream ended.
pub in_tool_call: bool,
}
/// Collect stream events into a complete response. Handles:
/// - Content accumulation and display buffering
/// - Leaked tool call detection and dispatch (Qwen XML in content)
/// - Structured tool call delta assembly (OpenAI-style)
/// - UI forwarding (text deltas, reasoning, tool call notifications)
pub async fn collect_stream(
rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
ui_tx: &UiSender,
target: StreamTarget,
agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
) -> StreamResult {
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
let mut finish_reason = None;
let mut in_tool_call = false;
let mut tool_call_buf = String::new();
let mut error = None;
let mut first_content = true;
let mut display_buf = String::new();
while let Some(event) = rx.recv().await {
match event {
StreamEvent::Content(text) => {
if first_content {
let _ = ui_tx.send(UiMessage::Activity("streaming...".into()));
first_content = false;
}
content.push_str(&text);
if in_tool_call {
tool_call_buf.push_str(&text);
if let Some(end) = tool_call_buf.find("</tool_call>") {
let body = &tool_call_buf[..end];
if let Some(call) = parsing::parse_tool_call_body(body) {
let args: serde_json::Value =
serde_json::from_str(&call.function.arguments).unwrap_or_default();
let args_summary = summarize_args(&call.function.name, &args);
let is_background = args.get("run_in_background")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let call_id = call.id.clone();
let call_name = call.function.name.clone();
let agent_handle = agent.clone();
let handle = tokio::spawn(async move {
let output = agent_tools::dispatch_with_agent(
&call.function.name, &args, Some(agent_handle)).await;
(call, output)
});
active_tools.lock().unwrap().push(ActiveToolCall {
id: call_id,
name: call_name,
detail: args_summary,
started: std::time::Instant::now(),
background: is_background,
handle,
});
}
let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
tool_call_buf.clear();
in_tool_call = false;
if !remaining.trim().is_empty() {
display_buf.push_str(&remaining);
}
}
} else {
display_buf.push_str(&text);
if let Some(pos) = display_buf.find("<tool_call>") {
let before = &display_buf[..pos];
if !before.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target));
}
display_buf.clear();
in_tool_call = true;
} else {
let safe = display_buf.len().saturating_sub(10);
let safe = display_buf.floor_char_boundary(safe);
if safe > 0 {
let flush = display_buf[..safe].to_string();
display_buf = display_buf[safe..].to_string();
let _ = ui_tx.send(UiMessage::TextDelta(flush, target));
}
}
}
}
StreamEvent::Reasoning(text) => {
let _ = ui_tx.send(UiMessage::Reasoning(text));
}
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
while tool_calls.len() <= index {
tool_calls.push(ToolCall {
id: String::new(),
call_type: "function".to_string(),
function: FunctionCall { name: String::new(), arguments: String::new() },
});
}
if let Some(id) = id { tool_calls[index].id = id; }
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
if let Some(n) = name { tool_calls[index].function.name = n; }
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
}
StreamEvent::Usage(u) => usage = Some(u),
StreamEvent::Finished { reason, .. } => {
finish_reason = Some(reason);
break;
}
StreamEvent::Error(e) => {
error = Some(e);
break;
}
}
}
StreamResult { content, tool_calls, usage, finish_reason, error, display_buf, in_tool_call }
}