Kill dead API code: stream_events, parsing.rs, build_response_message, log_diagnostics

Deleted: api/parsing.rs entirely (parsing now in context_new.rs),
stream_events (chat completions path), collect_stream, build_response_message,
log_diagnostics, tools_to_json_str, start_stream, chat_completion_stream_temp.

API layer is now just: stream_completion (token IDs in/out), SseReader,
send_and_check, and types. Zero errors in api/.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-08 15:06:33 -04:00
parent 48db4a42cc
commit 1e5cd0dd3f
3 changed files with 1 additions and 529 deletions

View file

@ -7,7 +7,6 @@
// Set POC_DEBUG=1 for verbose per-turn logging.
pub mod http;
pub(crate) mod parsing;
mod types;
mod openai;
@ -21,8 +20,6 @@ use self::http::{HttpClient, HttpResponse};
use tokio::sync::mpsc;
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
/// A JoinHandle that aborts its task when dropped.
pub(crate) struct AbortOnDrop(tokio::task::JoinHandle<()>);
@ -44,12 +41,6 @@ pub(crate) struct SamplingParams {
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
/// Build the tools JSON string from a slice of Tools.
fn tools_to_json_str(tools: &[agent_tools::Tool]) -> String {
let inner: Vec<String> = tools.iter().map(|t| t.to_json()).collect();
format!("[{}]", inner.join(","))
}
/// One token from the streaming completions API.
pub(crate) enum StreamToken {
Token { text: String, id: u32 },
@ -359,134 +350,3 @@ impl SseReader {
}
}
}
/// Build a response Message from accumulated content and tool calls.
/// Shared by both backends — the wire format differs but the internal
/// representation is the same.
///
/// If no structured tool calls came from the API but the content
/// contains leaked tool call XML (e.g. `<tool_call>...</tool_call>`
/// from models that emit tool calls as text), parse them out and
/// promote them to structured tool_calls. This way all consumers
/// see tool calls uniformly regardless of backend.
pub(crate) fn build_response_message(
content: String,
tool_calls: Vec<ToolCall>,
) -> Message {
// If the API returned structured tool calls, use them as-is.
if !tool_calls.is_empty() {
return Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: Some(tool_calls),
tool_call_id: None,
name: None,
timestamp: None,
};
}
// Check for leaked tool calls in content text.
let leaked = parsing::parse_leaked_tool_calls(&content);
if !leaked.is_empty() {
let cleaned = parsing::strip_leaked_artifacts(&content);
return Message {
role: Role::Assistant,
content: if cleaned.trim().is_empty() { None }
else { Some(MessageContent::Text(cleaned)) },
tool_calls: Some(leaked),
tool_call_id: None,
name: None,
timestamp: None,
};
}
Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: None,
}
}
/// Log stream diagnostics. Shared by both backends.
pub(crate) fn log_diagnostics(
content_len: usize,
tool_count: usize,
reasoning_chars: usize,
reasoning_effort: &str,
finish_reason: &Option<String>,
chunks_received: u64,
sse_lines_parsed: u64,
sse_parse_errors: u64,
empty_deltas: u64,
total_elapsed: Duration,
first_content_at: Option<Duration>,
usage: &Option<Usage>,
tools: &[ToolCall],
) {
let debug = std::env::var("POC_DEBUG").is_ok();
if reasoning_chars > 0 && reasoning_effort == "none" {
dbglog!(
"note: {} chars leaked reasoning (suppressed from display)",
reasoning_chars
);
}
if content_len == 0 && tool_count == 0 {
dbglog!(
"WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \
parse_errors: {}, empty_deltas: {}, {:.1}s)",
finish_reason, chunks_received, reasoning_chars,
sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64()
);
}
if finish_reason.is_none() && chunks_received > 0 {
dbglog!(
"WARNING: stream ended without finish_reason ({} chunks, {} content chars)",
chunks_received, content_len
);
}
if sse_parse_errors > 0 {
dbglog!(
"WARNING: {} SSE parse errors out of {} lines",
sse_parse_errors, sse_lines_parsed
);
}
if debug {
if let Some(u) = usage {
dbglog!(
"tokens: {} prompt + {} completion = {} total",
u.prompt_tokens, u.completion_tokens, u.total_tokens
);
}
let ttft = first_content_at
.map(|d| format!("{:.1}s", d.as_secs_f64()))
.unwrap_or_else(|| "none".to_string());
dbglog!(
"stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \
{} content chars, {} reasoning chars, {} tools, \
finish={:?}",
total_elapsed.as_secs_f64(),
ttft,
chunks_received,
sse_lines_parsed,
content_len,
reasoning_chars,
tool_count,
finish_reason,
);
if !tools.is_empty() {
for (i, tc) in tools.iter().enumerate() {
dbglog!(
" tool[{}]: {} (id: {}, {} arg chars)",
i, tc.function.name, tc.id, tc.function.arguments.len()
);
}
}
}
}

