diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 85d0543..d2016c9 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -17,6 +17,15 @@ use tokio::sync::mpsc; use crate::agent::types::*; use crate::agent::ui_channel::{UiMessage, UiSender}; +/// A JoinHandle that aborts its task when dropped. +pub struct AbortOnDrop(tokio::task::JoinHandle<()>); + +impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort(); + } +} + // ───────────────────────────────────────────────────────────── // Stream events — yielded by backends, consumed by the runner // ───────────────────────────────────────────────────────────── @@ -82,7 +91,7 @@ impl ApiClient { reasoning_effort: &str, temperature: Option, priority: Option, - ) -> mpsc::UnboundedReceiver { + ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); let client = self.client.clone(); let api_key = self.api_key.clone(); @@ -93,7 +102,7 @@ impl ApiClient { let reasoning_effort = reasoning_effort.to_string(); let base_url = self.base_url.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let result = openai::stream_events( &client, &base_url, &api_key, &model, &messages, tools.as_deref(), &tx, &ui_tx, @@ -104,7 +113,7 @@ impl ApiClient { } }); - rx + (rx, AbortOnDrop(handle)) } pub async fn chat_completion_stream_temp( @@ -117,7 +126,7 @@ impl ApiClient { priority: Option, ) -> Result<(Message, Option)> { // Use the event stream and accumulate into a message. - let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority); + let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority); let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut usage = None; @@ -296,7 +305,7 @@ impl SseReader { pub(crate) fn new(ui_tx: &UiSender) -> Self { Self { line_buf: String::new(), - chunk_timeout: Duration::from_secs(120), + chunk_timeout: Duration::from_secs(60), stream_start: Instant::now(), chunks_received: 0, sse_lines_parsed: 0, diff --git a/src/agent/runner.rs b/src/agent/runner.rs index bd4eab6..e4fe9f6 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -246,7 +246,7 @@ impl Agent { // Stream events from the API — we route each event to the // appropriate UI pane rather than letting the API layer do it. let api_messages = self.assemble_api_messages(); - let mut rx = self.client.start_stream( + let (mut rx, _stream_guard) = self.client.start_stream( &api_messages, Some(&self.tool_defs), ui_tx,