Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 81 additions & 12 deletions cpp/src/arrow/acero/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
Expand Down Expand Up @@ -362,22 +363,22 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(

Result<std::shared_ptr<Table>> BatchGroupBy(
std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys,
std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys, bool fast,
bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(batch)}));
Declaration plan = Declaration::Sequence(
{{"table_source", TableSourceNodeOptions(std::move(table))},
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys),
std::move(segment_keys))}});
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add pivot_wider benchmarks.

std::move(segment_keys), fast)}});
return DeclarationToTable(std::move(plan), use_threads, memory_pool);
}

static void BenchmarkAggregate(
benchmark::State& state, std::vector<Aggregate> aggregates,
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys,
const std::vector<std::shared_ptr<Array>>& segment_keys = {}) {
const std::vector<std::shared_ptr<Array>>& segment_keys = {}, bool fast = true) {
std::shared_ptr<RecordBatch> batch =
RecordBatchFromArrays(arguments, keys, segment_keys);
std::vector<FieldRef> key_refs;
Expand All @@ -395,24 +396,92 @@ static void BenchmarkAggregate(
}
int64_t total_bytes = TotalBufferSize(*batch);
for (auto _ : state) {
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs));
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs, fast));
}
state.SetBytesProcessed(total_bytes * state.iterations());
state.SetItemsProcessed(batch->num_rows() * state.iterations());
}

#define GROUP_BY_BENCHMARK(Name, Impl) \
static void Name(benchmark::State& state) { \
RegressionArgs args(state, false); \
auto rng = random::RandomArrayGenerator(1923); \
(Impl)(); \
} \
BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \
BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \
#define GROUP_BY_BENCHMARK(Name, Impl) \
static void Name(benchmark::State& state) { \
RegressionArgs args(state, false); \
auto rng = random::RandomArrayGenerator(1923); \
(Impl)(); \
} \
BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \
BenchmarkSetArgsWithSizes(bench, {1000, 10000, 100000, 1 * 1024 * 1024}); \
})

// Grouped Sum

// Grouped hash_sum with a strictly increasing integer key: gen::Step
// presumably yields 0,1,2,... (TODO confirm Step defaults), so every row is
// its own group — stresses maximal group cardinality. The trailing
// `args.fast` toggles the fast Grouper implementation being benchmarked.
GROUP_BY_BENCHMARK(SumIntRangeIntKey, [&] {
ASSERT_OK_AND_ASSIGN(auto key, gen::Step<int64_t>()->Generate(args.size));
// Random non-null int64 summands spanning the full non-negative range.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast);
});

// Grouped hash_sum where a step sequence of size/2 is concatenated with
// itself, so each key value occurs exactly twice (args.size/2 groups).
// Compares against SumIntRangeIntKey to isolate the cost of duplicate keys.
GROUP_BY_BENCHMARK(SumIntRangeX2IntKey, [&] {
ASSERT_OK_AND_ASSIGN(auto range, gen::Step<int64_t>()->Generate(args.size / 2));
ASSERT_OK_AND_ASSIGN(auto key, Concatenate({range, range}));
// Random non-null int64 summands spanning the full non-negative range.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast);
});

// Grouped hash_sum with uniformly random int64 keys over the full
// non-negative range — keys are effectively unique but arrive in random
// order, unlike the sorted step-sequence variants above.
GROUP_BY_BENCHMARK(SumIntRandomIntKey, [&] {
auto key = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);
// Random non-null int64 summands, independent of the key column.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}, {}, args.fast);
});

// Grouped hash_count mirror of SumIntRangeIntKey: step-sequence key
// (one group per row) so the measurement isolates grouping cost with the
// cheapest aggregate kernel. `args.fast` selects the Grouper implementation.
GROUP_BY_BENCHMARK(CountIntRangeIntKey, [&] {
ASSERT_OK_AND_ASSIGN(auto key, gen::Step<int64_t>()->Generate(args.size));
// Values are counted, not summed; kept non-null so every row contributes.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast);
});

// Grouped hash_count mirror of SumIntRangeX2IntKey: each key value occurs
// exactly twice (args.size/2 groups), measuring duplicate-key handling with
// the cheapest aggregate kernel.
GROUP_BY_BENCHMARK(CountIntRangeX2IntKey, [&] {
ASSERT_OK_AND_ASSIGN(auto range, gen::Step<int64_t>()->Generate(args.size / 2));
ASSERT_OK_AND_ASSIGN(auto key, Concatenate({range, range}));
// Values are counted, not summed; kept non-null so every row contributes.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast);
});

