From 13d9cc962ea25b035dc23849d5545f9430fba580 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Thu, 2 Apr 2026 18:41:02 -0400 Subject: [PATCH] abort orphaned stream tasks on drop, reduce timeout to 60s Spawned streaming tasks were never cancelled when a turn ended or retried, leaving zombie tasks blocked on dead vLLM connections. AbortOnDrop wrapper aborts the task when it goes out of scope. Chunk timeout reduced from 120s to 60s. Co-Authored-By: Proof of Concept --- src/agent/api/mod.rs | 19 ++++++++++++++----- src/agent/runner.rs | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 85d0543..d2016c9 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -17,6 +17,15 @@ use tokio::sync::mpsc; use crate::agent::types::*; use crate::agent::ui_channel::{UiMessage, UiSender}; +/// A JoinHandle that aborts its task when dropped. +pub struct AbortOnDrop(tokio::task::JoinHandle<()>); + +impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort(); + } +} + // ───────────────────────────────────────────────────────────── // Stream events — yielded by backends, consumed by the runner // ───────────────────────────────────────────────────────────── @@ -82,7 +91,7 @@ impl ApiClient { reasoning_effort: &str, temperature: Option, priority: Option, - ) -> mpsc::UnboundedReceiver { + ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); let client = self.client.clone(); let api_key = self.api_key.clone(); @@ -93,7 +102,7 @@ impl ApiClient { let reasoning_effort = reasoning_effort.to_string(); let base_url = self.base_url.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let result = openai::stream_events( &client, &base_url, &api_key, &model, &messages, tools.as_deref(), &tx, &ui_tx, @@ -104,7 +113,7 @@ impl ApiClient { } }); - rx + (rx, AbortOnDrop(handle)) } pub async fn chat_completion_stream_temp( @@ -117,7 +126,7 @@ impl ApiClient { priority: Option, ) -> Result<(Message, Option)> { // Use the event stream and accumulate into a message. - let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority); + let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority); let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut usage = None; @@ -296,7 +305,7 @@ impl SseReader { pub(crate) fn new(ui_tx: &UiSender) -> Self { Self { line_buf: String::new(), - chunk_timeout: Duration::from_secs(120), + chunk_timeout: Duration::from_secs(60), stream_start: Instant::now(), chunks_received: 0, sse_lines_parsed: 0, diff --git a/src/agent/runner.rs b/src/agent/runner.rs index bd4eab6..e4fe9f6 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -246,7 +246,7 @@ impl Agent { // Stream events from the API — we route each event to the // appropriate UI pane rather than letting the API layer do it. let api_messages = self.assemble_api_messages(); - let mut rx = self.client.start_stream( + let (mut rx, _stream_guard) = self.client.start_stream( &api_messages, Some(&self.tool_defs), ui_tx,