Skip to content

Commit d4e0bc1

Browse files
authored
[sled-diagnostics] use ParallelTaskSet for multiple commands (#8151)
As a part of the #8166 investigation we decided that we should move from `FuturesUnordered` to a `JoinSet` to gain parallelism.
1 parent 947515b commit d4e0bc1

File tree

3 files changed

+84
-42
lines changed

3 files changed

+84
-42
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sled-diagnostics/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ libc.workspace = true
1717
omicron-workspace-hack.workspace = true
1818
once_cell.workspace = true
1919
oxlog.workspace = true
20+
parallel-task-set.workspace = true
2021
rand.workspace = true
2122
schemars.workspace = true
2223
serde.workspace = true

sled-diagnostics/src/lib.rs

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
//! Diagnostics for an Oxide sled that exposes common support commands.
66
7-
use futures::{StreamExt, stream::FuturesUnordered};
7+
use futures::StreamExt;
8+
use futures::stream::FuturesUnordered;
9+
use parallel_task_set::ParallelTaskSet;
810
use slog::Logger;
911

1012
#[macro_use]
@@ -29,6 +31,9 @@ pub use crate::queries::{
2931
};
3032
use queries::*;
3133

34+
/// Max number of ptool commands to run in parallel
35+
const MAX_PTOOL_PARALLELISM: usize = 50;
36+
3237
/// List all zones on a sled.
3338
pub async fn zoneadm_info()
3439
-> Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError> {
@@ -38,33 +43,43 @@ pub async fn zoneadm_info()
3843
/// Retrieve various `ipadm` command output for the system.
3944
pub async fn ipadm_info()
4045
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
41-
[ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
42-
.into_iter()
43-
.map(|c| async move {
44-
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
45-
})
46-
.collect::<FuturesUnordered<_>>()
47-
.collect::<Vec<Result<_, _>>>()
48-
.await
46+
let mut results = Vec::new();
47+
let mut commands = ParallelTaskSet::new();
48+
for command in
49+
[ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
50+
{
51+
if let Some(res) = commands
52+
.spawn(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
53+
.await
54+
{
55+
results.push(res);
56+
}
57+
}
58+
results.extend(commands.join_all().await);
59+
results
4960
}
5061

5162
/// Retrieve various `dladm` command output for the system.
5263
pub async fn dladm_info()
5364
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
54-
[
65+
let mut results = Vec::new();
66+
let mut commands = ParallelTaskSet::new();
67+
for command in [
5568
dladm_show_phys(),
5669
dladm_show_ether(),
5770
dladm_show_link(),
5871
dladm_show_vnic(),
5972
dladm_show_linkprop(),
60-
]
61-
.into_iter()
62-
.map(|c| async move {
63-
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
64-
})
65-
.collect::<FuturesUnordered<_>>()
66-
.collect::<Vec<Result<_, _>>>()
67-
.await
73+
] {
74+
if let Some(res) = commands
75+
.spawn(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
76+
.await
77+
{
78+
results.push(res);
79+
}
80+
}
81+
results.extend(commands.join_all().await);
82+
results
6883
}
6984

7085
pub async fn nvmeadm_info()
@@ -83,14 +98,23 @@ pub async fn pargs_oxide_processes(
8398
Err(e) => return vec![Err(e.into())],
8499
};
85100

86-
pids.iter()
87-
.map(|pid| pargs_process(*pid))
88-
.map(|c| async move {
89-
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
90-
})
91-
.collect::<FuturesUnordered<_>>()
92-
.collect::<Vec<Result<_, _>>>()
93-
.await
101+
let mut results = Vec::with_capacity(pids.len());
102+
let mut commands =
103+
ParallelTaskSet::new_with_parallelism(MAX_PTOOL_PARALLELISM);
104+
for pid in pids {
105+
if let Some(res) = commands
106+
.spawn(execute_command_with_timeout(
107+
pargs_process(pid),
108+
DEFAULT_TIMEOUT,
109+
))
110+
.await
111+
{
112+
results.push(res);
113+
}
114+
}
115+
116+
results.extend(commands.join_all().await);
117+
results
94118
}
95119

96120
pub async fn pstack_oxide_processes(
@@ -104,14 +128,22 @@ pub async fn pstack_oxide_processes(
104128
Err(e) => return vec![Err(e.into())],
105129
};
106130

107-
pids.iter()
108-
.map(|pid| pstack_process(*pid))
109-
.map(|c| async move {
110-
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
111-
})
112-
.collect::<FuturesUnordered<_>>()
113-
.collect::<Vec<Result<_, _>>>()
114-
.await
131+
let mut results = Vec::with_capacity(pids.len());
132+
let mut commands =
133+
ParallelTaskSet::new_with_parallelism(MAX_PTOOL_PARALLELISM);
134+
for pid in pids {
135+
if let Some(res) = commands
136+
.spawn(execute_command_with_timeout(
137+
pstack_process(pid),
138+
DEFAULT_TIMEOUT,
139+
))
140+
.await
141+
{
142+
results.push(res);
143+
}
144+
}
145+
results.extend(commands.join_all().await);
146+
results
115147
}
116148

117149
pub async fn pfiles_oxide_processes(
@@ -125,14 +157,22 @@ pub async fn pfiles_oxide_processes(
125157
Err(e) => return vec![Err(e.into())],
126158
};
127159

128-
pids.iter()
129-
.map(|pid| pfiles_process(*pid))
130-
.map(|c| async move {
131-
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
132-
})
133-
.collect::<FuturesUnordered<_>>()
134-
.collect::<Vec<Result<_, _>>>()
135-
.await
160+
let mut results = Vec::with_capacity(pids.len());
161+
let mut commands =
162+
ParallelTaskSet::new_with_parallelism(MAX_PTOOL_PARALLELISM);
163+
for pid in pids {
164+
if let Some(res) = commands
165+
.spawn(execute_command_with_timeout(
166+
pfiles_process(pid),
167+
DEFAULT_TIMEOUT,
168+
))
169+
.await
170+
{
171+
results.push(res);
172+
}
173+
}
174+
results.extend(commands.join_all().await);
175+
results
136176
}
137177

138178
/// Retrieve various `zfs` command output for the system.

0 commit comments

Comments
 (0)