diff --git a/src/agent/api/anthropic.rs b/src/agent/api/anthropic.rs deleted file mode 100644 index dc42820..0000000 --- a/src/agent/api/anthropic.rs +++ /dev/null @@ -1,713 +0,0 @@ -// api/anthropic.rs — Anthropic Messages API backend -// -// Native Anthropic wire format for direct API access. Key advantages -// over the OpenAI-compat path: -// - Prompt caching (90% cost reduction on repeated prefixes) -// - No middleman (OpenRouter) — cleaner error handling -// - Native tool use and thinking support -// -// Message format conversion happens at the boundary: internal Message -// types are converted to Anthropic content blocks on send, and -// Anthropic streaming events are converted back to internal types. - -use anyhow::Result; -use reqwest::Client; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -use tokio::sync::mpsc; - -use crate::agent::types::*; -use crate::agent::ui_channel::{StreamTarget, UiMessage, UiSender}; -use super::StreamEvent; - -// --- Anthropic wire types --- - -#[derive(Serialize)] -struct Request { - model: String, - max_tokens: u32, - #[serde(skip_serializing_if = "Option::is_none")] - system: Option>, - messages: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - tools: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - tool_choice: Option, - #[serde(skip_serializing_if = "Option::is_none")] - temperature: Option, - stream: bool, - #[serde(skip_serializing_if = "Option::is_none")] - thinking: Option, -} - -#[derive(Serialize)] -struct ApiMessage { - role: String, - content: ApiContent, -} - -#[derive(Serialize)] -#[serde(untagged)] -enum ApiContent { - Text(String), - Blocks(Vec), -} - -#[derive(Serialize, Clone)] -#[serde(tag = "type")] -enum ContentBlock { - #[serde(rename = "text")] - Text { - text: String, - #[serde(skip_serializing_if = "Option::is_none")] - cache_control: Option, - }, - #[serde(rename = "tool_use")] - ToolUse { - id: String, - name: String, - input: serde_json::Value, - }, - #[serde(rename = "tool_result")] - ToolResult { - tool_use_id: String, - content: String, - #[serde(skip_serializing_if = "Option::is_none")] - is_error: Option, - }, -} - -#[derive(Serialize, Clone)] -struct CacheControl { - #[serde(rename = "type")] - cache_type: String, -} - -impl CacheControl { - fn ephemeral() -> Self { - Self { - cache_type: "ephemeral".to_string(), - } - } -} - -#[derive(Serialize)] -struct ToolDef { - name: String, - description: String, - input_schema: serde_json::Value, -} - -#[derive(Serialize)] -struct ToolChoice { - #[serde(rename = "type")] - choice_type: String, -} - -#[derive(Serialize)] -struct ThinkingConfig { - #[serde(rename = "type")] - thinking_type: String, - budget_tokens: u32, -} - -// --- Anthropic SSE event types --- - -#[derive(Deserialize)] -struct MessageStartEvent { - message: MessageStart, -} - -#[derive(Deserialize)] -struct MessageStart { - #[allow(dead_code)] - id: String, - usage: Option, -} - -#[derive(Deserialize)] -struct StartUsage { - input_tokens: u32, - #[serde(default)] - cache_creation_input_tokens: u32, - #[serde(default)] - cache_read_input_tokens: u32, -} - -#[derive(Deserialize)] -struct ContentBlockStartEvent { - index: usize, - content_block: ContentBlockType, -} - -#[derive(Deserialize)] -#[serde(tag = "type")] -enum ContentBlockType { - #[serde(rename = "text")] - Text { text: String }, - #[serde(rename = "tool_use")] - ToolUse { id: String, name: String }, - #[serde(rename = "thinking")] - Thinking {}, -} - -#[derive(Deserialize)] -struct ContentBlockDeltaEvent { - index: usize, - delta: DeltaType, -} - -#[derive(Deserialize)] -#[serde(tag = "type")] -enum DeltaType { - #[serde(rename = "text_delta")] - TextDelta { text: String }, - #[serde(rename = "input_json_delta")] - InputJsonDelta { partial_json: String }, - #[serde(rename = "thinking_delta")] - ThinkingDelta { thinking: String }, - #[serde(rename = "signature_delta")] - SignatureDelta { - #[allow(dead_code)] - signature: String, - }, -} - -#[derive(Deserialize)] -struct MessageDeltaEvent { - delta: MessageDelta, - usage: Option, -} - -#[derive(Deserialize)] -struct MessageDelta { - stop_reason: Option, -} - -#[derive(Deserialize)] -struct DeltaUsage { - output_tokens: u32, -} - -// --- Conversion: internal types → Anthropic wire format --- - -/// Convert internal Messages to Anthropic API format. -/// -/// Key differences from OpenAI format: -/// - System messages → extracted to system parameter -/// - Tool role → user message with tool_result content block -/// - Assistant tool_calls → assistant message with tool_use content blocks -/// - Consecutive same-role messages must be merged -/// - Prompt caching: cache_control on the last static block (context message) -fn convert_messages( - messages: &[Message], -) -> (Option>, Vec) { - let mut system_blocks: Vec = Vec::new(); - let mut api_messages: Vec = Vec::new(); - - // Track whether we've seen the first user message (identity context). - // The second user message gets cache_control to mark the end of the - // cacheable prefix (system prompt + context message). - let mut user_count = 0; - - for msg in messages { - match msg.role { - Role::System => { - system_blocks.push(ContentBlock::Text { - text: msg.content_text().to_string(), - cache_control: Some(CacheControl::ephemeral()), - }); - } - Role::User => { - user_count += 1; - // Cache the identity prefix: system + first two user messages - // (the context message and potentially the journal message). - let cache = if user_count <= 2 { - Some(CacheControl::ephemeral()) - } else { - None - }; - - let content = match &msg.content { - Some(MessageContent::Parts(parts)) => { - let blocks: Vec = parts - .iter() - .filter_map(|p| match p { - ContentPart::Text { text } => { - Some(ContentBlock::Text { - text: text.clone(), - cache_control: cache.clone(), - }) - } - ContentPart::ImageUrl { image_url } => { - // Skip images for now — Anthropic uses a - // different image format (base64 source block) - let _ = image_url; - None - } - }) - .collect(); - ApiContent::Blocks(blocks) - } - _ => { - let text = msg.content_text().to_string(); - if cache.is_some() { - ApiContent::Blocks(vec![ContentBlock::Text { - text, - cache_control: cache, - }]) - } else { - ApiContent::Text(text) - } - } - }; - - push_merged(&mut api_messages, "user", content); - } - Role::Assistant => { - let mut blocks: Vec = Vec::new(); - - // Text content - let text = msg.content_text(); - if !text.is_empty() { - blocks.push(ContentBlock::Text { - text: text.to_string(), - cache_control: None, - }); - } - - // Tool calls → tool_use blocks - if let Some(ref calls) = msg.tool_calls { - for call in calls { - let input: serde_json::Value = - serde_json::from_str(&call.function.arguments) - .unwrap_or_default(); - blocks.push(ContentBlock::ToolUse { - id: call.id.clone(), - name: call.function.name.clone(), - input, - }); - } - } - - if blocks.is_empty() { - // Empty assistant message — skip to avoid API rejection - continue; - } - - api_messages.push(ApiMessage { - role: "assistant".to_string(), - content: ApiContent::Blocks(blocks), - }); - } - Role::Tool => { - // Tool results become user messages with tool_result blocks - let tool_use_id = msg - .tool_call_id - .as_deref() - .unwrap_or("unknown") - .to_string(); - let result_text = msg.content_text().to_string(); - let is_error = if result_text.starts_with("Error:") { - Some(true) - } else { - None - }; - - let block = ContentBlock::ToolResult { - tool_use_id, - content: result_text, - is_error, - }; - - push_merged( - &mut api_messages, - "user", - ApiContent::Blocks(vec![block]), - ); - } - } - } - - let system = if system_blocks.is_empty() { - None - } else { - Some(system_blocks) - }; - - (system, api_messages) -} - -/// Push a message, merging with the previous one if it has the same role. -/// Anthropic requires strict user/assistant alternation, and tool results -/// (mapped to user role) can pile up between assistant messages. -fn push_merged(messages: &mut Vec, role: &str, content: ApiContent) { - if let Some(last) = messages.last_mut() { - if last.role == role { - // Merge into existing message's content blocks - let existing = std::mem::replace( - &mut last.content, - ApiContent::Text(String::new()), - ); - let mut blocks = match existing { - ApiContent::Text(t) => { - if t.is_empty() { - Vec::new() - } else { - vec![ContentBlock::Text { - text: t, - cache_control: None, - }] - } - } - ApiContent::Blocks(b) => b, - }; - match content { - ApiContent::Text(t) => { - if !t.is_empty() { - blocks.push(ContentBlock::Text { - text: t, - cache_control: None, - }); - } - } - ApiContent::Blocks(b) => blocks.extend(b), - } - last.content = ApiContent::Blocks(blocks); - return; - } - } - messages.push(ApiMessage { - role: role.to_string(), - content, - }); -} - -/// Convert internal ToolDef to Anthropic format. -fn convert_tools(tools: &[crate::agent::types::ToolDef]) -> Vec { - tools - .iter() - .map(|t| ToolDef { - name: t.function.name.clone(), - description: t.function.description.clone(), - input_schema: t.function.parameters.clone(), - }) - .collect() -} - -// --- Streaming implementation --- - -pub async fn stream( - client: &Client, - api_key: &str, - model: &str, - messages: &[Message], - tools: Option<&[crate::agent::types::ToolDef]>, - ui_tx: &UiSender, - target: StreamTarget, - reasoning_effort: &str, -) -> Result<(Message, Option)> { - let (system, api_messages) = convert_messages(messages); - - let thinking = match reasoning_effort { - "none" => None, - "low" => Some(ThinkingConfig { - thinking_type: "enabled".to_string(), - budget_tokens: 2048, - }), - _ => Some(ThinkingConfig { - thinking_type: "enabled".to_string(), - budget_tokens: 16000, - }), - }; - - // When thinking is enabled, temperature must be 1.0 (Anthropic requirement) - let temperature = if thinking.is_some() { None } else { Some(0.6) }; - - let request = Request { - model: model.to_string(), - max_tokens: if thinking.is_some() { 32768 } else { 16384 }, - system, - messages: api_messages, - tools: tools.map(|t| convert_tools(t)), - tool_choice: tools.map(|_| ToolChoice { - choice_type: "auto".to_string(), - }), - temperature, - stream: true, - thinking, - }; - - let msg_count = messages.len(); - let debug_label = format!("{} messages, model={}", msg_count, model); - - let mut response = super::send_and_check( - client, - "https://api.anthropic.com/v1/messages", - &request, - ("x-api-key", api_key), - &[("anthropic-version", "2023-06-01")], - ui_tx, - &debug_label, - ) - .await?; - - let debug = std::env::var("POC_DEBUG").is_ok(); - let mut reader = super::SseReader::new(ui_tx); - - let mut content = String::new(); - let mut tool_calls: Vec = Vec::new(); - let mut input_tokens: u32 = 0; - let mut output_tokens: u32 = 0; - let mut cache_creation_tokens: u32 = 0; - let mut cache_read_tokens: u32 = 0; - let mut finish_reason: Option = None; - - // Track which content blocks are which type - let mut block_types: Vec = Vec::new(); // "text", "tool_use", "thinking" - let mut tool_inputs: Vec = Vec::new(); // accumulated JSON for tool_use blocks - let mut tool_ids: Vec = Vec::new(); - let mut tool_names: Vec = Vec::new(); - - let mut reasoning_chars: usize = 0; - let mut empty_deltas: u64 = 0; - let mut first_content_at: Option = None; - - let reasoning_enabled = reasoning_effort != "none"; - - while let Some(event) = reader.next_event(&mut response).await? { - let event_type = event["type"].as_str().unwrap_or(""); - - match event_type { - "message_start" => { - if let Ok(ev) = - serde_json::from_value::(event.clone()) - { - if let Some(u) = ev.message.usage { - input_tokens = u.input_tokens; - cache_creation_tokens = u.cache_creation_input_tokens; - cache_read_tokens = u.cache_read_input_tokens; - } - } - } - - "content_block_start" => { - if let Ok(ev) = - serde_json::from_value::(event.clone()) - { - let idx = ev.index; - while block_types.len() <= idx { - block_types.push(String::new()); - tool_inputs.push(String::new()); - tool_ids.push(String::new()); - tool_names.push(String::new()); - } - match ev.content_block { - ContentBlockType::Text { text: initial } => { - block_types[idx] = "text".to_string(); - if !initial.is_empty() { - content.push_str(&initial); - let _ = ui_tx - .send(UiMessage::TextDelta(initial, target)); - } - } - ContentBlockType::ToolUse { id, name } => { - block_types[idx] = "tool_use".to_string(); - tool_ids[idx] = id; - tool_names[idx] = name; - } - ContentBlockType::Thinking {} => { - block_types[idx] = "thinking".to_string(); - } - } - } - } - - "content_block_delta" => { - if let Ok(ev) = - serde_json::from_value::(event.clone()) - { - let idx = ev.index; - match ev.delta { - DeltaType::TextDelta { text: delta } => { - if first_content_at.is_none() && !delta.is_empty() { - first_content_at = - Some(reader.stream_start.elapsed()); - let _ = ui_tx.send(UiMessage::Activity( - "streaming...".into(), - )); - } - content.push_str(&delta); - let _ = - ui_tx.send(UiMessage::TextDelta(delta, target)); - } - DeltaType::InputJsonDelta { partial_json } => { - if idx < tool_inputs.len() { - tool_inputs[idx].push_str(&partial_json); - } - } - DeltaType::ThinkingDelta { thinking } => { - reasoning_chars += thinking.len(); - if reasoning_enabled && !thinking.is_empty() { - let _ = - ui_tx.send(UiMessage::Reasoning(thinking)); - } - } - DeltaType::SignatureDelta { .. } => {} - } - } else { - empty_deltas += 1; - } - } - - "content_block_stop" => { - // Finalize tool_use blocks - let idx = event["index"].as_u64().unwrap_or(0) as usize; - if idx < block_types.len() && block_types[idx] == "tool_use" { - let input: serde_json::Value = - serde_json::from_str(&tool_inputs[idx]).unwrap_or_default(); - tool_calls.push(ToolCall { - id: tool_ids[idx].clone(), - call_type: "function".to_string(), - function: FunctionCall { - name: tool_names[idx].clone(), - arguments: serde_json::to_string(&input) - .unwrap_or_default(), - }, - }); - } - } - - "message_delta" => { - if let Ok(ev) = - serde_json::from_value::(event.clone()) - { - if let Some(reason) = ev.delta.stop_reason { - finish_reason = Some(reason); - } - if let Some(u) = ev.usage { - output_tokens = u.output_tokens; - } - } - } - - "message_stop" | "ping" => {} - - "error" => { - let err_msg = event["error"]["message"] - .as_str() - .unwrap_or("unknown error"); - let _ = ui_tx.send(UiMessage::Debug(format!( - "API error in stream: {}", - err_msg - ))); - anyhow::bail!("API error in stream: {}", err_msg); - } - - _ => { - if debug { - let _ = ui_tx.send(UiMessage::Debug(format!( - "unknown SSE event type: {}", - event_type - ))); - } - } - } - } - - let total_elapsed = reader.stream_start.elapsed(); - if !content.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); - } - - // Build Usage from Anthropic's token counts - let total_input = input_tokens + cache_creation_tokens + cache_read_tokens; - let usage = Some(Usage { - prompt_tokens: total_input, - completion_tokens: output_tokens, - total_tokens: total_input + output_tokens, - }); - - // Log cache stats in debug mode - if debug && (cache_creation_tokens > 0 || cache_read_tokens > 0) { - let _ = ui_tx.send(UiMessage::Debug(format!( - "cache: {} write + {} read tokens (input: {} uncached)", - cache_creation_tokens, cache_read_tokens, input_tokens, - ))); - } - - super::log_diagnostics( - ui_tx, - content.len(), - tool_calls.len(), - reasoning_chars, - reasoning_effort, - &finish_reason, - reader.chunks_received, - reader.sse_lines_parsed, - reader.sse_parse_errors, - empty_deltas, - total_elapsed, - first_content_at, - &usage, - &tool_calls, - ); - - Ok((super::build_response_message(content, tool_calls), usage)) -} - -/// Wrapper that calls the existing stream() and synthesizes StreamEvents. -/// TODO: refactor to emit events during streaming like the OpenAI backend. -pub async fn stream_events( - client: &Client, - api_key: &str, - model: &str, - messages: &[Message], - tools: Option<&[crate::agent::types::ToolDef]>, - tx: &mpsc::UnboundedSender, - ui_tx: &UiSender, - reasoning_effort: &str, -) -> Result<()> { - let (msg, usage) = stream( - client, api_key, model, messages, tools, - ui_tx, StreamTarget::Conversation, reasoning_effort, - ).await?; - - // Synthesize events from the completed message. - if let Some(text) = msg.content.as_ref().and_then(|c| match c { - MessageContent::Text(t) => Some(t.as_str()), - _ => None, - }) { - if !text.is_empty() { - let _ = tx.send(StreamEvent::Content(text.to_string())); - } - } - if let Some(ref tcs) = msg.tool_calls { - for (i, tc) in tcs.iter().enumerate() { - let _ = tx.send(StreamEvent::ToolCallDelta { - index: i, - id: Some(tc.id.clone()), - call_type: Some(tc.call_type.clone()), - name: Some(tc.function.name.clone()), - arguments: Some(tc.function.arguments.clone()), - }); - } - } - if let Some(u) = usage { - let _ = tx.send(StreamEvent::Usage(u.clone())); - let _ = tx.send(StreamEvent::Finished { - reason: "stop".into(), - prompt_tokens: u.prompt_tokens, - completion_tokens: u.completion_tokens, - }); - } else { - let _ = tx.send(StreamEvent::Finished { - reason: "stop".into(), - prompt_tokens: 0, - completion_tokens: 0, - }); - } - - Ok(()) -} diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 528ea8b..81a068a 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -1,17 +1,11 @@ -// api/ — LLM API client with pluggable backends +// api/ — LLM API client (OpenAI-compatible) // -// Supports two wire formats: -// - OpenAI-compatible (OpenRouter, vLLM, llama.cpp, Qwen) -// - Anthropic Messages API (direct API access, prompt caching) -// -// The backend is auto-detected from the API base URL. Both backends -// return the same internal types (Message, Usage) so the rest of -// the codebase doesn't need to know which is in use. +// Works with any provider that implements the OpenAI chat completions +// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc. // // Diagnostics: anomalies always logged to debug panel. // Set POC_DEBUG=1 for verbose per-turn logging. -mod anthropic; mod openai; use anyhow::Result; @@ -54,18 +48,11 @@ pub enum StreamEvent { Error(String), } -enum Backend { - OpenAi { - base_url: String, - }, - Anthropic, -} - pub struct ApiClient { client: Client, api_key: String, pub model: String, - backend: Backend, + base_url: String, } impl ApiClient { @@ -76,18 +63,11 @@ impl ApiClient { .build() .expect("failed to build HTTP client"); - let base = base_url.trim_end_matches('/').to_string(); - let backend = if base.contains("anthropic.com") { - Backend::Anthropic - } else { - Backend::OpenAi { base_url: base } - }; - Self { client, api_key: api_key.to_string(), model: model.to_string(), - backend, + base_url: base_url.trim_end_matches('/').to_string(), } } @@ -113,30 +93,14 @@ impl ApiClient { let tools = tools.map(|t| t.to_vec()); let ui_tx = ui_tx.clone(); let reasoning_effort = reasoning_effort.to_string(); - let backend = match &self.backend { - Backend::OpenAi { base_url } => Backend::OpenAi { base_url: base_url.clone() }, - Backend::Anthropic => Backend::Anthropic, - }; + let base_url = self.base_url.clone(); tokio::spawn(async move { - let result = match &backend { - Backend::OpenAi { base_url } => { - openai::stream_events( - &client, base_url, &api_key, &model, - &messages, tools.as_deref(), &tx, &ui_tx, - &reasoning_effort, temperature, priority, - ).await - } - Backend::Anthropic => { - // Anthropic backend still uses the old path for now — - // wrap it by calling the old stream() and synthesizing events. - anthropic::stream_events( - &client, &api_key, &model, - &messages, tools.as_deref(), &tx, &ui_tx, - &reasoning_effort, - ).await - } - }; + let result = openai::stream_events( + &client, &base_url, &api_key, &model, + &messages, tools.as_deref(), &tx, &ui_tx, + &reasoning_effort, temperature, priority, + ).await; if let Err(e) = result { let _ = tx.send(StreamEvent::Error(e.to_string())); } @@ -211,15 +175,10 @@ impl ApiClient { /// Return a label for the active backend, used in startup info. pub fn backend_label(&self) -> &str { - match &self.backend { - Backend::OpenAi { base_url } => { - if base_url.contains("openrouter") { - "openrouter" - } else { - "openai-compat" - } - } - Backend::Anthropic => "anthropic", + if self.base_url.contains("openrouter") { + "openrouter" + } else { + "openai-compat" } } } @@ -332,6 +291,8 @@ pub(crate) struct SseReader { debug: bool, ui_tx: UiSender, done: bool, + /// Serialized request payload — saved to disk on timeout for replay debugging. + request_json: Option, } impl SseReader { @@ -346,9 +307,15 @@ impl SseReader { debug: std::env::var("POC_DEBUG").is_ok(), ui_tx: ui_tx.clone(), done: false, + request_json: None, } } + /// Attach the serialized request payload for error diagnostics. + pub fn set_request(&mut self, request: &impl serde::Serialize) { + self.request_json = serde_json::to_string_pretty(request).ok(); + } + /// Read the next SSE event from the response stream. /// Returns Ok(Some(value)) for each parsed data line, /// Ok(None) when the stream ends or [DONE] is received. @@ -415,6 +382,19 @@ impl SseReader { self.chunks_received, self.stream_start.elapsed().as_secs_f64() ))); + // Save the request for replay debugging + if let Some(ref json) = self.request_json { + let log_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/logs"); + let ts = chrono::Local::now().format("%Y%m%dT%H%M%S"); + let path = log_dir.join(format!("failed-request-{}.json", ts)); + if std::fs::write(&path, json).is_ok() { + let _ = self.ui_tx.send(UiMessage::Debug(format!( + "saved failed request to {}", path.display() + ))); + } + } anyhow::bail!( "stream timeout: no data for {}s ({} chunks received)", self.chunk_timeout.as_secs(), diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index aec50ec..0d6a11f 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -68,6 +68,7 @@ pub async fn stream_events( .await?; let mut reader = super::SseReader::new(ui_tx); + reader.set_request(&request); let mut content_len: usize = 0; let mut reasoning_chars: usize = 0;