diff --git a/src/agent/tools/channels.rs b/src/agent/tools/channels.rs index 170217b..716c1ed 100644 --- a/src/agent/tools/channels.rs +++ b/src/agent/tools/channels.rs @@ -323,17 +323,35 @@ async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { sup.load_config(); sup.ensure_running(); - let mut result = Vec::new(); + let mut futs = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { - result.push((daemon_name, false, 0)); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { vec![(name, false, 0u32)] } + })); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); - match rpc_list(&sock).await { - None => result.push((daemon_name, false, 0)), - Some(channels) if channels.is_empty() => result.push((daemon_name, true, 0)), - Some(channels) => result.extend(channels), + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(3), + rpc_list(&sock), + ).await { + Ok(Some(channels)) if !channels.is_empty() => channels, + Ok(Some(_)) => vec![(name, true, 0)], + _ => vec![(name, false, 0)], + } + } + })); + } + + let mut result = Vec::new(); + for fut in futs { + if let Ok(entries) = fut.await { + result.extend(entries); } } result diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 3bdbac6..30eba5d 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -208,25 +208,35 @@ async fn fetch_all_channels_inner() -> Vec<(String, bool, u32)> { sup.load_config(); sup.ensure_running(); // restart any dead daemons - let mut result = Vec::new(); + let mut futs = Vec::new(); for (daemon_name, _enabled, alive) in sup.status() { if !alive { - result.push((daemon_name, false, 0)); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { vec![(name, false, 0u32)] } + })); continue; } let sock = channels_dir.join(format!("{}.sock", daemon_name)); - match query_one_daemon(&sock).await { - None => { - // Connection failed despite socket existing - result.push((daemon_name, false, 0)); - } - Some(channels) if channels.is_empty() => { - // Connected but no channels yet - result.push((daemon_name, true, 0)); - } - Some(channels) => { - result.extend(channels); + futs.push(tokio::task::spawn_local({ + let name = daemon_name.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(3), + query_one_daemon(&sock), + ).await { + Ok(Some(channels)) if !channels.is_empty() => channels, + Ok(Some(_)) => vec![(name, true, 0)], + _ => vec![(name, false, 0)], + } } + })); + } + + let mut result = Vec::new(); + for fut in futs { + if let Ok(entries) = fut.await { + result.extend(entries); } } result