From 9c79d7a037136ea78a4cf37cbfb5214dcb5cf7f4 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Wed, 8 Apr 2026 14:55:10 -0400 Subject: [PATCH] =?UTF-8?q?WIP:=20Wiring=20context=5Fnew=20into=20agent=20?= =?UTF-8?q?=E2=80=94=20turn=20loop,=20StreamToken,=20dead=20code=20removal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Work in progress. New turn loop uses ResponseParser + StreamToken. Killed StreamEvent, append_streaming, finalize_streaming, streaming_index, assemble_api_messages, working_stack. Many methods still reference old types — fixing next. Co-Authored-By: Proof of Concept --- Cargo.lock | 1 + src/agent/api/mod.rs | 36 +-- src/agent/api/openai.rs | 98 ++------ src/agent/mod.rs | 491 ++++++++++++++-------------------------- 4 files changed, 202 insertions(+), 424 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e91e1db..a8a7128 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -360,6 +360,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index ff50e5a..69c685a 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -50,28 +50,10 @@ fn tools_to_json_str(tools: &[agent_tools::Tool]) -> String { format!("[{}]", inner.join(",")) } -/// Events produced by the streaming API backends. -/// The runner reads these and decides what to display where. -pub(crate) enum StreamEvent { - /// Content token from the model's response. - Content(String), - /// Reasoning/thinking token (internal monologue). - Reasoning(String), - /// Incremental tool call delta (structured, from APIs that support it). - ToolCallDelta { - index: usize, - id: Option, - call_type: Option, - name: Option, - arguments: Option, - }, - /// Token usage stats. - Usage(Usage), - /// Stream finished. - Finished { - reason: String, - }, - /// Error from the stream. +/// One token from the streaming completions API. +pub(crate) enum StreamToken { + Token { text: String, id: u32 }, + Done { usage: Option }, Error(String), } @@ -133,14 +115,14 @@ impl ApiClient { (rx, AbortOnDrop(handle)) } - /// Start a streaming completion with raw token IDs. - /// No message formatting — the caller provides the complete prompt as tokens. - pub(crate) fn start_stream_completions( + /// Stream a completion with raw token IDs. + /// Returns (text, token_id) per token via channel. + pub(crate) fn stream_completion( &self, prompt_tokens: &[u32], sampling: SamplingParams, priority: Option, - ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { + ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); let client = self.client.clone(); let api_key = self.api_key.clone(); @@ -154,7 +136,7 @@ impl ApiClient { &prompt_tokens, &tx, sampling, priority, ).await; if let Err(e) = result { - let _ = tx.send(StreamEvent::Error(e.to_string())); + let _ = tx.send(StreamToken::Error(e.to_string())); } }); diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index f4da2a6..4d766f2 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc; use super::http::HttpClient; use super::types::*; -use super::StreamEvent; +use super::StreamToken; /// Stream SSE events from an OpenAI-compatible endpoint, sending /// parsed StreamEvents through the channel. The caller (runner) @@ -186,16 +186,16 @@ pub(super) async fn stream_events( Ok(()) } -/// Stream from the /v1/completions endpoint using raw token IDs. -/// Tool calls come as text ( tags) and are parsed by the caller. -/// Thinking content comes as tags and is split into Reasoning events. +/// Stream from /v1/completions with raw token IDs in and out. +/// Each SSE chunk yields one token (text + id). All parsing (think tags, +/// tool calls) is handled by the ResponseParser, not here. pub(super) async fn stream_completions( client: &HttpClient, base_url: &str, api_key: &str, model: &str, prompt_tokens: &[u32], - tx: &mpsc::UnboundedSender, + tx: &mpsc::UnboundedSender, sampling: super::SamplingParams, priority: Option, ) -> Result<()> { @@ -207,6 +207,8 @@ pub(super) async fn stream_completions( "top_p": sampling.top_p, "top_k": sampling.top_k, "stream": true, + "return_token_ids": true, + "skip_special_tokens": false, "stop_token_ids": [super::super::tokenizer::IM_END], }); if let Some(p) = priority { @@ -229,20 +231,15 @@ pub(super) async fn stream_completions( let mut reader = super::SseReader::new(); let mut content_len: usize = 0; - let mut first_content_at = None; - let mut finish_reason = None; let mut usage = None; - let mut in_think = false; while let Some(event) = reader.next_event(&mut response).await? { if let Some(err_msg) = event["error"]["message"].as_str() { anyhow::bail!("API error in stream: {}", err_msg); } - // Completions chunks have a simpler structure if let Some(u) = event["usage"].as_object() { if let Ok(u) = serde_json::from_value::(serde_json::Value::Object(u.clone())) { - let _ = tx.send(StreamEvent::Usage(u.clone())); usage = Some(u); } } @@ -253,78 +250,27 @@ pub(super) async fn stream_completions( }; for choice in choices { - if let Some(reason) = choice["finish_reason"].as_str() { - if reason != "null" { - finish_reason = Some(reason.to_string()); - } - } + let text = choice["text"].as_str().unwrap_or(""); + let token_ids = choice["token_ids"].as_array(); - if let Some(text) = choice["text"].as_str() { - if text.is_empty() { continue; } - - // Handle tags — split into Reasoning vs Content - if text.contains("") || in_think { - // Simple state machine for think tags - let mut remaining = text; - while !remaining.is_empty() { - if in_think { - if let Some(end) = remaining.find("") { - let thinking = &remaining[..end]; - if !thinking.is_empty() { - let _ = tx.send(StreamEvent::Reasoning(thinking.to_string())); - } - remaining = &remaining[end + 8..]; - in_think = false; - } else { - let _ = tx.send(StreamEvent::Reasoning(remaining.to_string())); - break; - } - } else { - if let Some(start) = remaining.find("") { - let content = &remaining[..start]; - if !content.is_empty() { - content_len += content.len(); - if first_content_at.is_none() { - first_content_at = Some(reader.stream_start.elapsed()); - } - let _ = tx.send(StreamEvent::Content(content.to_string())); - } - remaining = &remaining[start + 7..]; - in_think = true; - } else { - content_len += remaining.len(); - if first_content_at.is_none() { - first_content_at = Some(reader.stream_start.elapsed()); - } - let _ = tx.send(StreamEvent::Content(remaining.to_string())); - break; - } - } + if let Some(ids) = token_ids { + for (i, id_val) in ids.iter().enumerate() { + if let Some(id) = id_val.as_u64() { + content_len += text.len(); + let _ = tx.send(StreamToken::Token { + text: if i == 0 { text.to_string() } else { String::new() }, + id: id as u32, + }); } - } else { - content_len += text.len(); - if first_content_at.is_none() { - first_content_at = Some(reader.stream_start.elapsed()); - } - let _ = tx.send(StreamEvent::Content(text.to_string())); } + } else if !text.is_empty() { + // Fallback: text without token IDs (shouldn't happen with return_token_ids) + content_len += text.len(); + let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 }); } } } - let total_elapsed = reader.stream_start.elapsed(); - super::log_diagnostics( - content_len, 0, 0, "none", - &finish_reason, - reader.chunks_received, - reader.sse_lines_parsed, - reader.sse_parse_errors, - 0, total_elapsed, first_content_at, - &usage, &[], - ); - - let reason = finish_reason.unwrap_or_default(); - let _ = tx.send(StreamEvent::Finished { reason }); - + let _ = tx.send(StreamToken::Done { usage }); Ok(()) } diff --git a/src/agent/mod.rs b/src/agent/mod.rs index c279e41..6bf2eee 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -23,13 +23,11 @@ pub mod tools; use std::sync::Arc; use anyhow::Result; -use api::{ApiClient, ToolCall}; -use api::{ContentPart, Message, MessageContent, Role}; -use context::{ConversationEntry, ContextEntry, ContextState}; +use api::ApiClient; +use context_new::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role}; use tools::{summarize_args, working_stack}; use crate::mind::log::ConversationLog; -use crate::agent::context::ContextSection; // --- Activity tracking (RAII guards) --- @@ -163,7 +161,6 @@ pub struct Agent { pub provenance: String, /// Persistent conversation log — append-only record of all messages. pub conversation_log: Option, - /// Mutable context state — personality, working stack, etc. pub context: ContextState, /// App config — used to reload identity on compaction and model switching. pub app_config: crate::config::AppConfig, @@ -190,11 +187,9 @@ impl Agent { conversation_log: Option, active_tools: tools::SharedActiveTools, ) -> Self { - let mut system = ContextSection::new("System prompt"); - system.push(ContextEntry::new( - ConversationEntry::System(Message::system(&system_prompt)), None)); + let mut context = ContextState::new(); + context.push(Section::System, AstNode::system_msg(&system_prompt)); - // Tool definitions — part of the context, tokenized and scored let tool_defs: Vec = tools::tools().iter() .map(|t| t.to_json()).collect(); if !tool_defs.is_empty() { @@ -207,22 +202,12 @@ impl Agent { IMPORTANT: Function calls MUST follow the specified format.", tool_defs.join("\n"), ); - system.push(ContextEntry::new( - ConversationEntry::System(Message::system(&tools_text)), None)); + context.push(Section::System, AstNode::system_msg(&tools_text)); } - let mut identity = ContextSection::new("Identity"); - for (_name, content) in &personality { - identity.push(ContextEntry::new( - ConversationEntry::Message(Message::user(content)), None)); + for (name, content) in &personality { + context.push(Section::Identity, AstNode::memory(name, content)); } - let context = ContextState { - system, - identity, - journal: ContextSection::new("Journal"), - conversation: ContextSection::new("Conversation"), - working_stack: Vec::new(), - }; let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); let mut agent = Self { client, @@ -250,7 +235,6 @@ impl Agent { }; agent.load_startup_journal(); - agent.load_working_stack(); agent } @@ -287,165 +271,41 @@ impl Agent { } } - /// Assemble the full message list for the API call from typed sources. - /// System prompt + personality context + journal + conversation messages. - pub fn assemble_api_messages(&self) -> Vec { - let mut msgs = Vec::new(); - // System section - for e in self.context.system.entries() { - msgs.push(e.entry.api_message().clone()); - } - // Identity — render personality files + working stack into one user message - let ctx = self.context.render_context_message(); - if !ctx.is_empty() { - msgs.push(Message::user(ctx)); - } - // Journal — render into one user message - let jnl = self.context.render_journal(); - if !jnl.is_empty() { - msgs.push(Message::user(jnl)); - } - // Conversation entries - msgs.extend(self.context.conversation.entries().iter() - .filter(|e| !e.entry.is_log() && !e.entry.is_thinking()) - .map(|e| e.entry.api_message().clone())); - msgs - } - - /// Assemble the full prompt as token IDs for the completions API. - /// System section (includes tools), identity, journal, conversation, - /// then the assistant prompt suffix. + /// Assemble the full prompt as token IDs. + /// Context sections + assistant prompt suffix. pub fn assemble_prompt_tokens(&self) -> Vec { - let mut tokens = Vec::new(); - - // System section — includes system prompt + tool definitions - for e in self.context.system.entries() { - tokens.extend(&e.token_ids); - } - - // Identity — rendered as one user message - let ctx = self.context.render_context_message(); - if !ctx.is_empty() { - tokens.extend(tokenizer::tokenize_entry("user", &ctx)); - } - - // Journal — rendered as one user message - let jnl = self.context.render_journal(); - if !jnl.is_empty() { - tokens.extend(tokenizer::tokenize_entry("user", &jnl)); - } - - // Conversation entries — use cached token_ids - for e in self.context.conversation.entries() { - if e.entry.is_log() || e.entry.is_thinking() { continue; } - tokens.extend(&e.token_ids); - } - - // Prompt the assistant to respond + let mut tokens = self.context.token_ids(); tokens.push(tokenizer::IM_START); tokens.extend(tokenizer::encode("assistant\n")); - tokens } - /// Run agent orchestration cycle, returning structured output. - /// Push a conversation message — stamped and logged. - pub fn push_message(&mut self, mut msg: Message) { - msg.stamp(); - let entry = ConversationEntry::Message(msg); - self.push_entry(entry); - } - - pub fn push_entry(&mut self, entry: ConversationEntry) { + /// Push a node into the conversation and log it. + pub fn push_node(&mut self, node: AstNode) { if let Some(ref log) = self.conversation_log { - if let Err(e) = log.append(&entry) { + if let Err(e) = log.append_node(&node) { eprintln!("warning: failed to log entry: {:#}", e); } } - self.context.conversation.push(ContextEntry::new( - entry, Some(chrono::Utc::now()))); - + self.context.push(Section::Conversation, node); self.changed.notify_one(); } - /// Find the index of the in-progress streaming entry (unstamped assistant message). - fn streaming_index(&self) -> Option { - self.context.conversation.entries().iter().rposition(|ce| { - if ce.token_ids.is_empty() { return false; } - let m = ce.entry.message(); - m.role == Role::Assistant && m.timestamp.is_none() - }) - } - - /// Append streaming text to the last entry (creating a partial - /// assistant entry if needed). Called by collect_stream per token batch. - fn append_streaming(&mut self, text: &str) { - if let Some(idx) = self.streaming_index() { - assert!(!self.context.conversation.entries()[idx].token_ids.is_empty(), - "streaming_index returned entry with empty token_ids at {}", idx); - let mut msg = self.context.conversation.entries()[idx].entry.message().clone(); - msg.append_content(text); - self.context.conversation.set_message(idx, msg); - } else { - self.context.conversation.push(ContextEntry::new( - ConversationEntry::Message(Message { - role: Role::Assistant, - content: Some(MessageContent::Text(text.to_string())), - tool_calls: None, - tool_call_id: None, - name: None, - timestamp: None, - }), - None, - )); - } - - self.changed.notify_one(); - } - - /// Finalize the streaming entry with the complete response message. - /// Finds the unstamped assistant entry, replaces it via set() with proper token count. - fn finalize_streaming(&mut self, msg: Message) { - if let Some(i) = self.streaming_index() { - let mut stamped = msg.clone(); - stamped.stamp(); - self.context.conversation.set(i, ContextEntry::new( - ConversationEntry::Message(stamped), - Some(chrono::Utc::now()), - )); - } else { - self.push_message(msg.clone()); - } - - // Log the finalized entry - if let Some(ref log) = self.conversation_log { - let entry = ConversationEntry::Message(msg); - if let Err(e) = log.append(&entry) { - eprintln!("warning: failed to log finalized entry: {:#}", e); - } - } - - self.changed.notify_one(); - } - - /// Send a user message and run the agent loop until the model - /// produces a text response (no more tool calls). Streams text - /// and tool activity through the UI channel. - /// - /// Takes Arc> and manages locking internally so the - /// lock is never held across I/O (API streaming, tool dispatch). + /// Run the agent turn loop: assemble prompt, stream response, + /// parse into AST, dispatch tool calls, repeat until text response. pub async fn turn( agent: Arc>, ) -> Result { - // --- Pre-loop setup (lock 1): collect finished tools --- let active_tools = { - let mut finished = Vec::new(); - let tools = { - let me = agent.lock().await; + let me = agent.lock().await; + me.active_tools.clone() + }; - // Collect completed background tool handles — remove from active list - // but don't await yet (MutexGuard isn't Send). - let mut tools = me.active_tools.lock().unwrap(); + // Collect finished background tools + { + let mut finished = Vec::new(); + { + let mut tools = active_tools.lock().unwrap(); let mut i = 0; while i < tools.len() { if tools[i].handle.is_finished() { @@ -454,192 +314,181 @@ impl Agent { i += 1; } } - - me.active_tools.clone() - }; - - // Await finished handles without holding the agent lock - let mut bg_results = Vec::new(); - for entry in finished { - if let Ok((call, output)) = entry.handle.await { - bg_results.push((call, output)); - } } - - // Re-acquire to apply background tool results - if !bg_results.is_empty() { + if !finished.is_empty() { + let mut results = Vec::new(); + for entry in finished { + if let Ok((call, output)) = entry.handle.await { + results.push((call, output)); + } + } let mut me = agent.lock().await; let mut bg_ds = DispatchState::new(); - for (call, output) in bg_results { + for (call, output) in results { me.apply_tool_result(&call, output, &mut bg_ds); } } - - tools - }; + } let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; let mut ds = DispatchState::new(); loop { - // --- Lock 2: assemble messages, start stream --- let _thinking = start_activity(&agent, "thinking...").await; + + // Assemble prompt and start stream (brief lock) let (mut rx, _stream_guard) = { let me = agent.lock().await; - let sampling = api::SamplingParams { - temperature: me.temperature, - top_p: me.top_p, - top_k: me.top_k, - }; - if tokenizer::is_initialized() { - let prompt_tokens = me.assemble_prompt_tokens(); - me.client.start_stream_completions( - &prompt_tokens, - sampling, - None, - ) - } else { - let api_messages = me.assemble_api_messages(); - me.client.start_stream( - &api_messages, - &me.tools, - &me.reasoning_effort, - sampling, - None, - ) - } + let prompt_tokens = me.assemble_prompt_tokens(); + me.client.stream_completion( + &prompt_tokens, + api::SamplingParams { + temperature: me.temperature, + top_p: me.top_p, + top_k: me.top_k, + }, + None, + ) }; - // --- Lock released --- - // --- Stream loop (no lock) --- - let sr = api::collect_stream( - &mut rx, &agent, &active_tools, - ).await; - let api::StreamResult { - content, tool_calls, usage, finish_reason, - error: stream_error, display_buf, in_tool_call, reasoning, - } = sr; - // --- Stream complete --- - - // Push thinking entry if model produced reasoning - if !reasoning.is_empty() { + // Create assistant branch and parser (brief lock) + let branch_idx = { let mut me = agent.lock().await; - me.push_entry(context::ConversationEntry::Thinking(reasoning)); + let idx = me.context.len(Section::Conversation); + me.context.push(Section::Conversation, + AstNode::branch(Role::Assistant, vec![])); + idx + }; + let mut parser = ResponseParser::new(branch_idx); + let mut pending_calls: Vec = Vec::new(); + let mut had_content = false; + let mut stream_error: Option = None; + + // Stream loop — no lock held across I/O + while let Some(event) = rx.recv().await { + match event { + api::StreamToken::Token { text, id: _ } => { + had_content = true; + let mut me = agent.lock().await; + let calls = parser.feed(&text, &mut me.context); + for call in calls { + // Dispatch tool call immediately + let call_clone = call.clone(); + let agent_handle = agent.clone(); + let handle = tokio::spawn(async move { + let args: serde_json::Value = + serde_json::from_str(&call_clone.arguments).unwrap_or_default(); + let output = tools::dispatch_with_agent( + &call_clone.name, &args, Some(agent_handle), + ).await; + (call_clone, output) + }); + active_tools.lock().unwrap().push(tools::ActiveToolCall { + id: call.id.clone(), + name: call.name.clone(), + detail: call.arguments.clone(), + started: std::time::Instant::now(), + background: false, + handle, + }); + pending_calls.push(call); + } + } + api::StreamToken::Error(e) => { + stream_error = Some(e); + break; + } + api::StreamToken::Done { usage } => { + if let Some(u) = usage { + agent.lock().await.last_prompt_tokens = u.prompt_tokens; + } + break; + } + } } - // --- Lock 3: process results --- - let (msg, pending) = { + // Flush parser remainder + { let mut me = agent.lock().await; + parser.finish(&mut me.context); + } - // Handle stream errors with retry logic - if let Some(e) = stream_error { - let err = anyhow::anyhow!("{}", e); - if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { - overflow_retries += 1; - me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); - me.compact(); - continue; - } - if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { - empty_retries += 1; - me.notify(format!("stream error — retrying ({}/2)", empty_retries)); - drop(me); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - continue; - } - return Err(err); - } - - if finish_reason.as_deref() == Some("error") { - let detail = if content.is_empty() { "no details".into() } else { content }; - return Err(anyhow::anyhow!("model stream error: {}", detail)); - } - - // Flush remaining display buffer to streaming entry - if !in_tool_call && !display_buf.is_empty() { - me.append_streaming(&display_buf); - } - - let msg = api::build_response_message(content, tool_calls); - - if let Some(usage) = &usage { - me.last_prompt_tokens = usage.prompt_tokens; - } - - // Empty response — nudge and retry - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); - if !has_content && !has_tools { - if empty_retries < 2 { - empty_retries += 1; - dbglog!( - "empty response, injecting nudge and retrying ({}/2)", - empty_retries, - ); - me.push_message(Message::user( - "[system] Your previous response was empty. \ - Please respond with text or use a tool." - )); - continue; - } - } else { - empty_retries = 0; - } - - // Collect non-background tool calls fired during streaming - let mut tools_guard = active_tools.lock().unwrap(); - let mut non_bg = Vec::new(); - let mut i = 0; - while i < tools_guard.len() { - if !tools_guard[i].background { - non_bg.push(tools_guard.remove(i)); - } else { - i += 1; - } - } - (msg, non_bg) - }; - - if !pending.is_empty() { - agent.lock().await.finalize_streaming(msg.clone()); - - // Drop lock before awaiting tool handles - let mut results = Vec::new(); - for entry in pending { - if let Ok(r) = entry.handle.await { - results.push(r); - } - } - // Reacquire to apply results + // Handle errors + if let Some(e) = stream_error { + let err = anyhow::anyhow!("{}", e); let mut me = agent.lock().await; - for (call, output) in results { - me.apply_tool_result(&call, output, &mut ds); + if context_new::is_context_overflow(&err) && overflow_retries < 2 { + overflow_retries += 1; + me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); + me.compact(); + continue; + } + if context_new::is_stream_error(&err) && empty_retries < 2 { + empty_retries += 1; + me.notify(format!("stream error — retrying ({}/2)", empty_retries)); + drop(me); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + continue; + } + return Err(err); + } + + // Empty response — nudge and retry + if !had_content && pending_calls.is_empty() { + if empty_retries < 2 { + empty_retries += 1; + let mut me = agent.lock().await; + me.push_node(AstNode::user_msg( + "[system] Your previous response was empty. \ + Please respond with text or use a tool." + )); + continue; + } + } else { + empty_retries = 0; + } + + // Wait for tool calls to complete + if !pending_calls.is_empty() { + ds.had_tool_calls = true; + + // Collect non-background tool handles + let mut handles = Vec::new(); + { + let mut tools_guard = active_tools.lock().unwrap(); + let mut i = 0; + while i < tools_guard.len() { + if !tools_guard[i].background { + handles.push(tools_guard.remove(i)); + } else { + i += 1; + } + } + } + + for entry in handles { + if let Ok((call, output)) = entry.handle.await { + let mut me = agent.lock().await; + me.apply_tool_result(&call, output, &mut ds); + } } continue; } - // Tool calls (structured API path) - if let Some(ref tool_calls) = msg.tool_calls { - if !tool_calls.is_empty() { - agent.lock().await.finalize_streaming(msg.clone()); - let calls: Vec = tool_calls.clone(); - // Drop lock before tool dispatch - for call in &calls { - Agent::dispatch_tool_call_unlocked( - &agent, &active_tools, call, &mut ds, - ).await; - } - continue; - } - } + // Text-only response — extract text and return + let text = { + let me = agent.lock().await; + let children = me.context.conversation()[branch_idx].children(); + children.iter() + .filter_map(|c| c.leaf()) + .filter(|l| matches!(l.body(), NodeBody::Content(_))) + .map(|l| l.body().text()) + .collect::>() + .join("") + }; - // Genuinely text-only response - let text = msg.content_text().to_string(); let mut me = agent.lock().await; - me.finalize_streaming(msg); - - // Drain pending control flags if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }