Skip to content

Commit 72f734f

Browse files
authored
feat(grpc): add round_robin LB policy (#2451)
This change is based heavily on #2405, especially the tests.
1 parent d0d3f62 commit 72f734f

File tree

4 files changed

+1373
-17
lines changed

4 files changed

+1373
-17
lines changed

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
// production.
3131

3232
use std::collections::HashSet;
33+
use std::error::Error;
3334
use std::fmt::Debug;
3435
use std::sync::Mutex;
3536
use std::{collections::HashMap, hash::Hash, mem, sync::Arc};
@@ -287,9 +288,10 @@ where
287288
&mut self,
288289
child_updates: impl IntoIterator<Item = ChildUpdate<T>>,
289290
channel_controller: &mut dyn ChannelController,
290-
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
291+
) -> Result<(), Box<dyn Error + Send + Sync>> {
291292
// Split the child updates into the IDs and builders, and the
292293
// ResolverUpdates/LbConfigs.
294+
let mut errs = vec![];
293295
let (ids_builders, updates): (Vec<_>, Vec<_>) = child_updates
294296
.into_iter()
295297
.map(|e| ((e.child_identifier, e.child_policy_builder), e.child_update))
@@ -306,14 +308,59 @@ where
306308
continue;
307309
};
308310
let mut channel_controller = WrappedController::new(channel_controller);
309-
let _ = child.policy.resolver_update(
311+
if let Err(err) = child.policy.resolver_update(
310312
resolver_update,
311313
config.as_ref(),
312314
&mut channel_controller,
313-
);
315+
) {
316+
errs.push(err);
317+
}
314318
self.resolve_child_controller(channel_controller, child_idx);
315319
}
316-
Ok(())
320+
if errs.is_empty() {
321+
Ok(())
322+
} else {
323+
let err = errs
324+
.into_iter()
325+
.map(|e| e.to_string())
326+
.collect::<Vec<_>>()
327+
.join("; ");
328+
Err(err.into())
329+
}
330+
}
331+
332+
/// Forwards the `resolver_update` and `config` to all current children.
333+
///
334+
/// Returns the Result from calling into each child.
335+
pub fn resolver_update(
336+
&mut self,
337+
resolver_update: ResolverUpdate,
338+
config: Option<&LbConfig>,
339+
channel_controller: &mut dyn ChannelController,
340+
) -> Result<(), Box<dyn Error + Send + Sync>> {
341+
let mut errs = Vec::with_capacity(self.children.len());
342+
for child_idx in 0..self.children.len() {
343+
let child = &mut self.children[child_idx];
344+
let mut channel_controller = WrappedController::new(channel_controller);
345+
if let Err(err) = child.policy.resolver_update(
346+
resolver_update.clone(),
347+
config,
348+
&mut channel_controller,
349+
) {
350+
errs.push(err);
351+
}
352+
self.resolve_child_controller(channel_controller, child_idx);
353+
}
354+
if errs.is_empty() {
355+
Ok(())
356+
} else {
357+
let err = errs
358+
.into_iter()
359+
.map(|e| e.to_string())
360+
.collect::<Vec<_>>()
361+
.join("; ");
362+
Err(err.into())
363+
}
317364
}
318365

319366
/// Forwards the incoming subchannel_update to the child that created the
@@ -434,6 +481,7 @@ mod test {
434481
use crate::client::ConnectivityState;
435482
use crate::rt::default_runtime;
436483
use std::collections::HashMap;
484+
use std::error::Error;
437485
use std::panic;
438486
use std::sync::Arc;
439487
use std::sync::Mutex;
@@ -498,7 +546,7 @@ mod test {
498546
endpoints: Vec<Endpoint>,
499547
builder: Arc<dyn LbPolicyBuilder>,
500548
tcc: &mut dyn ChannelController,
501-
) {
549+
) -> Result<(), Box<dyn Error + Send + Sync>> {
502550
let updates = endpoints.iter().map(|e| ChildUpdate {
503551
child_identifier: e.clone(),
504552
child_policy_builder: builder.clone(),
@@ -513,7 +561,7 @@ mod test {
513561
)),
514562
});
515563

516-
assert!(child_manager.update(updates, tcc).is_ok());
564+
child_manager.update(updates, tcc)
517565
}
518566

519567
fn move_subchannel_to_state(
@@ -595,7 +643,8 @@ mod test {
595643
endpoints.clone(),
596644
builder,
597645
tcc.as_mut(),
598-
);
646+
)
647+
.unwrap();
599648
let mut subchannels = vec![];
600649
for endpoint in endpoints {
601650
subchannels.push(
@@ -648,7 +697,8 @@ mod test {
648697
endpoints.clone(),
649698
builder,
650699
tcc.as_mut(),
651-
);
700+
)
701+
.unwrap();
652702
let mut subchannels = vec![];
653703
for endpoint in endpoints {
654704
subchannels.push(
@@ -699,7 +749,8 @@ mod test {
699749
endpoints.clone(),
700750
builder,
701751
tcc.as_mut(),
702-
);
752+
)
753+
.unwrap();
703754
let mut subchannels = vec![];
704755
for endpoint in endpoints {
705756
subchannels.push(
@@ -740,7 +791,8 @@ mod test {
740791
endpoints.clone(),
741792
builder,
742793
tcc.as_mut(),
743-
);
794+
)
795+
.unwrap();
744796
let mut subchannels = vec![];
745797
for endpoint in endpoints {
746798
subchannels.push(

grpc/src/client/load_balancing/graceful_switch.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct GracefulSwitchLbConfig {
2626
/// to to active and tear down the previously active policy.
2727
#[derive(Debug)]
2828
pub(crate) struct GracefulSwitchPolicy {
29-
child_manager: ChildManager<()>, // Child ID is the name of the child policy.
29+
child_manager: ChildManager<()>, // Child ID empty - only the name of the child LB policy matters.
3030
last_update: Option<LbState>, // Saves the last output LbState to determine if an update is needed.
3131
active_child_builder: Option<Arc<dyn LbPolicyBuilder>>,
3232
}
@@ -69,11 +69,9 @@ impl LbPolicy for GracefulSwitchPolicy {
6969
});
7070
}
7171

72-
let res = self
73-
.child_manager
74-
.update(children.into_iter(), channel_controller)?;
72+
let res = self.child_manager.update(children, channel_controller);
7573
self.update_picker(channel_controller);
76-
Ok(())
74+
res
7775
}
7876

7977
fn subchannel_update(

grpc/src/client/load_balancing/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::client::{
4949
pub(crate) mod child_manager;
5050
pub(crate) mod graceful_switch;
5151
pub(crate) mod pick_first;
52+
pub(crate) mod round_robin;
5253

5354
#[cfg(test)]
5455
pub(crate) mod test_utils;
@@ -606,11 +607,11 @@ impl Picker for QueuingPicker {
606607
}
607608

608609
#[derive(Debug)]
609-
pub(crate) struct Failing {
610+
pub(crate) struct FailingPicker {
610611
pub error: String,
611612
}
612613

613-
impl Picker for Failing {
614+
impl Picker for FailingPicker {
614615
fn pick(&self, _: &Request) -> PickResult {
615616
PickResult::Fail(Status::unavailable(self.error.clone()))
616617
}

0 commit comments

Comments
 (0)