remove Anthropic backend, add request logging on timeout
Delete anthropic.rs (713 lines) — we only use OpenAI-compatible
endpoints (vLLM, OpenRouter). Simplify ApiClient to store base_url
directly instead of Backend enum.
SseReader now stores the serialized request payload and saves it
to ~/.consciousness/logs/failed-request-{ts}.json on stream timeout,
so failed requests can be replayed with curl for debugging.
Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
078dcf22d0
commit
1f7b585d41
3 changed files with 37 additions and 769 deletions
|
|
@ -1,713 +0,0 @@
|
||||||
// api/anthropic.rs — Anthropic Messages API backend
|
|
||||||
//
|
|
||||||
// Native Anthropic wire format for direct API access. Key advantages
|
|
||||||
// over the OpenAI-compat path:
|
|
||||||
// - Prompt caching (90% cost reduction on repeated prefixes)
|
|
||||||
// - No middleman (OpenRouter) — cleaner error handling
|
|
||||||
// - Native tool use and thinking support
|
|
||||||
//
|
|
||||||
// Message format conversion happens at the boundary: internal Message
|
|
||||||
// types are converted to Anthropic content blocks on send, and
|
|
||||||
// Anthropic streaming events are converted back to internal types.
|
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use reqwest::Client;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use crate::agent::types::*;
|
|
||||||
use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender};
|
|
||||||
use super::StreamEvent;
|
|
||||||
|
|
||||||
// --- Anthropic wire types ---
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct Request {
|
|
||||||
model: String,
|
|
||||||
max_tokens: u32,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
system: Option<Vec<ContentBlock>>,
|
|
||||||
messages: Vec<ApiMessage>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
tools: Option<Vec<ToolDef>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
tool_choice: Option<ToolChoice>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
temperature: Option<f32>,
|
|
||||||
stream: bool,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
thinking: Option<ThinkingConfig>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct ApiMessage {
|
|
||||||
role: String,
|
|
||||||
content: ApiContent,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
#[serde(untagged)]
|
|
||||||
enum ApiContent {
|
|
||||||
Text(String),
|
|
||||||
Blocks(Vec<ContentBlock>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Clone)]
|
|
||||||
#[serde(tag = "type")]
|
|
||||||
enum ContentBlock {
|
|
||||||
#[serde(rename = "text")]
|
|
||||||
Text {
|
|
||||||
text: String,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
cache_control: Option<CacheControl>,
|
|
||||||
},
|
|
||||||
#[serde(rename = "tool_use")]
|
|
||||||
ToolUse {
|
|
||||||
id: String,
|
|
||||||
name: String,
|
|
||||||
input: serde_json::Value,
|
|
||||||
},
|
|
||||||
#[serde(rename = "tool_result")]
|
|
||||||
ToolResult {
|
|
||||||
tool_use_id: String,
|
|
||||||
content: String,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
is_error: Option<bool>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Clone)]
|
|
||||||
struct CacheControl {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
cache_type: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CacheControl {
|
|
||||||
fn ephemeral() -> Self {
|
|
||||||
Self {
|
|
||||||
cache_type: "ephemeral".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct ToolDef {
|
|
||||||
name: String,
|
|
||||||
description: String,
|
|
||||||
input_schema: serde_json::Value,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct ToolChoice {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
choice_type: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct ThinkingConfig {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
thinking_type: String,
|
|
||||||
budget_tokens: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Anthropic SSE event types ---
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct MessageStartEvent {
|
|
||||||
message: MessageStart,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct MessageStart {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
id: String,
|
|
||||||
usage: Option<StartUsage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct StartUsage {
|
|
||||||
input_tokens: u32,
|
|
||||||
#[serde(default)]
|
|
||||||
cache_creation_input_tokens: u32,
|
|
||||||
#[serde(default)]
|
|
||||||
cache_read_input_tokens: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct ContentBlockStartEvent {
|
|
||||||
index: usize,
|
|
||||||
content_block: ContentBlockType,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
#[serde(tag = "type")]
|
|
||||||
enum ContentBlockType {
|
|
||||||
#[serde(rename = "text")]
|
|
||||||
Text { text: String },
|
|
||||||
#[serde(rename = "tool_use")]
|
|
||||||
ToolUse { id: String, name: String },
|
|
||||||
#[serde(rename = "thinking")]
|
|
||||||
Thinking {},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct ContentBlockDeltaEvent {
|
|
||||||
index: usize,
|
|
||||||
delta: DeltaType,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
#[serde(tag = "type")]
|
|
||||||
enum DeltaType {
|
|
||||||
#[serde(rename = "text_delta")]
|
|
||||||
TextDelta { text: String },
|
|
||||||
#[serde(rename = "input_json_delta")]
|
|
||||||
InputJsonDelta { partial_json: String },
|
|
||||||
#[serde(rename = "thinking_delta")]
|
|
||||||
ThinkingDelta { thinking: String },
|
|
||||||
#[serde(rename = "signature_delta")]
|
|
||||||
SignatureDelta {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
signature: String,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct MessageDeltaEvent {
|
|
||||||
delta: MessageDelta,
|
|
||||||
usage: Option<DeltaUsage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct MessageDelta {
|
|
||||||
stop_reason: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct DeltaUsage {
|
|
||||||
output_tokens: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Conversion: internal types → Anthropic wire format ---
|
|
||||||
|
|
||||||
/// Convert internal Messages to Anthropic API format.
|
|
||||||
///
|
|
||||||
/// Key differences from OpenAI format:
|
|
||||||
/// - System messages → extracted to system parameter
|
|
||||||
/// - Tool role → user message with tool_result content block
|
|
||||||
/// - Assistant tool_calls → assistant message with tool_use content blocks
|
|
||||||
/// - Consecutive same-role messages must be merged
|
|
||||||
/// - Prompt caching: cache_control on the last static block (context message)
|
|
||||||
fn convert_messages(
|
|
||||||
messages: &[Message],
|
|
||||||
) -> (Option<Vec<ContentBlock>>, Vec<ApiMessage>) {
|
|
||||||
let mut system_blocks: Vec<ContentBlock> = Vec::new();
|
|
||||||
let mut api_messages: Vec<ApiMessage> = Vec::new();
|
|
||||||
|
|
||||||
// Track whether we've seen the first user message (identity context).
|
|
||||||
// The second user message gets cache_control to mark the end of the
|
|
||||||
// cacheable prefix (system prompt + context message).
|
|
||||||
let mut user_count = 0;
|
|
||||||
|
|
||||||
for msg in messages {
|
|
||||||
match msg.role {
|
|
||||||
Role::System => {
|
|
||||||
system_blocks.push(ContentBlock::Text {
|
|
||||||
text: msg.content_text().to_string(),
|
|
||||||
cache_control: Some(CacheControl::ephemeral()),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Role::User => {
|
|
||||||
user_count += 1;
|
|
||||||
// Cache the identity prefix: system + first two user messages
|
|
||||||
// (the context message and potentially the journal message).
|
|
||||||
let cache = if user_count <= 2 {
|
|
||||||
Some(CacheControl::ephemeral())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let content = match &msg.content {
|
|
||||||
Some(MessageContent::Parts(parts)) => {
|
|
||||||
let blocks: Vec<ContentBlock> = parts
|
|
||||||
.iter()
|
|
||||||
.filter_map(|p| match p {
|
|
||||||
ContentPart::Text { text } => {
|
|
||||||
Some(ContentBlock::Text {
|
|
||||||
text: text.clone(),
|
|
||||||
cache_control: cache.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
ContentPart::ImageUrl { image_url } => {
|
|
||||||
// Skip images for now — Anthropic uses a
|
|
||||||
// different image format (base64 source block)
|
|
||||||
let _ = image_url;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
ApiContent::Blocks(blocks)
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let text = msg.content_text().to_string();
|
|
||||||
if cache.is_some() {
|
|
||||||
ApiContent::Blocks(vec![ContentBlock::Text {
|
|
||||||
text,
|
|
||||||
cache_control: cache,
|
|
||||||
}])
|
|
||||||
} else {
|
|
||||||
ApiContent::Text(text)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
push_merged(&mut api_messages, "user", content);
|
|
||||||
}
|
|
||||||
Role::Assistant => {
|
|
||||||
let mut blocks: Vec<ContentBlock> = Vec::new();
|
|
||||||
|
|
||||||
// Text content
|
|
||||||
let text = msg.content_text();
|
|
||||||
if !text.is_empty() {
|
|
||||||
blocks.push(ContentBlock::Text {
|
|
||||||
text: text.to_string(),
|
|
||||||
cache_control: None,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tool calls → tool_use blocks
|
|
||||||
if let Some(ref calls) = msg.tool_calls {
|
|
||||||
for call in calls {
|
|
||||||
let input: serde_json::Value =
|
|
||||||
serde_json::from_str(&call.function.arguments)
|
|
||||||
.unwrap_or_default();
|
|
||||||
blocks.push(ContentBlock::ToolUse {
|
|
||||||
id: call.id.clone(),
|
|
||||||
name: call.function.name.clone(),
|
|
||||||
input,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if blocks.is_empty() {
|
|
||||||
// Empty assistant message — skip to avoid API rejection
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
api_messages.push(ApiMessage {
|
|
||||||
role: "assistant".to_string(),
|
|
||||||
content: ApiContent::Blocks(blocks),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Role::Tool => {
|
|
||||||
// Tool results become user messages with tool_result blocks
|
|
||||||
let tool_use_id = msg
|
|
||||||
.tool_call_id
|
|
||||||
.as_deref()
|
|
||||||
.unwrap_or("unknown")
|
|
||||||
.to_string();
|
|
||||||
let result_text = msg.content_text().to_string();
|
|
||||||
let is_error = if result_text.starts_with("Error:") {
|
|
||||||
Some(true)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let block = ContentBlock::ToolResult {
|
|
||||||
tool_use_id,
|
|
||||||
content: result_text,
|
|
||||||
is_error,
|
|
||||||
};
|
|
||||||
|
|
||||||
push_merged(
|
|
||||||
&mut api_messages,
|
|
||||||
"user",
|
|
||||||
ApiContent::Blocks(vec![block]),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let system = if system_blocks.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(system_blocks)
|
|
||||||
};
|
|
||||||
|
|
||||||
(system, api_messages)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Push a message, merging with the previous one if it has the same role.
|
|
||||||
/// Anthropic requires strict user/assistant alternation, and tool results
|
|
||||||
/// (mapped to user role) can pile up between assistant messages.
|
|
||||||
fn push_merged(messages: &mut Vec<ApiMessage>, role: &str, content: ApiContent) {
|
|
||||||
if let Some(last) = messages.last_mut() {
|
|
||||||
if last.role == role {
|
|
||||||
// Merge into existing message's content blocks
|
|
||||||
let existing = std::mem::replace(
|
|
||||||
&mut last.content,
|
|
||||||
ApiContent::Text(String::new()),
|
|
||||||
);
|
|
||||||
let mut blocks = match existing {
|
|
||||||
ApiContent::Text(t) => {
|
|
||||||
if t.is_empty() {
|
|
||||||
Vec::new()
|
|
||||||
} else {
|
|
||||||
vec![ContentBlock::Text {
|
|
||||||
text: t,
|
|
||||||
cache_control: None,
|
|
||||||
}]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApiContent::Blocks(b) => b,
|
|
||||||
};
|
|
||||||
match content {
|
|
||||||
ApiContent::Text(t) => {
|
|
||||||
if !t.is_empty() {
|
|
||||||
blocks.push(ContentBlock::Text {
|
|
||||||
text: t,
|
|
||||||
cache_control: None,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApiContent::Blocks(b) => blocks.extend(b),
|
|
||||||
}
|
|
||||||
last.content = ApiContent::Blocks(blocks);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
messages.push(ApiMessage {
|
|
||||||
role: role.to_string(),
|
|
||||||
content,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Convert internal ToolDef to Anthropic format.
|
|
||||||
fn convert_tools(tools: &[crate::agent::types::ToolDef]) -> Vec<ToolDef> {
|
|
||||||
tools
|
|
||||||
.iter()
|
|
||||||
.map(|t| ToolDef {
|
|
||||||
name: t.function.name.clone(),
|
|
||||||
description: t.function.description.clone(),
|
|
||||||
input_schema: t.function.parameters.clone(),
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Streaming implementation ---
|
|
||||||
|
|
||||||
pub async fn stream(
|
|
||||||
client: &Client,
|
|
||||||
api_key: &str,
|
|
||||||
model: &str,
|
|
||||||
messages: &[Message],
|
|
||||||
tools: Option<&[crate::agent::types::ToolDef]>,
|
|
||||||
ui_tx: &UiSender,
|
|
||||||
target: StreamTarget,
|
|
||||||
reasoning_effort: &str,
|
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
|
||||||
let (system, api_messages) = convert_messages(messages);
|
|
||||||
|
|
||||||
let thinking = match reasoning_effort {
|
|
||||||
"none" => None,
|
|
||||||
"low" => Some(ThinkingConfig {
|
|
||||||
thinking_type: "enabled".to_string(),
|
|
||||||
budget_tokens: 2048,
|
|
||||||
}),
|
|
||||||
_ => Some(ThinkingConfig {
|
|
||||||
thinking_type: "enabled".to_string(),
|
|
||||||
budget_tokens: 16000,
|
|
||||||
}),
|
|
||||||
};
|
|
||||||
|
|
||||||
// When thinking is enabled, temperature must be 1.0 (Anthropic requirement)
|
|
||||||
let temperature = if thinking.is_some() { None } else { Some(0.6) };
|
|
||||||
|
|
||||||
let request = Request {
|
|
||||||
model: model.to_string(),
|
|
||||||
max_tokens: if thinking.is_some() { 32768 } else { 16384 },
|
|
||||||
system,
|
|
||||||
messages: api_messages,
|
|
||||||
tools: tools.map(|t| convert_tools(t)),
|
|
||||||
tool_choice: tools.map(|_| ToolChoice {
|
|
||||||
choice_type: "auto".to_string(),
|
|
||||||
}),
|
|
||||||
temperature,
|
|
||||||
stream: true,
|
|
||||||
thinking,
|
|
||||||
};
|
|
||||||
|
|
||||||
let msg_count = messages.len();
|
|
||||||
let debug_label = format!("{} messages, model={}", msg_count, model);
|
|
||||||
|
|
||||||
let mut response = super::send_and_check(
|
|
||||||
client,
|
|
||||||
"https://api.anthropic.com/v1/messages",
|
|
||||||
&request,
|
|
||||||
("x-api-key", api_key),
|
|
||||||
&[("anthropic-version", "2023-06-01")],
|
|
||||||
ui_tx,
|
|
||||||
&debug_label,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let debug = std::env::var("POC_DEBUG").is_ok();
|
|
||||||
let mut reader = super::SseReader::new(ui_tx);
|
|
||||||
|
|
||||||
let mut content = String::new();
|
|
||||||
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
|
||||||
let mut input_tokens: u32 = 0;
|
|
||||||
let mut output_tokens: u32 = 0;
|
|
||||||
let mut cache_creation_tokens: u32 = 0;
|
|
||||||
let mut cache_read_tokens: u32 = 0;
|
|
||||||
let mut finish_reason: Option<String> = None;
|
|
||||||
|
|
||||||
// Track which content blocks are which type
|
|
||||||
let mut block_types: Vec<String> = Vec::new(); // "text", "tool_use", "thinking"
|
|
||||||
let mut tool_inputs: Vec<String> = Vec::new(); // accumulated JSON for tool_use blocks
|
|
||||||
let mut tool_ids: Vec<String> = Vec::new();
|
|
||||||
let mut tool_names: Vec<String> = Vec::new();
|
|
||||||
|
|
||||||
let mut reasoning_chars: usize = 0;
|
|
||||||
let mut empty_deltas: u64 = 0;
|
|
||||||
let mut first_content_at: Option<Duration> = None;
|
|
||||||
|
|
||||||
let reasoning_enabled = reasoning_effort != "none";
|
|
||||||
|
|
||||||
while let Some(event) = reader.next_event(&mut response).await? {
|
|
||||||
let event_type = event["type"].as_str().unwrap_or("");
|
|
||||||
|
|
||||||
match event_type {
|
|
||||||
"message_start" => {
|
|
||||||
if let Ok(ev) =
|
|
||||||
serde_json::from_value::<MessageStartEvent>(event.clone())
|
|
||||||
{
|
|
||||||
if let Some(u) = ev.message.usage {
|
|
||||||
input_tokens = u.input_tokens;
|
|
||||||
cache_creation_tokens = u.cache_creation_input_tokens;
|
|
||||||
cache_read_tokens = u.cache_read_input_tokens;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"content_block_start" => {
|
|
||||||
if let Ok(ev) =
|
|
||||||
serde_json::from_value::<ContentBlockStartEvent>(event.clone())
|
|
||||||
{
|
|
||||||
let idx = ev.index;
|
|
||||||
while block_types.len() <= idx {
|
|
||||||
block_types.push(String::new());
|
|
||||||
tool_inputs.push(String::new());
|
|
||||||
tool_ids.push(String::new());
|
|
||||||
tool_names.push(String::new());
|
|
||||||
}
|
|
||||||
match ev.content_block {
|
|
||||||
ContentBlockType::Text { text: initial } => {
|
|
||||||
block_types[idx] = "text".to_string();
|
|
||||||
if !initial.is_empty() {
|
|
||||||
content.push_str(&initial);
|
|
||||||
let _ = ui_tx
|
|
||||||
.send(UiMessage::TextDelta(initial, target));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ContentBlockType::ToolUse { id, name } => {
|
|
||||||
block_types[idx] = "tool_use".to_string();
|
|
||||||
tool_ids[idx] = id;
|
|
||||||
tool_names[idx] = name;
|
|
||||||
}
|
|
||||||
ContentBlockType::Thinking {} => {
|
|
||||||
block_types[idx] = "thinking".to_string();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"content_block_delta" => {
|
|
||||||
if let Ok(ev) =
|
|
||||||
serde_json::from_value::<ContentBlockDeltaEvent>(event.clone())
|
|
||||||
{
|
|
||||||
let idx = ev.index;
|
|
||||||
match ev.delta {
|
|
||||||
DeltaType::TextDelta { text: delta } => {
|
|
||||||
if first_content_at.is_none() && !delta.is_empty() {
|
|
||||||
first_content_at =
|
|
||||||
Some(reader.stream_start.elapsed());
|
|
||||||
let _ = ui_tx.send(UiMessage::Activity(
|
|
||||||
"streaming...".into(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
content.push_str(&delta);
|
|
||||||
let _ =
|
|
||||||
ui_tx.send(UiMessage::TextDelta(delta, target));
|
|
||||||
}
|
|
||||||
DeltaType::InputJsonDelta { partial_json } => {
|
|
||||||
if idx < tool_inputs.len() {
|
|
||||||
tool_inputs[idx].push_str(&partial_json);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DeltaType::ThinkingDelta { thinking } => {
|
|
||||||
reasoning_chars += thinking.len();
|
|
||||||
if reasoning_enabled && !thinking.is_empty() {
|
|
||||||
let _ =
|
|
||||||
ui_tx.send(UiMessage::Reasoning(thinking));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DeltaType::SignatureDelta { .. } => {}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
empty_deltas += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"content_block_stop" => {
|
|
||||||
// Finalize tool_use blocks
|
|
||||||
let idx = event["index"].as_u64().unwrap_or(0) as usize;
|
|
||||||
if idx < block_types.len() && block_types[idx] == "tool_use" {
|
|
||||||
let input: serde_json::Value =
|
|
||||||
serde_json::from_str(&tool_inputs[idx]).unwrap_or_default();
|
|
||||||
tool_calls.push(ToolCall {
|
|
||||||
id: tool_ids[idx].clone(),
|
|
||||||
call_type: "function".to_string(),
|
|
||||||
function: FunctionCall {
|
|
||||||
name: tool_names[idx].clone(),
|
|
||||||
arguments: serde_json::to_string(&input)
|
|
||||||
.unwrap_or_default(),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"message_delta" => {
|
|
||||||
if let Ok(ev) =
|
|
||||||
serde_json::from_value::<MessageDeltaEvent>(event.clone())
|
|
||||||
{
|
|
||||||
if let Some(reason) = ev.delta.stop_reason {
|
|
||||||
finish_reason = Some(reason);
|
|
||||||
}
|
|
||||||
if let Some(u) = ev.usage {
|
|
||||||
output_tokens = u.output_tokens;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"message_stop" | "ping" => {}
|
|
||||||
|
|
||||||
"error" => {
|
|
||||||
let err_msg = event["error"]["message"]
|
|
||||||
.as_str()
|
|
||||||
.unwrap_or("unknown error");
|
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
|
||||||
"API error in stream: {}",
|
|
||||||
err_msg
|
|
||||||
)));
|
|
||||||
anyhow::bail!("API error in stream: {}", err_msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => {
|
|
||||||
if debug {
|
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
|
||||||
"unknown SSE event type: {}",
|
|
||||||
event_type
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let total_elapsed = reader.stream_start.elapsed();
|
|
||||||
if !content.is_empty() {
|
|
||||||
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build Usage from Anthropic's token counts
|
|
||||||
let total_input = input_tokens + cache_creation_tokens + cache_read_tokens;
|
|
||||||
let usage = Some(Usage {
|
|
||||||
prompt_tokens: total_input,
|
|
||||||
completion_tokens: output_tokens,
|
|
||||||
total_tokens: total_input + output_tokens,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Log cache stats in debug mode
|
|
||||||
if debug && (cache_creation_tokens > 0 || cache_read_tokens > 0) {
|
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
|
||||||
"cache: {} write + {} read tokens (input: {} uncached)",
|
|
||||||
cache_creation_tokens, cache_read_tokens, input_tokens,
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
super::log_diagnostics(
|
|
||||||
ui_tx,
|
|
||||||
content.len(),
|
|
||||||
tool_calls.len(),
|
|
||||||
reasoning_chars,
|
|
||||||
reasoning_effort,
|
|
||||||
&finish_reason,
|
|
||||||
reader.chunks_received,
|
|
||||||
reader.sse_lines_parsed,
|
|
||||||
reader.sse_parse_errors,
|
|
||||||
empty_deltas,
|
|
||||||
total_elapsed,
|
|
||||||
first_content_at,
|
|
||||||
&usage,
|
|
||||||
&tool_calls,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok((super::build_response_message(content, tool_calls), usage))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wrapper that calls the existing stream() and synthesizes StreamEvents.
|
|
||||||
/// TODO: refactor to emit events during streaming like the OpenAI backend.
|
|
||||||
pub async fn stream_events(
|
|
||||||
client: &Client,
|
|
||||||
api_key: &str,
|
|
||||||
model: &str,
|
|
||||||
messages: &[Message],
|
|
||||||
tools: Option<&[crate::agent::types::ToolDef]>,
|
|
||||||
tx: &mpsc::UnboundedSender<StreamEvent>,
|
|
||||||
ui_tx: &UiSender,
|
|
||||||
reasoning_effort: &str,
|
|
||||||
) -> Result<()> {
|
|
||||||
let (msg, usage) = stream(
|
|
||||||
client, api_key, model, messages, tools,
|
|
||||||
ui_tx, StreamTarget::Conversation, reasoning_effort,
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
// Synthesize events from the completed message.
|
|
||||||
if let Some(text) = msg.content.as_ref().and_then(|c| match c {
|
|
||||||
MessageContent::Text(t) => Some(t.as_str()),
|
|
||||||
_ => None,
|
|
||||||
}) {
|
|
||||||
if !text.is_empty() {
|
|
||||||
let _ = tx.send(StreamEvent::Content(text.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(ref tcs) = msg.tool_calls {
|
|
||||||
for (i, tc) in tcs.iter().enumerate() {
|
|
||||||
let _ = tx.send(StreamEvent::ToolCallDelta {
|
|
||||||
index: i,
|
|
||||||
id: Some(tc.id.clone()),
|
|
||||||
call_type: Some(tc.call_type.clone()),
|
|
||||||
name: Some(tc.function.name.clone()),
|
|
||||||
arguments: Some(tc.function.arguments.clone()),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(u) = usage {
|
|
||||||
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
|
||||||
let _ = tx.send(StreamEvent::Finished {
|
|
||||||
reason: "stop".into(),
|
|
||||||
prompt_tokens: u.prompt_tokens,
|
|
||||||
completion_tokens: u.completion_tokens,
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
let _ = tx.send(StreamEvent::Finished {
|
|
||||||
reason: "stop".into(),
|
|
||||||
prompt_tokens: 0,
|
|
||||||
completion_tokens: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
@ -1,17 +1,11 @@
|
||||||
// api/ — LLM API client with pluggable backends
|
// api/ — LLM API client (OpenAI-compatible)
|
||||||
//
|
//
|
||||||
// Supports two wire formats:
|
// Works with any provider that implements the OpenAI chat completions
|
||||||
// - OpenAI-compatible (OpenRouter, vLLM, llama.cpp, Qwen)
|
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
|
||||||
// - Anthropic Messages API (direct API access, prompt caching)
|
|
||||||
//
|
|
||||||
// The backend is auto-detected from the API base URL. Both backends
|
|
||||||
// return the same internal types (Message, Usage) so the rest of
|
|
||||||
// the codebase doesn't need to know which is in use.
|
|
||||||
//
|
//
|
||||||
// Diagnostics: anomalies always logged to debug panel.
|
// Diagnostics: anomalies always logged to debug panel.
|
||||||
// Set POC_DEBUG=1 for verbose per-turn logging.
|
// Set POC_DEBUG=1 for verbose per-turn logging.
|
||||||
|
|
||||||
mod anthropic;
|
|
||||||
mod openai;
|
mod openai;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
@ -54,18 +48,11 @@ pub enum StreamEvent {
|
||||||
Error(String),
|
Error(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Backend {
|
|
||||||
OpenAi {
|
|
||||||
base_url: String,
|
|
||||||
},
|
|
||||||
Anthropic,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ApiClient {
|
pub struct ApiClient {
|
||||||
client: Client,
|
client: Client,
|
||||||
api_key: String,
|
api_key: String,
|
||||||
pub model: String,
|
pub model: String,
|
||||||
backend: Backend,
|
base_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ApiClient {
|
impl ApiClient {
|
||||||
|
|
@ -76,18 +63,11 @@ impl ApiClient {
|
||||||
.build()
|
.build()
|
||||||
.expect("failed to build HTTP client");
|
.expect("failed to build HTTP client");
|
||||||
|
|
||||||
let base = base_url.trim_end_matches('/').to_string();
|
|
||||||
let backend = if base.contains("anthropic.com") {
|
|
||||||
Backend::Anthropic
|
|
||||||
} else {
|
|
||||||
Backend::OpenAi { base_url: base }
|
|
||||||
};
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
api_key: api_key.to_string(),
|
api_key: api_key.to_string(),
|
||||||
model: model.to_string(),
|
model: model.to_string(),
|
||||||
backend,
|
base_url: base_url.trim_end_matches('/').to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -113,30 +93,14 @@ impl ApiClient {
|
||||||
let tools = tools.map(|t| t.to_vec());
|
let tools = tools.map(|t| t.to_vec());
|
||||||
let ui_tx = ui_tx.clone();
|
let ui_tx = ui_tx.clone();
|
||||||
let reasoning_effort = reasoning_effort.to_string();
|
let reasoning_effort = reasoning_effort.to_string();
|
||||||
let backend = match &self.backend {
|
let base_url = self.base_url.clone();
|
||||||
Backend::OpenAi { base_url } => Backend::OpenAi { base_url: base_url.clone() },
|
|
||||||
Backend::Anthropic => Backend::Anthropic,
|
|
||||||
};
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let result = match &backend {
|
let result = openai::stream_events(
|
||||||
Backend::OpenAi { base_url } => {
|
&client, &base_url, &api_key, &model,
|
||||||
openai::stream_events(
|
|
||||||
&client, base_url, &api_key, &model,
|
|
||||||
&messages, tools.as_deref(), &tx, &ui_tx,
|
&messages, tools.as_deref(), &tx, &ui_tx,
|
||||||
&reasoning_effort, temperature, priority,
|
&reasoning_effort, temperature, priority,
|
||||||
).await
|
).await;
|
||||||
}
|
|
||||||
Backend::Anthropic => {
|
|
||||||
// Anthropic backend still uses the old path for now —
|
|
||||||
// wrap it by calling the old stream() and synthesizing events.
|
|
||||||
anthropic::stream_events(
|
|
||||||
&client, &api_key, &model,
|
|
||||||
&messages, tools.as_deref(), &tx, &ui_tx,
|
|
||||||
&reasoning_effort,
|
|
||||||
).await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
||||||
}
|
}
|
||||||
|
|
@ -211,17 +175,12 @@ impl ApiClient {
|
||||||
|
|
||||||
/// Return a label for the active backend, used in startup info.
|
/// Return a label for the active backend, used in startup info.
|
||||||
pub fn backend_label(&self) -> &str {
|
pub fn backend_label(&self) -> &str {
|
||||||
match &self.backend {
|
if self.base_url.contains("openrouter") {
|
||||||
Backend::OpenAi { base_url } => {
|
|
||||||
if base_url.contains("openrouter") {
|
|
||||||
"openrouter"
|
"openrouter"
|
||||||
} else {
|
} else {
|
||||||
"openai-compat"
|
"openai-compat"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Backend::Anthropic => "anthropic",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send an HTTP request and check for errors. Shared by both backends.
|
/// Send an HTTP request and check for errors. Shared by both backends.
|
||||||
|
|
@ -332,6 +291,8 @@ pub(crate) struct SseReader {
|
||||||
debug: bool,
|
debug: bool,
|
||||||
ui_tx: UiSender,
|
ui_tx: UiSender,
|
||||||
done: bool,
|
done: bool,
|
||||||
|
/// Serialized request payload — saved to disk on timeout for replay debugging.
|
||||||
|
request_json: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SseReader {
|
impl SseReader {
|
||||||
|
|
@ -346,9 +307,15 @@ impl SseReader {
|
||||||
debug: std::env::var("POC_DEBUG").is_ok(),
|
debug: std::env::var("POC_DEBUG").is_ok(),
|
||||||
ui_tx: ui_tx.clone(),
|
ui_tx: ui_tx.clone(),
|
||||||
done: false,
|
done: false,
|
||||||
|
request_json: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach the serialized request payload for error diagnostics.
|
||||||
|
pub fn set_request(&mut self, request: &impl serde::Serialize) {
|
||||||
|
self.request_json = serde_json::to_string_pretty(request).ok();
|
||||||
|
}
|
||||||
|
|
||||||
/// Read the next SSE event from the response stream.
|
/// Read the next SSE event from the response stream.
|
||||||
/// Returns Ok(Some(value)) for each parsed data line,
|
/// Returns Ok(Some(value)) for each parsed data line,
|
||||||
/// Ok(None) when the stream ends or [DONE] is received.
|
/// Ok(None) when the stream ends or [DONE] is received.
|
||||||
|
|
@ -415,6 +382,19 @@ impl SseReader {
|
||||||
self.chunks_received,
|
self.chunks_received,
|
||||||
self.stream_start.elapsed().as_secs_f64()
|
self.stream_start.elapsed().as_secs_f64()
|
||||||
)));
|
)));
|
||||||
|
// Save the request for replay debugging
|
||||||
|
if let Some(ref json) = self.request_json {
|
||||||
|
let log_dir = dirs::home_dir()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.join(".consciousness/logs");
|
||||||
|
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
|
||||||
|
let path = log_dir.join(format!("failed-request-{}.json", ts));
|
||||||
|
if std::fs::write(&path, json).is_ok() {
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Debug(format!(
|
||||||
|
"saved failed request to {}", path.display()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
anyhow::bail!(
|
anyhow::bail!(
|
||||||
"stream timeout: no data for {}s ({} chunks received)",
|
"stream timeout: no data for {}s ({} chunks received)",
|
||||||
self.chunk_timeout.as_secs(),
|
self.chunk_timeout.as_secs(),
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,7 @@ pub async fn stream_events(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut reader = super::SseReader::new(ui_tx);
|
let mut reader = super::SseReader::new(ui_tx);
|
||||||
|
reader.set_request(&request);
|
||||||
|
|
||||||
let mut content_len: usize = 0;
|
let mut content_len: usize = 0;
|
||||||
let mut reasoning_chars: usize = 0;
|
let mut reasoning_chars: usize = 0;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue