diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh index c1e7adf6a05e..3fcff9972336 100755 --- a/ci/scripts/cpp_build.sh +++ b/ci/scripts/cpp_build.sh @@ -141,7 +141,6 @@ else -DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \ -DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \ -DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \ - -DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \ -DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \ -DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \ -DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \ diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 85febbc5c9a7..114f79271d28 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -41,7 +41,6 @@ "cacheVariables": { "ARROW_BUILD_BENCHMARKS": "ON", "ARROW_BUILD_BENCHMARKS_REFERENCE": "ON", - "ARROW_BUILD_OPENMP_BENCHMARKS": "ON", "ARROW_BUILD_DETAILED_BENCHMARKS": "OFF", "CMAKE_BUILD_TYPE": "RelWithDebInfo" } diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 43e4e7603cfb..ee6315f8f0f9 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is configured" ON) define_option(ARROW_BUILD_BENCHMARKS_REFERENCE "Build the Arrow micro reference benchmarks" OFF) - define_option(ARROW_BUILD_OPENMP_BENCHMARKS - "Build the Arrow benchmarks that rely on OpenMP" OFF) - define_option(ARROW_BUILD_DETAILED_BENCHMARKS "Build benchmarks that do a longer exploration of performance" OFF) diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 54269f1df0eb..e6aa0560dfa8 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -221,18 +221,7 @@ if(ARROW_BUILD_BENCHMARKS) add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - find_package(OpenMP REQUIRED) - 
add_arrow_acero_benchmark(hash_join_benchmark - EXTRA_LINK_LIBS - OpenMP::OpenMP_CXX - SOURCES - hash_join_benchmark.cc) - if(MSVC) - target_compile_options(arrow-compute-hash-join-benchmark - PRIVATE "-openmp:experimental -openmp:llvm") - endif() - endif() + add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc) if(ARROW_BUILD_STATIC) target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static) @@ -240,17 +229,13 @@ if(ARROW_BUILD_BENCHMARKS) target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static) target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static) target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static) - endif() + target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static) else() target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared) target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared) - if(ARROW_BUILD_OPENMP_BENCHMARKS) - target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared) - endif() + target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared) endif() endif() diff --git a/cpp/src/arrow/acero/accumulation_queue.h b/cpp/src/arrow/acero/accumulation_queue.h index 92d62d5d99d1..b0e0b85a4f3d 100644 --- a/cpp/src/arrow/acero/accumulation_queue.h +++ b/cpp/src/arrow/acero/accumulation_queue.h @@ -34,7 +34,7 @@ using arrow::compute::ExecBatch; /// \brief A container that accumulates batches until they are ready to /// be processed. 
-class AccumulationQueue { +class ARROW_ACERO_EXPORT AccumulationQueue { public: AccumulationQueue() : row_count_(0) {} ~AccumulationQueue() = default; diff --git a/cpp/src/arrow/acero/hash_join.h b/cpp/src/arrow/acero/hash_join.h index a81ff274e5e3..c0faacf04baf 100644 --- a/cpp/src/arrow/acero/hash_join.h +++ b/cpp/src/arrow/acero/hash_join.h @@ -37,7 +37,7 @@ namespace acero { using util::AccumulationQueue; -class HashJoinImpl { +class ARROW_ACERO_EXPORT HashJoinImpl { public: using OutputBatchCallback = std::function<Status(int64_t, ExecBatch)>; using BuildFinishedCallback = std::function<Status(size_t)>; diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index 0a56194f2a3c..c01e8a58933f 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -32,8 +32,6 @@ #include <cstdio> #include <memory> -#include <omp.h> - namespace arrow { namespace acero { struct BenchmarkSettings { @@ -56,6 +54,8 @@ struct BenchmarkSettings { int var_length_max = 20; // Maximum length of any var length types Expression residual_filter = literal(true); + + bool stats_probe_rows = true; }; class JoinBenchmark { @@ -128,6 +128,7 @@ class JoinBenchmark { for (ExecBatch& batch : r_batches_with_schema.batches) r_batches_.InsertBatch(std::move(batch)); + stats_.num_build_rows = settings.num_build_batches * settings.batch_size; stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; schema_mgr_ = std::make_unique<HashJoinSchema>(); @@ -141,14 +142,9 @@ class JoinBenchmark { join_ = *HashJoinImpl::MakeSwiss(); } - omp_set_num_threads(settings.num_threads); - auto schedule_callback = [](std::function<Status(size_t)> func) -> Status { -#pragma omp task - { DCHECK_OK(func(omp_get_thread_num())); } - return Status::OK(); - }; - scheduler_ = TaskScheduler::Make(); + thread_pool_ = arrow::internal::GetCpuThreadPool(); + DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads)); DCHECK_OK(ctx_.Init(nullptr)); auto register_task_group_callback = [&](std::function<Status(size_t, int64_t)> task, @@ -157,7 
+153,7 @@ class JoinBenchmark { }; auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) { - return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks); + return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id, num_tasks); }; DCHECK_OK(join_->Init( @@ -165,7 +161,7 @@ class JoinBenchmark { &(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter, std::move(register_task_group_callback), std::move(start_task_group_callback), [](int64_t, ExecBatch) { return Status::OK(); }, - [](int64_t) { return Status::OK(); })); + [&](int64_t) { return Status::OK(); })); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -178,25 +174,27 @@ class JoinBenchmark { scheduler_->RegisterEnd(); DCHECK_OK(scheduler_->StartScheduling( - 0 /*thread index*/, std::move(schedule_callback), - static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/, - settings.num_threads == 1)); + /*thread_id=*/0, + [&](std::function<Status(size_t)> task) -> Status { + return thread_pool_->Spawn([&, task]() { DCHECK_OK(task(thread_indexer_())); }); + }, + thread_pool_->GetCapacity(), settings.num_threads == 1)); } void RunJoin() { -#pragma omp parallel - { - int tid = omp_get_thread_num(); -#pragma omp single - DCHECK_OK( - join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t thread_index) { - return scheduler_->StartTaskGroup(thread_index, task_group_probe_, - l_batches_.batch_count()); - })); - } + DCHECK_OK(join_->BuildHashTable( + /*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) { + return scheduler_->StartTaskGroup(thread_index, task_group_probe_, + l_batches_.batch_count()); + })); + + thread_pool_->WaitForIdle(); } std::unique_ptr<TaskScheduler> scheduler_; + ThreadIndexer thread_indexer_; + arrow::internal::ThreadPool* thread_pool_; + AccumulationQueue l_batches_; AccumulationQueue r_batches_; std::unique_ptr<HashJoinSchema> schema_mgr_; @@ -205,6 +203,7 @@ class JoinBenchmark { int 
task_group_probe_; struct { + uint64_t num_build_rows; uint64_t num_probe_rows; } stats_; }; @@ -219,11 +218,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st, st.ResumeTiming(); bm.RunJoin(); st.PauseTiming(); - total_rows += bm.stats_.num_probe_rows; + total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows + : bm.stats_.num_build_rows); } st.ResumeTiming(); } - st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate); + st.counters["rows/sec"] = + benchmark::Counter(static_cast<double>(total_rows), benchmark::Counter::kIsRate); } template <typename... Args> @@ -302,6 +303,7 @@ static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) { settings.num_threads = static_cast<int>(st.range(0)); settings.num_build_batches = static_cast<int>(st.range(1)); settings.num_probe_batches = settings.num_threads; + settings.stats_probe_rows = false; HashJoinBasicBenchmarkImpl(st, settings); } diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 85f443b0323c..a518ac6ff667 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -175,7 +175,7 @@ class RowArrayAccessor { // Read operations (row comparison, column decoding) // can be called by multiple threads concurrently. // -struct RowArray { +struct ARROW_ACERO_EXPORT RowArray { RowArray() : is_initialized_(false), hardware_flags_(0) {} Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const ExecBatch& batch);