Commit c141fca

chore: enforce clippy::needless_pass_by_value to datafusion-physical-plan (#18864)
## Which issue does this PR close?

- Closes #18545

## Rationale for this change

See issue.
1 parent 12cb4ca commit c141fca
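The lint enforced here, `clippy::needless_pass_by_value`, fires when a function takes an argument by value (commonly an `Arc` or `String`) but only reads it, which forces callers to clone. A minimal, std-only sketch of the before/after shape, with illustrative names that are not DataFusion code:

```rust
use std::sync::Arc;

// Before: the Arc is taken by value but only read, so every caller must clone.
#[allow(clippy::needless_pass_by_value)]
fn plan_name_owned(plan: Arc<String>) -> usize {
    plan.len()
}

// After: borrow the Arc; callers pass `&plan` and no refcount bump happens.
fn plan_name_borrowed(plan: &Arc<String>) -> usize {
    plan.len()
}

fn main() {
    let plan = Arc::new(String::from("AggregateExec"));
    assert_eq!(plan_name_owned(Arc::clone(&plan)), 13);
    assert_eq!(plan_name_borrowed(&plan), 13);
}
```

Borrowing moves the `Arc::clone` (a cheap refcount bump, not a data copy) to the call sites that genuinely need an owned handle, which is the shape of the diffs below.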

File tree

29 files changed: +239 -209 lines


datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 5 additions & 9 deletions
@@ -236,8 +236,7 @@ impl GroupValues for GroupValuesRows {
         // https://github.com/apache/datafusion/issues/7647
         for (field, array) in self.schema.fields.iter().zip(&mut output) {
             let expected = field.data_type();
-            *array =
-                dictionary_encode_if_necessary(Arc::<dyn Array>::clone(array), expected)?;
+            *array = dictionary_encode_if_necessary(array, expected)?;
         }

         self.group_values = Some(group_values);
@@ -259,7 +258,7 @@ impl GroupValues for GroupValuesRows {
 }

 fn dictionary_encode_if_necessary(
-    array: ArrayRef,
+    array: &ArrayRef,
     expected: &DataType,
 ) -> Result<ArrayRef> {
     match (expected, array.data_type()) {
@@ -269,10 +268,7 @@ fn dictionary_encode_if_necessary(
                 .iter()
                 .zip(struct_array.columns())
                 .map(|(expected_field, column)| {
-                    dictionary_encode_if_necessary(
-                        Arc::<dyn Array>::clone(column),
-                        expected_field.data_type(),
-                    )
+                    dictionary_encode_if_necessary(column, expected_field.data_type())
                 })
                 .collect::<Result<Vec<_>>>()?;

@@ -289,13 +285,13 @@ fn dictionary_encode_if_necessary(
                 Arc::<arrow::datatypes::Field>::clone(expected_field),
                 list.offsets().clone(),
                 dictionary_encode_if_necessary(
-                    Arc::<dyn Array>::clone(list.values()),
+                    list.values(),
                     expected_field.data_type(),
                 )?,
                 list.nulls().cloned(),
             )?))
         }
         (DataType::Dictionary(_, _), _) => Ok(cast(array.as_ref(), expected)?),
-        (_, _) => Ok(Arc::<dyn Array>::clone(&array)),
+        (_, _) => Ok(Arc::<dyn Array>::clone(array)),
     }
 }
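A rough analogue of the `dictionary_encode_if_necessary` change, assuming stand-in std types (`Arc<Vec<i32>>` in place of arrow's `ArrayRef`): the helper borrows the handle and clones the `Arc` only on the arm that must hand back an owned value.

```rust
use std::sync::Arc;

// Stand-in for arrow's `ArrayRef`; not DataFusion code.
type ValueRef = Arc<Vec<i32>>;

fn normalize_if_necessary(values: &ValueRef, wanted_len: usize) -> ValueRef {
    if values.len() == wanted_len {
        // No work to do: clone the Arc (cheap refcount bump), not the data.
        Arc::clone(values)
    } else {
        // Build a new owned value only when a transformation is required.
        Arc::new(values.iter().copied().take(wanted_len).collect())
    }
}

fn main() {
    let v: ValueRef = Arc::new(vec![1, 2, 3, 4]);
    let same = normalize_if_necessary(&v, 4);
    let truncated = normalize_if_necessary(&v, 2);
    assert!(Arc::ptr_eq(&v, &same));
    assert_eq!(truncated.as_slice(), &[1, 2]);
}
```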

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 6 additions & 5 deletions
@@ -626,7 +626,7 @@ impl AggregateExec {
     fn execute_typed(
         &self,
         partition: usize,
-        context: Arc<TaskContext>,
+        context: &Arc<TaskContext>,
     ) -> Result<StreamType> {
         // no group by at all
         if self.group_by.expr.is_empty() {
@@ -761,7 +761,7 @@ impl AggregateExec {
         &self.input_order_mode
     }

-    fn statistics_inner(&self, child_statistics: Statistics) -> Result<Statistics> {
+    fn statistics_inner(&self, child_statistics: &Statistics) -> Result<Statistics> {
         // TODO stats: group expressions:
         // - once expressions will be able to compute their own stats, use it here
         // - case where we group by on a column for which with have the `distinct` stat
@@ -1020,7 +1020,7 @@ impl ExecutionPlan for AggregateExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        self.execute_typed(partition, context)
+        self.execute_typed(partition, &context)
             .map(|stream| stream.into())
     }

@@ -1033,7 +1033,8 @@ impl ExecutionPlan for AggregateExec {
     }

     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
-        self.statistics_inner(self.input().partition_statistics(partition)?)
+        let child_statistics = self.input().partition_statistics(partition)?;
+        self.statistics_inner(&child_statistics)
     }

     fn cardinality_effect(&self) -> CardinalityEffect {
@@ -2220,7 +2221,7 @@ mod tests {
             Arc::clone(&input_schema),
         )?);

-        let stream = partial_aggregate.execute_typed(0, Arc::clone(&task_ctx))?;
+        let stream = partial_aggregate.execute_typed(0, &task_ctx)?;

         // ensure that we really got the version we wanted
         match version {
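The `partition_statistics` hunk shows the usual follow-on edit when a callee starts borrowing: a freshly computed value is bound to a local first so a reference can be taken. A hypothetical stand-in (names and types are not DataFusion's):

```rust
// Hypothetical stand-ins, not DataFusion types.
#[derive(Debug)]
struct Stats {
    rows: usize,
}

fn child_stats(partition: Option<usize>) -> Stats {
    // Pretend each partition reports a row count.
    Stats { rows: partition.map_or(100, |p| 100 * (p + 1)) }
}

fn stats_inner(child: &Stats) -> Stats {
    // Derives output statistics from a borrowed child; no ownership needed.
    Stats { rows: child.rows.min(10) }
}

fn partition_stats(partition: Option<usize>) -> Stats {
    // Before: `stats_inner(child_stats(partition))` with a by-value parameter.
    // After: bind the temporary, then borrow it.
    let child = child_stats(partition);
    stats_inner(&child)
}

fn main() {
    println!("{:?}", partition_stats(Some(2)));
}
```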

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 6 additions & 6 deletions
@@ -68,14 +68,14 @@ impl AggregateStream {
     /// Create a new AggregateStream
     pub fn new(
         agg: &AggregateExec,
-        context: Arc<TaskContext>,
+        context: &Arc<TaskContext>,
         partition: usize,
     ) -> Result<Self> {
         let agg_schema = Arc::clone(&agg.schema);
         let agg_filter_expr = agg.filter_expr.clone();

         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
-        let input = agg.input.execute(partition, Arc::clone(&context))?;
+        let input = agg.input.execute(partition, Arc::clone(context))?;

         let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?;
         let filter_expressions = match agg.mode {
@@ -115,7 +115,7 @@ impl AggregateStream {
                     let timer = elapsed_compute.timer();
                     let result = aggregate_batch(
                         &this.mode,
-                        batch,
+                        &batch,
                         &mut this.accumulators,
                         &this.aggregate_expressions,
                         &this.filter_expressions,
@@ -195,7 +195,7 @@ impl RecordBatchStream for AggregateStream {
 /// TODO: Make this a member function
 fn aggregate_batch(
     mode: &AggregateMode,
-    batch: RecordBatch,
+    batch: &RecordBatch,
     accumulators: &mut [AccumulatorItem],
     expressions: &[Vec<Arc<dyn PhysicalExpr>>],
     filters: &[Option<Arc<dyn PhysicalExpr>>],
@@ -215,8 +215,8 @@ fn aggregate_batch(
         .try_for_each(|((accum, expr), filter)| {
             // 1.2
             let batch = match filter {
-                Some(filter) => Cow::Owned(batch_filter(&batch, filter)?),
-                None => Cow::Borrowed(&batch),
+                Some(filter) => Cow::Owned(batch_filter(batch, filter)?),
+                None => Cow::Borrowed(batch),
             };

             // 1.3
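The `aggregate_batch` hunk leans on `Cow`: with `batch` borrowed, the unfiltered branch can wrap the reference directly and only the filtered branch allocates. A small sketch with stand-in types (slices of `i64` instead of `RecordBatch`):

```rust
use std::borrow::Cow;

// Stand-in for `batch_filter`: produces a new, owned set of rows.
fn filter_rows(rows: &[i64], keep_even: bool) -> Vec<i64> {
    rows.iter().copied().filter(|r| !keep_even || r % 2 == 0).collect()
}

// Stand-in for `aggregate_batch`: `rows` is borrowed, so the no-filter branch
// wraps the reference and only the filter branch allocates.
fn aggregate_rows(rows: &[i64], filter: Option<bool>) -> i64 {
    let rows: Cow<'_, [i64]> = match filter {
        Some(keep_even) => Cow::Owned(filter_rows(rows, keep_even)),
        None => Cow::Borrowed(rows),
    };
    rows.iter().sum()
}

fn main() {
    let rows = vec![1, 2, 3, 4];
    assert_eq!(aggregate_rows(&rows, None), 10);
    assert_eq!(aggregate_rows(&rows, Some(true)), 6);
}
```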

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 16 additions & 16 deletions
@@ -445,7 +445,7 @@ impl GroupedHashAggregateStream {
     /// Create a new GroupedHashAggregateStream
     pub fn new(
         agg: &AggregateExec,
-        context: Arc<TaskContext>,
+        context: &Arc<TaskContext>,
         partition: usize,
     ) -> Result<Self> {
         debug!("Creating GroupedHashAggregateStream");
@@ -454,7 +454,7 @@ impl GroupedHashAggregateStream {
         let agg_filter_expr = agg.filter_expr.clone();

         let batch_size = context.session_config().batch_size();
-        let input = agg.input.execute(partition, Arc::clone(&context))?;
+        let input = agg.input.execute(partition, Arc::clone(context))?;
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition);

@@ -685,7 +685,7 @@ impl Stream for GroupedHashAggregateStream {
                         }

                         // Do the grouping
-                        self.group_aggregate_batch(batch)?;
+                        self.group_aggregate_batch(&batch)?;

                         // If we can begin emitting rows, do so,
                         // otherwise keep consuming input
@@ -739,7 +739,7 @@ impl Stream for GroupedHashAggregateStream {
                         self.spill_previous_if_necessary(&batch)?;

                         // Do the grouping
-                        self.group_aggregate_batch(batch)?;
+                        self.group_aggregate_batch(&batch)?;

                         // If we can begin emitting rows, do so,
                         // otherwise keep consuming input
@@ -788,7 +788,7 @@ impl Stream for GroupedHashAggregateStream {
                         if let Some(probe) = self.skip_aggregation_probe.as_mut() {
                             probe.record_skipped(&batch);
                         }
-                        let states = self.transform_to_states(batch)?;
+                        let states = self.transform_to_states(&batch)?;
                         return Poll::Ready(Some(Ok(
                             states.record_output(&self.baseline_metrics)
                         )));
@@ -881,12 +881,12 @@ impl RecordBatchStream for GroupedHashAggregateStream {

 impl GroupedHashAggregateStream {
     /// Perform group-by aggregation for the given [`RecordBatch`].
-    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
+    fn group_aggregate_batch(&mut self, batch: &RecordBatch) -> Result<()> {
         // Evaluate the grouping expressions
         let group_by_values = if self.spill_state.is_stream_merging {
-            evaluate_group_by(&self.spill_state.merging_group_by, &batch)?
+            evaluate_group_by(&self.spill_state.merging_group_by, batch)?
         } else {
-            evaluate_group_by(&self.group_by, &batch)?
+            evaluate_group_by(&self.group_by, batch)?
         };

         // Only create the timer if there are actual aggregate arguments to evaluate
@@ -903,18 +903,18 @@ impl GroupedHashAggregateStream {

         // Evaluate the aggregation expressions.
         let input_values = if self.spill_state.is_stream_merging {
-            evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)?
+            evaluate_many(&self.spill_state.merging_aggregate_arguments, batch)?
         } else {
-            evaluate_many(&self.aggregate_arguments, &batch)?
+            evaluate_many(&self.aggregate_arguments, batch)?
         };
         drop(timer);

         // Evaluate the filter expressions, if any, against the inputs
         let filter_values = if self.spill_state.is_stream_merging {
             let filter_expressions = vec![None; self.accumulators.len()];
-            evaluate_optional(&filter_expressions, &batch)?
+            evaluate_optional(&filter_expressions, batch)?
         } else {
-            evaluate_optional(&self.filter_expressions, &batch)?
+            evaluate_optional(&self.filter_expressions, batch)?
         };

         for group_values in &group_by_values {
@@ -1248,10 +1248,10 @@ impl GroupedHashAggregateStream {
     }

     /// Transforms input batch to intermediate aggregate state, without grouping it
-    fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let mut group_values = evaluate_group_by(&self.group_by, &batch)?;
-        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
-        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
+    fn transform_to_states(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let mut group_values = evaluate_group_by(&self.group_by, batch)?;
+        let input_values = evaluate_many(&self.aggregate_arguments, batch)?;
+        let filter_values = evaluate_optional(&self.filter_expressions, batch)?;

         assert_eq_or_internal_err!(
             group_values.len(),
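In `row_hash.rs` the polling loop keeps ownership of each incoming batch and lends it to every processing step (`group_aggregate_batch`, `transform_to_states`, and the evaluators). A stand-in sketch of that borrow-per-step pattern, not the actual stream:

```rust
use std::collections::HashMap;

// Stand-ins, not arrow/DataFusion types.
struct Batch {
    keys: Vec<u32>,
    values: Vec<i64>,
}

#[derive(Default)]
struct Aggregator {
    totals: HashMap<u32, i64>,
}

impl Aggregator {
    // Borrow the batch: the caller keeps ownership and can lend it to later steps.
    fn group_aggregate_batch(&mut self, batch: &Batch) {
        for (k, v) in batch.keys.iter().zip(&batch.values) {
            *self.totals.entry(*k).or_insert(0) += v;
        }
    }

    fn record_row_count(&self, batch: &Batch) -> usize {
        batch.keys.len()
    }
}

fn main() {
    let batch = Batch { keys: vec![1, 1, 2], values: vec![10, 20, 5] };
    let mut agg = Aggregator::default();
    // The same owned batch is borrowed by several steps in turn.
    agg.group_aggregate_batch(&batch);
    let rows = agg.record_row_count(&batch);
    println!("{rows} rows, totals = {:?}", agg.totals);
}
```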

datafusion/physical-plan/src/aggregates/topk/heap.rs

Lines changed: 4 additions & 10 deletions
@@ -326,13 +326,7 @@ impl<VAL: ValueType> TopKHeap<VAL> {
         }
     }

-    fn _tree_print(
-        &self,
-        idx: usize,
-        prefix: String,
-        is_tail: bool,
-        output: &mut String,
-    ) {
+    fn _tree_print(&self, idx: usize, prefix: &str, is_tail: bool, output: &mut String) {
         if let Some(Some(hi)) = self.heap.get(idx) {
             let connector = if idx != 0 {
                 if is_tail {
@@ -357,10 +351,10 @@ impl<VAL: ValueType> TopKHeap<VAL> {
             let right_exists = right_idx < self.len;

             if left_exists {
-                self._tree_print(left_idx, child_prefix.clone(), !right_exists, output);
+                self._tree_print(left_idx, &child_prefix, !right_exists, output);
             }
             if right_exists {
-                self._tree_print(right_idx, child_prefix, true, output);
+                self._tree_print(right_idx, &child_prefix, true, output);
             }
         }
     }
@@ -370,7 +364,7 @@ impl<VAL: ValueType> Display for TopKHeap<VAL> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let mut output = String::new();
         if !self.heap.is_empty() {
-            self._tree_print(0, String::new(), true, &mut output);
+            self._tree_print(0, "", true, &mut output);
         }
         write!(f, "{output}")
     }
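The `_tree_print` change swaps `prefix: String` for `prefix: &str`, so the recursion can pass `&child_prefix` to both children instead of cloning for one call and moving into the other. A simplified, hypothetical heap printer illustrating the same signature:

```rust
// Simplified, hypothetical version of the printer: `prefix: &str` lets both
// recursive calls borrow the same `child_prefix` instead of cloning a String.
fn tree_print(nodes: &[Option<i64>], idx: usize, prefix: &str, output: &mut String) {
    if let Some(Some(value)) = nodes.get(idx) {
        output.push_str(&format!("{prefix}{value}\n"));
        let child_prefix = format!("{prefix}  ");
        tree_print(nodes, 2 * idx + 1, &child_prefix, output);
        tree_print(nodes, 2 * idx + 2, &child_prefix, output);
    }
}

fn main() {
    let heap = vec![Some(5), Some(3), Some(4), None, Some(1)];
    let mut out = String::new();
    // The root call can now pass a literal instead of `String::new()`.
    tree_print(&heap, 0, "", &mut out);
    print!("{out}");
}
```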

datafusion/physical-plan/src/aggregates/topk_stream.rs

Lines changed: 6 additions & 5 deletions
@@ -54,13 +54,13 @@ pub struct GroupedTopKAggregateStream {
 impl GroupedTopKAggregateStream {
     pub fn new(
         aggr: &AggregateExec,
-        context: Arc<TaskContext>,
+        context: &Arc<TaskContext>,
         partition: usize,
         limit: usize,
     ) -> Result<Self> {
         let agg_schema = Arc::clone(&aggr.schema);
         let group_by = aggr.group_by.clone();
-        let input = aggr.input.execute(partition, Arc::clone(&context))?;
+        let input = aggr.input.execute(partition, Arc::clone(context))?;
         let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition);
         let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition);
         let aggregate_arguments =
@@ -97,11 +97,12 @@ impl RecordBatchStream for GroupedTopKAggregateStream {
 }

 impl GroupedTopKAggregateStream {
-    fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> {
+    fn intern(&mut self, ids: &ArrayRef, vals: &ArrayRef) -> Result<()> {
         let _timer = self.group_by_metrics.time_calculating_group_ids.timer();

         let len = ids.len();
-        self.priority_map.set_batch(ids, Arc::clone(&vals));
+        self.priority_map
+            .set_batch(Arc::clone(ids), Arc::clone(vals));

         let has_nulls = vals.null_count() > 0;
         for row_idx in 0..len {
@@ -167,7 +168,7 @@ impl Stream for GroupedTopKAggregateStream {
                     let input_values = Arc::clone(&input_values[0][0]);

                     // iterate over each column of group_by values
-                    (*self).intern(group_by_values, input_values)?;
+                    (*self).intern(&group_by_values, &input_values)?;
                 }
                 // inner is done, emit all rows and switch to producing output
                 None => {
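The `intern` hunk borrows both column handles and clones the `Arc`s only at the single point where owned copies are stored (`set_batch`). A stand-in sketch (std types and an invented `PriorityMap`) of that shape:

```rust
use std::sync::Arc;

// Stand-in for arrow's `ArrayRef`.
type ColumnRef = Arc<Vec<u64>>;

#[derive(Default)]
struct PriorityMap {
    current: Option<(ColumnRef, ColumnRef)>,
}

impl PriorityMap {
    fn set_batch(&mut self, ids: ColumnRef, vals: ColumnRef) {
        // Storing the columns needs owned Arcs, so cloning belongs at this call.
        self.current = Some((ids, vals));
    }
}

fn intern(map: &mut PriorityMap, ids: &ColumnRef, vals: &ColumnRef) -> usize {
    map.set_batch(Arc::clone(ids), Arc::clone(vals));
    // The borrowed handles are still usable after the owned clones are stored.
    ids.len()
}

fn main() {
    let ids: ColumnRef = Arc::new(vec![1, 2, 3]);
    let vals: ColumnRef = Arc::new(vec![10, 20, 30]);
    let mut map = PriorityMap::default();
    assert_eq!(intern(&mut map, &ids, &vals), 3);
    assert!(map.current.is_some());
}
```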

datafusion/physical-plan/src/analyze.rs

Lines changed: 5 additions & 5 deletions
@@ -206,8 +206,8 @@ impl ExecutionPlan for AnalyzeExec {
                 show_statistics,
                 total_rows,
                 duration,
-                captured_input,
-                captured_schema,
+                &captured_input,
+                &captured_schema,
                 &metric_types,
             )
         };
@@ -225,8 +225,8 @@ fn create_output_batch(
     show_statistics: bool,
     total_rows: usize,
     duration: std::time::Duration,
-    input: Arc<dyn ExecutionPlan>,
-    schema: SchemaRef,
+    input: &Arc<dyn ExecutionPlan>,
+    schema: &SchemaRef,
     metric_types: &[MetricType],
 ) -> Result<RecordBatch> {
     let mut type_builder = StringBuilder::with_capacity(1, 1024);
@@ -262,7 +262,7 @@ fn create_output_batch(
     }

     RecordBatch::try_new(
-        schema,
+        Arc::clone(schema),
         vec![
             Arc::new(type_builder.finish()),
             Arc::new(plan_builder.finish()),
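`create_output_batch` now borrows `&Arc<dyn ExecutionPlan>` and `&SchemaRef`; read-only access goes through the borrow, and `Arc::clone(schema)` appears only where `RecordBatch::try_new` needs an owned schema. A hypothetical trait-object version of the same idea, using invented names:

```rust
use std::sync::Arc;

// Hypothetical trait object standing in for `dyn ExecutionPlan`.
trait Plan {
    fn name(&self) -> &str;
}

struct Scan;
impl Plan for Scan {
    fn name(&self) -> &str {
        "Scan"
    }
}

// Something downstream that really does need an owned Arc.
fn consume_owned(plan: Arc<dyn Plan>) -> String {
    format!("owned handle to {}", plan.name())
}

fn create_output(input: &Arc<dyn Plan>) -> String {
    // Read-only uses go through the borrow...
    let label = input.name().to_string();
    // ...and only the consumer below forces a refcount bump.
    format!("{label}: {}", consume_owned(Arc::clone(input)))
}

fn main() {
    let plan: Arc<dyn Plan> = Arc::new(Scan);
    println!("{}", create_output(&plan));
    // `plan` is still usable; `create_output` only borrowed it.
    println!("{}", plan.name());
}
```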

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 13 additions & 0 deletions
@@ -1118,6 +1118,7 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
 /// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
 /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
 /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
+#[expect(clippy::needless_pass_by_value)]
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
     plan.properties().evaluation_type == EvaluationType::Eager
 }
@@ -1172,6 +1173,10 @@ pub async fn collect(
 ///
 /// Dropping the stream will abort the execution of the query, and free up
 /// any allocated resources
+#[expect(
+    clippy::needless_pass_by_value,
+    reason = "Public API that historically takes owned Arcs"
+)]
 pub fn execute_stream(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
@@ -1236,6 +1241,10 @@ pub async fn collect_partitioned(
 ///
 /// Dropping the stream will abort the execution of the query, and free up
 /// any allocated resources
+#[expect(
+    clippy::needless_pass_by_value,
+    reason = "Public API that historically takes owned Arcs"
+)]
 pub fn execute_stream_partitioned(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
@@ -1267,6 +1276,10 @@ pub fn execute_stream_partitioned(
 /// violate the `not null` constraints specified in the `sink_schema`. If there are
 /// such columns, it wraps the resulting stream to enforce the `not null` constraints
 /// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
+#[expect(
+    clippy::needless_pass_by_value,
+    reason = "Public API that historically takes owned Arcs"
+)]
 pub fn execute_input_stream(
     input: Arc<dyn ExecutionPlan>,
     sink_schema: SchemaRef,
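For public functions whose owned-`Arc` signatures are kept for API compatibility, the commit opts out with `#[expect(...)]` rather than `#[allow(...)]`: `#[expect]` (stabilized in Rust 1.81) suppresses the lint but warns if the expectation ever stops being fulfilled, so stale opt-outs surface. A minimal illustration, not DataFusion's actual signature:

```rust
use std::sync::Arc;

// Hypothetical public function, not DataFusion's `execute_stream`.
#[expect(
    clippy::needless_pass_by_value,
    reason = "public API keeps its owned-Arc signature for compatibility"
)]
pub fn execute(plan: Arc<String>) -> usize {
    // Only reads the plan, so clippy would otherwise suggest borrowing;
    // the expectation documents why the signature stays as-is.
    plan.len()
}

fn main() {
    println!("{}", execute(Arc::new(String::from("plan"))));
}
```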
