From 483767d5e7f3093d932ed76904870b23969c068b Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 6 Jan 2026 20:57:23 +0530 Subject: [PATCH 1/5] refactor(repartition): split BatchPartitioner::try_new into hash and round-robin constructors --- .../physical-plan/src/repartition/mod.rs | 86 ++++++++++++++----- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 1efdaaabc7d6a..ce0622b238da4 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -437,30 +437,61 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - pub fn try_new( - partitioning: Partitioning, + /// Create a new [`BatchPartitioner`] for hash partitioning + pub fn try_new_hash( + exprs: Vec>, + num_partitions: usize, timer: metrics::Time, - input_partition: usize, - num_input_partitions: usize, ) -> Result { - let state = match partitioning { - Partitioning::RoundRobinBatch(num_partitions) => { - BatchPartitionerState::RoundRobin { - num_partitions, - // Distribute starting index evenly based on input partition, number of input partitions and number of partitions - // to avoid they all start at partition 0 and heavily skew on the lower partitions - next_idx: ((input_partition * num_partitions) / num_input_partitions), - } - } - Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash { + Ok(Self { + state: BatchPartitionerState::Hash { exprs, num_partitions, hash_buffer: vec![], }, - other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), - }; + timer, + }) + } + + /// Create a new [`BatchPartitioner`] for round-robin partitioning + pub fn try_new_round_robin( + num_partitions: usize, + timer: metrics::Time, + input_partition: usize, + num_input_partitions: usize, + ) -> Result { + Ok(Self { + state: BatchPartitionerState::RoundRobin { + num_partitions, + next_idx: (input_partition * num_partitions) / num_input_partitions, + }, + timer, + }) + } + + pub fn try_new( + partitioning: Partitioning, + timer: metrics::Time, + input_partition: usize, + num_input_partitions: usize, + ) -> Result { + match partitioning { + Partitioning::Hash(exprs, num_partitions) => { + Self::try_new_hash(exprs, num_partitions, timer) + } + Partitioning::RoundRobinBatch(num_partitions) => { + Self::try_new_round_robin( + num_partitions, + timer, + input_partition, + num_input_partitions, + ) + } + other => { + not_impl_err!("Unsupported repartitioning scheme {other:?}") + } +} - Ok(Self { state, timer }) } /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`] @@ -1245,12 +1276,27 @@ impl RepartitionExec { input_partition: usize, num_input_partitions: usize, ) -> Result<()> { - let mut partitioner = BatchPartitioner::try_new( - partitioning, + let mut partitioner = match &partitioning { + Partitioning::Hash(exprs, num_partitions) => { + BatchPartitioner::try_new_hash( + exprs.clone(), + *num_partitions, + metrics.repartition_time.clone(), + )? + } + Partitioning::RoundRobinBatch(num_partitions) => { + BatchPartitioner::try_new_round_robin( + *num_partitions, metrics.repartition_time.clone(), input_partition, num_input_partitions, - )?; + )? + } + other => { + return not_impl_err!("Unsupported repartitioning scheme {other:?}"); + } +}; + // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); From 4f31ece3178844316762ec99f8a361f391b5ef23 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 6 Jan 2026 21:32:25 +0530 Subject: [PATCH 2/5] chore: run rustfmt --- .../physical-plan/src/repartition/mod.rs | 72 +++++++++---------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index ce0622b238da4..cb6b79a895d2e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -437,7 +437,7 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - /// Create a new [`BatchPartitioner`] for hash partitioning + /// Create a new [`BatchPartitioner`] for hash partitioning pub fn try_new_hash( exprs: Vec>, num_partitions: usize, @@ -475,23 +475,20 @@ impl BatchPartitioner { input_partition: usize, num_input_partitions: usize, ) -> Result { - match partitioning { - Partitioning::Hash(exprs, num_partitions) => { - Self::try_new_hash(exprs, num_partitions, timer) - } - Partitioning::RoundRobinBatch(num_partitions) => { - Self::try_new_round_robin( - num_partitions, - timer, - input_partition, - num_input_partitions, - ) - } - other => { - not_impl_err!("Unsupported repartitioning scheme {other:?}") - } -} - + match partitioning { + Partitioning::Hash(exprs, num_partitions) => { + Self::try_new_hash(exprs, num_partitions, timer) + } + Partitioning::RoundRobinBatch(num_partitions) => Self::try_new_round_robin( + num_partitions, + timer, + input_partition, + num_input_partitions, + ), + other => { + not_impl_err!("Unsupported repartitioning scheme {other:?}") + } + } } /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`] @@ -1276,27 +1273,24 @@ impl RepartitionExec { input_partition: usize, num_input_partitions: usize, ) -> Result<()> { - let mut partitioner = match &partitioning { - Partitioning::Hash(exprs, num_partitions) => { - BatchPartitioner::try_new_hash( - exprs.clone(), - *num_partitions, - metrics.repartition_time.clone(), - )? - } - Partitioning::RoundRobinBatch(num_partitions) => { - BatchPartitioner::try_new_round_robin( - *num_partitions, - metrics.repartition_time.clone(), - input_partition, - num_input_partitions, - )? - } - other => { - return not_impl_err!("Unsupported repartitioning scheme {other:?}"); - } -}; - + let mut partitioner = match &partitioning { + Partitioning::Hash(exprs, num_partitions) => BatchPartitioner::try_new_hash( + exprs.clone(), + *num_partitions, + metrics.repartition_time.clone(), + )?, + Partitioning::RoundRobinBatch(num_partitions) => { + BatchPartitioner::try_new_round_robin( + *num_partitions, + metrics.repartition_time.clone(), + input_partition, + num_input_partitions, + )? + } + other => { + return not_impl_err!("Unsupported repartitioning scheme {other:?}"); + } + }; // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); From b96b8492bdc35b2d6582936c950eaf85d933142e Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 7 Jan 2026 00:30:04 +0530 Subject: [PATCH 3/5] docs(repartition): clarify parameters and rename non-fallible constructors --- .../physical-plan/src/repartition/mod.rs | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index cb6b79a895d2e..b46b5f11572da 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -434,39 +434,54 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seeds(0, 0, 0, 0); impl BatchPartitioner { - /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] + /// Create a new [`BatchPartitioner`] for hash-based repartitioning. /// - /// The time spent repartitioning will be recorded to `timer` - /// Create a new [`BatchPartitioner`] for hash partitioning - pub fn try_new_hash( + /// # Parameters + /// - `exprs`: Expressions used to compute the hash for each input row. + /// - `num_partitions`: Total number of output partitions. + /// - `timer`: Metric used to record time spent during repartitioning. + /// + /// # Notes + /// This constructor cannot fail and performs no validation. + pub fn new_hash_partitioner( exprs: Vec>, num_partitions: usize, timer: metrics::Time, - ) -> Result { - Ok(Self { + ) -> Self { + Self { state: BatchPartitionerState::Hash { exprs, num_partitions, hash_buffer: vec![], }, timer, - }) + } } - /// Create a new [`BatchPartitioner`] for round-robin partitioning - pub fn try_new_round_robin( + /// Create a new [`BatchPartitioner`] for round-robin repartitioning. + /// + /// # Parameters + /// - `num_partitions`: Total number of output partitions. + /// - `timer`: Metric used to record time spent during repartitioning. + /// - `input_partition`: Index of the current input partition. + /// - `num_input_partitions`: Total number of input partitions. + /// + /// # Notes + /// The starting output partition is derived from the input partition + /// to avoid skew when multiple input partitions are used. + pub fn new_round_robin_partitioner( num_partitions: usize, timer: metrics::Time, input_partition: usize, num_input_partitions: usize, - ) -> Result { - Ok(Self { + ) -> Self { + Self { state: BatchPartitionerState::RoundRobin { num_partitions, next_idx: (input_partition * num_partitions) / num_input_partitions, }, timer, - }) + } } pub fn try_new( @@ -477,14 +492,16 @@ impl BatchPartitioner { ) -> Result { match partitioning { Partitioning::Hash(exprs, num_partitions) => { - Self::try_new_hash(exprs, num_partitions, timer) + Ok(Self::new_hash_partitioner(exprs, num_partitions, timer)) + } + Partitioning::RoundRobinBatch(num_partitions) => { + Ok(Self::new_round_robin_partitioner( + num_partitions, + timer, + input_partition, + num_input_partitions, + )) } - Partitioning::RoundRobinBatch(num_partitions) => Self::try_new_round_robin( - num_partitions, - timer, - input_partition, - num_input_partitions, - ), other => { not_impl_err!("Unsupported repartitioning scheme {other:?}") } @@ -1274,18 +1291,20 @@ impl RepartitionExec { num_input_partitions: usize, ) -> Result<()> { let mut partitioner = match &partitioning { - Partitioning::Hash(exprs, num_partitions) => BatchPartitioner::try_new_hash( - exprs.clone(), - *num_partitions, - metrics.repartition_time.clone(), - )?, + Partitioning::Hash(exprs, num_partitions) => { + BatchPartitioner::new_hash_partitioner( + exprs.clone(), + *num_partitions, + metrics.repartition_time.clone(), + ) + } Partitioning::RoundRobinBatch(num_partitions) => { - BatchPartitioner::try_new_round_robin( + BatchPartitioner::new_round_robin_partitioner( *num_partitions, metrics.repartition_time.clone(), input_partition, num_input_partitions, - )? + ) } other => { return not_impl_err!("Unsupported repartitioning scheme {other:?}"); From aeecd3d60e3f772b9df048a8279e4134115074c0 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 7 Jan 2026 02:01:41 +0530 Subject: [PATCH 4/5] docs(repartition): add documentation for try_new constructor --- datafusion/physical-plan/src/repartition/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b46b5f11572da..bde43ee178111 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -483,6 +483,19 @@ impl BatchPartitioner { timer, } } + /// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme. + /// + /// This is a convenience constructor that delegates to the specialized + /// hash or round-robin constructors depending on the partitioning variant. + /// + /// # Parameters + /// - `partitioning`: Partitioning scheme to apply (hash or round-robin). + /// - `timer`: Metric used to record time spent during repartitioning. + /// - `input_partition`: Index of the current input partition. + /// - `num_input_partitions`: Total number of input partitions. + /// + /// # Errors + /// Returns an error if the provided partitioning scheme is not supported. pub fn try_new( partitioning: Partitioning, From 849e6aed9e0eaab9bc2cd6da3acff75e182a99c2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 7 Jan 2026 02:20:08 +0530 Subject: [PATCH 5/5] fix(clippy): remove empty line after doc comment --- datafusion/physical-plan/src/repartition/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index bde43ee178111..d50404c8fc1e8 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -496,7 +496,6 @@ impl BatchPartitioner { /// /// # Errors /// Returns an error if the provided partitioning scheme is not supported. - pub fn try_new( partitioning: Partitioning, timer: metrics::Time,