diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 0f24e7e..25ac419 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -7,7 +7,6 @@ // Set POC_DEBUG=1 for verbose per-turn logging. pub mod http; -pub(crate) mod parsing; mod types; mod openai; @@ -21,8 +20,6 @@ use self::http::{HttpClient, HttpResponse}; use tokio::sync::mpsc; -use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall}; - /// A JoinHandle that aborts its task when dropped. pub(crate) struct AbortOnDrop(tokio::task::JoinHandle<()>); @@ -44,12 +41,6 @@ pub(crate) struct SamplingParams { // Stream events — yielded by backends, consumed by the runner // ───────────────────────────────────────────────────────────── -/// Build the tools JSON string from a slice of Tools. -fn tools_to_json_str(tools: &[agent_tools::Tool]) -> String { - let inner: Vec = tools.iter().map(|t| t.to_json()).collect(); - format!("[{}]", inner.join(",")) -} - /// One token from the streaming completions API. pub(crate) enum StreamToken { Token { text: String, id: u32 }, @@ -359,134 +350,3 @@ impl SseReader { } } } - -/// Build a response Message from accumulated content and tool calls. -/// Shared by both backends — the wire format differs but the internal -/// representation is the same. -/// -/// If no structured tool calls came from the API but the content -/// contains leaked tool call XML (e.g. `...` -/// from models that emit tool calls as text), parse them out and -/// promote them to structured tool_calls. This way all consumers -/// see tool calls uniformly regardless of backend. -pub(crate) fn build_response_message( - content: String, - tool_calls: Vec, -) -> Message { - // If the API returned structured tool calls, use them as-is. - if !tool_calls.is_empty() { - return Message { - role: Role::Assistant, - content: if content.is_empty() { None } - else { Some(MessageContent::Text(content)) }, - tool_calls: Some(tool_calls), - tool_call_id: None, - name: None, - timestamp: None, - }; - } - - // Check for leaked tool calls in content text. - let leaked = parsing::parse_leaked_tool_calls(&content); - if !leaked.is_empty() { - let cleaned = parsing::strip_leaked_artifacts(&content); - return Message { - role: Role::Assistant, - content: if cleaned.trim().is_empty() { None } - else { Some(MessageContent::Text(cleaned)) }, - tool_calls: Some(leaked), - tool_call_id: None, - name: None, - timestamp: None, - }; - } - - Message { - role: Role::Assistant, - content: if content.is_empty() { None } - else { Some(MessageContent::Text(content)) }, - tool_calls: None, - tool_call_id: None, - name: None, - timestamp: None, - } -} - -/// Log stream diagnostics. Shared by both backends. -pub(crate) fn log_diagnostics( - content_len: usize, - tool_count: usize, - reasoning_chars: usize, - reasoning_effort: &str, - finish_reason: &Option, - chunks_received: u64, - sse_lines_parsed: u64, - sse_parse_errors: u64, - empty_deltas: u64, - total_elapsed: Duration, - first_content_at: Option, - usage: &Option, - tools: &[ToolCall], -) { - let debug = std::env::var("POC_DEBUG").is_ok(); - - if reasoning_chars > 0 && reasoning_effort == "none" { - dbglog!( - "note: {} chars leaked reasoning (suppressed from display)", - reasoning_chars - ); - } - if content_len == 0 && tool_count == 0 { - dbglog!( - "WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \ - parse_errors: {}, empty_deltas: {}, {:.1}s)", - finish_reason, chunks_received, reasoning_chars, - sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64() - ); - } - if finish_reason.is_none() && chunks_received > 0 { - dbglog!( - "WARNING: stream ended without finish_reason ({} chunks, {} content chars)", - chunks_received, content_len - ); - } - if sse_parse_errors > 0 { - dbglog!( - "WARNING: {} SSE parse errors out of {} lines", - sse_parse_errors, sse_lines_parsed - ); - } - - if debug { - if let Some(u) = usage { - dbglog!( - "tokens: {} prompt + {} completion = {} total", - u.prompt_tokens, u.completion_tokens, u.total_tokens - ); - } - let ttft = first_content_at - .map(|d| format!("{:.1}s", d.as_secs_f64())) - .unwrap_or_else(|| "none".to_string()); - dbglog!( - "stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \ - {} content chars, {} reasoning chars, {} tools, \ - finish={:?}", - total_elapsed.as_secs_f64(), - ttft, - chunks_received, - sse_lines_parsed, - content_len, - reasoning_chars, - tool_count, - finish_reason, - ); - if !tools.is_empty() { - for (i, tc) in tools.iter().enumerate() { - dbglog!( - " tool[{}]: {} (id: {}, {} arg chars)", - i, tc.function.name, tc.id, tc.function.arguments.len() - ); - } - } - } -} diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index 4d766f2..6577037 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -11,181 +11,6 @@ use super::http::HttpClient; use super::types::*; use super::StreamToken; -/// Stream SSE events from an OpenAI-compatible endpoint, sending -/// parsed StreamEvents through the channel. The caller (runner) -/// handles routing to the UI. -pub(super) async fn stream_events( - client: &HttpClient, - base_url: &str, - api_key: &str, - model: &str, - messages: &[Message], - tools_json: &serde_json::Value, - tx: &mpsc::UnboundedSender, - reasoning_effort: &str, - sampling: super::SamplingParams, - priority: Option, -) -> Result<()> { - let has_tools = tools_json.as_array().map_or(false, |a| !a.is_empty()); - let request = ChatRequest { - model: model.to_string(), - messages: messages.to_vec(), - tool_choice: if has_tools { Some("auto".to_string()) } else { None }, - tools: if has_tools { Some(tools_json.clone()) } else { None }, - max_tokens: Some(16384), - temperature: Some(sampling.temperature), - top_p: Some(sampling.top_p), - top_k: Some(sampling.top_k), - stream: Some(true), - reasoning: if reasoning_effort != "none" && reasoning_effort != "default" { - Some(ReasoningConfig { - enabled: true, - effort: Some(reasoning_effort.to_string()), - }) - } else { - None - }, - chat_template_kwargs: None, - priority, - }; - - let url = format!("{}/chat/completions", base_url); - let msg_count = request.messages.len(); - let pri_label = match priority { - Some(p) => format!(", priority={}", p), - None => String::new(), - }; - let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label); - let request_json = serde_json::to_string_pretty(&request).ok(); - - let mut response = super::send_and_check( - client, - &url, - &request, - ("Authorization", &format!("Bearer {}", api_key)), - &[], - &debug_label, - request_json.as_deref(), - ) - .await?; - - let mut reader = super::SseReader::new(); - reader.request_json = request_json; - - let mut content_len: usize = 0; - let mut reasoning_chars: usize = 0; - let mut tool_call_count: usize = 0; - let mut empty_deltas: u64 = 0; - let mut first_content_at = None; - let mut finish_reason = None; - let mut usage = None; - - while let Some(event) = reader.next_event(&mut response).await? { - if let Some(err_msg) = event["error"]["message"].as_str() { - let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or(""); - dbglog!( - "API error in stream: {}", err_msg - ); - anyhow::bail!("API error in stream: {} {}", err_msg, raw); - } - - let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) { - Ok(c) => c, - Err(e) => { - let preview = event.to_string(); - dbglog!( - "unparseable SSE event ({}): {}", - e, &preview[..preview.len().min(300)] - ); - continue; - } - }; - - if let Some(ref u) = chunk.usage { - let _ = tx.send(StreamEvent::Usage(u.clone())); - usage = chunk.usage; - } - - for choice in &chunk.choices { - if choice.finish_reason.is_some() { - finish_reason = choice.finish_reason.clone(); - } - - let has_content = choice.delta.content.is_some(); - let has_tools = choice.delta.tool_calls.is_some(); - - // Reasoning tokens — multiple field names across providers - let mut has_reasoning = false; - for r in [ - choice.delta.reasoning_content.as_ref(), - choice.delta.reasoning.as_ref(), - ].into_iter().flatten() { - reasoning_chars += r.len(); - has_reasoning = true; - if !r.is_empty() { - let _ = tx.send(StreamEvent::Reasoning(r.clone())); - } - } - if let Some(ref r) = choice.delta.reasoning_details { - let s = r.to_string(); - reasoning_chars += s.len(); - has_reasoning = true; - if !s.is_empty() && s != "null" { - let _ = tx.send(StreamEvent::Reasoning(s)); - } - } - - if let Some(ref text_delta) = choice.delta.content { - if first_content_at.is_none() && !text_delta.is_empty() { - first_content_at = Some(reader.stream_start.elapsed()); - } - content_len += text_delta.len(); - let _ = tx.send(StreamEvent::Content(text_delta.clone())); - } - - if let Some(ref tc_deltas) = choice.delta.tool_calls { - for tc_delta in tc_deltas { - tool_call_count = tool_call_count.max(tc_delta.index + 1); - let _ = tx.send(StreamEvent::ToolCallDelta { - index: tc_delta.index, - id: tc_delta.id.clone(), - call_type: tc_delta.call_type.clone(), - name: tc_delta.function.as_ref().and_then(|f| f.name.clone()), - arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()), - }); - } - } - - if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() { - empty_deltas += 1; - } - } - } - - let total_elapsed = reader.stream_start.elapsed(); - - super::log_diagnostics( - content_len, - tool_call_count, - reasoning_chars, - reasoning_effort, - &finish_reason, - reader.chunks_received, - reader.sse_lines_parsed, - reader.sse_parse_errors, - empty_deltas, - total_elapsed, - first_content_at, - &usage, - &[], // tool_calls not accumulated here anymore - ); - - let reason = finish_reason.unwrap_or_default(); - let _ = tx.send(StreamEvent::Finished { reason }); - - Ok(()) -} - /// Stream from /v1/completions with raw token IDs in and out. /// Each SSE chunk yields one token (text + id). All parsing (think tags, /// tool calls) is handled by the ResponseParser, not here. @@ -230,7 +55,6 @@ pub(super) async fn stream_completions( .await?; let mut reader = super::SseReader::new(); - let mut content_len: usize = 0; let mut usage = None; while let Some(event) = reader.next_event(&mut response).await? { @@ -256,16 +80,13 @@ pub(super) async fn stream_completions( if let Some(ids) = token_ids { for (i, id_val) in ids.iter().enumerate() { if let Some(id) = id_val.as_u64() { - content_len += text.len(); - let _ = tx.send(StreamToken::Token { + let _ = tx.send(StreamToken::Token { text: if i == 0 { text.to_string() } else { String::new() }, id: id as u32, }); } } } else if !text.is_empty() { - // Fallback: text without token IDs (shouldn't happen with return_token_ids) - content_len += text.len(); let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 }); } } diff --git a/src/agent/api/parsing.rs b/src/agent/api/parsing.rs deleted file mode 100644 index e252f3c..0000000 --- a/src/agent/api/parsing.rs +++ /dev/null @@ -1,209 +0,0 @@ -// parsing.rs — Tool call parsing for leaked/streamed XML -// -// When models stream tool calls as XML text (Qwen-style -// blocks) rather than structured tool_calls, this module extracts -// them from the response text. -// -// Handles two wire formats: -// - Qwen XML: value -// - JSON: {"name": "...", "arguments": {...}} -// -// Also handles streaming artifacts: whitespace inside XML tags from -// token boundaries, tags, etc. - -use super::types::{ToolCall, FunctionCall}; - -/// Parse leaked tool calls from response text. -/// Looks for `...` blocks and tries both -/// XML and JSON formats for the body. -/// Parse a single tool call body (content between `` and ``). -pub(crate) fn parse_tool_call_body(body: &str) -> Option { - let normalized = normalize_xml_tags(body); - let body = normalized.trim(); - let mut counter = 0u32; - parse_xml_tool_call(body, &mut counter) - .or_else(|| parse_json_tool_call(body, &mut counter)) -} - -pub(super) fn parse_leaked_tool_calls(text: &str) -> Vec { - // Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "" - // This handles streaming tokenizers that split tags across tokens. - let normalized = normalize_xml_tags(text); - let text = &normalized; - - let mut calls = Vec::new(); - let mut search_from = 0; - let mut call_counter: u32 = 0; - - while let Some(start) = text[search_from..].find("") { - let abs_start = search_from + start; - let after_tag = abs_start + "".len(); - - let end = match text[after_tag..].find("") { - Some(pos) => after_tag + pos, - None => break, - }; - - let body = text[after_tag..end].trim(); - search_from = end + "".len(); - - // Try XML format first, then JSON - if let Some(call) = parse_xml_tool_call(body, &mut call_counter) { - calls.push(call); - } else if let Some(call) = parse_json_tool_call(body, &mut call_counter) { - calls.push(call); - } - } - - calls -} - -/// Normalize whitespace inside XML-like tags for streaming tokenizers. -/// Collapses whitespace between `<` and `>` so that `<\nfunction\n=\nbash\n>` -/// becomes ``, and `` becomes ``. -/// Leaves content between tags untouched. -fn normalize_xml_tags(text: &str) -> String { - let mut result = String::with_capacity(text.len()); - let mut chars = text.chars().peekable(); - while let Some(ch) = chars.next() { - if ch == '<' { - let mut tag = String::from('<'); - for inner in chars.by_ref() { - if inner == '>' { - tag.push('>'); - break; - } else if inner.is_whitespace() { - // Skip whitespace inside tags - } else { - tag.push(inner); - } - } - result.push_str(&tag); - } else { - result.push(ch); - } - } - result -} - -/// Parse a Qwen-style `body` pseudo-XML element. -/// Returns `(value, body, rest)` on success. -fn parse_qwen_tag<'a>(s: &'a str, tag: &str) -> Option<(&'a str, &'a str, &'a str)> { - let open = format!("<{}=", tag); - let close = format!("", tag); - - let start = s.find(&open)? + open.len(); - let name_end = start + s[start..].find('>')?; - let body_start = name_end + 1; - let body_end = body_start + s[body_start..].find(&close)?; - - Some(( - s[start..name_end].trim(), - s[body_start..body_end].trim(), - &s[body_end + close.len()..], - )) -} - -/// Parse Qwen's XML tool call format. -fn parse_xml_tool_call(body: &str, counter: &mut u32) -> Option { - let (func_name, func_body, _) = parse_qwen_tag(body, "function")?; - let func_name = func_name.to_string(); - - let mut args = serde_json::Map::new(); - let mut rest = func_body; - while let Some((key, val, remainder)) = parse_qwen_tag(rest, "parameter") { - args.insert(key.to_string(), serde_json::Value::String(val.to_string())); - rest = remainder; - } - - *counter += 1; - Some(ToolCall { - id: format!("leaked_{}", counter), - call_type: "function".to_string(), - function: FunctionCall { - name: func_name, - arguments: serde_json::to_string(&args).unwrap_or_default(), - }, - }) -} - -/// Parse JSON tool call format (some models emit this). -fn parse_json_tool_call(body: &str, counter: &mut u32) -> Option { - let v: serde_json::Value = serde_json::from_str(body).ok()?; - let name = v["name"].as_str()?; - let arguments = &v["arguments"]; - - *counter += 1; - Some(ToolCall { - id: format!("leaked_{}", counter), - call_type: "function".to_string(), - function: FunctionCall { - name: name.to_string(), - arguments: serde_json::to_string(arguments).unwrap_or_default(), - }, - }) -} - -/// Strip tool call XML and thinking tokens from text so the conversation -/// history stays clean. Removes `...` blocks and -/// `` tags (thinking content before them is kept — it's useful context). -pub(super) fn strip_leaked_artifacts(text: &str) -> String { - let normalized = normalize_xml_tags(text); - let mut result = normalized.clone(); - - // Remove ... blocks - while let Some(start) = result.find("") { - if let Some(end_pos) = result[start..].find("") { - let end = start + end_pos + "".len(); - result = format!("{}{}", &result[..start], &result[end..]); - } else { - break; - } - } - - // Remove tags (but keep the thinking text before them) - result = result.replace("", ""); - - result.trim().to_string() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_leaked_tool_call_clean() { - let text = "thinking\n\n\n\npoc-memory used core-personality\n\n"; - let calls = parse_leaked_tool_calls(text); - assert_eq!(calls.len(), 1); - assert_eq!(calls[0].function.name, "bash"); - let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap(); - assert_eq!(args["command"], "poc-memory used core-personality"); - } - - #[test] - fn test_leaked_tool_call_streamed_whitespace() { - // Streaming tokenizer splits XML tags across tokens with newlines - let text = "\n<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd\n\n"; - let calls = parse_leaked_tool_calls(text); - assert_eq!(calls.len(), 1, "should parse streamed format"); - assert_eq!(calls[0].function.name, "bash"); - let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap(); - assert_eq!(args["command"], "pwd"); - } - - #[test] - fn test_normalize_preserves_content() { - let text = "\necho hello world\n"; - let normalized = normalize_xml_tags(text); - // Newlines between tags are not inside tags, so preserved - assert_eq!(normalized, "\necho hello world\n"); - } - - #[test] - fn test_normalize_strips_tag_internal_whitespace() { - let text = "<\nfunction\n=\nbash\n>"; - let normalized = normalize_xml_tags(text); - assert_eq!(normalized, ""); - } -}