// Grouped hash_count mirror of SumIntRandomIntKey: uniformly random int64
// keys (effectively unique, unsorted) with the cheapest aggregate kernel.
GROUP_BY_BENCHMARK(CountIntRandomIntKey, [&] {
auto key = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);
// Values are counted, not summed; kept non-null so every row contributes.
auto summand = rng.Int64(args.size,
/*min=*/0,
/*max=*/std::numeric_limits<int64_t>::max(),
/*null_probability=*/0);

BenchmarkAggregate(state, {{"hash_count", ""}}, {summand}, {key}, {}, args.fast);
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyStringSet, [&] {
auto summand = rng.Float64(args.size,
/*min=*/0.0,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/acero/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class GroupByNode : public ExecNode, public TracedNode {
std::vector<std::vector<TypeHolder>> agg_src_types,
std::vector<std::vector<int>> agg_src_fieldsets,
std::vector<Aggregate> aggs,
std::vector<const HashAggregateKernel*> agg_kernels)
std::vector<const HashAggregateKernel*> agg_kernels, bool fast = true)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema)),
TracedNode(this),
segmenter_(std::move(segmenter)),
Expand All @@ -256,7 +256,8 @@ class GroupByNode : public ExecNode, public TracedNode {
agg_src_types_(std::move(agg_src_types)),
agg_src_fieldsets_(std::move(agg_src_fieldsets)),
aggs_(std::move(aggs)),
agg_kernels_(std::move(agg_kernels)) {}
agg_kernels_(std::move(agg_kernels)),
fast_(fast) {}

Status Init() override;

Expand Down Expand Up @@ -348,6 +349,7 @@ class GroupByNode : public ExecNode, public TracedNode {
/// \brief Total number of output batches produced
int total_output_batches_ = 0;

bool fast_;
std::vector<ThreadLocalState> local_states_;
ExecBatch out_data_;
};
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/acero/groupby_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ Result<ExecNode*> GroupByNode::Make(ExecPlan* plan, std::vector<ExecNode*> input
const auto& keys = aggregate_options.keys;
const auto& segment_keys = aggregate_options.segment_keys;
auto aggs = aggregate_options.aggregates;
bool fast = aggregate_options.fast;
bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1;

const auto& input_schema = input->output_schema();
Expand All @@ -198,7 +199,7 @@ Result<ExecNode*> GroupByNode::Make(ExecPlan* plan, std::vector<ExecNode*> input
input, std::move(args.output_schema), std::move(args.grouping_key_field_ids),
std::move(args.segment_key_field_ids), std::move(args.segmenter),
std::move(args.kernel_intypes), std::move(args.target_fieldsets),
std::move(args.aggregates), std::move(args.kernels));
std::move(args.aggregates), std::move(args.kernels), fast);
}

Status GroupByNode::ResetKernelStates() {
Expand Down Expand Up @@ -430,8 +431,9 @@ Status GroupByNode::InitLocalStateIfNeeded(ThreadLocalState* state) {
}

// Construct grouper
ARROW_ASSIGN_OR_RAISE(state->grouper,
Grouper::Make(key_types, plan_->query_context()->exec_context()));
ARROW_ASSIGN_OR_RAISE(
state->grouper,
Grouper::Make(key_types, plan_->query_context()->exec_context(), fast_));

// Build vector of aggregate source field data types
std::vector<std::vector<TypeHolder>> agg_src_types(agg_kernels_.size());
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,19 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
/// \brief create an instance from values
explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
std::vector<FieldRef> keys = {},
std::vector<FieldRef> segment_keys = {})
std::vector<FieldRef> segment_keys = {}, bool fast = true)
: aggregates(std::move(aggregates)),
keys(std::move(keys)),
segment_keys(std::move(segment_keys)) {}
segment_keys(std::move(segment_keys)),
fast(fast) {}

// aggregations which will be applied to the targeted fields
std::vector<Aggregate> aggregates;
// keys by which aggregations will be grouped (optional)
std::vector<FieldRef> keys;
// keys by which aggregations will be segmented (optional)
std::vector<FieldRef> segment_keys;
bool fast;
};

/// \brief a default value at which backpressure will be applied
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,8 +955,8 @@ struct GrouperFastImpl : public Grouper {
} // namespace

Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<TypeHolder>& key_types,
ExecContext* ctx) {
if (GrouperFastImpl::CanUse(key_types)) {
ExecContext* ctx, bool fast) {
if (fast && GrouperFastImpl::CanUse(key_types)) {
return GrouperFastImpl::Make(key_types, ctx);
}
return GrouperImpl::Make(key_types, ctx);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/row/grouper.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class ARROW_EXPORT Grouper {

/// Construct a Grouper which receives the specified key types
static Result<std::unique_ptr<Grouper>> Make(const std::vector<TypeHolder>& key_types,
ExecContext* ctx = default_exec_context());
ExecContext* ctx = default_exec_context(),
bool fast = true);

/// Reset all intermediate state, make the grouper logically as just `Make`ed.
/// The underlying buffers, if any, may or may not be released though.
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/arrow/util/benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ void BenchmarkSetArgsWithSizes(benchmark::internal::Benchmark* bench,
bench->Unit(benchmark::kMicrosecond);

for (const auto size : sizes) {
for (const auto inverse_null_proportion : kInverseNullProportions) {
bench->Args({static_cast<ArgsType>(size), inverse_null_proportion});
}
// for (const auto inverse_null_proportion : kInverseNullProportions) {
bench->Args({static_cast<ArgsType>(size), true});
bench->Args({static_cast<ArgsType>(size), false});
// }
}
}

Expand All @@ -113,10 +114,15 @@ struct RegressionArgs {
// proportion of nulls in generated arrays
double null_proportion;

bool fast;

// If size_is_bytes is true, then it's a number of bytes, otherwise it's the
// number of items processed (for reporting)
explicit RegressionArgs(benchmark::State& state, bool size_is_bytes = true)
: size(state.range(0)), state_(state), size_is_bytes_(size_is_bytes) {
: size(state.range(0)),
fast(state.range(1)),
state_(state),
size_is_bytes_(size_is_bytes) {
if (state.range(1) == 0) {
this->null_proportion = 0.0;
} else {
Expand Down
Loading