From 1836bbf1877a095f97578a5c3e7c3efc4ba2626c Mon Sep 17 00:00:00 2001 From: zenotme Date: Wed, 17 Sep 2025 20:33:14 +0800 Subject: [PATCH] add circular buffer using faa instead of cas --- .../sdk/common/circular_buffer.h | 2 + .../sdk/common/circular_buffer_range.h | 5 +- sdk/test/common/CMakeLists.txt | 1 + sdk/test/common/baseline_circular_buffer.h | 5 + sdk/test/common/circular_buffer_benchmark.cc | 38 +++- .../common/optimize_circular_buffer_test.cc | 171 ++++++++++++++ sdk/test/common/optmize_circular_buffer.h | 213 ++++++++++++++++++ 7 files changed, 431 insertions(+), 4 deletions(-) create mode 100644 sdk/test/common/optimize_circular_buffer_test.cc create mode 100644 sdk/test/common/optmize_circular_buffer.h diff --git a/sdk/include/opentelemetry/sdk/common/circular_buffer.h b/sdk/include/opentelemetry/sdk/common/circular_buffer.h index a35498ecea..ea5b54fc1e 100644 --- a/sdk/include/opentelemetry/sdk/common/circular_buffer.h +++ b/sdk/include/opentelemetry/sdk/common/circular_buffer.h @@ -59,7 +59,9 @@ class CircularBuffer auto range = PeekImpl().Take(n); static_assert(noexcept(callback(range)), "callback not allowed to throw"); tail_ += n; + // why not call back after tail_ += n callback(range); + // tail_ += n; } /** diff --git a/sdk/include/opentelemetry/sdk/common/circular_buffer_range.h b/sdk/include/opentelemetry/sdk/common/circular_buffer_range.h index 5e16c3d410..2576373e5e 100644 --- a/sdk/include/opentelemetry/sdk/common/circular_buffer_range.h +++ b/sdk/include/opentelemetry/sdk/common/circular_buffer_range.h @@ -75,7 +75,10 @@ class CircularBufferRange */ CircularBufferRange Take(size_t n) const noexcept { - assert(n <= size()); + // assert(n <= size()); + if (n >= size()) { + return CircularBufferRange{first_,second_}; + } if (first_.size() >= n) { return CircularBufferRange{nostd::span{first_.data(), n}}; diff --git a/sdk/test/common/CMakeLists.txt b/sdk/test/common/CMakeLists.txt index 3228a55f6e..2da01c5592 100644 --- a/sdk/test/common/CMakeLists.txt +++ b/sdk/test/common/CMakeLists.txt @@ -8,6 +8,7 @@ foreach( atomic_unique_ptr_test circular_buffer_range_test circular_buffer_test + optimize_circular_buffer_test attribute_utils_test attributemap_hash_test global_log_handle_test diff --git a/sdk/test/common/baseline_circular_buffer.h b/sdk/test/common/baseline_circular_buffer.h index 398a4d0385..b436aeda63 100644 --- a/sdk/test/common/baseline_circular_buffer.h +++ b/sdk/test/common/baseline_circular_buffer.h @@ -78,6 +78,11 @@ class BaselineCircularBuffer tail_ = head_; } + size_t size() noexcept { + std::lock_guard lock_guard{mutex_}; + return head_ - tail_; + } + private: std::mutex mutex_; uint64_t head_{0}; diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc index 2470cbe059..24418a797e 100644 --- a/sdk/test/common/circular_buffer_benchmark.cc +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -18,10 +18,12 @@ #include "opentelemetry/sdk/common/circular_buffer.h" #include "opentelemetry/sdk/common/circular_buffer_range.h" #include "test/common/baseline_circular_buffer.h" +#include "test/common/optmize_circular_buffer.h" using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; +using opentelemetry::sdk::common::OptimizedCircularBuffer; using opentelemetry::testing::BaselineCircularBuffer; const int N = 10000; @@ -52,6 +54,20 @@ static uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept return result; } +static uint64_t ConsumeBufferNumbers(OptimizedCircularBuffer &buffer) noexcept +{ + uint64_t result = 0; + buffer.Consume(buffer.size(), + [&](CircularBufferRange> &range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) noexcept { + result += *ptr; + ptr.Reset(); + return true; + }); + }); + return result; +} + template static void GenerateNumbersForThread(Buffer &buffer, int n, std::atomic &sum) noexcept { @@ -90,7 +106,9 @@ static void ConsumeNumbers(Buffer &buffer, uint64_t &sum, std::atomic &fin { sum += ConsumeBufferNumbers(buffer); } - sum += ConsumeBufferNumbers(buffer); + while (buffer.size()){ + sum += ConsumeBufferNumbers(buffer); + } } template @@ -122,7 +140,7 @@ static void BM_BaselineBuffer(benchmark::State &state) } } -BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4); +BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16); static void BM_LockFreeBuffer(benchmark::State &state) { @@ -136,6 +154,20 @@ static void BM_LockFreeBuffer(benchmark::State &state) } } -BENCHMARK(BM_LockFreeBuffer)->Arg(1)->Arg(2)->Arg(4); +BENCHMARK(BM_LockFreeBuffer)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16); + +static void BM_OptimizedBuffer(benchmark::State &state) +{ + const size_t max_elements = 500; + const int num_threads = static_cast(state.range(0)); + const int n = static_cast(N / num_threads); + OptimizedCircularBuffer buffer{max_elements}; + for (auto _ : state) + { + RunSimulation(buffer, num_threads, n); + } +} + +BENCHMARK(BM_OptimizedBuffer)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16); BENCHMARK_MAIN(); diff --git a/sdk/test/common/optimize_circular_buffer_test.cc b/sdk/test/common/optimize_circular_buffer_test.cc new file mode 100644 index 0000000000..05848acfa7 --- /dev/null +++ b/sdk/test/common/optimize_circular_buffer_test.cc @@ -0,0 +1,171 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/sdk/common/atomic_unique_ptr.h" +#include "opentelemetry/sdk/common/circular_buffer.h" +#include "test/common/optmize_circular_buffer.h" + +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBufferRange; +using opentelemetry::sdk::common::OptimizedCircularBuffer; + +static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; + +static void GenerateRandomNumbers(OptimizedCircularBuffer &buffer, + std::vector &numbers, + int n) +{ + for (int i = 0; i < n; ++i) + { + auto value = static_cast(RandomNumberGenerator()); + std::unique_ptr x{new uint32_t{value}}; + if (buffer.Add(x)) + { + numbers.push_back(value); + } + } +} + +static void RunNumberProducers(OptimizedCircularBuffer &buffer, + std::vector &numbers, + int num_threads, + int n) +{ + std::vector> thread_numbers(num_threads); + std::vector threads(num_threads); + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer), + std::ref(thread_numbers[thread_index]), n}; + } + for (auto &thread : threads) + { + thread.join(); + } + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), + thread_numbers[thread_index].end()); + } +} + +void RunNumberConsumer(OptimizedCircularBuffer &buffer, + std::atomic &exit, + std::vector &numbers) +{ + while (true) + { + if (exit && buffer.Peek().empty()) + { + return; + } + auto n = std::uniform_int_distribution{0, buffer.Peek().size()}(RandomNumberGenerator); + buffer.Consume(n, [&](CircularBufferRange> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr &ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); + } +} + +TEST(OptmizeCircularBufferTest, Add) +{ + OptimizedCircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + auto range = buffer.Peek(); + EXPECT_EQ(range.size(), 1); + range.ForEach([](const AtomicUniquePtr &y) { + EXPECT_EQ(*y, 11); + return true; + }); +} + +TEST(OptmizeCircularBufferTest, Clear) +{ + OptimizedCircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + buffer.Clear(); + EXPECT_TRUE(buffer.empty()); +} + +TEST(OptmizeCircularBufferTest, AddOnFull) +{ + OptimizedCircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + std::unique_ptr x{new int{33}}; + EXPECT_FALSE(buffer.Add(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); +} + +TEST(OptmizeCircularBufferTest, Consume) +{ + OptimizedCircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + int count = 0; + buffer.Consume(5, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); + EXPECT_EQ(count, 5); +} + +TEST(OptmizeCircularBufferTest, Simulation) +{ + const int num_producer_threads = 4; + const int n = 25000; + for (size_t max_size : {1, 2, 10, 50, 100, 1000}) + { + OptimizedCircularBuffer buffer{max_size}; + std::vector producer_numbers; + std::vector consumer_numbers; + auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers), + num_producer_threads, n}; + std::atomic exit{false}; + auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit), + std::ref(consumer_numbers)}; + producers.join(); + exit = true; + consumer.join(); + std::sort(producer_numbers.begin(), producer_numbers.end()); + std::sort(consumer_numbers.begin(), consumer_numbers.end()); + + EXPECT_EQ(producer_numbers.size(), consumer_numbers.size()); + EXPECT_EQ(producer_numbers, consumer_numbers); + } +} diff --git a/sdk/test/common/optmize_circular_buffer.h b/sdk/test/common/optmize_circular_buffer.h new file mode 100644 index 0000000000..03bda76da5 --- /dev/null +++ b/sdk/test/common/optmize_circular_buffer.h @@ -0,0 +1,213 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/sdk/common/atomic_unique_ptr.h" +#include "opentelemetry/sdk/common/circular_buffer_range.h" +#include "opentelemetry/version.h" +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace common +{ +/* + * An optimized lock-free circular buffer that supports multiple concurrent producers + * and a single consumer. Uses FAA (Fetch-And-Add) for better performance. + */ +template +class OptimizedCircularBuffer +{ +public: + explicit OptimizedCircularBuffer(size_t max_size) + : data_{new AtomicUniquePtr[max_size + 1]}, capacity_{max_size + 1} + {} + + /** + * @return a range of the elements in the circular buffer + * + * Note: This method must only be called from the consumer thread. + */ + CircularBufferRange> Peek() const noexcept + { + return const_cast(this)->PeekImpl(); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * @param callback the callback to invoke with an AtomicUniquePtr to each + * consumed element. + * + * Note: The callback must set the passed AtomicUniquePtr to null. + * + * Note: This method must only be called from the consumer thread. + */ + template + void Consume(size_t n, Callback callback) noexcept + { + // comsume max(n, available) elements + auto range = PeekImpl().Take(n); + static_assert(noexcept(callback(range)), "callback not allowed to throw"); + // consume elements + callback(range); + // free elements to let producers add new elements + tail_ += range.size(); + tail_ %= capacity_; + count_.fetch_sub(range.size(), std::memory_order_release); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * + * Note: This method must only be called from the consumer thread. + */ + void Consume(size_t n) noexcept + { + Consume(n, [](CircularBufferRange> &range) noexcept { + range.ForEach([](AtomicUniquePtr &ptr) noexcept { + ptr.Reset(); + return true; + }); + }); + } + + /** + * Adds an element into the circular buffer. + * @param ptr a pointer to the element to add + * @return true if the element was successfully added; false, otherwise. + */ + bool Add(std::unique_ptr &ptr) noexcept + { + uint64_t count = count_.fetch_add(1, std::memory_order_acquire); + if (count >= capacity_ - 1) + { + // queue is full, rollback + count_.fetch_sub(1, std::memory_order_release); + return false; + } + + uint64_t head_pos = head_.fetch_add(1, std::memory_order_acquire); + uint64_t head_index = head_pos % capacity_; + // It should be a valid pos to add an element + assert(data_[head_index].Get() == nullptr); + + // set the element must be sucess + bool success = data_[head_index].SwapIfNull(ptr); + assert(success); + ptr.reset(); + return true; + } + + bool Add(std::unique_ptr &&ptr) noexcept + { + // rvalue to lvalue reference + bool result = Add(std::ref(ptr)); + ptr.reset(); + return result; + } + + /** + * Clear the circular buffer. + * + * Note: This method must only be called from the consumer thread. + */ + void Clear() noexcept { Consume(size()); } + + /** + * @return the maximum number of bytes that can be stored in the buffer. + */ + size_t max_size() const noexcept { return capacity_ - 1; } + + /** + * @return true if the buffer is empty. + */ + bool empty() const noexcept { return count_.load(std::memory_order_relaxed) == 0; } + + /** + * @return the number of bytes stored in the circular buffer. + * + * Note: this method will only return a correct snapshot of the size if called + * from the consumer thread. + */ + size_t size() const noexcept + { + return count_.load(std::memory_order_relaxed); + } + + /** + * @return the number of elements consumed from the circular buffer. + */ + uint64_t consumption_count() const noexcept { return tail_; } + + /** + * @return the number of elements added to the circular buffer. + */ + uint64_t production_count() const noexcept { return head_; } + +private: + std::unique_ptr[]> data_; + size_t capacity_; + std::atomic count_{0}; + std::atomic head_{0}; + uint64_t tail_{0}; + + CircularBufferRange> PeekImpl() noexcept + { + uint64_t current_count = count_.load(std::memory_order_relaxed); + if (current_count == 0) + { + return {}; + } + + auto data = data_.get(); + uint64_t available_count = 0; + uint64_t max_check = std::min(current_count, capacity_); + + for (uint64_t i = 0; i < max_check; ++i) + { + uint64_t index = (tail_ + i) % capacity_; + if (data[index].Get() != nullptr) + { + available_count++; + } + else + { + // Find the first null pointer, it's a element currently being added by producer + break; + } + } + + if (available_count == 0) + { + return {}; + } + + if (tail_ + available_count <= capacity_) + { + return CircularBufferRange>{nostd::span>{ + data + tail_, static_cast(available_count)}}; + } else { + // the elements are split into two parts + uint64_t first_part_size = capacity_ - tail_; + uint64_t second_part_size = available_count - first_part_size; + + return {nostd::span>{data + tail_, static_cast(first_part_size)}, + nostd::span>{data, static_cast(second_part_size)}}; + } + } +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE