abort orphaned stream tasks on drop, reduce timeout to 60s

Spawned streaming tasks were never cancelled when a turn ended or was
retried, leaving zombie tasks blocked on dead vLLM connections. The new
AbortOnDrop wrapper aborts the spawned task when the guard goes out of scope.

The per-chunk timeout is reduced from 120s to 60s so stalled streams are detected sooner.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-02 18:41:02 -04:00
parent 0148dbaa06
commit 13d9cc962e
2 changed files with 15 additions and 6 deletions

View file

@ -17,6 +17,15 @@ use tokio::sync::mpsc;
use crate::agent::types::*;
use crate::agent::ui_channel::{UiMessage, UiSender};
/// A [`tokio::task::JoinHandle`] wrapper that aborts its task when dropped.
///
/// Returned by `ApiClient::start_stream` alongside the event receiver so the
/// spawned streaming task is cancelled automatically when the caller's turn
/// ends or is retried, instead of lingering on a dead connection.
///
/// Marked `#[must_use]`: discarding the guard (or binding it to `_`) drops it
/// immediately and therefore aborts the task right away — keep it alive for
/// as long as the stream should run (e.g. `let _guard = ...;`).
#[must_use = "dropping this guard aborts the spawned task immediately"]
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Best-effort cancellation: abort() makes the task stop at its next
        // await point. NOTE(review): a task stuck in blocking (non-async)
        // code will not be interrupted — confirm the stream loop only blocks
        // on awaits.
        self.0.abort();
    }
}
// ─────────────────────────────────────────────────────────────
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
@ -82,7 +91,7 @@ impl ApiClient {
reasoning_effort: &str,
temperature: Option<f32>,
priority: Option<i32>,
) -> mpsc::UnboundedReceiver<StreamEvent> {
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let client = self.client.clone();
let api_key = self.api_key.clone();
@ -93,7 +102,7 @@ impl ApiClient {
let reasoning_effort = reasoning_effort.to_string();
let base_url = self.base_url.clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
let result = openai::stream_events(
&client, &base_url, &api_key, &model,
&messages, tools.as_deref(), &tx, &ui_tx,
@ -104,7 +113,7 @@ impl ApiClient {
}
});
rx
(rx, AbortOnDrop(handle))
}
pub async fn chat_completion_stream_temp(
@ -117,7 +126,7 @@ impl ApiClient {
priority: Option<i32>,
) -> Result<(Message, Option<Usage>)> {
// Use the event stream and accumulate into a message.
let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
@ -296,7 +305,7 @@ impl SseReader {
pub(crate) fn new(ui_tx: &UiSender) -> Self {
Self {
line_buf: String::new(),
chunk_timeout: Duration::from_secs(120),
chunk_timeout: Duration::from_secs(60),
stream_start: Instant::now(),
chunks_received: 0,
sse_lines_parsed: 0,

View file

@ -246,7 +246,7 @@ impl Agent {
// Stream events from the API — we route each event to the
// appropriate UI pane rather than letting the API layer do it.
let api_messages = self.assemble_api_messages();
let mut rx = self.client.start_stream(
let (mut rx, _stream_guard) = self.client.start_stream(
&api_messages,
Some(&self.tool_defs),
ui_tx,