View file

@ -11,181 +11,6 @@ use super::http::HttpClient;
use super::types::*;
use super::StreamToken;
/// Stream SSE events from an OpenAI-compatible endpoint, sending
/// parsed StreamEvents through the channel. The caller (runner)
/// handles routing to the UI.
pub(super) async fn stream_events(
client: &HttpClient,
base_url: &str,
api_key: &str,
model: &str,
messages: &[Message],
tools_json: &serde_json::Value,
tx: &mpsc::UnboundedSender<StreamEvent>,
reasoning_effort: &str,
sampling: super::SamplingParams,
priority: Option<i32>,
) -> Result<()> {
let has_tools = tools_json.as_array().map_or(false, |a| !a.is_empty());
let request = ChatRequest {
model: model.to_string(),
messages: messages.to_vec(),
tool_choice: if has_tools { Some("auto".to_string()) } else { None },
tools: if has_tools { Some(tools_json.clone()) } else { None },
max_tokens: Some(16384),
temperature: Some(sampling.temperature),
top_p: Some(sampling.top_p),
top_k: Some(sampling.top_k),
stream: Some(true),
reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
Some(ReasoningConfig {
enabled: true,
effort: Some(reasoning_effort.to_string()),
})
} else {
None
},
chat_template_kwargs: None,
priority,
};
let url = format!("{}/chat/completions", base_url);
let msg_count = request.messages.len();
let pri_label = match priority {
Some(p) => format!(", priority={}", p),
None => String::new(),
};
let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);
let request_json = serde_json::to_string_pretty(&request).ok();
let mut response = super::send_and_check(
client,
&url,
&request,
("Authorization", &format!("Bearer {}", api_key)),
&[],
&debug_label,
request_json.as_deref(),
)
.await?;
let mut reader = super::SseReader::new();
reader.request_json = request_json;
let mut content_len: usize = 0;
let mut reasoning_chars: usize = 0;
let mut tool_call_count: usize = 0;
let mut empty_deltas: u64 = 0;
let mut first_content_at = None;
let mut finish_reason = None;
let mut usage = None;
while let Some(event) = reader.next_event(&mut response).await? {
if let Some(err_msg) = event["error"]["message"].as_str() {
let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
dbglog!(
"API error in stream: {}", err_msg
);
anyhow::bail!("API error in stream: {} {}", err_msg, raw);
}
let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
Ok(c) => c,
Err(e) => {
let preview = event.to_string();
dbglog!(
"unparseable SSE event ({}): {}",
e, &preview[..preview.len().min(300)]
);
continue;
}
};
if let Some(ref u) = chunk.usage {
let _ = tx.send(StreamEvent::Usage(u.clone()));
usage = chunk.usage;
}
for choice in &chunk.choices {
if choice.finish_reason.is_some() {
finish_reason = choice.finish_reason.clone();
}
let has_content = choice.delta.content.is_some();
let has_tools = choice.delta.tool_calls.is_some();
// Reasoning tokens — multiple field names across providers
let mut has_reasoning = false;
for r in [
choice.delta.reasoning_content.as_ref(),
choice.delta.reasoning.as_ref(),
].into_iter().flatten() {
reasoning_chars += r.len();
has_reasoning = true;
if !r.is_empty() {
let _ = tx.send(StreamEvent::Reasoning(r.clone()));
}
}
if let Some(ref r) = choice.delta.reasoning_details {
let s = r.to_string();
reasoning_chars += s.len();
has_reasoning = true;
if !s.is_empty() && s != "null" {
let _ = tx.send(StreamEvent::Reasoning(s));
}
}
if let Some(ref text_delta) = choice.delta.content {
if first_content_at.is_none() && !text_delta.is_empty() {
first_content_at = Some(reader.stream_start.elapsed());
}
content_len += text_delta.len();
let _ = tx.send(StreamEvent::Content(text_delta.clone()));
}
if let Some(ref tc_deltas) = choice.delta.tool_calls {
for tc_delta in tc_deltas {
tool_call_count = tool_call_count.max(tc_delta.index + 1);
let _ = tx.send(StreamEvent::ToolCallDelta {
index: tc_delta.index,
id: tc_delta.id.clone(),
call_type: tc_delta.call_type.clone(),
name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
});
}
}
if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
empty_deltas += 1;
}
}
}
let total_elapsed = reader.stream_start.elapsed();
super::log_diagnostics(
content_len,
tool_call_count,
reasoning_chars,
reasoning_effort,
&finish_reason,
reader.chunks_received,
reader.sse_lines_parsed,
reader.sse_parse_errors,
empty_deltas,
total_elapsed,
first_content_at,
&usage,
&[], // tool_calls not accumulated here anymore
);
let reason = finish_reason.unwrap_or_default();
let _ = tx.send(StreamEvent::Finished { reason });
Ok(())
}
/// Stream from /v1/completions with raw token IDs in and out.
/// Each SSE chunk yields one token (text + id). All parsing (think tags,
/// tool calls) is handled by the ResponseParser, not here.
@ -230,7 +55,6 @@ pub(super) async fn stream_completions(
.await?;
let mut reader = super::SseReader::new();
let mut content_len: usize = 0;
let mut usage = None;
while let Some(event) = reader.next_event(&mut response).await? {
@ -256,7 +80,6 @@ pub(super) async fn stream_completions(
if let Some(ids) = token_ids {
for (i, id_val) in ids.iter().enumerate() {
if let Some(id) = id_val.as_u64() {
content_len += text.len();
let _ = tx.send(StreamToken::Token {
text: if i == 0 { text.to_string() } else { String::new() },
id: id as u32,
@ -264,8 +87,6 @@ pub(super) async fn stream_completions(
}
}
} else if !text.is_empty() {
// Fallback: text without token IDs (shouldn't happen with return_token_ids)
content_len += text.len();
let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 });
}
}

View file

@ -1,209 +0,0 @@
// parsing.rs — Tool call parsing for leaked/streamed XML
//
// When models stream tool calls as XML text (Qwen-style <tool_call>
// blocks) rather than structured tool_calls, this module extracts
// them from the response text.
//
// Handles two wire formats:
// - Qwen XML: <function=name><parameter=key>value</parameter></function>
// - JSON: {"name": "...", "arguments": {...}}
//
// Also handles streaming artifacts: whitespace inside XML tags from
// token boundaries, </think> tags, etc.
use super::types::{ToolCall, FunctionCall};
/// Parse leaked tool calls from response text.
/// Looks for `<tool_call>...</tool_call>` blocks and tries both
/// XML and JSON formats for the body.
/// Parse a single tool call body (content between `<tool_call>` and `</tool_call>`).
pub(crate) fn parse_tool_call_body(body: &str) -> Option<ToolCall> {
let normalized = normalize_xml_tags(body);
let body = normalized.trim();
let mut counter = 0u32;
parse_xml_tool_call(body, &mut counter)
.or_else(|| parse_json_tool_call(body, &mut counter))
}
pub(super) fn parse_leaked_tool_calls(text: &str) -> Vec<ToolCall> {
// Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "<function=bash>"
// This handles streaming tokenizers that split tags across tokens.
let normalized = normalize_xml_tags(text);
let text = &normalized;
let mut calls = Vec::new();
let mut search_from = 0;
let mut call_counter: u32 = 0;
while let Some(start) = text[search_from..].find("<tool_call>") {
let abs_start = search_from + start;
let after_tag = abs_start + "<tool_call>".len();
let end = match text[after_tag..].find("</tool_call>") {
Some(pos) => after_tag + pos,
None => break,
};
let body = text[after_tag..end].trim();
search_from = end + "</tool_call>".len();
// Try XML format first, then JSON
if let Some(call) = parse_xml_tool_call(body, &mut call_counter) {
calls.push(call);
} else if let Some(call) = parse_json_tool_call(body, &mut call_counter) {
calls.push(call);
}
}
calls
}
/// Normalize whitespace inside XML-like tags for streaming tokenizers.
/// Collapses whitespace between `<` and `>` so that `<\nfunction\n=\nbash\n>`
/// becomes `<function=bash>`, and `</\nparameter\n>` becomes `</parameter>`.
/// Leaves content between tags untouched.
fn normalize_xml_tags(text: &str) -> String {
let mut result = String::with_capacity(text.len());
let mut chars = text.chars().peekable();
while let Some(ch) = chars.next() {
if ch == '<' {
let mut tag = String::from('<');
for inner in chars.by_ref() {
if inner == '>' {
tag.push('>');
break;
} else if inner.is_whitespace() {
// Skip whitespace inside tags
} else {
tag.push(inner);
}
}
result.push_str(&tag);
} else {
result.push(ch);
}
}
result
}
/// Parse a Qwen-style `<tag=value>body</tag>` pseudo-XML element.
/// Returns `(value, body, rest)` on success.
fn parse_qwen_tag<'a>(s: &'a str, tag: &str) -> Option<(&'a str, &'a str, &'a str)> {
let open = format!("<{}=", tag);
let close = format!("</{}>", tag);
let start = s.find(&open)? + open.len();
let name_end = start + s[start..].find('>')?;
let body_start = name_end + 1;
let body_end = body_start + s[body_start..].find(&close)?;
Some((
s[start..name_end].trim(),
s[body_start..body_end].trim(),
&s[body_end + close.len()..],
))
}
/// Parse Qwen's XML tool call format.
fn parse_xml_tool_call(body: &str, counter: &mut u32) -> Option<ToolCall> {
let (func_name, func_body, _) = parse_qwen_tag(body, "function")?;
let func_name = func_name.to_string();
let mut args = serde_json::Map::new();
let mut rest = func_body;
while let Some((key, val, remainder)) = parse_qwen_tag(rest, "parameter") {
args.insert(key.to_string(), serde_json::Value::String(val.to_string()));
rest = remainder;
}
*counter += 1;
Some(ToolCall {
id: format!("leaked_{}", counter),
call_type: "function".to_string(),
function: FunctionCall {
name: func_name,
arguments: serde_json::to_string(&args).unwrap_or_default(),
},
})
}
/// Parse JSON tool call format (some models emit this).
fn parse_json_tool_call(body: &str, counter: &mut u32) -> Option<ToolCall> {
let v: serde_json::Value = serde_json::from_str(body).ok()?;
let name = v["name"].as_str()?;
let arguments = &v["arguments"];
*counter += 1;
Some(ToolCall {
id: format!("leaked_{}", counter),
call_type: "function".to_string(),
function: FunctionCall {
name: name.to_string(),
arguments: serde_json::to_string(arguments).unwrap_or_default(),
},
})
}
/// Strip tool call XML and thinking tokens from text so the conversation
/// history stays clean. Removes `<tool_call>...</tool_call>` blocks and
/// `</think>` tags (thinking content before them is kept — it's useful context).
pub(super) fn strip_leaked_artifacts(text: &str) -> String {
let normalized = normalize_xml_tags(text);
let mut result = normalized.clone();
// Remove <tool_call>...</tool_call> blocks
while let Some(start) = result.find("<tool_call>") {
if let Some(end_pos) = result[start..].find("</tool_call>") {
let end = start + end_pos + "</tool_call>".len();
result = format!("{}{}", &result[..start], &result[end..]);
} else {
break;
}
}
// Remove </think> tags (but keep the thinking text before them)
result = result.replace("</think>", "");
result.trim().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_leaked_tool_call_clean() {
let text = "thinking\n</think>\n<tool_call>\n<function=bash>\n<parameter=command>poc-memory used core-personality</parameter>\n</function>\n</tool_call>";
let calls = parse_leaked_tool_calls(text);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].function.name, "bash");
let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
assert_eq!(args["command"], "poc-memory used core-personality");
}
#[test]
fn test_leaked_tool_call_streamed_whitespace() {
// Streaming tokenizer splits XML tags across tokens with newlines
let text = "<tool_call>\n<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd</\nparameter\n>\n</\nfunction\n>\n</tool_call>";
let calls = parse_leaked_tool_calls(text);
assert_eq!(calls.len(), 1, "should parse streamed format");
assert_eq!(calls[0].function.name, "bash");
let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
assert_eq!(args["command"], "pwd");
}
#[test]
fn test_normalize_preserves_content() {
let text = "<function=bash>\n<parameter=command>echo hello world</parameter>\n</function>";
let normalized = normalize_xml_tags(text);
// Newlines between tags are not inside tags, so preserved
assert_eq!(normalized, "<function=bash>\n<parameter=command>echo hello world</parameter>\n</function>");
}
#[test]
fn test_normalize_strips_tag_internal_whitespace() {
let text = "<\nfunction\n=\nbash\n>";
let normalized = normalize_xml_tags(text);
assert_eq!(normalized, "<function=bash>");
}
}