From e72e660cd1d94e76f4986ca0fda387b68558c457 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 5 Jan 2023 18:32:26 +0100 Subject: [PATCH] channels: add `Distribute` pact This PR adds the `Distribute` pact that aims to evenly distribute data among all workers by routing each container to a randomly selected worker. Traditionally this "defensive distribution" could be implemented using an `Exchange` pact whose key function round-robined the records or randomly distributed them in some other way. While this works, it has a couple of downsides: * The key function is calculated once per record, instead of once per container * Each record must be copied out of the original container into a per-worker container depending on the key function The `Distribute` pact streamlines this pattern by avoiding copying each record to a separate container and immediately pushing each container to a random worker. == Future work == A potential future improvement is to circulate in-band statistics over the channel about how many messages each worker has seen. This would allow each worker to estimate the current skew and only leap into action once things are bad enough. Signed-off-by: Petros Angelatos --- timely/Cargo.toml | 1 + timely/src/dataflow/channels/pact.rs | 41 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 0b550e53c..01f0fc326 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -21,6 +21,7 @@ bincode= ["timely_communication/bincode"] getopts = ["getopts-dep", "timely_communication/getopts"] [dependencies] +fnv="1.0.2" getopts-dep = { package = "getopts", version = "0.2.14", optional = true } serde = "1.0" serde_derive = "1.0" diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19..3cfd37601 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -8,6 +8,7 @@ //! 
The progress tracking logic assumes that this number is independent of the pact used. use std::{fmt::{self, Debug}, marker::PhantomData}; +use std::hash::{Hash, Hasher}; use timely_container::PushPartitioned; use crate::communication::{Push, Pull, Data}; @@ -52,6 +53,46 @@ impl ParallelizationContractCore for Pipeline { } } +/// A connection that dynamically distributes records to all workers +#[derive(Debug)] +pub struct Distribute; + +impl ParallelizationContractCore for Distribute { + type Pusher = DistributePusher>>>>; + type Puller = LogPuller>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + (DistributePusher::new(senders), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + } +} + +/// Distributes records among target pushees. +/// +/// It is more efficient than `Exchange` when the target worker doesn't matter +pub struct DistributePusher

{ + pushers: Vec

, +} + +impl

DistributePusher

{ + /// Allocates a new `DistributePusher` from a supplied set of pushers + pub fn new(pushers: Vec

) -> DistributePusher

{ + DistributePusher { + pushers, + } + } +} + +impl>> Push> for DistributePusher

{ + fn push(&mut self, message: &mut Option>) { + let mut state: fnv::FnvHasher = Default::default(); + std::time::Instant::now().hash(&mut state); + let worker_idx = (state.finish() as usize) % self.pushers.len(); + self.pushers[worker_idx].push(message); + } +} + /// An exchange between multiple observers by data pub struct ExchangeCore { hash_func: F, phantom: PhantomData<(C, D)> }