Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/scripts/cpp_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ else
-DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \
-DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \
-DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \
-DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, the hash join benchmark is never run in our CI, and this is probably why.

-DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \
-DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \
-DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \
Expand Down
1 change: 0 additions & 1 deletion cpp/CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"cacheVariables": {
"ARROW_BUILD_BENCHMARKS": "ON",
"ARROW_BUILD_BENCHMARKS_REFERENCE": "ON",
"ARROW_BUILD_OPENMP_BENCHMARKS": "ON",
"ARROW_BUILD_DETAILED_BENCHMARKS": "OFF",
"CMAKE_BUILD_TYPE": "RelWithDebInfo"
}
Expand Down
3 changes: 0 additions & 3 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is configured" ON)
define_option(ARROW_BUILD_BENCHMARKS_REFERENCE
"Build the Arrow micro reference benchmarks" OFF)

define_option(ARROW_BUILD_OPENMP_BENCHMARKS
"Build the Arrow benchmarks that rely on OpenMP" OFF)

define_option(ARROW_BUILD_DETAILED_BENCHMARKS
"Build benchmarks that do a longer exploration of performance" OFF)

Expand Down
21 changes: 3 additions & 18 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,36 +221,21 @@ if(ARROW_BUILD_BENCHMARKS)

add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc)

if(ARROW_BUILD_OPENMP_BENCHMARKS)
find_package(OpenMP REQUIRED)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feeling comfortable about removing one dependency.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too!

add_arrow_acero_benchmark(hash_join_benchmark
EXTRA_LINK_LIBS
OpenMP::OpenMP_CXX
SOURCES
hash_join_benchmark.cc)
if(MSVC)
target_compile_options(arrow-compute-hash-join-benchmark
PRIVATE "-openmp:experimental -openmp:llvm")
endif()
endif()
add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc)

if(ARROW_BUILD_STATIC)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
endif()
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
else()
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
endif()
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
endif()
endif()
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/accumulation_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using arrow::compute::ExecBatch;

