diff --git a/cpp/src/arrow/acero/aggregate_benchmark.cc b/cpp/src/arrow/acero/aggregate_benchmark.cc index 2e9cccd80d99..0facefb04856 100644 --- a/cpp/src/arrow/acero/aggregate_benchmark.cc +++ b/cpp/src/arrow/acero/aggregate_benchmark.cc @@ -26,6 +26,7 @@ #include "arrow/acero/exec_plan.h" #include "arrow/acero/options.h" #include "arrow/array/array_primitive.h" +#include "arrow/array/concatenate.h" #include "arrow/compute/api.h" #include "arrow/table.h" #include "arrow/testing/generator.h" @@ -362,14 +363,14 @@ std::shared_ptr RecordBatchFromArrays( Result> BatchGroupBy( std::shared_ptr batch, std::vector aggregates, - std::vector keys, std::vector segment_keys, + std::vector keys, std::vector segment_keys, bool fast, bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr table, Table::FromRecordBatches({std::move(batch)})); Declaration plan = Declaration::Sequence( {{"table_source", TableSourceNodeOptions(std::move(table))}, {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys), - std::move(segment_keys))}}); + std::move(segment_keys), fast)}}); return DeclarationToTable(std::move(plan), use_threads, memory_pool); } @@ -377,7 +378,7 @@ static void BenchmarkAggregate( benchmark::State& state, std::vector aggregates, const std::vector>& arguments, const std::vector>& keys, - const std::vector>& segment_keys = {}) { + const std::vector>& segment_keys = {}, bool fast = true) { std::shared_ptr batch = RecordBatchFromArrays(arguments, keys, segment_keys); std::vector key_refs; @@ -395,24 +396,92 @@ static void BenchmarkAggregate( } int64_t total_bytes = TotalBufferSize(*batch); for (auto _ : state) { - ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs)); + ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs, fast)); } state.SetBytesProcessed(total_bytes * state.iterations()); state.SetItemsProcessed(batch->num_rows() * state.iterations()); } -#define GROUP_BY_BENCHMARK(Name, Impl) \ - static void Name(benchmark::State& state) { \ - RegressionArgs args(state, false); \ - auto rng = random::RandomArrayGenerator(1923); \ - (Impl)(); \ - } \ - BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \ - BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \ +#define GROUP_BY_BENCHMARK(Name, Impl) \ + static void Name(benchmark::State& state) { \ + RegressionArgs args(state, false); \ + auto rng = random::RandomArrayGenerator(1923); \ + (Impl)(); \ + } \ + BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \ + BenchmarkSetArgsWithSizes(bench, {1000, 10000, 100000, 1 * 1024 * 1024}); \ }) // Grouped Sum +GROUP_BY_BENCHMARK(SumIntRangeIntKey, [&] { + ASSERT_OK_AND_ASSIGN(auto key, gen::Step()->Generate(args.size)); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast); +}); + +GROUP_BY_BENCHMARK(SumIntRangeX2IntKey, [&] { + ASSERT_OK_AND_ASSIGN(auto range, gen::Step()->Generate(args.size / 2)); + ASSERT_OK_AND_ASSIGN(auto key, Concatenate({range, range})); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast); +}); + +GROUP_BY_BENCHMARK(SumIntRandomIntKey, [&] { + auto key = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast); +}); + +GROUP_BY_BENCHMARK(CountIntRangeIntKey, [&] { + ASSERT_OK_AND_ASSIGN(auto key, gen::Step()->Generate(args.size)); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast); +}); + +GROUP_BY_BENCHMARK(CountIntRangeX2IntKey, [&] { + ASSERT_OK_AND_ASSIGN(auto range, gen::Step()->Generate(args.size / 2)); + ASSERT_OK_AND_ASSIGN(auto key, Concatenate({range, range})); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast); +}); + +GROUP_BY_BENCHMARK(CountIntRandomIntKey, [&] { + auto key = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + auto summand = rng.Int64(args.size, + /*min=*/0, + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0); + + BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast); +}); + GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyStringSet, [&] { auto summand = rng.Float64(args.size, /*min=*/0.0, diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h index e2b1ab121f4c..146120990d6e 100644 --- a/cpp/src/arrow/acero/aggregate_internal.h +++ b/cpp/src/arrow/acero/aggregate_internal.h @@ -246,7 +246,7 @@ class GroupByNode : public ExecNode, public TracedNode { std::vector> agg_src_types, std::vector> agg_src_fieldsets, std::vector aggs, - std::vector agg_kernels) + std::vector agg_kernels, bool fast = true) : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema)), TracedNode(this), segmenter_(std::move(segmenter)), @@ -256,7 +256,8 @@ class GroupByNode : public ExecNode, public TracedNode { agg_src_types_(std::move(agg_src_types)), agg_src_fieldsets_(std::move(agg_src_fieldsets)), aggs_(std::move(aggs)), - agg_kernels_(std::move(agg_kernels)) {} + agg_kernels_(std::move(agg_kernels)), + fast_(fast) {} Status Init() override; @@ -348,6 +349,7 @@ class GroupByNode : public ExecNode, public TracedNode { /// \brief Total number of output batches produced int total_output_batches_ = 0; + bool fast_; std::vector local_states_; ExecBatch out_data_; }; diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc b/cpp/src/arrow/acero/groupby_aggregate_node.cc index 5c62ddf15a25..4735662c281b 100644 --- a/cpp/src/arrow/acero/groupby_aggregate_node.cc +++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc @@ -186,6 +186,7 @@ Result GroupByNode::Make(ExecPlan* plan, std::vector input const auto& keys = aggregate_options.keys; const auto& segment_keys = aggregate_options.segment_keys; auto aggs = aggregate_options.aggregates; + bool fast = aggregate_options.fast; bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1; const auto& input_schema = input->output_schema(); @@ -198,7 +199,7 @@ Result GroupByNode::Make(ExecPlan* plan, std::vector input input, std::move(args.output_schema), std::move(args.grouping_key_field_ids), std::move(args.segment_key_field_ids), std::move(args.segmenter), std::move(args.kernel_intypes), std::move(args.target_fieldsets), - std::move(args.aggregates), std::move(args.kernels)); + std::move(args.aggregates), std::move(args.kernels), fast); } Status GroupByNode::ResetKernelStates() { @@ -430,8 +431,9 @@ Status GroupByNode::InitLocalStateIfNeeded(ThreadLocalState* state) { } // Construct grouper - ARROW_ASSIGN_OR_RAISE(state->grouper, - Grouper::Make(key_types, plan_->query_context()->exec_context())); + ARROW_ASSIGN_OR_RAISE( + state->grouper, + Grouper::Make(key_types, plan_->query_context()->exec_context(), fast_)); // Build vector of aggregate source field data types std::vector> agg_src_types(agg_kernels_.size()); diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 26293725582b..960ed5cce2dd 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -337,10 +337,11 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { /// \brief create an instance from values explicit AggregateNodeOptions(std::vector aggregates, std::vector keys = {}, - std::vector segment_keys = {}) + std::vector segment_keys = {}, bool fast = true) : aggregates(std::move(aggregates)), keys(std::move(keys)), - segment_keys(std::move(segment_keys)) {} + segment_keys(std::move(segment_keys)), + fast(fast) {} // aggregations which will be applied to the targeted fields std::vector aggregates; @@ -348,6 +349,7 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { std::vector keys; // keys by which aggregations will be segmented (optional) std::vector segment_keys; + bool fast; }; /// \brief a default value at which backpressure will be applied diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc index d62333af3700..dfc97d2b5357 100644 --- a/cpp/src/arrow/compute/row/grouper.cc +++ b/cpp/src/arrow/compute/row/grouper.cc @@ -955,8 +955,8 @@ struct GrouperFastImpl : public Grouper { } // namespace Result> Grouper::Make(const std::vector& key_types, - ExecContext* ctx) { - if (GrouperFastImpl::CanUse(key_types)) { + ExecContext* ctx, bool fast) { + if (fast && GrouperFastImpl::CanUse(key_types)) { return GrouperFastImpl::Make(key_types, ctx); } return GrouperImpl::Make(key_types, ctx); diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h index 7554e5ef159a..5d6a0f6f75e7 100644 --- a/cpp/src/arrow/compute/row/grouper.h +++ b/cpp/src/arrow/compute/row/grouper.h @@ -107,7 +107,8 @@ class ARROW_EXPORT Grouper { /// Construct a Grouper which receives the specified key types static Result> Make(const std::vector& key_types, - ExecContext* ctx = default_exec_context()); + ExecContext* ctx = default_exec_context(), + bool fast = true); /// Reset all intermediate state, make the grouper logically as just `Make`ed. /// The underlying buffers, if any, may or may not be released though. diff --git a/cpp/src/arrow/util/benchmark_util.h b/cpp/src/arrow/util/benchmark_util.h index 75639ac11ae4..323d1e26a499 100644 --- a/cpp/src/arrow/util/benchmark_util.h +++ b/cpp/src/arrow/util/benchmark_util.h @@ -89,9 +89,10 @@ void BenchmarkSetArgsWithSizes(benchmark::internal::Benchmark* bench, bench->Unit(benchmark::kMicrosecond); for (const auto size : sizes) { - for (const auto inverse_null_proportion : kInverseNullProportions) { - bench->Args({static_cast(size), inverse_null_proportion}); - } + // for (const auto inverse_null_proportion : kInverseNullProportions) { + bench->Args({static_cast(size), true}); + bench->Args({static_cast(size), false}); + // } } } @@ -113,10 +114,15 @@ struct RegressionArgs { // proportion of nulls in generated arrays double null_proportion; + bool fast; + // If size_is_bytes is true, then it's a number of bytes, otherwise it's the // number of items processed (for reporting) explicit RegressionArgs(benchmark::State& state, bool size_is_bytes = true) - : size(state.range(0)), state_(state), size_is_bytes_(size_is_bytes) { + : size(state.range(0)), + fast(state.range(1)), + state_(state), + size_is_bytes_(size_is_bytes) { if (state.range(1) == 0) { this->null_proportion = 0.0; } else {