// api/anthropic.rs — Anthropic Messages API backend // // Native Anthropic wire format for direct API access. Key advantages // over the OpenAI-compat path: // - Prompt caching (90% cost reduction on repeated prefixes) // - No middleman (OpenRouter) — cleaner error handling // - Native tool use and thinking support // // Message format conversion happens at the boundary: internal Message // types are converted to Anthropic content blocks on send, and // Anthropic streaming events are converted back to internal types. use anyhow::Result; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::time::Duration; use crate::types::*; use crate::ui_channel::{StreamTarget, UiMessage, UiSender}; // --- Anthropic wire types --- #[derive(Serialize)] struct Request { model: String, max_tokens: u32, #[serde(skip_serializing_if = "Option::is_none")] system: Option>, messages: Vec, #[serde(skip_serializing_if = "Option::is_none")] tools: Option>, #[serde(skip_serializing_if = "Option::is_none")] tool_choice: Option, #[serde(skip_serializing_if = "Option::is_none")] temperature: Option, stream: bool, #[serde(skip_serializing_if = "Option::is_none")] thinking: Option, } #[derive(Serialize)] struct ApiMessage { role: String, content: ApiContent, } #[derive(Serialize)] #[serde(untagged)] enum ApiContent { Text(String), Blocks(Vec), } #[derive(Serialize, Clone)] #[serde(tag = "type")] enum ContentBlock { #[serde(rename = "text")] Text { text: String, #[serde(skip_serializing_if = "Option::is_none")] cache_control: Option, }, #[serde(rename = "tool_use")] ToolUse { id: String, name: String, input: serde_json::Value, }, #[serde(rename = "tool_result")] ToolResult { tool_use_id: String, content: String, #[serde(skip_serializing_if = "Option::is_none")] is_error: Option, }, } #[derive(Serialize, Clone)] struct CacheControl { #[serde(rename = "type")] cache_type: String, } impl CacheControl { fn ephemeral() -> Self { Self { cache_type: "ephemeral".to_string(), } } } #[derive(Serialize)] struct ToolDef { name: String, description: String, input_schema: serde_json::Value, } #[derive(Serialize)] struct ToolChoice { #[serde(rename = "type")] choice_type: String, } #[derive(Serialize)] struct ThinkingConfig { #[serde(rename = "type")] thinking_type: String, budget_tokens: u32, } // --- Anthropic SSE event types --- #[derive(Deserialize)] struct MessageStartEvent { message: MessageStart, } #[derive(Deserialize)] struct MessageStart { #[allow(dead_code)] id: String, usage: Option, } #[derive(Deserialize)] struct StartUsage { input_tokens: u32, #[serde(default)] cache_creation_input_tokens: u32, #[serde(default)] cache_read_input_tokens: u32, } #[derive(Deserialize)] struct ContentBlockStartEvent { index: usize, content_block: ContentBlockType, } #[derive(Deserialize)] #[serde(tag = "type")] enum ContentBlockType { #[serde(rename = "text")] Text { text: String }, #[serde(rename = "tool_use")] ToolUse { id: String, name: String }, #[serde(rename = "thinking")] Thinking {}, } #[derive(Deserialize)] struct ContentBlockDeltaEvent { index: usize, delta: DeltaType, } #[derive(Deserialize)] #[serde(tag = "type")] enum DeltaType { #[serde(rename = "text_delta")] TextDelta { text: String }, #[serde(rename = "input_json_delta")] InputJsonDelta { partial_json: String }, #[serde(rename = "thinking_delta")] ThinkingDelta { thinking: String }, #[serde(rename = "signature_delta")] SignatureDelta { #[allow(dead_code)] signature: String, }, } #[derive(Deserialize)] struct MessageDeltaEvent { delta: MessageDelta, usage: Option, } #[derive(Deserialize)] struct MessageDelta { stop_reason: Option, } #[derive(Deserialize)] struct DeltaUsage { output_tokens: u32, } // --- Conversion: internal types → Anthropic wire format --- /// Convert internal Messages to Anthropic API format. /// /// Key differences from OpenAI format: /// - System messages → extracted to system parameter /// - Tool role → user message with tool_result content block /// - Assistant tool_calls → assistant message with tool_use content blocks /// - Consecutive same-role messages must be merged /// - Prompt caching: cache_control on the last static block (context message) fn convert_messages( messages: &[Message], ) -> (Option>, Vec) { let mut system_blocks: Vec = Vec::new(); let mut api_messages: Vec = Vec::new(); // Track whether we've seen the first user message (identity context). // The second user message gets cache_control to mark the end of the // cacheable prefix (system prompt + context message). let mut user_count = 0; for msg in messages { match msg.role { Role::System => { system_blocks.push(ContentBlock::Text { text: msg.content_text().to_string(), cache_control: Some(CacheControl::ephemeral()), }); } Role::User => { user_count += 1; // Cache the identity prefix: system + first two user messages // (the context message and potentially the journal message). let cache = if user_count <= 2 { Some(CacheControl::ephemeral()) } else { None }; let content = match &msg.content { Some(MessageContent::Parts(parts)) => { let blocks: Vec = parts .iter() .filter_map(|p| match p { ContentPart::Text { text } => { Some(ContentBlock::Text { text: text.clone(), cache_control: cache.clone(), }) } ContentPart::ImageUrl { image_url } => { // Skip images for now — Anthropic uses a // different image format (base64 source block) let _ = image_url; None } }) .collect(); ApiContent::Blocks(blocks) } _ => { let text = msg.content_text().to_string(); if cache.is_some() { ApiContent::Blocks(vec![ContentBlock::Text { text, cache_control: cache, }]) } else { ApiContent::Text(text) } } }; push_merged(&mut api_messages, "user", content); } Role::Assistant => { let mut blocks: Vec = Vec::new(); // Text content let text = msg.content_text(); if !text.is_empty() { blocks.push(ContentBlock::Text { text: text.to_string(), cache_control: None, }); } // Tool calls → tool_use blocks if let Some(ref calls) = msg.tool_calls { for call in calls { let input: serde_json::Value = serde_json::from_str(&call.function.arguments) .unwrap_or_default(); blocks.push(ContentBlock::ToolUse { id: call.id.clone(), name: call.function.name.clone(), input, }); } } if blocks.is_empty() { // Empty assistant message — skip to avoid API rejection continue; } api_messages.push(ApiMessage { role: "assistant".to_string(), content: ApiContent::Blocks(blocks), }); } Role::Tool => { // Tool results become user messages with tool_result blocks let tool_use_id = msg .tool_call_id .as_deref() .unwrap_or("unknown") .to_string(); let result_text = msg.content_text().to_string(); let is_error = if result_text.starts_with("Error:") { Some(true) } else { None }; let block = ContentBlock::ToolResult { tool_use_id, content: result_text, is_error, }; push_merged( &mut api_messages, "user", ApiContent::Blocks(vec![block]), ); } } } let system = if system_blocks.is_empty() { None } else { Some(system_blocks) }; (system, api_messages) } /// Push a message, merging with the previous one if it has the same role. /// Anthropic requires strict user/assistant alternation, and tool results /// (mapped to user role) can pile up between assistant messages. fn push_merged(messages: &mut Vec, role: &str, content: ApiContent) { if let Some(last) = messages.last_mut() { if last.role == role { // Merge into existing message's content blocks let existing = std::mem::replace( &mut last.content, ApiContent::Text(String::new()), ); let mut blocks = match existing { ApiContent::Text(t) => { if t.is_empty() { Vec::new() } else { vec![ContentBlock::Text { text: t, cache_control: None, }] } } ApiContent::Blocks(b) => b, }; match content { ApiContent::Text(t) => { if !t.is_empty() { blocks.push(ContentBlock::Text { text: t, cache_control: None, }); } } ApiContent::Blocks(b) => blocks.extend(b), } last.content = ApiContent::Blocks(blocks); return; } } messages.push(ApiMessage { role: role.to_string(), content, }); } /// Convert internal ToolDef to Anthropic format. fn convert_tools(tools: &[crate::types::ToolDef]) -> Vec { tools .iter() .map(|t| ToolDef { name: t.function.name.clone(), description: t.function.description.clone(), input_schema: t.function.parameters.clone(), }) .collect() } // --- Streaming implementation --- pub async fn stream( client: &Client, api_key: &str, model: &str, messages: &[Message], tools: Option<&[crate::types::ToolDef]>, ui_tx: &UiSender, target: StreamTarget, reasoning_effort: &str, ) -> Result<(Message, Option)> { let (system, api_messages) = convert_messages(messages); let thinking = match reasoning_effort { "none" => None, "low" => Some(ThinkingConfig { thinking_type: "enabled".to_string(), budget_tokens: 2048, }), _ => Some(ThinkingConfig { thinking_type: "enabled".to_string(), budget_tokens: 16000, }), }; // When thinking is enabled, temperature must be 1.0 (Anthropic requirement) let temperature = if thinking.is_some() { None } else { Some(0.6) }; let request = Request { model: model.to_string(), max_tokens: if thinking.is_some() { 32768 } else { 16384 }, system, messages: api_messages, tools: tools.map(|t| convert_tools(t)), tool_choice: tools.map(|_| ToolChoice { choice_type: "auto".to_string(), }), temperature, stream: true, thinking, }; let msg_count = messages.len(); let debug_label = format!("{} messages, model={}", msg_count, model); let mut response = super::send_and_check( client, "https://api.anthropic.com/v1/messages", &request, ("x-api-key", api_key), &[("anthropic-version", "2023-06-01")], ui_tx, &debug_label, ) .await?; let debug = std::env::var("POC_DEBUG").is_ok(); let mut reader = super::SseReader::new(ui_tx); let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut input_tokens: u32 = 0; let mut output_tokens: u32 = 0; let mut cache_creation_tokens: u32 = 0; let mut cache_read_tokens: u32 = 0; let mut finish_reason: Option = None; // Track which content blocks are which type let mut block_types: Vec = Vec::new(); // "text", "tool_use", "thinking" let mut tool_inputs: Vec = Vec::new(); // accumulated JSON for tool_use blocks let mut tool_ids: Vec = Vec::new(); let mut tool_names: Vec = Vec::new(); let mut reasoning_chars: usize = 0; let mut empty_deltas: u64 = 0; let mut first_content_at: Option = None; let reasoning_enabled = reasoning_effort != "none"; while let Some(event) = reader.next_event(&mut response).await? { let event_type = event["type"].as_str().unwrap_or(""); match event_type { "message_start" => { if let Ok(ev) = serde_json::from_value::(event.clone()) { if let Some(u) = ev.message.usage { input_tokens = u.input_tokens; cache_creation_tokens = u.cache_creation_input_tokens; cache_read_tokens = u.cache_read_input_tokens; } } } "content_block_start" => { if let Ok(ev) = serde_json::from_value::(event.clone()) { let idx = ev.index; while block_types.len() <= idx { block_types.push(String::new()); tool_inputs.push(String::new()); tool_ids.push(String::new()); tool_names.push(String::new()); } match ev.content_block { ContentBlockType::Text { text: initial } => { block_types[idx] = "text".to_string(); if !initial.is_empty() { content.push_str(&initial); let _ = ui_tx .send(UiMessage::TextDelta(initial, target)); } } ContentBlockType::ToolUse { id, name } => { block_types[idx] = "tool_use".to_string(); tool_ids[idx] = id; tool_names[idx] = name; } ContentBlockType::Thinking {} => { block_types[idx] = "thinking".to_string(); } } } } "content_block_delta" => { if let Ok(ev) = serde_json::from_value::(event.clone()) { let idx = ev.index; match ev.delta { DeltaType::TextDelta { text: delta } => { if first_content_at.is_none() && !delta.is_empty() { first_content_at = Some(reader.stream_start.elapsed()); let _ = ui_tx.send(UiMessage::Activity( "streaming...".into(), )); } content.push_str(&delta); let _ = ui_tx.send(UiMessage::TextDelta(delta, target)); } DeltaType::InputJsonDelta { partial_json } => { if idx < tool_inputs.len() { tool_inputs[idx].push_str(&partial_json); } } DeltaType::ThinkingDelta { thinking } => { reasoning_chars += thinking.len(); if reasoning_enabled && !thinking.is_empty() { let _ = ui_tx.send(UiMessage::Reasoning(thinking)); } } DeltaType::SignatureDelta { .. } => {} } } else { empty_deltas += 1; } } "content_block_stop" => { // Finalize tool_use blocks let idx = event["index"].as_u64().unwrap_or(0) as usize; if idx < block_types.len() && block_types[idx] == "tool_use" { let input: serde_json::Value = serde_json::from_str(&tool_inputs[idx]).unwrap_or_default(); tool_calls.push(ToolCall { id: tool_ids[idx].clone(), call_type: "function".to_string(), function: FunctionCall { name: tool_names[idx].clone(), arguments: serde_json::to_string(&input) .unwrap_or_default(), }, }); } } "message_delta" => { if let Ok(ev) = serde_json::from_value::(event.clone()) { if let Some(reason) = ev.delta.stop_reason { finish_reason = Some(reason); } if let Some(u) = ev.usage { output_tokens = u.output_tokens; } } } "message_stop" | "ping" => {} "error" => { let err_msg = event["error"]["message"] .as_str() .unwrap_or("unknown error"); let _ = ui_tx.send(UiMessage::Debug(format!( "API error in stream: {}", err_msg ))); anyhow::bail!("API error in stream: {}", err_msg); } _ => { if debug { let _ = ui_tx.send(UiMessage::Debug(format!( "unknown SSE event type: {}", event_type ))); } } } } let total_elapsed = reader.stream_start.elapsed(); if !content.is_empty() { let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); } // Build Usage from Anthropic's token counts let total_input = input_tokens + cache_creation_tokens + cache_read_tokens; let usage = Some(Usage { prompt_tokens: total_input, completion_tokens: output_tokens, total_tokens: total_input + output_tokens, }); // Log cache stats in debug mode if debug && (cache_creation_tokens > 0 || cache_read_tokens > 0) { let _ = ui_tx.send(UiMessage::Debug(format!( "cache: {} write + {} read tokens (input: {} uncached)", cache_creation_tokens, cache_read_tokens, input_tokens, ))); } super::log_diagnostics( ui_tx, content.len(), tool_calls.len(), reasoning_chars, reasoning_effort, &finish_reason, reader.chunks_received, reader.sse_lines_parsed, reader.sse_parse_errors, empty_deltas, total_elapsed, first_content_at, &usage, &tool_calls, ); Ok((super::build_response_message(content, tool_calls), usage)) }