/// \brief A container that accumulates batches until they are ready to
/// be processed.
class AccumulationQueue {
class ARROW_ACERO_EXPORT AccumulationQueue {
public:
AccumulationQueue() : row_count_(0) {}
~AccumulationQueue() = default;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/hash_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace acero {

using util::AccumulationQueue;

class HashJoinImpl {
class ARROW_ACERO_EXPORT HashJoinImpl {
public:
using OutputBatchCallback = std::function<Status(int64_t, ExecBatch)>;
using BuildFinishedCallback = std::function<Status(size_t)>;
Expand Down
54 changes: 28 additions & 26 deletions cpp/src/arrow/acero/hash_join_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
#include <cstdio>
#include <memory>

#include <omp.h>

namespace arrow {
namespace acero {
struct BenchmarkSettings {
Expand All @@ -56,6 +54,8 @@ struct BenchmarkSettings {
int var_length_max = 20; // Maximum length of any var length types

Expression residual_filter = literal(true);

bool stats_probe_rows = true;
};

class JoinBenchmark {
Expand Down Expand Up @@ -128,6 +128,7 @@ class JoinBenchmark {
for (ExecBatch& batch : r_batches_with_schema.batches)
r_batches_.InsertBatch(std::move(batch));

stats_.num_build_rows = settings.num_build_batches * settings.batch_size;
stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;

schema_mgr_ = std::make_unique<HashJoinSchema>();
Expand All @@ -141,14 +142,9 @@ class JoinBenchmark {
join_ = *HashJoinImpl::MakeSwiss();
}

omp_set_num_threads(settings.num_threads);
auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
#pragma omp task
{ DCHECK_OK(func(omp_get_thread_num())); }
return Status::OK();
};

scheduler_ = TaskScheduler::Make();
thread_pool_ = arrow::internal::GetCpuThreadPool();
DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads));
DCHECK_OK(ctx_.Init(nullptr));

auto register_task_group_callback = [&](std::function<Status(size_t, int64_t)> task,
Expand All @@ -157,15 +153,15 @@ class JoinBenchmark {
};

auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) {
return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks);
return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id, num_tasks);
};

DCHECK_OK(join_->Init(
&ctx_, settings.join_type, settings.num_threads, &(schema_mgr_->proj_maps[0]),
&(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter,
std::move(register_task_group_callback), std::move(start_task_group_callback),
[](int64_t, ExecBatch) { return Status::OK(); },
[](int64_t) { return Status::OK(); }));
[&](int64_t) { return Status::OK(); }));

task_group_probe_ = scheduler_->RegisterTaskGroup(
[this](size_t thread_index, int64_t task_id) -> Status {
Expand All @@ -178,25 +174,27 @@ class JoinBenchmark {
scheduler_->RegisterEnd();

DCHECK_OK(scheduler_->StartScheduling(
0 /*thread index*/, std::move(schedule_callback),
static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/,
settings.num_threads == 1));
/*thread_id=*/0,
[&](std::function<Status(size_t)> task) -> Status {
return thread_pool_->Spawn([&, task]() { DCHECK_OK(task(thread_indexer_())); });
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you choose Spawn() + a condition variable rather than Submit() + Future::status() (or Spawn() + ThreadPool::WaitForIdle())? Is it easy to write/maintain?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right.

Spawn() + WaitForIdle() is easier and suffices. (I wrongly suspected that WaitForIdle() was not working, so I changed to using a condition variable. Later it turned out the problem was something else, and I forgot to change it back.)

I'm updating it. Thank you for pointing this out.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

},
thread_pool_->GetCapacity(), settings.num_threads == 1));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change to thread_pool_->GetCapacity() (settings.num_threads) from 2 * settings.num_threads? Is the original 2 * settings.num_threads wrong?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This argument controls the max number of concurrent tasks in the scheduler, so any value >= settings.num_threads is fine. (The original doubling isn't wrong though.)

}

void RunJoin() {
#pragma omp parallel
{
int tid = omp_get_thread_num();
#pragma omp single
DCHECK_OK(
join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t thread_index) {
return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
l_batches_.batch_count());
}));
}
DCHECK_OK(join_->BuildHashTable(
/*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) {
return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
l_batches_.batch_count());
}));

thread_pool_->WaitForIdle();
}

std::unique_ptr<TaskScheduler> scheduler_;
ThreadIndexer thread_indexer_;
arrow::internal::ThreadPool* thread_pool_;

AccumulationQueue l_batches_;
AccumulationQueue r_batches_;
std::unique_ptr<HashJoinSchema> schema_mgr_;
Expand All @@ -205,6 +203,7 @@ class JoinBenchmark {
int task_group_probe_;

struct {
uint64_t num_build_rows;
uint64_t num_probe_rows;
} stats_;
};
Expand All @@ -219,11 +218,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st,
st.ResumeTiming();
bm.RunJoin();
st.PauseTiming();
total_rows += bm.stats_.num_probe_rows;
total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows
: bm.stats_.num_build_rows);
}
st.ResumeTiming();
}
st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate);
st.counters["rows/sec"] =
benchmark::Counter(static_cast<double>(total_rows), benchmark::Counter::kIsRate);
}

template <typename... Args>
Expand Down Expand Up @@ -302,6 +303,7 @@ static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) {
settings.num_threads = static_cast<int>(st.range(0));
settings.num_build_batches = static_cast<int>(st.range(1));
settings.num_probe_batches = settings.num_threads;
settings.stats_probe_rows = false;

HashJoinBasicBenchmarkImpl(st, settings);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/swiss_join_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class RowArrayAccessor {
// Read operations (row comparison, column decoding)
// can be called by multiple threads concurrently.
//
struct RowArray {
struct ARROW_ACERO_EXPORT RowArray {
RowArray() : is_initialized_(false), hardware_flags_(0) {}

Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const ExecBatch& batch);
Expand Down
Loading