From 00ef61743b5dd796724dd448b60fcead98fdbc0b Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 20 Feb 2025 23:07:28 +0800 Subject: [PATCH 1/6] Replace openmp with arrow-native multi-threading primitives --- cpp/src/arrow/acero/hash_join_benchmark.cc | 55 ++++++++++++---------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index 0a56194f2a3c..692b76b16720 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -28,11 +28,11 @@ #include "arrow/testing/random.h" #include "arrow/util/thread_pool.h" +#include #include #include #include - -#include +#include namespace arrow { namespace acero { @@ -141,14 +141,9 @@ class JoinBenchmark { join_ = *HashJoinImpl::MakeSwiss(); } - omp_set_num_threads(settings.num_threads); - auto schedule_callback = [](std::function func) -> Status { -#pragma omp task - { DCHECK_OK(func(omp_get_thread_num())); } - return Status::OK(); - }; - scheduler_ = TaskScheduler::Make(); + thread_pool_ = arrow::internal::GetCpuThreadPool(); + DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads)); DCHECK_OK(ctx_.Init(nullptr)); auto register_task_group_callback = [&](std::function task, @@ -157,7 +152,7 @@ class JoinBenchmark { }; auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) { - return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks); + return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id, num_tasks); }; DCHECK_OK(join_->Init( @@ -165,7 +160,11 @@ class JoinBenchmark { &(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter, std::move(register_task_group_callback), std::move(start_task_group_callback), [](int64_t, ExecBatch) { return Status::OK(); }, - [](int64_t) { return Status::OK(); })); + [&](int64_t) { + std::unique_lock lk(finished_mutex_); + finished_cv_.notify_one(); + return Status::OK(); + })); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -178,25 +177,30 @@ class JoinBenchmark { scheduler_->RegisterEnd(); DCHECK_OK(scheduler_->StartScheduling( - 0 /*thread index*/, std::move(schedule_callback), - static_cast(2 * settings.num_threads) /*concurrent tasks*/, - settings.num_threads == 1)); + /*thread_id=*/0, + [&](std::function task) -> Status { + return thread_pool_->Spawn([&, task]() { DCHECK_OK(task(thread_indexer_())); }); + }, + thread_pool_->GetCapacity(), settings.num_threads == 1)); } void RunJoin() { -#pragma omp parallel - { - int tid = omp_get_thread_num(); -#pragma omp single - DCHECK_OK( - join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t thread_index) { - return scheduler_->StartTaskGroup(thread_index, task_group_probe_, - l_batches_.batch_count()); - })); - } + DCHECK_OK(join_->BuildHashTable( + /*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) { + return scheduler_->StartTaskGroup(thread_index, task_group_probe_, + l_batches_.batch_count()); + })); + + std::unique_lock lk(finished_mutex_); + finished_cv_.wait(lk); } std::unique_ptr scheduler_; + ThreadIndexer thread_indexer_; + arrow::internal::ThreadPool* thread_pool_; + std::condition_variable finished_cv_; + std::mutex finished_mutex_; + AccumulationQueue l_batches_; AccumulationQueue r_batches_; std::unique_ptr schema_mgr_; @@ -223,7 +227,8 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st, } st.ResumeTiming(); } - st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate); + st.counters["rows/sec"] = + benchmark::Counter(static_cast(total_rows), benchmark::Counter::kIsRate); } template From 6bac837ec769c39a0cf3c6db64031ede4f42090a Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 20 Feb 2025 23:13:12 +0800 Subject: [PATCH 2/6] Fundamentally remove openmp from arrow --- ci/scripts/cpp_build.sh | 1 - cpp/CMakePresets.json | 1 - cpp/cmake_modules/DefineOptions.cmake | 3 --- cpp/src/arrow/acero/CMakeLists.txt | 21 +++------------------ 4 files changed, 3 insertions(+), 23 deletions(-) diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh index c1e7adf6a05e..3fcff9972336 100755 --- a/ci/scripts/cpp_build.sh +++ b/ci/scripts/cpp_build.sh @@ -141,7 +141,6 @@ else -DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \ -DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \ -DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \ - -DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \ -DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \ -DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \ -DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \ diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 85febbc5c9a7..114f79271d28 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -41,7 +41,6 @@ "cacheVariables": { "ARROW_BUILD_BENCHMARKS": "ON", "ARROW_BUILD_BENCHMARKS_REFERENCE": "ON", - "ARROW_BUILD_OPENMP_BENCHMARKS": "ON", "ARROW_BUILD_DETAILED_BENCHMARKS": "OFF", "CMAKE_BUILD_TYPE": "RelWithDebInfo" } diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 43e4e7603cfb..ee6315f8f0f9 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is configured" ON) define_option(ARROW_BUILD_BENCHMARKS_REFERENCE "Build the Arrow micro reference benchmarks" OFF) - define_option(ARROW_BUILD_OPENMP_BENCHMARKS - "Build the Arrow benchmarks that rely on OpenMP" OFF) - define_option(ARROW_BUILD_DETAILED_BENCHMARKS "Build benchmarks that do a longer exploration of performance" OFF) diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 54269f1df0eb..e6aa0560dfa8 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -221,18 +221,7 @@ if(ARROW_BUILD_BENCHMARKS) add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - find_package(OpenMP REQUIRED) - add_arrow_acero_benchmark(hash_join_benchmark - EXTRA_LINK_LIBS - OpenMP::OpenMP_CXX - SOURCES - hash_join_benchmark.cc) - if(MSVC) - target_compile_options(arrow-compute-hash-join-benchmark - PRIVATE "-openmp:experimental -openmp:llvm") - endif() - endif() + add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc) if(ARROW_BUILD_STATIC) target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static) @@ -240,17 +229,13 @@ if(ARROW_BUILD_BENCHMARKS) target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static) target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static) target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static) - endif() + target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static) else() target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared) - endif() + target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared) endif() endif() From 492b429b792f2db004fefad7191a9c8659f610a1 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 20 Feb 2025 23:14:44 +0800 Subject: [PATCH 3/6] Support stats for build side rows and use it in certain benchmark --- cpp/src/arrow/acero/hash_join_benchmark.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index 692b76b16720..424693ed5349 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -56,6 +56,8 @@ struct BenchmarkSettings { int var_length_max = 20; // Maximum length of any var length types Expression residual_filter = literal(true); + + bool stats_probe_rows = true; }; class JoinBenchmark { @@ -128,6 +130,7 @@ class JoinBenchmark { for (ExecBatch& batch : r_batches_with_schema.batches) r_batches_.InsertBatch(std::move(batch)); + stats_.num_build_rows = settings.num_build_batches * settings.batch_size; stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; schema_mgr_ = std::make_unique(); @@ -209,6 +212,7 @@ class JoinBenchmark { int task_group_probe_; struct { + uint64_t num_build_rows; uint64_t num_probe_rows; } stats_; }; @@ -223,7 +227,8 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st, st.ResumeTiming(); bm.RunJoin(); st.PauseTiming(); - total_rows += bm.stats_.num_probe_rows; + total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows + : bm.stats_.num_build_rows); } st.ResumeTiming(); } @@ -307,6 +312,7 @@ static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) { settings.num_threads = static_cast(st.range(0)); settings.num_build_batches = static_cast(st.range(1)); settings.num_probe_batches = settings.num_threads; + settings.stats_probe_rows = false; HashJoinBasicBenchmarkImpl(st, settings); } From c6460ace2b3d6c98860652af331c9f583373ea72 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 21 Feb 2025 00:43:33 +0800 Subject: [PATCH 4/6] Fix link error on Windows for hash join benchmark --- cpp/src/arrow/acero/accumulation_queue.h | 2 +- cpp/src/arrow/acero/hash_join.h | 2 +- cpp/src/arrow/acero/swiss_join_internal.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/accumulation_queue.h b/cpp/src/arrow/acero/accumulation_queue.h index 92d62d5d99d1..b0e0b85a4f3d 100644 --- a/cpp/src/arrow/acero/accumulation_queue.h +++ b/cpp/src/arrow/acero/accumulation_queue.h @@ -34,7 +34,7 @@ using arrow::compute::ExecBatch; /// \brief A container that accumulates batches until they are ready to /// be processed. -class AccumulationQueue { +class ARROW_ACERO_EXPORT AccumulationQueue { public: AccumulationQueue() : row_count_(0) {} ~AccumulationQueue() = default; diff --git a/cpp/src/arrow/acero/hash_join.h b/cpp/src/arrow/acero/hash_join.h index a81ff274e5e3..c0faacf04baf 100644 --- a/cpp/src/arrow/acero/hash_join.h +++ b/cpp/src/arrow/acero/hash_join.h @@ -37,7 +37,7 @@ namespace acero { using util::AccumulationQueue; -class HashJoinImpl { +class ARROW_ACERO_EXPORT HashJoinImpl { public: using OutputBatchCallback = std::function; using BuildFinishedCallback = std::function; diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 85f443b0323c..a518ac6ff667 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -175,7 +175,7 @@ class RowArrayAccessor { // Read operations (row comparison, column decoding) // can be called by multiple threads concurrently. // -struct RowArray { +struct ARROW_ACERO_EXPORT RowArray { RowArray() : is_initialized_(false), hardware_flags_(0) {} Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const ExecBatch& batch); From 4cba288d89799178ca8bee8d690d4ce612bb2fba Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 21 Feb 2025 10:02:08 +0800 Subject: [PATCH 5/6] Address review comments --- cpp/src/arrow/acero/hash_join_benchmark.cc | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index 424693ed5349..f592e17b1e6f 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -163,11 +163,7 @@ class JoinBenchmark { &(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter, std::move(register_task_group_callback), std::move(start_task_group_callback), [](int64_t, ExecBatch) { return Status::OK(); }, - [&](int64_t) { - std::unique_lock lk(finished_mutex_); - finished_cv_.notify_one(); - return Status::OK(); - })); + [&](int64_t) { return Status::OK(); })); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -194,15 +190,12 @@ class JoinBenchmark { l_batches_.batch_count()); })); - std::unique_lock lk(finished_mutex_); - finished_cv_.wait(lk); + thread_pool_->WaitForIdle(); } std::unique_ptr scheduler_; ThreadIndexer thread_indexer_; arrow::internal::ThreadPool* thread_pool_; - std::condition_variable finished_cv_; - std::mutex finished_mutex_; AccumulationQueue l_batches_; AccumulationQueue r_batches_; From 1771c04d7486848723cb09ec14f73edf5967d037 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 21 Feb 2025 10:16:49 +0800 Subject: [PATCH 6/6] Remove useless include --- cpp/src/arrow/acero/hash_join_benchmark.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index f592e17b1e6f..c01e8a58933f 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -28,11 +28,9 @@ #include "arrow/testing/random.h" #include "arrow/util/thread_pool.h" -#include #include #include #include -#include namespace arrow { namespace acero {