From 4fc9676545ed18cd0d10840820c7646d5e248b69 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 11 Apr 2026 00:45:01 -0400 Subject: [PATCH] channels: parallel queries with timeout per daemon One misbehaving channel daemon (accepting connections but not responding to capnp RPCs) would block channel_list indefinitely. Spawn each daemon query as a separate task with a 3-second timeout. A hung daemon now shows as disconnected instead of hanging the entire tool call. Co-Authored-By: Kent Overstreet --- src/agent/tools/channels.rs | 30 ++++++++++++++++++++++++------ src/thalamus/channels.rs | 36 +++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs index 170217b..716c1ed 100644 --- a/src/agent/tools/channels.rs +++ b/src/agent/tools/channels.rs @@ -323,17 +323,35 @@ async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { sup.load_config(); sup.ensure_running(); - let mut result = Vec::new(); + let mut futs = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { - result.push((daemon_name, false, 0)); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { vec![(name, false, 0u32)] } + })); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); - match rpc_list(&sock).await { - None => result.push((daemon_name, false, 0)), - Some(channels) if channels.is_empty() => result.push((daemon_name, true, 0)), - Some(channels) => result.extend(channels), + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(3), + rpc_list(&sock), + ).await { + Ok(Some(channels)) if !channels.is_empty() => channels, + Ok(Some(_)) => vec![(name, true, 0)], + _ => vec![(name, false, 0)], + } + } + })); + } + + let mut result = Vec::new(); + for fut in futs { + if let Ok(entries) = fut.await { + result.extend(entries); } } result diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 3bdbac6..30eba5d 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -208,25 +208,35 @@ async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { sup.load_config(); sup.ensure_running(); // restart any dead daemons - let mut result = Vec::new(); + let mut futs = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { - result.push((daemon_name, false, 0)); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { vec![(name, false, 0u32)] } + })); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); - match query_one_daemon(&sock).await { - None => { - // Connection failed despite socket existing - result.push((daemon_name, false, 0)); - } - Some(channels) if channels.is_empty() => { - // Connected but no channels yet - result.push((daemon_name, true, 0)); - } - Some(channels) => { - result.extend(channels); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(3), + query_one_daemon(&sock), + ).await { + Ok(Some(channels)) if !channels.is_empty() => channels, + Ok(Some(_)) => vec![(name, true, 0)], + _ => vec![(name, false, 0)], + } } + })); + } + + let mut result = Vec::new(); + for fut in futs { + if let Ok(entries) = fut.await { + result.extend(entries); } } result